aboutsummaryrefslogtreecommitdiff
path: root/ui/src/common/engine.ts
blob: 8b6cb465ccb960aeeabf715069ed1b722d3a6d18 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
// Copyright (C) 2018 The Android Open Source Project
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//      http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import {defer, Deferred} from '../base/deferred';
import {assertExists, assertTrue} from '../base/logging';
import {perfetto} from '../gen/protos';

import {ProtoRingBuffer} from './proto_ring_buffer';
import {
  ComputeMetricArgs,
  ComputeMetricResult,
  DisableAndReadMetatraceResult,
  QueryArgs,
  ResetTraceProcessorArgs,
} from './protos';
import {NUM, NUM_NULL, STR} from './query_result';
import {
  createQueryResult,
  QueryError,
  QueryResult,
  WritableQueryResult,
} from './query_result';
import {TimeSpan} from './time';

import TraceProcessorRpc = perfetto.protos.TraceProcessorRpc;
import TraceProcessorRpcStream = perfetto.protos.TraceProcessorRpcStream;
import TPM = perfetto.protos.TraceProcessorRpc.TraceProcessorMethod;

// Receives notifications about in-flight RPC activity. In this file, Engine
// calls beginLoading() every time it sends a request and endLoading() once the
// final response message for a request has been handled.
export interface LoadingTracker {
  // Invoked right before a request is handed to the transport.
  beginLoading(): void;
  // Invoked after a response that completes a request has been processed.
  endLoading(): void;
}

// No-op LoadingTracker: both callbacks do nothing. Engine falls back to an
// instance of this class when its constructor is not given a tracker.
export class NullLoadingTracker implements LoadingTracker {
  beginLoading(): void {
    // Intentionally empty.
  }

  endLoading(): void {
    // Intentionally empty.
  }
}


// This is used to skip the decoding of queryResult from protobufjs and deal
// with it ourselves. See the comment below around `QueryResult.decode = ...`.
interface QueryResultBypass {
  // View over the still-encoded QueryResult bytes. It is filled in by the
  // overridden QueryResult.decode inside Engine.onRpcResponseMessage() as a
  // subarray of the reader's buffer (no copy is made there).
  rawQueryResult: Uint8Array;
}

// Options accepted by Engine.resetTraceProcessor() and translated there into
// a ResetTraceProcessorArgs proto.
export interface TraceProcessorConfig {
  // When true, dropTrackEventDataBefore is set to
  // TRACK_EVENT_RANGE_OF_INTEREST; otherwise to NO_DROP.
  cropTrackEvents: boolean;
  // Copied verbatim into ResetTraceProcessorArgs.ingestFtraceInRawTable.
  ingestFtraceInRawTable: boolean;
  // Copied verbatim into ResetTraceProcessorArgs.analyzeTraceProtoContent.
  analyzeTraceProtoContent: boolean;
}

// Abstract interface of a trace processor.
// This is the TypeScript equivalent of src/trace_processor/rpc.h.
// There are two concrete implementations:
//   1. WasmEngineProxy: creates a Wasm module and interacts over postMessage().
//   2. HttpRpcEngine: connects to an external `trace_processor_shell --httpd`.
//      and interacts via fetch().
// In both cases, we have a byte-oriented pipe to interact with TraceProcessor.
// The derived class is only expected to deal with these two functions:
// 1. Implement the abstract rpcSendRequestBytes() function, sending the
//    proto-encoded TraceProcessorRpc requests to the TraceProcessor instance.
// 2. Call onRpcResponseBytes() when response data is received.
export abstract class Engine {
  abstract readonly id: string;
  // Lazily populated caches for getCpus() and getNumberOfGpus(). They stay
  // undefined until the corresponding query has completed once.
  private _cpus?: number[];
  private _numGpus?: number;
  private loadingTracker: LoadingTracker;
  // Sequence id that will be stamped on the next outgoing request.
  private txSeqId = 0;
  // Sequence id of the most recently received response.
  private rxSeqId = 0;
  // Reassembles the inbound byte stream into individual messages.
  private rxBuf = new ProtoRingBuffer();
  // FIFO queues of in-flight requests, one per RPC method. A response is
  // always paired with the oldest pending entry of the matching queue.
  private pendingParses = new Array<Deferred<void>>();
  private pendingEOFs = new Array<Deferred<void>>();
  private pendingResetTraceProcessors = new Array<Deferred<void>>();
  private pendingQueries = new Array<WritableQueryResult>();
  private pendingRestoreTables = new Array<Deferred<void>>();
  private pendingComputeMetrics = new Array<Deferred<ComputeMetricResult>>();
  // At most one metatrace read can be in flight at any time.
  private pendingReadMetatrace?: Deferred<DisableAndReadMetatraceResult>;
  private _isMetatracingEnabled = false;

