Merge pull request #19660 from drewhk/wip-19392-fix-asinputstream-drewhk
#19392: asInputStream timeout
This commit is contained in:
commit
1aaf74afa8
2 changed files with 13 additions and 5 deletions
|
|
@ -238,5 +238,14 @@ class InputStreamSinkSpec extends AkkaSpec(UnboundedMailboxConfig) {
|
|||
assertDispatcher(ref, "akka.stream.default-blocking-io-dispatcher")
|
||||
} finally shutdown(sys)
|
||||
}
|
||||
|
||||
"work when more bytes pulled from InputStream than available" in assertAllStagesStopped {
|
||||
val inputStream = Source.single(byteString).runWith(StreamConverters.asInputStream())
|
||||
|
||||
readN(inputStream, byteString.size * 2) should ===((byteString.size, byteString))
|
||||
inputStream.read() should ===(-1)
|
||||
|
||||
inputStream.close()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr
|
|||
val dataQueue = new LinkedBlockingDeque[StreamToAdapterMessage](maxBuffer + 1)
|
||||
|
||||
val logic = new GraphStageLogic(shape) with StageWithCallback {
|
||||
var pullRequestIsSent = true
|
||||
|
||||
private val callback: AsyncCallback[AdapterToStageMessage] =
|
||||
getAsyncCallback {
|
||||
|
|
@ -56,10 +55,8 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr
|
|||
override def wakeUp(msg: AdapterToStageMessage): Unit = callback.invoke(msg)
|
||||
|
||||
private def sendPullIfAllowed(): Unit =
|
||||
if (!pullRequestIsSent) {
|
||||
pullRequestIsSent = true
|
||||
if (dataQueue.remainingCapacity() > 1 && !hasBeenPulled(in))
|
||||
pull(in)
|
||||
}
|
||||
|
||||
override def preStart() = pull(in)
|
||||
|
||||
|
|
@ -67,7 +64,6 @@ private[akka] class InputStreamSinkStage(readTimeout: FiniteDuration) extends Gr
|
|||
override def onPush(): Unit = {
|
||||
//1 is buffer for Finished or Failed callback
|
||||
require(dataQueue.remainingCapacity() > 1)
|
||||
pullRequestIsSent = false
|
||||
dataQueue.add(Data(grab(in)))
|
||||
if (dataQueue.remainingCapacity() > 1) sendPullIfAllowed()
|
||||
}
|
||||
|
|
@ -193,6 +189,9 @@ private[akka] class InputStreamAdapter(sharedBuffer: BlockingQueue[StreamToAdapt
|
|||
case Data(data) ⇒
|
||||
detachedChunk = Some(data)
|
||||
detachedChunk
|
||||
case Finished ⇒
|
||||
isStageAlive = false
|
||||
None
|
||||
case _ ⇒ None
|
||||
}
|
||||
case Some(_) ⇒ detachedChunk
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue