Merge pull request #17085 from drewhk/wip-16924-flowspec-terminated-fanout-fix-drewhk

=str #16924: Fix FanoutProcessor to not overwrite last termination cause
This commit is contained in:
drewhk 2015-03-27 15:24:15 +01:00
commit dd4b9986bc
7 changed files with 15 additions and 14 deletions

View file

@ -457,7 +457,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
}
}
"call future subscribers' onComplete instead of onSubscribed after initial upstream was completed" in {
"call future subscribers' onError after onSubscribe if initial upstream was completed" in {
new ChainSetup(identity, settings.copy(initialInputBufferSize = 1),
toFanoutPublisher(initialBufferSize = 1, maximumBufferSize = 1)) {
val downstream2 = StreamTestKit.SubscriberProbe[Any]()
@ -489,11 +489,10 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
downstream2.expectNext("a3")
downstream2.expectComplete()
// FIXME when adding a sleep before the following link this will fail with IllegalStateExc shut-down
// what is the expected shutdown behavior? Is the title of this test wrong?
// val downstream3 = StreamTestKit.SubscriberProbe[Any]()
// publisher.subscribe(downstream3)
// downstream3.expectComplete()
val downstream3 = StreamTestKit.SubscriberProbe[Any]()
publisher.subscribe(downstream3)
downstream3.expectSubscription()
downstream3.expectError() should ===(ActorPublisher.NormalShutdownReason)
}
}

View file

@ -16,8 +16,10 @@ import org.reactivestreams.Subscription
* INTERNAL API
*/
private[akka] object ActorPublisher {
class NormalShutdownException extends IllegalStateException("Cannot subscribe to shut-down Publisher") with NoStackTrace
val NormalShutdownReason: Option[Throwable] = Some(new NormalShutdownException)
val NormalShutdownReasonMessage = "Cannot subscribe to shut-down Publisher"
class NormalShutdownException extends IllegalStateException(NormalShutdownReasonMessage) with NoStackTrace
val NormalShutdownReason: Throwable = new NormalShutdownException
val SomeNormalShutdownReason: Some[Throwable] = Some(NormalShutdownReason)
def apply[T](impl: ActorRef): ActorPublisher[T] = {
val a = new ActorPublisher[T](impl)

View file

@ -42,8 +42,8 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
if (!downstreamCompleted) {
downstreamCompleted = true
abortDownstream(e)
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
}
if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e))
}
def isClosed: Boolean = downstreamCompleted
@ -58,7 +58,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
override protected def shutdown(completed: Boolean): Unit = {
if (exposedPublisher ne null) {
if (completed) exposedPublisher.shutdown(None)
else exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
else exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
}
afterShutdown()
}

View file

@ -54,7 +54,7 @@ private[akka] class FuturePublisher(future: Future[Any], settings: ActorFlowMate
var subscriptions = Map.empty[FutureSubscription, Subscriber[Any]]
var subscriptionsReadyForPush = Set.empty[FutureSubscription]
var futureValue: Option[Try[Any]] = future.value
var shutdownReason = ActorPublisher.NormalShutdownReason
var shutdownReason: Option[Throwable] = ActorPublisher.SomeNormalShutdownReason
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy

View file

@ -277,7 +277,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: ActorFlowMateria
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override protected def completed: Actor.Receive = {
case OtherStreamOnSubscribe(_) throw new IllegalStateException("Cannot subscribe shutdown subscriber")
case OtherStreamOnSubscribe(_) throw ActorPublisher.NormalShutdownReason
}
}

View file

@ -32,7 +32,7 @@ private[akka] object SubscriberManagement {
def apply[T](subscriber: Subscriber[T]): Unit = tryOnError(subscriber, cause)
}
val ShutDown = new ErrorCompleted(new IllegalStateException("Cannot subscribe to shut-down Publisher"))
val ShutDown = new ErrorCompleted(ActorPublisher.NormalShutdownReason)
}
/**

View file

@ -124,7 +124,7 @@ private[akka] class TickPublisher(initialDelay: FiniteDuration, interval: Finite
tickTask.foreach(_.cancel)
cancelled.set(true)
if (exposedPublisher ne null)
exposedPublisher.shutdown(ActorPublisher.NormalShutdownReason)
exposedPublisher.shutdown(ActorPublisher.SomeNormalShutdownReason)
if (subscriber ne null)
tryOnComplete(subscriber)
}