Merge pull request #15386 from spray/w/fix-15347

=str #15347 in `concat` propagate errors in secondary upstream properly
This commit is contained in:
Patrik Nordwall 2014-06-10 17:06:28 +02:00
commit 8e2dfe835d
2 changed files with 18 additions and 1 deletions

View file

@ -120,7 +120,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! OnError(cause)
override def onError(cause: Throwable): Unit = impl ! OtherStreamOnError(cause)
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element)
override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription)
@ -129,6 +129,7 @@ private[akka] object TwoStreamInputProcessor {
case object OtherStreamOnComplete
case class OtherStreamOnNext(element: Any)
case class OtherStreamOnSubscribe(subscription: Subscription)
case class OtherStreamOnError(ex: Throwable)
}
/**
@ -146,11 +147,13 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
override def waitingForUpstream: Receive = {
case OtherStreamOnComplete onComplete()
case OtherStreamOnSubscribe(subscription) onSubscribe(subscription)
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override def upstreamRunning: Receive = {
case OtherStreamOnNext(element) enqueueInputElement(element)
case OtherStreamOnComplete onComplete()
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override protected def completed: Actor.Receive = {
case OtherStreamOnSubscribe(_) throw new IllegalStateException("Cannot subscribe shutdown subscriber")

View file

@ -8,6 +8,7 @@ import akka.stream.scaladsl.Flow
import org.reactivestreams.api.Producer
import akka.stream.testkit.OnSubscribe
import akka.stream.testkit.OnError
import scala.concurrent.Promise
class FlowConcatSpec extends TwoStreamsSetup {
@ -92,5 +93,18 @@ class FlowConcatSpec extends TwoStreamsSetup {
consumer2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"correctly handle async errors in secondary upstream" in {
val promise = Promise[Int]()
val flow = Flow(List(1, 2, 3)).concat(Flow(promise.future).toProducer(materializer))
val consumer = StreamTestKit.consumerProbe[Int]
flow.produceTo(materializer, consumer)
val subscription = consumer.expectSubscription()
subscription.requestMore(4)
consumer.expectNext(1)
consumer.expectNext(2)
consumer.expectNext(3)
promise.failure(TestException)
consumer.expectError(TestException)
}
}
}