Merge pull request #19845 from ktoso/wip-subscription-timeout-modes-ktoso

=str #19791 subscription timeouts modes respected in prefixAndTail
This commit is contained in:
Konrad Malawski 2016-03-11 15:12:53 +01:00
commit aa9a572ffd
2 changed files with 35 additions and 5 deletions

View file

@ -107,10 +107,12 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
}
"signal error if substream has been not subscribed in time" in assertAllStagesStopped {
val ms = 300
val tightTimeoutMaterializer =
ActorMaterializer(ActorMaterializerSettings(system)
.withSubscriptionTimeoutSettings(
StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, 500.millisecond)))
StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.cancel, ms.millisecond)))
val futureSink = newHeadSink
val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
@ -121,7 +123,25 @@ class FlowPrefixAndTailSpec extends AkkaSpec {
Thread.sleep(1000)
tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer)
subscriber.expectSubscriptionAndError().getMessage should ===("Substream Source has not been materialized in 500 milliseconds")
subscriber.expectSubscriptionAndError().getMessage should ===(s"Substream Source has not been materialized in ${ms} milliseconds")
}
"not fail the stream if substream has not been subscribed in time and configured subscription timeout is noop" in assertAllStagesStopped {
val tightTimeoutMaterializer =
ActorMaterializer(ActorMaterializerSettings(system)
.withSubscriptionTimeoutSettings(
StreamSubscriptionTimeoutSettings(StreamSubscriptionTimeoutTerminationMode.noop, 1.millisecond)))
val futureSink = newHeadSink
val fut = Source(1 to 2).prefixAndTail(1).runWith(futureSink)(tightTimeoutMaterializer)
val (takes, tail) = Await.result(fut, 3.seconds)
takes should be(Seq(1))
val subscriber = TestSubscriber.probe[Int]()
Thread.sleep(200)
tail.to(Sink.fromSubscriber(subscriber)).run()(tightTimeoutMaterializer)
subscriber.expectSubscription().request(2)
subscriber.expectNext(2).expectComplete()
}
"shut down main stage if substream is empty, even when not subscribed" in assertAllStagesStopped {

View file

@ -122,9 +122,19 @@ final class PrefixAndTail[T](n: Int) extends GraphStage[FlowShape[T, (immutable.
private val SubscriptionTimer = "SubstreamSubscriptionTimer"
override protected def onTimer(timerKey: Any): Unit = {
val timeout = ActorMaterializer.downcast(interpreter.materializer).settings.subscriptionTimeoutSettings.timeout
tailSource.timeout(timeout)
if (tailSource.isClosed) completeStage()
val materializer = ActorMaterializer.downcast(interpreter.materializer)
val timeoutSettings = materializer.settings.subscriptionTimeoutSettings
val timeout = timeoutSettings.timeout
timeoutSettings.mode match {
case StreamSubscriptionTimeoutTerminationMode.CancelTermination
tailSource.timeout(timeout)
if (tailSource.isClosed) completeStage()
case StreamSubscriptionTimeoutTerminationMode.NoopTermination
// do nothing
case StreamSubscriptionTimeoutTerminationMode.WarnTermination
materializer.logger.warning("Substream subscription timeout triggered after {} in prefixAndTail({}).", timeout, n)
}
}
private def prefixComplete = builder eq null