From 5559c34ca99bde04585505017c5b283dc2f830e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 27 Mar 2015 13:13:44 +0100 Subject: [PATCH] =str #16924: Fix FanoutProcessor to not overwrite last termination cause Also fix FlowSpec to expect onError instead of onComplete for late subscribers --- .../test/scala/akka/stream/scaladsl/FlowSpec.scala | 11 +++++------ .../main/scala/akka/stream/impl/ActorPublisher.scala | 6 ++++-- .../main/scala/akka/stream/impl/FanoutProcessor.scala | 4 ++-- .../main/scala/akka/stream/impl/FuturePublisher.scala | 2 +- .../akka/stream/impl/StreamOfStreamProcessors.scala | 2 +- .../scala/akka/stream/impl/SubscriberManagement.scala | 2 +- .../main/scala/akka/stream/impl/TickPublisher.scala | 2 +- 7 files changed, 15 insertions(+), 14 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala index de24a5c230..7e72959f43 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowSpec.scala @@ -457,7 +457,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } } - "call future subscribers' onComplete instead of onSubscribed after initial upstream was completed" in { + "call future subscribers' onError after onSubscribe if initial upstream was completed" in { new ChainSetup(identity, settings.copy(initialInputBufferSize = 1), toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() @@ -489,11 +489,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece downstream2.expectNext("a3") downstream2.expectComplete() - // FIXME when adding a sleep before the following link this will fail with IllegalStateExc shut-down - // what is the expected shutdown behavior? Is the title of this test wrong? - // val downstream3 = StreamTestKit.SubscriberProbe[Any]() - // publisher.subscribe(downstream3) - // downstream3.expectComplete() + val downstream3 = StreamTestKit.SubscriberProbe[Any]() + publisher.subscribe(downstream3) + downstream3.expectSubscription() + downstream3.expectError() should ===(ActorPublisher.NormalShutdownReason) } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala index 4b92ed8f11..86d08af7f2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorPublisher.scala @@ -16,8 +16,10 @@ import org.reactivestreams.Subscription * INTERNAL API */ private[akka] object ActorPublisher { - class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down Publisher") with NoStackTrace - val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException) + val NormalShutdownReasonMessage = "Cannot subscribe to shut-down Publisher" + class NormalShutdownException extends IllegalStateException(NormalShutdownReasonMessage) with NoStackTrace + val NormalShutdownReason: Throwable = new NormalShutdownException + val SomeNormalShutdownReason: Some[Throwable] = Some(NormalShutdownReason) def apply[T](impl: ActorRef): ActorPublisher[T] = { val a = new ActorPublisher[T](impl) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 8aadb00946..ed37d664bd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -42,8 +42,8 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu if (!downstreamCompleted) { downstreamCompleted = true abortDownstream(e) + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } - if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } def isClosed: Boolean = downstreamCompleted @@ -58,7 +58,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu override protected def shutdown(completed: Boolean): Unit = { if (exposedPublisher ne null) { if (completed) exposedPublisher.shutdown(None) - else exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason) } afterShutdown() } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala index b27f5ae94a..5fd0d05ab3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FuturePublisher.scala @@ -54,7 +54,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]] var subscriptionsReadyForPush = Set.empty[FutureSubscription] var futureValue: Option[Try[Any]] = future.value - var shutdownReason = ActorPublisher.NormalShutdownReason + var shutdownReason: Option[Throwable] = ActorPublisher.SomeNormalShutdownReason override val supervisorStrategy = SupervisorStrategy.stoppingStrategy diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index d780b7a738..b35065503d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -277,7 +277,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: ActorFlowMateria case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e) } override protected def completed: Actor.Receive = { - case OtherStreamOnSubscribe(_) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") + case OtherStreamOnSubscribe(_) ⇒ throw ActorPublisher.NormalShutdownReason } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala index 844746d72b..3b8051b349 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala @@ -32,7 +32,7 @@ private[akka] object SubscriberManagement { def apply[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, cause) } - val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher")) + val ShutDown = new ErrorCompleted(ActorPublisher.NormalShutdownReason) } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala index 47cf953ac2..54eb4365ed 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/TickPublisher.scala @@ -124,7 +124,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite tickTask.foreach(_.cancel) cancelled.set(true) if (exposedPublisher ne null) - exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason) + exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason) if (subscriber ne null) tryOnComplete(subscriber) }