aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/ts
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/ts')
-rw-r--r--pw_rpc/ts/call.ts26
-rw-r--r--pw_rpc/ts/call_test.ts12
-rw-r--r--pw_rpc/ts/client.ts36
-rw-r--r--pw_rpc/ts/client_test.ts99
-rw-r--r--pw_rpc/ts/descriptors.ts19
-rw-r--r--pw_rpc/ts/descriptors_test.ts14
-rw-r--r--pw_rpc/ts/method.ts122
-rw-r--r--pw_rpc/ts/packets.ts59
-rw-r--r--pw_rpc/ts/packets_test.ts8
-rw-r--r--pw_rpc/ts/queue.ts2
-rw-r--r--pw_rpc/ts/rpc_classes.ts10
11 files changed, 236 insertions, 171 deletions
diff --git a/pw_rpc/ts/call.ts b/pw_rpc/ts/call.ts
index 11894dc8c..e418760c3 100644
--- a/pw_rpc/ts/call.ts
+++ b/pw_rpc/ts/call.ts
@@ -12,12 +12,12 @@
// License for the specific language governing permissions and limitations under
// the License.
-import {Status} from 'pigweedjs/pw_status';
-import {Message} from 'google-protobuf';
+import { Status } from 'pigweedjs/pw_status';
+import { Message } from 'google-protobuf';
-import WaitQueue from "./queue";
+import WaitQueue from './queue';
-import {PendingCalls, Rpc} from './rpc_classes';
+import { PendingCalls, Rpc } from './rpc_classes';
export type Callback = (a: any) => any;
@@ -70,7 +70,7 @@ export class Call {
rpc: Rpc,
onNext: Callback,
onCompleted: Callback,
- onError: Callback
+ onError: Callback,
) {
this.rpcs = rpcs;
this.rpc = rpc;
@@ -86,7 +86,7 @@ export class Call {
this.rpc,
this,
ignoreErrors,
- request
+ request,
);
if (previous !== undefined && !previous.completed) {
@@ -98,13 +98,14 @@ export class Call {
return this.status !== undefined || this.error !== undefined;
}
+ // eslint-disable-next-line @typescript-eslint/ban-types
private invokeCallback(func: () => {}) {
try {
func();
} catch (err: unknown) {
if (err instanceof Error) {
console.error(
- `An exception was raised while invoking a callback: ${err}`
+ `An exception was raised while invoking a callback: ${err}`,
);
this.callbackException = err;
}
@@ -131,8 +132,9 @@ export class Call {
}
private async queuePopWithTimeout(
- timeoutMs: number
+ timeoutMs: number,
): Promise<Message | undefined> {
+ // eslint-disable-next-line no-async-promise-executor
return new Promise(async (resolve, reject) => {
let timeoutExpired = false;
const timeoutWatcher = setTimeout(() => {
@@ -170,7 +172,7 @@ export class Call {
*/
async *getResponses(
count?: number,
- timeoutMs?: number
+ timeoutMs?: number,
): AsyncGenerator<Message> {
this.checkErrors();
@@ -213,6 +215,7 @@ export class Call {
protected async unaryWait(timeoutMs?: number): Promise<[Status, Message]> {
for await (const response of this.getResponses(1, timeoutMs)) {
+ // Do nothing.
}
if (this.status === undefined) {
throw Error('Unexpected undefined status at end of stream');
@@ -225,6 +228,7 @@ export class Call {
protected async streamWait(timeoutMs?: number): Promise<[Status, Message[]]> {
for await (const response of this.getResponses(undefined, timeoutMs)) {
+ // Do nothing.
}
if (this.status === undefined) {
throw Error('Unexpected undefined status at end of stream');
@@ -276,7 +280,7 @@ export class ClientStreamingCall extends Call {
/** Ends the client stream and waits for the RPC to complete. */
async finishAndWait(
requests: Message[] = [],
- timeoutMs?: number
+ timeoutMs?: number,
): Promise<[Status, Message[]]> {
this.finishClientStream(requests);
return await this.streamWait(timeoutMs);
@@ -300,7 +304,7 @@ export class BidirectionalStreamingCall extends Call {
/** Ends the client stream and waits for the RPC to complete. */
async finishAndWait(
requests: Array<Message> = [],
- timeoutMs?: number
+ timeoutMs?: number,
): Promise<[Status, Array<Message>]> {
this.finishClientStream(requests);
return await this.streamWait(timeoutMs);
diff --git a/pw_rpc/ts/call_test.ts b/pw_rpc/ts/call_test.ts
index 47af6dab8..98590c88a 100644
--- a/pw_rpc/ts/call_test.ts
+++ b/pw_rpc/ts/call_test.ts
@@ -14,11 +14,11 @@
/* eslint-env browser */
-import {SomeMessage} from 'pigweedjs/protos/pw_rpc/ts/test2_pb';
+import { SomeMessage } from 'pigweedjs/protos/pw_rpc/ts/test2_pb';
-import {Call} from './call';
-import {Channel, Method, Service} from './descriptors';
-import {PendingCalls, Rpc} from './rpc_classes';
+import { Call } from './call';
+import { Channel, Method, Service } from './descriptors';
+import { PendingCalls, Rpc } from './rpc_classes';
class FakeRpc {
readonly channel: any = undefined;
@@ -33,7 +33,9 @@ describe('Call', () => {
let call: Call;
beforeEach(() => {
- const noop = () => { };
+ const noop = () => {
+ // Do nothing.
+ };
const pendingCalls = new PendingCalls();
const rpc = new FakeRpc();
call = new Call(pendingCalls, rpc, noop, noop, noop);
diff --git a/pw_rpc/ts/client.ts b/pw_rpc/ts/client.ts
index e7a97176d..e99554877 100644
--- a/pw_rpc/ts/client.ts
+++ b/pw_rpc/ts/client.ts
@@ -14,18 +14,18 @@
/** Provides a pw_rpc client for TypeScript. */
-import {ProtoCollection} from 'pigweedjs/pw_protobuf_compiler';
-import {Status} from 'pigweedjs/pw_status';
-import {Message} from 'google-protobuf';
+import { ProtoCollection } from 'pigweedjs/pw_protobuf_compiler';
+import { Status } from 'pigweedjs/pw_status';
+import { Message } from 'google-protobuf';
import {
PacketType,
RpcPacket,
} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
-import {Channel, Service} from './descriptors';
-import {MethodStub, methodStubFactory} from './method';
+import { Channel, Service } from './descriptors';
+import { MethodStub, methodStubFactory } from './method';
import * as packets from './packets';
-import {PendingCalls, Rpc} from './rpc_classes';
+import { PendingCalls, Rpc } from './rpc_classes';
/**
* Object for managing RPC service and contained methods.
@@ -38,7 +38,7 @@ export class ServiceClient {
constructor(client: Client, channel: Channel, service: Service) {
this.service = service;
const methods = service.methods;
- methods.forEach(method => {
+ methods.forEach((method) => {
const stub = methodStubFactory(client.rpcs, channel, method);
this.methods.push(stub);
this.methodsByName.set(method.name, stub);
@@ -67,7 +67,7 @@ export class ChannelClient {
constructor(client: Client, channel: Channel, services: Service[]) {
this.channel = channel;
- services.forEach(service => {
+ services.forEach((service) => {
const serviceClient = new ServiceClient(client, this.channel, service);
this.services.set(service.name, serviceClient);
});
@@ -122,14 +122,14 @@ export class Client {
constructor(channels: Channel[], services: Service[]) {
this.rpcs = new PendingCalls();
- services.forEach(service => {
+ services.forEach((service) => {
this.services.set(service.id, service);
});
- channels.forEach(channel => {
+ channels.forEach((channel) => {
this.channelsById.set(
channel.id,
- new ChannelClient(this, channel, services)
+ new ChannelClient(this, channel, services),
);
});
}
@@ -145,11 +145,11 @@ export class Client {
static fromProtoSet(channels: Channel[], protoSet: ProtoCollection): Client {
let services: Service[] = [];
const descriptors = protoSet.fileDescriptorSet.getFileList();
- descriptors.forEach(fileDescriptor => {
+ descriptors.forEach((fileDescriptor) => {
const packageName = fileDescriptor.getPackage()!;
- fileDescriptor.getServiceList().forEach(serviceDescriptor => {
+ fileDescriptor.getServiceList().forEach((serviceDescriptor) => {
services = services.concat(
- new Service(serviceDescriptor, protoSet, packageName)
+ new Service(serviceDescriptor, protoSet, packageName),
);
});
});
@@ -176,7 +176,7 @@ export class Client {
*/
private rpc(
packet: RpcPacket,
- channelClient: ChannelClient
+ channelClient: ChannelClient,
): Rpc | undefined {
const service = this.services.get(packet.getServiceId());
if (service == undefined) {
@@ -215,7 +215,7 @@ export class Client {
private sendClientError(
client: ChannelClient,
packet: RpcPacket,
- error: Status
+ error: Status,
) {
client.channel.send(packets.encodeClientError(packet, error));
}
@@ -290,10 +290,10 @@ export class Client {
if (packet.getType() === PacketType.SERVER_ERROR) {
if (status === Status.OK) {
- throw 'Unexpected OK status on SERVER_ERROR';
+ throw new Error('Unexpected OK status on SERVER_ERROR');
}
if (status === undefined) {
- throw 'Missing status on SERVER_ERROR';
+ throw new Error('Missing status on SERVER_ERROR');
}
console.warn(`${rpc}: invocation failed with status: ${Status[status]}`);
call.handleError(status);
diff --git a/pw_rpc/ts/client_test.ts b/pw_rpc/ts/client_test.ts
index 0dfed21b1..0535fddb9 100644
--- a/pw_rpc/ts/client_test.ts
+++ b/pw_rpc/ts/client_test.ts
@@ -14,21 +14,18 @@
/* eslint-env browser */
-import {Status} from 'pigweedjs/pw_status';
-import {MessageCreator} from 'pigweedjs/pw_protobuf_compiler';
-import {Message} from 'google-protobuf';
+import { Status } from 'pigweedjs/pw_status';
+import { MessageCreator } from 'pigweedjs/pw_protobuf_compiler';
+import { Message } from 'google-protobuf';
import {
PacketType,
RpcPacket,
} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
-import {ProtoCollection} from 'pigweedjs/protos/collection';
-import {
- Request,
- Response,
-} from 'pigweedjs/protos/pw_rpc/ts/test_pb';
+import { ProtoCollection } from 'pigweedjs/protos/collection';
+import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb';
-import {Client} from './client';
-import {Channel, Method} from './descriptors';
+import { Client } from './client';
+import { Channel, Method } from './descriptors';
import {
BidirectionalStreamingMethodStub,
ClientStreamingMethodStub,
@@ -77,10 +74,10 @@ describe('Client', () => {
const channel = client.channel()!;
expect(channel.methodStub('')).toBeUndefined();
expect(
- channel.methodStub('pw.rpc.test1.Garbage.SomeUnary')
+ channel.methodStub('pw.rpc.test1.Garbage.SomeUnary'),
).toBeUndefined();
expect(
- channel.methodStub('pw.rpc.test1.TheTestService.Garbage')
+ channel.methodStub('pw.rpc.test1.TheTestService.Garbage'),
).toBeUndefined();
});
@@ -134,7 +131,7 @@ describe('Client', () => {
const packet = packets.encodeResponse(
[1, service.id, method.id],
- new Request()
+ new Request(),
);
const status = client.processPacket(packet);
expect(client.processPacket(packet)).toEqual(Status.OK);
@@ -159,7 +156,12 @@ describe('RPC', () => {
beforeEach(async () => {
protoCollection = new ProtoCollection();
- const channels = [new Channel(1, handlePacket), new Channel(2, () => { })];
+ const channels = [
+ new Channel(1, handlePacket),
+ new Channel(2, () => {
+ // Do nothing.
+ }),
+ ];
client = Client.fromProtoSet(channels, protoCollection);
lastPacketSent = undefined;
requests = [];
@@ -185,7 +187,7 @@ describe('RPC', () => {
channelId: number,
method: Method,
status: Status,
- response?: Message
+ response?: Message,
) {
const packet = new RpcPacket();
packet.setType(PacketType.RESPONSE);
@@ -194,7 +196,7 @@ describe('RPC', () => {
packet.setMethodId(method.id);
packet.setStatus(status);
if (response === undefined) {
- packet.setPayload(new Uint8Array());
+ packet.setPayload(new Uint8Array(0));
} else {
packet.setPayload(response.serializeBinary());
}
@@ -205,7 +207,7 @@ describe('RPC', () => {
channelId: number,
method: Method,
response: Message,
- status: Status = Status.OK
+ status: Status = Status.OK,
) {
const packet = new RpcPacket();
packet.setType(PacketType.SERVER_STREAM);
@@ -221,7 +223,7 @@ describe('RPC', () => {
channelId: number,
method: Method,
status: Status,
- processStatus: Status
+ processStatus: Status,
) {
const packet = new RpcPacket();
packet.setType(PacketType.SERVER_ERROR);
@@ -276,8 +278,8 @@ describe('RPC', () => {
unaryStub = client
.channel()
?.methodStub(
- 'pw.rpc.test1.TheTestService.SomeUnary'
- )! as UnaryMethodStub;
+ 'pw.rpc.test1.TheTestService.SomeUnary',
+ ) as UnaryMethodStub;
});
it('blocking call', async () => {
@@ -286,7 +288,7 @@ describe('RPC', () => {
1,
unaryStub.method,
Status.ABORTED,
- newResponse('0_o')
+ newResponse('0_o'),
);
const [status, response] = await unaryStub.call(newRequest(6));
@@ -308,7 +310,7 @@ describe('RPC', () => {
newRequest(5),
onNext,
onCompleted,
- onError
+ onError,
);
expect(sentPayload(Request).getMagicNumber()).toEqual(5);
@@ -406,8 +408,8 @@ describe('RPC', () => {
serverStreaming = client
.channel()
?.methodStub(
- 'pw.rpc.test1.TheTestService.SomeServerStreaming'
- )! as ServerStreamingMethodStub;
+ 'pw.rpc.test1.TheTestService.SomeServerStreaming',
+ ) as ServerStreamingMethodStub;
});
it('non-blocking call', () => {
@@ -430,7 +432,7 @@ describe('RPC', () => {
expect(onCompleted).toHaveBeenCalledWith(Status.ABORTED);
expect(
- sentPayload(serverStreaming.method.requestType).getMagicNumber()
+ sentPayload(serverStreaming.method.requestType).getMagicNumber(),
).toEqual(4);
}
});
@@ -452,7 +454,7 @@ describe('RPC', () => {
newRequest(3),
onNext,
onCompleted,
- onError
+ onError,
);
expect(requests).toHaveLength(0);
@@ -507,8 +509,8 @@ describe('RPC', () => {
clientStreaming = client
.channel()
?.methodStub(
- 'pw.rpc.test1.TheTestService.SomeClientStreaming'
- )! as ClientStreamingMethodStub;
+ 'pw.rpc.test1.TheTestService.SomeClientStreaming',
+ ) as ClientStreamingMethodStub;
});
it('non-blocking call', () => {
@@ -584,7 +586,9 @@ describe('RPC', () => {
enqueueResponse(1, clientStreaming.method, Status.OK, testResponse);
stream.finishAndWait();
- expect(lastRequest().getType()).toEqual(PacketType.CLIENT_STREAM_END);
+ expect(lastRequest().getType()).toEqual(
+ PacketType.CLIENT_REQUEST_COMPLETION,
+ );
expect(onNext).toHaveBeenCalledWith(testResponse);
expect(stream.completed).toBe(true);
@@ -614,7 +618,7 @@ describe('RPC', () => {
1,
clientStreaming.method,
Status.INVALID_ARGUMENT,
- Status.OK
+ Status.OK,
);
stream.send(newRequest());
@@ -624,7 +628,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
});
}
@@ -633,12 +637,12 @@ describe('RPC', () => {
it('non-blocking call server error after stream end', async () => {
for (let i = 0; i < 3; i++) {
const stream = clientStreaming.invoke();
- // Error will be sent in response to the CLIENT_STREAM_END packet.
+ // Error will be sent in response to the CLIENT_REQUEST_COMPLETION packet.
enqueueError(
1,
clientStreaming.method,
Status.INVALID_ARGUMENT,
- Status.OK
+ Status.OK,
);
await stream
@@ -646,7 +650,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
});
}
@@ -659,8 +663,7 @@ describe('RPC', () => {
try {
stream.send(newRequest());
- }
- catch (e) {
+ } catch (e) {
console.log(e);
expect(e.status).toEqual(Status.CANCELLED);
}
@@ -675,7 +678,7 @@ describe('RPC', () => {
1,
clientStreaming.method,
Status.UNAVAILABLE,
- enqueuedResponse
+ enqueuedResponse,
);
const stream = clientStreaming.invoke();
@@ -696,7 +699,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.UNAVAILABLE);
expect(stream.error).toEqual(Status.UNAVAILABLE);
expect(stream.response).toBeUndefined();
@@ -721,8 +724,8 @@ describe('RPC', () => {
bidiStreaming = client
.channel()
?.methodStub(
- 'pw.rpc.test1.TheTestService.SomeBidiStreaming'
- )! as BidirectionalStreamingMethodStub;
+ 'pw.rpc.test1.TheTestService.SomeBidiStreaming',
+ ) as BidirectionalStreamingMethodStub;
});
it('blocking call', async () => {
@@ -745,7 +748,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.NOT_FOUND);
});
});
@@ -756,7 +759,7 @@ describe('RPC', () => {
for (let i = 0; i < 3; i++) {
const testResponses: Array<Message> = [];
- const stream = bidiStreaming.invoke(response => {
+ const stream = bidiStreaming.invoke((response) => {
testResponses.push(response);
});
expect(stream.completed).toBe(false);
@@ -825,7 +828,7 @@ describe('RPC', () => {
for (let i = 0; i < 3; i++) {
const testResponses: Array<Message> = [];
- const stream = bidiStreaming.invoke(response => {
+ const stream = bidiStreaming.invoke((response) => {
testResponses.push(response);
});
expect(stream.completed).toBe(false);
@@ -849,7 +852,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.OUT_OF_RANGE);
});
}
@@ -858,12 +861,12 @@ describe('RPC', () => {
for (let i = 0; i < 3; i++) {
const stream = bidiStreaming.invoke();
- // Error is sent in response to CLIENT_STREAM_END packet.
+ // Error is sent in response to CLIENT_REQUEST_COMPLETION packet.
enqueueError(
1,
bidiStreaming.method,
Status.INVALID_ARGUMENT,
- Status.OK
+ Status.OK,
);
await stream
@@ -871,7 +874,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.INVALID_ARGUMENT);
});
}
@@ -915,7 +918,7 @@ describe('RPC', () => {
.then(() => {
fail('Promise should not be resolved');
})
- .catch(reason => {
+ .catch((reason) => {
expect(reason.status).toEqual(Status.UNAVAILABLE);
expect(stream.error).toEqual(Status.UNAVAILABLE);
});
diff --git a/pw_rpc/ts/descriptors.ts b/pw_rpc/ts/descriptors.ts
index eb4581f3e..bd0d470de 100644
--- a/pw_rpc/ts/descriptors.ts
+++ b/pw_rpc/ts/descriptors.ts
@@ -12,13 +12,13 @@
// License for the specific language governing permissions and limitations under
// the License.
-import {ProtoCollection} from 'pigweedjs/pw_protobuf_compiler';
+import { ProtoCollection } from 'pigweedjs/pw_protobuf_compiler';
import {
MethodDescriptorProto,
ServiceDescriptorProto,
} from 'google-protobuf/google/protobuf/descriptor_pb';
-import {hash} from './hash';
+import { hash } from './hash';
interface ChannelOutput {
(data: Uint8Array): void;
@@ -28,7 +28,12 @@ export class Channel {
readonly id: number;
private output: ChannelOutput;
- constructor(id: number, output: ChannelOutput = () => {}) {
+ constructor(
+ id: number,
+ output: ChannelOutput = () => {
+ /* do nothing. */
+ },
+ ) {
this.id = id;
this.output = output;
}
@@ -48,7 +53,7 @@ export class Service {
constructor(
descriptor: ServiceDescriptorProto,
protoCollection: ProtoCollection,
- packageName: string
+ packageName: string,
) {
this.name = packageName + '.' + descriptor.getName()!;
this.id = hash(this.name);
@@ -83,7 +88,7 @@ export class Method {
constructor(
descriptor: MethodDescriptorProto,
protoCollection: ProtoCollection,
- service: Service
+ service: Service,
) {
this.name = descriptor.getName()!;
this.id = hash(this.name);
@@ -97,10 +102,10 @@ export class Method {
// Remove leading period if it exists.
this.requestType = protoCollection.getMessageCreator(
- requestTypePath.replace(/^\./, '')
+ requestTypePath.replace(/^\./, ''),
)!;
this.responseType = protoCollection.getMessageCreator(
- responseTypePath.replace(/^\./, '')
+ responseTypePath.replace(/^\./, ''),
)!;
}
diff --git a/pw_rpc/ts/descriptors_test.ts b/pw_rpc/ts/descriptors_test.ts
index a6b11e26a..c09d3d3fa 100644
--- a/pw_rpc/ts/descriptors_test.ts
+++ b/pw_rpc/ts/descriptors_test.ts
@@ -14,11 +14,8 @@
/* eslint-env browser */
-import {ProtoCollection} from 'pigweedjs/protos/collection';
-import {
- Request,
- Response,
-} from 'pigweedjs/protos/pw_rpc/ts/test_pb';
+import { ProtoCollection } from 'pigweedjs/protos/collection';
+import { Request, Response } from 'pigweedjs/protos/pw_rpc/ts/test_pb';
import * as descriptors from './descriptors';
@@ -27,13 +24,14 @@ const TEST_PROTO_PATH = 'pw_rpc/ts/test_protos-descriptor-set.proto.bin';
describe('Descriptors', () => {
it('parses from ServiceDescriptor binary', async () => {
const protoCollection = new ProtoCollection();
- const fd = protoCollection.fileDescriptorSet.getFileList()
- .find((file: any) => file.array[1].indexOf("pw.rpc.test1") !== -1);
+ const fd = protoCollection.fileDescriptorSet
+ .getFileList()
+ .find((file: any) => file.array[1].indexOf('pw.rpc.test1') !== -1);
const sd = fd.getServiceList()[0];
const service = new descriptors.Service(
sd,
protoCollection,
- fd.getPackage()!
+ fd.getPackage()!,
);
expect(service.name).toEqual('pw.rpc.test1.TheTestService');
diff --git a/pw_rpc/ts/method.ts b/pw_rpc/ts/method.ts
index 18cf6db07..f67d9cfb0 100644
--- a/pw_rpc/ts/method.ts
+++ b/pw_rpc/ts/method.ts
@@ -12,8 +12,8 @@
// License for the specific language governing permissions and limitations under
// the License.
-import {Status} from 'pigweedjs/pw_status';
-import {Message} from 'google-protobuf';
+import { Status } from 'pigweedjs/pw_status';
+import { Message } from 'google-protobuf';
import {
BidirectionalStreamingCall,
@@ -23,13 +23,13 @@ import {
ServerStreamingCall,
UnaryCall,
} from './call';
-import {Channel, Method, MethodType, Service} from './descriptors';
-import {PendingCalls, Rpc} from './rpc_classes';
+import { Channel, Method, MethodType, Service } from './descriptors';
+import { PendingCalls, Rpc } from './rpc_classes';
export function methodStubFactory(
rpcs: PendingCalls,
channel: Channel,
- method: Method
+ method: Method,
): MethodStub {
switch (method.type) {
case MethodType.BIDIRECTIONAL_STREAMING:
@@ -64,16 +64,22 @@ export abstract class MethodStub {
export class UnaryMethodStub extends MethodStub {
invoke(
request: Message,
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): UnaryCall {
const call = new UnaryCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(request);
return call;
@@ -81,16 +87,22 @@ export class UnaryMethodStub extends MethodStub {
open(
request: Message,
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): UnaryCall {
const call = new UnaryCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(request, true);
return call;
@@ -104,16 +116,22 @@ export class UnaryMethodStub extends MethodStub {
export class ServerStreamingMethodStub extends MethodStub {
invoke(
request?: Message,
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): ServerStreamingCall {
const call = new ServerStreamingCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(request);
return call;
@@ -121,16 +139,22 @@ export class ServerStreamingMethodStub extends MethodStub {
open(
request: Message,
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): UnaryCall {
const call = new UnaryCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(request, true);
return call;
@@ -143,32 +167,44 @@ export class ServerStreamingMethodStub extends MethodStub {
export class ClientStreamingMethodStub extends MethodStub {
invoke(
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): ClientStreamingCall {
const call = new ClientStreamingCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke();
return call;
}
open(
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): ClientStreamingCall {
const call = new ClientStreamingCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(undefined, true);
return call;
@@ -181,32 +217,44 @@ export class ClientStreamingMethodStub extends MethodStub {
export class BidirectionalStreamingMethodStub extends MethodStub {
invoke(
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): BidirectionalStreamingCall {
const call = new BidirectionalStreamingCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke();
return call;
}
open(
- onNext: Callback = () => {},
- onCompleted: Callback = () => {},
- onError: Callback = () => {}
+ onNext: Callback = () => {
+ // Do nothing.
+ },
+ onCompleted: Callback = () => {
+ // Do nothing.
+ },
+ onError: Callback = () => {
+ // Do nothing.
+ },
): BidirectionalStreamingCall {
const call = new BidirectionalStreamingCall(
this.rpcs,
this.rpc,
onNext,
onCompleted,
- onError
+ onError,
);
call.invoke(undefined, true);
return call;
diff --git a/pw_rpc/ts/packets.ts b/pw_rpc/ts/packets.ts
index 68cf80393..b6cb71395 100644
--- a/pw_rpc/ts/packets.ts
+++ b/pw_rpc/ts/packets.ts
@@ -14,33 +14,34 @@
/** Functions for working with pw_rpc packets. */
-import {Message} from 'google-protobuf';
-import {MethodDescriptorProto} from 'google-protobuf/google/protobuf/descriptor_pb';
-import * as packetPb from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
-import {Status} from 'pigweedjs/pw_status';
+import { Message } from 'google-protobuf';
+import {
+ RpcPacket,
+ PacketType,
+} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
+import { Status } from 'pigweedjs/pw_status';
// Channel, Service, Method
type idSet = [number, number, number];
-export function decode(data: Uint8Array): packetPb.RpcPacket {
- return packetPb.RpcPacket.deserializeBinary(data);
+export function decode(data: Uint8Array): RpcPacket {
+ return RpcPacket.deserializeBinary(data);
}
-export function decodePayload(payload: Uint8Array, payloadType: any): Message {
- const message = payloadType.deserializeBinary(payload);
- return message;
+export function decodePayload(payload: Uint8Array, payloadType: any): any {
+ return payloadType['deserializeBinary'](payload);
}
-export function forServer(packet: packetPb.RpcPacket): boolean {
+export function forServer(packet: RpcPacket): boolean {
return packet.getType() % 2 == 0;
}
export function encodeClientError(
- packet: packetPb.RpcPacket,
- status: Status
+ packet: RpcPacket,
+ status: Status,
): Uint8Array {
- const errorPacket = new packetPb.RpcPacket();
- errorPacket.setType(packetPb.PacketType.CLIENT_ERROR);
+ const errorPacket = new RpcPacket();
+ errorPacket.setType(PacketType.CLIENT_ERROR);
errorPacket.setChannelId(packet.getChannelId());
errorPacket.setMethodId(packet.getMethodId());
errorPacket.setServiceId(packet.getServiceId());
@@ -49,18 +50,19 @@ export function encodeClientError(
}
export function encodeClientStream(ids: idSet, message: Message): Uint8Array {
- const streamPacket = new packetPb.RpcPacket();
- streamPacket.setType(packetPb.PacketType.CLIENT_STREAM);
+ const streamPacket = new RpcPacket();
+ streamPacket.setType(PacketType.CLIENT_STREAM);
streamPacket.setChannelId(ids[0]);
streamPacket.setServiceId(ids[1]);
streamPacket.setMethodId(ids[2]);
- streamPacket.setPayload(message.serializeBinary());
+ const msgSerialized = (message as any)['serializeBinary']();
+ streamPacket.setPayload(msgSerialized);
return streamPacket.serializeBinary();
}
export function encodeClientStreamEnd(ids: idSet): Uint8Array {
- const streamEnd = new packetPb.RpcPacket();
- streamEnd.setType(packetPb.PacketType.CLIENT_STREAM_END);
+ const streamEnd = new RpcPacket();
+ streamEnd.setType(PacketType.CLIENT_REQUEST_COMPLETION);
streamEnd.setChannelId(ids[0]);
streamEnd.setServiceId(ids[1]);
streamEnd.setMethodId(ids[2]);
@@ -70,11 +72,11 @@ export function encodeClientStreamEnd(ids: idSet): Uint8Array {
export function encodeRequest(ids: idSet, request?: Message): Uint8Array {
const payload: Uint8Array =
typeof request !== 'undefined'
- ? request.serializeBinary()
- : new Uint8Array();
+ ? (request as any)['serializeBinary']()
+ : new Uint8Array(0);
- const packet = new packetPb.RpcPacket();
- packet.setType(packetPb.PacketType.REQUEST);
+ const packet = new RpcPacket();
+ packet.setType(PacketType.REQUEST);
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
packet.setMethodId(ids[2]);
@@ -83,18 +85,19 @@ export function encodeRequest(ids: idSet, request?: Message): Uint8Array {
}
export function encodeResponse(ids: idSet, response: Message): Uint8Array {
- const packet = new packetPb.RpcPacket();
- packet.setType(packetPb.PacketType.RESPONSE);
+ const packet = new RpcPacket();
+ packet.setType(PacketType.RESPONSE);
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
packet.setMethodId(ids[2]);
- packet.setPayload(response.serializeBinary());
+ const msgSerialized = (response as any)['serializeBinary']();
+ packet.setPayload(msgSerialized);
return packet.serializeBinary();
}
export function encodeCancel(ids: idSet): Uint8Array {
- const packet = new packetPb.RpcPacket();
- packet.setType(packetPb.PacketType.CLIENT_ERROR);
+ const packet = new RpcPacket();
+ packet.setType(PacketType.CLIENT_ERROR);
packet.setStatus(Status.CANCELLED);
packet.setChannelId(ids[0]);
packet.setServiceId(ids[1]);
diff --git a/pw_rpc/ts/packets_test.ts b/pw_rpc/ts/packets_test.ts
index b399e2578..50cdb3f1a 100644
--- a/pw_rpc/ts/packets_test.ts
+++ b/pw_rpc/ts/packets_test.ts
@@ -17,7 +17,7 @@ import {
PacketType,
RpcPacket,
} from 'pigweedjs/protos/pw_rpc/internal/packet_pb';
-import {Status} from 'pigweedjs/pw_status';
+import { Status } from 'pigweedjs/pw_status';
import * as packets from './packets';
@@ -31,7 +31,9 @@ function addTestData(packet: RpcPacket) {
}
describe('Packets', () => {
- beforeEach(() => { });
+ beforeEach(() => {
+ // Do nothing.
+ });
it('encodeRequest sets packet fields', () => {
const goldenRequest = new RpcPacket();
@@ -95,7 +97,7 @@ describe('Packets', () => {
addTestData(request);
expect(request.toObject()).toEqual(
- packets.decode(request.serializeBinary()).toObject()
+ packets.decode(request.serializeBinary()).toObject(),
);
});
diff --git a/pw_rpc/ts/queue.ts b/pw_rpc/ts/queue.ts
index 0bd2b8c6c..44dd48cbd 100644
--- a/pw_rpc/ts/queue.ts
+++ b/pw_rpc/ts/queue.ts
@@ -32,7 +32,7 @@ export default class Queue<T> {
}
shift(): Promise<T> {
- return new Promise(resolve => {
+ return new Promise((resolve) => {
if (this.length > 0) {
return resolve(this.queue.shift()!);
} else {
diff --git a/pw_rpc/ts/rpc_classes.ts b/pw_rpc/ts/rpc_classes.ts
index 703612e19..83001e268 100644
--- a/pw_rpc/ts/rpc_classes.ts
+++ b/pw_rpc/ts/rpc_classes.ts
@@ -12,11 +12,11 @@
// License for the specific language governing permissions and limitations under
// the License.
-import {Message} from 'google-protobuf';
-import {Status} from 'pigweedjs/pw_status';
+import { Message } from 'google-protobuf';
+import { Status } from 'pigweedjs/pw_status';
-import {Call} from './call';
-import {Channel, Method, Service} from './descriptors';
+import { Call } from './call';
+import { Channel, Method, Service } from './descriptors';
import * as packets from './packets';
/** Data class for a pending RPC call. */
@@ -70,7 +70,7 @@ export class PendingCalls {
rpc: Rpc,
call: Call,
ignoreError: boolean,
- request?: Message
+ request?: Message,
): Call | undefined {
const previous = this.open(rpc, call);
const packet = packets.encodeRequest(rpc.idSet, request);