diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala index 3aa7c5c2e1..91cecffb9f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowPrefixAndTailSpec.scala @@ -107,10 +107,12 @@ class FlowPrefixAndTailSpec extends AkkaSpec { } "signal error if substream has been not subscribed in time" in assertAllStagesStopped { + val ms = 300 + val tightTimeoutMaterializer = ActorMaterializer(ActorMaterializerSettings(system) .withSubscriptionTimeoutSettings( - StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, 500.millisecond))) + StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, ms.millisecond))) val futureSink = newHeadSink val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer) @@ -121,7 +123,25 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Thread.sleep(1000) tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) - subscriber.expectSubscriptionAndError().getMessage should ===("Substream Source has not been materialized in 500 milliseconds") + subscriber.expectSubscriptionAndError().getMessage should ===(s"Substream Source has not been materialized in ${ms} milliseconds") + } + "not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped { + val tightTimeoutMaterializer = + ActorMaterializer(ActorMaterializerSettings(system) + .withSubscriptionTimeoutSettings( + StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.noop, 1.millisecond))) + + val futureSink = newHeadSink + val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer) + val (takes, tail) = Await.result(fut, 3.seconds) + takes should be(Seq(1)) + + val subscriber = TestSubscriber.probe[Int]() + Thread.sleep(200) + + tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer) + subscriber.expectSubscription().request(2) + subscriber.expectNext(2).expectComplete() } "shut down main stage if substream is empty, even when not subscribed" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala index 3607877997..e2791f917c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -122,9 +122,19 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable. private val SubscriptionTimer = "SubstreamSubscriptionTimer" override protected def onTimer(timerKey: Any): Unit = { - val timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout - tailSource.timeout(timeout) - if (tailSource.isClosed) completeStage() + val materializer = ActorMaterializer.downcast(interpreter.materializer) + val timeoutSettings = materializer.settings.subscriptionTimeoutSettings + val timeout = timeoutSettings.timeout + + timeoutSettings.mode match { + case StreamSubscriptionTimeoutTerminationMode.CancelTermination ⇒ + tailSource.timeout(timeout) + if (tailSource.isClosed) completeStage() + case StreamSubscriptionTimeoutTerminationMode.NoopTermination ⇒ + // do nothing + case StreamSubscriptionTimeoutTerminationMode.WarnTermination ⇒ + materializer.logger.warning("Substream subscription timeout triggered after {} in prefixAndTail({}).", timeout, n) + } } private def prefixComplete = builder eq null