aboutsummaryrefslogtreecommitdiff
path: root/src/cloud_trace_processor/orchestrator_impl.cc
diff options
context:
space:
mode:
Diffstat (limited to 'src/cloud_trace_processor/orchestrator_impl.cc')
-rw-r--r--src/cloud_trace_processor/orchestrator_impl.cc210
1 files changed, 210 insertions, 0 deletions
diff --git a/src/cloud_trace_processor/orchestrator_impl.cc b/src/cloud_trace_processor/orchestrator_impl.cc
new file mode 100644
index 000000000..69b831fad
--- /dev/null
+++ b/src/cloud_trace_processor/orchestrator_impl.cc
@@ -0,0 +1,210 @@
+/*
+ * Copyright (C) 2023 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "src/cloud_trace_processor/orchestrator_impl.h"
+
+#include <memory>
+#include <string>
+#include <utility>
+#include <vector>
+
+#include "perfetto/base/status.h"
+#include "perfetto/ext/base/flat_hash_map.h"
+#include "perfetto/ext/base/status_or.h"
+#include "perfetto/ext/base/threading/future.h"
+#include "perfetto/ext/base/threading/stream.h"
+#include "perfetto/ext/cloud_trace_processor/worker.h"
+#include "protos/perfetto/cloud_trace_processor/common.pb.h"
+#include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h"
+#include "protos/perfetto/cloud_trace_processor/worker.pb.h"
+#include "src/trace_processor/util/status_macros.h"
+
+namespace perfetto {
+namespace cloud_trace_processor {
+namespace {
+
+base::Future<base::Status> CreateResponseToStatus(
+ base::StatusOr<protos::TracePoolShardCreateResponse> response_or) {
+ return response_or.status();
+}
+
+base::Future<base::Status> SetTracesResponseToStatus(
+ base::StatusOr<protos::TracePoolShardSetTracesResponse> response_or) {
+ return response_or.status();
+}
+
+base::Future<base::StatusOr<protos::TracePoolQueryResponse>>
+RpcResponseToPoolResponse(
+ base::StatusOr<protos::TracePoolShardQueryResponse> resp) {
+ RETURN_IF_ERROR(resp.status());
+ protos::TracePoolQueryResponse ret;
+ ret.set_trace(std::move(resp->trace()));
+ *ret.mutable_result() = std::move(*resp->mutable_result());
+ return ret;
+}
+
+base::StatusOrStream<protos::TracePoolShardSetTracesResponse>
+RoundRobinSetTraces(const std::vector<std::unique_ptr<Worker>>& workers,
+ const std::vector<std::string>& traces) {
+ uint32_t worker_idx = 0;
+ std::vector<protos::TracePoolShardSetTracesArgs> protos;
+ protos.resize(workers.size());
+ for (const auto& trace : traces) {
+ protos[worker_idx].add_traces(trace);
+ worker_idx = (worker_idx + 1) % workers.size();
+ }
+
+ using ShardResponse = protos::TracePoolShardSetTracesResponse;
+ std::vector<base::StatusOrStream<ShardResponse>> streams;
+ for (uint32_t i = 0; i < protos.size(); ++i) {
+ streams.emplace_back(workers[i]->TracePoolShardSetTraces(protos[i]));
+ }
+ return base::FlattenStreams(std::move(streams));
+}
+} // namespace
+
+Orchestrator::~Orchestrator() = default;
+
+std::unique_ptr<Orchestrator> Orchestrator::CreateInProcess(
+ std::vector<std::unique_ptr<Worker>> workers) {
+ return std::unique_ptr<Orchestrator>(
+ new OrchestratorImpl(std::move(workers)));
+}
+
+OrchestratorImpl::OrchestratorImpl(std::vector<std::unique_ptr<Worker>> workers)
+ : workers_(std::move(workers)) {}
+
+base::StatusOrFuture<protos::TracePoolCreateResponse>
+OrchestratorImpl::TracePoolCreate(const protos::TracePoolCreateArgs& args) {
+ if (args.pool_type() != protos::TracePoolType::SHARED) {
+ return base::StatusOr<protos::TracePoolCreateResponse>(
+ base::ErrStatus("Currently only SHARED pools are supported"));
+ }
+ if (!args.has_shared_pool_name()) {
+ return base::StatusOr<protos::TracePoolCreateResponse>(
+ base::ErrStatus("Pool name must be provided for SHARED pools"));
+ }
+
+ std::string id = "shared:" + args.shared_pool_name();
+ TracePool* exist = pools_.Find(id);
+ if (exist) {
+ return base::StatusOr<protos::TracePoolCreateResponse>(
+ base::ErrStatus("Pool %s already exists", id.c_str()));
+ }
+ protos::TracePoolShardCreateArgs group_args;
+ group_args.set_pool_id(id);
+ group_args.set_pool_type(args.pool_type());
+
+ using ShardResponse = protos::TracePoolShardCreateResponse;
+ std::vector<base::StatusOrStream<ShardResponse>> shards;
+ for (uint32_t i = 0; i < workers_.size(); ++i) {
+ shards.emplace_back(
+ base::StreamFromFuture(workers_[i]->TracePoolShardCreate(group_args)));
+ }
+ return base::FlattenStreams(std::move(shards))
+ .MapFuture(&CreateResponseToStatus)
+ .Collect(base::AllOkCollector())
+ .ContinueWith(
+ [this, id](base::StatusOr<ShardResponse> resp)
+ -> base::StatusOrFuture<protos::TracePoolCreateResponse> {
+ RETURN_IF_ERROR(resp.status());
+ auto it_and_inserted = pools_.Insert(id, TracePool());
+ if (!it_and_inserted.second) {
+ return base::ErrStatus("Unable to insert pool %s", id.c_str());
+ }
+ return protos::TracePoolCreateResponse();
+ });
+}
+
+base::StatusOrFuture<protos::TracePoolSetTracesResponse>
+OrchestratorImpl::TracePoolSetTraces(
+ const protos::TracePoolSetTracesArgs& args) {
+ std::string id = args.pool_id();
+ TracePool* pool = pools_.Find(id);
+ if (!pool) {
+ return base::StatusOr<protos::TracePoolSetTracesResponse>(
+ base::ErrStatus("Unable to find pool %s", id.c_str()));
+ }
+ if (!pool->loaded_traces.empty()) {
+ return base::StatusOr<protos::TracePoolSetTracesResponse>(base::ErrStatus(
+ "Incrementally adding/removing items to pool not currently supported"));
+ }
+ pool->loaded_traces.assign(args.traces().begin(), args.traces().end());
+ return RoundRobinSetTraces(workers_, pool->loaded_traces)
+ .MapFuture(&SetTracesResponseToStatus)
+ .Collect(base::AllOkCollector())
+ .ContinueWith(
+ [](base::Status status)
+ -> base::StatusOrFuture<protos::TracePoolSetTracesResponse> {
+ RETURN_IF_ERROR(status);
+ return protos::TracePoolSetTracesResponse();
+ });
+}
+
+base::StatusOrStream<protos::TracePoolQueryResponse>
+OrchestratorImpl::TracePoolQuery(const protos::TracePoolQueryArgs& args) {
+ TracePool* pool = pools_.Find(args.pool_id());
+ if (!pool) {
+ return base::StreamOf(base::StatusOr<protos::TracePoolQueryResponse>(
+ base::ErrStatus("Unable to find pool %s", args.pool_id().c_str())));
+ }
+ protos::TracePoolShardQueryArgs shard_args;
+ *shard_args.mutable_pool_id() = args.pool_id();
+ *shard_args.mutable_sql_query() = args.sql_query();
+
+ using ShardResponse = protos::TracePoolShardQueryResponse;
+ std::vector<base::StatusOrStream<ShardResponse>> streams;
+ for (uint32_t i = 0; i < workers_.size(); ++i) {
+ streams.emplace_back(workers_[i]->TracePoolShardQuery(shard_args));
+ }
+ return base::FlattenStreams(std::move(streams))
+ .MapFuture(&RpcResponseToPoolResponse);
+}
+
+base::StatusOrFuture<protos::TracePoolDestroyResponse>
+OrchestratorImpl::TracePoolDestroy(const protos::TracePoolDestroyArgs& args) {
+ std::string id = args.pool_id();
+ TracePool* pool = pools_.Find(id);
+ if (!pool) {
+ return base::StatusOr<protos::TracePoolDestroyResponse>(
+ base::ErrStatus("Unable to find pool %s", id.c_str()));
+ }
+ protos::TracePoolShardDestroyArgs shard_args;
+ *shard_args.mutable_pool_id() = id;
+
+ using ShardResponse = protos::TracePoolShardDestroyResponse;
+ std::vector<base::StatusOrStream<ShardResponse>> streams;
+ for (uint32_t i = 0; i < workers_.size(); ++i) {
+ streams.emplace_back(
+ base::StreamFromFuture(workers_[i]->TracePoolShardDestroy(shard_args)));
+ }
+ return base::FlattenStreams(std::move(streams))
+ .MapFuture(
+ [](base::StatusOr<ShardResponse> resp) -> base::Future<base::Status> {
+ return resp.status();
+ })
+ .Collect(base::AllOkCollector())
+ .ContinueWith(
+ [this, id](base::Status status)
+ -> base::StatusOrFuture<protos::TracePoolDestroyResponse> {
+ RETURN_IF_ERROR(status);
+ PERFETTO_CHECK(pools_.Erase(id));
+ return protos::TracePoolDestroyResponse();
+ });
+}
+
+} // namespace cloud_trace_processor
+} // namespace perfetto