diff options
Diffstat (limited to 'src/cloud_trace_processor/orchestrator_impl.cc')
-rw-r--r-- | src/cloud_trace_processor/orchestrator_impl.cc | 210 |
1 files changed, 210 insertions, 0 deletions
diff --git a/src/cloud_trace_processor/orchestrator_impl.cc b/src/cloud_trace_processor/orchestrator_impl.cc new file mode 100644 index 000000000..69b831fad --- /dev/null +++ b/src/cloud_trace_processor/orchestrator_impl.cc @@ -0,0 +1,210 @@ +/* + * Copyright (C) 2023 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "src/cloud_trace_processor/orchestrator_impl.h" + +#include <memory> +#include <string> +#include <utility> +#include <vector> + +#include "perfetto/base/status.h" +#include "perfetto/ext/base/flat_hash_map.h" +#include "perfetto/ext/base/status_or.h" +#include "perfetto/ext/base/threading/future.h" +#include "perfetto/ext/base/threading/stream.h" +#include "perfetto/ext/cloud_trace_processor/worker.h" +#include "protos/perfetto/cloud_trace_processor/common.pb.h" +#include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h" +#include "protos/perfetto/cloud_trace_processor/worker.pb.h" +#include "src/trace_processor/util/status_macros.h" + +namespace perfetto { +namespace cloud_trace_processor { +namespace { + +base::Future<base::Status> CreateResponseToStatus( + base::StatusOr<protos::TracePoolShardCreateResponse> response_or) { + return response_or.status(); +} + +base::Future<base::Status> SetTracesResponseToStatus( + base::StatusOr<protos::TracePoolShardSetTracesResponse> response_or) { + return response_or.status(); +} + +base::Future<base::StatusOr<protos::TracePoolQueryResponse>> +RpcResponseToPoolResponse( + base::StatusOr<protos::TracePoolShardQueryResponse> resp) { + RETURN_IF_ERROR(resp.status()); + protos::TracePoolQueryResponse ret; + ret.set_trace(std::move(resp->trace())); + *ret.mutable_result() = std::move(*resp->mutable_result()); + return ret; +} + +base::StatusOrStream<protos::TracePoolShardSetTracesResponse> +RoundRobinSetTraces(const std::vector<std::unique_ptr<Worker>>& workers, + const std::vector<std::string>& traces) { + uint32_t worker_idx = 0; + std::vector<protos::TracePoolShardSetTracesArgs> protos; + protos.resize(workers.size()); + for (const auto& trace : traces) { + protos[worker_idx].add_traces(trace); + worker_idx = (worker_idx + 1) % workers.size(); + } + + using ShardResponse = protos::TracePoolShardSetTracesResponse; + std::vector<base::StatusOrStream<ShardResponse>> streams; + for (uint32_t i = 0; i < protos.size(); ++i) { + streams.emplace_back(workers[i]->TracePoolShardSetTraces(protos[i])); + } + return base::FlattenStreams(std::move(streams)); +} +} // namespace + +Orchestrator::~Orchestrator() = default; + +std::unique_ptr<Orchestrator> Orchestrator::CreateInProcess( + std::vector<std::unique_ptr<Worker>> workers) { + return std::unique_ptr<Orchestrator>( + new OrchestratorImpl(std::move(workers))); +} + +OrchestratorImpl::OrchestratorImpl(std::vector<std::unique_ptr<Worker>> workers) + : workers_(std::move(workers)) {} + +base::StatusOrFuture<protos::TracePoolCreateResponse> +OrchestratorImpl::TracePoolCreate(const protos::TracePoolCreateArgs& args) { + if (args.pool_type() != protos::TracePoolType::SHARED) { + return base::StatusOr<protos::TracePoolCreateResponse>( + base::ErrStatus("Currently only SHARED pools are supported")); + } + if (!args.has_shared_pool_name()) { + return base::StatusOr<protos::TracePoolCreateResponse>( + base::ErrStatus("Pool name must be provided for SHARED pools")); + } + + std::string id = "shared:" + args.shared_pool_name(); + TracePool* exist = pools_.Find(id); + if (exist) { + return base::StatusOr<protos::TracePoolCreateResponse>( + base::ErrStatus("Pool %s already exists", id.c_str())); + } + protos::TracePoolShardCreateArgs group_args; + group_args.set_pool_id(id); + group_args.set_pool_type(args.pool_type()); + + using ShardResponse = protos::TracePoolShardCreateResponse; + std::vector<base::StatusOrStream<ShardResponse>> shards; + for (uint32_t i = 0; i < workers_.size(); ++i) { + shards.emplace_back( + base::StreamFromFuture(workers_[i]->TracePoolShardCreate(group_args))); + } + return base::FlattenStreams(std::move(shards)) + .MapFuture(&CreateResponseToStatus) + .Collect(base::AllOkCollector()) + .ContinueWith( + [this, id](base::StatusOr<ShardResponse> resp) + -> base::StatusOrFuture<protos::TracePoolCreateResponse> { + RETURN_IF_ERROR(resp.status()); + auto it_and_inserted = pools_.Insert(id, TracePool()); + if (!it_and_inserted.second) { + return base::ErrStatus("Unable to insert pool %s", id.c_str()); + } + return protos::TracePoolCreateResponse(); + }); +} + +base::StatusOrFuture<protos::TracePoolSetTracesResponse> +OrchestratorImpl::TracePoolSetTraces( + const protos::TracePoolSetTracesArgs& args) { + std::string id = args.pool_id(); + TracePool* pool = pools_.Find(id); + if (!pool) { + return base::StatusOr<protos::TracePoolSetTracesResponse>( + base::ErrStatus("Unable to find pool %s", id.c_str())); + } + if (!pool->loaded_traces.empty()) { + return base::StatusOr<protos::TracePoolSetTracesResponse>(base::ErrStatus( + "Incrementally adding/removing items to pool not currently supported")); + } + pool->loaded_traces.assign(args.traces().begin(), args.traces().end()); + return RoundRobinSetTraces(workers_, pool->loaded_traces) + .MapFuture(&SetTracesResponseToStatus) + .Collect(base::AllOkCollector()) + .ContinueWith( + [](base::Status status) + -> base::StatusOrFuture<protos::TracePoolSetTracesResponse> { + RETURN_IF_ERROR(status); + return protos::TracePoolSetTracesResponse(); + }); +} + +base::StatusOrStream<protos::TracePoolQueryResponse> +OrchestratorImpl::TracePoolQuery(const protos::TracePoolQueryArgs& args) { + TracePool* pool = pools_.Find(args.pool_id()); + if (!pool) { + return base::StreamOf(base::StatusOr<protos::TracePoolQueryResponse>( + base::ErrStatus("Unable to find pool %s", args.pool_id().c_str()))); + } + protos::TracePoolShardQueryArgs shard_args; + *shard_args.mutable_pool_id() = args.pool_id(); + *shard_args.mutable_sql_query() = args.sql_query(); + + using ShardResponse = protos::TracePoolShardQueryResponse; + std::vector<base::StatusOrStream<ShardResponse>> streams; + for (uint32_t i = 0; i < workers_.size(); ++i) { + streams.emplace_back(workers_[i]->TracePoolShardQuery(shard_args)); + } + return base::FlattenStreams(std::move(streams)) + .MapFuture(&RpcResponseToPoolResponse); +} + +base::StatusOrFuture<protos::TracePoolDestroyResponse> +OrchestratorImpl::TracePoolDestroy(const protos::TracePoolDestroyArgs& args) { + std::string id = args.pool_id(); + TracePool* pool = pools_.Find(id); + if (!pool) { + return base::StatusOr<protos::TracePoolDestroyResponse>( + base::ErrStatus("Unable to find pool %s", id.c_str())); + } + protos::TracePoolShardDestroyArgs shard_args; + *shard_args.mutable_pool_id() = id; + + using ShardResponse = protos::TracePoolShardDestroyResponse; + std::vector<base::StatusOrStream<ShardResponse>> streams; + for (uint32_t i = 0; i < workers_.size(); ++i) { + streams.emplace_back( + base::StreamFromFuture(workers_[i]->TracePoolShardDestroy(shard_args))); + } + return base::FlattenStreams(std::move(streams)) + .MapFuture( + [](base::StatusOr<ShardResponse> resp) -> base::Future<base::Status> { + return resp.status(); + }) + .Collect(base::AllOkCollector()) + .ContinueWith( + [this, id](base::Status status) + -> base::StatusOrFuture<protos::TracePoolDestroyResponse> { + RETURN_IF_ERROR(status); + PERFETTO_CHECK(pools_.Erase(id)); + return protos::TracePoolDestroyResponse(); + }); +} + +} // namespace cloud_trace_processor +} // namespace perfetto |