=str #19585 fix long initialization for InputStreamSinkSpec

This commit is contained in:
Alexander Golubev 2016-01-26 14:41:45 -05:00
parent b9106da698
commit 08f8bbd560

View file

@ -22,6 +22,7 @@ private[akka] object InputStreamSinkStage {
sealed trait StreamToAdapterMessage
case class Data(data: ByteString) extends StreamToAdapterMessage
case object Finished extends StreamToAdapterMessage
case object Initialized extends StreamToAdapterMessage
case class Failed(cause: Throwable) extends StreamToAdapterMessage
sealed trait StageWithCallback {
@ -42,7 +43,7 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr
require(maxBuffer > 0, "Buffer size must be greater than 0")
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes): (GraphStageLogic, InputStream) = {
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1)
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 2)
val logic = new GraphStageLogic(shape) with StageWithCallback {
var pullRequestIsSent = true
@ -61,7 +62,10 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr
pull(in)
}
override def preStart() = pull(in)
override def preStart() = {
dataQueue.add(Initialized)
pull(in)
}
setHandler(in, new InHandler {
override def onPush(): Unit = {
@ -94,6 +98,7 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
readTimeout: FiniteDuration)
extends InputStream {
var isInitialized = false
var isActive = true
var isStageAlive = true
val subscriberClosedException = new IOException("Reactive stream is terminated, no reads are possible")
@ -101,8 +106,10 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
@scala.throws(classOf[IOException])
private[this] def executeIfNotClosed[T](f: () T): T =
if (isActive) f()
else throw subscriberClosedException
if (isActive) {
waitIfNotInitialized()
f()
} else throw subscriberClosedException
@scala.throws(classOf[IOException])
override def read(): Int = {
@ -186,6 +193,15 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
}
}
private[this] def waitIfNotInitialized(): Unit = {
if (!isInitialized) {
sharedBuffer.poll(readTimeout.toMillis, TimeUnit.MILLISECONDS) match {
case Initialized isInitialized = true
case _ require(false, "First message must be Initialized notification")
}
}
}
private[this] def grabDataChunk(): Option[ByteString] = {
detachedChunk match {
case None