aboutsummaryrefslogtreecommitdiff
path: root/pw_rpc/java/main/dev/pigweed/pw_rpc/RpcManager.java
blob: 12e6db1a657b194f907845e3f034b2a35a6424f8 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
// Copyright 2021 The Pigweed Authors
//
// Licensed under the Apache License, Version 2.0 (the "License"); you may not
// use this file except in compliance with the License. You may obtain a copy of
// the License at
//
//     https://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations under
// the License.

package dev.pigweed.pw_rpc;

// import com.google.common.flogger.FluentLogger;
import com.google.protobuf.MessageLite;
import java.util.HashMap;
import java.util.Map;
import javax.annotation.Nullable;

/**
 * Tracks the state of service method invocations.
 *
 * <p>Each active RPC is stored as a {@link PendingRpc} key mapped to the {@link StreamObserverCall}
 * that handles it. Every public method is {@code synchronized} on this instance, so the map is
 * only read or modified while the manager's monitor is held.
 */
public class RpcManager {
  // TODO(pwbug/611): Restore logging without a mandatory Flogger dependency.
  // private static final FluentLogger logger = FluentLogger.forEnclosingClass();

  /** Calls that are currently registered, keyed by the RPC they belong to. */
  private final Map<PendingRpc, StreamObserverCall<?, ?>> pending = new HashMap<>();

  /**
   * Invokes an RPC.
   *
   * <p>The request packet is sent first; the call is only registered as pending if the send
   * completes without throwing.
   *
   * @param rpc channel / service / method tuple that uniquely identifies this RPC
   * @param call object for this RPC
   * @param payload the request
   * @return the call that was previously registered for {@code rpc}, or {@code null} if none was
   * @throws ChannelOutputException if sending the request packet fails
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> start(
      PendingRpc rpc, StreamObserverCall<?, ?> call, @Nullable MessageLite payload)
      throws ChannelOutputException {
    // logger.atFine().log("Start %s", rpc);
    rpc.channel().send(Packets.request(rpc, payload));

    StreamObserverCall<?, ?> replaced = pending.put(rpc, call);
    return replaced;
  }

  /**
   * Invokes an RPC, but ignores errors and keeps the RPC active if the invocation fails.
   *
   * <p>The RPC remains open until it is closed by the server (either with a response or error
   * packet) or cancelled.
   *
   * @param rpc channel / service / method tuple that uniquely identifies this RPC
   * @param call object for this RPC
   * @param payload the request
   * @return the call that was previously registered for {@code rpc}, or {@code null} if none was
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> open(
      PendingRpc rpc, StreamObserverCall<?, ?> call, @Nullable MessageLite payload) {
    // logger.atFine().log("Open %s", rpc);
    try {
      rpc.channel().send(Packets.request(rpc, payload));
    } catch (ChannelOutputException e) {
      // Deliberately ignored: the call is registered below even though the request was not sent.
      // logger.atFine().withCause(e).log(
      //    "Ignoring error opening %s; listening for unrequested responses", rpc);
    }

    StreamObserverCall<?, ?> replaced = pending.put(rpc, call);
    return replaced;
  }

  /**
   * Cancels an ongoing RPC.
   *
   * <p>The RPC is removed from the pending map before the cancel packet is sent, so it stays
   * removed even when the send throws. Nothing is sent if the RPC was not pending.
   *
   * @param rpc the RPC to cancel
   * @return the call that was registered for {@code rpc}, or {@code null} if it was not pending
   * @throws ChannelOutputException if sending the cancel packet fails
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> cancel(PendingRpc rpc)
      throws ChannelOutputException {
    StreamObserverCall<?, ?> removed = pending.remove(rpc);
    if (removed == null) {
      return null;
    }

    // logger.atFine().log("Cancel %s", rpc);
    rpc.channel().send(Packets.cancel(rpc));
    return removed;
  }

  /**
   * Sends a client stream packet carrying {@code payload} for an RPC, if that RPC is pending.
   *
   * @param rpc the RPC the packet belongs to
   * @param payload the message to place in the client stream packet
   * @return the call registered for {@code rpc}, or {@code null} if it is not pending (in which
   *     case nothing is sent)
   * @throws ChannelOutputException if sending the packet fails
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> clientStream(PendingRpc rpc, MessageLite payload)
      throws ChannelOutputException {
    StreamObserverCall<?, ?> active = pending.get(rpc);
    if (active == null) {
      return null;
    }

    rpc.channel().send(Packets.clientStream(rpc, payload));
    return active;
  }

  /**
   * Sends a client stream end packet for an RPC, if that RPC is pending.
   *
   * <p>The RPC stays in the pending map; only the packet is sent.
   *
   * @param rpc the RPC whose client stream is ending
   * @return the call registered for {@code rpc}, or {@code null} if it is not pending (in which
   *     case nothing is sent)
   * @throws ChannelOutputException if sending the packet fails
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> clientStreamEnd(PendingRpc rpc)
      throws ChannelOutputException {
    StreamObserverCall<?, ?> active = pending.get(rpc);
    if (active == null) {
      return null;
    }

    rpc.channel().send(Packets.clientStreamEnd(rpc));
    return active;
  }

  /**
   * Removes an RPC from the pending map without sending any packet.
   *
   * @param rpc the RPC to forget
   * @return the call that was registered for {@code rpc}, or {@code null} if it was not pending
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> clear(PendingRpc rpc) {
    // When logging is restored, a non-null result is the point to log:
    // logger.atFine().log("Clear %s", rpc);
    return pending.remove(rpc);
  }

  /**
   * Looks up the call registered for an RPC without modifying any state.
   *
   * @param rpc the RPC to look up
   * @return the call registered for {@code rpc}, or {@code null} if it is not pending
   */
  @Nullable
  public synchronized StreamObserverCall<?, ?> getPending(PendingRpc rpc) {
    return pending.get(rpc);
  }

  /** Returns a string containing the class name and the current pending map. */
  @Override
  public synchronized String toString() {
    return "RpcManager{pending=" + pending + '}';
  }
}