aboutsummaryrefslogtreecommitdiff
path: root/android/guava/src/com/google/common/util/concurrent/ListenerCallQueue.java
blob: e6284e1c7ded930da34ac96e7c52196058acc272 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
/*
 * Copyright (C) 2014 The Guava Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except
 * in compliance with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software distributed under the License
 * is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
 * or implied. See the License for the specific language governing permissions and limitations under
 * the License.
 */

package com.google.common.util.concurrent;

import static com.google.common.base.Preconditions.checkNotNull;

import com.google.common.annotations.GwtIncompatible;
import com.google.common.annotations.J2ktIncompatible;
import com.google.common.base.Preconditions;
import com.google.common.collect.Queues;
import com.google.errorprone.annotations.concurrent.GuardedBy;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.logging.Level;

/**
 * A list of listeners for implementing a concurrency friendly observable object.
 *
 * <p>Listeners are registered once via {@link #addListener} and then may be invoked by {@linkplain
 * #enqueue enqueueing} and then {@linkplain #dispatch dispatching} events.
 *
 * <p>The API of this class is designed to make it easy to achieve the following properties
 *
 * <ul>
 *   <li>Multiple events for the same listener are never dispatched concurrently.
 *   <li>Events for the different listeners are dispatched concurrently.
 *   <li>All events for a given listener dispatch on the provided {@link #executor}.
 *   <li>It is easy for the user to ensure that listeners are never invoked while holding locks.
 * </ul>
 *
 * The last point is subtle. Often the observable object will be managing its own internal state
 * using a lock, however it is dangerous to dispatch listeners while holding a lock because they
 * might run on the {@code directExecutor()} or be otherwise re-entrant (call back into your
 * object). So it is important to not call {@link #dispatch} while holding any locks. This is why
 * {@link #enqueue} and {@link #dispatch} are 2 different methods. It is expected that the decision
 * to run a particular event is made during the state change, but the decision to actually invoke
 * the listeners can be delayed slightly so that locks can be dropped. Also, because {@link
 * #dispatch} is expected to be called concurrently, it is idempotent.
 */
@J2ktIncompatible
@GwtIncompatible
@ElementTypesAreNonnullByDefault
final class ListenerCallQueue<L> {
  // TODO(cpovirk): consider using the logger associated with listener.getClass().
  private static final LazyLogger logger = new LazyLogger(ListenerCallQueue.class);

  // TODO(chrisn): promote AppendOnlyCollection for use here.
  private final List<PerListenerQueue<L>> listeners =
      Collections.synchronizedList(new ArrayList<PerListenerQueue<L>>());

  /** Method reference-compatible listener event. */
  interface Event<L> {
    /** Call a method on the listener. */
    void call(L listener);
  }

  /**
   * Adds a listener that will be called using the given executor when events are later {@link
   * #enqueue enqueued} and {@link #dispatch dispatched}.
   */
  public void addListener(L listener, Executor executor) {
    checkNotNull(listener, "listener");
    checkNotNull(executor, "executor");
    listeners.add(new PerListenerQueue<>(listener, executor));
  }

  /**
   * Enqueues an event to be run on currently known listeners.
   *
   * <p>The {@code toString} method of the Event itself will be used to describe the event in the
   * case of an error.
   *
   * @param event the callback to execute on {@link #dispatch}
   */
  public void enqueue(Event<L> event) {
    enqueueHelper(event, event);
  }

  /**
   * Enqueues an event to be run on currently known listeners, with a label.
   *
   * @param event the callback to execute on {@link #dispatch}
   * @param label a description of the event to use in the case of an error
   */
  public void enqueue(Event<L> event, String label) {
    enqueueHelper(event, label);
  }

  private void enqueueHelper(Event<L> event, Object label) {
    checkNotNull(event, "event");
    checkNotNull(label, "label");
    synchronized (listeners) {
      for (PerListenerQueue<L> queue : listeners) {
        queue.add(event, label);
      }
    }
  }

  /**
   * Dispatches all events enqueued prior to this call, serially and in order, for every listener.
   *
   * <p>Note: this method is idempotent and safe to call from any thread
   */
  public void dispatch() {
    // iterate by index to avoid concurrent modification exceptions
    for (int i = 0; i < listeners.size(); i++) {
      listeners.get(i).dispatch();
    }
  }

  /**
   * A special purpose queue/executor that dispatches listener events serially on a configured
   * executor. Each event can be added and dispatched as separate phases.
   *
   * <p>This class is very similar to {@link SequentialExecutor} with the exception that events can
   * be added without necessarily executing immediately.
   */
  private static final class PerListenerQueue<L> implements Runnable {
    final L listener;
    final Executor executor;

    @GuardedBy("this")
    final Queue<ListenerCallQueue.Event<L>> waitQueue = Queues.newArrayDeque();

    @GuardedBy("this")
    final Queue<Object> labelQueue = Queues.newArrayDeque();

    @GuardedBy("this")
    boolean isThreadScheduled;

    PerListenerQueue(L listener, Executor executor) {
      this.listener = checkNotNull(listener);
      this.executor = checkNotNull(executor);
    }

    /** Enqueues an event to be run. */
    synchronized void add(ListenerCallQueue.Event<L> event, Object label) {
      waitQueue.add(event);
      labelQueue.add(label);
    }

    /**
     * Dispatches all listeners {@linkplain #enqueue enqueued} prior to this call, serially and in
     * order.
     */
    @SuppressWarnings("CatchingUnchecked") // sneaky checked exception
    void dispatch() {
      boolean scheduleEventRunner = false;
      synchronized (this) {
        if (!isThreadScheduled) {
          isThreadScheduled = true;
          scheduleEventRunner = true;
        }
      }
      if (scheduleEventRunner) {
        try {
          executor.execute(this);
        } catch (Exception e) { // sneaky checked exception
          // reset state in case of an error so that later dispatch calls will actually do something
          synchronized (this) {
            isThreadScheduled = false;
          }
          // Log it and keep going.
          logger
              .get()
              .log(
                  Level.SEVERE,
                  "Exception while running callbacks for " + listener + " on " + executor,
                  e);
          throw e;
        }
      }
    }

    @Override
    @SuppressWarnings("CatchingUnchecked") // sneaky checked exception
    public void run() {
      boolean stillRunning = true;
      try {
        while (true) {
          ListenerCallQueue.Event<L> nextToRun;
          Object nextLabel;
          synchronized (PerListenerQueue.this) {
            Preconditions.checkState(isThreadScheduled);
            nextToRun = waitQueue.poll();
            nextLabel = labelQueue.poll();
            if (nextToRun == null) {
              isThreadScheduled = false;
              stillRunning = false;
              break;
            }
          }

          // Always run while _not_ holding the lock, to avoid deadlocks.
          try {
            nextToRun.call(listener);
          } catch (Exception e) { // sneaky checked exception
            // Log it and keep going.
            logger
                .get()
                .log(
                    Level.SEVERE,
                    "Exception while executing callback: " + listener + " " + nextLabel,
                    e);
          }
        }
      } finally {
        if (stillRunning) {
          // An Error is bubbling up. We should mark ourselves as no longer running. That way, if
          // anyone tries to keep using us, we won't be corrupted.
          synchronized (PerListenerQueue.this) {
            isThreadScheduled = false;
          }
        }
      }
    }
  }
}