From 9c763cc85084acc39328a20831699a9cd108e466 Mon Sep 17 00:00:00 2001 From: Konrad Malawski Date: Fri, 19 Feb 2016 17:20:30 +0100 Subject: [PATCH] =str #19791 subscription timeouts modes respected in prefixAndTail --- .../scaladsl/FlowPrefixAndTailSpec.scala | 24 +++++++++++++++++-- .../stream/impl/fusing/StreamOfStreams.scala | 16 ++++++++++--- 2 files changed, 35 insertions(+), 5 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 28f3286ecb..a4be7f8110 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -108,10 +108,12 @@ class FlowPrefixAndTailSpec extends AkkaSpec { } "signal error if substream has been not subscribed in time" in assertAllStagesStopped { + val ms = 300 + val tightTimeoutMaterializer = ActorMaterializer(ActorMaterializerSettings(system) .withSubscriptionTimeoutSettings( - StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, 500.millisecond))) + StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, ms.millisecond))) val futureSink = newHeadSink val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer) @@ -122,7 +124,25 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Thread.sleep(1000) tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) - subscriber.expectSubscriptionAndError().getMessage should ===("Substream Source has not been materialized in 500 milliseconds") + subscriber.expectSubscriptionAndError().getMessage should ===(s"Substream Source has not been materialized in ${ms} milliseconds") + } + "not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped { + val tightTimeoutMaterializer = + ActorMaterializer(ActorMaterializerSettings(system) + .withSubscriptionTimeoutSettings( + StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.noop, 1.millisecond))) + + val futureSink = newHeadSink + val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer) + val (takes, tail) = Await.result(fut, 3.seconds) + takes should be(Seq(1)) + + val subscriber = TestSubscriber.probe[Int]() + Thread.sleep(200) + + tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) + subscriber.expectSubscription().request(2) + subscriber.expectNext(2).expectComplete() } "shut down main stage if substream is empty, even when not subscribed" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index a692f359d7..dc7ad3e287 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -127,9 +127,19 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. private val SubscriptionTimer = "SubstreamSubscriptionTimer" override protected def onTimer(timerKey: Any): Unit = { - val timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout - tailSource.timeout(timeout) - if (tailSource.isClosed) completeStage() + val materializer = ActorMaterializer.downcast(interpreter.materializer) + val timeoutSettings = materializer.settings.subscriptionTimeoutSettings + val timeout = timeoutSettings.timeout + + timeoutSettings.mode match { + case StreamSubscriptionTimeoutTerminationMode.CancelTermination ⇒ + tailSource.timeout(timeout) + if (tailSource.isClosed) completeStage() + case StreamSubscriptionTimeoutTerminationMode.NoopTermination ⇒ + // do nothing + case StreamSubscriptionTimeoutTerminationMode.WarnTermination ⇒ + materializer.logger.warning("Substream subscription timeout triggered after {} in prefixAndTail({}).", timeout, n) + } } private def prefixComplete = builder eq null