  // |tracker| is notified when requests start and finish. When omitted, a
  // no-op NullLoadingTracker is used.
  constructor(tracker?: LoadingTracker) {
    this.loadingTracker = tracker ? tracker : new NullLoadingTracker();
  }

  // Called to send data to the TraceProcessor instance. This turns into a
  // postMessage() or a HTTP request, depending on the Engine implementation.
  abstract rpcSendRequestBytes(data: Uint8Array): void;

  // Called when an inbound message is received by the Engine implementation
  // (e.g. onmessage for the Wasm case, or when HTTP replies are received for
  // the HTTP+RPC case).
  onRpcResponseBytes(dataWillBeRetained: Uint8Array) {
    // Note: when hitting the fastpath inside ProtoRingBuffer, the |data| buffer
    // is returned back by readMessage() (% subarray()-ing it) and held onto by
    // other classes (e.g., QueryResult). For both fetch() and Wasm we are fine
    // because every response creates a new buffer.
    this.rxBuf.append(dataWillBeRetained);
    // Drain every complete message currently available in the buffer.
    for (;;) {
      const msg = this.rxBuf.readMessage();
      if (msg === undefined) break;
      this.onRpcResponseMessage(msg);
    }
  }

  // Parses a response message.
  // |rpcMsgEncoded| is a sub-array to the start of a TraceProcessorRpc
  // proto-encoded message (without the proto preamble and varint size).
  // Throws if the message carries a fatal error or if its sequence id does not
  // follow the previously received one.
  private onRpcResponseMessage(rpcMsgEncoded: Uint8Array) {
    // Here we override the protobufjs-generated code to skip the parsing of the
    // new streaming QueryResult and instead passing it through like a buffer.
    // This is the overall problem: All trace processor responses are wrapped
    // into a perfetto.protos.TraceProcessorRpc proto message. In all cases %
    // TPM_QUERY_STREAMING, we want protobufjs to decode the proto bytes and
    // give us a structured object. In the case of TPM_QUERY_STREAMING, instead,
    // we want to deal with the proto parsing ourselves using the new
    // QueryResult.appendResultBatch() method, because that handled streaming
    // results more efficiently and skips several copies.
    // By overriding the decode method below, we achieve two things:
    // 1. We avoid protobufjs decoding the TraceProcessorRpc.query_result field.
    // 2. We stash (a view of) the original buffer into the |rawQueryResult| so
    //    the `case TPM_QUERY_STREAMING` below can take it.
    perfetto.protos.QueryResult.decode =
        (reader: protobuf.Reader, length: number) => {
          const res =
              perfetto.protos.QueryResult.create() as {} as QueryResultBypass;
          res.rawQueryResult =
              reader.buf.subarray(reader.pos, reader.pos + length);
          // All this works only if protobufjs returns the original ArrayBuffer
          // from |rpcMsgEncoded|. It should be always the case given the
          // current implementation. This check mainly guards against future
          // behavioral changes of protobufjs. We don't want to accidentally
          // hold onto some internal protobufjs buffer. We are fine holding
          // onto |rpcMsgEncoded| because those come from ProtoRingBuffer which
          // is buffer-retention-friendly.
          assertTrue(res.rawQueryResult.buffer === rpcMsgEncoded.buffer);
          reader.pos += length;
          return res as {} as perfetto.protos.QueryResult;
        };

    const rpc = TraceProcessorRpc.decode(rpcMsgEncoded);

    if (rpc.fatalError !== undefined && rpc.fatalError.length > 0) {
      throw new Error(`${rpc.fatalError}`);
    }

    // Allow restarting sequences from zero (when reloading the browser).
    if (rpc.seq !== this.rxSeqId + 1 && this.rxSeqId !== 0 && rpc.seq !== 0) {
      // "(ERR:rpc_seq)" is intercepted by error_dialog.ts to show a more
      // graceful and actionable error.
      throw new Error(`RPC sequence id mismatch cur=${rpc.seq} last=${
          this.rxSeqId} (ERR:rpc_seq)`);
    }

    this.rxSeqId = rpc.seq;

    // Only a streaming query can need more than one response message; every
    // other method completes with the message being handled here.
    let isFinalResponse = true;

    // Each case is wrapped in its own block so that its `const` declarations
    // are scoped to that case rather than to the whole switch.
    switch (rpc.response) {
      case TPM.TPM_APPEND_TRACE_DATA: {
        const appendResult = assertExists(rpc.appendResult);
        const pendingPromise = assertExists(this.pendingParses.shift());
        if (appendResult.error && appendResult.error.length > 0) {
          pendingPromise.reject(appendResult.error);
        } else {
          pendingPromise.resolve();
        }
        break;
      }
      case TPM.TPM_FINALIZE_TRACE_DATA: {
        assertExists(this.pendingEOFs.shift()).resolve();
        break;
      }
      case TPM.TPM_RESET_TRACE_PROCESSOR: {
        assertExists(this.pendingResetTraceProcessors.shift()).resolve();
        break;
      }
      case TPM.TPM_RESTORE_INITIAL_TABLES: {
        assertExists(this.pendingRestoreTables.shift()).resolve();
        break;
      }
      case TPM.TPM_QUERY_STREAMING: {
        const qRes = assertExists(rpc.queryResult) as {} as QueryResultBypass;
        // Peek rather than shift: the query stays at the head of the queue
        // until its last batch has been appended.
        const pendingQuery = assertExists(this.pendingQueries[0]);
        pendingQuery.appendResultBatch(qRes.rawQueryResult);
        if (pendingQuery.isComplete()) {
          this.pendingQueries.shift();
        } else {
          isFinalResponse = false;
        }
        break;
      }
      case TPM.TPM_COMPUTE_METRIC: {
        const metricRes = assertExists(rpc.metricResult) as ComputeMetricResult;
        const pendingComputeMetric =
            assertExists(this.pendingComputeMetrics.shift());
        if (metricRes.error && metricRes.error.length > 0) {
          const error =
              new QueryError(`ComputeMetric() error: ${metricRes.error}`, {
                query: 'COMPUTE_METRIC',
              });
          pendingComputeMetric.reject(error);
        } else {
          pendingComputeMetric.resolve(metricRes);
        }
        break;
      }
      case TPM.TPM_DISABLE_AND_READ_METATRACE: {
        const metatraceRes =
            assertExists(rpc.metatrace) as DisableAndReadMetatraceResult;
        assertExists(this.pendingReadMetatrace).resolve(metatraceRes);
        this.pendingReadMetatrace = undefined;
        break;
      }
      default: {
        console.log(
            'Unexpected TraceProcessor response received: ', rpc.response);
        break;
      }
    }  // switch(rpc.response);

    if (isFinalResponse) {
      this.loadingTracker.endLoading();
    }
  }

