=str - 19327 - Fixes race condition in InputStreamSinkSpec
+ Cleanup of that spec
This commit is contained in:
parent
1ac9b4eafb
commit
fc4cdcdf53
1 changed files with 52 additions and 102 deletions
|
|
@ -18,10 +18,9 @@ import akka.stream.testkit.Utils._
|
|||
import akka.stream.testkit.scaladsl.TestSource
|
||||
import akka.testkit.TestProbe
|
||||
import akka.util.ByteString
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.util.Random
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
||||
|
|
@ -31,24 +30,20 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
val timeout = 300.milliseconds
|
||||
def randomArray(size: Int): Array[Byte] = {
|
||||
def randomByteString(size: Int): ByteString = {
|
||||
val a = new Array[Byte](size)
|
||||
Random.nextBytes(a)
|
||||
a
|
||||
ThreadLocalRandom.current().nextBytes(a)
|
||||
ByteString(a)
|
||||
}
|
||||
|
||||
val byteArray = randomArray(3)
|
||||
val byteString = ByteString(byteArray)
|
||||
val byteString = randomByteString(3)
|
||||
val byteArray = byteString.toArray
|
||||
|
||||
def newArray(size: Int, bs: ByteString): Array[Byte] = {
|
||||
val probe = new Array[Byte](size)
|
||||
bs.copyToArray(probe, 0)
|
||||
probe
|
||||
private[this] def readN(is: InputStream, n: Int): (Int, ByteString) = {
|
||||
val buf = new Array[Byte](n)
|
||||
val r = is.read(buf)
|
||||
(r, ByteString.fromArray(buf, 0, r))
|
||||
}
|
||||
def newArray() = new Array[Byte](3)
|
||||
|
||||
def expectSuccess[T](f: Future[T], value: T) =
|
||||
Await.result(f, timeout) should be(value)
|
||||
|
||||
object InputStreamSinkTestMessages {
|
||||
case object Push extends NoSerializationVerificationNeeded
|
||||
|
|
@ -86,156 +81,124 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
"InputStreamSink" must {
|
||||
"read bytes from InputStream" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
val arr = newArray()
|
||||
inputStream.read(arr) should ===(3)
|
||||
arr should ===(byteArray)
|
||||
readN(inputStream, byteString.size) should ===((byteString.size, byteString))
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"read bytes correctly if requested by InputStream not in chunk size" in assertAllStagesStopped {
|
||||
val sinkProbe = TestProbe()
|
||||
val byteArray2 = randomArray(3)
|
||||
val inputStream = Source(byteString :: ByteString(byteArray2) :: Nil).runWith(testSink(sinkProbe))
|
||||
val byteString2 = randomByteString(3)
|
||||
val inputStream = Source(byteString :: byteString2 :: Nil).runWith(testSink(sinkProbe))
|
||||
|
||||
sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push)
|
||||
|
||||
val arr = new Array[Byte](2)
|
||||
inputStream.read(arr)
|
||||
arr should ===(Array(byteArray(0), byteArray(1)))
|
||||
inputStream.read(arr)
|
||||
arr should ===(Array(byteArray(2), byteArray2(0)))
|
||||
inputStream.read(arr)
|
||||
arr should ===(Array(byteArray2(1), byteArray2(2)))
|
||||
readN(inputStream, 2) should ===((2, byteString.take(2)))
|
||||
readN(inputStream, 2) should ===((2, byteString.drop(2) ++ byteString2.take(1)))
|
||||
readN(inputStream, 2) should ===((2, byteString2.drop(1)))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"returns less than was expected when the data source has provided some but not enough data" in assertAllStagesStopped {
|
||||
val data = randomArray(2)
|
||||
val inputStream = Source.single(ByteString(data)).runWith(StreamConverters.asInputStream())
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
val arr = newArray()
|
||||
inputStream.read(arr) should ===(2)
|
||||
arr should ===(Array(data(0), data(1), 0))
|
||||
val arr = new Array[Byte](byteString.size + 1)
|
||||
inputStream.read(arr) should ===(arr.size - 1)
|
||||
ByteString(arr) should ===(byteString :+ 0)
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"block read until get requested number of bytes from upstream" in assertAllStagesStopped {
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
val f = Future(inputStream.read(new Array[Byte](byteString.size)))
|
||||
|
||||
val arr = newArray()
|
||||
val f = Future(inputStream.read(arr))
|
||||
the[Exception] thrownBy Await.result(f, timeout) shouldBe a[TimeoutException]
|
||||
probe.sendNext(byteString)
|
||||
expectSuccess(f, 3)
|
||||
Await.result(f, timeout) should ===(byteString.size)
|
||||
|
||||
probe.sendComplete()
|
||||
inputStream.read(newArray())
|
||||
inputStream.read() should ===(-1)
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"fill up buffer by default" in assertAllStagesStopped {
|
||||
val array2 = randomArray(3)
|
||||
val inputStream = Source(byteString :: ByteString(array2) :: Nil).runWith(StreamConverters.asInputStream())
|
||||
val byteString2 = randomByteString(3)
|
||||
val inputStream = Source(byteString :: byteString2 :: Nil).runWith(StreamConverters.asInputStream())
|
||||
|
||||
val arr1 = newArray()
|
||||
val arr2 = newArray()
|
||||
val f1 = Future(inputStream.read(arr1))
|
||||
val f2 = Future(inputStream.read(arr2))
|
||||
Await.result(f1, timeout) should be(3)
|
||||
Await.result(f2, timeout) should be(3)
|
||||
|
||||
arr1 should ===(byteString)
|
||||
arr2 should ===(array2)
|
||||
readN(inputStream, 3) should ===((3, byteString))
|
||||
readN(inputStream, 3) should ===((3, byteString2))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"throw error when reactive stream is closed" in assertAllStagesStopped {
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(StreamConverters.asInputStream())(Keep.both).run()
|
||||
|
||||
probe.sendNext(byteString)
|
||||
inputStream.close()
|
||||
probe.expectCancellation()
|
||||
the[Exception] thrownBy inputStream.read(newArray()) shouldBe a[IOException]
|
||||
the[Exception] thrownBy inputStream.read() shouldBe a[IOException]
|
||||
}
|
||||
|
||||
"return all data when upstream is completed" in assertAllStagesStopped {
|
||||
val sinkProbe = TestProbe()
|
||||
val (probe, inputStream) = TestSource.probe[ByteString].toMat(testSink(sinkProbe))(Keep.both).run()
|
||||
val bytes = randomByteString(1)
|
||||
|
||||
val bytes = randomArray(1)
|
||||
probe.sendNext(ByteString(bytes))
|
||||
probe.sendNext(bytes)
|
||||
sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
|
||||
|
||||
probe.sendComplete()
|
||||
sinkProbe.expectMsg(InputStreamSinkTestMessages.Finish)
|
||||
|
||||
val arr = newArray()
|
||||
val f = Future(inputStream.read(arr))
|
||||
expectSuccess(f, 1)
|
||||
arr should ===(Array[Byte](bytes(0), 0, 0))
|
||||
readN(inputStream, 3) should ===((1, bytes))
|
||||
}
|
||||
|
||||
"work when read chunks smaller than stream chunks" in assertAllStagesStopped {
|
||||
var bytes = ByteString(randomArray(10))
|
||||
val bytes = randomByteString(10)
|
||||
val inputStream = Source.single(bytes).runWith(StreamConverters.asInputStream())
|
||||
|
||||
for (i ← 0 to 3) {
|
||||
val in = newArray()
|
||||
inputStream.read(in)
|
||||
in should ===(newArray(3, bytes.take(3)))
|
||||
bytes = bytes.drop(3)
|
||||
}
|
||||
for (expect ← bytes.sliding(3, 3))
|
||||
readN(inputStream, 3) should ===((expect.size, expect))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"throw exception when call read with wrong parameters" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), -1, 2))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 5))
|
||||
val buf = new Array[Byte](3)
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, -1, 2))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, 0, 5))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(new Array[Byte](0), 0, 1))
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(newArray(), 0, 0))
|
||||
|
||||
an[IllegalArgumentException] shouldBe thrownBy(inputStream.read(buf, 0, 0))
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"successfully read several chunks at once" in assertAllStagesStopped {
|
||||
val bytes = List.fill(4)(ByteString(randomArray(4)))
|
||||
val bytes = List.fill(4)(randomByteString(4))
|
||||
val sinkProbe = TestProbe()
|
||||
val inputStream = Source[ByteString](bytes).runWith(testSink(sinkProbe))
|
||||
|
||||
//need to wait while all elements arrive to sink
|
||||
for (i ← 0 to 3) sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
|
||||
bytes foreach { _ ⇒ sinkProbe.expectMsg(InputStreamSinkTestMessages.Push) }
|
||||
|
||||
for (i ← 0 to 1) {
|
||||
val in = new Array[Byte](8)
|
||||
inputStream.read(in) should ===(8)
|
||||
in should ===(bytes(i * 2) ++ bytes(i * 2 + 1))
|
||||
}
|
||||
for (i ← 0 to 1)
|
||||
readN(inputStream, 8) should ===((8, bytes(i * 2) ++ bytes(i * 2 + 1)))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
||||
"work when read chunks bigger than stream chunks" in assertAllStagesStopped {
|
||||
val bytes1 = ByteString(randomArray(10))
|
||||
val bytes2 = ByteString(randomArray(10))
|
||||
val bytes1 = randomByteString(10)
|
||||
val bytes2 = randomByteString(10)
|
||||
val sinkProbe = TestProbe()
|
||||
|
||||
val inputStream = Source(bytes1 :: bytes2 :: Nil).runWith(testSink(sinkProbe))
|
||||
|
||||
//need to wait while both elements arrive to sink
|
||||
sinkProbe.expectMsgAllOf(InputStreamSinkTestMessages.Push, InputStreamSinkTestMessages.Push)
|
||||
|
||||
val in1 = new Array[Byte](15)
|
||||
inputStream.read(in1) should ===(15)
|
||||
in1 should ===(bytes1 ++ bytes2.take(5))
|
||||
|
||||
val in2 = new Array[Byte](15)
|
||||
inputStream.read(in2) should ===(5)
|
||||
in2 should ===(newArray(15, bytes2.drop(5)))
|
||||
readN(inputStream, 15) should ===((15, bytes1 ++ bytes2.take(5)))
|
||||
readN(inputStream, 15) should ===((5, bytes2.drop(5)))
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
|
@ -243,11 +206,8 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
"return -1 when read after stream is completed" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
val arr = newArray()
|
||||
inputStream.read(arr)
|
||||
arr should ===(byteArray)
|
||||
|
||||
Await.result(Future(inputStream.read(arr)), timeout) should ===(-1)
|
||||
readN(inputStream, byteString.size) should ===((byteString.size, byteString))
|
||||
inputStream.read() should ===(-1)
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
|
|
@ -260,25 +220,17 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
probe.sendNext(byteString)
|
||||
sinkProbe.expectMsg(InputStreamSinkTestMessages.Push)
|
||||
|
||||
val arr = newArray()
|
||||
inputStream.read(arr)
|
||||
readN(inputStream, byteString.size) should ===((byteString.size, byteString))
|
||||
|
||||
probe.sendError(ex)
|
||||
sinkProbe.expectMsg(InputStreamSinkTestMessages.Failure(ex))
|
||||
val p = Future(inputStream.read(arr))
|
||||
p.onFailure {
|
||||
case e ⇒
|
||||
(e.isInstanceOf[IOException] && e.getCause.equals(ex)) should ===(true)
|
||||
Unit
|
||||
}
|
||||
p.onSuccess { case _ ⇒ fail() }
|
||||
|
||||
val e = intercept[IOException] { Await.result(Future(inputStream.read()), timeout) }
|
||||
e.getCause should ===(ex)
|
||||
}
|
||||
|
||||
"use dedicated default-blocking-io-dispatcher by default" in assertAllStagesStopped {
|
||||
val sys = ActorSystem("dispatcher-testing", UnboundedMailboxConfig)
|
||||
val materializer = ActorMaterializer()(sys)
|
||||
|
||||
try {
|
||||
TestSource.probe[ByteString].runWith(StreamConverters.asInputStream())(materializer)
|
||||
materializer.asInstanceOf[ActorMaterializerImpl].supervisor.tell(StreamSupervisor.GetChildren, testActor)
|
||||
|
|
@ -286,7 +238,5 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue