=str #15347 in concat propagate errors in secondary upstream properly

This commit is contained in:
Johannes Rudolph 2014-06-10 14:09:08 +02:00
parent ce8bdf7454
commit 3b77234eb8
2 changed files with 18 additions and 1 deletions

View file

@ -120,7 +120,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause)
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
@ -129,6 +129,7 @@ private[akka] object TwoStreamInputProcessor {
case object OtherStreamOnComplete
case class OtherStreamOnNext(element: Any)
case class OtherStreamOnSubscribe(subscription: Subscription)
case class OtherStreamOnError(ex: Throwable)
}
/**
@ -146,11 +147,13 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
override def waitingForUpstream: Receive = {
case OtherStreamOnComplete onComplete()
case OtherStreamOnSubscribe(subscription) onSubscribe(subscription)
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override def upstreamRunning: Receive = {
case OtherStreamOnNext(element) enqueueInputElement(element)
case OtherStreamOnComplete onComplete()
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override protected def completed: Actor.Receive = {
case OtherStreamOnSubscribe(_) throw new IllegalStateException("Cannot subscribe shutdown subscriber")