diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 1df7c5b068..98b8acf619 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -18,10 +18,9 @@ import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSource import akka.testkit.TestProbe import akka.util.ByteString - import scala.concurrent.duration._ +import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.{ Await, Future } -import scala.util.Random import scala.util.control.NoStackTrace class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { @@ -31,24 +30,20 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val materializer = ActorMaterializer(settings) val timeout = 300.milliseconds - def randomArray(size: Int): Array[Byte] = { + def randomByteString(size: Int): ByteString = { val a = new Array[Byte](size) - Random.nextBytes(a) - a + ThreadLocalRandom.current().nextBytes(a) + ByteString(a) } - val byteArray = randomArray(3) - val byteString = ByteString(byteArray) + val byteString = randomByteString(3) + val byteArray = byteString.toArray - def newArray(size: Int, bs: ByteString): Array[Byte] = { - val probe = new Array[Byte](size) - bs.copyToArray(probe, 0) - probe + private[this] def readN(is: InputStream, n: Int): (Int, ByteString) = { + val buf = new Array[Byte](n) + val r = is.read(buf) + (r, ByteString.fromArray(buf, 0, r)) } - def newArray() = new Array[Byte](3) - - def expectSuccess[T](f: Future[T], value: T) = - Await.result(f, timeout) should be(value) object InputStreamSinkTestMessages { case object Push extends NoSerializationVerificationNeeded @@ -86,156 +81,124 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "InputStreamSink" must { "read bytes from InputStream" in assertAllStagesStopped { val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) - val arr = newArray() - inputStream.read(arr) should ===(3) - arr should ===(byteArray) + readN(inputStream, byteString.size) should ===((byteString.size, byteString)) inputStream.close() } "read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped { val sinkProbe = TestProbe() - val byteArray2 = randomArray(3) - val inputStream = Source(byteString :: ByteString(byteArray2) :: Nil).runWith(testSink(sinkProbe)) + val byteString2 = randomByteString(3) + val inputStream = Source(byteString :: byteString2 :: Nil).runWith(testSink(sinkProbe)) sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push) - val arr = new Array[Byte](2) - inputStream.read(arr) - arr should ===(Array(byteArray(0), byteArray(1))) - inputStream.read(arr) - arr should ===(Array(byteArray(2), byteArray2(0))) - inputStream.read(arr) - arr should ===(Array(byteArray2(1), byteArray2(2))) + readN(inputStream, 2) should ===((2, byteString.take(2))) + readN(inputStream, 2) should ===((2, byteString.drop(2) ++ byteString2.take(1))) + readN(inputStream, 2) should ===((2, byteString2.drop(1))) inputStream.close() } "returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped { - val data = randomArray(2) - val inputStream = Source.single(ByteString(data)).runWith(StreamConverters.asInputStream()) + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) - val arr = newArray() - inputStream.read(arr) should ===(2) - arr should ===(Array(data(0), data(1), 0)) + val arr = new Array[Byte](byteString.size + 1) + inputStream.read(arr) should ===(arr.size - 1) + ByteString(arr) should ===(byteString :+ 0) inputStream.close() } "block read until get requested number of bytes from upstream" in assertAllStagesStopped { val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() + val f = Future(inputStream.read(new Array[Byte](byteString.size))) - val arr = newArray() - val f = Future(inputStream.read(arr)) the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException] probe.sendNext(byteString) - expectSuccess(f, 3) + Await.result(f, timeout) should ===(byteString.size) probe.sendComplete() - inputStream.read(newArray()) + inputStream.read() should ===(-1) inputStream.close() } "fill up buffer by default" in assertAllStagesStopped { - val array2 = randomArray(3) - val inputStream = Source(byteString :: ByteString(array2) :: Nil).runWith(StreamConverters.asInputStream()) + val byteString2 = randomByteString(3) + val inputStream = Source(byteString :: byteString2 :: Nil).runWith(StreamConverters.asInputStream()) - val arr1 = newArray() - val arr2 = newArray() - val f1 = Future(inputStream.read(arr1)) - val f2 = Future(inputStream.read(arr2)) - Await.result(f1, timeout) should be(3) - Await.result(f2, timeout) should be(3) - - arr1 should ===(byteString) - arr2 should ===(array2) + readN(inputStream, 3) should ===((3, byteString)) + readN(inputStream, 3) should ===((3, byteString2)) inputStream.close() } "throw error when reactive stream is closed" in assertAllStagesStopped { val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() - probe.sendNext(byteString) inputStream.close() probe.expectCancellation() - the[Exception] thrownBy inputStream.read(newArray()) shouldBe a[IOException] + the[Exception] thrownBy inputStream.read() shouldBe a[IOException] } "return all data when upstream is completed" in assertAllStagesStopped { val sinkProbe = TestProbe() val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run() + val bytes = randomByteString(1) - val bytes = randomArray(1) - probe.sendNext(ByteString(bytes)) + probe.sendNext(bytes) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) probe.sendComplete() sinkProbe.expectMsg(InputStreamSinkTestMessages.Finish) - val arr = newArray() - val f = Future(inputStream.read(arr)) - expectSuccess(f, 1) - arr should ===(Array[Byte](bytes(0), 0, 0)) + readN(inputStream, 3) should ===((1, bytes)) } "work when read chunks smaller than stream chunks" in assertAllStagesStopped { - var bytes = ByteString(randomArray(10)) + val bytes = randomByteString(10) val inputStream = Source.single(bytes).runWith(StreamConverters.asInputStream()) - for (i ← 0 to 3) { - val in = newArray() - inputStream.read(in) - in should ===(newArray(3, bytes.take(3))) - bytes = bytes.drop(3) - } + for (expect ← bytes.sliding(3, 3)) + readN(inputStream, 3) should ===((expect.size, expect)) + inputStream.close() } "throw exception when call read with wrong parameters" in assertAllStagesStopped { val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) - - an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), -1, 2)) - an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 5)) + val buf = new Array[Byte](3) + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, -1, 2)) + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, 0, 5)) an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(new Array[Byte](0), 0, 1)) - an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 0)) - + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, 0, 0)) inputStream.close() } "successfully read several chunks at once" in assertAllStagesStopped { - val bytes = List.fill(4)(ByteString(randomArray(4))) + val bytes = List.fill(4)(randomByteString(4)) val sinkProbe = TestProbe() val inputStream = Source[ByteString](bytes).runWith(testSink(sinkProbe)) //need to wait while all elements arrive to sink - for (i ← 0 to 3) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) + bytes foreach { _ ⇒ sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) } - for (i ← 0 to 1) { - val in = new Array[Byte](8) - inputStream.read(in) should ===(8) - in should ===(bytes(i * 2) ++ bytes(i * 2 + 1)) - } + for (i ← 0 to 1) + readN(inputStream, 8) should ===((8, bytes(i * 2) ++ bytes(i * 2 + 1))) inputStream.close() } "work when read chunks bigger than stream chunks" in assertAllStagesStopped { - val bytes1 = ByteString(randomArray(10)) - val bytes2 = ByteString(randomArray(10)) + val bytes1 = randomByteString(10) + val bytes2 = randomByteString(10) val sinkProbe = TestProbe() - val inputStream = Source(bytes1 :: bytes2 :: Nil).runWith(testSink(sinkProbe)) //need to wait while both elements arrive to sink sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push) - val in1 = new Array[Byte](15) - inputStream.read(in1) should ===(15) - in1 should ===(bytes1 ++ bytes2.take(5)) - - val in2 = new Array[Byte](15) - inputStream.read(in2) should ===(5) - in2 should ===(newArray(15, bytes2.drop(5))) + readN(inputStream, 15) should ===((15, bytes1 ++ bytes2.take(5))) + readN(inputStream, 15) should ===((5, bytes2.drop(5))) inputStream.close() } @@ -243,11 +206,8 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "return -1 when read after stream is completed" in assertAllStagesStopped { val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) - val arr = newArray() - inputStream.read(arr) - arr should ===(byteArray) - - Await.result(Future(inputStream.read(arr)), timeout) should ===(-1) + readN(inputStream, byteString.size) should ===((byteString.size, byteString)) + inputStream.read() should ===(-1) inputStream.close() } @@ -260,25 +220,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { probe.sendNext(byteString) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) - val arr = newArray() - inputStream.read(arr) + readN(inputStream, byteString.size) should ===((byteString.size, byteString)) probe.sendError(ex) sinkProbe.expectMsg(InputStreamSinkTestMessages.Failure(ex)) - val p = Future(inputStream.read(arr)) - p.onFailure { - case e ⇒ - (e.isInstanceOf[IOException] && e.getCause.equals(ex)) should ===(true) - Unit - } - p.onSuccess { case _ ⇒ fail() } - + val e = intercept[IOException] { Await.result(Future(inputStream.read()), timeout) } + e.getCause should ===(ex) } "use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped { val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig) val materializer = ActorMaterializer()(sys) - try { TestSource.probe[ByteString].runWith(StreamConverters.asInputStream())(materializer) materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor) @@ -286,7 +238,5 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } - } - }