Merge pull request #19559 from drewhk/wip-19342-subscription-timeout-drewhk

#19342 Eliminated race in SubstreamSubscriptionTimeoutSpec due to concat
This commit is contained in:
drewhk 2016-01-21 15:22:14 +01:00
commit e960c7b7d0
2 changed files with 10 additions and 213 deletions

View file

@ -41,10 +41,9 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
"groupBy and splitwhen" must {
"timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in assertAllStagesStopped {
val publisherProbe = TestPublisher.probe[Int]()
val publisher = Source.fromPublisher(publisherProbe).groupBy(3, _ % 3).lift(_ % 3).runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
val publisherProbe = TestPublisher.probe[Int]()
val publisher = Source.fromPublisher(publisherProbe).groupBy(3, _ % 3).lift(_ % 3).runWith(Sink.fromSubscriber(subscriber))
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
@ -56,7 +55,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = TestSubscriber.manualProbe[Int]()
s1.runWith(Sink.asPublisher(false)).subscribe(s1SubscriberProbe)
s1.runWith(Sink.fromSubscriber(s1SubscriberProbe))
val s1Subscription = s1SubscriberProbe.expectSubscription()
s1Subscription.request(100)
s1SubscriberProbe.expectNext(1)
@ -64,7 +63,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = TestSubscriber.manualProbe[Int]()
s2.runWith(Sink.asPublisher(false)).subscribe(s2SubscriberProbe)
s2.runWith(Sink.fromSubscriber(s2SubscriberProbe))
val s2Subscription = s2SubscriberProbe.expectSubscription()
s2Subscription.request(100)
s2SubscriberProbe.expectNext(2)
@ -74,7 +73,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
// sleep long enough for it to be cleaned up
Thread.sleep(1500)
val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException "expected" }
// Must be a Sink.seq, otherwise there is a race due to the concat in the `lift` implementation
val f = s3.runWith(Sink.seq).recover { case _: SubscriptionTimeoutException "expected" }
Await.result(f, 300.millis) should equal("expected")
publisherProbe.sendComplete()
@ -82,9 +82,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
"timeout and stop groupBy parent actor if none of the substreams are actually consumed" in assertAllStagesStopped {
val publisherProbe = TestPublisher.probe[Int]()
val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.fromSubscriber(subscriber))
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
@ -100,9 +99,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
"not timeout and cancel substream publishers when they have been subscribed to" in {
val publisherProbe = TestPublisher.probe[Int]()
val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.asPublisher(false))
val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]()
publisher.subscribe(subscriber)
val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.fromSubscriber(subscriber))
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
@ -113,7 +111,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = TestSubscriber.manualProbe[Int]()
s1.runWith(Sink.asPublisher(false)).subscribe(s1SubscriberProbe)
s1.runWith(Sink.fromSubscriber(s1SubscriberProbe))
val s1Sub = s1SubscriberProbe.expectSubscription()
s1Sub.request(1)
s1SubscriberProbe.expectNext(1)
@ -121,7 +119,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = TestSubscriber.manualProbe[Int]()
s2.runWith(Sink.asPublisher(false)).subscribe(s2SubscriberProbe)
s2.runWith(Sink.fromSubscriber(s2SubscriberProbe))
val s2Sub = s2SubscriberProbe.expectSubscription()
// sleep long enough for timeout to trigger if not canceled

View file

