diff --git a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala index 6940cf7861..e351e1a78c 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/io/InputStreamSinkSpec.scala @@ -238,5 +238,14 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) { assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher") } finally shutdown(sys) } + + "work when more bytes pulled from InputStream than available" in assertAllStagesStopped { + val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream()) + + readN(inputStream, byteString.size * 2) should ===((byteString.size, byteString)) + inputStream.read() should ===(-1) + + inputStream.close() + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 0578e21aa7..8376a99e9c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -45,7 +45,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1) val logic = new GraphStageLogic(shape) with StageWithCallback { - var pullRequestIsSent = true private val callback: AsyncCallback[AdapterToStageMessage] = getAsyncCallback { @@ -56,10 +55,8 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr override def wakeUp(msg: AdapterToStageMessage): Unit = callback.invoke(msg) private def sendPullIfAllowed(): Unit = - if (!pullRequestIsSent) { - pullRequestIsSent = true + if (dataQueue.remainingCapacity() > 1 && !hasBeenPulled(in)) pull(in) - } override def preStart() = pull(in) @@ -67,7 +64,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr override def onPush(): Unit = { //1 is buffer for Finished or Failed callback require(dataQueue.remainingCapacity() > 1) - pullRequestIsSent = false dataQueue.add(Data(grab(in))) if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed() } @@ -193,6 +189,9 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt case Data(data) ⇒ detachedChunk = Some(data) detachedChunk + case Finished ⇒ + isStageAlive = false + None case _ ⇒ None } case Some(_) ⇒ detachedChunk