diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala index f028a5c8cc..c51c61e199 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -41,10 +41,9 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "groupBy and splitwhen" must { "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in assertAllStagesStopped { - val publisherProbe = TestPublisher.probe[Int]() - val publisher = Source.fromPublisher(publisherProbe).groupBy(3, _ % 3).lift(_ % 3).runWith(Sink.asPublisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() - publisher.subscribe(subscriber) + val publisherProbe = TestPublisher.probe[Int]() + val publisher = Source.fromPublisher(publisherProbe).groupBy(3, _ % 3).lift(_ % 3).runWith(Sink.fromSubscriber(subscriber)) val downstreamSubscription = subscriber.expectSubscription() downstreamSubscription.request(100) @@ -56,7 +55,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.asPublisher(false)).subscribe(s1SubscriberProbe) + s1.runWith(Sink.fromSubscriber(s1SubscriberProbe)) val s1Subscription = s1SubscriberProbe.expectSubscription() s1Subscription.request(100) s1SubscriberProbe.expectNext(1) @@ -64,7 +63,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.asPublisher(false)).subscribe(s2SubscriberProbe) + s2.runWith(Sink.fromSubscriber(s2SubscriberProbe)) val s2Subscription = s2SubscriberProbe.expectSubscription() s2Subscription.request(100) s2SubscriberProbe.expectNext(2) @@ -74,7 +73,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { // sleep long enough for it to be cleaned up Thread.sleep(1500) - val f = s3.runWith(Sink.head).recover { case _: SubscriptionTimeoutException ⇒ "expected" } + // Must be a Sink.seq, otherwise there is a race due to the concat in the `lift` implementation + val f = s3.runWith(Sink.seq).recover { case _: SubscriptionTimeoutException ⇒ "expected" } Await.result(f, 300.millis) should equal("expected") publisherProbe.sendComplete() @@ -82,9 +82,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in assertAllStagesStopped { val publisherProbe = TestPublisher.probe[Int]() - val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.asPublisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() - publisher.subscribe(subscriber) + val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.fromSubscriber(subscriber)) val downstreamSubscription = subscriber.expectSubscription() downstreamSubscription.request(100) @@ -100,9 +99,8 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { "not timeout and cancel substream publishers when they have been subscribed to" in { val publisherProbe = TestPublisher.probe[Int]() - val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.asPublisher(false)) val subscriber = TestSubscriber.manualProbe[(Int, Source[Int, _])]() - publisher.subscribe(subscriber) + val publisher = Source.fromPublisher(publisherProbe).groupBy(2, _ % 2).lift(_ % 2).runWith(Sink.fromSubscriber(subscriber)) val downstreamSubscription = subscriber.expectSubscription() downstreamSubscription.request(100) @@ -113,7 +111,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s1) = subscriber.expectNext() // should not break normal usage val s1SubscriberProbe = TestSubscriber.manualProbe[Int]() - s1.runWith(Sink.asPublisher(false)).subscribe(s1SubscriberProbe) + s1.runWith(Sink.fromSubscriber(s1SubscriberProbe)) val s1Sub = s1SubscriberProbe.expectSubscription() s1Sub.request(1) s1SubscriberProbe.expectNext(1) @@ -121,7 +119,7 @@ class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { val (_, s2) = subscriber.expectNext() // should not break normal usage val s2SubscriberProbe = TestSubscriber.manualProbe[Int]() - s2.runWith(Sink.asPublisher(false)).subscribe(s2SubscriberProbe) + s2.runWith(Sink.fromSubscriber(s2SubscriberProbe)) val s2Sub = s2SubscriberProbe.expectSubscription() // sleep long enough for timeout to trigger if not canceled diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index c659035bd3..8e3ba9d86c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -224,204 +224,3 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: ActorMaterial override def activeReceive: Receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse outputSubstreamManagement } -/** - * INTERNAL API - */ -private[akka] object TwoStreamInputProcessor { - class OtherActorSubscriber[T](val impl: ActorRef) extends Subscriber[T] { - override def onError(cause: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(cause) - impl ! OtherStreamOnError(cause) - } - override def onComplete(): Unit = impl ! OtherStreamOnComplete - override def onNext(element: T): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(element) - impl ! OtherStreamOnNext(element) - } - override def onSubscribe(subscription: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(subscription) - impl ! OtherStreamOnSubscribe(subscription) - } - } - - case object OtherStreamOnComplete extends DeadLetterSuppression - final case class OtherStreamOnNext(element: Any) extends DeadLetterSuppression - final case class OtherStreamOnSubscribe(subscription: Subscription) extends DeadLetterSuppression - final case class OtherStreamOnError(ex: Throwable) extends DeadLetterSuppression -} - -/** - * INTERNAL API - */ -private[akka] abstract class TwoStreamInputProcessor(_settings: ActorMaterializerSettings, val other: Publisher[Any]) - extends ActorProcessorImpl(_settings) { - import akka.stream.impl.TwoStreamInputProcessor._ - - val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { - override val subreceive: SubReceive = new SubReceive(waitingForUpstream) - - override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e) - - override def waitingForUpstream: Receive = { - case OtherStreamOnComplete ⇒ onComplete() - case OtherStreamOnSubscribe(subscription) ⇒ onSubscribe(subscription) - case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e) - } - - override def upstreamRunning: Receive = { - case OtherStreamOnNext(element) ⇒ enqueueInputElement(element) - case OtherStreamOnComplete ⇒ onComplete() - case OtherStreamOnError(e) ⇒ TwoStreamInputProcessor.this.onError(e) - } - override protected def completed: Actor.Receive = { - case OtherStreamOnSubscribe(_) ⇒ throw ActorPublisher.NormalShutdownReason - } - } - - override def activeReceive: Receive = - secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive - - other.subscribe(new OtherActorSubscriber(self)) - - override def pumpFinished(): Unit = { - secondaryInputs.cancel() - super.pumpFinished() - } - -} - -/** - * INTERNAL API - */ -private[akka] object MultiStreamInputProcessor { - case class SubstreamKey(id: Long) - - class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends AtomicReference[Subscription] with Subscriber[T] { - override def onError(cause: Throwable): Unit = { - ReactiveStreamsCompliance.requireNonNullException(cause) - impl ! SubstreamOnError(key, cause) - } - override def onComplete(): Unit = impl ! SubstreamOnComplete(key) - override def onNext(element: T): Unit = { - ReactiveStreamsCompliance.requireNonNullElement(element) - impl ! SubstreamOnNext(key, element) - } - override def onSubscribe(subscription: Subscription): Unit = { - ReactiveStreamsCompliance.requireNonNullSubscription(subscription) - if (compareAndSet(null, subscription)) impl ! SubstreamStreamOnSubscribe(key, subscription) - else subscription.cancel() - } - } - - case class SubstreamOnComplete(key: SubstreamKey) extends DeadLetterSuppression with NoSerializationVerificationNeeded - case class SubstreamOnNext(key: SubstreamKey, element: Any) extends DeadLetterSuppression with NoSerializationVerificationNeeded - case class SubstreamOnError(key: SubstreamKey, e: Throwable) extends DeadLetterSuppression with NoSerializationVerificationNeeded - case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) extends DeadLetterSuppression with NoSerializationVerificationNeeded - - class SubstreamInput(val key: SubstreamKey, bufferSize: Int, processor: MultiStreamInputProcessorLike, pump: Pump) extends BatchingInputBuffer(bufferSize, pump) { - // Not driven directly - override val subreceive = new SubReceive(Actor.emptyBehavior) - - def substreamOnComplete(): Unit = onComplete() - def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription) - def substreamOnError(e: Throwable): Unit = onError(e) - def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem) - - override protected def inputOnError(e: Throwable): Unit = { - super.inputOnError(e) - processor.invalidateSubstreamInput(key, e) - } - } - -} - -/** - * INTERNAL API - */ -private[akka] trait MultiStreamInputProcessorLike extends Pump { this: Actor ⇒ - - import MultiStreamInputProcessor._ - - protected def nextId(): Long - protected def inputBufferSize: Int - - private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInput] - private val waitingForOnSubscribe = collection.mutable.Map.empty[SubstreamKey, SubstreamSubscriber[Any]] - - val inputSubstreamManagement: Receive = { - case SubstreamStreamOnSubscribe(key, subscription) ⇒ - substreamInputs(key).substreamOnSubscribe(subscription) - waitingForOnSubscribe -= key - case SubstreamOnNext(key, element) ⇒ - substreamInputs(key).substreamOnNext(element) - case SubstreamOnComplete(key) ⇒ - substreamInputs(key).substreamOnComplete() - substreamInputs -= key - case SubstreamOnError(key, e) ⇒ - substreamInputs(key).substreamOnError(e) - } - - def createSubstreamInput(): SubstreamInput = { - val key = SubstreamKey(nextId()) - val inputs = new SubstreamInput(key, inputBufferSize, this, this) - substreamInputs(key) = inputs - inputs - } - - def createAndSubscribeSubstreamInput(p: Publisher[Any]): SubstreamInput = { - val inputs = createSubstreamInput() - val sub = new SubstreamSubscriber[Any](self, inputs.key) - waitingForOnSubscribe(inputs.key) = sub - p.subscribe(sub) - inputs - } - - def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = { - substreamInputs(substream).cancel() - substreamInputs -= substream - pump() - } - - protected def failInputs(e: Throwable): Unit = { - cancelWaitingForOnSubscribe() - substreamInputs.values foreach (_.cancel()) - } - - protected def finishInputs(): Unit = { - cancelWaitingForOnSubscribe() - substreamInputs.values foreach (_.cancel()) - } - - private def cancelWaitingForOnSubscribe(): Unit = - waitingForOnSubscribe.valuesIterator.foreach { sub ⇒ - sub.getAndSet(CancelledSubscription) match { - case null ⇒ // we were first - case subscription ⇒ - // SubstreamOnSubscribe is still in flight and will not arrive - subscription.cancel() - } - } - -} - -/** - * INTERNAL API - */ -private[akka] abstract class MultiStreamInputProcessor(_settings: ActorMaterializerSettings) extends ActorProcessorImpl(_settings) with MultiStreamInputProcessorLike { - private var _nextId = 0L - protected def nextId(): Long = { _nextId += 1; _nextId } - - override protected val inputBufferSize = _settings.initialInputBufferSize - - override protected def fail(e: Throwable) = { - failInputs(e) - super.fail(e) - } - - override def pumpFinished() = { - finishInputs() - super.pumpFinished() - } - - override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse inputSubstreamManagement -}