aboutsummaryrefslogtreecommitdiff
path: root/okhttp-ws
diff options
context:
space:
mode:
authorTobias Thierer <tobiast@google.com>2016-06-24 19:04:17 +0100
committerTobias Thierer <tobiast@google.com>2016-06-29 16:23:09 +0100
commit6c251e20f00c7574b217bd4351ac81666f574380 (patch)
tree2d66a76721f4c8170b990742922675f32ad38122 /okhttp-ws
parent68e16131f12f0174c4c9e5785f6e63297cd02adf (diff)
downloadokhttp-6c251e20f00c7574b217bd4351ac81666f574380.tar.gz
Update OkHttp to 2.7.5 and advance okio by one commit.
This brings OkHttp and okio exactly in line with upstream commits with no local changes. Corresponding upstream commits: okhttp:6e236ce3b80f21369dc544f0e1053ff71be8689b (= parent-2.7.5) okio: 02481cc0cc84bc92e3eab6d5212a226496f56a7e The okio commit differs from the one in the previous pull from Sep 2015 (AOSP commit 71b9f47b26fb57ac3e436a19519c6e3ec70e86eb) only by a single upstream commit, the switch to 8 KiB segments. That commit was previously cherry-picked in AOSP. This CL will temporarily revert the AOSP changes to okio, but those AOSP changes to okio will be reapplied in the subsequent CL. Compilation and tests do not pass after this CL, they will only pass at the end of the chain of 11 CLs going in at the same time. 9 of these 11 CLs are in external/okhttp, the others affect libcore and frameworks/base. Details of behavioural changes introduced by this upgrade are at: https://docs.google.com/document/d/19PF3Exd_q32gAGCiRFWRf0Pq_xrIWs-cRViHkFTxJg8/edit This CL includes files that are not used in Android, such as - top level dot files (.travis.yml etc.) - subdirectories okurl, okhttp-apache, samples, which aren't used - tests in okhttp-hpacktests, okhttp-ws-tests that aren't run or test functionality that we aren't used Test: I've run the following tests *at the end* of the chain of commits, in cts-tradefed: 1.) run cts -p android.core.tests.libcore.package.harmony_java_net 2.) run cts -c libcore.java.net.URLConnectionTest 3.) run cts -p android.core.tests.libcore.package.okhttp 4.) run cts -p android.core.tests.libcore.package.libcore 1.-3.) all passed 4.) had 24 unrelated failures per b/29496407 and b/29744850 Change-Id: Id798d6cf49fa4a7a4ab8ae3b699a38104bf42db3
Diffstat (limited to 'okhttp-ws')
-rw-r--r--okhttp-ws/pom.xml2
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java153
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java8
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java58
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java194
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java34
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java61
-rw-r--r--okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java9
8 files changed, 268 insertions, 251 deletions
diff --git a/okhttp-ws/pom.xml b/okhttp-ws/pom.xml
index 81f8afd..688b538 100644
--- a/okhttp-ws/pom.xml
+++ b/okhttp-ws/pom.xml
@@ -6,7 +6,7 @@
<parent>
<groupId>com.squareup.okhttp</groupId>
<artifactId>parent</artifactId>
- <version>2.6.0-SNAPSHOT</version>
+ <version>2.7.5</version>
</parent>
<artifactId>okhttp-ws</artifactId>
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java
index 8d6b7c4..ea55b5a 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java
@@ -15,6 +15,9 @@
*/
package com.squareup.okhttp.internal.ws;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.RequestBody;
+import com.squareup.okhttp.ResponseBody;
import com.squareup.okhttp.internal.NamedRunnable;
import com.squareup.okhttp.ws.WebSocket;
import com.squareup.okhttp.ws.WebSocketListener;
@@ -22,14 +25,17 @@ import java.io.IOException;
import java.net.ProtocolException;
import java.util.Random;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
import okio.Buffer;
import okio.BufferedSink;
import okio.BufferedSource;
+import okio.Okio;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback;
public abstract class RealWebSocket implements WebSocket {
- /** A close code which indicates that the peer encountered a protocol exception. */
private static final int CLOSE_PROTOCOL_EXCEPTION = 1002;
private final WebSocketWriter writer;
@@ -38,10 +44,13 @@ public abstract class RealWebSocket implements WebSocket {
/** True after calling {@link #close(int, String)}. No writes are allowed afterward. */
private volatile boolean writerSentClose;
+ /** True after {@link IOException}. {@link #close(int, String)} becomes only valid call. */
+ private boolean writerWantsClose;
/** True after a close frame was read by the reader. No frames will follow it. */
- private volatile boolean readerSentClose;
- /** Lock required to negotiate closing the connection. */
- private final Object closeLock = new Object();
+ private boolean readerSentClose;
+
+ /** True after calling {@link #close()} to free connection resources. */
+ private final AtomicBoolean connectionClosed = new AtomicBoolean();
public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random,
final Executor replyExecutor, final WebSocketListener listener, final String url) {
@@ -49,8 +58,8 @@ public abstract class RealWebSocket implements WebSocket {
writer = new WebSocketWriter(isClient, sink, random);
reader = new WebSocketReader(isClient, source, new FrameCallback() {
- @Override public void onMessage(BufferedSource source, PayloadType type) throws IOException {
- listener.onMessage(source, type);
+ @Override public void onMessage(ResponseBody message) throws IOException {
+ listener.onMessage(message);
}
@Override public void onPing(final Buffer buffer) {
@@ -69,17 +78,10 @@ public abstract class RealWebSocket implements WebSocket {
}
@Override public void onClose(final int code, final String reason) {
- final boolean writeCloseResponse;
- synchronized (closeLock) {
- readerSentClose = true;
-
- // If the writer has not indicated a desire to close we will write a close response.
- writeCloseResponse = !writerSentClose;
- }
-
+ readerSentClose = true;
replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) {
@Override protected void execute() {
- peerClose(code, reason, writeCloseResponse);
+ peerClose(code, reason);
}
});
}
@@ -100,57 +102,96 @@ public abstract class RealWebSocket implements WebSocket {
}
}
- @Override public BufferedSink newMessageSink(PayloadType type) {
+ @Override public void sendMessage(RequestBody message) throws IOException {
+ if (message == null) throw new NullPointerException("message == null");
if (writerSentClose) throw new IllegalStateException("closed");
- return writer.newMessageSink(type);
- }
+ if (writerWantsClose) throw new IllegalStateException("must call close()");
- @Override public void sendMessage(PayloadType type, Buffer payload) throws IOException {
- if (writerSentClose) throw new IllegalStateException("closed");
- writer.sendMessage(type, payload);
+ MediaType contentType = message.contentType();
+ if (contentType == null) {
+ throw new IllegalArgumentException(
+ "Message content type was null. Must use WebSocket.TEXT or WebSocket.BINARY.");
+ }
+ String contentSubtype = contentType.subtype();
+
+ int formatOpcode;
+ if (WebSocket.TEXT.subtype().equals(contentSubtype)) {
+ formatOpcode = OPCODE_TEXT;
+ } else if (WebSocket.BINARY.subtype().equals(contentSubtype)) {
+ formatOpcode = OPCODE_BINARY;
+ } else {
+ throw new IllegalArgumentException("Unknown message content type: "
+ + contentType.type() + "/" + contentType.subtype() // Omit any implicitly added charset.
+ + ". Must use WebSocket.TEXT or WebSocket.BINARY.");
+ }
+
+ BufferedSink sink = Okio.buffer(writer.newMessageSink(formatOpcode));
+ try {
+ message.writeTo(sink);
+ sink.close();
+ } catch (IOException e) {
+ writerWantsClose = true;
+ throw e;
+ }
}
@Override public void sendPing(Buffer payload) throws IOException {
if (writerSentClose) throw new IllegalStateException("closed");
- writer.writePing(payload);
+ if (writerWantsClose) throw new IllegalStateException("must call close()");
+
+ try {
+ writer.writePing(payload);
+ } catch (IOException e) {
+ writerWantsClose = true;
+ throw e;
+ }
}
/** Send an unsolicited pong with the specified payload. */
public void sendPong(Buffer payload) throws IOException {
if (writerSentClose) throw new IllegalStateException("closed");
- writer.writePong(payload);
+ if (writerWantsClose) throw new IllegalStateException("must call close()");
+
+ try {
+ writer.writePong(payload);
+ } catch (IOException e) {
+ writerWantsClose = true;
+ throw e;
+ }
}
@Override public void close(int code, String reason) throws IOException {
if (writerSentClose) throw new IllegalStateException("closed");
+ writerSentClose = true;
- boolean closeConnection;
- synchronized (closeLock) {
- writerSentClose = true;
-
- // If the reader has also indicated a desire to close we will close the connection.
- closeConnection = readerSentClose;
- }
-
- writer.writeClose(code, reason);
-
- if (closeConnection) {
- closeConnection();
+ try {
+ writer.writeClose(code, reason);
+ } catch (IOException e) {
+ if (connectionClosed.compareAndSet(false, true)) {
+ // Try to close the connection without masking the original exception.
+ try {
+ close();
+ } catch (IOException ignored) {
+ }
+ }
+ throw e;
}
}
/** Replies and closes this web socket when a close frame is read from the peer. */
- private void peerClose(int code, String reason, boolean writeCloseResponse) {
- if (writeCloseResponse) {
+ private void peerClose(int code, String reason) {
+ if (!writerSentClose) {
try {
writer.writeClose(code, reason);
} catch (IOException ignored) {
}
}
- try {
- closeConnection();
- } catch (IOException ignored) {
+ if (connectionClosed.compareAndSet(false, true)) {
+ try {
+ close();
+ } catch (IOException ignored) {
+ }
}
listener.onClose(code, reason);
@@ -158,32 +199,24 @@ public abstract class RealWebSocket implements WebSocket {
/** Called on the reader thread when an error occurs. */
private void readerErrorClose(IOException e) {
- boolean writeCloseResponse;
- synchronized (closeLock) {
- readerSentClose = true;
-
- // If the writer has not closed we will close the connection.
- writeCloseResponse = !writerSentClose;
- }
-
- if (writeCloseResponse) {
- if (e instanceof ProtocolException) {
- // For protocol exceptions, try to inform the server of such.
- try {
- writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
- } catch (IOException ignored) {
- }
+ // For protocol exceptions, try to inform the server of such.
+ if (!writerSentClose && e instanceof ProtocolException) {
+ try {
+ writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null);
+ } catch (IOException ignored) {
}
}
- try {
- closeConnection();
- } catch (IOException ignored) {
+ if (connectionClosed.compareAndSet(false, true)) {
+ try {
+ close();
+ } catch (IOException ignored) {
+ }
}
listener.onFailure(e, null);
}
- /** Perform any tear-down work on the connection (close the socket, recycle, etc.). */
- protected abstract void closeConnection() throws IOException;
+ /** Perform any tear-down work (close the connection, shutdown executors). */
+ protected abstract void close() throws IOException;
}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java
index 2b93398..0778278 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java
@@ -68,14 +68,16 @@ public final class WebSocketProtocol {
static final int OPCODE_CONTROL_PONG = 0xa;
/**
- * Maximum length of frame payload. Larger payloads, if supported, can use the special values
- * {@link #PAYLOAD_SHORT} or {@link #PAYLOAD_LONG}.
+ * Maximum length of frame payload. Larger payloads, if supported by the frame type, can use the
+ * special values {@link #PAYLOAD_SHORT} or {@link #PAYLOAD_LONG}.
*/
- static final int PAYLOAD_MAX = 125;
+ static final long PAYLOAD_BYTE_MAX = 125L;
/**
* Value for {@link #B1_MASK_LENGTH} which indicates the next two bytes are the unsigned length.
*/
static final int PAYLOAD_SHORT = 126;
+ /** Maximum length of a frame payload to be denoted as {@link #PAYLOAD_SHORT}. */
+ static final long PAYLOAD_SHORT_MAX = 0xffffL;
/**
* Value for {@link #B1_MASK_LENGTH} which indicates the next eight bytes are the unsigned
* length.
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java
index ce548b1..d81785a 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java
@@ -15,6 +15,9 @@
*/
package com.squareup.okhttp.internal.ws;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.ResponseBody;
+import com.squareup.okhttp.ws.WebSocket;
import java.io.EOFException;
import java.io.IOException;
import java.net.ProtocolException;
@@ -24,7 +27,6 @@ import okio.Okio;
import okio.Source;
import okio.Timeout;
-import static com.squareup.okhttp.ws.WebSocket.PayloadType;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV1;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV2;
@@ -40,7 +42,7 @@ import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_P
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_FLAG_CONTROL;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG;
-import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_BYTE_MAX;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask;
import static java.lang.Integer.toHexString;
@@ -50,7 +52,7 @@ import static java.lang.Integer.toHexString;
*/
public final class WebSocketReader {
public interface FrameCallback {
- void onMessage(BufferedSource source, PayloadType type) throws IOException;
+ void onMessage(ResponseBody body) throws IOException;
void onPing(Buffer buffer);
void onPong(Buffer buffer);
void onClose(int code, String reason);
@@ -145,8 +147,8 @@ public final class WebSocketReader {
}
frameBytesRead = 0;
- if (isControlFrame && frameLength > PAYLOAD_MAX) {
- throw new ProtocolException("Control frame must be less than " + PAYLOAD_MAX + "B.");
+ if (isControlFrame && frameLength > PAYLOAD_BYTE_MAX) {
+ throw new ProtocolException("Control frame must be less than " + PAYLOAD_BYTE_MAX + "B.");
}
if (isMasked) {
@@ -182,18 +184,23 @@ public final class WebSocketReader {
frameCallback.onPong(buffer);
break;
case OPCODE_CONTROL_CLOSE:
- int code = 0;
+ int code = 1000;
String reason = "";
if (buffer != null) {
- if (buffer.size() < 2) {
- throw new ProtocolException("Close payload must be at least two bytes.");
+ long bufferSize = buffer.size();
+ if (bufferSize == 1) {
+ throw new ProtocolException("Malformed close payload length of 1.");
+ } else if (bufferSize != 0) {
+ code = buffer.readShort();
+ if (code < 1000 || code >= 5000) {
+ throw new ProtocolException("Code must be in range [1000,5000): " + code);
+ }
+ if ((code >= 1004 && code <= 1006) || (code >= 1012 && code <= 2999)) {
+ throw new ProtocolException("Code " + code + " is reserved and may not be used.");
+ }
+
+ reason = buffer.readUtf8();
}
- code = buffer.readShort();
- if (code < 1000 || code >= 5000) {
- throw new ProtocolException("Code must be in range [1000,5000): " + code);
- }
-
- reason = buffer.readUtf8();
}
frameCallback.onClose(code, reason);
closed = true;
@@ -204,20 +211,35 @@ public final class WebSocketReader {
}
private void readMessageFrame() throws IOException {
- PayloadType type;
+ final MediaType type;
switch (opcode) {
case OPCODE_TEXT:
- type = PayloadType.TEXT;
+ type = WebSocket.TEXT;
break;
case OPCODE_BINARY:
- type = PayloadType.BINARY;
+ type = WebSocket.BINARY;
break;
default:
throw new ProtocolException("Unknown opcode: " + toHexString(opcode));
}
+ final BufferedSource source = Okio.buffer(framedMessageSource);
+ ResponseBody body = new ResponseBody() {
+ @Override public MediaType contentType() {
+ return type;
+ }
+
+ @Override public long contentLength() throws IOException {
+ return -1;
+ }
+
+ @Override public BufferedSource source() throws IOException {
+ return source;
+ }
+ };
+
messageClosed = false;
- frameCallback.onMessage(Okio.buffer(framedMessageSource), type);
+ frameCallback.onMessage(body);
if (!messageClosed) {
throw new IllegalStateException("Listener failed to call close on message payload.");
}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java
index fc5de75..feece7a 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java
@@ -20,42 +20,41 @@ import java.util.Random;
import okio.Buffer;
import okio.BufferedSink;
import okio.BufferedSource;
-import okio.Okio;
import okio.Sink;
import okio.Timeout;
-import static com.squareup.okhttp.ws.WebSocket.PayloadType;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_FLAG_MASK;
-import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTINUATION;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_CLOSE;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PING;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PONG;
-import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG;
-import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_BYTE_MAX;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT;
+import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT_MAX;
import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask;
/**
* An <a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a>-compatible WebSocket frame writer.
* <p>
* This class is partially thread safe. Only a single "main" thread should be sending messages via
- * calls to {@link #newMessageSink} or {@link #sendMessage} as well as any calls to
- * {@link #writePing} or {@link #writeClose}. Other threads may call {@link #writePing},
- * {@link #writePong}, or {@link #writeClose} which will interleave on the wire with frames from
- * the main thread.
+ * calls to {@link #newMessageSink}, {@link #writePing}, or {@link #writeClose}. Other threads may
+ * call {@link #writePing}, {@link #writePong}, or {@link #writeClose} which will interleave on the
+ * wire with frames from the "main" sending thread.
*/
public final class WebSocketWriter {
private final boolean isClient;
- /** Writes must be guarded by synchronizing on this instance! */
- private final BufferedSink sink;
private final Random random;
+ /** Writes must be guarded by synchronizing on 'this'. */
+ private final BufferedSink sink;
+ /** Access must be guarded by synchronizing on 'this'. */
+ private boolean writerClosed;
+
+ private final Buffer buffer = new Buffer();
private final FrameSink frameSink = new FrameSink();
- private boolean closed;
private boolean activeWriter;
private final byte[] maskKey;
@@ -75,15 +74,15 @@ public final class WebSocketWriter {
/** Send a ping with the supplied {@code payload}. Payload may be {@code null} */
public void writePing(Buffer payload) throws IOException {
- synchronized (sink) {
- writeControlFrame(OPCODE_CONTROL_PING, payload);
+ synchronized (this) {
+ writeControlFrameSynchronized(OPCODE_CONTROL_PING, payload);
}
}
/** Send a pong with the supplied {@code payload}. Payload may be {@code null} */
public void writePong(Buffer payload) throws IOException {
- synchronized (sink) {
- writeControlFrame(OPCODE_CONTROL_PONG, payload);
+ synchronized (this) {
+ writeControlFrameSynchronized(OPCODE_CONTROL_PONG, payload);
}
}
@@ -108,21 +107,23 @@ public final class WebSocketWriter {
}
}
- synchronized (sink) {
- writeControlFrame(OPCODE_CONTROL_CLOSE, payload);
- closed = true;
+ synchronized (this) {
+ writeControlFrameSynchronized(OPCODE_CONTROL_CLOSE, payload);
+ writerClosed = true;
}
}
- private void writeControlFrame(int opcode, Buffer payload) throws IOException {
- if (closed) throw new IOException("closed");
+ private void writeControlFrameSynchronized(int opcode, Buffer payload) throws IOException {
+ assert Thread.holdsLock(this);
+
+ if (writerClosed) throw new IOException("closed");
int length = 0;
if (payload != null) {
length = (int) payload.size();
- if (length > PAYLOAD_MAX) {
+ if (length > PAYLOAD_BYTE_MAX) {
throw new IllegalArgumentException(
- "Payload size must be less than or equal to " + PAYLOAD_MAX);
+ "Payload size must be less than or equal to " + PAYLOAD_BYTE_MAX);
}
}
@@ -138,7 +139,7 @@ public final class WebSocketWriter {
sink.write(maskKey);
if (payload != null) {
- writeAllMasked(payload, length);
+ writeMaskedSynchronized(payload, length);
}
} else {
sink.writeByte(b1);
@@ -148,93 +149,70 @@ public final class WebSocketWriter {
}
}
- sink.flush();
+ sink.emit();
}
/**
* Stream a message payload as a series of frames. This allows control frames to be interleaved
* between parts of the message.
*/
- public BufferedSink newMessageSink(PayloadType type) {
- if (type == null) throw new NullPointerException("type == null");
+ public Sink newMessageSink(int formatOpcode) {
if (activeWriter) {
throw new IllegalStateException("Another message writer is active. Did you call close()?");
}
activeWriter = true;
- frameSink.payloadType = type;
+ // Reset FrameSink state for a new writer.
+ frameSink.formatOpcode = formatOpcode;
frameSink.isFirstFrame = true;
- return Okio.buffer(frameSink);
- }
+ frameSink.closed = false;
- /**
- * Send a message payload as a single frame. This will block any control frames that need sent
- * until it is completed.
- */
- public void sendMessage(PayloadType type, Buffer payload) throws IOException {
- if (type == null) throw new NullPointerException("type == null");
- if (payload == null) throw new NullPointerException("payload == null");
- if (activeWriter) {
- throw new IllegalStateException("A message writer is active. Did you call close()?");
- }
- writeFrame(type, payload, payload.size(), true /* first frame */, true /* final */);
+ return frameSink;
}
- private void writeFrame(PayloadType payloadType, Buffer source, long byteCount,
- boolean isFirstFrame, boolean isFinal) throws IOException {
- if (closed) throw new IOException("closed");
-
- int opcode = OPCODE_CONTINUATION;
- if (isFirstFrame) {
- switch (payloadType) {
- case TEXT:
- opcode = OPCODE_TEXT;
- break;
- case BINARY:
- opcode = OPCODE_BINARY;
- break;
- default:
- throw new IllegalStateException("Unknown payload type: " + payloadType);
- }
- }
+ private void writeMessageFrameSynchronized(int formatOpcode, long byteCount, boolean isFirstFrame,
+ boolean isFinal) throws IOException {
+ assert Thread.holdsLock(this);
- synchronized (sink) {
- int b0 = opcode;
- if (isFinal) {
- b0 |= B0_FLAG_FIN;
- }
- sink.writeByte(b0);
+ if (writerClosed) throw new IOException("closed");
- int b1 = 0;
- if (isClient) {
- b1 |= B1_FLAG_MASK;
- random.nextBytes(maskKey);
- }
- if (byteCount <= PAYLOAD_MAX) {
- b1 |= (int) byteCount;
- sink.writeByte(b1);
- } else if (byteCount <= 0xffffL) { // Unsigned short.
- b1 |= PAYLOAD_SHORT;
- sink.writeByte(b1);
- sink.writeShort((int) byteCount);
- } else {
- b1 |= PAYLOAD_LONG;
- sink.writeByte(b1);
- sink.writeLong(byteCount);
- }
+ int b0 = isFirstFrame ? formatOpcode : OPCODE_CONTINUATION;
+ if (isFinal) {
+ b0 |= B0_FLAG_FIN;
+ }
+ sink.writeByte(b0);
- if (isClient) {
- sink.write(maskKey);
- writeAllMasked(source, byteCount);
- } else {
- sink.write(source, byteCount);
- }
+ int b1 = 0;
+ if (isClient) {
+ b1 |= B1_FLAG_MASK;
+ random.nextBytes(maskKey);
+ }
+ if (byteCount <= PAYLOAD_BYTE_MAX) {
+ b1 |= (int) byteCount;
+ sink.writeByte(b1);
+ } else if (byteCount <= PAYLOAD_SHORT_MAX) {
+ b1 |= PAYLOAD_SHORT;
+ sink.writeByte(b1);
+ sink.writeShort((int) byteCount);
+ } else {
+ b1 |= PAYLOAD_LONG;
+ sink.writeByte(b1);
+ sink.writeLong(byteCount);
+ }
- sink.flush();
+ if (isClient) {
+ sink.write(maskKey);
+ writeMaskedSynchronized(buffer, byteCount);
+ } else {
+ sink.write(buffer, byteCount);
}
+
+ sink.emit();
}
- private void writeAllMasked(BufferedSource source, long byteCount) throws IOException {
+ private void writeMaskedSynchronized(BufferedSource source, long byteCount) throws IOException {
+ assert Thread.holdsLock(this);
+
long written = 0;
while (written < byteCount) {
int toRead = (int) Math.min(byteCount, maskBuffer.length);
@@ -247,20 +225,31 @@ public final class WebSocketWriter {
}
private final class FrameSink implements Sink {
- private PayloadType payloadType;
+ private int formatOpcode;
private boolean isFirstFrame;
+ private boolean closed;
@Override public void write(Buffer source, long byteCount) throws IOException {
- writeFrame(payloadType, source, byteCount, isFirstFrame, false /* final */);
- isFirstFrame = false;
+ if (closed) throw new IOException("closed");
+
+ buffer.write(source, byteCount);
+
+ long emitCount = buffer.completeSegmentByteCount();
+ if (emitCount > 0) {
+ synchronized (WebSocketWriter.this) {
+ writeMessageFrameSynchronized(formatOpcode, emitCount, isFirstFrame, false /* final */);
+ }
+ isFirstFrame = false;
+ }
}
@Override public void flush() throws IOException {
if (closed) throw new IOException("closed");
- synchronized (sink) {
- sink.flush();
+ synchronized (WebSocketWriter.this) {
+ writeMessageFrameSynchronized(formatOpcode, buffer.size(), isFirstFrame, false /* final */);
}
+ isFirstFrame = false;
}
@Override public Timeout timeout() {
@@ -271,21 +260,10 @@ public final class WebSocketWriter {
@Override public void close() throws IOException {
if (closed) throw new IOException("closed");
- int length = 0;
-
- synchronized (sink) {
- sink.writeByte(B0_FLAG_FIN | OPCODE_CONTINUATION);
-
- if (isClient) {
- sink.writeByte(B1_FLAG_MASK | length);
- random.nextBytes(maskKey);
- sink.write(maskKey);
- } else {
- sink.writeByte(length);
- }
- sink.flush();
+ synchronized (WebSocketWriter.this) {
+ writeMessageFrameSynchronized(formatOpcode, buffer.size(), isFirstFrame, true /* final */);
}
-
+ closed = true;
activeWriter = false;
}
}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java
index 4cf2f42..a3eebe7 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java
@@ -15,40 +15,35 @@
*/
package com.squareup.okhttp.ws;
+import com.squareup.okhttp.MediaType;
+import com.squareup.okhttp.RequestBody;
import java.io.IOException;
import okio.Buffer;
-import okio.BufferedSink;
/** Blocking interface to connect and write to a web socket. */
public interface WebSocket {
- /** The format of a message payload. */
- enum PayloadType {
- /** UTF8-encoded text data. */
- TEXT,
- /** Arbitrary binary data. */
- BINARY
- }
+ /** A {@link MediaType} indicating UTF-8 text frames should be used when sending the message. */
+ MediaType TEXT = MediaType.parse("application/vnd.okhttp.websocket+text; charset=utf-8");
+ /** A {@link MediaType} indicating binary frames should be used when sending the message. */
+ MediaType BINARY = MediaType.parse("application/vnd.okhttp.websocket+binary");
/**
- * Stream a message payload to the server of the specified {code type}.
- * <p>
- * You must call {@link BufferedSink#close() close()} to complete the message. Calls to
- * {@link BufferedSink#flush() flush()} write a frame fragment. The message may be empty.
+ * Send a message payload to the server.
*
- * @throws IllegalStateException if not connected, already closed, or another writer is active.
- */
- BufferedSink newMessageSink(WebSocket.PayloadType type);
-
- /**
- * Send a message payload to the server of the specified {@code type}.
+ * <p>The {@linkplain RequestBody#contentType() content type} of {@code message} should be either
+ * {@link #TEXT} or {@link #BINARY}.
*
+ * @throws IOException if unable to write the message. Clients must call {@link #close} when this
+ * happens to ensure resources are cleaned up.
* @throws IllegalStateException if not connected, already closed, or another writer is active.
*/
- void sendMessage(WebSocket.PayloadType type, Buffer payload) throws IOException;
+ void sendMessage(RequestBody message) throws IOException;
/**
* Send a ping to the server with optional payload.
*
+ * @throws IOException if unable to write the ping. Clients must call {@link #close} when this
+ * happens to ensure resources are cleaned up.
* @throws IllegalStateException if already closed.
*/
void sendPing(Buffer payload) throws IOException;
@@ -62,6 +57,7 @@ public interface WebSocket {
* It is an error to call this method before calling close on an active writer. Calling this
* method more than once has no effect.
*
+ * @throws IOException if unable to write the close message. Resources will still be freed.
* @throws IllegalStateException if already closed.
*/
void close(int code, String reason) throws IOException;
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java
index 46ee8a1..5950850 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java
@@ -17,12 +17,12 @@ package com.squareup.okhttp.ws;
import com.squareup.okhttp.Call;
import com.squareup.okhttp.Callback;
-import com.squareup.okhttp.Connection;
import com.squareup.okhttp.OkHttpClient;
import com.squareup.okhttp.Request;
import com.squareup.okhttp.Response;
import com.squareup.okhttp.internal.Internal;
import com.squareup.okhttp.internal.Util;
+import com.squareup.okhttp.internal.http.StreamAllocation;
import com.squareup.okhttp.internal.ws.RealWebSocket;
import com.squareup.okhttp.internal.ws.WebSocketProtocol;
import java.io.IOException;
@@ -30,11 +30,9 @@ import java.net.ProtocolException;
import java.security.SecureRandom;
import java.util.Collections;
import java.util.Random;
-import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ThreadPoolExecutor;
-import okio.BufferedSink;
-import okio.BufferedSource;
import okio.ByteString;
import static java.util.concurrent.TimeUnit.SECONDS;
@@ -47,7 +45,6 @@ public final class WebSocketCall {
return new WebSocketCall(client, request);
}
- private final Request request;
private final Call call;
private final Random random;
private final String key;
@@ -78,7 +75,6 @@ public final class WebSocketCall {
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build();
- this.request = request;
call = client.newCall(request);
}
@@ -118,11 +114,9 @@ public final class WebSocketCall {
call.cancel();
}
- private void createWebSocket(Response response, WebSocketListener listener)
- throws IOException {
+ private void createWebSocket(Response response, WebSocketListener listener) throws IOException {
if (response.code() != 101) {
- // TODO call.engine.releaseConnection();
- Internal.instance.callEngineReleaseConnection(call);
+ Util.closeQuietly(response.body());
throw new ProtocolException("Expected HTTP 101 response but was '"
+ response.code()
+ " "
@@ -150,21 +144,9 @@ public final class WebSocketCall {
+ "'");
}
- // TODO connection = call.engine.getConnection();
- Connection connection = Internal.instance.callEngineGetConnection(call);
- // TODO if (!connection.clearOwner()) {
- if (!Internal.instance.clearOwner(connection)) {
- throw new IllegalStateException("Unable to take ownership of connection.");
- }
-
- BufferedSource source = Internal.instance.connectionRawSource(connection);
- BufferedSink sink = Internal.instance.connectionRawSink(connection);
-
- final RealWebSocket webSocket =
- ConnectionWebSocket.create(response, connection, source, sink, random, listener);
-
- // TODO connection.setOwner(webSocket);
- Internal.instance.connectionSetOwner(connection, webSocket);
+ StreamAllocation streamAllocation = Internal.instance.callEngineGetStreamAllocation(call);
+ RealWebSocket webSocket = StreamWebSocket.create(
+ streamAllocation, response, random, listener);
listener.onOpen(webSocket, response);
@@ -173,30 +155,33 @@ public final class WebSocketCall {
}
// Keep static so that the WebSocketCall instance can be garbage collected.
- private static class ConnectionWebSocket extends RealWebSocket {
- static RealWebSocket create(Response response, Connection connection, BufferedSource source,
- BufferedSink sink, Random random, WebSocketListener listener) {
+ private static class StreamWebSocket extends RealWebSocket {
+ static RealWebSocket create(StreamAllocation streamAllocation, Response response,
+ Random random, WebSocketListener listener) {
String url = response.request().urlString();
ThreadPoolExecutor replyExecutor =
new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(),
Util.threadFactory(String.format("OkHttp %s WebSocket", url), true));
replyExecutor.allowCoreThreadTimeOut(true);
- return new ConnectionWebSocket(connection, source, sink, random, replyExecutor, listener,
- url);
+ return new StreamWebSocket(streamAllocation, random, replyExecutor, listener, url);
}
- private final Connection connection;
+ private final StreamAllocation streamAllocation;
+ private final ExecutorService replyExecutor;
- private ConnectionWebSocket(Connection connection, BufferedSource source, BufferedSink sink,
- Random random, Executor replyExecutor, WebSocketListener listener, String url) {
- super(true /* is client */, source, sink, random, replyExecutor, listener, url);
- this.connection = connection;
+ private StreamWebSocket(StreamAllocation streamAllocation,
+ Random random, ExecutorService replyExecutor, WebSocketListener listener, String url) {
+ super(true /* is client */, streamAllocation.connection().source,
+ streamAllocation.connection().sink, random, replyExecutor, listener, url);
+ this.streamAllocation = streamAllocation;
+ this.replyExecutor = replyExecutor;
}
- @Override protected void closeConnection() throws IOException {
- // TODO connection.closeIfOwnedBy(this);
- Internal.instance.closeIfOwnedBy(connection, this);
+ @Override protected void close() throws IOException {
+ replyExecutor.shutdown();
+ streamAllocation.noNewStreams();
+ streamAllocation.streamFinished(streamAllocation.stream());
}
}
}
diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java
index 8941b74..5a5a8b1 100644
--- a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java
+++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java
@@ -16,11 +16,9 @@
package com.squareup.okhttp.ws;
import com.squareup.okhttp.Response;
+import com.squareup.okhttp.ResponseBody;
import java.io.IOException;
import okio.Buffer;
-import okio.BufferedSource;
-
-import static com.squareup.okhttp.ws.WebSocket.PayloadType;
/** Listener for server-initiated messages on a connected {@link WebSocket}. */
public interface WebSocketListener {
@@ -50,8 +48,11 @@ public interface WebSocketListener {
* <p>Implementations <strong>must</strong> call {@code source.close()} before returning. This
* indicates completion of parsing the message payload and will consume any remaining bytes in
* the message.
+ *
+ * <p>The {@linkplain ResponseBody#contentType() content type} of {@code message} will be either
+ * {@link WebSocket#TEXT} or {@link WebSocket#BINARY} which indicates the format of the message.
*/
- void onMessage(BufferedSource payload, PayloadType type) throws IOException;
+ void onMessage(ResponseBody message) throws IOException;
/**
* Called when a server pong is received. This is usually a result of calling {@link