diff options
Diffstat (limited to 'pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java')
-rw-r--r-- | pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java | 59 |
1 files changed, 34 insertions, 25 deletions
diff --git a/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java b/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java index 64d934b6b..a62d77d76 100644 --- a/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java +++ b/pw_rpc/java/main/dev/pigweed/pw_rpc/Client.java @@ -14,12 +14,12 @@ package dev.pigweed.pw_rpc; -// import com.google.common.flogger.FluentLogger; import com.google.protobuf.ExtensionRegistryLite; import com.google.protobuf.InvalidProtocolBufferException; import com.google.protobuf.MessageLite; -import dev.pigweed.pw.rpc.internal.Packet.PacketType; -import dev.pigweed.pw.rpc.internal.Packet.RpcPacket; +import dev.pigweed.pw_log.Logger; +import dev.pigweed.pw_rpc.internal.Packet.PacketType; +import dev.pigweed.pw_rpc.internal.Packet.RpcPacket; import java.nio.ByteBuffer; import java.util.HashMap; import java.util.List; @@ -33,8 +33,7 @@ import javax.annotation.Nullable; * through the processPacket function. */ public class Client { - // TODO(pwbug/611): Restore logging without a mandatory Flogger dependency. - // private static final FluentLogger logger = FluentLogger.forEnclosingClass(); + private static final Logger logger = Logger.forClass(Client.class); private final Map<Integer, Channel> channels; private final Map<Integer, Service> services; @@ -81,17 +80,17 @@ public class Client { return create(channels, services, (rpc) -> new StreamObserver<MessageLite>() { @Override public void onNext(MessageLite value) { - // logger.atFine().log("%s received response: %s", rpc, value); + logger.atFine().log("%s received response: %s", rpc, value); } @Override public void onCompleted(Status status) { - // logger.atInfo().log("%s completed with status %s", rpc, status); + logger.atInfo().log("%s completed with status %s", rpc, status); } @Override public void onError(Status status) { - // logger.atWarning().log("%s terminated with error %s", rpc, status); + logger.atWarning().log("%s terminated with error %s", rpc, status); } }); } @@ -167,31 +166,30 @@ public class Client { try { packet = RpcPacket.parseFrom(data, ExtensionRegistryLite.getEmptyRegistry()); } catch (InvalidProtocolBufferException e) { - // logger.atWarning().withCause(e).log("Failed to decode packet"); + logger.atWarning().withCause(e).log("Failed to decode packet"); return false; } if (packet.getChannelId() == 0 || packet.getServiceId() == 0 || packet.getMethodId() == 0) { - // logger.atWarning().log("Received corrupt packet with unset IDs"); + logger.atWarning().log("Received corrupt packet with unset IDs"); return false; } // Packets for the server use even type values. if (packet.getTypeValue() % 2 == 0) { - // logger.atFine().log("Ignoring %s packet for server", packet.getType().name()); + logger.atFine().log("Ignoring %s packet for server", packet.getType().name()); return false; } Channel channel = channels.get(packet.getChannelId()); if (channel == null) { - // logger.atWarning().log( - // "Received packet for unrecognized channel %d", packet.getChannelId()); + logger.atWarning().log("Received packet for unrecognized channel %d", packet.getChannelId()); return false; } PendingRpc rpc = lookupRpc(channel, packet); if (rpc == null) { - // logger.atInfo().log("Ignoring packet for unknown service method"); + logger.atInfo().log("Ignoring packet for unknown service method"); sendError(channel, packet, Status.NOT_FOUND); return true; // true since the packet was handled, even though it was invalid. } @@ -200,31 +198,42 @@ public class Client { StreamObserverCall<?, ?> call = packet.getType().equals(PacketType.SERVER_STREAM) ? rpcs.getPending(rpc) : rpcs.clear(rpc); if (call == null) { - // logger.atInfo().log( - // "Ignoring packet for RPC (%s) that isn't pending. Pending RPCs are: %s", rpc, rpcs); + logger.atFine().log( + "Ignoring packet for %s, which isn't pending. Pending RPCs are %s", rpc, rpcs); sendError(channel, packet, Status.FAILED_PRECONDITION); return true; } switch (packet.getType()) { - case SERVER_ERROR: + case SERVER_ERROR: { Status status = decodeStatus(packet); - // logger.atWarning().log("RPC %s failed with error %s", rpc, status); + logger.atWarning().log("%s failed with error %s", rpc, status); call.onError(status); break; - case RESPONSE: + } + case RESPONSE: { + Status status = decodeStatus(packet); // Server streaming an unary RPCs include a payload with their response packet. if (!rpc.method().isServerStreaming()) { + logger.atFiner().log("%s completed with status %s and %d B payload", + rpc, + status, + packet.getPayload().size()); call.onNext(packet.getPayload()); + } else { + logger.atFiner().log("%s completed with status %s", rpc, status); } - call.onCompleted(decodeStatus(packet)); + call.onCompleted(status); break; + } case SERVER_STREAM: + logger.atFiner().log( + "%s received server stream with %d B payload", rpc, packet.getPayload().size()); call.onNext(packet.getPayload()); break; default: - // logger.atWarning().log( - // "Unexpected PacketType %d for RPC %s", packet.getType().getNumber(), rpc); + logger.atWarning().log( + "%s received unexpected PacketType %d", rpc, packet.getType().getNumber()); } return true; @@ -234,7 +243,7 @@ public class Client { try { channel.send(Packets.error(packet, status)); } catch (ChannelOutputException e) { - // logger.atWarning().withCause(e).log("Failed to send error packet"); + logger.atWarning().withCause(e).log("Failed to send error packet"); } } @@ -254,8 +263,8 @@ public class Client { private static Status decodeStatus(RpcPacket packet) { Status status = Status.fromCode(packet.getStatus()); if (status == null) { - // logger.atWarning().log( - // "Illegal status code %d in packet; using Status.UNKNOWN ", packet.getStatus()); + logger.atWarning().log( + "Illegal status code %d in packet; using Status.UNKNOWN ", packet.getStatus()); return Status.UNKNOWN; } return status; |