Merge pull request #19223 from agolubev/agolubev-#19215-Sink.inputStream-OutOfBoundsException
=str #19215 Sink.inputStream OutOfBoundsException
This commit is contained in:
commit
7e05fa4e37
2 changed files with 89 additions and 36 deletions
|
|
@ -11,7 +11,7 @@ import akka.stream._
|
|||
import akka.stream.impl.StreamSupervisor.Children
|
||||
import akka.stream.impl.io.InputStreamSinkStage
|
||||
import akka.stream.impl.{ ActorMaterializerImpl, StreamSupervisor }
|
||||
import akka.stream.scaladsl.{ Keep, Sink, StreamConverters }
|
||||
import akka.stream.scaladsl.{ Source, Keep, Sink, StreamConverters }
|
||||
import akka.stream.stage.InHandler
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.testkit.Utils._
|
||||
|
|
@ -31,7 +31,7 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
val timeout = 300.milliseconds
|
||||
def randomArray(size: Int) = {
|
||||
def randomArray(size: Int): Array[Byte] = {
|
||||
val a = new Array[Byte](size)
|
||||
Random.nextBytes(a)
|
||||
a
|
||||
|
|
@ -40,6 +40,11 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
val byteArray = randomArray(3)
|
||||
val byteString = ByteString(byteArray)
|
||||
|
||||
def newArray(size: Int, bs: ByteString): Array[Byte] = {
|
||||
val probe = new Array[Byte](size)
|
||||
bs.copyToArray(probe, 0)
|
||||
probe
|
||||
}
|
||||
def newArray() = new Array[Byte](3)
|
||||
|
||||
def expectSuccess[T](f: Future[T], value: T) =
|
||||
|
|
@ -80,24 +85,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
|
||||
"InputStreamSink" must {
|
||||
"read bytes from InputStream" in assertAllStagesStopped {
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
|
||||
probe.sendNext(byteString)
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
val arr = newArray()
|
||||
inputStream.read(arr)
|
||||
inputStream.read(arr) should ===(3)
|
||||
arr should ===(byteArray)
|
||||
|
||||
probe.sendComplete()
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped {
|
||||
val sinkProbe = TestProbe()
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run()
|
||||
|
||||
probe.sendNext(byteString)
|
||||
val byteArray2 = randomArray(3)
|
||||
probe.sendNext(ByteString(byteArray2))
|
||||
val inputStream = Source(byteString :: ByteString(byteArray2) :: Nil).runWith(testSink(sinkProbe))
|
||||
|
||||
sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push)
|
||||
|
||||
|
|
@ -113,15 +111,13 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
}
|
||||
|
||||
"returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped {
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
|
||||
val data = randomArray(2)
|
||||
probe.sendNext(ByteString(data))
|
||||
val inputStream = Source.single(ByteString(data)).runWith(StreamConverters.asInputStream())
|
||||
|
||||
val arr = newArray()
|
||||
inputStream.read(arr) should ===(2)
|
||||
arr should ===(Array(data(0), data(1), 0))
|
||||
|
||||
probe.sendComplete()
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
|
|
@ -140,12 +136,8 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
}
|
||||
|
||||
"fill up buffer by default" in assertAllStagesStopped {
|
||||
import system.dispatcher
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
|
||||
val array2 = randomArray(3)
|
||||
probe.sendNext(byteString)
|
||||
probe.sendNext(ByteString(array2))
|
||||
val inputStream = Source(byteString :: ByteString(array2) :: Nil).runWith(StreamConverters.asInputStream())
|
||||
|
||||
val arr1 = newArray()
|
||||
val arr2 = newArray()
|
||||
|
|
@ -157,7 +149,6 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
arr1 should ===(byteString)
|
||||
arr2 should ===(array2)
|
||||
|
||||
probe.sendComplete()
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
|
|
@ -187,14 +178,74 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
arr should ===(Array[Byte](bytes(0), 0, 0))
|
||||
}
|
||||
|
||||
"return -1 when read after stream is completed" in assertAllStagesStopped {
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
"work when read chunks smaller than stream chunks" in assertAllStagesStopped {
|
||||
var bytes = ByteString(randomArray(10))
|
||||
val inputStream = Source.single(bytes).runWith(StreamConverters.asInputStream())
|
||||
|
||||
for (i ← 0 to 3) {
|
||||
val in = newArray()
|
||||
inputStream.read(in)
|
||||
in should ===(newArray(3, bytes.take(3)))
|
||||
bytes = bytes.drop(3)
|
||||
}
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"throw exception when call read with wrong parameters" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), -1, 2))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 5))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(new Array[Byte](0), 0, 1))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 0))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"successfully read several chunks at once" in assertAllStagesStopped {
|
||||
val bytes = List.fill(4)(ByteString(randomArray(4)))
|
||||
val sinkProbe = TestProbe()
|
||||
val inputStream = Source[ByteString](bytes).runWith(testSink(sinkProbe))
|
||||
|
||||
//need to wait while all elements arrive to sink
|
||||
for (i ← 0 to 3) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
|
||||
|
||||
for (i ← 0 to 1) {
|
||||
val in = new Array[Byte](8)
|
||||
inputStream.read(in) should ===(8)
|
||||
in should ===(bytes(i * 2) ++ bytes(i * 2 + 1))
|
||||
}
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"work when read chunks bigger than stream chunks" in assertAllStagesStopped {
|
||||
val bytes1 = ByteString(randomArray(10))
|
||||
val bytes2 = ByteString(randomArray(10))
|
||||
val sinkProbe = TestProbe()
|
||||
|
||||
val inputStream = Source(bytes1 :: bytes2 :: Nil).runWith(testSink(sinkProbe))
|
||||
|
||||
//need to wait while both elements arrive to sink
|
||||
sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push)
|
||||
|
||||
val in1 = new Array[Byte](15)
|
||||
inputStream.read(in1) should ===(15)
|
||||
in1 should ===(bytes1 ++ bytes2.take(5))
|
||||
|
||||
val in2 = new Array[Byte](15)
|
||||
inputStream.read(in2) should ===(5)
|
||||
in2 should ===(newArray(15, bytes2.drop(5)))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"return -1 when read after stream is completed" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
probe.sendNext(byteString)
|
||||
val arr = newArray()
|
||||
inputStream.read(arr)
|
||||
arr should ===(byteArray)
|
||||
probe.sendComplete()
|
||||
|
||||
Await.result(Future(inputStream.read(arr)), timeout) should ===(-1)
|
||||
|
||||
|
|
|
|||
|
|
@ -10,7 +10,6 @@ import akka.stream.impl.io.InputStreamSinkStage._
|
|||
import akka.stream.stage._
|
||||
import akka.util.ByteString
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.stream.{ Inlet, SinkShape, Attributes }
|
||||
|
||||
|
|
@ -98,7 +97,6 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
|
|||
var isActive = true
|
||||
var isStageAlive = true
|
||||
val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible")
|
||||
var skipBytes = 0
|
||||
var detachedChunk: Option[ByteString] = None
|
||||
|
||||
@scala.throws(classOf[IOException])
|
||||
|
|
@ -118,6 +116,11 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
|
|||
|
||||
@scala.throws(classOf[IOException])
|
||||
override def read(a: Array[Byte], begin: Int, length: Int): Int = {
|
||||
require(a.length > 0, "array size must be >= 0")
|
||||
require(begin >= 0, "begin must be >= 0")
|
||||
require(length > 0, "length must be > 0")
|
||||
require(begin + length <= a.length, "begin + length must be smaller or equal to the array length")
|
||||
|
||||
executeIfNotClosed(() ⇒
|
||||
if (isStageAlive) {
|
||||
detachedChunk match {
|
||||
|
|
@ -146,7 +149,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
|
|||
|
||||
private[this] def readBytes(a: Array[Byte], begin: Int, length: Int): Int = {
|
||||
require(detachedChunk.nonEmpty, "Chunk must be pulled from shared buffer")
|
||||
val availableInChunk = detachedChunk.get.size - skipBytes
|
||||
val availableInChunk = detachedChunk.get.size
|
||||
val readBytes = getData(a, begin, length, 0)
|
||||
if (readBytes >= availableInChunk) sendToStage(ReadElementAcknowledgement)
|
||||
readBytes
|
||||
|
|
@ -166,18 +169,17 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
|
|||
gotBytes: Int): Int = {
|
||||
grabDataChunk() match {
|
||||
case Some(data) ⇒
|
||||
val size = data.size - skipBytes
|
||||
if (size + gotBytes <= length) {
|
||||
System.arraycopy(data.toArray, skipBytes, arr, begin, size)
|
||||
skipBytes = 0
|
||||
val size = data.size
|
||||
if (size <= length) {
|
||||
data.copyToArray(arr, begin, size)
|
||||
detachedChunk = None
|
||||
if (length - size == 0)
|
||||
if (size == length)
|
||||
gotBytes + size
|
||||
else
|
||||
getData(arr, begin + size, length - size, gotBytes + size)
|
||||
} else {
|
||||
System.arraycopy(data.toArray, skipBytes, arr, begin, length)
|
||||
skipBytes = length
|
||||
data.copyToArray(arr, begin, length)
|
||||
detachedChunk = Some(data.drop(length))
|
||||
gotBytes + length
|
||||
}
|
||||
case None ⇒ gotBytes
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue