Merge pull request #16215 from ktoso/wip-timeout-untouchedstreams-ktoso

+str #15086 subscription-timeout for publishers in streamsofstreams
This commit is contained in:
Konrad Malawski 2014-11-06 16:15:20 +01:00
commit 48d5e1e109
15 changed files with 423 additions and 61 deletions

View file

@ -3,14 +3,13 @@ package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings; import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.FlowMaterializer;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test; import org.junit.Test;
import org.reactivestreams.Publisher; import org.reactivestreams.Publisher;
@ -38,7 +37,7 @@ public class ActorPublisherTest {
final ActorSystem system = actorSystemResource.getSystem(); final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test

View file

@ -3,11 +3,11 @@ package akka.stream.actor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.actor.Props; import akka.actor.Props;
import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings; import akka.stream.MaterializerSettings;
import akka.stream.javadsl.AkkaJUnitActorSystemResource; import akka.stream.javadsl.AkkaJUnitActorSystemResource;
import akka.stream.javadsl.Sink; import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source; import akka.stream.javadsl.Source;
import akka.stream.FlowMaterializer;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
import org.junit.ClassRule; import org.junit.ClassRule;
@ -57,7 +57,7 @@ public class ActorSubscriberTest {
final ActorSystem system = actorSystemResource.getSystem(); final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test

View file

@ -7,7 +7,10 @@ import akka.dispatch.Futures;
import akka.dispatch.OnSuccess; import akka.dispatch.OnSuccess;
import akka.japi.Pair; import akka.japi.Pair;
import akka.japi.Util; import akka.japi.Util;
import akka.stream.*; import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings;
import akka.stream.OverflowStrategy;
import akka.stream.Transformer;
import akka.stream.javadsl.japi.*; import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import akka.testkit.JavaTestKit; import akka.testkit.JavaTestKit;
@ -37,7 +40,7 @@ public class FlowTest {
final ActorSystem system = actorSystemResource.getSystem(); final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test

View file

@ -3,21 +3,20 @@
*/ */
package akka.stream.javadsl; package akka.stream.javadsl;
import java.util.ArrayList;
import java.util.List;
import akka.stream.javadsl.japi.Function2;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.actor.ActorSystem; import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer; import akka.stream.FlowMaterializer;
import akka.stream.MaterializerSettings; import akka.stream.MaterializerSettings;
import akka.stream.javadsl.japi.Function2;
import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.AkkaSpec;
import org.junit.ClassRule;
import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import java.util.ArrayList;
import java.util.List;
public class SinkTest { public class SinkTest {
@ -27,7 +26,7 @@ public class SinkTest {
final ActorSystem system = actorSystemResource.getSystem(); final ActorSystem system = actorSystemResource.getSystem();
final MaterializerSettings settings = new MaterializerSettings(2, 4, 2, 4, "akka.test.stream-dispatcher"); final MaterializerSettings settings = MaterializerSettings.create(system);
final FlowMaterializer materializer = FlowMaterializer.create(settings, system); final FlowMaterializer materializer = FlowMaterializer.create(settings, system);
@Test @Test

View file

@ -42,7 +42,7 @@ class FlowGroupBySpec extends AkkaSpec {
def getSubFlow(expectedKey: Int): Source[Int] = { def getSubFlow(expectedKey: Int): Source[Int] = {
masterSubscription.request(1) masterSubscription.request(1)
expectSubFlow(expectedKey: Int) expectSubFlow(expectedKey)
} }
def expectSubFlow(expectedKey: Int): Source[Int] = { def expectSubFlow(expectedKey: Int): Source[Int] = {
@ -122,23 +122,21 @@ class FlowGroupBySpec extends AkkaSpec {
} }
"accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) {
pending val substream = StreamPuppet(getSubFlow(1).runWith(Sink.publisher))
// FIXME: Needs handling of loose substreams that no one refers to anymore.
// val substream = StreamPuppet(getSubproducer(1)) substream.request(1)
// substream.expectNext(1)
// substream.request(1)
// substream.expectNext(1) masterSubscription.cancel()
// masterSubscriber.expectNoMsg(100.millis)
// masterSubscription.cancel()
// masterSubscriber.expectNoMsg(100.millis) // Open substreams still work, others are discarded
// substream.request(4)
// // Open substreams still work, others are discarded substream.expectNext(4)
// substream.request(4) substream.expectNext(7)
// substream.expectNext(4) substream.expectNext(10)
// substream.expectNext(7) substream.expectNext(13)
// substream.expectNext(10) substream.expectComplete()
// substream.expectNext(13)
// substream.expectComplete()
} }
"work with empty input stream" in { "work with empty input stream" in {

View file

@ -0,0 +1,122 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.actor.{ ExtendedActorSystem, ActorIdentity, ActorRef, Identify }
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.impl.SubscriptionTimeoutException
import akka.stream.testkit._
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._
class SubstreamSubscriptionTimeoutSpec(conf: String) extends AkkaSpec(conf) {
def this(subscriptionTimeout: FiniteDuration) {
this(
s"""
|akka.stream.materializer {
| subscription-timeout {
| mode = cancel
|
| timeout = ${subscriptionTimeout.toMillis}ms
| }
|}""".stripMargin)
}
def this() {
this(300.millis)
}
val settings = MaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 2)
.withFanOutBuffer(initialSize = 2, maxSize = 2)
implicit val dispatcher = system.dispatcher
implicit val materializer = FlowMaterializer(settings)
"groupBy" must {
"timeout and cancel substream publishers when no-one subscribes to them after some time (time them out)" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 3).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
upstreamSubscription.sendNext(2)
upstreamSubscription.sendNext(3)
val (_, s1) = subscriber.expectNext()
// should not break normal usage
val s1SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s1.runWith(Sink.publisher).subscribe(s1SubscriberProbe)
s1SubscriberProbe.expectSubscription().request(100)
s1SubscriberProbe.expectNext(1)
val (_, s2) = subscriber.expectNext()
// should not break normal usage
val s2SubscriberProbe = StreamTestKit.SubscriberProbe[Int]()
s2.runWith(Sink.publisher).subscribe(s2SubscriberProbe)
s2SubscriberProbe.expectSubscription().request(100)
s2SubscriberProbe.expectNext(2)
val (_, s3) = subscriber.expectNext()
// sleep long enough for it to be cleaned up
Thread.sleep(1000)
val f = s3.runWith(Sink.future).recover { case _: SubscriptionTimeoutException "expected" }
Await.result(f, 300.millis) should equal("expected")
}
"timeout and stop groupBy parent actor if none of the substreams are actually consumed" in {
val publisherProbe = StreamTestKit.PublisherProbe[Int]()
val publisher = Source(publisherProbe).groupBy(_ % 2).runWith(Sink.publisher)
val subscriber = StreamTestKit.SubscriberProbe[(Int, Source[Int])]()
publisher.subscribe(subscriber)
val upstreamSubscription = publisherProbe.expectSubscription()
val downstreamSubscription = subscriber.expectSubscription()
downstreamSubscription.request(100)
upstreamSubscription.sendNext(1)
upstreamSubscription.sendNext(2)
upstreamSubscription.sendNext(3)
upstreamSubscription.sendComplete()
val (_, s1) = subscriber.expectNext()
val (_, s2) = subscriber.expectNext()
val groupByActor = watchGroupByActor(5) // update this number based on how many streams the test above has...
// it should be terminated after none of it's substreams are used within the timeout
expectTerminated(groupByActor, 1000.millis)
}
}
private def watchGroupByActor(flowNr: Int): ActorRef = {
implicit val t = Timeout(300.millis)
import akka.pattern.ask
val path = s"/user/$$a/flow-${flowNr}-1-groupBy"
val gropByPath = system.actorSelection(path)
val groupByActor = try {
Await.result((gropByPath ? Identify("")).mapTo[ActorIdentity], 300.millis).ref.get
} catch {
case ex: Exception
alert(s"Unable to find groupBy actor by path: [$path], please adjust it's flowId, here's the current actor tree:\n" +
system.asInstanceOf[ExtendedActorSystem].printTree)
throw ex
}
watch(groupByActor)
}
}

View file

@ -22,6 +22,25 @@ akka {
# to be used by FlowMaterialiser when creating Actors. # to be used by FlowMaterialiser when creating Actors.
# When this value is left empty, the default-dispatcher will be used. # When this value is left empty, the default-dispatcher will be used.
dispatcher = "" dispatcher = ""
# Cleanup leaked publishers and subscribers when they are not used within a given deadline
subscription-timeout {
# Fully qualified config path which holds the dispatcher configuration
# to be used for the scheduled stream cancellations.
# When this value is left empty, the containing actor context's dispatcher will be used.
dispatcher = ""
# when the subscription timeout is reached one of the following strategies on the "stale" publisher:
# cancel - cancel it (via `onError` or subscribing to the publisher and `cancel()`ing the subscription right away
# warn - log a warning statement about the stale element (then drop the reference to it)
# noop - do nothing (not recommended)
mode = cancel
# time after which a subscriber / publisher is considered stale and eligible for cancelation (see `akka.stream.subscription-timeout.mode`)
timeout = 5s
}
} }
} }
} }

View file

@ -3,8 +3,6 @@
*/ */
package akka.stream package akka.stream
import org.reactivestreams.Publisher
/** /**
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements. * Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
*/ */

View file

@ -3,14 +3,26 @@
*/ */
package akka.stream package akka.stream
import java.util.Locale
import java.util.concurrent.TimeUnit
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import akka.stream.impl.FlowNameCounter
import akka.stream.impl.StreamSupervisor
import scala.collection.immutable import scala.collection.immutable
import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem } import akka.actor.ActorContext
import akka.stream.impl.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } import akka.actor.ActorRefFactory
import akka.actor.ActorSystem
import akka.actor.ExtendedActorSystem
import com.typesafe.config.Config import com.typesafe.config.Config
import org.reactivestreams.Publisher import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber import org.reactivestreams.Subscriber
import scala.concurrent.duration._
object FlowMaterializer { object FlowMaterializer {
/** /**
@ -171,7 +183,8 @@ object MaterializerSettings {
config.getInt("max-input-buffer-size"), config.getInt("max-input-buffer-size"),
config.getInt("initial-fan-out-buffer-size"), config.getInt("initial-fan-out-buffer-size"),
config.getInt("max-fan-out-buffer-size"), config.getInt("max-fan-out-buffer-size"),
config.getString("dispatcher")) config.getString("dispatcher"),
StreamSubscriptionTimeoutSettings(config))
/** /**
* Java API * Java API
@ -209,7 +222,8 @@ final case class MaterializerSettings(
maxInputBufferSize: Int, maxInputBufferSize: Int,
initialFanOutBufferSize: Int, initialFanOutBufferSize: Int,
maxFanOutBufferSize: Int, maxFanOutBufferSize: Int,
dispatcher: String) { dispatcher: String,
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0") require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
@ -234,3 +248,36 @@ final case class MaterializerSettings(
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0
} }
object StreamSubscriptionTimeoutSettings {
/** Java API */
def create(config: Config): StreamSubscriptionTimeoutSettings =
apply(config)
def apply(config: Config): StreamSubscriptionTimeoutSettings = {
val c = config.getConfig("subscription-timeout")
StreamSubscriptionTimeoutSettings(
mode = c.getString("mode").toLowerCase(Locale.ROOT) match {
case "no" | "off" | "false" | "noop" NoopTermination
case "warn" WarnTermination
case "cancel" CancelTermination
},
timeout = c.getDuration("timeout", TimeUnit.MILLISECONDS).millis,
dispatcher = c.getString("dispatcher"))
}
}
final case class StreamSubscriptionTimeoutSettings(mode: StreamSubscriptionTimeoutTerminationMode, timeout: FiniteDuration, dispatcher: String)
sealed abstract class StreamSubscriptionTimeoutTerminationMode
object StreamSubscriptionTimeoutTerminationMode {
/** Java API */
def noop = NoopTermination
/** Java API */
def warn = WarnTermination
/** Java API */
def cancel = CancelTermination
}
case object NoopTermination extends StreamSubscriptionTimeoutTerminationMode
case object WarnTermination extends StreamSubscriptionTimeoutTerminationMode
case object CancelTermination extends StreamSubscriptionTimeoutTerminationMode

View file

@ -31,4 +31,5 @@ private[akka] class ConcatAllImpl(materializer: FlowMaterializer)
nextPhase(takeNextSubstream) nextPhase(takeNextSubstream)
override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e) override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e)
} }

