aboutsummaryrefslogtreecommitdiff
path: root/src/com/kenai/jbosh/BOSHClient.java
diff options
context:
space:
mode:
Diffstat (limited to 'src/com/kenai/jbosh/BOSHClient.java')
-rw-r--r--src/com/kenai/jbosh/BOSHClient.java1536
1 files changed, 1536 insertions, 0 deletions
diff --git a/src/com/kenai/jbosh/BOSHClient.java b/src/com/kenai/jbosh/BOSHClient.java
new file mode 100644
index 0000000..b96d188
--- /dev/null
+++ b/src/com/kenai/jbosh/BOSHClient.java
@@ -0,0 +1,1536 @@
+/*
+ * Copyright 2009 Mike Cumings
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.kenai.jbosh;
+
+import com.kenai.jbosh.ComposableBody.Builder;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedSet;
+import java.util.TreeSet;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.Executors;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * BOSH Client session instance. Each communication session with a remote
+ * connection manager is represented and handled by an instance of this
+ * class. This is the main entry point for client-side communications.
+ * To create a new session, a client configuration must first be created
+ * and then used to create a client instance:
+ * <pre>
+ * BOSHClientConfig cfg = BOSHClientConfig.Builder.create(
+ * "http://server:1234/httpbind", "jabber.org")
+ * .setFrom("user@jabber.org")
+ * .build();
+ * BOSHClient client = BOSHClient.create(cfg);
+ * </pre>
+ * Additional client configuration options are available. See the
+ * {@code BOSHClientConfig.Builder} class for more information.
+ * <p/>
+ * Once a {@code BOSHClient} instance has been created, communication with
+ * the remote connection manager can begin. No attempt will be made to
+ * establish a connection to the connection manager until the first call
+ * is made to the {@code send(ComposableBody)} method. Note that it is
+ * possible to send an empty body to cause an immediate connection attempt
+ * to the connection manager. Sending an empty message would look like
+ * the following:
+ * <pre>
+ * client.send(ComposableBody.builder().build());
+ * </pre>
+ * For more information on creating body messages with content, see the
+ * {@code ComposableBody.Builder} class documentation.
+ * <p/>
+ * Once a session has been successfully started, the client instance can be
+ * used to send arbitrary payload data. All aspects of the BOSH
+ * protocol involving setting and processing attributes in the BOSH
+ * namespace will be handled by the client code transparently and behind the
+ * scenes. The user of the client instance can therefore concentrate
+ * entirely on the content of the message payload, leaving the semantics of
+ * the BOSH protocol to the client implementation.
+ * <p/>
+ * To be notified of incoming messages from the remote connection manager,
+ * a {@code BOSHClientResponseListener} should be added to the client instance.
+ * All incoming messages will be published to all response listeners as they
+ * arrive and are processed. As with the transmission of payload data via
+ * the {@code send(ComposableBody)} method, there is no need to worry about
+ * handling of the BOSH attributes, since this is handled behind the scenes.
+ * <p/>
+ * If the connection to the remote connection manager is terminated (either
+ * explicitly or due to a terminal condition of some sort), all connection
+ * listeners will be notified. After the connection has been closed, the
+ * client instance is considered dead and a new one must be created in order
+ * to resume communications with the remote server.
+ * <p/>
+ * Instances of this class are thread-safe.
+ *
+ * @see BOSHClientConfig.Builder
+ * @see BOSHClientResponseListener
+ * @see BOSHClientConnListener
+ * @see ComposableBody.Builder
+ */
+public final class BOSHClient {
+
+ /**
+ * Logger.
+ */
+ private static final Logger LOG = Logger.getLogger(
+ BOSHClient.class.getName());
+
+ /**
+ * Value of the 'type' attribute used for session termination.
+ */
+ private static final String TERMINATE = "terminate";
+
+ /**
+ * Value of the 'type' attribute used for recoverable errors.
+ */
+ private static final String ERROR = "error";
+
+ /**
+ * Message to use for interrupted exceptions.
+ */
+ private static final String INTERRUPTED = "Interrupted";
+
+ /**
+ * Message used for unhandled exceptions.
+ */
+ private static final String UNHANDLED = "Unhandled Exception";
+
+ /**
+ * Message used whena null listener is detected.
+ */
+ private static final String NULL_LISTENER = "Listener may not b enull";
+
+ /**
+ * Default empty request delay.
+ */
+ private static final int DEFAULT_EMPTY_REQUEST_DELAY = 100;
+
+ /**
+ * Amount of time to wait before sending an empty request, in
+ * milliseconds.
+ */
+ private static final int EMPTY_REQUEST_DELAY = Integer.getInteger(
+ BOSHClient.class.getName() + ".emptyRequestDelay",
+ DEFAULT_EMPTY_REQUEST_DELAY);
+
+ /**
+ * Default value for the pause margin.
+ */
+ private static final int DEFAULT_PAUSE_MARGIN = 500;
+
+ /**
+ * The amount of time in milliseconds which will be reserved as a
+ * safety margin when scheduling empty requests against a maxpause
+ * value. This should give us enough time to build the message
+ * and transport it to the remote host.
+ */
+ private static final int PAUSE_MARGIN = Integer.getInteger(
+ BOSHClient.class.getName() + ".pauseMargin",
+ DEFAULT_PAUSE_MARGIN);
+
+ /**
+ * Flag indicating whether or not we want to perform assertions.
+ */
+ private static final boolean ASSERTIONS;
+
+ /**
+ * Connection listeners.
+ */
+ private final Set<BOSHClientConnListener> connListeners =
+ new CopyOnWriteArraySet<BOSHClientConnListener>();
+
+ /**
+ * Request listeners.
+ */
+ private final Set<BOSHClientRequestListener> requestListeners =
+ new CopyOnWriteArraySet<BOSHClientRequestListener>();
+
+ /**
+ * Response listeners.
+ */
+ private final Set<BOSHClientResponseListener> responseListeners =
+ new CopyOnWriteArraySet<BOSHClientResponseListener>();
+
+ /**
+ * Lock instance.
+ */
+ private final ReentrantLock lock = new ReentrantLock();
+
+ /**
+ * Condition indicating that there are messages to be exchanged.
+ */
+ private final Condition notEmpty = lock.newCondition();
+
+ /**
+ * Condition indicating that there are available slots for sending
+ * messages.
+ */
+ private final Condition notFull = lock.newCondition();
+
+ /**
+ * Condition indicating that there are no outstanding connections.
+ */
+ private final Condition drained = lock.newCondition();
+
+ /**
+ * Session configuration.
+ */
+ private final BOSHClientConfig cfg;
+
+ /**
+ * Processor thread runnable instance.
+ */
+ private final Runnable procRunnable = new Runnable() {
+ /**
+ * Process incoming messages.
+ */
+ public void run() {
+ processMessages();
+ }
+ };
+
+ /**
+ * Processor thread runnable instance.
+ */
+ private final Runnable emptyRequestRunnable = new Runnable() {
+ /**
+ * Process incoming messages.
+ */
+ public void run() {
+ sendEmptyRequest();
+ }
+ };
+
+ /**
+ * HTTPSender instance.
+ */
+ private final HTTPSender httpSender =
+ new ApacheHTTPSender();
+
+ /**
+ * Storage for test hook implementation.
+ */
+ private final AtomicReference<ExchangeInterceptor> exchInterceptor =
+ new AtomicReference<ExchangeInterceptor>();
+
+ /**
+ * Request ID sequence to use for the session.
+ */
+ private final RequestIDSequence requestIDSeq = new RequestIDSequence();
+
+ /**
+ * ScheduledExcecutor to use for deferred tasks.
+ */
+ private final ScheduledExecutorService schedExec =
+ Executors.newSingleThreadScheduledExecutor();
+
+ /************************************************************
+ * The following vars must be accessed via the lock instance.
+ */
+
+ /**
+ * Thread which is used to process responses from the connection
+ * manager. Becomes null when session is terminated.
+ */
+ private Thread procThread;
+
+ /**
+ * Future for sending a deferred empty request, if needed.
+ */
+ private ScheduledFuture emptyRequestFuture;
+
+ /**
+ * Connection Manager session parameters. Only available when in a
+ * connected state.
+ */
+ private CMSessionParams cmParams;
+
+ /**
+ * List of active/outstanding requests.
+ */
+ private Queue<HTTPExchange> exchanges = new LinkedList<HTTPExchange>();
+
+ /**
+ * Set of RIDs which have been received, for the purpose of sending
+ * response acknowledgements.
+ */
+ private SortedSet<Long> pendingResponseAcks = new TreeSet<Long>();
+
+ /**
+ * The highest RID that we've already received a response for. This value
+ * is used to implement response acks.
+ */
+ private Long responseAck = Long.valueOf(-1L);
+
+ /**
+ * List of requests which have been made but not yet acknowledged. This
+ * list remains unpopulated if the CM is not acking requests.
+ */
+ private List<ComposableBody> pendingRequestAcks =
+ new ArrayList<ComposableBody>();
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Classes:
+
+ /**
+ * Class used in testing to dynamically manipulate received exchanges
+ * at test runtime.
+ */
+ abstract static class ExchangeInterceptor {
+ /**
+ * Limit construction.
+ */
+ ExchangeInterceptor() {
+ // Empty;
+ }
+
+ /**
+ * Hook to manipulate an HTTPExchange as is is about to be processed.
+ *
+ * @param exch original exchange that would be processed
+ * @return replacement exchange instance, or {@code null} to skip
+ * processing of this exchange
+ */
+ abstract HTTPExchange interceptExchange(final HTTPExchange exch);
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Constructors:
+
+ /**
+ * Determine whether or not we should perform assertions. Assertions
+ * can be specified via system property explicitly, or defaulted to
+ * the JVM assertions status.
+ */
+ static {
+ final String prop =
+ BOSHClient.class.getSimpleName() + ".assertionsEnabled";
+ boolean enabled = false;
+ if (System.getProperty(prop) == null) {
+ assert enabled = true;
+ } else {
+ enabled = Boolean.getBoolean(prop);
+ }
+ ASSERTIONS = enabled;
+ }
+
+ /**
+ * Prevent direct construction.
+ */
+ private BOSHClient(final BOSHClientConfig sessCfg) {
+ cfg = sessCfg;
+ init();
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Public methods:
+
+ /**
+ * Create a new BOSH client session using the client configuration
+ * information provided.
+ *
+ * @param clientCfg session configuration
+ * @return BOSH session instance
+ */
+ public static BOSHClient create(final BOSHClientConfig clientCfg) {
+ if (clientCfg == null) {
+ throw(new IllegalArgumentException(
+ "Client configuration may not be null"));
+ }
+ return new BOSHClient(clientCfg);
+ }
+
+ /**
+ * Get the client configuration that was used to create this client
+ * instance.
+ *
+ * @return client configuration
+ */
+ public BOSHClientConfig getBOSHClientConfig() {
+ return cfg;
+ }
+
+ /**
+ * Adds a connection listener to the session.
+ *
+ * @param listener connection listener to add, if not already added
+ */
+ public void addBOSHClientConnListener(
+ final BOSHClientConnListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ connListeners.add(listener);
+ }
+
+ /**
+ * Removes a connection listener from the session.
+ *
+ * @param listener connection listener to remove, if previously added
+ */
+ public void removeBOSHClientConnListener(
+ final BOSHClientConnListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ connListeners.remove(listener);
+ }
+
+ /**
+ * Adds a request message listener to the session.
+ *
+ * @param listener request listener to add, if not already added
+ */
+ public void addBOSHClientRequestListener(
+ final BOSHClientRequestListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ requestListeners.add(listener);
+ }
+
+ /**
+ * Removes a request message listener from the session, if previously
+ * added.
+ *
+ * @param listener instance to remove
+ */
+ public void removeBOSHClientRequestListener(
+ final BOSHClientRequestListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ requestListeners.remove(listener);
+ }
+
+ /**
+ * Adds a response message listener to the session.
+ *
+ * @param listener response listener to add, if not already added
+ */
+ public void addBOSHClientResponseListener(
+ final BOSHClientResponseListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ responseListeners.add(listener);
+ }
+
+ /**
+ * Removes a response message listener from the session, if previously
+ * added.
+ *
+ * @param listener instance to remove
+ */
+ public void removeBOSHClientResponseListener(
+ final BOSHClientResponseListener listener) {
+ if (listener == null) {
+ throw(new IllegalArgumentException(NULL_LISTENER));
+ }
+ responseListeners.remove(listener);
+ }
+
+ /**
+ * Send the provided message data to the remote connection manager. The
+ * provided message body does not need to have any BOSH-specific attribute
+ * information set. It only needs to contain the actual message payload
+ * that should be delivered to the remote server.
+ * <p/>
+ * The first call to this method will result in a connection attempt
+ * to the remote connection manager. Subsequent calls to this method
+ * will block until the underlying session state allows for the message
+ * to be transmitted. In certain scenarios - such as when the maximum
+ * number of outbound connections has been reached - calls to this method
+ * will block for short periods of time.
+ *
+ * @param body message data to send to remote server
+ * @throws BOSHException on message transmission failure
+ */
+ public void send(final ComposableBody body) throws BOSHException {
+ assertUnlocked();
+ if (body == null) {
+ throw(new IllegalArgumentException(
+ "Message body may not be null"));
+ }
+
+ HTTPExchange exch;
+ CMSessionParams params;
+ lock.lock();
+ try {
+ blockUntilSendable(body);
+ if (!isWorking() && !isTermination(body)) {
+ throw(new BOSHException(
+ "Cannot send message when session is closed"));
+ }
+
+ long rid = requestIDSeq.getNextRID();
+ ComposableBody request = body;
+ params = cmParams;
+ if (params == null && exchanges.isEmpty()) {
+ // This is the first message being sent
+ request = applySessionCreationRequest(rid, body);
+ } else {
+ request = applySessionData(rid, body);
+ if (cmParams.isAckingRequests()) {
+ pendingRequestAcks.add(request);
+ }
+ }
+ exch = new HTTPExchange(request);
+ exchanges.add(exch);
+ notEmpty.signalAll();
+ clearEmptyRequest();
+ } finally {
+ lock.unlock();
+ }
+ AbstractBody finalReq = exch.getRequest();
+ HTTPResponse resp = httpSender.send(params, finalReq);
+ exch.setHTTPResponse(resp);
+ fireRequestSent(finalReq);
+ }
+
+ /**
+ * Attempt to pause the current session. When supported by the remote
+ * connection manager, pausing the session will result in the connection
+ * manager closing out all outstanding requests (including the pause
+ * request) and increases the inactivity timeout of the session. The
+ * exact value of the temporary timeout is dependent upon the connection
+ * manager. This method should be used if a client encounters an
+ * exceptional temporary situation during which it will be unable to send
+ * requests to the connection manager for a period of time greater than
+ * the maximum inactivity period.
+ *
+ * The session will revert back to it's normal, unpaused state when the
+ * client sends it's next message.
+ *
+ * @return {@code true} if the connection manager supports session pausing,
+ * {@code false} if the connection manager does not support session
+ * pausing or if the session has not yet been established
+ */
+ public boolean pause() {
+ assertUnlocked();
+ lock.lock();
+ AttrMaxPause maxPause = null;
+ try {
+ if (cmParams == null) {
+ return false;
+ }
+
+ maxPause = cmParams.getMaxPause();
+ if (maxPause == null) {
+ return false;
+ }
+ } finally {
+ lock.unlock();
+ }
+ try {
+ send(ComposableBody.builder()
+ .setAttribute(Attributes.PAUSE, maxPause.toString())
+ .build());
+ } catch (BOSHException boshx) {
+ LOG.log(Level.FINEST, "Could not send pause", boshx);
+ }
+ return true;
+ }
+
+ /**
+ * End the BOSH session by disconnecting from the remote BOSH connection
+ * manager.
+ *
+ * @throws BOSHException when termination message cannot be sent
+ */
+ public void disconnect() throws BOSHException {
+ disconnect(ComposableBody.builder().build());
+ }
+
+ /**
+ * End the BOSH session by disconnecting from the remote BOSH connection
+ * manager, sending the provided content in the final connection
+ * termination message.
+ *
+ * @param msg final message to send
+ * @throws BOSHException when termination message cannot be sent
+ */
+ public void disconnect(final ComposableBody msg) throws BOSHException {
+ if (msg == null) {
+ throw(new IllegalArgumentException(
+ "Message body may not be null"));
+ }
+
+ Builder builder = msg.rebuild();
+ builder.setAttribute(Attributes.TYPE, TERMINATE);
+ send(builder.build());
+ }
+
+ /**
+ * Forcibly close this client session instance. The preferred mechanism
+ * to close the connection is to send a disconnect message and wait for
+ * organic termination. Calling this method simply shuts down the local
+ * session without sending a termination message, releasing all resources
+ * associated with the session.
+ */
+ public void close() {
+ dispose(new BOSHException("Session explicitly closed by caller"));
+ }
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Package-private methods:
+
+ /**
+ * Get the current CM session params.
+ *
+ * @return current session params, or {@code null}
+ */
+ CMSessionParams getCMSessionParams() {
+ lock.lock();
+ try {
+ return cmParams;
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Wait until no more messages are waiting to be processed.
+ */
+ void drain() {
+ lock.lock();
+ try {
+ LOG.finest("Waiting while draining...");
+ while (isWorking()
+ && (emptyRequestFuture == null
+ || emptyRequestFuture.isDone())) {
+ try {
+ drained.await();
+ } catch (InterruptedException intx) {
+ LOG.log(Level.FINEST, INTERRUPTED, intx);
+ }
+ }
+ LOG.finest("Drained");
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Test method used to forcibly discard next exchange.
+ *
+ * @param interceptor exchange interceptor
+ */
+ void setExchangeInterceptor(final ExchangeInterceptor interceptor) {
+ exchInterceptor.set(interceptor);
+ }
+
+
+ ///////////////////////////////////////////////////////////////////////////
+ // Private methods:
+
+ /**
+ * Initialize the session. This initializes the underlying HTTP
+ * transport implementation and starts the receive thread.
+ */
+ private void init() {
+ assertUnlocked();
+
+ lock.lock();
+ try {
+ httpSender.init(cfg);
+ procThread = new Thread(procRunnable);
+ procThread.setDaemon(true);
+ procThread.setName(BOSHClient.class.getSimpleName()
+ + "[" + System.identityHashCode(this)
+ + "]: Receive thread");
+ procThread.start();
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Destroy this session.
+ *
+ * @param cause the reason for the session termination, or {@code null}
+ * for normal termination
+ */
+ private void dispose(final Throwable cause) {
+ assertUnlocked();
+
+ lock.lock();
+ try {
+ if (procThread == null) {
+ // Already disposed
+ return;
+ }
+ procThread = null;
+ } finally {
+ lock.unlock();
+ }
+
+ if (cause == null) {
+ fireConnectionClosed();
+ } else {
+ fireConnectionClosedOnError(cause);
+ }
+
+ lock.lock();
+ try {
+ clearEmptyRequest();
+ exchanges = null;
+ cmParams = null;
+ pendingResponseAcks = null;
+ pendingRequestAcks = null;
+ notEmpty.signalAll();
+ notFull.signalAll();
+ drained.signalAll();
+ } finally {
+ lock.unlock();
+ }
+
+ httpSender.destroy();
+ schedExec.shutdownNow();
+ }
+
+ /**
+ * Determines if the message body specified indicates a request to
+ * pause the session.
+ *
+ * @param msg message to evaluate
+ * @return {@code true} if the message is a pause request, {@code false}
+ * otherwise
+ */
+ private static boolean isPause(final AbstractBody msg) {
+ return msg.getAttribute(Attributes.PAUSE) != null;
+ }
+
+ /**
+ * Determines if the message body specified indicates a termination of
+ * the session.
+ *
+ * @param msg message to evaluate
+ * @return {@code true} if the message is a session termination,
+ * {@code false} otherwise
+ */
+ private static boolean isTermination(final AbstractBody msg) {
+ return TERMINATE.equals(msg.getAttribute(Attributes.TYPE));
+ }
+
+ /**
+ * Evaluates the HTTP response code and response message and returns the
+ * terminal binding condition that it describes, if any.
+ *
+ * @param respCode HTTP response code
+ * @param respBody response body
+ * @return terminal binding condition, or {@code null} if not a terminal
+ * binding condition message
+ */
+ private TerminalBindingCondition getTerminalBindingCondition(
+ final int respCode,
+ final AbstractBody respBody) {
+ assertLocked();
+
+ if (isTermination(respBody)) {
+ String str = respBody.getAttribute(Attributes.CONDITION);
+ return TerminalBindingCondition.forString(str);
+ }
+ // Check for deprecated HTTP Error Conditions
+ if (cmParams != null && cmParams.getVersion() == null) {
+ return TerminalBindingCondition.forHTTPResponseCode(respCode);
+ }
+ return null;
+ }
+
+ /**
+ * Determines if the message specified is immediately sendable or if it
+ * needs to block until the session state changes.
+ *
+ * @param msg message to evaluate
+ * @return {@code true} if the message can be immediately sent,
+ * {@code false} otherwise
+ */
+ private boolean isImmediatelySendable(final AbstractBody msg) {
+ assertLocked();
+
+ if (cmParams == null) {
+ // block if we're waiting for a response to our first request
+ return exchanges.isEmpty();
+ }
+
+ AttrRequests requests = cmParams.getRequests();
+ if (requests == null) {
+ return true;
+ }
+ int maxRequests = requests.intValue();
+ if (exchanges.size() < maxRequests) {
+ return true;
+ }
+ if (exchanges.size() == maxRequests
+ && (isTermination(msg) || isPause(msg))) {
+ // One additional terminate or pause message is allowed
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Determines whether or not the session is still active.
+ *
+ * @return {@code true} if it is, {@code false} otherwise
+ */
+ private boolean isWorking() {
+ assertLocked();
+
+ return procThread != null;
+ }
+
+ /**
+ * Blocks until either the message provided becomes immediately
+ * sendable or until the session is terminated.
+ *
+ * @param msg message to evaluate
+ */
+ private void blockUntilSendable(final AbstractBody msg) {
+ assertLocked();
+
+ while (isWorking() && !isImmediatelySendable(msg)) {
+ try {
+ notFull.await();
+ } catch (InterruptedException intx) {
+ LOG.log(Level.FINEST, INTERRUPTED, intx);
+ }
+ }
+ }
+
+ /**
+ * Modifies the specified body message such that it becomes a new
+ * BOSH session creation request.
+ *
+ * @param rid request ID to use
+ * @param orig original body to modify
+ * @return modified message which acts as a session creation request
+ */
+ private ComposableBody applySessionCreationRequest(
+ final long rid, final ComposableBody orig) throws BOSHException {
+ assertLocked();
+
+ Builder builder = orig.rebuild();
+ builder.setAttribute(Attributes.TO, cfg.getTo());
+ builder.setAttribute(Attributes.XML_LANG, cfg.getLang());
+ builder.setAttribute(Attributes.VER,
+ AttrVersion.getSupportedVersion().toString());
+ builder.setAttribute(Attributes.WAIT, "60");
+ builder.setAttribute(Attributes.HOLD, "1");
+ builder.setAttribute(Attributes.RID, Long.toString(rid));
+ applyRoute(builder);
+ applyFrom(builder);
+ builder.setAttribute(Attributes.ACK, "1");
+
+ // Make sure the following are NOT present (i.e., during retries)
+ builder.setAttribute(Attributes.SID, null);
+ return builder.build();
+ }
+
+ /**
+ * Applies routing information to the request message who's builder has
+ * been provided.
+ *
+ * @param builder builder instance to add routing information to
+ */
+ private void applyRoute(final Builder builder) {
+ assertLocked();
+
+ String route = cfg.getRoute();
+ if (route != null) {
+ builder.setAttribute(Attributes.ROUTE, route);
+ }
+ }
+
+ /**
+ * Applies the local station ID information to the request message who's
+ * builder has been provided.
+ *
+ * @param builder builder instance to add station ID information to
+ */
+ private void applyFrom(final Builder builder) {
+ assertLocked();
+
+ String from = cfg.getFrom();
+ if (from != null) {
+ builder.setAttribute(Attributes.FROM, from);
+ }
+ }
+
+ /**
+ * Applies existing session data to the outbound request, returning the
+ * modified request.
+ *
+ * This method assumes the lock is currently held.
+ *
+ * @param rid request ID to use
+ * @param orig original/raw request
+ * @return modified request with session information applied
+ */
+ private ComposableBody applySessionData(
+ final long rid,
+ final ComposableBody orig) throws BOSHException {
+ assertLocked();
+
+ Builder builder = orig.rebuild();
+ builder.setAttribute(Attributes.SID,
+ cmParams.getSessionID().toString());
+ builder.setAttribute(Attributes.RID, Long.toString(rid));
+ applyResponseAcknowledgement(builder, rid);
+ return builder.build();
+ }
+
+ /**
+ * Sets the 'ack' attribute of the request to the value of the highest
+ * 'rid' of a request for which it has already received a response in the
+ * case where it has also received all responses associated with lower
+ * 'rid' values. The only exception is that, after its session creation
+ * request, the client SHOULD NOT include an 'ack' attribute in any request
+ * if it has received responses to all its previous requests.
+ *
+ * @param builder message builder
+ * @param rid current request RID
+ */
+ private void applyResponseAcknowledgement(
+ final Builder builder,
+ final long rid) {
+ assertLocked();
+
+ if (responseAck.equals(Long.valueOf(-1L))) {
+ // We have not received any responses yet
+ return;
+ }
+
+ Long prevRID = Long.valueOf(rid - 1L);
+ if (responseAck.equals(prevRID)) {
+ // Implicit ack
+ return;
+ }
+
+ builder.setAttribute(Attributes.ACK, responseAck.toString());
+ }
+
+ /**
+ * While we are "connected", process received responses.
+ *
+ * This method is run in the processing thread.
+ */
+ private void processMessages() {
+ LOG.log(Level.FINEST, "Processing thread starting");
+ try {
+ HTTPExchange exch;
+ do {
+ exch = nextExchange();
+ if (exch == null) {
+ break;
+ }
+
+ // Test hook to manipulate what the client sees:
+ ExchangeInterceptor interceptor = exchInterceptor.get();
+ if (interceptor != null) {
+ HTTPExchange newExch = interceptor.interceptExchange(exch);
+ if (newExch == null) {
+ LOG.log(Level.FINE, "Discarding exchange on request "
+ + "of test hook: RID="
+ + exch.getRequest().getAttribute(
+ Attributes.RID));
+ lock.lock();
+ try {
+ exchanges.remove(exch);
+ } finally {
+ lock.unlock();
+ }
+ continue;
+ }
+ exch = newExch;
+ }
+
+ processExchange(exch);
+ } while (true);
+ } finally {
+ LOG.log(Level.FINEST, "Processing thread exiting");
+ }
+
+ }
+
+ /**
+ * Get the next message exchange to process, blocking until one becomes
+ * available if nothing is already waiting for processing.
+ *
+ * @return next available exchange to process, or {@code null} if no
+ * exchanges are immediately available
+ */
+ private HTTPExchange nextExchange() {
+ assertUnlocked();
+
+ final Thread thread = Thread.currentThread();
+ HTTPExchange exch = null;
+ lock.lock();
+ try {
+ do {
+ if (!thread.equals(procThread)) {
+ break;
+ }
+ exch = exchanges.peek();
+ if (exch == null) {
+ try {
+ notEmpty.await();
+ } catch (InterruptedException intx) {
+ LOG.log(Level.FINEST, INTERRUPTED, intx);
+ }
+ }
+ } while (exch == null);
+ } finally {
+ lock.unlock();
+ }
+ return exch;
+ }
+
+ /**
+ * Process the next, provided exchange. This is the main processing
+ * method of the receive thread.
+ *
+ * @param exch message exchange to process
+ */
+ private void processExchange(final HTTPExchange exch) {
+ assertUnlocked();
+
+ HTTPResponse resp;
+ AbstractBody body;
+ int respCode;
+ try {
+ resp = exch.getHTTPResponse();
+ body = resp.getBody();
+ respCode = resp.getHTTPStatus();
+ } catch (BOSHException boshx) {
+ LOG.log(Level.FINEST, "Could not obtain response", boshx);
+ dispose(boshx);
+ return;
+ } catch (InterruptedException intx) {
+ LOG.log(Level.FINEST, INTERRUPTED, intx);
+ dispose(intx);
+ return;
+ }
+ fireResponseReceived(body);
+
+ // Process the message with the current session state
+ AbstractBody req = exch.getRequest();
+ CMSessionParams params;
+ List<HTTPExchange> toResend = null;
+ lock.lock();
+ try {
+ // Check for session creation response info, if needed
+ if (cmParams == null) {
+ cmParams = CMSessionParams.fromSessionInit(req, body);
+
+ // The following call handles the lock. It's not an escape.
+ fireConnectionEstablished();
+ }
+ params = cmParams;
+
+ checkForTerminalBindingConditions(body, respCode);
+ if (isTermination(body)) {
+ // Explicit termination
+ lock.unlock();
+ dispose(null);
+ return;
+ }
+
+ if (isRecoverableBindingCondition(body)) {
+ // Retransmit outstanding requests
+ if (toResend == null) {
+ toResend = new ArrayList<HTTPExchange>(exchanges.size());
+ }
+ for (HTTPExchange exchange : exchanges) {
+ HTTPExchange resendExch =
+ new HTTPExchange(exchange.getRequest());
+ toResend.add(resendExch);
+ }
+ for (HTTPExchange exchange : toResend) {
+ exchanges.add(exchange);
+ }
+ } else {
+ // Process message as normal
+ processRequestAcknowledgements(req, body);
+ processResponseAcknowledgementData(req);
+ HTTPExchange resendExch =
+ processResponseAcknowledgementReport(body);
+ if (resendExch != null && toResend == null) {
+ toResend = new ArrayList<HTTPExchange>(1);
+ toResend.add(resendExch);
+ exchanges.add(resendExch);
+ }
+ }
+ } catch (BOSHException boshx) {
+ LOG.log(Level.FINEST, "Could not process response", boshx);
+ lock.unlock();
+ dispose(boshx);
+ return;
+ } finally {
+ if (lock.isHeldByCurrentThread()) {
+ try {
+ exchanges.remove(exch);
+ if (exchanges.isEmpty()) {
+ scheduleEmptyRequest(processPauseRequest(req));
+ }
+ notFull.signalAll();
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ if (toResend != null) {
+ for (HTTPExchange resend : toResend) {
+ HTTPResponse response =
+ httpSender.send(params, resend.getRequest());
+ resend.setHTTPResponse(response);
+ fireRequestSent(resend.getRequest());
+ }
+ }
+ }
+
+ /**
+ * Clears any scheduled empty requests.
+ */
+ private void clearEmptyRequest() {
+ assertLocked();
+
+ if (emptyRequestFuture != null) {
+ emptyRequestFuture.cancel(false);
+ emptyRequestFuture = null;
+ }
+ }
+
+ /**
+ * Calculates the default empty request delay/interval to use for the
+ * active session.
+ *
+ * @return delay in milliseconds
+ */
+ private long getDefaultEmptyRequestDelay() {
+ assertLocked();
+
+ // Figure out how long we should wait before sending an empty request
+ AttrPolling polling = cmParams.getPollingInterval();
+ long delay;
+ if (polling == null) {
+ delay = EMPTY_REQUEST_DELAY;
+ } else {
+ delay = polling.getInMilliseconds();
+ }
+ return delay;
+ }
+
+ /**
+ * Schedule an empty request to be sent if no other requests are
+ * sent in a reasonable amount of time.
+ */
+ private void scheduleEmptyRequest(long delay) {
+ assertLocked();
+ if (delay < 0L) {
+ throw(new IllegalArgumentException(
+ "Empty request delay must be >= 0 (was: " + delay + ")"));
+ }
+
+ clearEmptyRequest();
+ if (!isWorking()) {
+ return;
+ }
+
+ // Schedule the transmission
+ if (LOG.isLoggable(Level.FINER)) {
+ LOG.finer("Scheduling empty request in " + delay + "ms");
+ }
+ try {
+ emptyRequestFuture = schedExec.schedule(emptyRequestRunnable,
+ delay, TimeUnit.MILLISECONDS);
+ } catch (RejectedExecutionException rex) {
+ LOG.log(Level.FINEST, "Could not schedule empty request", rex);
+ }
+ drained.signalAll();
+ }
+
+ /**
+ * Sends an empty request to maintain session requirements. If a request
+ * is sent within a reasonable time window, the empty request transmission
+ * will be cancelled.
+ */
+ private void sendEmptyRequest() {
+ assertUnlocked();
+ // Send an empty request
+ LOG.finest("Sending empty request");
+ try {
+ send(ComposableBody.builder().build());
+ } catch (BOSHException boshx) {
+ dispose(boshx);
+ }
+ }
+
+ /**
+ * Assert that the internal lock is held.
+ */
+ private void assertLocked() {
+ if (ASSERTIONS) {
+ if (!lock.isHeldByCurrentThread()) {
+ throw(new AssertionError("Lock is not held by current thread"));
+ }
+ return;
+ }
+ }
+
+ /**
+ * Assert that the internal lock is *not* held.
+ */
+ private void assertUnlocked() {
+ if (ASSERTIONS) {
+ if (lock.isHeldByCurrentThread()) {
+ throw(new AssertionError("Lock is held by current thread"));
+ }
+ return;
+ }
+ }
+
+ /**
+ * Checks to see if the response indicates a terminal binding condition
+ * (as per XEP-0124 section 17). If it does, an exception is thrown.
+ *
+ * @param body response body to evaluate
+ * @param code HTTP response code
+ * @throws BOSHException if a terminal binding condition is detected
+ */
+ private void checkForTerminalBindingConditions(
+ final AbstractBody body,
+ final int code)
+ throws BOSHException {
+ TerminalBindingCondition cond =
+ getTerminalBindingCondition(code, body);
+ if (cond != null) {
+ throw(new BOSHException(
+ "Terminal binding condition encountered: "
+ + cond.getCondition() + " ("
+ + cond.getMessage() + ")"));
+ }
+ }
+
+ /**
+ * Determines whether or not the response indicates a recoverable
+ * binding condition (as per XEP-0124 section 17).
+ *
+ * @param resp response body
+ * @return {@code true} if it does, {@code false} otherwise
+ */
+ private static boolean isRecoverableBindingCondition(
+ final AbstractBody resp) {
+ return ERROR.equals(resp.getAttribute(Attributes.TYPE));
+ }
+
+ /**
+ * Process the request to determine if the empty request delay
+ * can be determined by looking to see if the request is a pause
+ * request. If it can, the request's delay is returned, otherwise
+ * the default delay is returned.
+ *
+ * @return delay in milliseconds that should elapse prior to an
+ * empty message being sent
+ */
+ private long processPauseRequest(
+ final AbstractBody req) {
+ assertLocked();
+
+ if (cmParams != null && cmParams.getMaxPause() != null) {
+ try {
+ AttrPause pause = AttrPause.createFromString(
+ req.getAttribute(Attributes.PAUSE));
+ if (pause != null) {
+ long delay = pause.getInMilliseconds() - PAUSE_MARGIN;
+ if (delay < 0) {
+ delay = EMPTY_REQUEST_DELAY;
+ }
+ return delay;
+ }
+ } catch (BOSHException boshx) {
+ LOG.log(Level.FINEST, "Could not extract", boshx);
+ }
+ }
+
+ return getDefaultEmptyRequestDelay();
+ }
+
+ /**
+ * Check the response for request acknowledgements and take appropriate
+ * action.
+ *
+ * This method assumes the lock is currently held.
+ *
+ * @param req request
+ * @param resp response
+ */
+ private void processRequestAcknowledgements(
+ final AbstractBody req, final AbstractBody resp) {
+ assertLocked();
+
+ if (!cmParams.isAckingRequests()) {
+ return;
+ }
+
+ // If a report or time attribute is set, we aren't acking anything
+ if (resp.getAttribute(Attributes.REPORT) != null) {
+ return;
+ }
+
+ // Figure out what the highest acked RID is
+ String acked = resp.getAttribute(Attributes.ACK);
+ Long ackUpTo;
+ if (acked == null) {
+ // Implicit ack of all prior requests up until RID
+ ackUpTo = Long.parseLong(req.getAttribute(Attributes.RID));
+ } else {
+ ackUpTo = Long.parseLong(acked);
+ }
+
+ // Remove the acked requests from the list
+ if (LOG.isLoggable(Level.FINEST)) {
+ LOG.finest("Removing pending acks up to: " + ackUpTo);
+ }
+ Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
+ while (iter.hasNext()) {
+ AbstractBody pending = iter.next();
+ Long pendingRID = Long.parseLong(
+ pending.getAttribute(Attributes.RID));
+ if (pendingRID.compareTo(ackUpTo) <= 0) {
+ iter.remove();
+ }
+ }
+ }
+
+ /**
+ * Process the response in order to update the response acknowlegement
+ * data.
+ *
+ * This method assumes the lock is currently held.
+ *
+ * @param req request
+ */
+ private void processResponseAcknowledgementData(
+ final AbstractBody req) {
+ assertLocked();
+
+ Long rid = Long.parseLong(req.getAttribute(Attributes.RID));
+ if (responseAck.equals(Long.valueOf(-1L))) {
+ // This is the first request
+ responseAck = rid;
+ } else {
+ pendingResponseAcks.add(rid);
+ // Remove up until the first missing response (or end of queue)
+ Long whileVal = responseAck;
+ while (whileVal.equals(pendingResponseAcks.first())) {
+ responseAck = whileVal;
+ pendingResponseAcks.remove(whileVal);
+ whileVal = Long.valueOf(whileVal.longValue() + 1);
+ }
+ }
+ }
+
+ /**
+ * Process the response in order to check for and respond to any potential
+ * ack reports.
+ *
+ * This method assumes the lock is currently held.
+ *
+ * @param resp response
+ * @return exchange to transmit if a resend is to be performed, or
+ * {@code null} if no resend is necessary
+ * @throws BOSHException when a a retry is needed but cannot be performed
+ */
+ private HTTPExchange processResponseAcknowledgementReport(
+ final AbstractBody resp)
+ throws BOSHException {
+ assertLocked();
+
+ String reportStr = resp.getAttribute(Attributes.REPORT);
+ if (reportStr == null) {
+ // No report on this message
+ return null;
+ }
+
+ Long report = Long.parseLong(reportStr);
+ Long time = Long.parseLong(resp.getAttribute(Attributes.TIME));
+ if (LOG.isLoggable(Level.FINE)) {
+ LOG.fine("Received report of missing request (RID="
+ + report + ", time=" + time + "ms)");
+ }
+
+ // Find the missing request
+ Iterator<ComposableBody> iter = pendingRequestAcks.iterator();
+ AbstractBody req = null;
+ while (iter.hasNext() && req == null) {
+ AbstractBody pending = iter.next();
+ Long pendingRID = Long.parseLong(
+ pending.getAttribute(Attributes.RID));
+ if (report.equals(pendingRID)) {
+ req = pending;
+ }
+ }
+
+ if (req == null) {
+ throw(new BOSHException("Report of missing message with RID '"
+ + reportStr
+ + "' but local copy of that request was not found"));
+ }
+
+ // Resend the missing request
+ HTTPExchange exch = new HTTPExchange(req);
+ exchanges.add(exch);
+ notEmpty.signalAll();
+ return exch;
+ }
+
+ /**
+ * Notifies all request listeners that the specified request is being
+ * sent.
+ *
+ * @param request request being sent
+ */
+ private void fireRequestSent(final AbstractBody request) {
+ assertUnlocked();
+
+ BOSHMessageEvent event = null;
+ for (BOSHClientRequestListener listener : requestListeners) {
+ if (event == null) {
+ event = BOSHMessageEvent.createRequestSentEvent(this, request);
+ }
+ try {
+ listener.requestSent(event);
+ } catch (Exception ex) {
+ LOG.log(Level.WARNING, UNHANDLED, ex);
+ }
+ }
+ }
+
+ /**
+ * Notifies all response listeners that the specified response has been
+ * received.
+ *
+ * @param response response received
+ */
+ private void fireResponseReceived(final AbstractBody response) {
+ assertUnlocked();
+
+ BOSHMessageEvent event = null;
+ for (BOSHClientResponseListener listener : responseListeners) {
+ if (event == null) {
+ event = BOSHMessageEvent.createResponseReceivedEvent(
+ this, response);
+ }
+ try {
+ listener.responseReceived(event);
+ } catch (Exception ex) {
+ LOG.log(Level.WARNING, UNHANDLED, ex);
+ }
+ }
+ }
+
+ /**
+ * Notifies all connection listeners that the session has been successfully
+ * established.
+ */
+ private void fireConnectionEstablished() {
+ final boolean hadLock = lock.isHeldByCurrentThread();
+ if (hadLock) {
+ lock.unlock();
+ }
+ try {
+ BOSHClientConnEvent event = null;
+ for (BOSHClientConnListener listener : connListeners) {
+ if (event == null) {
+ event = BOSHClientConnEvent
+ .createConnectionEstablishedEvent(this);
+ }
+ try {
+ listener.connectionEvent(event);
+ } catch (Exception ex) {
+ LOG.log(Level.WARNING, UNHANDLED, ex);
+ }
+ }
+ } finally {
+ if (hadLock) {
+ lock.lock();
+ }
+ }
+ }
+
+ /**
+ * Notifies all connection listeners that the session has been
+ * terminated normally.
+ */
+ private void fireConnectionClosed() {
+ assertUnlocked();
+
+ BOSHClientConnEvent event = null;
+ for (BOSHClientConnListener listener : connListeners) {
+ if (event == null) {
+ event = BOSHClientConnEvent.createConnectionClosedEvent(this);
+ }
+ try {
+ listener.connectionEvent(event);
+ } catch (Exception ex) {
+ LOG.log(Level.WARNING, UNHANDLED, ex);
+ }
+ }
+ }
+
+ /**
+ * Notifies all connection listeners that the session has been
+ * terminated due to the exceptional condition provided.
+ *
+ * @param cause cause of the termination
+ */
+ private void fireConnectionClosedOnError(
+ final Throwable cause) {
+ assertUnlocked();
+
+ BOSHClientConnEvent event = null;
+ for (BOSHClientConnListener listener : connListeners) {
+ if (event == null) {
+ event = BOSHClientConnEvent
+ .createConnectionClosedOnErrorEvent(
+ this, pendingRequestAcks, cause);
+ }
+ try {
+ listener.connectionEvent(event);
+ } catch (Exception ex) {
+ LOG.log(Level.WARNING, UNHANDLED, ex);
+ }
+ }
+ }
+
+}