summaryrefslogtreecommitdiff
path: root/platform/script-debugger/backend/src/org/jetbrains/rpc/MessageManager.java
blob: f7a29497e1c211cbbe77db95f9d5224854fea473 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
package org.jetbrains.rpc;

import com.intellij.openapi.diagnostic.Logger;
import com.intellij.util.containers.ConcurrentIntObjectMap;
import com.intellij.util.containers.StripedLockIntObjectConcurrentHashMap;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
import java.util.Arrays;

/**
 * Pairs outgoing messages with the incoming responses that carry the same sequence number.
 * <p>
 * For every message passed to {@link #send} a callback is stored under the sequence number reported by the
 * {@link Handler}; {@link #processIncoming} removes that callback again when a message with the same
 * sequence number arrives and hands both to the handler. Each stored callback is removed from the internal
 * map before it is notified, so this class notifies a given callback from at most one of its code paths.
 *
 * @param <OUTGOING> type of outgoing message
 * @param <INCOMING> type of incoming message
 * @param <INCOMING_WITH_SEQ> type of incoming message that is a command (has sequence number)
 * @param <SUCCESS> first type argument of the {@link AsyncResultCallback} registered for each request
 *                  (presumably the type of a successful result)
 * @param <ERROR_DETAILS> second type argument of the {@link AsyncResultCallback} registered for each request
 *                        (presumably the type of the details object passed to {@code onError}; this class always passes {@code null})
 */
public final class MessageManager<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> {
  public static final Logger LOG = Logger.getInstance(MessageManager.class);

  // pending callbacks keyed by the sequence number of the request they belong to
  private final ConcurrentIntObjectMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>> callbackMap = new StripedLockIntObjectConcurrentHashMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>>();
  private final Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> handler;

  // set once by closed(); never reset
  private volatile boolean closed;

  /**
   * @param handler protocol-specific operations used to write messages and to interpret incoming ones
   */
  public MessageManager(Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> handler) {
    this.handler = handler;
  }

  /**
   * Protocol-specific operations the manager delegates to.
   */
  public interface Handler<OUTGOING, INCOMING, INCOMING_WITH_SEQ, SUCCESS, ERROR_DETAILS> {
    /**
     * @return the sequence number under which the callback for {@code message} is registered
     *         (presumably also assigned to the message by the implementation)
     */
    int getUpdatedSequence(OUTGOING message);

    /**
     * Writes the message.
     *
     * @return {@code false} if the message was not sent; the manager then fails the registered callback
     * @throws IOException if writing failed; the manager then fails the registered callback and logs the error
     */
    boolean write(@NotNull OUTGOING message) throws IOException;

    /**
     * @return the message as a sequenced response, or {@code null} if it must be passed to {@link #acceptNonSequence}
     */
    INCOMING_WITH_SEQ readIfHasSequence(INCOMING incoming);

    /**
     * @return the sequence number used to look up the callback registered for this response
     */
    int getSequence(INCOMING_WITH_SEQ incomingWithSeq);

    /**
     * Receives an incoming message for which {@link #readIfHasSequence} returned {@code null}.
     * Not called once the manager is closed.
     */
    void acceptNonSequence(INCOMING incoming);

    /**
     * Receives a response together with the callback that was registered for its sequence number.
     * Not called once the manager is closed.
     */
    void call(INCOMING_WITH_SEQ response, AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback);
  }

  /**
   * Registers {@code callback} for {@code message} and writes the message using the handler.
   * <p>
   * If the manager is already closed, nothing is written and the callback is failed immediately.
   * If writing fails, the callback is unregistered and failed.
   *
   * @param message the message to send
   * @param callback the callback to register for the response
   */
  public void send(@NotNull OUTGOING message, @NotNull AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback) {
    if (closed) {
      callback.onError("Connection is closed", null);
      return;
    }

    int sequence = handler.getUpdatedSequence(message);
    // register before writing, so the callback is already in place when the response is processed
    callbackMap.put(sequence, callback);
    doSend(message, sequence);
  }

