diff options
Diffstat (limited to 'pw_rpc/ts')
-rw-r--r-- | pw_rpc/ts/call.ts | 26 | ||||
-rw-r--r-- | pw_rpc/ts/call_test.ts | 12 | ||||
-rw-r--r-- | pw_rpc/ts/client.ts | 36 | ||||
-rw-r--r-- | pw_rpc/ts/client_test.ts | 99 | ||||
-rw-r--r-- | pw_rpc/ts/descriptors.ts | 19 | ||||
-rw-r--r-- | pw_rpc/ts/descriptors_test.ts | 14 | ||||
-rw-r--r-- | pw_rpc/ts/method.ts | 122 | ||||
-rw-r--r-- | pw_rpc/ts/packets.ts | 59 | ||||
-rw-r--r-- | pw_rpc/ts/packets_test.ts | 8 | ||||
-rw-r--r-- | pw_rpc/ts/queue.ts | 2 | ||||
-rw-r--r-- | pw_rpc/ts/rpc_classes.ts | 10 |
11 files changed, 236 insertions, 171 deletions
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts index 11894dc8c..e418760c3 100644 --- a/pw_rpc/ts/call.ts +++ b/pw_rpc/ts/call.ts @@ -12,12 +12,12 @@ // License for the specific language governing permissions and limitations under // the License. -import {Status} from 'pigweedjs/pw_status'; -import {Message} from 'google-protobuf'; +import { Status } from 'pigweedjs/pw_status'; +import { Message } from 'google-protobuf'; -import WaitQueue from "./queue"; +import WaitQueue from './queue'; -import {PendingCalls, Rpc} from './rpc_classes'; +import { PendingCalls, Rpc } from './rpc_classes'; export type Callback = (a: any) => any; @@ -70,7 +70,7 @@ export class Call { rpc: Rpc, onNext: Callback, onCompleted: Callback, - onError: Callback + onError: Callback, ) { this.rpcs = rpcs; this.rpc = rpc; @@ -86,7 +86,7 @@ export class Call { this.rpc, this, ignoreErrors, - request + request, ); if (previous !== undefined && !previous.completed) { @@ -98,13 +98,14 @@ export class Call { return this.status !== undefined || this.error !== undefined; } + // eslint-disable-next-line @typescript-eslint/ban-types private invokeCallback(func: () => {}) { try { func(); } catch (err: unknown) { if (err instanceof Error) { console.error( - `An exception was raised while invoking a callback: ${err}` + `An exception was raised while invoking a callback: ${err}`, ); this.callbackException = err; } @@ -131,8 +132,9 @@ export class Call { } private async queuePopWithTimeout( - timeoutMs: number + timeoutMs: number, ): Promise<Message | undefined> { + // eslint-disable-next-line no-async-promise-executor return new Promise(async (resolve, reject) => { let timeoutExpired = false; const timeoutWatcher = setTimeout(() => { @@ -170,7 +172,7 @@ export class Call { */ async *getResponses( count?: number, - timeoutMs?: number + timeoutMs?: number, ): AsyncGenerator<Message> { this.checkErrors(); @@ -213,6 +215,7 @@ export class Call { protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> { for await (const response of this.getResponses(1, timeoutMs)) { + // Do nothing. } if (this.status === undefined) { throw Error('Unexpected undefined status at end of stream'); @@ -225,6 +228,7 @@ export class Call { protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> { for await (const response of this.getResponses(undefined, timeoutMs)) { + // Do nothing. } if (this.status === undefined) { throw Error('Unexpected undefined status at end of stream'); @@ -276,7 +280,7 @@ export class ClientStreamingCall extends Call { /** Ends the client stream and waits for the RPC to complete. */ async finishAndWait( requests: Message[] = [], - timeoutMs?: number + timeoutMs?: number, ): Promise<[Status, Message[]]> { this.finishClientStream(requests); return await this.streamWait(timeoutMs); @@ -300,7 +304,7 @@ export class BidirectionalStreamingCall extends Call { /** Ends the client stream and waits for the RPC to complete. */ async finishAndWait( requests: Array<Message> = [], - timeoutMs?: number + timeoutMs?: number, ): Promise<[Status, Array<Message>]> { this.finishClientStream(requests); return await this.streamWait(timeoutMs); diff --git a/pw_rpc/ts/call_test.ts b/pw_rpc/ts/call_test.ts index 47af6dab8..98590c88a 100644 --- a/pw_rpc/ts/call_test.ts +++ b/pw_rpc/ts/call_test.ts @@ -14,11 +14,11 @@ /* eslint-env browser */ -import {SomeMessage} from 'pigweedjs/protos/pw_rpc/ts/test2_pb'; +import { SomeMessage } from 'pigweedjs/protos/pw_rpc/ts/test2_pb'; -import {Call} from './call'; -import {Channel, Method, Service} from './descriptors'; -import {PendingCalls, Rpc} from './rpc_classes'; +import { Call } from './call'; +import { Channel, Method, Service } from './descriptors'; +import { PendingCalls, Rpc } from './rpc_classes'; class FakeRpc { readonly channel: any = undefined; @@ -33,7 +33,9 @@ describe('Call', () => { let call: Call; beforeEach(() => { - const noop = () => { }; + const noop = () => { + // Do nothing. + }; const pendingCalls = new PendingCalls(); const rpc = new FakeRpc(); call = new Call(pendingCalls, rpc, noop, noop, noop); diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts index e7a97176d..e99554877 100644 --- a/pw_rpc/ts/client.ts +++ b/pw_rpc/ts/client.ts @@ -14,18 +14,18 @@ /** Provides a pw_rpc client for TypeScript. */ -import {ProtoCollection} from 'pigweedjs/pw_protobuf_compiler'; -import {Status} from 'pigweedjs/pw_status'; -import {Message} from 'google-protobuf'; +import { ProtoCollection } from 'pigweedjs/pw_protobuf_compiler'; +import { Status } from 'pigweedjs/pw_status'; +import { Message } from 'google-protobuf'; import { PacketType, RpcPacket, } from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; -import {Channel, Service} from './descriptors'; -import {MethodStub, methodStubFactory} from './method'; +import { Channel, Service } from './descriptors'; +import { MethodStub, methodStubFactory } from './method'; import * as packets from './packets'; -import {PendingCalls, Rpc} from './rpc_classes'; +import { PendingCalls, Rpc } from './rpc_classes'; /** * Object for managing RPC service and contained methods. @@ -38,7 +38,7 @@ export class ServiceClient { constructor(client: Client, channel: Channel, service: Service) { this.service = service; const methods = service.methods; - methods.forEach(method => { + methods.forEach((method) => { const stub = methodStubFactory(client.rpcs, channel, method); this.methods.push(stub); this.methodsByName.set(method.name, stub); @@ -67,7 +67,7 @@ export class ChannelClient { constructor(client: Client, channel: Channel, services: Service[]) { this.channel = channel; - services.forEach(service => { + services.forEach((service) => { const serviceClient = new ServiceClient(client, this.channel, service); this.services.set(service.name, serviceClient); }); @@ -122,14 +122,14 @@ export class Client { constructor(channels: Channel[], services: Service[]) { this.rpcs = new PendingCalls(); - services.forEach(service => { + services.forEach((service) => { this.services.set(service.id, service); }); - channels.forEach(channel => { + channels.forEach((channel) => { this.channelsById.set( channel.id, - new ChannelClient(this, channel, services) + new ChannelClient(this, channel, services), ); }); } @@ -145,11 +145,11 @@ export class Client { static fromProtoSet(channels: Channel[], protoSet: ProtoCollection): Client { let services: Service[] = []; const descriptors = protoSet.fileDescriptorSet.getFileList(); - descriptors.forEach(fileDescriptor => { + descriptors.forEach((fileDescriptor) => { const packageName = fileDescriptor.getPackage()!; - fileDescriptor.getServiceList().forEach(serviceDescriptor => { + fileDescriptor.getServiceList().forEach((serviceDescriptor) => { services = services.concat( - new Service(serviceDescriptor, protoSet, packageName) + new Service(serviceDescriptor, protoSet, packageName), ); }); }); @@ -176,7 +176,7 @@ export class Client { */ private rpc( packet: RpcPacket, - channelClient: ChannelClient + channelClient: ChannelClient, ): Rpc | undefined { const service = this.services.get(packet.getServiceId()); if (service == undefined) { @@ -215,7 +215,7 @@ export class Client { private sendClientError( client: ChannelClient, packet: RpcPacket, - error: Status + error: Status, ) { client.channel.send(packets.encodeClientError(packet, error)); } @@ -290,10 +290,10 @@ export class Client { if (packet.getType() === PacketType.SERVER_ERROR) { if (status === Status.OK) { - throw 'Unexpected OK status on SERVER_ERROR'; + throw new Error('Unexpected OK status on SERVER_ERROR'); } if (status === undefined) { - throw 'Missing status on SERVER_ERROR'; + throw new Error('Missing status on SERVER_ERROR'); } console.warn(`${rpc}: invocation failed with status: ${Status[status]}`); call.handleError(status); diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts index 0dfed21b1..0535fddb9 100644 --- a/pw_rpc/ts/client_test.ts +++ b/pw_rpc/ts/client_test.ts @@ -14,21 +14,18 @@ /* eslint-env browser */ -import {Status} from 'pigweedjs/pw_status'; -import {MessageCreator} from 'pigweedjs/pw_protobuf_compiler'; -import {Message} from 'google-protobuf'; +import { Status } from 'pigweedjs/pw_status'; +import { MessageCreator } from 'pigweedjs/pw_protobuf_compiler'; +import { Message } from 'google-protobuf'; import { PacketType, RpcPacket, } from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; -import {ProtoCollection} from 'pigweedjs/protos/collection'; -import { - Request, - Response, -} from 'pigweedjs/protos/pw_rpc/ts/test_pb'; +import { ProtoCollection } from 'pigweedjs/protos/collection'; +import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb'; -import {Client} from './client'; -import {Channel, Method} from './descriptors'; +import { Client } from './client'; +import { Channel, Method } from './descriptors'; import { BidirectionalStreamingMethodStub, ClientStreamingMethodStub, @@ -77,10 +74,10 @@ describe('Client', () => { const channel = client.channel()!; expect(channel.methodStub('')).toBeUndefined(); expect( - channel.methodStub('pw.rpc.test1.Garbage.SomeUnary') + channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'), ).toBeUndefined(); expect( - channel.methodStub('pw.rpc.test1.TheTestService.Garbage') + channel.methodStub('pw.rpc.test1.TheTestService.Garbage'), ).toBeUndefined(); }); @@ -134,7 +131,7 @@ describe('Client', () => { const packet = packets.encodeResponse( [1, service.id, method.id], - new Request() + new Request(), ); const status = client.processPacket(packet); expect(client.processPacket(packet)).toEqual(Status.OK); @@ -159,7 +156,12 @@ describe('RPC', () => { beforeEach(async () => { protoCollection = new ProtoCollection(); - const channels = [new Channel(1, handlePacket), new Channel(2, () => { })]; + const channels = [ + new Channel(1, handlePacket), + new Channel(2, () => { + // Do nothing. + }), + ]; client = Client.fromProtoSet(channels, protoCollection); lastPacketSent = undefined; requests = []; @@ -185,7 +187,7 @@ describe('RPC', () => { channelId: number, method: Method, status: Status, - response?: Message + response?: Message, ) { const packet = new RpcPacket(); packet.setType(PacketType.RESPONSE); @@ -194,7 +196,7 @@ describe('RPC', () => { packet.setMethodId(method.id); packet.setStatus(status); if (response === undefined) { - packet.setPayload(new Uint8Array()); + packet.setPayload(new Uint8Array(0)); } else { packet.setPayload(response.serializeBinary()); } @@ -205,7 +207,7 @@ describe('RPC', () => { channelId: number, method: Method, response: Message, - status: Status = Status.OK + status: Status = Status.OK, ) { const packet = new RpcPacket(); packet.setType(PacketType.SERVER_STREAM); @@ -221,7 +223,7 @@ describe('RPC', () => { channelId: number, method: Method, status: Status, - processStatus: Status + processStatus: Status, ) { const packet = new RpcPacket(); packet.setType(PacketType.SERVER_ERROR); @@ -276,8 +278,8 @@ describe('RPC', () => { unaryStub = client .channel() ?.methodStub( - 'pw.rpc.test1.TheTestService.SomeUnary' - )! as UnaryMethodStub; + 'pw.rpc.test1.TheTestService.SomeUnary', + ) as UnaryMethodStub; }); it('blocking call', async () => { @@ -286,7 +288,7 @@ describe('RPC', () => { 1, unaryStub.method, Status.ABORTED, - newResponse('0_o') + newResponse('0_o'), ); const [status, response] = await unaryStub.call(newRequest(6)); @@ -308,7 +310,7 @@ describe('RPC', () => { newRequest(5), onNext, onCompleted, - onError + onError, ); expect(sentPayload(Request).getMagicNumber()).toEqual(5); @@ -406,8 +408,8 @@ describe('RPC', () => { serverStreaming = client .channel() ?.methodStub( - 'pw.rpc.test1.TheTestService.SomeServerStreaming' - )! as ServerStreamingMethodStub; + 'pw.rpc.test1.TheTestService.SomeServerStreaming', + ) as ServerStreamingMethodStub; }); it('non-blocking call', () => { @@ -430,7 +432,7 @@ describe('RPC', () => { expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED); expect( - sentPayload(serverStreaming.method.requestType).getMagicNumber() + sentPayload(serverStreaming.method.requestType).getMagicNumber(), ).toEqual(4); } }); @@ -452,7 +454,7 @@ describe('RPC', () => { newRequest(3), onNext, onCompleted, - onError + onError, ); expect(requests).toHaveLength(0); @@ -507,8 +509,8 @@ describe('RPC', () => { clientStreaming = client .channel() ?.methodStub( - 'pw.rpc.test1.TheTestService.SomeClientStreaming' - )! as ClientStreamingMethodStub; + 'pw.rpc.test1.TheTestService.SomeClientStreaming', + ) as ClientStreamingMethodStub; }); it('non-blocking call', () => { @@ -584,7 +586,9 @@ describe('RPC', () => { enqueueResponse(1, clientStreaming.method, Status.OK, testResponse); stream.finishAndWait(); - expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM_END); + expect(lastRequest().getType()).toEqual( + PacketType.CLIENT_REQUEST_COMPLETION, + ); expect(onNext).toHaveBeenCalledWith(testResponse); expect(stream.completed).toBe(true); @@ -614,7 +618,7 @@ describe('RPC', () => { 1, clientStreaming.method, Status.INVALID_ARGUMENT, - Status.OK + Status.OK, ); stream.send(newRequest()); @@ -624,7 +628,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.INVALID_ARGUMENT); }); } @@ -633,12 +637,12 @@ describe('RPC', () => { it('non-blocking call server error after stream end', async () => { for (let i = 0; i < 3; i++) { const stream = clientStreaming.invoke(); - // Error will be sent in response to the CLIENT_STREAM_END packet. + // Error will be sent in response to the CLIENT_REQUEST_COMPLETION packet. enqueueError( 1, clientStreaming.method, Status.INVALID_ARGUMENT, - Status.OK + Status.OK, ); await stream @@ -646,7 +650,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.INVALID_ARGUMENT); }); } @@ -659,8 +663,7 @@ describe('RPC', () => { try { stream.send(newRequest()); - } - catch (e) { + } catch (e) { console.log(e); expect(e.status).toEqual(Status.CANCELLED); } @@ -675,7 +678,7 @@ describe('RPC', () => { 1, clientStreaming.method, Status.UNAVAILABLE, - enqueuedResponse + enqueuedResponse, ); const stream = clientStreaming.invoke(); @@ -696,7 +699,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.UNAVAILABLE); expect(stream.error).toEqual(Status.UNAVAILABLE); expect(stream.response).toBeUndefined(); @@ -721,8 +724,8 @@ describe('RPC', () => { bidiStreaming = client .channel() ?.methodStub( - 'pw.rpc.test1.TheTestService.SomeBidiStreaming' - )! as BidirectionalStreamingMethodStub; + 'pw.rpc.test1.TheTestService.SomeBidiStreaming', + ) as BidirectionalStreamingMethodStub; }); it('blocking call', async () => { @@ -745,7 +748,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.NOT_FOUND); }); }); @@ -756,7 +759,7 @@ describe('RPC', () => { for (let i = 0; i < 3; i++) { const testResponses: Array<Message> = []; - const stream = bidiStreaming.invoke(response => { + const stream = bidiStreaming.invoke((response) => { testResponses.push(response); }); expect(stream.completed).toBe(false); @@ -825,7 +828,7 @@ describe('RPC', () => { for (let i = 0; i < 3; i++) { const testResponses: Array<Message> = []; - const stream = bidiStreaming.invoke(response => { + const stream = bidiStreaming.invoke((response) => { testResponses.push(response); }); expect(stream.completed).toBe(false); @@ -849,7 +852,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.OUT_OF_RANGE); }); } @@ -858,12 +861,12 @@ describe('RPC', () => { for (let i = 0; i < 3; i++) { const stream = bidiStreaming.invoke(); - // Error is sent in response to CLIENT_STREAM_END packet. + // Error is sent in response to CLIENT_REQUEST_COMPLETION packet. enqueueError( 1, bidiStreaming.method, Status.INVALID_ARGUMENT, - Status.OK + Status.OK, ); await stream @@ -871,7 +874,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.INVALID_ARGUMENT); }); } @@ -915,7 +918,7 @@ describe('RPC', () => { .then(() => { fail('Promise should not be resolved'); }) - .catch(reason => { + .catch((reason) => { expect(reason.status).toEqual(Status.UNAVAILABLE); expect(stream.error).toEqual(Status.UNAVAILABLE); }); diff --git a/pw_rpc/ts/descriptors.ts b/pw_rpc/ts/descriptors.ts index eb4581f3e..bd0d470de 100644 --- a/pw_rpc/ts/descriptors.ts +++ b/pw_rpc/ts/descriptors.ts @@ -12,13 +12,13 @@ // License for the specific language governing permissions and limitations under // the License. -import {ProtoCollection} from 'pigweedjs/pw_protobuf_compiler'; +import { ProtoCollection } from 'pigweedjs/pw_protobuf_compiler'; import { MethodDescriptorProto, ServiceDescriptorProto, } from 'google-protobuf/google/protobuf/descriptor_pb'; -import {hash} from './hash'; +import { hash } from './hash'; interface ChannelOutput { (data: Uint8Array): void; @@ -28,7 +28,12 @@ export class Channel { readonly id: number; private output: ChannelOutput; - constructor(id: number, output: ChannelOutput = () => {}) { + constructor( + id: number, + output: ChannelOutput = () => { + /* do nothing. */ + }, + ) { this.id = id; this.output = output; } @@ -48,7 +53,7 @@ export class Service { constructor( descriptor: ServiceDescriptorProto, protoCollection: ProtoCollection, - packageName: string + packageName: string, ) { this.name = packageName + '.' + descriptor.getName()!; this.id = hash(this.name); @@ -83,7 +88,7 @@ export class Method { constructor( descriptor: MethodDescriptorProto, protoCollection: ProtoCollection, - service: Service + service: Service, ) { this.name = descriptor.getName()!; this.id = hash(this.name); @@ -97,10 +102,10 @@ export class Method { // Remove leading period if it exists. this.requestType = protoCollection.getMessageCreator( - requestTypePath.replace(/^\./, '') + requestTypePath.replace(/^\./, ''), )!; this.responseType = protoCollection.getMessageCreator( - responseTypePath.replace(/^\./, '') + responseTypePath.replace(/^\./, ''), )!; } diff --git a/pw_rpc/ts/descriptors_test.ts b/pw_rpc/ts/descriptors_test.ts index a6b11e26a..c09d3d3fa 100644 --- a/pw_rpc/ts/descriptors_test.ts +++ b/pw_rpc/ts/descriptors_test.ts @@ -14,11 +14,8 @@ /* eslint-env browser */ -import {ProtoCollection} from 'pigweedjs/protos/collection'; -import { - Request, - Response, -} from 'pigweedjs/protos/pw_rpc/ts/test_pb'; +import { ProtoCollection } from 'pigweedjs/protos/collection'; +import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb'; import * as descriptors from './descriptors'; @@ -27,13 +24,14 @@ const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin'; describe('Descriptors', () => { it('parses from ServiceDescriptor binary', async () => { const protoCollection = new ProtoCollection(); - const fd = protoCollection.fileDescriptorSet.getFileList() - .find((file: any) => file.array[1].indexOf("pw.rpc.test1") !== -1); + const fd = protoCollection.fileDescriptorSet + .getFileList() + .find((file: any) => file.array[1].indexOf('pw.rpc.test1') !== -1); const sd = fd.getServiceList()[0]; const service = new descriptors.Service( sd, protoCollection, - fd.getPackage()! + fd.getPackage()!, ); expect(service.name).toEqual('pw.rpc.test1.TheTestService'); diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts index 18cf6db07..f67d9cfb0 100644 --- a/pw_rpc/ts/method.ts +++ b/pw_rpc/ts/method.ts @@ -12,8 +12,8 @@ // License for the specific language governing permissions and limitations under // the License. -import {Status} from 'pigweedjs/pw_status'; -import {Message} from 'google-protobuf'; +import { Status } from 'pigweedjs/pw_status'; +import { Message } from 'google-protobuf'; import { BidirectionalStreamingCall, @@ -23,13 +23,13 @@ import { ServerStreamingCall, UnaryCall, } from './call'; -import {Channel, Method, MethodType, Service} from './descriptors'; -import {PendingCalls, Rpc} from './rpc_classes'; +import { Channel, Method, MethodType, Service } from './descriptors'; +import { PendingCalls, Rpc } from './rpc_classes'; export function methodStubFactory( rpcs: PendingCalls, channel: Channel, - method: Method + method: Method, ): MethodStub { switch (method.type) { case MethodType.BIDIRECTIONAL_STREAMING: @@ -64,16 +64,22 @@ export abstract class MethodStub { export class UnaryMethodStub extends MethodStub { invoke( request: Message, - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): UnaryCall { const call = new UnaryCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(request); return call; @@ -81,16 +87,22 @@ export class UnaryMethodStub extends MethodStub { open( request: Message, - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): UnaryCall { const call = new UnaryCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(request, true); return call; @@ -104,16 +116,22 @@ export class UnaryMethodStub extends MethodStub { export class ServerStreamingMethodStub extends MethodStub { invoke( request?: Message, - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): ServerStreamingCall { const call = new ServerStreamingCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(request); return call; @@ -121,16 +139,22 @@ export class ServerStreamingMethodStub extends MethodStub { open( request: Message, - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): UnaryCall { const call = new UnaryCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(request, true); return call; @@ -143,32 +167,44 @@ export class ServerStreamingMethodStub extends MethodStub { export class ClientStreamingMethodStub extends MethodStub { invoke( - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): ClientStreamingCall { const call = new ClientStreamingCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(); return call; } open( - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): ClientStreamingCall { const call = new ClientStreamingCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(undefined, true); return call; @@ -181,32 +217,44 @@ export class ClientStreamingMethodStub extends MethodStub { export class BidirectionalStreamingMethodStub extends MethodStub { invoke( - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): BidirectionalStreamingCall { const call = new BidirectionalStreamingCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(); return call; } open( - onNext: Callback = () => {}, - onCompleted: Callback = () => {}, - onError: Callback = () => {} + onNext: Callback = () => { + // Do nothing. + }, + onCompleted: Callback = () => { + // Do nothing. + }, + onError: Callback = () => { + // Do nothing. + }, ): BidirectionalStreamingCall { const call = new BidirectionalStreamingCall( this.rpcs, this.rpc, onNext, onCompleted, - onError + onError, ); call.invoke(undefined, true); return call; diff --git a/pw_rpc/ts/packets.ts b/pw_rpc/ts/packets.ts index 68cf80393..b6cb71395 100644 --- a/pw_rpc/ts/packets.ts +++ b/pw_rpc/ts/packets.ts @@ -14,33 +14,34 @@ /** Functions for working with pw_rpc packets. */ -import {Message} from 'google-protobuf'; -import {MethodDescriptorProto} from 'google-protobuf/google/protobuf/descriptor_pb'; -import * as packetPb from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; -import {Status} from 'pigweedjs/pw_status'; +import { Message } from 'google-protobuf'; +import { + RpcPacket, + PacketType, +} from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; +import { Status } from 'pigweedjs/pw_status'; // Channel, Service, Method type idSet = [number, number, number]; -export function decode(data: Uint8Array): packetPb.RpcPacket { - return packetPb.RpcPacket.deserializeBinary(data); +export function decode(data: Uint8Array): RpcPacket { + return RpcPacket.deserializeBinary(data); } -export function decodePayload(payload: Uint8Array, payloadType: any): Message { - const message = payloadType.deserializeBinary(payload); - return message; +export function decodePayload(payload: Uint8Array, payloadType: any): any { + return payloadType['deserializeBinary'](payload); } -export function forServer(packet: packetPb.RpcPacket): boolean { +export function forServer(packet: RpcPacket): boolean { return packet.getType() % 2 == 0; } export function encodeClientError( - packet: packetPb.RpcPacket, - status: Status + packet: RpcPacket, + status: Status, ): Uint8Array { - const errorPacket = new packetPb.RpcPacket(); - errorPacket.setType(packetPb.PacketType.CLIENT_ERROR); + const errorPacket = new RpcPacket(); + errorPacket.setType(PacketType.CLIENT_ERROR); errorPacket.setChannelId(packet.getChannelId()); errorPacket.setMethodId(packet.getMethodId()); errorPacket.setServiceId(packet.getServiceId()); @@ -49,18 +50,19 @@ export function encodeClientError( } export function encodeClientStream(ids: idSet, message: Message): Uint8Array { - const streamPacket = new packetPb.RpcPacket(); - streamPacket.setType(packetPb.PacketType.CLIENT_STREAM); + const streamPacket = new RpcPacket(); + streamPacket.setType(PacketType.CLIENT_STREAM); streamPacket.setChannelId(ids[0]); streamPacket.setServiceId(ids[1]); streamPacket.setMethodId(ids[2]); - streamPacket.setPayload(message.serializeBinary()); + const msgSerialized = (message as any)['serializeBinary'](); + streamPacket.setPayload(msgSerialized); return streamPacket.serializeBinary(); } export function encodeClientStreamEnd(ids: idSet): Uint8Array { - const streamEnd = new packetPb.RpcPacket(); - streamEnd.setType(packetPb.PacketType.CLIENT_STREAM_END); + const streamEnd = new RpcPacket(); + streamEnd.setType(PacketType.CLIENT_REQUEST_COMPLETION); streamEnd.setChannelId(ids[0]); streamEnd.setServiceId(ids[1]); streamEnd.setMethodId(ids[2]); @@ -70,11 +72,11 @@ export function encodeClientStreamEnd(ids: idSet): Uint8Array { export function encodeRequest(ids: idSet, request?: Message): Uint8Array { const payload: Uint8Array = typeof request !== 'undefined' - ? request.serializeBinary() - : new Uint8Array(); + ? (request as any)['serializeBinary']() + : new Uint8Array(0); - const packet = new packetPb.RpcPacket(); - packet.setType(packetPb.PacketType.REQUEST); + const packet = new RpcPacket(); + packet.setType(PacketType.REQUEST); packet.setChannelId(ids[0]); packet.setServiceId(ids[1]); packet.setMethodId(ids[2]); @@ -83,18 +85,19 @@ export function encodeRequest(ids: idSet, request?: Message): Uint8Array { } export function encodeResponse(ids: idSet, response: Message): Uint8Array { - const packet = new packetPb.RpcPacket(); - packet.setType(packetPb.PacketType.RESPONSE); + const packet = new RpcPacket(); + packet.setType(PacketType.RESPONSE); packet.setChannelId(ids[0]); packet.setServiceId(ids[1]); packet.setMethodId(ids[2]); - packet.setPayload(response.serializeBinary()); + const msgSerialized = (response as any)['serializeBinary'](); + packet.setPayload(msgSerialized); return packet.serializeBinary(); } export function encodeCancel(ids: idSet): Uint8Array { - const packet = new packetPb.RpcPacket(); - packet.setType(packetPb.PacketType.CLIENT_ERROR); + const packet = new RpcPacket(); + packet.setType(PacketType.CLIENT_ERROR); packet.setStatus(Status.CANCELLED); packet.setChannelId(ids[0]); packet.setServiceId(ids[1]); diff --git a/pw_rpc/ts/packets_test.ts b/pw_rpc/ts/packets_test.ts index b399e2578..50cdb3f1a 100644 --- a/pw_rpc/ts/packets_test.ts +++ b/pw_rpc/ts/packets_test.ts @@ -17,7 +17,7 @@ import { PacketType, RpcPacket, } from 'pigweedjs/protos/pw_rpc/internal/packet_pb'; -import {Status} from 'pigweedjs/pw_status'; +import { Status } from 'pigweedjs/pw_status'; import * as packets from './packets'; @@ -31,7 +31,9 @@ function addTestData(packet: RpcPacket) { } describe('Packets', () => { - beforeEach(() => { }); + beforeEach(() => { + // Do nothing. + }); it('encodeRequest sets packet fields', () => { const goldenRequest = new RpcPacket(); @@ -95,7 +97,7 @@ describe('Packets', () => { addTestData(request); expect(request.toObject()).toEqual( - packets.decode(request.serializeBinary()).toObject() + packets.decode(request.serializeBinary()).toObject(), ); }); diff --git a/pw_rpc/ts/queue.ts b/pw_rpc/ts/queue.ts index 0bd2b8c6c..44dd48cbd 100644 --- a/pw_rpc/ts/queue.ts +++ b/pw_rpc/ts/queue.ts @@ -32,7 +32,7 @@ export default class Queue<T> { } shift(): Promise<T> { - return new Promise(resolve => { + return new Promise((resolve) => { if (this.length > 0) { return resolve(this.queue.shift()!); } else { diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts index 703612e19..83001e268 100644 --- a/pw_rpc/ts/rpc_classes.ts +++ b/pw_rpc/ts/rpc_classes.ts @@ -12,11 +12,11 @@ // License for the specific language governing permissions and limitations under // the License. -import {Message} from 'google-protobuf'; -import {Status} from 'pigweedjs/pw_status'; +import { Message } from 'google-protobuf'; +import { Status } from 'pigweedjs/pw_status'; -import {Call} from './call'; -import {Channel, Method, Service} from './descriptors'; +import { Call } from './call'; +import { Channel, Method, Service } from './descriptors'; import * as packets from './packets'; /** Data class for a pending RPC call. */ @@ -70,7 +70,7 @@ export class PendingCalls { rpc: Rpc, call: Call, ignoreError: boolean, - request?: Message + request?: Message, ): Call | undefined { const previous = this.open(rpc, call); const packet = packets.encodeRequest(rpc.idSet, request); |