diff options
Diffstat (limited to 'src/cloud_trace_processor/worker_impl.cc')
-rw-r--r-- | src/cloud_trace_processor/worker_impl.cc | 118 |
1 files changed, 118 insertions, 0 deletions
diff --git a/src/cloud_trace_processor/worker_impl.cc b/src/cloud_trace_processor/worker_impl.cc new file mode 100644 index 000000000..6f115601d --- /dev/null +++ b/src/cloud_trace_processor/worker_impl.cc @@ -0,0 +1,118 @@ +/* + * Copyright (C) 2022 The Android Open Source Project + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "src/cloud_trace_processor/worker_impl.h" + +#include <memory> + +#include "perfetto/base/status.h" +#include "perfetto/ext/base/status_or.h" +#include "perfetto/ext/base/threading/stream.h" +#include "perfetto/ext/base/uuid.h" +#include "protos/perfetto/cloud_trace_processor/common.pb.h" +#include "protos/perfetto/cloud_trace_processor/orchestrator.pb.h" +#include "protos/perfetto/cloud_trace_processor/worker.pb.h" +#include "src/cloud_trace_processor/trace_processor_wrapper.h" +#include "src/trace_processor/util/status_macros.h" + +namespace perfetto { +namespace cloud_trace_processor { + +Worker::~Worker() = default; + +std::unique_ptr<Worker> Worker::CreateInProcesss(CtpEnvironment* environment, + base::ThreadPool* pool) { + return std::make_unique<WorkerImpl>(environment, pool); +} + +WorkerImpl::WorkerImpl(CtpEnvironment* environment, base::ThreadPool* pool) + : environment_(environment), thread_pool_(pool) {} + +base::StatusOrFuture<protos::TracePoolShardCreateResponse> +WorkerImpl::TracePoolShardCreate(const protos::TracePoolShardCreateArgs& args) { + if (args.pool_type() == protos::TracePoolType::DEDICATED) { + return base::ErrStatus("Dedicated pools are not currently supported"); + } + auto it_and_inserted = shards_.Insert(args.pool_id(), TracePoolShard()); + if (!it_and_inserted.second) { + return base::ErrStatus("Shard for pool %s already exists", + args.pool_id().c_str()); + } + return base::StatusOr(protos::TracePoolShardCreateResponse()); +} + +base::StatusOrStream<protos::TracePoolShardSetTracesResponse> +WorkerImpl::TracePoolShardSetTraces( + const protos::TracePoolShardSetTracesArgs& args) { + using Response = protos::TracePoolShardSetTracesResponse; + using StatusOrResponse = base::StatusOr<Response>; + + TracePoolShard* shard = shards_.Find(args.pool_id()); + if (!shard) { + return base::StreamOf<StatusOrResponse>(base::ErrStatus( + "Unable to find shard for pool %s", args.pool_id().c_str())); + } + + std::vector<base::StatusOrStream<Response>> streams; + for (const std::string& trace : args.traces()) { + // TODO(lalitm): add support for stateful trace processor in dedicated + // pools. + auto tp = std::make_unique<TraceProcessorWrapper>( + trace, thread_pool_, TraceProcessorWrapper::Statefulness::kStateless); + auto load_trace_future = + tp->LoadTrace(environment_->ReadFile(trace)) + .ContinueWith( + [trace](base::Status status) -> base::Future<StatusOrResponse> { + RETURN_IF_ERROR(status); + protos::TracePoolShardSetTracesResponse resp; + *resp.mutable_trace() = trace; + return resp; + }); + streams.emplace_back(base::StreamFromFuture(std::move(load_trace_future))); + shard->tps.emplace_back(std::move(tp)); + } + return base::FlattenStreams(std::move(streams)); +} + +base::StatusOrStream<protos::TracePoolShardQueryResponse> +WorkerImpl::TracePoolShardQuery(const protos::TracePoolShardQueryArgs& args) { + using Response = protos::TracePoolShardQueryResponse; + using StatusOrResponse = base::StatusOr<Response>; + TracePoolShard* shard = shards_.Find(args.pool_id()); + if (!shard) { + return base::StreamOf<StatusOrResponse>(base::ErrStatus( + "Unable to find shard for pool %s", args.pool_id().c_str())); + } + std::vector<base::StatusOrStream<Response>> streams; + streams.reserve(shard->tps.size()); + for (std::unique_ptr<TraceProcessorWrapper>& tp : shard->tps) { + streams.emplace_back(tp->Query(args.sql_query())); + } + return base::FlattenStreams(std::move(streams)); +} + +base::StatusOrFuture<protos::TracePoolShardDestroyResponse> +WorkerImpl::TracePoolShardDestroy( + const protos::TracePoolShardDestroyArgs& args) { + if (!shards_.Erase(args.pool_id())) { + return base::ErrStatus("Unable to find shard for pool %s", + args.pool_id().c_str()); + } + return base::StatusOr(protos::TracePoolShardDestroyResponse()); +} + +} // namespace cloud_trace_processor +} // namespace perfetto |