  // TraceProcessor methods below this point.
  // The methods below are called by the various controllers in the UI and
  // deal with marshalling / unmarshaling requests to/from TraceProcessor.


  // Push trace data into the engine. The engine is supposed to automatically
  // figure out the type of the trace (JSON vs Protobuf).
  // The returned promise resolves when the matching response arrives and is
  // rejected with the error string reported in that response, if any.
  parse(data: Uint8Array): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingParses.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_APPEND_TRACE_DATA;
    rpc.appendTraceData = data;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Notify the engine that we reached the end of the trace.
  // Called after the last parse() call.
  notifyEof(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingEOFs.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_FINALIZE_TRACE_DATA;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Updates the TraceProcessor Config. This method creates a new
  // TraceProcessor instance, so it should be called before passing any trace
  // data.
  resetTraceProcessor(
      {cropTrackEvents, ingestFtraceInRawTable, analyzeTraceProtoContent}:
          TraceProcessorConfig): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingResetTraceProcessors.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESET_TRACE_PROCESSOR;
    const args = rpc.resetTraceProcessorArgs = new ResetTraceProcessorArgs();
    args.dropTrackEventDataBefore = cropTrackEvents ?
        ResetTraceProcessorArgs.DropTrackEventDataBefore
            .TRACK_EVENT_RANGE_OF_INTEREST :
        ResetTraceProcessorArgs.DropTrackEventDataBefore.NO_DROP;
    args.ingestFtraceInRawTable = ingestFtraceInRawTable;
    args.analyzeTraceProtoContent = analyzeTraceProtoContent;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Resets the trace processor state by destroying any table/views created by
  // the UI after loading.
  restoreInitialTables(): Promise<void> {
    const asyncRes = defer<void>();
    this.pendingRestoreTables.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_RESTORE_INITIAL_TABLES;
    this.rpcSendRequest(rpc);
    return asyncRes;  // Linearize with the worker.
  }

  // Shorthand for sending a compute metrics request to the engine.
  // The result is requested in TEXTPROTO format. The returned promise is
  // rejected with a QueryError when the response carries an error.
  async computeMetric(metrics: string[]): Promise<ComputeMetricResult> {
    const asyncRes = defer<ComputeMetricResult>();
    this.pendingComputeMetrics.push(asyncRes);
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_COMPUTE_METRIC;
    const args = rpc.computeMetricArgs = new ComputeMetricArgs();
    args.metricNames = metrics;
    args.format = ComputeMetricArgs.ResultFormat.TEXTPROTO;
    this.rpcSendRequest(rpc);
    return asyncRes;
  }

  // Issues a streaming query and retrieve results in batches.
  // The returned QueryResult object will be populated over time with batches
  // of rows (each batch conveys ~128KB of data and a variable number of rows).
  // The caller can decide whether to wait that all batches have been received
  // (by awaiting the returned object or calling result.waitAllRows()) or handle
  // the rows incrementally.
  //
  // Example usage:
  // const res = engine.query('SELECT foo, bar FROM table');
  // console.log(res.numRows());  // Will print 0 because we didn't await.
  // await(res.waitAllRows());
  // console.log(res.numRows());  // Will print the total number of rows.
  //
  // for (const it = res.iter({foo: NUM, bar:STR}); it.valid(); it.next()) {
  //   console.log(it.foo, it.bar);
  // }
  //
  // Optional |tag| (usually a component name) can be provided to allow
  // attributing trace processor workload to different UI components.
  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_QUERY_STREAMING;
    rpc.queryArgs = new QueryArgs();
    rpc.queryArgs.sqlQuery = sqlQuery;
    if (tag) {
      rpc.queryArgs.tag = tag;
    }
    const result = createQueryResult({
      query: sqlQuery,
    });
    this.pendingQueries.push(result);
    this.rpcSendRequest(rpc);
    return result;
  }