View file

@ -37,7 +37,7 @@ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val key
} }
} }
def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemandOrCancel) { ()
if (primaryOutputs.isClosed) { if (primaryOutputs.isClosed) {
// Just drop, we do not open any more substreams // Just drop, we do not open any more substreams
nextPhase(waitNext) nextPhase(waitNext)

View file

@ -4,6 +4,8 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorLogging
import akka.actor.Cancellable
import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings } import akka.stream.{ ReactiveStreamsConstants, MaterializerSettings }
import akka.actor.{ Actor, ActorRef } import akka.actor.{ Actor, ActorRef }
import akka.stream.MaterializerSettings import akka.stream.MaterializerSettings
@ -36,7 +38,8 @@ private[akka] object MultiStreamOutputProcessor {
final case class Failed(e: Throwable) extends CompletedState final case class Failed(e: Throwable) extends CompletedState
} }
class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump) extends SimpleOutputs(actor, pump) with Publisher[Any] { class SubstreamOutput(val key: SubstreamKey, actor: ActorRef, pump: Pump, subscriptionTimeout: SubscriptionTimeout)
extends SimpleOutputs(actor, pump) with Publisher[Any] {
import SubstreamOutput._ import SubstreamOutput._
@ -79,12 +82,14 @@ private[akka] object MultiStreamOutputProcessor {
} }
override def subscribe(s: Subscriber[_ >: Any]): Unit = { override def subscribe(s: Subscriber[_ >: Any]): Unit = {
if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s) if (subscriptionTimeout.cancelAndHandle(s)) {
else { if (state.compareAndSet(Open, Attached(s))) actor ! SubstreamSubscribe(key, s)
state.get() match { else {
case _: Attached s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) state.get() match {
case c: CompletedState closeSubscriber(s, c) case _: Attached s.onError(new IllegalStateException("GroupBy substream publisher " + ReactiveStreamsConstants.SupportsOnlyASingleSubscriber))
case Open throw new IllegalStateException("Publisher cannot become open after being used before") case c: CompletedState closeSubscriber(s, c)
case Open throw new IllegalStateException("Publisher cannot become open after being used before")
}
} }
} }
} }
@ -100,7 +105,9 @@ private[akka] object MultiStreamOutputProcessor {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor private[akka] trait MultiStreamOutputProcessorLike extends Pump with StreamSubscriptionTimeoutSupport {
this: Actor with ActorLogging
import MultiStreamOutputProcessor._ import MultiStreamOutputProcessor._
protected def nextId(): Long protected def nextId(): Long
@ -109,7 +116,9 @@ private[akka] trait MultiStreamOutputProcessorLike extends Pump { this: Actor
protected def createSubstreamOutput(): SubstreamOutput = { protected def createSubstreamOutput(): SubstreamOutput = {
val id = SubstreamKey(nextId()) val id = SubstreamKey(nextId())
val outputs = new SubstreamOutput(id, self, this) val outputs = publisherWithStreamSubscriptionTimeout {
new SubstreamOutput(id, self, this, _)
}
substreamOutputs(outputs.key) = outputs substreamOutputs(outputs.key) = outputs
outputs outputs
} }
@ -142,6 +151,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
private var _nextId = 0L private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId } protected def nextId(): Long = { _nextId += 1; _nextId }
override val subscriptionTimeoutSettings = _settings.subscriptionTimeoutSettings
override protected def fail(e: Throwable): Unit = { override protected def fail(e: Throwable): Unit = {
failOutputs(e) failOutputs(e)
super.fail(e) super.fail(e)

View file

@ -0,0 +1,155 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.util.concurrent.atomic.AtomicBoolean
import akka.actor._
import akka.dispatch.ExecutionContexts
import akka.stream.CancelTermination
import akka.stream.NoopTermination
import akka.stream.StreamSubscriptionTimeoutSettings
import akka.stream.WarnTermination
import org.reactivestreams.Processor
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import scala.concurrent.duration.FiniteDuration
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.util.control.NoStackTrace
/**
* Handed to a [[Publisher]] participating in subscription-timeouts.
*
* It *MUST* cancel this timeout the earliest it can in it's `subscribe(Subscriber[T])` method to prevent the timeout from being triggered spuriously.
*/
trait SubscriptionTimeout {
/**
* Cancels the subscription timeout and returns `true` if the given `Subscriber` is valid to be processed.
* For example, if termination is in progress already the Processor should not process this incoming subscriber.
* In case of returning `false` as in "do not handle this subscriber", this method takes care of cancelling the Subscriber
* automatically by signalling `onError` with an adequate description of the subscription-timeout being exceeded.
*
* [[Publisher]] implementations *MUST* use this method to guard any handling of Subscribers (in `Publisher#subscribe`).
*/
def cancelAndHandle(s: Subscriber[_]): Boolean
}
/**
* Provides support methods to create Publishers and Subscribers which time-out gracefully,
* and are cancelled subscribing an `CancellingSubscriber` to the publisher, or by calling `onError` on the timed-out subscriber.
*
* See `akka.stream.materializer.subscription-timeout` for configuration options.
*/
trait StreamSubscriptionTimeoutSupport {
this: Actor with ActorLogging
/** Default settings for subscription timeouts. */
def subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings
/**
* Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination,
* using the default timeout from the configuration.
*
* The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block.
* This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber.
*/
def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](mkPublisher: SubscriptionTimeout Pub): Pub =
publisherWithStreamSubscriptionTimeout(subscriptionTimeoutSettings.timeout)(mkPublisher)
/**
* Creates a [[Publisher]] using the given `mkPublisher` function and registers it for subscription-timeout termination,
* using the passed in timeout.
*
* The created Publisher MUST wrap it's code handling a Subscribers incoming subscription in an `if (subscriptionTimeout.cancel())` block.
* This is in order to avoid races between the timer cancelling the publisher and it acknowlaging an incoming Subscriber.
*/
def publisherWithStreamSubscriptionTimeout[Pub <: Publisher[_]](timeoutOverride: FiniteDuration)(mkPublisher: SubscriptionTimeout Pub): Pub = {
val p = Promise[Publisher[_]]() // to break chicken-and-egg with subscriptionTimeout
val subscriptionTimeout = scheduleSubscriptionTimeout(p.future, timeoutOverride)
val pub = mkPublisher(subscriptionTimeout)
p.success(pub)
pub
}
private def scheduleSubscriptionTimeout(rs: Future[_], timeout: FiniteDuration): SubscriptionTimeout = {
implicit val dispatcher =
if (subscriptionTimeoutSettings.dispatcher.trim.isEmpty) context.dispatcher
else context.system.dispatchers.lookup(subscriptionTimeoutSettings.dispatcher)
new SubscriptionTimeout {
private val safeToCancelTimer = new AtomicBoolean(true)
val subscriptionTimeout = context.system.scheduler.scheduleOnce(timeout, new Runnable {
override def run(): Unit = {
if (safeToCancelTimer.compareAndSet(true, false))
onReactiveStream { terminate(_, timeout) }
}
})
override def cancelAndHandle(s: Subscriber[_]): Boolean = s match {
case _ if subscriptionTimeout.isCancelled
// there was some initial subscription already, which cancelled the timeout => continue normal operation
true
case _ if safeToCancelTimer.get
// first subscription signal, cancel the subscription-timeout
safeToCancelTimer.compareAndSet(true, false) && subscriptionTimeout.cancel()
true
case CancellingSubscriber if !safeToCancelTimer.get
// publisher termination in progress - normally we'd onError all subscribers, except the CancellationSubscriber (!)
// guaranteed that no other subscribers are coming in now
true
case _
// terminated - kill incoming subscribers
onReactiveStream { rs
s.onError(new SubscriptionTimeoutException(s"Publisher (${rs}) you are trying to subscribe to has been shut-down " +
s"because exceeding it's subscription-timeout.") with NoStackTrace)
}
false
}
private final def onReactiveStream(block: Any Unit) =
rs.foreach { rs block(rs) }(ExecutionContexts.sameThreadExecutionContext)
}
}
private def cancel(rs: Any, timeout: FiniteDuration): Unit = rs match {
case p: Processor[_, _]
log.debug("Cancelling {} Processor's publisher and subscriber sides (after {})", p, timeout)
p.subscribe(CancellingSubscriber)
p.onError(new SubscriptionTimeoutException(s"Publisher was not attached to upstream within deadline (${timeout})") with NoStackTrace)
case p: Publisher[_]
log.debug("Cancelling {} using CancellingSubscriber (after: {})", p, timeout)
p.subscribe(CancellingSubscriber)
}
private def warn(rs: Any, timeout: FiniteDuration): Unit = {
log.warning("Timed out {} detected (after {})! You should investigate if you either cancel or consume all {} instances",
rs, timeout, rs.getClass.getCanonicalName)
}
private def terminate(el: Any, timeout: FiniteDuration): Unit = subscriptionTimeoutSettings.mode match {
case NoopTermination // ignore...
case WarnTermination warn(el, timeout)
case CancelTermination cancel(el, timeout)
}
private final case object CancellingSubscriber extends Subscriber[Any] {
override def onSubscribe(s: Subscription): Unit = s.cancel()
override def onError(t: Throwable): Unit = ()
override def onComplete(): Unit = ()
override def onNext(t: Any): Unit = ()
}
}
class SubscriptionTimeoutException(msg: String) extends RuntimeException(msg)

View file

@ -4,16 +4,25 @@
package akka.stream.io package akka.stream.io
import akka.actor.{ ActorLogging, ActorRefFactory, Actor, ActorRef }
import akka.stream.impl._
import akka.util.{ ByteStringBuilder, ByteString }
import java.nio.ByteBuffer import java.nio.ByteBuffer
import java.security.cert.Certificate
import java.security.Principal import java.security.Principal
import java.security.cert.Certificate
import javax.net.ssl.SSLEngineResult.HandshakeStatus._ import javax.net.ssl.SSLEngineResult.HandshakeStatus._
import javax.net.ssl.SSLEngineResult.Status._ import javax.net.ssl.SSLEngineResult.Status._
import javax.net.ssl.{ SSLEngineResult, SSLPeerUnverifiedException, SSLSession, SSLEngine } import javax.net.ssl.SSLEngine
import org.reactivestreams.{ Subscription, Publisher, Subscriber } import javax.net.ssl.SSLEngineResult
import javax.net.ssl.SSLPeerUnverifiedException
import javax.net.ssl.SSLSession
import akka.actor.Actor
import akka.actor.ActorLogging
import akka.actor.ActorRef
import akka.stream.MaterializerSettings
import akka.stream.impl._
import akka.util.ByteString
import akka.util.ByteStringBuilder
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import scala.annotation.tailrec import scala.annotation.tailrec
@ -88,11 +97,13 @@ class SslTlsCipherActor(val requester: ActorRef, val sessionNegotioation: SslTls
with MultiStreamOutputProcessorLike with MultiStreamOutputProcessorLike
with MultiStreamInputProcessorLike { with MultiStreamInputProcessorLike {
override val subscriptionTimeoutSettings = MaterializerSettings(context.system).subscriptionTimeoutSettings
def this(requester: ActorRef, sessionNegotioation: SslTlsCipher.SessionNegotiation) = def this(requester: ActorRef, sessionNegotioation: SslTlsCipher.SessionNegotiation) =
this(requester, sessionNegotioation, false) this(requester, sessionNegotioation, false)
import SslTlsCipherActor._
import MultiStreamInputProcessor.SubstreamSubscriber import MultiStreamInputProcessor.SubstreamSubscriber
import SslTlsCipherActor._
private var _nextId = 0L private var _nextId = 0L
protected def nextId(): Long = { _nextId += 1; _nextId } protected def nextId(): Long = { _nextId += 1; _nextId }

View file

@ -379,7 +379,6 @@ trait FlowOps[+Out] {
*/ */
def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U] = strategy match { def flatten[U](strategy: akka.stream.FlattenStrategy[Out, U]): Repr[U] = strategy match {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll) case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll) // TODO remove duality here?
case _ case _
throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]") throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getName}]")
} }