From 5fe12a3e9d13060e272309ec539eb0dba4bfa664 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Fri, 5 Jul 2019 08:19:46 +0200 Subject: [PATCH] Subscription timeouts not working #19980 --- .../scala/akka/stream/impl/TimeoutsSpec.scala | 31 ++++++++++++++ .../mima-filters/2.5.x.backwards.excludes | 4 ++ .../akka/stream/impl/ActorProcessor.scala | 8 +++- .../akka/stream/impl/FanoutProcessor.scala | 41 ++++++++++++++++--- .../main/scala/akka/stream/impl/Sinks.scala | 11 ++++- .../scala/akka/stream/impl/StreamLayout.scala | 19 +++++++++ 6 files changed, 107 insertions(+), 7 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala index 602d55ca82..665df72f11 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala @@ -336,6 +336,37 @@ class TimeoutsSpec extends StreamSpec { } + "Subscription timeouts" must { + + implicit val materializer = ActorMaterializer( + ActorMaterializerSettings(system).withSubscriptionTimeoutSettings( + StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, 100.millis))) + + "be effective for dangling downstream (no fanout)" in assertAllStagesStopped { + val upstream = TestPublisher.probe() + val (sub, _) = Flow[Int].map(_.toString).runWith(Source.asSubscriber, Sink.asPublisher(false)) + upstream.subscribe(sub) + upstream.expectCancellation() + } + + "be effective for dangling downstream (with fanout)" in assertAllStagesStopped { + val upstream = TestPublisher.probe() + val (sub, _) = Flow[Int].map(_.toString).runWith(Source.asSubscriber, Sink.asPublisher(true)) + upstream.subscribe(sub) + upstream.expectCancellation() + } + + // this one seems close to impossible to actually implement + "be effective for dangling upstream" in pendingUntilFixed(assertAllStagesStopped { + val downstream = TestSubscriber.probe[String]() + val (_, pub) = Flow[Int].map(_.toString).runWith(Source.asSubscriber, Sink.asPublisher(false)) + pub.subscribe(downstream) + downstream.ensureSubscription() + downstream.expectError() shouldBe a[SubscriptionTimeoutException] + }) + + } + } class TimeoutChecksSpec extends WordSpecLike with Matchers { diff --git a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes index 722753f54e..786ca79836 100644 --- a/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.x.backwards.excludes @@ -129,3 +129,7 @@ ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamSour ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$Continue$") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher") ProblemFilters.exclude[MissingClassProblem]("akka.stream.impl.io.InputStreamPublisher$") + +# #19980 subscription timeouts for streams +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.impl.ActorProcessorImpl.subTimeoutHandling") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.FanoutOutputs.this") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 346be46b3a..8d720e9722 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -250,6 +250,10 @@ import akka.util.unused } +private[akka] object ActorProcessorImpl { + case object SubscriptionTimeout +} + /** * INTERNAL API */ @@ -268,6 +272,7 @@ import akka.util.unused } protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) + def subTimeoutHandling: Receive /** * Subclass may override [[#activeReceive]] @@ -279,7 +284,8 @@ import akka.util.unused } } - def activeReceive: Receive = primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive) + def activeReceive: Receive = + primaryInputs.subreceive.orElse[Any, Unit](primaryOutputs.subreceive).orElse(subTimeoutHandling) protected def onError(e: Throwable): Unit = fail(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala index 94ef3c2df9..c3cd23cb11 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -4,15 +4,20 @@ package akka.stream.impl -import akka.actor.{ Actor, ActorRef, Deploy, Props } -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.stream.{ ActorMaterializerSettings, Attributes } +import akka.actor.Actor +import akka.actor.ActorRef +import akka.actor.Deploy +import akka.actor.Props +import akka.annotation.InternalApi +import akka.stream.ActorMaterializerSettings +import akka.stream.Attributes +import akka.stream.StreamSubscriptionTimeoutTerminationMode import org.reactivestreams.Subscriber /** * INTERNAL API */ -@DoNotInherit private[akka] abstract class FanoutOutputs( +@InternalApi private[akka] abstract class FanoutOutputs( val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, @@ -20,9 +25,14 @@ import org.reactivestreams.Subscriber extends DefaultOutputTransferStates with SubscriberManagement[Any] { + private var _subscribed = false + def subscribed: Boolean = _subscribed + override type S = ActorSubscriptionWithCursor[_ >: Any] - override def createSubscription(subscriber: Subscriber[_ >: Any]): S = + override def createSubscription(subscriber: Subscriber[_ >: Any]): S = { + _subscribed = true new ActorSubscriptionWithCursor(self, subscriber) + } protected var exposedPublisher: ActorPublisher[Any] = _ @@ -111,6 +121,12 @@ import org.reactivestreams.Subscriber @InternalApi private[akka] class FanoutProcessorImpl(attributes: Attributes, _settings: ActorMaterializerSettings) extends ActorProcessorImpl(attributes, _settings) { + if (settings.subscriptionTimeoutSettings.mode != StreamSubscriptionTimeoutTerminationMode.noop) { + import context.dispatcher + context.system.scheduler + .scheduleOnce(_settings.subscriptionTimeoutSettings.timeout, self, ActorProcessorImpl.SubscriptionTimeout) + } + override val primaryOutputs: FanoutOutputs = { val inputBuffer = attributes.mandatoryAttribute[Attributes.InputBuffer] new FanoutOutputs(inputBuffer.max, inputBuffer.initial, self, this) { @@ -130,4 +146,19 @@ import org.reactivestreams.Subscriber def afterFlush(): Unit = context.stop(self) initialPhase(1, running) + + def subTimeoutHandling: Receive = { + case ActorProcessorImpl.SubscriptionTimeout => + import StreamSubscriptionTimeoutTerminationMode._ + if (!primaryOutputs.subscribed) { + settings.subscriptionTimeoutSettings.mode match { + case CancelTermination => + primaryInputs.cancel() + context.stop(self) + case WarnTermination => + context.system.log.warning("Subscription timeout for {}", this) + case NoopTermination => // won't happen + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala index f516390331..d85250d1f7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala @@ -88,7 +88,16 @@ import org.reactivestreams.Subscriber * subscription a VirtualProcessor would perform (and it also saves overhead). */ override def create(context: MaterializationContext): (AnyRef, Publisher[In]) = { + val proc = new VirtualPublisher[In] + context.materializer match { + case am: ActorMaterializer => + if (am.settings.subscriptionTimeoutSettings.mode != StreamSubscriptionTimeoutTerminationMode.noop) + am.scheduleOnce(am.settings.subscriptionTimeoutSettings.timeout, new Runnable { + def run(): Unit = proc.onSubscriptionTimeout(am) + }) + case _ => // not possible to setup timeout + } (proc, proc) } @@ -110,8 +119,8 @@ import org.reactivestreams.Subscriber context, FanoutProcessorImpl.props(context.effectiveAttributes, actorMaterializer.settings)) val fanoutProcessor = new ActorProcessor[In, In](impl) - impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) // Resolve cyclic dependency with actor. This MUST be the first message no matter what. + impl ! ExposedPublisher(fanoutProcessor.asInstanceOf[ActorPublisher[Any]]) (fanoutProcessor, fanoutProcessor) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala index e3cc2e3bff..e71c35a91f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamLayout.scala @@ -455,10 +455,12 @@ import scala.util.control.NonFatal * the use of `Inert.subscriber` as a tombstone. */ @InternalApi private[impl] class VirtualPublisher[T] extends AtomicReference[AnyRef] with Publisher[T] { + import ReactiveStreamsCompliance._ import VirtualProcessor.Inert override def subscribe(subscriber: Subscriber[_ >: T]): Unit = { requireNonNullSubscriber(subscriber) + if (VirtualProcessor.Debug) println(s"$this.subscribe: $subscriber") @tailrec def rec(): Unit = { get() match { case null => @@ -495,6 +497,23 @@ import scala.util.control.NonFatal } } + // this is when the subscription timeout hits, implemented like this to + // avoid allocating a separate object for that + def onSubscriptionTimeout(am: ActorMaterializer): Unit = { + import StreamSubscriptionTimeoutTerminationMode._ + get() match { + case null | _: Publisher[_] => + am.settings.subscriptionTimeoutSettings.mode match { + case CancelTermination => subscribe(new CancellingSubscriber[T]) + case WarnTermination => + am.logger.warning("Subscription timeout for {}", this) + case NoopTermination => // never happens + } + + case _ => // we're ok + } + } + override def toString: String = s"VirtualPublisher(state = ${get()})" }