  // Returns the locally tracked flag: true after enableMetatrace() and false
  // again after stopAndGetMetatrace() has sent its request.
  isMetatracingEnabled(): boolean {
    return this._isMetatracingEnabled;
  }

  // Sends a TPM_ENABLE_METATRACE request. |categories|, when provided, is
  // forwarded in EnableMetatraceArgs. No response is awaited here.
  enableMetatrace(categories?: perfetto.protos.MetatraceCategories) {
    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_ENABLE_METATRACE;
    if (categories) {
      rpc.enableMetatraceArgs = new perfetto.protos.EnableMetatraceArgs();
      rpc.enableMetatraceArgs.categories = categories;
    }
    this._isMetatracingEnabled = true;
    this.rpcSendRequest(rpc);
  }

  // Sends a TPM_DISABLE_AND_READ_METATRACE request and returns a promise for
  // its result. Only one such request may be outstanding at a time.
  stopAndGetMetatrace(): Promise<DisableAndReadMetatraceResult> {
    // If we are already finalising a metatrace, reject the new request.
    if (this.pendingReadMetatrace) {
      return Promise.reject(new Error('Already finalising a metatrace'));
    }

    const result = defer<DisableAndReadMetatraceResult>();

    const rpc = TraceProcessorRpc.create();
    rpc.request = TPM.TPM_DISABLE_AND_READ_METATRACE;
    this._isMetatracingEnabled = false;
    this.pendingReadMetatrace = result;
    this.rpcSendRequest(rpc);
    return result;
  }

  // Marshals the TraceProcessorRpc request arguments and sends the request
  // to the concrete Engine (Wasm or HTTP).
  private rpcSendRequest(rpc: TraceProcessorRpc) {
    rpc.seq = this.txSeqId++;
    // Each message is wrapped in a TraceProcessorRpcStream to add the varint
    // preamble with the size, which allows tokenization on the other end.
    const outerProto = TraceProcessorRpcStream.create();
    outerProto.msg.push(rpc);
    const buf = TraceProcessorRpcStream.encode(outerProto).finish();
    this.loadingTracker.beginLoading();
    this.rpcSendRequestBytes(buf);
  }