@ -224,204 +224,3 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorMaterial
override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement
}
/**
* INTERNAL API
*/
private[akka] object TwoStreamInputProcessor {
class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! OtherStreamOnError(cause)
}
override def onComplete(): Unit = impl ! OtherStreamOnComplete
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! OtherStreamOnNext(element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
impl ! OtherStreamOnSubscribe(subscription)
}
}
case object OtherStreamOnComplete extends DeadLetterSuppression
final case class OtherStreamOnNext(element: Any) extends DeadLetterSuppression
final case class OtherStreamOnSubscribe(subscription: Subscription) extends DeadLetterSuppression
final case class OtherStreamOnError(ex: Throwable) extends DeadLetterSuppression
}
/**
* INTERNAL API
*/
private[akka] abstract class TwoStreamInputProcessor(_settings: ActorMaterializerSettings, val other: Publisher[Any])
extends ActorProcessorImpl(_settings) {
import akka.stream.impl.TwoStreamInputProcessor._
val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) {
override val subreceive: SubReceive = new SubReceive(waitingForUpstream)
override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e)
override def waitingForUpstream: Receive = {
case OtherStreamOnComplete onComplete()
case OtherStreamOnSubscribe(subscription) onSubscribe(subscription)
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override def upstreamRunning: Receive = {
case OtherStreamOnNext(element) enqueueInputElement(element)
case OtherStreamOnComplete onComplete()
case OtherStreamOnError(e) TwoStreamInputProcessor.this.onError(e)
}
override protected def completed: Actor.Receive = {
case OtherStreamOnSubscribe(_) throw ActorPublisher.NormalShutdownReason
}
}
override def activeReceive: Receive =
secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
other.subscribe(new OtherActorSubscriber(self))
override def pumpFinished(): Unit = {
secondaryInputs.cancel()
super.pumpFinished()
}
}
/**
* INTERNAL API
*/
private[akka] object MultiStreamInputProcessor {
case class SubstreamKey(id: Long)
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends AtomicReference[Subscription] with Subscriber[T] {
override def onError(cause: Throwable): Unit = {
ReactiveStreamsCompliance.requireNonNullException(cause)
impl ! SubstreamOnError(key, cause)
}
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
override def onNext(element: T): Unit = {
ReactiveStreamsCompliance.requireNonNullElement(element)
impl ! SubstreamOnNext(key, element)
}
override def onSubscribe(subscription: Subscription): Unit = {
ReactiveStreamsCompliance.requireNonNullSubscription(subscription)
if (compareAndSet(null, subscription)) impl ! SubstreamStreamOnSubscribe(key, subscription)
else subscription.cancel()
}
}
case class SubstreamOnComplete(key: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded
case class SubstreamOnNext(key: SubstreamKey, element: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded
case class SubstreamOnError(key: SubstreamKey, e: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded
class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) {
// Not driven directly
override val subreceive = new SubReceive(Actor.emptyBehavior)
def substreamOnComplete(): Unit = onComplete()
def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription)
def substreamOnError(e: Throwable): Unit = onError(e)
def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem)
override protected def inputOnError(e: Throwable): Unit = {
super.inputOnError(e)
processor.invalidateSubstreamInput(key, e)
}
}
}
/**
* INTERNAL API
*/
private[akka] trait MultiStreamInputProcessorLike extends Pump { this: Actor
import MultiStreamInputProcessor._
protected def nextId(): Long
protected def inputBufferSize: Int
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInput]
private val waitingForOnSubscribe = collection.mutable.Map.empty[SubstreamKey, SubstreamSubscriber[Any]]
val inputSubstreamManagement: Receive = {
case SubstreamStreamOnSubscribe(key, subscription)
substreamInputs(key).substreamOnSubscribe(subscription)
waitingForOnSubscribe -= key
case SubstreamOnNext(key, element)
substreamInputs(key).substreamOnNext(element)
case SubstreamOnComplete(key)
substreamInputs(key).substreamOnComplete()
substreamInputs -= key
case SubstreamOnError(key, e)
substreamInputs(key).substreamOnError(e)
}
def createSubstreamInput(): SubstreamInput = {
val key = SubstreamKey(nextId())
val inputs = new SubstreamInput(key, inputBufferSize, this, this)
substreamInputs(key) = inputs
inputs
}
def createAndSubscribeSubstreamInput(p: Publisher[Any]): SubstreamInput = {
val inputs = createSubstreamInput()
val sub = new SubstreamSubscriber[Any](self, inputs.key)
waitingForOnSubscribe(inputs.key) = sub
p.subscribe(sub)
inputs
}
def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = {
substreamInputs(substream).cancel()
substreamInputs -= substream
pump()
}
protected def failInputs(e: Throwable): Unit = {
cancelWaitingForOnSubscribe()
substreamInputs.values foreach (_.cancel())
}
protected def finishInputs(): Unit = {
cancelWaitingForOnSubscribe()
substreamInputs.values foreach (_.cancel())
}
private def cancelWaitingForOnSubscribe(): Unit =
waitingForOnSubscribe.valuesIterator.foreach { sub
sub.getAndSet(CancelledSubscription) match {
case null // we were first
case subscription
// SubstreamOnSubscribe is still in flight and will not arrive
subscription.cancel()
}
}
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamInputProcessorLike {
private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId }
override protected val inputBufferSize = _settings.initialInputBufferSize
override protected def fail(e: Throwable) = {
failInputs(e)
super.fail(e)
}
override def pumpFinished() = {
finishInputs()
super.pumpFinished()
}
override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement
}