aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java
diff options
context:
space:
mode:
Diffstat (limited to 'pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java')
-rw-r--r--pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java59
1 files changed, 34 insertions, 25 deletions
diff --git a/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java b/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java
index 64d934b6b..a62d77d76 100644
--- a/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java
+++ b/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java
@@ -14,12 +14,12 @@
package dev.pigweed.pw_rpc;
-// import com.google.common.flogger.FluentLogger;
import com.google.protobuf.ExtensionRegistryLite;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.MessageLite;
-import dev.pigweed.pw.rpc.internal.Packet.PacketType;
-import dev.pigweed.pw.rpc.internal.Packet.RpcPacket;
+import dev.pigweed.pw_log.Logger;
+import dev.pigweed.pw_rpc.internal.Packet.PacketType;
+import dev.pigweed.pw_rpc.internal.Packet.RpcPacket;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.List;
@@ -33,8 +33,7 @@ import javax.annotation.Nullable;
* through the processPacket function.
*/
public class Client {
- // TODO(pwbug/611): Restore logging without a mandatory Flogger dependency.
- // private static final FluentLogger logger = FluentLogger.forEnclosingClass();
+ private static final Logger logger = Logger.forClass(Client.class);
private final Map<Integer, Channel> channels;
private final Map<Integer, Service> services;
@@ -81,17 +80,17 @@ public class Client {
return create(channels, services, (rpc) -> new StreamObserver<MessageLite>() {
@Override
public void onNext(MessageLite value) {
- // logger.atFine().log("%s received response: %s", rpc, value);
+ logger.atFine().log("%s received response: %s", rpc, value);
}
@Override
public void onCompleted(Status status) {
- // logger.atInfo().log("%s completed with status %s", rpc, status);
+ logger.atInfo().log("%s completed with status %s", rpc, status);
}
@Override
public void onError(Status status) {
- // logger.atWarning().log("%s terminated with error %s", rpc, status);
+ logger.atWarning().log("%s terminated with error %s", rpc, status);
}
});
}
@@ -167,31 +166,30 @@ public class Client {
try {
packet = RpcPacket.parseFrom(data, ExtensionRegistryLite.getEmptyRegistry());
} catch (InvalidProtocolBufferException e) {
- // logger.atWarning().withCause(e).log("Failed to decode packet");
+ logger.atWarning().withCause(e).log("Failed to decode packet");
return false;
}
if (packet.getChannelId() == 0 || packet.getServiceId() == 0 || packet.getMethodId() == 0) {
- // logger.atWarning().log("Received corrupt packet with unset IDs");
+ logger.atWarning().log("Received corrupt packet with unset IDs");
return false;
}
// Packets for the server use even type values.
if (packet.getTypeValue() % 2 == 0) {
- // logger.atFine().log("Ignoring %s packet for server", packet.getType().name());
+ logger.atFine().log("Ignoring %s packet for server", packet.getType().name());
return false;
}
Channel channel = channels.get(packet.getChannelId());
if (channel == null) {
- // logger.atWarning().log(
- // "Received packet for unrecognized channel %d", packet.getChannelId());
+ logger.atWarning().log("Received packet for unrecognized channel %d", packet.getChannelId());
return false;
}
PendingRpc rpc = lookupRpc(channel, packet);
if (rpc == null) {
- // logger.atInfo().log("Ignoring packet for unknown service method");
+ logger.atInfo().log("Ignoring packet for unknown service method");
sendError(channel, packet, Status.NOT_FOUND);
return true; // true since the packet was handled, even though it was invalid.
}
@@ -200,31 +198,42 @@ public class Client {
StreamObserverCall<?, ?> call =
packet.getType().equals(PacketType.SERVER_STREAM) ? rpcs.getPending(rpc) : rpcs.clear(rpc);
if (call == null) {
- // logger.atInfo().log(
- // "Ignoring packet for RPC (%s) that isn't pending. Pending RPCs are: %s", rpc, rpcs);
+ logger.atFine().log(
+ "Ignoring packet for %s, which isn't pending. Pending RPCs are %s", rpc, rpcs);
sendError(channel, packet, Status.FAILED_PRECONDITION);
return true;
}
switch (packet.getType()) {
- case SERVER_ERROR:
+ case SERVER_ERROR: {
Status status = decodeStatus(packet);
- // logger.atWarning().log("RPC %s failed with error %s", rpc, status);
+ logger.atWarning().log("%s failed with error %s", rpc, status);
call.onError(status);
break;
- case RESPONSE:
+ }
+ case RESPONSE: {
+ Status status = decodeStatus(packet);
// Server streaming an unary RPCs include a payload with their response packet.
if (!rpc.method().isServerStreaming()) {
+ logger.atFiner().log("%s completed with status %s and %d B payload",
+ rpc,
+ status,
+ packet.getPayload().size());
call.onNext(packet.getPayload());
+ } else {
+ logger.atFiner().log("%s completed with status %s", rpc, status);
}
- call.onCompleted(decodeStatus(packet));
+ call.onCompleted(status);
break;
+ }
case SERVER_STREAM:
+ logger.atFiner().log(
+ "%s received server stream with %d B payload", rpc, packet.getPayload().size());
call.onNext(packet.getPayload());
break;
default:
- // logger.atWarning().log(
- // "Unexpected PacketType %d for RPC %s", packet.getType().getNumber(), rpc);
+ logger.atWarning().log(
+ "%s received unexpected PacketType %d", rpc, packet.getType().getNumber());
}
return true;
@@ -234,7 +243,7 @@ public class Client {
try {
channel.send(Packets.error(packet, status));
} catch (ChannelOutputException e) {
- // logger.atWarning().withCause(e).log("Failed to send error packet");
+ logger.atWarning().withCause(e).log("Failed to send error packet");
}
}
@@ -254,8 +263,8 @@ public class Client {
private static Status decodeStatus(RpcPacket packet) {
Status status = Status.fromCode(packet.getStatus());
if (status == null) {
- // logger.atWarning().log(
- // "Illegal status code %d in packet; using Status.UNKNOWN ", packet.getStatus());
+ logger.atWarning().log(
+ "Illegal status code %d in packet; using Status.UNKNOWN ", packet.getStatus());
return Status.UNKNOWN;
}
return status;