From 08f8bbd560ec406033004f322d8bbd28f0ad1b92 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Tue, 26 Jan 2016 14:41:45 -0500 Subject: [PATCH] =str #19585 fix long initialization for InputStreamSinkSpec --- .../stream/impl/io/InputStreamSinkStage.scala | 24 +++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala index 0578e21aa7..85451ff8d8 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/io/InputStreamSinkStage.scala @@ -22,6 +22,7 @@ private[akka] object InputStreamSinkStage { sealed trait StreamToAdapterMessage case class Data(data: ByteString) extends StreamToAdapterMessage case object Finished extends StreamToAdapterMessage + case object Initialized extends StreamToAdapterMessage case class Failed(cause: Throwable) extends StreamToAdapterMessage sealed trait StageWithCallback { @@ -42,7 +43,7 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr require(maxBuffer > 0, "Buffer size must be greater than 0") override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InputStream) = { - val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1) + val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2) val logic = new GraphStageLogic(shape) with StageWithCallback { var pullRequestIsSent = true @@ -61,7 +62,10 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr pull(in) } - override def preStart() = pull(in) + override def preStart() = { + dataQueue.add(Initialized) + pull(in) + } setHandler(in, new InHandler { override def onPush(): Unit = { @@ -94,6 +98,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt readTimeout: FiniteDuration) extends InputStream { + var isInitialized = false var isActive = true var isStageAlive = true val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible") @@ -101,8 +106,10 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt @scala.throws(classOf[IOException]) private[this] def executeIfNotClosed[T](f: () ⇒ T): T = - if (isActive) f() - else throw subscriberClosedException + if (isActive) { + waitIfNotInitialized() + f() + } else throw subscriberClosedException @scala.throws(classOf[IOException]) override def read(): Int = { @@ -186,6 +193,15 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt } } + private[this] def waitIfNotInitialized(): Unit = { + if (!isInitialized) { + sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match { + case Initialized ⇒ isInitialized = true + case _ ⇒ require(false, "First message must be Initialized notification") + } + } + } + private[this] def grabDataChunk(): Option[ByteString] = { detachedChunk match { case None ⇒