aboutsummaryrefslogtreecommitdiff
path: root/okhttp-ws
diff options
context:
space:
mode:
authorNeil Fuller <nfuller@google.com>2015-04-13 13:06:22 +0100
committerNeil Fuller <nfuller@google.com>2015-04-16 11:44:31 +0100
commita2cab72aa5ff730ba2ae987b45398faafffeb505 (patch)
tree283de306182e8f1faff93d4e6515298539d8c21d /okhttp-ws
parent9fa0698523b8a573b8862c0e62c533be8bd53bda (diff)
downloadokhttp-a2cab72aa5ff730ba2ae987b45398faafffeb505.tar.gz
Roll-up of upstream OkHttp and Okio changes
OkHttp: From b609edd07864d7191dcda8ba1f6c833c9fe170ad to b40f99a950cb407eff52537a97420bd253a64f63 Okio: From 654ddf5e8f6311fda77e429c22d5e0e15f713b8d to b5811711b141b230e4e58f577c79cfbf4c2d4028 Both "to" are head as of 20150413. Patches applied cleanly without conflicts. This submission will break some CTS tests due to https://github.com/square/okhttp/issues/1552 Solutions will be made upstream and patched in. The CTS tests broken are related to SPDY/HTTP2 which are not used by Android's embedded OkHttp. Change-Id: I84d55b6f5c8dbc05148e86bd9421a2c393b563d4
Diffstat (limited to 'okhttp-ws')
-rw-r--r--okhttp-ws/README.md22
-rw-r--r--okhttp-ws/pom.xml37
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java189
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java96
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java287
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java294
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java68
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java225
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java59
9 files changed, 1277 insertions, 0 deletions
diff --git a/okhttp-ws/README.md b/okhttp-ws/README.md
new file mode 100644
index 0000000..054ea91
--- /dev/null
+++ b/okhttp-ws/README.md
@@ -0,0 +1,22 @@
+OkHttp Web Sockets
+==================
+
+RFC6455-compliant web socket implementation.
+
+Create a `WebSocketCall` with a `Request` and an `OkHttpClient` instance.
+```java
+WebSocketCall call = WebSocketCall.create(client, request);
+```
+
+A `WebSocketListener` will notify of the initial connection, server-sent messages, and any failures
+on the connection.
+
+Start the web socket by calling `enqueue` on `WebSocketCall` with the `WebSocketListener`.
+```java
+call.enqueue(new WebSocketListener() {
+ // ...
+});
+```
+
+*Note: This module's API should be considered experimental and may be subject to breaking changes
+in future releases.*
diff --git a/okhttp-ws/pom.xml b/okhttp-ws/pom.xml
new file mode 100644
index 0000000..ae34464
--- /dev/null
+++ b/okhttp-ws/pom.xml
@@ -0,0 +1,37 @@
+<?xml version="1.0" encoding="UTF-8"?>
+
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>parent</artifactId>
+ <version>2.4.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>okhttp-ws</artifactId>
+ <name>OkHttp Web Sockets</name>
+
+ <dependencies>
+ <dependency>
+ <groupId>com.squareup.okhttp</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-javadoc-plugin</artifactId>
+ <configuration>
+ <excludePackageNames>com.squareup.okhttp.internal.*</excludePackageNames>
+ <links>
+ <link>http://square.github.io/okhttp/javadoc/</link>
+ </links>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java
new file mode 100644
index 0000000..a647ac7
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java
@@ -0,0 +1,189 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.internal.ws;
+
+import com.squareup.okhttp.internal.NamedRunnable;
+import com.squareup.okhttp.ws.WebSocket;
+import com.squareup.okhttp.ws.WebSocketListener;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import okio.Buffer;
+import okio.BufferedSink;
+import okio.BufferedSource;
+
+import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;
+
+public abstract class RealWebSocket implements WebSocket {
+ /** A close code which indicates that the peer encountered a protocol exception. */
+ private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;
+
+ private final WebSocketWriter writer;
+ private final WebSocketReader reader;
+ private final WebSocketListener listener;
+
+ /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
+ private volatile boolean writerSentClose;
+ /** True after a close frame was read by the reader. No frames will follow it. */
+ private volatile boolean readerSentClose;
+ /** Lock required to negotiate closing the connection. */
+ private final Object closeLock = new Object();
+
+ public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
+ final Executor replyExecutor, final WebSocketListener listener, final String url) {
+ this.listener = listener;
+
+ writer = new WebSocketWriter(isClient, sink, random);
+ reader = new WebSocketReader(isClient, source, new FrameCallback() {
+ @Override public void onMessage(BufferedSource source, PayloadType type) throws IOException {
+ listener.onMessage(source, type);
+ }
+
+ @Override public void onPing(final Buffer buffer) {
+ replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) {
+ @Override protected void execute() {
+ try {
+ writer.writePong(buffer);
+ } catch (IOException ignored) {
+ }
+ }
+ });
+ }
+
+ @Override public void onPong(Buffer buffer) {
+ listener.onPong(buffer);
+ }
+
+ @Override public void onClose(final int code, final String reason) {
+ replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
+ @Override protected void execute() {
+ peerClose(code, reason);
+ }
+ });
+ }
+ });
+ }
+
+ /**
+ * Read a single message from the web socket and deliver it to the listener. This method should
+ * be called in a loop with the return value indicating whether looping should continue.
+ */
+ public boolean readMessage() {
+ try {
+ reader.processNextFrame();
+ return !readerSentClose;
+ } catch (IOException e) {
+ readerErrorClose(e);
+ return false;
+ }
+ }
+
+ @Override public BufferedSink newMessageSink(PayloadType type) {
+ if (writerSentClose) throw new IllegalStateException("closed");
+ return writer.newMessageSink(type);
+ }
+
+ @Override public void sendMessage(PayloadType type, Buffer payload) throws IOException {
+ if (writerSentClose) throw new IllegalStateException("closed");
+ writer.sendMessage(type, payload);
+ }
+
+ @Override public void sendPing(Buffer payload) throws IOException {
+ if (writerSentClose) throw new IllegalStateException("closed");
+ writer.writePing(payload);
+ }
+
+ /** Send an unsolicited pong with the specified payload. */
+ public void sendPong(Buffer payload) throws IOException {
+ if (writerSentClose) throw new IllegalStateException("closed");
+ writer.writePong(payload);
+ }
+
+ @Override public void close(int code, String reason) throws IOException {
+ if (writerSentClose) throw new IllegalStateException("closed");
+
+ boolean closeConnection;
+ synchronized (closeLock) {
+ writerSentClose = true;
+
+ // If the reader has also indicated a desire to close we will close the connection.
+ closeConnection = readerSentClose;
+ }
+
+ writer.writeClose(code, reason);
+
+ if (closeConnection) {
+ closeConnection();
+ }
+ }
+
+ /** Replies and closes this web socket when a close frame is read from the peer. */
+ private void peerClose(int code, String reason) {
+ boolean writeCloseResponse;
+ synchronized (closeLock) {
+ readerSentClose = true;
+
+ // If the writer has not indicated a desire to close we will write a close response.
+ writeCloseResponse = !writerSentClose;
+ }
+
+ if (writeCloseResponse) {
+ try {
+ writer.writeClose(code, reason);
+ } catch (IOException ignored) {
+ }
+ }
+
+ try {
+ closeConnection();
+ } catch (IOException ignored) {
+ }
+
+ listener.onClose(code, reason);
+ }
+
+ /** Called on the reader thread when an error occurs. */
+ private void readerErrorClose(IOException e) {
+ boolean writeCloseResponse;
+ synchronized (closeLock) {
+ readerSentClose = true;
+
+ // If the writer has not closed we will close the connection.
+ writeCloseResponse = !writerSentClose;
+ }
+
+ if (writeCloseResponse) {
+ if (e instanceof ProtocolException) {
+ // For protocol exceptions, try to inform the server of such.
+ try {
+ writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
+ } catch (IOException ignored) {
+ }
+ }
+ }
+
+ try {
+ closeConnection();
+ } catch (IOException ignored) {
+ }
+
+ listener.onFailure(e);
+ }
+
+ /** Perform any tear-down work on the connection (close the socket, recycle, etc.). */
+ protected abstract void closeConnection() throws IOException;
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java
new file mode 100644
index 0000000..2b93398
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java
@@ -0,0 +1,96 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.internal.ws;
+
+public final class WebSocketProtocol {
+ /** Magic value which must be appended to the key in a response header. */
+ public static final String ACCEPT_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
+
+ /*
+ Each frame starts with two bytes of data.
+
+ 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7
+ +-+-+-+-+-------+ +-+-------------+
+ |F|R|R|R| OP | |M| LENGTH |
+ |I|S|S|S| CODE | |A| |
+ |N|V|V|V| | |S| |
+ | |1|2|3| | |K| |
+ +-+-+-+-+-------+ +-+-------------+
+ */
+
+ /** Byte 0 flag for whether this is the final fragment in a message. */
+ static final int B0_FLAG_FIN = 0b10000000;
+ /** Byte 0 reserved flag 1. Must be 0 unless negotiated otherwise. */
+ static final int B0_FLAG_RSV1 = 0b01000000;
+ /** Byte 0 reserved flag 2. Must be 0 unless negotiated otherwise. */
+ static final int B0_FLAG_RSV2 = 0b00100000;
+ /** Byte 0 reserved flag 3. Must be 0 unless negotiated otherwise. */
+ static final int B0_FLAG_RSV3 = 0b00010000;
+ /** Byte 0 mask for the frame opcode. */
+ static final int B0_MASK_OPCODE = 0b00001111;
+ /** Flag in the opcode which indicates a control frame. */
+ static final int OPCODE_FLAG_CONTROL = 0b00001000;
+
+ /**
+ * Byte 1 flag for whether the payload data is masked.
+ * <p>
+ * If this flag is set, the next four bytes represent the mask key. These bytes appear after
+ * any additional bytes specified by {@link #B1_MASK_LENGTH}.
+ */
+ static final int B1_FLAG_MASK = 0b10000000;
+ /**
+ * Byte 1 mask for the payload length.
+ * <p>
+ * If this value is {@link #PAYLOAD_SHORT}, the next two bytes represent the length.
+ * If this value is {@link #PAYLOAD_LONG}, the next eight bytes represent the length.
+ */
+ static final int B1_MASK_LENGTH = 0b01111111;
+
+ static final int OPCODE_CONTINUATION = 0x0;
+ static final int OPCODE_TEXT = 0x1;
+ static final int OPCODE_BINARY = 0x2;
+
+ static final int OPCODE_CONTROL_CLOSE = 0x8;
+ static final int OPCODE_CONTROL_PING = 0x9;
+ static final int OPCODE_CONTROL_PONG = 0xa;
+
+ /**
+ * Maximum length of frame payload. Larger payloads, if supported, can use the special values
+ * {@link #PAYLOAD_SHORT} or {@link #PAYLOAD_LONG}.
+ */
+ static final int PAYLOAD_MAX = 125;
+ /**
+ * Value for {@link #B1_MASK_LENGTH} which indicates the next two bytes are the unsigned length.
+ */
+ static final int PAYLOAD_SHORT = 126;
+ /**
+ * Value for {@link #B1_MASK_LENGTH} which indicates the next eight bytes are the unsigned
+ * length.
+ */
+ static final int PAYLOAD_LONG = 127;
+
+ static void toggleMask(byte[] buffer, long byteCount, byte[] key, long frameBytesRead) {
+ int keyLength = key.length;
+ for (int i = 0; i < byteCount; i++, frameBytesRead++) {
+ int keyIndex = (int) (frameBytesRead % keyLength);
+ buffer[i] = (byte) (buffer[i] ^ key[keyIndex]);
+ }
+ }
+
+ private WebSocketProtocol() {
+ throw new AssertionError("No instances.");
+ }
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java
new file mode 100644
index 0000000..ee4e482
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.internal.ws;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.net.ProtocolException;
+import okio.Buffer;
+import okio.BufferedSource;
+import okio.Okio;
+import okio.Source;
+import okio.Timeout;
+
+import static com.squareup.okhttp.ws.WebSocket.PayloadType;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV1;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV2;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV3;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_MASK_OPCODE;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_FLAG_MASK;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_MASK_LENGTH;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTINUATION;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_CLOSE;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PING;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PONG;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_FLAG_CONTROL;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask;
+import static java.lang.Integer.toHexString;
+
+/**
+ * An <a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a>-compatible WebSocket frame reader.
+ */
+public final class WebSocketReader {
+ public interface FrameCallback {
+ void onMessage(BufferedSource source, PayloadType type) throws IOException;
+ void onPing(Buffer buffer);
+ void onPong(Buffer buffer);
+ void onClose(int code, String reason);
+ }
+
+ private final boolean isClient;
+ private final BufferedSource source;
+ private final FrameCallback frameCallback;
+
+ private final Source framedMessageSource = new FramedMessageSource();
+
+ private boolean closed;
+ private boolean messageClosed;
+
+ // Stateful data about the current frame.
+ private int opcode;
+ private long frameLength;
+ private long frameBytesRead;
+ private boolean isFinalFrame;
+ private boolean isControlFrame;
+ private boolean isMasked;
+
+ private final byte[] maskKey = new byte[4];
+ private final byte[] maskBuffer = new byte[2048];
+
+ public WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) {
+ if (source == null) throw new NullPointerException("source == null");
+ if (frameCallback == null) throw new NullPointerException("frameCallback == null");
+ this.isClient = isClient;
+ this.source = source;
+ this.frameCallback = frameCallback;
+ }
+
+ /**
+ * Process the next protocol frame.
+ * <ul>
+ * <li>If it is a control frame this will result in a single call to {@link FrameCallback}.</li>
+ * <li>If it is a message frame this will result in a single call to {@link
+ * WebSocketListener#onMessage}. If the message spans multiple frames, each interleaved control
+ * frame will result in a corresponding call to {@link FrameCallback}.
+ * </ul>
+ */
+ public void processNextFrame() throws IOException {
+ readHeader();
+ if (isControlFrame) {
+ readControlFrame();
+ } else {
+ readMessageFrame();
+ }
+ }
+
+ private void readHeader() throws IOException {
+ if (closed) throw new IOException("closed");
+
+ int b0 = source.readByte() & 0xff;
+
+ opcode = b0 & B0_MASK_OPCODE;
+ isFinalFrame = (b0 & B0_FLAG_FIN) != 0;
+ isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0;
+
+ // Control frames must be final frames (cannot contain continuations).
+ if (isControlFrame && !isFinalFrame) {
+ throw new ProtocolException("Control frames must be final.");
+ }
+
+ boolean reservedFlag1 = (b0 & B0_FLAG_RSV1) != 0;
+ boolean reservedFlag2 = (b0 & B0_FLAG_RSV2) != 0;
+ boolean reservedFlag3 = (b0 & B0_FLAG_RSV3) != 0;
+ if (reservedFlag1 || reservedFlag2 || reservedFlag3) {
+ // Reserved flags are for extensions which we currently do not support.
+ throw new ProtocolException("Reserved flags are unsupported.");
+ }
+
+ int b1 = source.readByte() & 0xff;
+
+ isMasked = (b1 & B1_FLAG_MASK) != 0;
+ if (isMasked == isClient) {
+ // Masked payloads must be read on the server. Unmasked payloads must be read on the client.
+ throw new ProtocolException("Client-sent frames must be masked. Server sent must not.");
+ }
+
+ // Get frame length, optionally reading from follow-up bytes if indicated by special values.
+ frameLength = b1 & B1_MASK_LENGTH;
+ if (frameLength == PAYLOAD_SHORT) {
+ frameLength = source.readShort() & 0xffffL; // Value is unsigned.
+ } else if (frameLength == PAYLOAD_LONG) {
+ frameLength = source.readLong();
+ if (frameLength < 0) {
+ throw new ProtocolException(
+ "Frame length 0x" + Long.toHexString(frameLength) + " > 0x7FFFFFFFFFFFFFFF");
+ }
+ }
+ frameBytesRead = 0;
+
+ if (isControlFrame && frameLength > PAYLOAD_MAX) {
+ throw new ProtocolException("Control frame must be less than " + PAYLOAD_MAX + "B.");
+ }
+
+ if (isMasked) {
+ // Read the masking key as bytes so that they can be used directly for unmasking.
+ source.readFully(maskKey);
+ }
+ }
+
+ private void readControlFrame() throws IOException {
+ Buffer buffer = null;
+ if (frameBytesRead < frameLength) {
+ buffer = new Buffer();
+
+ if (isClient) {
+ source.readFully(buffer, frameLength);
+ } else {
+ while (frameBytesRead < frameLength) {
+ int toRead = (int) Math.min(frameLength - frameBytesRead, maskBuffer.length);
+ int read = source.read(maskBuffer, 0, toRead);
+ if (read == -1) throw new EOFException();
+ toggleMask(maskBuffer, read, maskKey, frameBytesRead);
+ buffer.write(maskBuffer, 0, read);
+ frameBytesRead += read;
+ }
+ }
+ }
+
+ switch (opcode) {
+ case OPCODE_CONTROL_PING:
+ frameCallback.onPing(buffer);
+ break;
+ case OPCODE_CONTROL_PONG:
+ frameCallback.onPong(buffer);
+ break;
+ case OPCODE_CONTROL_CLOSE:
+ int code = 0;
+ String reason = "";
+ if (buffer != null) {
+ code = buffer.readShort();
+ reason = buffer.readUtf8();
+ }
+ frameCallback.onClose(code, reason);
+ closed = true;
+ break;
+ default:
+ throw new IllegalStateException("Unknown control opcode: " + toHexString(opcode));
+ }
+ }
+
+ private void readMessageFrame() throws IOException {
+ PayloadType type;
+ switch (opcode) {
+ case OPCODE_TEXT:
+ type = PayloadType.TEXT;
+ break;
+ case OPCODE_BINARY:
+ type = PayloadType.BINARY;
+ break;
+ default:
+ throw new IllegalStateException("Unknown opcode: " + toHexString(opcode));
+ }
+
+ messageClosed = false;
+ frameCallback.onMessage(Okio.buffer(framedMessageSource), type);
+ if (!messageClosed) {
+ throw new IllegalStateException("Listener failed to call close on message payload.");
+ }
+ }
+
+ /** Read headers and process any control frames until we reach a non-control frame. */
+ private void readUntilNonControlFrame() throws IOException {
+ while (!closed) {
+ readHeader();
+ if (!isControlFrame) {
+ break;
+ }
+ readControlFrame();
+ }
+ }
+
+ /**
+ * A special source which knows how to read a message body across one or more frames. Control
+ * frames that occur between fragments will be processed. If the message payload is masked this
+ * will unmask as it's being processed.
+ */
+ private final class FramedMessageSource implements Source {
+ @Override public long read(Buffer sink, long byteCount) throws IOException {
+ if (closed) throw new IOException("closed");
+ if (messageClosed) throw new IllegalStateException("closed");
+
+ if (frameBytesRead == frameLength) {
+ if (isFinalFrame) return -1; // We are exhausted and have no continuations.
+
+ readUntilNonControlFrame();
+ if (opcode != OPCODE_CONTINUATION) {
+ throw new ProtocolException("Expected continuation opcode. Got: " + toHexString(opcode));
+ }
+ if (isFinalFrame && frameLength == 0) {
+ return -1; // Fast-path for empty final frame.
+ }
+ }
+
+ long toRead = Math.min(byteCount, frameLength - frameBytesRead);
+
+ long read;
+ if (isMasked) {
+ toRead = Math.min(toRead, maskBuffer.length);
+ read = source.read(maskBuffer, 0, (int) toRead);
+ if (read == -1) throw new EOFException();
+ toggleMask(maskBuffer, read, maskKey, frameBytesRead);
+ sink.write(maskBuffer, 0, (int) read);
+ } else {
+ read = source.read(sink, toRead);
+ if (read == -1) throw new EOFException();
+ }
+
+ frameBytesRead += read;
+ return read;
+ }
+
+ @Override public Timeout timeout() {
+ return source.timeout();
+ }
+
+ @Override public void close() throws IOException {
+ if (messageClosed) return;
+ messageClosed = true;
+ if (closed) return;
+
+ // Exhaust the remainder of the message, if any.
+ source.skip(frameLength - frameBytesRead);
+ while (!isFinalFrame) {
+ readUntilNonControlFrame();
+ source.skip(frameLength);
+ }
+ }
+ }
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java
new file mode 100644
index 0000000..74bd083
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java
@@ -0,0 +1,294 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.internal.ws;
+
+import java.io.IOException;
+import java.util.Random;
+import okio.Buffer;
+import okio.BufferedSink;
+import okio.BufferedSource;
+import okio.Okio;
+import okio.Sink;
+import okio.Timeout;
+
+import static com.squareup.okhttp.ws.WebSocket.PayloadType;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_FLAG_MASK;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTINUATION;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_CLOSE;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PING;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PONG;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask;
+
+/**
+ * An <a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a>-compatible WebSocket frame writer.
+ * <p>
+ * This class is partially thread safe. Only a single "main" thread should be sending messages via
+ * calls to {@link #newMessageSink} or {@link #sendMessage} as well as any calls to
+ * {@link #writePing} or {@link #writeClose}. Other threads may call {@link #writePing},
+ * {@link #writePong}, or {@link #writeClose} which will interleave on the wire with frames from
+ * the main thread.
+ */
+public final class WebSocketWriter {
+ private final boolean isClient;
+ /** Writes must be guarded by synchronizing on this instance! */
+ private final BufferedSink sink;
+ private final Random random;
+
+ private final FrameSink frameSink = new FrameSink();
+
+ private boolean closed;
+ private boolean activeWriter;
+
+ private final byte[] maskKey;
+ private final byte[] maskBuffer;
+
+ public WebSocketWriter(boolean isClient, BufferedSink sink, Random random) {
+ if (sink == null) throw new NullPointerException("sink == null");
+ if (random == null) throw new NullPointerException("random == null");
+ this.isClient = isClient;
+ this.sink = sink;
+ this.random = random;
+
+ // Masks are only a concern for client writers.
+ maskKey = isClient ? new byte[4] : null;
+ maskBuffer = isClient ? new byte[2048] : null;
+ }
+
+ /** Send a ping with the supplied {@code payload}. Payload may be {@code null} */
+ public void writePing(Buffer payload) throws IOException {
+ synchronized (sink) {
+ writeControlFrame(OPCODE_CONTROL_PING, payload);
+ }
+ }
+
+ /** Send a pong with the supplied {@code payload}. Payload may be {@code null} */
+ public void writePong(Buffer payload) throws IOException {
+ synchronized (sink) {
+ writeControlFrame(OPCODE_CONTROL_PONG, payload);
+ }
+ }
+
+ /**
+ * Send a close frame with optional code and reason.
+ *
+ * @param code Status code as defined by
+ * <a href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a> or
+ * {@code 0}.
+ * @param reason Reason for shutting down or {@code null}. {@code code} is required if set.
+ */
+ public void writeClose(int code, String reason) throws IOException {
+ Buffer payload = null;
+ if (code != 0) {
+ if (code < 1000 || code >= 5000) {
+ throw new IllegalArgumentException("Code must be in range [1000,5000).");
+ }
+ payload = new Buffer();
+ payload.writeShort(code);
+ if (reason != null) {
+ payload.writeUtf8(reason);
+ }
+ } else if (reason != null) {
+ throw new IllegalArgumentException("Code required to include reason.");
+ }
+
+ synchronized (sink) {
+ writeControlFrame(OPCODE_CONTROL_CLOSE, payload);
+ closed = true;
+ }
+ }
+
+ private void writeControlFrame(int opcode, Buffer payload) throws IOException {
+ if (closed) throw new IOException("closed");
+
+ int length = 0;
+ if (payload != null) {
+ length = (int) payload.size();
+ if (length > PAYLOAD_MAX) {
+ throw new IllegalArgumentException(
+ "Payload size must be less than or equal to " + PAYLOAD_MAX);
+ }
+ }
+
+ int b0 = B0_FLAG_FIN | opcode;
+ sink.writeByte(b0);
+
+ int b1 = length;
+ if (isClient) {
+ b1 |= B1_FLAG_MASK;
+ sink.writeByte(b1);
+
+ random.nextBytes(maskKey);
+ sink.write(maskKey);
+
+ if (payload != null) {
+ writeAllMasked(payload, length);
+ }
+ } else {
+ sink.writeByte(b1);
+
+ if (payload != null) {
+ sink.writeAll(payload);
+ }
+ }
+
+ sink.flush();
+ }
+
+ /**
+ * Stream a message payload as a series of frames. This allows control frames to be interleaved
+ * between parts of the message.
+ */
+ public BufferedSink newMessageSink(PayloadType type) {
+ if (type == null) throw new NullPointerException("type == null");
+ if (activeWriter) {
+ throw new IllegalStateException("Another message writer is active. Did you call close()?");
+ }
+ activeWriter = true;
+
+ frameSink.payloadType = type;
+ frameSink.isFirstFrame = true;
+ return Okio.buffer(frameSink);
+ }
+
+ /**
+ * Send a message payload as a single frame. This will block any control frames that need sent
+ * until it is completed.
+ */
+ public void sendMessage(PayloadType type, Buffer payload) throws IOException {
+ if (type == null) throw new NullPointerException("type == null");
+ if (payload == null) throw new NullPointerException("payload == null");
+ if (activeWriter) {
+ throw new IllegalStateException("A message writer is active. Did you call close()?");
+ }
+ writeFrame(type, payload, payload.size(), true /* first frame */, true /* final */);
+ }
+
+ private void writeFrame(PayloadType payloadType, Buffer source, long byteCount,
+ boolean isFirstFrame, boolean isFinal) throws IOException {
+ if (closed) throw new IOException("closed");
+
+ int opcode = OPCODE_CONTINUATION;
+ if (isFirstFrame) {
+ switch (payloadType) {
+ case TEXT:
+ opcode = OPCODE_TEXT;
+ break;
+ case BINARY:
+ opcode = OPCODE_BINARY;
+ break;
+ default:
+ throw new IllegalStateException("Unknown payload type: " + payloadType);
+ }
+ }
+
+ synchronized (sink) {
+ int b0 = opcode;
+ if (isFinal) {
+ b0 |= B0_FLAG_FIN;
+ }
+ sink.writeByte(b0);
+
+ int b1 = 0;
+ if (isClient) {
+ b1 |= B1_FLAG_MASK;
+ random.nextBytes(maskKey);
+ }
+ if (byteCount <= PAYLOAD_MAX) {
+ b1 |= (int) byteCount;
+ sink.writeByte(b1);
+ } else if (byteCount <= 0xffffL) { // Unsigned short.
+ b1 |= PAYLOAD_SHORT;
+ sink.writeByte(b1);
+ sink.writeShort((int) byteCount);
+ } else {
+ b1 |= PAYLOAD_LONG;
+ sink.writeByte(b1);
+ sink.writeLong(byteCount);
+ }
+
+ if (isClient) {
+ sink.write(maskKey);
+ writeAllMasked(source, byteCount);
+ } else {
+ sink.write(source, byteCount);
+ }
+
+ sink.flush();
+ }
+ }
+
+ private void writeAllMasked(BufferedSource source, long byteCount) throws IOException {
+ long written = 0;
+ while (written < byteCount) {
+ int toRead = (int) Math.min(byteCount, maskBuffer.length);
+ int read = source.read(maskBuffer, 0, toRead);
+ if (read == -1) throw new AssertionError();
+ toggleMask(maskBuffer, read, maskKey, written);
+ sink.write(maskBuffer, 0, read);
+ written += read;
+ }
+ }
+
+ private final class FrameSink implements Sink {
+ private PayloadType payloadType;
+ private boolean isFirstFrame;
+
+ @Override public void write(Buffer source, long byteCount) throws IOException {
+ writeFrame(payloadType, source, byteCount, isFirstFrame, false /* final */);
+ isFirstFrame = false;
+ }
+
+ @Override public void flush() throws IOException {
+ if (closed) throw new IOException("closed");
+
+ synchronized (sink) {
+ sink.flush();
+ }
+ }
+
+ @Override public Timeout timeout() {
+ return sink.timeout();
+ }
+
+ @SuppressWarnings("PointlessBitwiseExpression")
+ @Override public void close() throws IOException {
+ if (closed) throw new IOException("closed");
+
+ int length = 0;
+
+ synchronized (sink) {
+ sink.writeByte(B0_FLAG_FIN | OPCODE_CONTINUATION);
+
+ if (isClient) {
+ sink.writeByte(B1_FLAG_MASK | length);
+ random.nextBytes(maskKey);
+ sink.write(maskKey);
+ } else {
+ sink.writeByte(length);
+ }
+ sink.flush();
+ }
+
+ activeWriter = false;
+ }
+ }
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java
new file mode 100644
index 0000000..4cf2f42
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.ws;
+
+import java.io.IOException;
+import okio.Buffer;
+import okio.BufferedSink;
+
+/** Blocking interface to connect and write to a web socket. */
+public interface WebSocket {
+ /** The format of a message payload. */
+ enum PayloadType {
+ /** UTF8-encoded text data. */
+ TEXT,
+ /** Arbitrary binary data. */
+ BINARY
+ }
+
+ /**
+ * Stream a message payload to the server of the specified {code type}.
+ * <p>
+ * You must call {@link BufferedSink#close() close()} to complete the message. Calls to
+ * {@link BufferedSink#flush() flush()} write a frame fragment. The message may be empty.
+ *
+ * @throws IllegalStateException if not connected, already closed, or another writer is active.
+ */
+ BufferedSink newMessageSink(WebSocket.PayloadType type);
+
+ /**
+ * Send a message payload to the server of the specified {@code type}.
+ *
+ * @throws IllegalStateException if not connected, already closed, or another writer is active.
+ */
+ void sendMessage(WebSocket.PayloadType type, Buffer payload) throws IOException;
+
+ /**
+ * Send a ping to the server with optional payload.
+ *
+ * @throws IllegalStateException if already closed.
+ */
+ void sendPing(Buffer payload) throws IOException;
+
+ /**
+ * Send a close frame to the server.
+ * <p>
+ * The corresponding {@link WebSocketListener} will continue to get messages until its
+ * {@link WebSocketListener#onClose onClose()} method is called.
+ * <p>
+ * It is an error to call this method before calling close on an active writer. Calling this
+ * method more than once has no effect.
+ *
+ * @throws IllegalStateException if already closed.
+ */
+ void close(int code, String reason) throws IOException;
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java
new file mode 100644
index 0000000..422167c
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java
@@ -0,0 +1,225 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.ws;
+
+import com.squareup.okhttp.Call;
+import com.squareup.okhttp.Callback;
+import com.squareup.okhttp.Connection;
+import com.squareup.okhttp.OkHttpClient;
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.Response;
+import com.squareup.okhttp.internal.Internal;
+import com.squareup.okhttp.internal.NamedRunnable;
+import com.squareup.okhttp.internal.Util;
+import com.squareup.okhttp.internal.ws.RealWebSocket;
+import com.squareup.okhttp.internal.ws.WebSocketProtocol;
+import java.io.IOException;
+import java.net.ProtocolException;
+import java.net.Socket;
+import java.security.SecureRandom;
+import java.util.Collections;
+import java.util.Random;
+import java.util.concurrent.Executor;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadPoolExecutor;
+import okio.BufferedSink;
+import okio.BufferedSource;
+import okio.ByteString;
+import okio.Okio;
+
+import static java.util.concurrent.TimeUnit.SECONDS;
+
+public final class WebSocketCall {
+ /**
+ * Prepares the {@code request} to create a web socket at some point in the future.
+ */
+ public static WebSocketCall create(OkHttpClient client, Request request) {
+ return new WebSocketCall(client, request);
+ }
+
+ private final Request request;
+ private final Call call;
+ private final Random random;
+ private final String key;
+
+ WebSocketCall(OkHttpClient client, Request request) {
+ this(client, request, new SecureRandom());
+ }
+
+ WebSocketCall(OkHttpClient client, Request request, Random random) {
+ if (!"GET".equals(request.method())) {
+ throw new IllegalArgumentException("Request must be GET: " + request.method());
+ }
+ String url = request.urlString();
+ String httpUrl;
+ if (url.startsWith("ws://")) {
+ httpUrl = "http://" + url.substring(5);
+ } else if (url.startsWith("wss://")) {
+ httpUrl = "https://" + url.substring(6);
+ } else if (url.startsWith("http://") || url.startsWith("https://")) {
+ httpUrl = url;
+ } else {
+ throw new IllegalArgumentException(
+ "Request url must use 'ws', 'wss', 'http', or 'https' scheme: " + url);
+ }
+
+ this.random = random;
+
+ byte[] nonce = new byte[16];
+ random.nextBytes(nonce);
+ key = ByteString.of(nonce).base64();
+
+ // Copy the client. Otherwise changes (socket factory, redirect policy,
+ // etc.) may incorrectly be reflected in the request when it is executed.
+ client = client.clone();
+ // Force HTTP/1.1 until the WebSocket over HTTP/2 version is finalized.
+ client.setProtocols(Collections.singletonList(com.squareup.okhttp.Protocol.HTTP_1_1));
+
+ request = request.newBuilder()
+ .url(httpUrl)
+ .header("Upgrade", "websocket")
+ .header("Connection", "Upgrade")
+ .header("Sec-WebSocket-Key", key)
+ .header("Sec-WebSocket-Version", "13")
+ .build();
+ this.request = request;
+
+ call = client.newCall(request);
+ }
+
+ /**
+ * Schedules the request to be executed at some point in the future.
+ *
+ * <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the request will run:
+ * usually immediately unless there are several other requests currently being executed.
+ *
+ * <p>This client will later call back {@code responseCallback} with either an HTTP response or a
+ * failure exception. If you {@link #cancel} a request before it completes the callback will not
+ * be invoked.
+ *
+ * @throws IllegalStateException when the call has already been executed.
+ */
+ public void enqueue(final WebSocketListener listener) {
+ Callback responseCallback = new Callback() {
+ @Override public void onResponse(Response response) throws IOException {
+ try {
+ createWebSocket(response, listener);
+ } catch (IOException e) {
+ listener.onFailure(e);
+ }
+ }
+
+ @Override public void onFailure(Request request, IOException e) {
+ listener.onFailure(e);
+ }
+ };
+ // TODO call.enqueue(responseCallback, true);
+ Internal.instance.callEnqueue(call, responseCallback, true);
+ }
+
+ /** Cancels the request, if possible. Requests that are already complete cannot be canceled. */
+ public void cancel() {
+ call.cancel();
+ }
+
+ private void createWebSocket(Response response, WebSocketListener listener)
+ throws IOException {
+ if (response.code() != 101) {
+ // TODO call.engine.releaseConnection();
+ Internal.instance.callEngineReleaseConnection(call);
+ throw new ProtocolException("Expected HTTP 101 response but was '"
+ + response.code()
+ + " "
+ + response.message()
+ + "'");
+ }
+
+ String headerConnection = response.header("Connection");
+ if (!"Upgrade".equalsIgnoreCase(headerConnection)) {
+ throw new ProtocolException(
+ "Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'");
+ }
+ String headerUpgrade = response.header("Upgrade");
+ if (!"websocket".equalsIgnoreCase(headerUpgrade)) {
+ throw new ProtocolException(
+ "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'");
+ }
+ String headerAccept = response.header("Sec-WebSocket-Accept");
+ String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC);
+ if (!acceptExpected.equals(headerAccept)) {
+ throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '"
+ + acceptExpected
+ + "' but was '"
+ + headerAccept
+ + "'");
+ }
+
+ // TODO connection = call.engine.getConnection();
+ Connection connection = Internal.instance.callEngineGetConnection(call);
+ // TODO if (!connection.clearOwner()) {
+ if (!Internal.instance.clearOwner(connection)) {
+ throw new IllegalStateException("Unable to take ownership of connection.");
+ }
+
+ Socket socket = connection.getSocket();
+ BufferedSource source = Okio.buffer(Okio.source(socket));
+ BufferedSink sink = Okio.buffer(Okio.sink(socket));
+
+ final RealWebSocket webSocket =
+ ConnectionWebSocket.create(response, connection, source, sink, random, listener);
+
+ // Start a dedicated thread for reading the web socket.
+ new Thread(new NamedRunnable("OkHttp WebSocket reader %s", request.urlString()) {
+ @Override protected void execute() {
+ while (webSocket.readMessage()) {
+ }
+ }
+ }).start();
+
+ // TODO connection.setOwner(webSocket);
+ Internal.instance.connectionSetOwner(connection, webSocket);
+
+ listener.onOpen(webSocket, request, response);
+ }
+
+ // Keep static so that the WebSocketCall instance can be garbage collected.
+ private static class ConnectionWebSocket extends RealWebSocket {
+ static RealWebSocket create(Response response, Connection connection, BufferedSource source,
+ BufferedSink sink, Random random, WebSocketListener listener) {
+ String url = response.request().urlString();
+ ThreadPoolExecutor replyExecutor =
+ new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(),
+ Util.threadFactory(String.format("OkHttp %s WebSocket", url), true));
+ replyExecutor.allowCoreThreadTimeOut(true);
+
+ return new ConnectionWebSocket(connection, source, sink, random, replyExecutor, listener,
+ url);
+ }
+
+ private final Connection connection;
+
+ private ConnectionWebSocket(Connection connection, BufferedSource source, BufferedSink sink,
+ Random random, Executor replyExecutor, WebSocketListener listener, String url) {
+ super(true /* is client */, source, sink, random, replyExecutor, listener, url);
+ this.connection = connection;
+ }
+
+ @Override protected void closeConnection() throws IOException {
+ // TODO connection.closeIfOwnedBy(this);
+ Internal.instance.closeIfOwnedBy(connection, this);
+ }
+ }
+}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java
new file mode 100644
index 0000000..a113eed
--- /dev/null
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java
@@ -0,0 +1,59 @@
+/*
+ * Copyright (C) 2014 Square, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.squareup.okhttp.ws;
+
+import com.squareup.okhttp.Request;
+import com.squareup.okhttp.Response;
+import java.io.IOException;
+import okio.Buffer;
+import okio.BufferedSource;
+
+import static com.squareup.okhttp.ws.WebSocket.PayloadType;
+
+/** Listener for server-initiated messages on a connected {@link WebSocket}. */
+public interface WebSocketListener {
+ void onOpen(WebSocket webSocket, Request request, Response response) throws IOException;
+
+ /**
+ * Called when a server message is received. The {@code type} indicates whether the
+ * {@code payload} should be interpreted as UTF-8 text or binary data.
+ *
+ * <p>Implementations <strong>must</strong> call {@code source.close()} before returning. This
+ * indicates completion of parsing the message payload and will consume any remaining bytes in
+ * the message.
+ */
+ void onMessage(BufferedSource payload, PayloadType type) throws IOException;
+
+ /**
+ * Called when a server pong is received. This is usually a result of calling {@link
+ * WebSocket#sendPing(Buffer)} but might also be unsolicited.
+ */
+ void onPong(Buffer payload);
+
+ /**
+ * Called when the server sends a close message. This may have been initiated
+ * from a call to {@link WebSocket#close(int, String) close()} or as an unprompted
+ * message from the server.
+ *
+ * @param code The <a href="http://tools.ietf.org/html/rfc6455#section-7.4.1">RFC-compliant</a>
+ * status code.
+ * @param reason Reason for close or an empty string.
+ */
+ void onClose(int code, String reason);
+
+ /** Called when the transport or protocol layer of this web socket errors during communication. */
+ void onFailure(IOException e);
+}