diff options
author | Neil Fuller <nfuller@google.com> | 2015-04-13 13:06:22 +0100 |
---|---|---|
committer | Neil Fuller <nfuller@google.com> | 2015-04-16 11:44:31 +0100 |
commit | a2cab72aa5ff730ba2ae987b45398faafffeb505 (patch) | |
tree | 283de306182e8f1faff93d4e6515298539d8c21d /okhttp-ws | |
parent | 9fa0698523b8a573b8862c0e62c533be8bd53bda (diff) | |
download | okhttp-a2cab72aa5ff730ba2ae987b45398faafffeb505.tar.gz |
Roll-up of upstream OkHttp and Okio changes
OkHttp:
From b609edd07864d7191dcda8ba1f6c833c9fe170ad
to b40f99a950cb407eff52537a97420bd253a64f63
Okio:
From 654ddf5e8f6311fda77e429c22d5e0e15f713b8d
to b5811711b141b230e4e58f577c79cfbf4c2d4028
Both "to" are head as of 20150413.
Patches applied cleanly without conflicts.
This submission will break some CTS tests due
to https://github.com/square/okhttp/issues/1552
Solutions will be made upstream and patched in.
The CTS tests broken are related to SPDY/HTTP2
which are not used by Android's embedded OkHttp.
Change-Id: I84d55b6f5c8dbc05148e86bd9421a2c393b563d4
Diffstat (limited to 'okhttp-ws')
9 files changed, 1277 insertions, 0 deletions
diff --git a/okhttp-ws/README.md b/okhttp-ws/README.md new file mode 100644 index 0000000..054ea91 --- /dev/null +++ b/okhttp-ws/README.md @@ -0,0 +1,22 @@ +OkHttp Web Sockets +================== + +RFC6455-compliant web socket implementation. + +Create a `WebSocketCall` with a `Request` and an `OkHttpClient` instance. +```java +WebSocketCall call = WebSocketCall.create(client, request); +``` + +A `WebSocketListener` will notify of the initial connection, server-sent messages, and any failures +on the connection. + +Start the web socket by calling `enqueue` on `WebSocketCall` with the `WebSocketListener`. +```java +call.enqueue(new WebSocketListener() { + // ... +}); +``` + +*Note: This module's API should be considered experimental and may be subject to breaking changes +in future releases.* diff --git a/okhttp-ws/pom.xml b/okhttp-ws/pom.xml new file mode 100644 index 0000000..ae34464 --- /dev/null +++ b/okhttp-ws/pom.xml @@ -0,0 +1,37 @@ +<?xml version="1.0" encoding="UTF-8"?> + +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd"> + <modelVersion>4.0.0</modelVersion> + + <parent> + <groupId>com.squareup.okhttp</groupId> + <artifactId>parent</artifactId> + <version>2.4.0-SNAPSHOT</version> + </parent> + + <artifactId>okhttp-ws</artifactId> + <name>OkHttp Web Sockets</name> + + <dependencies> + <dependency> + <groupId>com.squareup.okhttp</groupId> + <artifactId>okhttp</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.maven.plugins</groupId> + <artifactId>maven-javadoc-plugin</artifactId> + <configuration> + <excludePackageNames>com.squareup.okhttp.internal.*</excludePackageNames> + <links> + <link>http://square.github.io/okhttp/javadoc/</link> + </links> + </configuration> + </plugin> + </plugins> + </build> +</project> diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java new file mode 100644 index 0000000..a647ac7 --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/RealWebSocket.java @@ -0,0 +1,189 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.ws; + +import com.squareup.okhttp.internal.NamedRunnable; +import com.squareup.okhttp.ws.WebSocket; +import com.squareup.okhttp.ws.WebSocketListener; +import java.io.IOException; +import java.net.ProtocolException; +import java.util.Random; +import java.util.concurrent.Executor; +import okio.Buffer; +import okio.BufferedSink; +import okio.BufferedSource; + +import static com.squareup.okhttp.internal.ws.WebSocketReader.FrameCallback; + +public abstract class RealWebSocket implements WebSocket { + /** A close code which indicates that the peer encountered a protocol exception. */ + private static final int CLOSE_PROTOCOL_EXCEPTION = 1002; + + private final WebSocketWriter writer; + private final WebSocketReader reader; + private final WebSocketListener listener; + + /** True after calling {@link #close(int, String)}. No writes are allowed afterward. */ + private volatile boolean writerSentClose; + /** True after a close frame was read by the reader. No frames will follow it. */ + private volatile boolean readerSentClose; + /** Lock required to negotiate closing the connection. */ + private final Object closeLock = new Object(); + + public RealWebSocket(boolean isClient, BufferedSource source, BufferedSink sink, Random random, + final Executor replyExecutor, final WebSocketListener listener, final String url) { + this.listener = listener; + + writer = new WebSocketWriter(isClient, sink, random); + reader = new WebSocketReader(isClient, source, new FrameCallback() { + @Override public void onMessage(BufferedSource source, PayloadType type) throws IOException { + listener.onMessage(source, type); + } + + @Override public void onPing(final Buffer buffer) { + replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Pong Reply", url) { + @Override protected void execute() { + try { + writer.writePong(buffer); + } catch (IOException ignored) { + } + } + }); + } + + @Override public void onPong(Buffer buffer) { + listener.onPong(buffer); + } + + @Override public void onClose(final int code, final String reason) { + replyExecutor.execute(new NamedRunnable("OkHttp %s WebSocket Close Reply", url) { + @Override protected void execute() { + peerClose(code, reason); + } + }); + } + }); + } + + /** + * Read a single message from the web socket and deliver it to the listener. This method should + * be called in a loop with the return value indicating whether looping should continue. + */ + public boolean readMessage() { + try { + reader.processNextFrame(); + return !readerSentClose; + } catch (IOException e) { + readerErrorClose(e); + return false; + } + } + + @Override public BufferedSink newMessageSink(PayloadType type) { + if (writerSentClose) throw new IllegalStateException("closed"); + return writer.newMessageSink(type); + } + + @Override public void sendMessage(PayloadType type, Buffer payload) throws IOException { + if (writerSentClose) throw new IllegalStateException("closed"); + writer.sendMessage(type, payload); + } + + @Override public void sendPing(Buffer payload) throws IOException { + if (writerSentClose) throw new IllegalStateException("closed"); + writer.writePing(payload); + } + + /** Send an unsolicited pong with the specified payload. */ + public void sendPong(Buffer payload) throws IOException { + if (writerSentClose) throw new IllegalStateException("closed"); + writer.writePong(payload); + } + + @Override public void close(int code, String reason) throws IOException { + if (writerSentClose) throw new IllegalStateException("closed"); + + boolean closeConnection; + synchronized (closeLock) { + writerSentClose = true; + + // If the reader has also indicated a desire to close we will close the connection. + closeConnection = readerSentClose; + } + + writer.writeClose(code, reason); + + if (closeConnection) { + closeConnection(); + } + } + + /** Replies and closes this web socket when a close frame is read from the peer. */ + private void peerClose(int code, String reason) { + boolean writeCloseResponse; + synchronized (closeLock) { + readerSentClose = true; + + // If the writer has not indicated a desire to close we will write a close response. + writeCloseResponse = !writerSentClose; + } + + if (writeCloseResponse) { + try { + writer.writeClose(code, reason); + } catch (IOException ignored) { + } + } + + try { + closeConnection(); + } catch (IOException ignored) { + } + + listener.onClose(code, reason); + } + + /** Called on the reader thread when an error occurs. */ + private void readerErrorClose(IOException e) { + boolean writeCloseResponse; + synchronized (closeLock) { + readerSentClose = true; + + // If the writer has not closed we will close the connection. + writeCloseResponse = !writerSentClose; + } + + if (writeCloseResponse) { + if (e instanceof ProtocolException) { + // For protocol exceptions, try to inform the server of such. + try { + writer.writeClose(CLOSE_PROTOCOL_EXCEPTION, null); + } catch (IOException ignored) { + } + } + } + + try { + closeConnection(); + } catch (IOException ignored) { + } + + listener.onFailure(e); + } + + /** Perform any tear-down work on the connection (close the socket, recycle, etc.). */ + protected abstract void closeConnection() throws IOException; +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java new file mode 100644 index 0000000..2b93398 --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketProtocol.java @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.ws; + +public final class WebSocketProtocol { + /** Magic value which must be appended to the key in a response header. */ + public static final String ACCEPT_MAGIC = "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"; + + /* + Each frame starts with two bytes of data. + + 0 1 2 3 4 5 6 7 0 1 2 3 4 5 6 7 + +-+-+-+-+-------+ +-+-------------+ + |F|R|R|R| OP | |M| LENGTH | + |I|S|S|S| CODE | |A| | + |N|V|V|V| | |S| | + | |1|2|3| | |K| | + +-+-+-+-+-------+ +-+-------------+ + */ + + /** Byte 0 flag for whether this is the final fragment in a message. */ + static final int B0_FLAG_FIN = 0b10000000; + /** Byte 0 reserved flag 1. Must be 0 unless negotiated otherwise. */ + static final int B0_FLAG_RSV1 = 0b01000000; + /** Byte 0 reserved flag 2. Must be 0 unless negotiated otherwise. */ + static final int B0_FLAG_RSV2 = 0b00100000; + /** Byte 0 reserved flag 3. Must be 0 unless negotiated otherwise. */ + static final int B0_FLAG_RSV3 = 0b00010000; + /** Byte 0 mask for the frame opcode. */ + static final int B0_MASK_OPCODE = 0b00001111; + /** Flag in the opcode which indicates a control frame. */ + static final int OPCODE_FLAG_CONTROL = 0b00001000; + + /** + * Byte 1 flag for whether the payload data is masked. + * <p> + * If this flag is set, the next four bytes represent the mask key. These bytes appear after + * any additional bytes specified by {@link #B1_MASK_LENGTH}. + */ + static final int B1_FLAG_MASK = 0b10000000; + /** + * Byte 1 mask for the payload length. + * <p> + * If this value is {@link #PAYLOAD_SHORT}, the next two bytes represent the length. + * If this value is {@link #PAYLOAD_LONG}, the next eight bytes represent the length. + */ + static final int B1_MASK_LENGTH = 0b01111111; + + static final int OPCODE_CONTINUATION = 0x0; + static final int OPCODE_TEXT = 0x1; + static final int OPCODE_BINARY = 0x2; + + static final int OPCODE_CONTROL_CLOSE = 0x8; + static final int OPCODE_CONTROL_PING = 0x9; + static final int OPCODE_CONTROL_PONG = 0xa; + + /** + * Maximum length of frame payload. Larger payloads, if supported, can use the special values + * {@link #PAYLOAD_SHORT} or {@link #PAYLOAD_LONG}. + */ + static final int PAYLOAD_MAX = 125; + /** + * Value for {@link #B1_MASK_LENGTH} which indicates the next two bytes are the unsigned length. + */ + static final int PAYLOAD_SHORT = 126; + /** + * Value for {@link #B1_MASK_LENGTH} which indicates the next eight bytes are the unsigned + * length. + */ + static final int PAYLOAD_LONG = 127; + + static void toggleMask(byte[] buffer, long byteCount, byte[] key, long frameBytesRead) { + int keyLength = key.length; + for (int i = 0; i < byteCount; i++, frameBytesRead++) { + int keyIndex = (int) (frameBytesRead % keyLength); + buffer[i] = (byte) (buffer[i] ^ key[keyIndex]); + } + } + + private WebSocketProtocol() { + throw new AssertionError("No instances."); + } +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java new file mode 100644 index 0000000..ee4e482 --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketReader.java @@ -0,0 +1,287 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.ws; + +import java.io.EOFException; +import java.io.IOException; +import java.net.ProtocolException; +import okio.Buffer; +import okio.BufferedSource; +import okio.Okio; +import okio.Source; +import okio.Timeout; + +import static com.squareup.okhttp.ws.WebSocket.PayloadType; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV1; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV2; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_RSV3; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_MASK_OPCODE; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_FLAG_MASK; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_MASK_LENGTH; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTINUATION; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_CLOSE; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PING; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PONG; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_FLAG_CONTROL; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask; +import static java.lang.Integer.toHexString; + +/** + * An <a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a>-compatible WebSocket frame reader. + */ +public final class WebSocketReader { + public interface FrameCallback { + void onMessage(BufferedSource source, PayloadType type) throws IOException; + void onPing(Buffer buffer); + void onPong(Buffer buffer); + void onClose(int code, String reason); + } + + private final boolean isClient; + private final BufferedSource source; + private final FrameCallback frameCallback; + + private final Source framedMessageSource = new FramedMessageSource(); + + private boolean closed; + private boolean messageClosed; + + // Stateful data about the current frame. + private int opcode; + private long frameLength; + private long frameBytesRead; + private boolean isFinalFrame; + private boolean isControlFrame; + private boolean isMasked; + + private final byte[] maskKey = new byte[4]; + private final byte[] maskBuffer = new byte[2048]; + + public WebSocketReader(boolean isClient, BufferedSource source, FrameCallback frameCallback) { + if (source == null) throw new NullPointerException("source == null"); + if (frameCallback == null) throw new NullPointerException("frameCallback == null"); + this.isClient = isClient; + this.source = source; + this.frameCallback = frameCallback; + } + + /** + * Process the next protocol frame. + * <ul> + * <li>If it is a control frame this will result in a single call to {@link FrameCallback}.</li> + * <li>If it is a message frame this will result in a single call to {@link + * WebSocketListener#onMessage}. If the message spans multiple frames, each interleaved control + * frame will result in a corresponding call to {@link FrameCallback}. + * </ul> + */ + public void processNextFrame() throws IOException { + readHeader(); + if (isControlFrame) { + readControlFrame(); + } else { + readMessageFrame(); + } + } + + private void readHeader() throws IOException { + if (closed) throw new IOException("closed"); + + int b0 = source.readByte() & 0xff; + + opcode = b0 & B0_MASK_OPCODE; + isFinalFrame = (b0 & B0_FLAG_FIN) != 0; + isControlFrame = (b0 & OPCODE_FLAG_CONTROL) != 0; + + // Control frames must be final frames (cannot contain continuations). + if (isControlFrame && !isFinalFrame) { + throw new ProtocolException("Control frames must be final."); + } + + boolean reservedFlag1 = (b0 & B0_FLAG_RSV1) != 0; + boolean reservedFlag2 = (b0 & B0_FLAG_RSV2) != 0; + boolean reservedFlag3 = (b0 & B0_FLAG_RSV3) != 0; + if (reservedFlag1 || reservedFlag2 || reservedFlag3) { + // Reserved flags are for extensions which we currently do not support. + throw new ProtocolException("Reserved flags are unsupported."); + } + + int b1 = source.readByte() & 0xff; + + isMasked = (b1 & B1_FLAG_MASK) != 0; + if (isMasked == isClient) { + // Masked payloads must be read on the server. Unmasked payloads must be read on the client. + throw new ProtocolException("Client-sent frames must be masked. Server sent must not."); + } + + // Get frame length, optionally reading from follow-up bytes if indicated by special values. + frameLength = b1 & B1_MASK_LENGTH; + if (frameLength == PAYLOAD_SHORT) { + frameLength = source.readShort() & 0xffffL; // Value is unsigned. + } else if (frameLength == PAYLOAD_LONG) { + frameLength = source.readLong(); + if (frameLength < 0) { + throw new ProtocolException( + "Frame length 0x" + Long.toHexString(frameLength) + " > 0x7FFFFFFFFFFFFFFF"); + } + } + frameBytesRead = 0; + + if (isControlFrame && frameLength > PAYLOAD_MAX) { + throw new ProtocolException("Control frame must be less than " + PAYLOAD_MAX + "B."); + } + + if (isMasked) { + // Read the masking key as bytes so that they can be used directly for unmasking. + source.readFully(maskKey); + } + } + + private void readControlFrame() throws IOException { + Buffer buffer = null; + if (frameBytesRead < frameLength) { + buffer = new Buffer(); + + if (isClient) { + source.readFully(buffer, frameLength); + } else { + while (frameBytesRead < frameLength) { + int toRead = (int) Math.min(frameLength - frameBytesRead, maskBuffer.length); + int read = source.read(maskBuffer, 0, toRead); + if (read == -1) throw new EOFException(); + toggleMask(maskBuffer, read, maskKey, frameBytesRead); + buffer.write(maskBuffer, 0, read); + frameBytesRead += read; + } + } + } + + switch (opcode) { + case OPCODE_CONTROL_PING: + frameCallback.onPing(buffer); + break; + case OPCODE_CONTROL_PONG: + frameCallback.onPong(buffer); + break; + case OPCODE_CONTROL_CLOSE: + int code = 0; + String reason = ""; + if (buffer != null) { + code = buffer.readShort(); + reason = buffer.readUtf8(); + } + frameCallback.onClose(code, reason); + closed = true; + break; + default: + throw new IllegalStateException("Unknown control opcode: " + toHexString(opcode)); + } + } + + private void readMessageFrame() throws IOException { + PayloadType type; + switch (opcode) { + case OPCODE_TEXT: + type = PayloadType.TEXT; + break; + case OPCODE_BINARY: + type = PayloadType.BINARY; + break; + default: + throw new IllegalStateException("Unknown opcode: " + toHexString(opcode)); + } + + messageClosed = false; + frameCallback.onMessage(Okio.buffer(framedMessageSource), type); + if (!messageClosed) { + throw new IllegalStateException("Listener failed to call close on message payload."); + } + } + + /** Read headers and process any control frames until we reach a non-control frame. */ + private void readUntilNonControlFrame() throws IOException { + while (!closed) { + readHeader(); + if (!isControlFrame) { + break; + } + readControlFrame(); + } + } + + /** + * A special source which knows how to read a message body across one or more frames. Control + * frames that occur between fragments will be processed. If the message payload is masked this + * will unmask as it's being processed. + */ + private final class FramedMessageSource implements Source { + @Override public long read(Buffer sink, long byteCount) throws IOException { + if (closed) throw new IOException("closed"); + if (messageClosed) throw new IllegalStateException("closed"); + + if (frameBytesRead == frameLength) { + if (isFinalFrame) return -1; // We are exhausted and have no continuations. + + readUntilNonControlFrame(); + if (opcode != OPCODE_CONTINUATION) { + throw new ProtocolException("Expected continuation opcode. Got: " + toHexString(opcode)); + } + if (isFinalFrame && frameLength == 0) { + return -1; // Fast-path for empty final frame. + } + } + + long toRead = Math.min(byteCount, frameLength - frameBytesRead); + + long read; + if (isMasked) { + toRead = Math.min(toRead, maskBuffer.length); + read = source.read(maskBuffer, 0, (int) toRead); + if (read == -1) throw new EOFException(); + toggleMask(maskBuffer, read, maskKey, frameBytesRead); + sink.write(maskBuffer, 0, (int) read); + } else { + read = source.read(sink, toRead); + if (read == -1) throw new EOFException(); + } + + frameBytesRead += read; + return read; + } + + @Override public Timeout timeout() { + return source.timeout(); + } + + @Override public void close() throws IOException { + if (messageClosed) return; + messageClosed = true; + if (closed) return; + + // Exhaust the remainder of the message, if any. + source.skip(frameLength - frameBytesRead); + while (!isFinalFrame) { + readUntilNonControlFrame(); + source.skip(frameLength); + } + } + } +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java new file mode 100644 index 0000000..74bd083 --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/internal/ws/WebSocketWriter.java @@ -0,0 +1,294 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.internal.ws; + +import java.io.IOException; +import java.util.Random; +import okio.Buffer; +import okio.BufferedSink; +import okio.BufferedSource; +import okio.Okio; +import okio.Sink; +import okio.Timeout; + +import static com.squareup.okhttp.ws.WebSocket.PayloadType; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B0_FLAG_FIN; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.B1_FLAG_MASK; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_BINARY; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTINUATION; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_CLOSE; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PING; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_CONTROL_PONG; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.OPCODE_TEXT; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_LONG; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_MAX; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.PAYLOAD_SHORT; +import static com.squareup.okhttp.internal.ws.WebSocketProtocol.toggleMask; + +/** + * An <a href="http://tools.ietf.org/html/rfc6455">RFC 6455</a>-compatible WebSocket frame writer. + * <p> + * This class is partially thread safe. Only a single "main" thread should be sending messages via + * calls to {@link #newMessageSink} or {@link #sendMessage} as well as any calls to + * {@link #writePing} or {@link #writeClose}. Other threads may call {@link #writePing}, + * {@link #writePong}, or {@link #writeClose} which will interleave on the wire with frames from + * the main thread. + */ +public final class WebSocketWriter { + private final boolean isClient; + /** Writes must be guarded by synchronizing on this instance! */ + private final BufferedSink sink; + private final Random random; + + private final FrameSink frameSink = new FrameSink(); + + private boolean closed; + private boolean activeWriter; + + private final byte[] maskKey; + private final byte[] maskBuffer; + + public WebSocketWriter(boolean isClient, BufferedSink sink, Random random) { + if (sink == null) throw new NullPointerException("sink == null"); + if (random == null) throw new NullPointerException("random == null"); + this.isClient = isClient; + this.sink = sink; + this.random = random; + + // Masks are only a concern for client writers. + maskKey = isClient ? new byte[4] : null; + maskBuffer = isClient ? new byte[2048] : null; + } + + /** Send a ping with the supplied {@code payload}. Payload may be {@code null} */ + public void writePing(Buffer payload) throws IOException { + synchronized (sink) { + writeControlFrame(OPCODE_CONTROL_PING, payload); + } + } + + /** Send a pong with the supplied {@code payload}. Payload may be {@code null} */ + public void writePong(Buffer payload) throws IOException { + synchronized (sink) { + writeControlFrame(OPCODE_CONTROL_PONG, payload); + } + } + + /** + * Send a close frame with optional code and reason. + * + * @param code Status code as defined by + * <a href="http://tools.ietf.org/html/rfc6455#section-7.4">Section 7.4 of RFC 6455</a> or + * {@code 0}. + * @param reason Reason for shutting down or {@code null}. {@code code} is required if set. + */ + public void writeClose(int code, String reason) throws IOException { + Buffer payload = null; + if (code != 0) { + if (code < 1000 || code >= 5000) { + throw new IllegalArgumentException("Code must be in range [1000,5000)."); + } + payload = new Buffer(); + payload.writeShort(code); + if (reason != null) { + payload.writeUtf8(reason); + } + } else if (reason != null) { + throw new IllegalArgumentException("Code required to include reason."); + } + + synchronized (sink) { + writeControlFrame(OPCODE_CONTROL_CLOSE, payload); + closed = true; + } + } + + private void writeControlFrame(int opcode, Buffer payload) throws IOException { + if (closed) throw new IOException("closed"); + + int length = 0; + if (payload != null) { + length = (int) payload.size(); + if (length > PAYLOAD_MAX) { + throw new IllegalArgumentException( + "Payload size must be less than or equal to " + PAYLOAD_MAX); + } + } + + int b0 = B0_FLAG_FIN | opcode; + sink.writeByte(b0); + + int b1 = length; + if (isClient) { + b1 |= B1_FLAG_MASK; + sink.writeByte(b1); + + random.nextBytes(maskKey); + sink.write(maskKey); + + if (payload != null) { + writeAllMasked(payload, length); + } + } else { + sink.writeByte(b1); + + if (payload != null) { + sink.writeAll(payload); + } + } + + sink.flush(); + } + + /** + * Stream a message payload as a series of frames. This allows control frames to be interleaved + * between parts of the message. + */ + public BufferedSink newMessageSink(PayloadType type) { + if (type == null) throw new NullPointerException("type == null"); + if (activeWriter) { + throw new IllegalStateException("Another message writer is active. Did you call close()?"); + } + activeWriter = true; + + frameSink.payloadType = type; + frameSink.isFirstFrame = true; + return Okio.buffer(frameSink); + } + + /** + * Send a message payload as a single frame. This will block any control frames that need sent + * until it is completed. + */ + public void sendMessage(PayloadType type, Buffer payload) throws IOException { + if (type == null) throw new NullPointerException("type == null"); + if (payload == null) throw new NullPointerException("payload == null"); + if (activeWriter) { + throw new IllegalStateException("A message writer is active. Did you call close()?"); + } + writeFrame(type, payload, payload.size(), true /* first frame */, true /* final */); + } + + private void writeFrame(PayloadType payloadType, Buffer source, long byteCount, + boolean isFirstFrame, boolean isFinal) throws IOException { + if (closed) throw new IOException("closed"); + + int opcode = OPCODE_CONTINUATION; + if (isFirstFrame) { + switch (payloadType) { + case TEXT: + opcode = OPCODE_TEXT; + break; + case BINARY: + opcode = OPCODE_BINARY; + break; + default: + throw new IllegalStateException("Unknown payload type: " + payloadType); + } + } + + synchronized (sink) { + int b0 = opcode; + if (isFinal) { + b0 |= B0_FLAG_FIN; + } + sink.writeByte(b0); + + int b1 = 0; + if (isClient) { + b1 |= B1_FLAG_MASK; + random.nextBytes(maskKey); + } + if (byteCount <= PAYLOAD_MAX) { + b1 |= (int) byteCount; + sink.writeByte(b1); + } else if (byteCount <= 0xffffL) { // Unsigned short. + b1 |= PAYLOAD_SHORT; + sink.writeByte(b1); + sink.writeShort((int) byteCount); + } else { + b1 |= PAYLOAD_LONG; + sink.writeByte(b1); + sink.writeLong(byteCount); + } + + if (isClient) { + sink.write(maskKey); + writeAllMasked(source, byteCount); + } else { + sink.write(source, byteCount); + } + + sink.flush(); + } + } + + private void writeAllMasked(BufferedSource source, long byteCount) throws IOException { + long written = 0; + while (written < byteCount) { + int toRead = (int) Math.min(byteCount, maskBuffer.length); + int read = source.read(maskBuffer, 0, toRead); + if (read == -1) throw new AssertionError(); + toggleMask(maskBuffer, read, maskKey, written); + sink.write(maskBuffer, 0, read); + written += read; + } + } + + private final class FrameSink implements Sink { + private PayloadType payloadType; + private boolean isFirstFrame; + + @Override public void write(Buffer source, long byteCount) throws IOException { + writeFrame(payloadType, source, byteCount, isFirstFrame, false /* final */); + isFirstFrame = false; + } + + @Override public void flush() throws IOException { + if (closed) throw new IOException("closed"); + + synchronized (sink) { + sink.flush(); + } + } + + @Override public Timeout timeout() { + return sink.timeout(); + } + + @SuppressWarnings("PointlessBitwiseExpression") + @Override public void close() throws IOException { + if (closed) throw new IOException("closed"); + + int length = 0; + + synchronized (sink) { + sink.writeByte(B0_FLAG_FIN | OPCODE_CONTINUATION); + + if (isClient) { + sink.writeByte(B1_FLAG_MASK | length); + random.nextBytes(maskKey); + sink.write(maskKey); + } else { + sink.writeByte(length); + } + sink.flush(); + } + + activeWriter = false; + } + } +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java new file mode 100644 index 0000000..4cf2f42 --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocket.java @@ -0,0 +1,68 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.ws; + +import java.io.IOException; +import okio.Buffer; +import okio.BufferedSink; + +/** Blocking interface to connect and write to a web socket. */ +public interface WebSocket { + /** The format of a message payload. */ + enum PayloadType { + /** UTF8-encoded text data. */ + TEXT, + /** Arbitrary binary data. */ + BINARY + } + + /** + * Stream a message payload to the server of the specified {code type}. + * <p> + * You must call {@link BufferedSink#close() close()} to complete the message. Calls to + * {@link BufferedSink#flush() flush()} write a frame fragment. The message may be empty. + * + * @throws IllegalStateException if not connected, already closed, or another writer is active. + */ + BufferedSink newMessageSink(WebSocket.PayloadType type); + + /** + * Send a message payload to the server of the specified {@code type}. + * + * @throws IllegalStateException if not connected, already closed, or another writer is active. + */ + void sendMessage(WebSocket.PayloadType type, Buffer payload) throws IOException; + + /** + * Send a ping to the server with optional payload. + * + * @throws IllegalStateException if already closed. + */ + void sendPing(Buffer payload) throws IOException; + + /** + * Send a close frame to the server. + * <p> + * The corresponding {@link WebSocketListener} will continue to get messages until its + * {@link WebSocketListener#onClose onClose()} method is called. + * <p> + * It is an error to call this method before calling close on an active writer. Calling this + * method more than once has no effect. + * + * @throws IllegalStateException if already closed. + */ + void close(int code, String reason) throws IOException; +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java new file mode 100644 index 0000000..422167c --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketCall.java @@ -0,0 +1,225 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.ws; + +import com.squareup.okhttp.Call; +import com.squareup.okhttp.Callback; +import com.squareup.okhttp.Connection; +import com.squareup.okhttp.OkHttpClient; +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; +import com.squareup.okhttp.internal.Internal; +import com.squareup.okhttp.internal.NamedRunnable; +import com.squareup.okhttp.internal.Util; +import com.squareup.okhttp.internal.ws.RealWebSocket; +import com.squareup.okhttp.internal.ws.WebSocketProtocol; +import java.io.IOException; +import java.net.ProtocolException; +import java.net.Socket; +import java.security.SecureRandom; +import java.util.Collections; +import java.util.Random; +import java.util.concurrent.Executor; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ThreadPoolExecutor; +import okio.BufferedSink; +import okio.BufferedSource; +import okio.ByteString; +import okio.Okio; + +import static java.util.concurrent.TimeUnit.SECONDS; + +public final class WebSocketCall { + /** + * Prepares the {@code request} to create a web socket at some point in the future. + */ + public static WebSocketCall create(OkHttpClient client, Request request) { + return new WebSocketCall(client, request); + } + + private final Request request; + private final Call call; + private final Random random; + private final String key; + + WebSocketCall(OkHttpClient client, Request request) { + this(client, request, new SecureRandom()); + } + + WebSocketCall(OkHttpClient client, Request request, Random random) { + if (!"GET".equals(request.method())) { + throw new IllegalArgumentException("Request must be GET: " + request.method()); + } + String url = request.urlString(); + String httpUrl; + if (url.startsWith("ws://")) { + httpUrl = "http://" + url.substring(5); + } else if (url.startsWith("wss://")) { + httpUrl = "https://" + url.substring(6); + } else if (url.startsWith("http://") || url.startsWith("https://")) { + httpUrl = url; + } else { + throw new IllegalArgumentException( + "Request url must use 'ws', 'wss', 'http', or 'https' scheme: " + url); + } + + this.random = random; + + byte[] nonce = new byte[16]; + random.nextBytes(nonce); + key = ByteString.of(nonce).base64(); + + // Copy the client. Otherwise changes (socket factory, redirect policy, + // etc.) may incorrectly be reflected in the request when it is executed. + client = client.clone(); + // Force HTTP/1.1 until the WebSocket over HTTP/2 version is finalized. + client.setProtocols(Collections.singletonList(com.squareup.okhttp.Protocol.HTTP_1_1)); + + request = request.newBuilder() + .url(httpUrl) + .header("Upgrade", "websocket") + .header("Connection", "Upgrade") + .header("Sec-WebSocket-Key", key) + .header("Sec-WebSocket-Version", "13") + .build(); + this.request = request; + + call = client.newCall(request); + } + + /** + * Schedules the request to be executed at some point in the future. + * + * <p>The {@link OkHttpClient#getDispatcher dispatcher} defines when the request will run: + * usually immediately unless there are several other requests currently being executed. + * + * <p>This client will later call back {@code responseCallback} with either an HTTP response or a + * failure exception. If you {@link #cancel} a request before it completes the callback will not + * be invoked. + * + * @throws IllegalStateException when the call has already been executed. + */ + public void enqueue(final WebSocketListener listener) { + Callback responseCallback = new Callback() { + @Override public void onResponse(Response response) throws IOException { + try { + createWebSocket(response, listener); + } catch (IOException e) { + listener.onFailure(e); + } + } + + @Override public void onFailure(Request request, IOException e) { + listener.onFailure(e); + } + }; + // TODO call.enqueue(responseCallback, true); + Internal.instance.callEnqueue(call, responseCallback, true); + } + + /** Cancels the request, if possible. Requests that are already complete cannot be canceled. */ + public void cancel() { + call.cancel(); + } + + private void createWebSocket(Response response, WebSocketListener listener) + throws IOException { + if (response.code() != 101) { + // TODO call.engine.releaseConnection(); + Internal.instance.callEngineReleaseConnection(call); + throw new ProtocolException("Expected HTTP 101 response but was '" + + response.code() + + " " + + response.message() + + "'"); + } + + String headerConnection = response.header("Connection"); + if (!"Upgrade".equalsIgnoreCase(headerConnection)) { + throw new ProtocolException( + "Expected 'Connection' header value 'Upgrade' but was '" + headerConnection + "'"); + } + String headerUpgrade = response.header("Upgrade"); + if (!"websocket".equalsIgnoreCase(headerUpgrade)) { + throw new ProtocolException( + "Expected 'Upgrade' header value 'websocket' but was '" + headerUpgrade + "'"); + } + String headerAccept = response.header("Sec-WebSocket-Accept"); + String acceptExpected = Util.shaBase64(key + WebSocketProtocol.ACCEPT_MAGIC); + if (!acceptExpected.equals(headerAccept)) { + throw new ProtocolException("Expected 'Sec-WebSocket-Accept' header value '" + + acceptExpected + + "' but was '" + + headerAccept + + "'"); + } + + // TODO connection = call.engine.getConnection(); + Connection connection = Internal.instance.callEngineGetConnection(call); + // TODO if (!connection.clearOwner()) { + if (!Internal.instance.clearOwner(connection)) { + throw new IllegalStateException("Unable to take ownership of connection."); + } + + Socket socket = connection.getSocket(); + BufferedSource source = Okio.buffer(Okio.source(socket)); + BufferedSink sink = Okio.buffer(Okio.sink(socket)); + + final RealWebSocket webSocket = + ConnectionWebSocket.create(response, connection, source, sink, random, listener); + + // Start a dedicated thread for reading the web socket. + new Thread(new NamedRunnable("OkHttp WebSocket reader %s", request.urlString()) { + @Override protected void execute() { + while (webSocket.readMessage()) { + } + } + }).start(); + + // TODO connection.setOwner(webSocket); + Internal.instance.connectionSetOwner(connection, webSocket); + + listener.onOpen(webSocket, request, response); + } + + // Keep static so that the WebSocketCall instance can be garbage collected. + private static class ConnectionWebSocket extends RealWebSocket { + static RealWebSocket create(Response response, Connection connection, BufferedSource source, + BufferedSink sink, Random random, WebSocketListener listener) { + String url = response.request().urlString(); + ThreadPoolExecutor replyExecutor = + new ThreadPoolExecutor(1, 1, 1, SECONDS, new LinkedBlockingDeque<Runnable>(), + Util.threadFactory(String.format("OkHttp %s WebSocket", url), true)); + replyExecutor.allowCoreThreadTimeOut(true); + + return new ConnectionWebSocket(connection, source, sink, random, replyExecutor, listener, + url); + } + + private final Connection connection; + + private ConnectionWebSocket(Connection connection, BufferedSource source, BufferedSink sink, + Random random, Executor replyExecutor, WebSocketListener listener, String url) { + super(true /* is client */, source, sink, random, replyExecutor, listener, url); + this.connection = connection; + } + + @Override protected void closeConnection() throws IOException { + // TODO connection.closeIfOwnedBy(this); + Internal.instance.closeIfOwnedBy(connection, this); + } + } +} diff --git a/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java new file mode 100644 index 0000000..a113eed --- /dev/null +++ b/okhttp-ws/src/main/java/com/squareup/okhttp/ws/WebSocketListener.java @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2014 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.squareup.okhttp.ws; + +import com.squareup.okhttp.Request; +import com.squareup.okhttp.Response; +import java.io.IOException; +import okio.Buffer; +import okio.BufferedSource; + +import static com.squareup.okhttp.ws.WebSocket.PayloadType; + +/** Listener for server-initiated messages on a connected {@link WebSocket}. */ +public interface WebSocketListener { + void onOpen(WebSocket webSocket, Request request, Response response) throws IOException; + + /** + * Called when a server message is received. The {@code type} indicates whether the + * {@code payload} should be interpreted as UTF-8 text or binary data. + * + * <p>Implementations <strong>must</strong> call {@code source.close()} before returning. This + * indicates completion of parsing the message payload and will consume any remaining bytes in + * the message. + */ + void onMessage(BufferedSource payload, PayloadType type) throws IOException; + + /** + * Called when a server pong is received. This is usually a result of calling {@link + * WebSocket#sendPing(Buffer)} but might also be unsolicited. + */ + void onPong(Buffer payload); + + /** + * Called when the server sends a close message. This may have been initiated + * from a call to {@link WebSocket#close(int, String) close()} or as an unprompted + * message from the server. + * + * @param code The <a href="http://tools.ietf.org/html/rfc6455#section-7.4.1">RFC-compliant</a> + * status code. + * @param reason Reason for close or an empty string. + */ + void onClose(int code, String reason); + + /** Called when the transport or protocol layer of this web socket errors during communication. */ + void onFailure(IOException e); +} |