From 5835e2fa6ec95301d0017ac6d060a0d326a2fe9a Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 18 Dec 2015 00:45:52 -0500 Subject: [PATCH] =str #19215 Sink.inputStream OutOfBoundsException --- .../akka/stream/io/InputStreamSinkSpec.scala | 103 +++++++++++++----- .../stream/impl/io/InputStreamSinkStage.scala | 22 ++-- 2 files changed, 89 insertions(+), 36 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index fa948e05e9..1df7c5b068 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -11,7 +11,7 @@ import akka.stream._ import akka.stream.impl.StreamSupervisor.Children import akka.stream.impl.io.InputStreamSinkStage import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor } -import akka.stream.scaladsl.{ Keep, Sink, StreamConverters } +import akka.stream.scaladsl.{ Source, Keep, Sink, StreamConverters } import akka.stream.stage.InHandler import akka.stream.testkit.AkkaSpec import akka.stream.testkit.Utils._ @@ -31,7 +31,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { implicit val materializer = ActorMaterializer(settings) val timeout = 300.milliseconds - def randomArray(size: Int) = { + def randomArray(size: Int): Array[Byte] = { val a = new Array[Byte](size) Random.nextBytes(a) a @@ -40,6 +40,11 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { val byteArray = randomArray(3) val byteString = ByteString(byteArray) + def newArray(size: Int, bs: ByteString): Array[Byte] = { + val probe = new Array[Byte](size) + bs.copyToArray(probe, 0) + probe + } def newArray() = new Array[Byte](3) def expectSuccess[T](f: Future[T], value: T) = @@ -80,24 +85,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { "InputStreamSink" must { "read bytes from InputStream" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() - - probe.sendNext(byteString) + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) val arr = newArray() - inputStream.read(arr) + inputStream.read(arr) should ===(3) arr should ===(byteArray) - - probe.sendComplete() inputStream.close() } "read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped { val sinkProbe = TestProbe() - val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run() - - probe.sendNext(byteString) val byteArray2 = randomArray(3) - probe.sendNext(ByteString(byteArray2)) + val inputStream = Source(byteString :: ByteString(byteArray2) :: Nil).runWith(testSink(sinkProbe)) sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push) @@ -113,15 +111,13 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() - val data = randomArray(2) - probe.sendNext(ByteString(data)) + val inputStream = Source.single(ByteString(data)).runWith(StreamConverters.asInputStream()) + val arr = newArray() inputStream.read(arr) should ===(2) arr should ===(Array(data(0), data(1), 0)) - probe.sendComplete() inputStream.close() } @@ -140,12 +136,8 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { } "fill up buffer by default" in assertAllStagesStopped { - import system.dispatcher - val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() - val array2 = randomArray(3) - probe.sendNext(byteString) - probe.sendNext(ByteString(array2)) + val inputStream = Source(byteString :: ByteString(array2) :: Nil).runWith(StreamConverters.asInputStream()) val arr1 = newArray() val arr2 = newArray() @@ -157,7 +149,6 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { arr1 should ===(byteString) arr2 should ===(array2) - probe.sendComplete() inputStream.close() } @@ -187,14 +178,74 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { arr should ===(Array[Byte](bytes(0), 0, 0)) } - "return -1 when read after stream is completed" in assertAllStagesStopped { - val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run() + "work when read chunks smaller than stream chunks" in assertAllStagesStopped { + var bytes = ByteString(randomArray(10)) + val inputStream = Source.single(bytes).runWith(StreamConverters.asInputStream()) + + for (i ← 0 to 3) { + val in = newArray() + inputStream.read(in) + in should ===(newArray(3, bytes.take(3))) + bytes = bytes.drop(3) + } + inputStream.close() + } + + "throw exception when call read with wrong parameters" in assertAllStagesStopped { + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) + + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), -1, 2)) + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 5)) + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(new Array[Byte](0), 0, 1)) + an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 0)) + + inputStream.close() + } + + "successfully read several chunks at once" in assertAllStagesStopped { + val bytes = List.fill(4)(ByteString(randomArray(4))) + val sinkProbe = TestProbe() + val inputStream = Source[ByteString](bytes).runWith(testSink(sinkProbe)) + + //need to wait while all elements arrive to sink + for (i ← 0 to 3) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) + + for (i ← 0 to 1) { + val in = new Array[Byte](8) + inputStream.read(in) should ===(8) + in should ===(bytes(i * 2) ++ bytes(i * 2 + 1)) + } + + inputStream.close() + } + + "work when read chunks bigger than stream chunks" in assertAllStagesStopped { + val bytes1 = ByteString(randomArray(10)) + val bytes2 = ByteString(randomArray(10)) + val sinkProbe = TestProbe() + + val inputStream = Source(bytes1 :: bytes2 :: Nil).runWith(testSink(sinkProbe)) + + //need to wait while both elements arrive to sink + sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push) + + val in1 = new Array[Byte](15) + inputStream.read(in1) should ===(15) + in1 should ===(bytes1 ++ bytes2.take(5)) + + val in2 = new Array[Byte](15) + inputStream.read(in2) should ===(5) + in2 should ===(newArray(15, bytes2.drop(5))) + + inputStream.close() + } + + "return -1 when read after stream is completed" in assertAllStagesStopped { + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) - probe.sendNext(byteString) val arr = newArray() inputStream.read(arr) arr should ===(byteArray) - probe.sendComplete() Await.result(Future(inputStream.read(arr)), timeout) should ===(-1) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 76b20008c3..d5cf4a3f32 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -10,7 +10,6 @@ import akka.stream.impl.io.InputStreamSinkStage._ import akka.stream.stage._ import akka.util.ByteString import scala.annotation.tailrec -import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import akka.stream.{ Inlet, SinkShape, Attributes } @@ -98,7 +97,6 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt var isActive = true var isStageAlive = true val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible") - var skipBytes = 0 var detachedChunk: Option[ByteString] = None @scala.throws(classOf[IOException]) @@ -118,6 +116,11 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt @scala.throws(classOf[IOException]) override def read(a: Array[Byte], begin: Int, length: Int): Int = { + require(a.length > 0, "array size must be >= 0") + require(begin >= 0, "begin must be >= 0") + require(length > 0, "length must be > 0") + require(begin + length <= a.length, "begin + length must be smaller or equal to the array length") + executeIfNotClosed(() ⇒ if (isStageAlive) { detachedChunk match { @@ -146,7 +149,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt private[this] def readBytes(a: Array[Byte], begin: Int, length: Int): Int = { require(detachedChunk.nonEmpty, "Chunk must be pulled from shared buffer") - val availableInChunk = detachedChunk.get.size - skipBytes + val availableInChunk = detachedChunk.get.size val readBytes = getData(a, begin, length, 0) if (readBytes >= availableInChunk) sendToStage(ReadElementAcknowledgement) readBytes @@ -166,18 +169,17 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt gotBytes: Int): Int = { grabDataChunk() match { case Some(data) ⇒ - val size = data.size - skipBytes - if (size + gotBytes <= length) { - System.arraycopy(data.toArray, skipBytes, arr, begin, size) - skipBytes = 0 + val size = data.size + if (size <= length) { + data.copyToArray(arr, begin, size) detachedChunk = None - if (length - size == 0) + if (size == length) gotBytes + size else getData(arr, begin + size, length - size, gotBytes + size) } else { - System.arraycopy(data.toArray, skipBytes, arr, begin, length) - skipBytes = length + data.copyToArray(arr, begin, length) + detachedChunk = Some(data.drop(length)) gotBytes + length } case None ⇒ gotBytes