  // Returns the distinct cpu values of the sched table, in ascending order.
  // The result of the first completed query is cached.
  // TODO(hjd): When streaming must invalidate this somehow.
  async getCpus(): Promise<number[]> {
    if (this._cpus === undefined) {
      const cpus: number[] = [];
      const queryRes = await this.query(
          'select distinct(cpu) as cpu from sched order by cpu;');
      for (const it = queryRes.iter({cpu: NUM}); it.valid(); it.next()) {
        cpus.push(it.cpu);
      }
      this._cpus = cpus;
    }
    return this._cpus;
  }

  // Returns the number of distinct gpu_id values among the 'gpufreq' counter
  // tracks. The result of the first completed query is cached.
  async getNumberOfGpus(): Promise<number> {
    // Compare against undefined explicitly: a count of 0 is a valid cached
    // value and must not trigger a new query on every call.
    if (this._numGpus === undefined) {
      const result = await this.query(`
        select count(distinct(gpu_id)) as gpuCount
        from gpu_counter_track
        where name = 'gpufreq';
      `);
      this._numGpus = result.firstRow({gpuCount: NUM}).gpuCount;
    }
    return this._numGpus;
  }

  // Returns the row count of the process table. Not cached.
  // TODO: This should live in code that's more specific to chrome, instead of
  // in engine.
  async getNumberOfProcesses(): Promise<number> {
    const result = await this.query('select count(*) as cnt from process;');
    return result.firstRow({cnt: NUM}).cnt;
  }

  // Returns the trace_bounds start/end timestamps, each divided by 1e9.
  async getTraceTimeBounds(): Promise<TimeSpan> {
    const result = await this.query(
        `select start_ts as startTs, end_ts as endTs from trace_bounds`);
    const bounds = result.firstRow({
      startTs: NUM,
      endTs: NUM,
    });
    return new TimeSpan(bounds.startTs / 1e9, bounds.endTs / 1e9);
  }

  // Derives a time span from the tracing metadata entries. The end is the
  // smallest 'tracing_disabled_ns' value; the start is the largest of the
  // 'tracing_started_ns' / 'all_data_source_started_ns' values. A bound stays
  // at -Infinity / +Infinity when no matching non-null row exists.
  async getTracingMetadataTimeBounds(): Promise<TimeSpan> {
    const queryRes = await this.query(`select
         name,
         int_value as intValue
         from metadata
         where name = 'tracing_started_ns' or name = 'tracing_disabled_ns'
         or name = 'all_data_source_started_ns'`);
    let startBound = -Infinity;
    let endBound = Infinity;
    const it = queryRes.iter({'name': STR, 'intValue': NUM_NULL});
    for (; it.valid(); it.next()) {
      const columnName = it.name;
      const timestamp = it.intValue;
      // Rows without a value carry no information about the bounds.
      if (timestamp === null) continue;
      if (columnName === 'tracing_disabled_ns') {
        endBound = Math.min(endBound, timestamp / 1e9);
      } else {
        startBound = Math.max(startBound, timestamp / 1e9);
      }
    }

    return new TimeSpan(startBound, endBound);
  }

  // Creates an EngineProxy that forwards queries to this engine and labels
  // them with |tag|.
  getProxy(tag: string): EngineProxy {
    return new EngineProxy(this, tag);
  }
}

// Lightweight wrapper over Engine exposing only `query` method and annotating
// all queries going through it with a tag.
export class EngineProxy {
  // |engine| is the Engine every query is forwarded to; |tag| is the label
  // applied to queries that do not supply their own.
  constructor(private engine: Engine, private tag: string) {}

  // Forwards |sqlQuery| to the wrapped engine. A truthy |tag| argument takes
  // precedence; otherwise the proxy's own tag is used.
  query(sqlQuery: string, tag?: string): Promise<QueryResult>&QueryResult {
    const effectiveTag = tag || this.tag;
    return this.engine.query(sqlQuery, effectiveTag);
  }

  // Id of the wrapped engine.
  get engineId(): string {
    const {id} = this.engine;
    return id;
  }
}