From 15abdaeb15a86ddf8569f4b144fda0a184071b25 Mon Sep 17 00:00:00 2001 From: Konrad 'ktoso' Malawski Date: Mon, 3 Nov 2014 15:29:02 +0100 Subject: [PATCH] +str #15086 substream publishers now have subscription-timeouts --- .../akka/stream/actor/ActorPublisherTest.java | 5 +- .../stream/actor/ActorSubscriberTest.java | 4 +- .../java/akka/stream/javadsl/FlowTest.java | 7 +- .../java/akka/stream/javadsl/SinkTest.java | 23 ++- .../stream/scaladsl/FlowGroupBySpec.scala | 35 ++-- .../SubstreamSubscriptionTimeoutSpec.scala | 122 ++++++++++++++ akka-stream/src/main/resources/reference.conf | 19 +++ .../scala/akka/stream/FlattenStrategy.scala | 2 - .../scala/akka/stream/FlowMaterializer.scala | 55 ++++++- .../akka/stream/impl/ConcatAllImpl.scala | 1 + .../stream/impl/GroupByProcessorImpl.scala | 2 +- .../impl/StreamOfStreamProcessors.scala | 29 +++- .../impl/StreamSubscriptionTimeout.scala | 155 ++++++++++++++++++ .../scala/akka/stream/io/SslTlsCipher.scala | 25 ++- .../scala/akka/stream/scaladsl/Flow.scala | 1 - 15 files changed, 423 insertions(+), 62 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java index e8f60bf02b..341f477194 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorPublisherTest.java @@ -3,14 +3,13 @@ package akka.stream.actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Source; -import akka.stream.FlowMaterializer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.junit.ClassRule; -import org.junit.Ignore; import org.junit.Test; import org.reactivestreams.Publisher; @@ -38,7 +37,7 @@ public class ActorPublisherTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java index 8947bf4058..2340385719 100644 --- a/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/actor/ActorSubscriberTest.java @@ -3,11 +3,11 @@ package akka.stream.actor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.Props; +import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; -import akka.stream.FlowMaterializer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; import org.junit.ClassRule; @@ -57,7 +57,7 @@ public class ActorSubscriberTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 82e894a708..4e6823fb62 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -7,7 +7,10 @@ import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.japi.Pair; import akka.japi.Util; -import akka.stream.*; +import akka.stream.FlowMaterializer; +import akka.stream.MaterializerSettings; +import akka.stream.OverflowStrategy; +import akka.stream.Transformer; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -37,7 +40,7 @@ public class FlowTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 8c8b9b4719..60657e8514 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -3,21 +3,20 @@ */ package akka.stream.javadsl; -import java.util.ArrayList; -import java.util.List; - -import akka.stream.javadsl.japi.Function2; -import org.junit.ClassRule; -import org.junit.Test; -import org.reactivestreams.Publisher; - -import scala.concurrent.Await; -import scala.concurrent.Future; -import scala.concurrent.duration.Duration; import akka.actor.ActorSystem; import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; +import akka.stream.javadsl.japi.Function2; import akka.stream.testkit.AkkaSpec; +import org.junit.ClassRule; +import org.junit.Test; +import org.reactivestreams.Publisher; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +import java.util.ArrayList; +import java.util.List; public class SinkTest { @@ -27,7 +26,7 @@ public class SinkTest { final ActorSystem system = actorSystemResource.getSystem(); - final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); + final MaterializerSettings settings = MaterializerSettings.create(system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system); @Test diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala index 1e8bbee842..d4152b0ac3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowGroupBySpec.scala @@ -11,7 +11,6 @@ import akka.stream.MaterializerSettings import akka.stream.testkit._ import org.reactivestreams.Publisher -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowGroupBySpec extends AkkaSpec { val settings = MaterializerSettings(system) @@ -43,7 +42,7 @@ class FlowGroupBySpec extends AkkaSpec { def getSubFlow(expectedKey: Int): Source[Int] = { masterSubscription.request(1) - expectSubFlow(expectedKey: Int) + expectSubFlow(expectedKey) } def expectSubFlow(expectedKey: Int): Source[Int] = { @@ -123,23 +122,21 @@ class FlowGroupBySpec extends AkkaSpec { } "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { - pending - // FIXME: Needs handling of loose substreams that no one refers to anymore. - // val substream = StreamPuppet(getSubproducer(1)) - // - // substream.request(1) - // substream.expectNext(1) - // - // masterSubscription.cancel() - // masterSubscriber.expectNoMsg(100.millis) - // - // // Open substreams still work, others are discarded - // substream.request(4) - // substream.expectNext(4) - // substream.expectNext(7) - // substream.expectNext(10) - // substream.expectNext(13) - // substream.expectComplete() + val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher)) + + substream.request(1) + substream.expectNext(1) + + masterSubscription.cancel() + masterSubscriber.expectNoMsg(100.millis) + + // Open substreams still work, others are discarded + substream.request(4) + substream.expectNext(4) + substream.expectNext(7) + substream.expectNext(10) + substream.expectNext(13) + substream.expectComplete() } "work with empty input stream" in { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala new file mode 100644 index 0000000000..9375b5464f --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SubstreamSubscriptionTimeoutSpec.scala @@ -0,0 +1,122 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.actor.{ ExtendedActorSystem, ActorIdentity, ActorRef, Identify } +import akka.stream.{ FlowMaterializer, MaterializerSettings } +import akka.stream.impl.SubscriptionTimeoutException +import akka.stream.testkit._ +import akka.util.Timeout + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) { + + def this(subscriptionTimeout: FiniteDuration) { + this( + s""" + |akka.stream.materializer { + | subscription-timeout { + | mode = cancel + | + | timeout = ${subscriptionTimeout.toMillis}ms + | } + |}""".stripMargin) + } + + def this() { + this(300.millis) + } + + val settings = MaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 2) + .withFanOutBuffer(initialSize = 2, maxSize = 2) + + implicit val dispatcher = system.dispatcher + implicit val materializer = FlowMaterializer(settings) + + "groupBy" must { + + "timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in { + val publisherProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + upstreamSubscription.sendNext(2) + upstreamSubscription.sendNext(3) + + val (_, s1) = subscriber.expectNext() + // should not break normal usage + val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() + s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe) + s1SubscriberProbe.expectSubscription().request(100) + s1SubscriberProbe.expectNext(1) + + val (_, s2) = subscriber.expectNext() + // should not break normal usage + val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]() + s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe) + s2SubscriberProbe.expectSubscription().request(100) + s2SubscriberProbe.expectNext(2) + + val (_, s3) = subscriber.expectNext() + + // sleep long enough for it to be cleaned up + Thread.sleep(1000) + + val f = s3.runWith(Sink.future).recover { case _: SubscriptionTimeoutException ⇒ "expected" } + Await.result(f, 300.millis) should equal("expected") + } + + "timeout and stop groupBy parent actor if none of the substreams are actually consumed" in { + val publisherProbe = StreamTestKit.PublisherProbe[Int]() + val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher) + val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]() + publisher.subscribe(subscriber) + + val upstreamSubscription = publisherProbe.expectSubscription() + + val downstreamSubscription = subscriber.expectSubscription() + downstreamSubscription.request(100) + + upstreamSubscription.sendNext(1) + upstreamSubscription.sendNext(2) + upstreamSubscription.sendNext(3) + upstreamSubscription.sendComplete() + + val (_, s1) = subscriber.expectNext() + val (_, s2) = subscriber.expectNext() + + val groupByActor = watchGroupByActor(5) // update this number based on how many streams the test above has... + + // it should be terminated after none of it's substreams are used within the timeout + expectTerminated(groupByActor, 1000.millis) + } + } + + private def watchGroupByActor(flowNr: Int): ActorRef = { + implicit val t = Timeout(300.millis) + import akka.pattern.ask + val path = s"/user/$$a/flow-${flowNr}-1-groupBy" + val gropByPath = system.actorSelection(path) + val groupByActor = try { + Await.result((gropByPath ? Identify("")).mapTo[ActorIdentity], 300.millis).ref.get + } catch { + case ex: Exception ⇒ + alert(s"Unable to find groupBy actor by path: [$path], please adjust it's flowId, here's the current actor tree:\n" + + system.asInstanceOf[ExtendedActorSystem].printTree) + throw ex + } + watch(groupByActor) + } + +} diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf index ea10b33255..49c055c288 100644 --- a/akka-stream/src/main/resources/reference.conf +++ b/akka-stream/src/main/resources/reference.conf @@ -22,6 +22,25 @@ akka { # to be used by FlowMaterialiser when creating Actors. # When this value is left empty, the default-dispatcher will be used. dispatcher = "" + + # Cleanup leaked publishers and subscribers when they are not used within a given deadline + subscription-timeout { + # Fully qualified config path which holds the dispatcher configuration + # to be used for the scheduled stream cancellations. + # When this value is left empty, the containing actor context's dispatcher will be used. + dispatcher = "" + + # when the subscription timeout is reached one of the following strategies on the "stale" publisher: + # cancel - cancel it (via `onError` or subscribing to the publisher and `cancel()`ing the subscription right away + # warn - log a warning statement about the stale element (then drop the reference to it) + # noop - do nothing (not recommended) + mode = cancel + + # time after which a subscriber / publisher is considered stale and eligible for cancelation (see `akka.stream.subscription-timeout.mode`) + timeout = 5s + } + } + } } diff --git a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala index 8f56fb8773..41862c6339 100644 --- a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala @@ -3,8 +3,6 @@ */ package akka.stream -import org.reactivestreams.Publisher - /** * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. */ diff --git a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index 3635f89251..f052392c34 100644 --- a/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -3,14 +3,26 @@ */ package akka.stream +import java.util.Locale +import java.util.concurrent.TimeUnit + +import akka.stream.impl.ActorBasedFlowMaterializer +import akka.stream.impl.Ast +import akka.stream.impl.FlowNameCounter +import akka.stream.impl.StreamSupervisor + import scala.collection.immutable -import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem } -import akka.stream.impl.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } +import akka.actor.ActorContext +import akka.actor.ActorRefFactory +import akka.actor.ActorSystem +import akka.actor.ExtendedActorSystem import com.typesafe.config.Config import org.reactivestreams.Publisher import org.reactivestreams.Subscriber +import scala.concurrent.duration._ + object FlowMaterializer { /** @@ -171,7 +183,8 @@ object MaterializerSettings { config.getInt("max-input-buffer-size"), config.getInt("initial-fan-out-buffer-size"), config.getInt("max-fan-out-buffer-size"), - config.getString("dispatcher")) + config.getString("dispatcher"), + StreamSubscriptionTimeoutSettings(config)) /** * Java API @@ -209,7 +222,8 @@ final case class MaterializerSettings( maxInputBufferSize: Int, initialFanOutBufferSize: Int, maxFanOutBufferSize: Int, - dispatcher: String) { + dispatcher: String, + subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) { require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") @@ -234,3 +248,36 @@ final case class MaterializerSettings( private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 } + +object StreamSubscriptionTimeoutSettings { + + /** Java API */ + def create(config: Config): StreamSubscriptionTimeoutSettings = + apply(config) + + def apply(config: Config): StreamSubscriptionTimeoutSettings = { + val c = config.getConfig("subscription-timeout") + StreamSubscriptionTimeoutSettings( + mode = c.getString("mode").toLowerCase(Locale.ROOT) match { + case "no" | "off" | "false" | "noop" ⇒ NoopTermination + case "warn" ⇒ WarnTermination + case "cancel" ⇒ CancelTermination + }, + timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis, + dispatcher = c.getString("dispatcher")) + } +} +final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration, dispatcher: String) + +sealed abstract class StreamSubscriptionTimeoutTerminationMode +object StreamSubscriptionTimeoutTerminationMode { + /** Java API */ + def noop = NoopTermination + /** Java API */ + def warn = WarnTermination + /** Java API */ + def cancel = CancelTermination +} +case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode +case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode +case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala index a60bf5783d..e9ac39f3c5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -31,4 +31,5 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer) nextPhase(takeNextSubstream) override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 07a4ff8d26..58b4881777 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -37,7 +37,7 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key } } - def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { () ⇒ if (primaryOutputs.isClosed) { // Just drop, we do not open any more substreams nextPhase(waitNext) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 5439e70fb4..2635dc807b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -4,6 +4,8 @@ package akka.stream.impl import java.util.concurrent.atomic.AtomicReference +import akka.actor.ActorLogging +import akka.actor.Cancellable import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings @@ -36,7 +38,8 @@ private[akka] object MultiStreamOutputProcessor { final case class Failed(e: Throwable) extends CompletedState } - class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump) extends SimpleOutputs(actor, pump) with Publisher[Any] { + class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout) + extends SimpleOutputs(actor, pump) with Publisher[Any] { import SubstreamOutput._ @@ -79,12 +82,14 @@ private[akka] object MultiStreamOutputProcessor { } override def subscribe(s: Subscriber[_ >: Any]): Unit = { - if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) - else { - state.get() match { - case _: Attached ⇒ s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) - case c: CompletedState ⇒ closeSubscriber(s, c) - case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") + if (subscriptionTimeout.cancelAndHandle(s)) { + if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) + else { + state.get() match { + case _: Attached ⇒ s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber)) + case c: CompletedState ⇒ closeSubscriber(s, c) + case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") + } } } } @@ -100,7 +105,9 @@ private[akka] object MultiStreamOutputProcessor { /** * INTERNAL API */ -private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor ⇒ +private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubscriptionTimeoutSupport { + this: Actor with ActorLogging ⇒ + import MultiStreamOutputProcessor._ protected def nextId(): Long @@ -109,7 +116,9 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor protected def createSubstreamOutput(): SubstreamOutput = { val id = SubstreamKey(nextId()) - val outputs = new SubstreamOutput(id, self, this) + val outputs = publisherWithStreamSubscriptionTimeout { + new SubstreamOutput(id, self, this, _) + } substreamOutputs(outputs.key) = outputs outputs } @@ -142,6 +151,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS private var _nextId = 0L protected def nextId(): Long = { _nextId += 1; _nextId } + override val subscriptionTimeoutSettings = _settings.subscriptionTimeoutSettings + override protected def fail(e: Throwable): Unit = { failOutputs(e) super.fail(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala new file mode 100644 index 0000000000..a6ba235349 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamSubscriptionTimeout.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import java.util.concurrent.atomic.AtomicBoolean + +import akka.actor._ +import akka.dispatch.ExecutionContexts +import akka.stream.CancelTermination +import akka.stream.NoopTermination +import akka.stream.StreamSubscriptionTimeoutSettings +import akka.stream.WarnTermination +import org.reactivestreams.Processor +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription + +import scala.concurrent.duration.FiniteDuration +import scala.concurrent.Future +import scala.concurrent.Promise +import scala.util.control.NoStackTrace + +/** + * Handed to a [[Publisher]] participating in subscription-timeouts. + * + * It *MUST* cancel this timeout the earliest it can in it's `subscribe(Subscriber[T])` method to prevent the timeout from being triggered spuriously. + */ +trait SubscriptionTimeout { + /** + * Cancels the subscription timeout and returns `true` if the given `Subscriber` is valid to be processed. + * For example, if termination is in progress already the Processor should not process this incoming subscriber. + * In case of returning `false` as in "do not handle this subscriber", this method takes care of cancelling the Subscriber + * automatically by signalling `onError` with an adequate description of the subscription-timeout being exceeded. + * + * [[Publisher]] implementations *MUST* use this method to guard any handling of Subscribers (in `Publisher#subscribe`). + */ + def cancelAndHandle(s: Subscriber[_]): Boolean +} + +/** + * Provides support methods to create Publishers and Subscribers which time-out gracefully, + * and are cancelled subscribing an `CancellingSubscriber` to the publisher, or by calling `onError` on the timed-out subscriber. + * + * See `akka.stream.materializer.subscription-timeout` for configuration options. + */ +trait StreamSubscriptionTimeoutSupport { + this: Actor with ActorLogging ⇒ + + /** Default settings for subscription timeouts. */ + def subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings + + /** + * Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination, + * using the default timeout from the configuration. + * + * The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block. + * This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber. + */ + def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](mkPublisher: SubscriptionTimeout ⇒ Pub): Pub = + publisherWithStreamSubscriptionTimeout(subscriptionTimeoutSettings.timeout)(mkPublisher) + + /** + * Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination, + * using the passed in timeout. + * + * The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block. + * This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber. + */ + def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](timeoutOverride: FiniteDuration)(mkPublisher: SubscriptionTimeout ⇒ Pub): Pub = { + val p = Promise[Publisher[_]]() // to break chicken-and-egg with subscriptionTimeout + + val subscriptionTimeout = scheduleSubscriptionTimeout(p.future, timeoutOverride) + val pub = mkPublisher(subscriptionTimeout) + p.success(pub) + + pub + } + + private def scheduleSubscriptionTimeout(rs: Future[_], timeout: FiniteDuration): SubscriptionTimeout = { + implicit val dispatcher = + if (subscriptionTimeoutSettings.dispatcher.trim.isEmpty) context.dispatcher + else context.system.dispatchers.lookup(subscriptionTimeoutSettings.dispatcher) + + new SubscriptionTimeout { + private val safeToCancelTimer = new AtomicBoolean(true) + + val subscriptionTimeout = context.system.scheduler.scheduleOnce(timeout, new Runnable { + override def run(): Unit = { + if (safeToCancelTimer.compareAndSet(true, false)) + onReactiveStream { terminate(_, timeout) } + } + }) + + override def cancelAndHandle(s: Subscriber[_]): Boolean = s match { + case _ if subscriptionTimeout.isCancelled ⇒ + // there was some initial subscription already, which cancelled the timeout => continue normal operation + true + + case _ if safeToCancelTimer.get ⇒ + // first subscription signal, cancel the subscription-timeout + safeToCancelTimer.compareAndSet(true, false) && subscriptionTimeout.cancel() + true + + case CancellingSubscriber if !safeToCancelTimer.get ⇒ + // publisher termination in progress - normally we'd onError all subscribers, except the CancellationSubscriber (!) + // guaranteed that no other subscribers are coming in now + true + + case _ ⇒ + // terminated - kill incoming subscribers + onReactiveStream { rs ⇒ + s.onError(new SubscriptionTimeoutException(s"Publisher (${rs}) you are trying to subscribe to has been shut-down " + + s"because exceeding it's subscription-timeout.") with NoStackTrace) + } + + false + } + + private final def onReactiveStream(block: Any ⇒ Unit) = + rs.foreach { rs ⇒ block(rs) }(ExecutionContexts.sameThreadExecutionContext) + } + } + + private def cancel(rs: Any, timeout: FiniteDuration): Unit = rs match { + case p: Processor[_, _] ⇒ + log.debug("Cancelling {} Processor's publisher and subscriber sides (after {})", p, timeout) + p.subscribe(CancellingSubscriber) + p.onError(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${timeout})") with NoStackTrace) + + case p: Publisher[_] ⇒ + log.debug("Cancelling {} using CancellingSubscriber (after: {})", p, timeout) + p.subscribe(CancellingSubscriber) + } + + private def warn(rs: Any, timeout: FiniteDuration): Unit = { + log.warning("Timed out {} detected (after {})! You should investigate if you either cancel or consume all {} instances", + rs, timeout, rs.getClass.getCanonicalName) + } + private def terminate(el: Any, timeout: FiniteDuration): Unit = subscriptionTimeoutSettings.mode match { + case NoopTermination ⇒ // ignore... + case WarnTermination ⇒ warn(el, timeout) + case CancelTermination ⇒ cancel(el, timeout) + } + + private final case object CancellingSubscriber extends Subscriber[Any] { + override def onSubscribe(s: Subscription): Unit = s.cancel() + override def onError(t: Throwable): Unit = () + override def onComplete(): Unit = () + override def onNext(t: Any): Unit = () + } + +} + +class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg) \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala b/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala index 0f86694024..a1ca79ca10 100644 --- a/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala +++ b/akka-stream/src/main/scala/akka/stream/io/SslTlsCipher.scala @@ -4,16 +4,25 @@ package akka.stream.io -import akka.actor.{ ActorLogging, ActorRefFactory, Actor, ActorRef } -import akka.stream.impl._ -import akka.util.{ ByteStringBuilder, ByteString } import java.nio.ByteBuffer -import java.security.cert.Certificate import java.security.Principal +import java.security.cert.Certificate import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.Status._ -import javax.net.ssl.{ SSLEngineResult, SSLPeerUnverifiedException, SSLSession, SSLEngine } -import org.reactivestreams.{ Subscription, Publisher, Subscriber } +import javax.net.ssl.SSLEngine +import javax.net.ssl.SSLEngineResult +import javax.net.ssl.SSLPeerUnverifiedException +import javax.net.ssl.SSLSession + +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.stream.MaterializerSettings +import akka.stream.impl._ +import akka.util.ByteString +import akka.util.ByteStringBuilder +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber import scala.annotation.tailrec @@ -88,11 +97,13 @@ class SslTlsCipherActor(val requester: ActorRef, val sessionNegotioation: SslTls with MultiStreamOutputProcessorLike with MultiStreamInputProcessorLike { + override val subscriptionTimeoutSettings = MaterializerSettings(context.system).subscriptionTimeoutSettings + def this(requester: ActorRef, sessionNegotioation: SslTlsCipher.SessionNegotiation) = this(requester, sessionNegotioation, false) - import SslTlsCipherActor._ import MultiStreamInputProcessor.SubstreamSubscriber + import SslTlsCipherActor._ private var _nextId = 0L protected def nextId(): Long = { _nextId += 1; _nextId } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 23cef6f6b5..25c4b25fbe 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -408,7 +408,6 @@ trait FlowOps[+Out] { */ def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U] = strategy match { case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) - case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) // TODO remove duality here? case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") }