From edd3dbdbc851e5b79606d51dfe169f083ce58663 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 1 Feb 2016 14:08:46 +0100 Subject: [PATCH] #19392: asInputStream timeout --- .../test/scala/akka/stream/io/InputStreamSinkSpec.scala | 9 +++++++++ .../scala/akka/stream/impl/io/InputStreamSinkStage.scala | 9 ++++----- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 6940cf7861..e351e1a78c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -238,5 +238,14 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } + + "work when more bytes pulled from InputStream than available" in assertAllStagesStopped { + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) + + readN(inputStream, byteString.size * 2) should ===((byteString.size, byteString)) + inputStream.read() should ===(-1) + + inputStream.close() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 0578e21aa7..8376a99e9c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -45,7 +45,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1) val logic = new GraphStageLogic(shape) with StageWithCallback { - var pullRequestIsSent = true private val callback: AsyncCallback[AdapterToStageMessage] = getAsyncCallback { @@ -56,10 +55,8 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr override def wakeUp(msg: AdapterToStageMessage): Unit = callback.invoke(msg) private def sendPullIfAllowed(): Unit = - if (!pullRequestIsSent) { - pullRequestIsSent = true + if (dataQueue.remainingCapacity() > 1 && !hasBeenPulled(in)) pull(in) - } override def preStart() = pull(in) @@ -67,7 +64,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr override def onPush(): Unit = { //1 is buffer for Finished or Failed callback require(dataQueue.remainingCapacity() > 1) - pullRequestIsSent = false dataQueue.add(Data(grab(in))) if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() } @@ -193,6 +189,9 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt case Data(data) ⇒ detachedChunk = Some(data) detachedChunk + case Finished ⇒ + isStageAlive = false + None case _ ⇒ None } case Some(_) ⇒ detachedChunk