=str #16272 splitWhen should drop reference when done with substream

The completeSubstreamOutput is used to not early complete the stream,
where as invalidating would shutdown the stream too early (and elements
wouldn't be emitted as expected).
This commit is contained in:
Konrad 'ktoso' Malawski 2014-11-12 10:16:22 +01:00
parent bb07c20547
commit 0f4abb687f
3 changed files with 15 additions and 8 deletions

View file

@ -124,9 +124,13 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
protected def invalidateSubstreamOutput(substream: SubstreamKey): Unit = {
completeSubstreamOutput(substream)
pump()
}
protected def completeSubstreamOutput(substream: SubstreamKey): Unit = {
substreamOutputs(substream).complete()
substreamOutputs -= substream
pump()
}
protected def failOutputs(e: Throwable): Unit = {
@ -138,7 +142,10 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubsc
}
val outputSubstreamManagement: Receive = {
case SubstreamRequestMore(key, demand) substreamOutputs(key).enqueueOutputDemand(demand)
case SubstreamRequestMore(key, demand) substreamOutputs.get(key) match {
case Some(substream) substream.enqueueOutputDemand(demand)
case _ // ignore...
}
case SubstreamCancel(key) invalidateSubstreamOutput(key)
case SubstreamSubscribe(key, subscriber) substreamOutputs(key).attachSubscriber(subscriber)
}