diff options
Diffstat (limited to 'okio/src/jvmTest/java/okio/PipeTest.java')
-rw-r--r-- | okio/src/jvmTest/java/okio/PipeTest.java | 376 |
1 files changed, 376 insertions, 0 deletions
diff --git a/okio/src/jvmTest/java/okio/PipeTest.java b/okio/src/jvmTest/java/okio/PipeTest.java new file mode 100644 index 00000000..030e6ba0 --- /dev/null +++ b/okio/src/jvmTest/java/okio/PipeTest.java @@ -0,0 +1,376 @@ +/* + * Copyright (C) 2016 Square, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package okio; + +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.Random; +import java.util.concurrent.Callable; +import java.util.concurrent.Executors; +import java.util.concurrent.Future; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import org.junit.After; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +public final class PipeTest { + final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(2); + + @After public void tearDown() throws Exception { + executorService.shutdown(); + } + + @Test public void test() throws Exception { + Pipe pipe = new Pipe(6); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3L); + + Source source = pipe.source(); + Buffer readBuffer = new Buffer(); + assertEquals(3L, source.read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + + pipe.sink().close(); + assertEquals(-1L, source.read(readBuffer, 6L)); + + source.close(); + } + + /** + * A producer writes the first 16 MiB of bytes generated by {@code new Random(0)} to a sink, and a + * consumer consumes them. Both compute hashes of their data to confirm that they're as expected. + */ + @Test public void largeDataset() throws Exception { + final Pipe pipe = new Pipe(1000L); // An awkward size to force producer/consumer exchange. + final long totalBytes = 16L * 1024L * 1024L; + ByteString expectedHash = ByteString.decodeHex("7c3b224bea749086babe079360cf29f98d88262d"); + + // Write data to the sink. + Future<ByteString> sinkHash = executorService.submit(new Callable<ByteString>() { + @Override public ByteString call() throws Exception { + HashingSink hashingSink = HashingSink.sha1(pipe.sink()); + Random random = new Random(0); + byte[] data = new byte[8192]; + + Buffer buffer = new Buffer(); + for (long i = 0L; i < totalBytes; i += data.length) { + random.nextBytes(data); + buffer.write(data); + hashingSink.write(buffer, buffer.size()); + } + + hashingSink.close(); + return hashingSink.hash(); + } + }); + + // Read data from the source. + Future<ByteString> sourceHash = executorService.submit(new Callable<ByteString>() { + @Override public ByteString call() throws Exception { + Buffer blackhole = new Buffer(); + HashingSink hashingSink = HashingSink.sha1(blackhole); + + Buffer buffer = new Buffer(); + while (pipe.source().read(buffer, Long.MAX_VALUE) != -1) { + hashingSink.write(buffer, buffer.size()); + blackhole.clear(); + } + + pipe.source().close(); + return hashingSink.hash(); + } + }); + + assertEquals(expectedHash, sinkHash.get()); + assertEquals(expectedHash, sourceHash.get()); + } + + @Test public void sinkTimeout() throws Exception { + TestUtil.INSTANCE.assumeNotWindows(); + + Pipe pipe = new Pipe(3); + pipe.sink().timeout().timeout(1000, TimeUnit.MILLISECONDS); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3L); + double start = now(); + try { + pipe.sink().write(new Buffer().writeUtf8("def"), 3L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(1000.0, start); + + Buffer readBuffer = new Buffer(); + assertEquals(3L, pipe.source().read(readBuffer, 6L)); + assertEquals("abc", readBuffer.readUtf8()); + } + + @Test public void sourceTimeout() throws Exception { + TestUtil.INSTANCE.assumeNotWindows(); + + Pipe pipe = new Pipe(3L); + pipe.source().timeout().timeout(1000, TimeUnit.MILLISECONDS); + double start = now(); + Buffer readBuffer = new Buffer(); + try { + pipe.source().read(readBuffer, 6L); + fail(); + } catch (InterruptedIOException expected) { + assertEquals("timeout", expected.getMessage()); + } + assertElapsed(1000.0, start); + assertEquals(0, readBuffer.size()); + } + + /** + * The writer is writing 12 bytes as fast as it can to a 3 byte buffer. The reader alternates + * sleeping 1000 ms, then reading 3 bytes. That should make for an approximate timeline like + * this: + * + * 0: writer writes 'abc', blocks 0: reader sleeps until 1000 + * 1000: reader reads 'abc', sleeps until 2000 + * 1000: writer writes 'def', blocks + * 2000: reader reads 'def', sleeps until 3000 + * 2000: writer writes 'ghi', blocks + * 3000: reader reads 'ghi', sleeps until 4000 + * 3000: writer writes 'jkl', returns + * 4000: reader reads 'jkl', returns + * + * Because the writer is writing to a buffer, it finishes before the reader does. + */ + @Test public void sinkBlocksOnSlowReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Buffer buffer = new Buffer(); + Thread.sleep(1000L); + assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE)); + assertEquals("abc", buffer.readUtf8()); + Thread.sleep(1000L); + assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE)); + assertEquals("def", buffer.readUtf8()); + Thread.sleep(1000L); + assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE)); + assertEquals("ghi", buffer.readUtf8()); + Thread.sleep(1000L); + assertEquals(3, pipe.source().read(buffer, Long.MAX_VALUE)); + assertEquals("jkl", buffer.readUtf8()); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + pipe.sink().write(new Buffer().writeUtf8("abcdefghijkl"), 12); + assertElapsed(3000.0, start); + } + + @Test public void sinkWriteFailsByClosedReader() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.source().close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 1000, TimeUnit.MILLISECONDS); + + double start = now(); + try { + pipe.sink().write(new Buffer().writeUtf8("abcdef"), 6); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + assertElapsed(1000.0, start); + } + } + + @Test public void sinkFlushDoesntWaitForReader() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + pipe.sink().flush(); + + BufferedSource bufferedSource = Okio.buffer(pipe.source()); + assertEquals("abc", bufferedSource.readUtf8(3)); + } + + @Test public void sinkFlushFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + pipe.source().close(); + try { + pipe.sink().flush(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + } + } + + @Test public void sinkCloseFailsIfReaderIsClosedBeforeAllDataIsRead() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + pipe.source().close(); + try { + pipe.sink().close(); + fail(); + } catch (IOException expected) { + assertEquals("source is closed", expected.getMessage()); + } + } + + @Test public void sinkClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().close(); + try { + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + try { + pipe.sink().flush(); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sinkMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().close(); + pipe.sink().close(); + } + + @Test public void sinkCloseDoesntWaitForSourceRead() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + pipe.sink().close(); + + BufferedSource bufferedSource = Okio.buffer(pipe.source()); + assertEquals("abc", bufferedSource.readUtf8()); + assertTrue(bufferedSource.exhausted()); + } + + @Test public void sourceClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source().close(); + try { + pipe.source().read(new Buffer(), 3); + fail(); + } catch (IllegalStateException expected) { + assertEquals("closed", expected.getMessage()); + } + } + + @Test public void sourceMultipleClose() throws Exception { + Pipe pipe = new Pipe(100L); + pipe.source().close(); + pipe.source().close(); + } + + @Test public void sourceReadUnblockedByClosedSink() throws Exception { + final Pipe pipe = new Pipe(3L); + executorService.schedule(new Runnable() { + @Override public void run() { + try { + pipe.sink().close(); + } catch (IOException e) { + throw new AssertionError(); + } + } + }, 1000, TimeUnit.MILLISECONDS); + + double start = now(); + Buffer readBuffer = new Buffer(); + assertEquals(-1, pipe.source().read(readBuffer, Long.MAX_VALUE)); + assertEquals(0, readBuffer.size()); + assertElapsed(1000.0, start); + } + + /** + * The writer has 12 bytes to write. It alternates sleeping 1000 ms, then writing 3 bytes. The + * reader is reading as fast as it can. That should make for an approximate timeline like this: + * + * 0: writer sleeps until 1000 + * 0: reader blocks + * 1000: writer writes 'abc', sleeps until 2000 + * 1000: reader reads 'abc' + * 2000: writer writes 'def', sleeps until 3000 + * 2000: reader reads 'def' + * 3000: writer writes 'ghi', sleeps until 4000 + * 3000: reader reads 'ghi' + * 4000: writer writes 'jkl', returns + * 4000: reader reads 'jkl', returns + */ + @Test public void sourceBlocksOnSlowWriter() throws Exception { + final Pipe pipe = new Pipe(100L); + executorService.execute(new Runnable() { + @Override public void run() { + try { + Thread.sleep(1000L); + pipe.sink().write(new Buffer().writeUtf8("abc"), 3); + Thread.sleep(1000L); + pipe.sink().write(new Buffer().writeUtf8("def"), 3); + Thread.sleep(1000L); + pipe.sink().write(new Buffer().writeUtf8("ghi"), 3); + Thread.sleep(1000L); + pipe.sink().write(new Buffer().writeUtf8("jkl"), 3); + } catch (IOException | InterruptedException e) { + throw new AssertionError(); + } + } + }); + + double start = now(); + Buffer readBuffer = new Buffer(); + + assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE)); + assertEquals("abc", readBuffer.readUtf8()); + assertElapsed(1000.0, start); + + assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE)); + assertEquals("def", readBuffer.readUtf8()); + assertElapsed(2000.0, start); + + assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE)); + assertEquals("ghi", readBuffer.readUtf8()); + assertElapsed(3000.0, start); + + assertEquals(3, pipe.source().read(readBuffer, Long.MAX_VALUE)); + assertEquals("jkl", readBuffer.readUtf8()); + assertElapsed(4000.0, start); + } + + /** Returns the nanotime in milliseconds as a double for measuring timeouts. */ + private double now() { + return System.nanoTime() / 1000000.0d; + } + + /** + * Fails the test unless the time from start until now is duration, accepting differences in + * -50..+450 milliseconds. + */ + private void assertElapsed(double duration, double start) { + assertEquals(duration, now() - start - 200d, 250.0); + } +} |