diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index fbecebd2f5..6795f9213d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -120,7 +120,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS */ private[akka] object TwoStreamInputProcessor { class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = impl ! OnError(cause) + override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause) override def onComplete(): Unit = impl ! OtherStreamOnComplete override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element) override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription) @@ -129,6 +129,7 @@ private[akka] object TwoStreamInputProcessor { case object OtherStreamOnComplete case class OtherStreamOnNext(element: Any) case class OtherStreamOnSubscribe(subscription: Subscription) + case class OtherStreamOnError(ex: Throwable) } /** @@ -146,11 +147,13 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett override def waitingForUpstream: Receive = { case OtherStreamOnComplete ⇒ onComplete() case OtherStreamOnSubscribe(subscription) ⇒ onSubscribe(subscription) + case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e) } override def upstreamRunning: Receive = { case OtherStreamOnNext(element) ⇒ enqueueInputElement(element) case OtherStreamOnComplete ⇒ onComplete() + case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e) } override protected def completed: Actor.Receive = { case OtherStreamOnSubscribe(_) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index 1e65e1bee5..21e6ec88fd 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -8,6 +8,7 @@ import akka.stream.scaladsl.Flow import org.reactivestreams.api.Producer import akka.stream.testkit.OnSubscribe import akka.stream.testkit.OnError +import scala.concurrent.Promise class FlowConcatSpec extends TwoStreamsSetup { @@ -92,5 +93,18 @@ class FlowConcatSpec extends TwoStreamsSetup { consumer2.expectErrorOrSubscriptionFollowedByError(TestException) } + "correctly handle async errors in secondary upstream" in { + val promise = Promise[Int]() + val flow = Flow(List(1, 2, 3)).concat(Flow(promise.future).toProducer(materializer)) + val consumer = StreamTestKit.consumerProbe[Int] + flow.produceTo(materializer, consumer) + val subscription = consumer.expectSubscription() + subscription.requestMore(4) + consumer.expectNext(1) + consumer.expectNext(2) + consumer.expectNext(3) + promise.failure(TestException) + consumer.expectError(TestException) + } } }