  /**
   * Writes the message and fails the callback registered under {@code sequence} if the write
   * throws or reports failure.
   */
  private void doSend(@NotNull OUTGOING message, int sequence) {
    boolean success;
    try {
      success = handler.write(message);
    }
    catch (Throwable e) {
      try {
        failedToSend(sequence);
      }
      finally {
        // log even if the callback itself throws
        LOG.error("Failed to send", e);
      }
      return;
    }

    if (!success) {
      failedToSend(sequence);
    }
  }

  /**
   * Unregisters the callback stored under {@code sequence}, if any, and notifies it about the send failure.
   */
  private void failedToSend(int sequence) {
    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = callbackMap.remove(sequence);
    if (callback != null) {
      callback.onError("Failed to send", null);
    }
  }

  /**
   * Dispatches an incoming message.
   * <p>
   * A message without sequence number is passed to {@link Handler#acceptNonSequence} (or ignored if closed).
   * A message with sequence number is passed, together with the callback registered for it, to
   * {@link Handler#call}; if the manager is closed, the callback is failed instead.
   *
   * @param incomingParsed the incoming message
   * @throws IllegalArgumentException if the manager is not closed and no callback is registered
   *                                  for the sequence number of the response
   */
  public void processIncoming(INCOMING incomingParsed) {
    INCOMING_WITH_SEQ commandResponse = handler.readIfHasSequence(incomingParsed);
    if (commandResponse == null) {
      if (closed) {
        // just ignore
        LOG.info("Connection closed, ignore incoming");
      }
      else {
        handler.acceptNonSequence(incomingParsed);
      }
      return;
    }

    int sequence = handler.getSequence(commandResponse);
    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = callbackMap.remove(sequence);
    if (callback == null) {
      if (closed) {
        // cancelWaitingRequests() removes callbacks while rejecting them, so a response that arrives
        // afterwards has nothing left to notify - that is not an error
        LOG.info("Connection closed, ignore incoming response without callback, id " + sequence);
        return;
      }
      throw new IllegalArgumentException("Cannot find callback with id " + sequence);
    }

    try {
      if (closed) {
        callback.onError("Connection closed", null);
      }
      else {
        handler.call(commandResponse, callback);
      }
    }
    catch (Throwable e) {
      try {
        callback.onError("Failed to dispatch response to callback", null);
      }
      finally {
        // log the original failure even if the callback itself throws
        LOG.error("Failed to dispatch response to callback", e);
      }
    }
  }

  /**
   * Unregisters and returns the callback stored under {@code id}.
   *
   * @param id the sequence number the callback was registered with
   * @return the callback, never {@code null}
   * @throws IllegalArgumentException if no callback is registered under {@code id}
   */
  public AsyncResultCallback<SUCCESS, ERROR_DETAILS> getCallbackAndRemove(int id) {
    AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = callbackMap.remove(id);
    if (callback == null) {
      throw new IllegalArgumentException("Cannot find callback with id " + id);
    }
    return callback;
  }

  /**
   * Marks the manager as closed: subsequent {@link #send} calls fail their callback immediately and
   * incoming messages are no longer passed to the handler. Pending callbacks are not touched,
   * see {@link #cancelWaitingRequests}.
   */
  public void closed() {
    closed = true;
  }

  /**
   * Unregisters every pending callback and fails it with "Connection closed", in ascending order
   * of sequence number. An exception thrown by one callback is logged and does not prevent the
   * remaining callbacks from being notified.
   */
  public void cancelWaitingRequests() {
    // we should call them in the order they have been submitted
    ConcurrentIntObjectMap<AsyncResultCallback<SUCCESS, ERROR_DETAILS>> map = callbackMap;
    int[] keys = map.keys();
    Arrays.sort(keys);
    for (int key : keys) {
      try {
        // remove (not get): a rejected callback must not stay registered, otherwise it is retained
        // and notified again when a late response with the same sequence number arrives
        AsyncResultCallback<SUCCESS, ERROR_DETAILS> callback = map.remove(key);
        if (callback != null) {
          callback.onError("Connection closed", null);
        }
      }
      catch (Throwable e) {
        LOG.error("Failed to reject callback on connection closed", e);
      }
    }
  }
}