diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala index bb42a97d14..0362ebbefa 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala @@ -4,12 +4,15 @@ package akka.stream.impl -import java.util.concurrent.TimeoutException +import java.util.concurrent.{ TimeUnit, TimeoutException } + import akka.Done import akka.stream.scaladsl._ import akka.stream.testkit.Utils._ import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream._ +import org.scalatest.{ Matchers, WordSpecLike } + import scala.concurrent.duration._ import scala.concurrent.{ Await, Future } @@ -354,3 +357,24 @@ class TimeoutsSpec extends StreamSpec { } } + +class TimeoutChecksSpec extends WordSpecLike with Matchers { + + "Timeout check interval" must { + import scala.concurrent.duration.{ Duration, FiniteDuration } + + "run twice for timeouts under 800ms" in { + Timers.timeoutCheckInterval(800.millis) should ===(100.millis) + } + + "run every 8th of the value for timeouts for timeouts under 1s" in { + Timers.timeoutCheckInterval(999.millis).toNanos should ===(999.millis.toNanos / 8) + } + + "run every 1s for timeouts over 1s" in { + Timers.timeoutCheckInterval(1001.millis) should ===(1.second) + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index 53b74ee9e6..8b913478cb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -25,11 +25,17 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure */ @InternalApi private[akka] object Timers { - private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { + + /** + * Given a timeout computes how often the check should be run without causing + * excessive load or loosing timeout precision. + */ + private[akka] def timeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { import scala.concurrent.duration._ - FiniteDuration( - math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2), - TimeUnit.NANOSECONDS) + if (timeout > 1.second) 1.second + else { + FiniteDuration(math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2), TimeUnit.NANOSECONDS) + } } final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { @@ -100,7 +106,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } if (nextDeadline - System.nanoTime < 0) failStage(new TimeoutException(s"No elements passed in the last $timeout.")) - override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout)) } override def toString = "IdleTimeout" @@ -132,7 +138,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } if (waitingDemand && (nextDeadline - System.nanoTime < 0)) failStage(new TimeoutException(s"No demand signalled in the last $timeout.")) - override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout)) } override def toString = "BackpressureTimeout" @@ -160,13 +166,14 @@ import scala.concurrent.duration.{ Duration, FiniteDuration } if (nextDeadline - System.nanoTime < 0) failStage(new TimeoutException(s"No elements passed in the last $timeout.")) - override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout)) class IdleBidiHandler[P](in: Inlet[P], out: Outlet[P]) extends InHandler with OutHandler { override def onPush(): Unit = { onActivity() push(out, grab(in)) } + override def onPull(): Unit = pull(in) override def onUpstreamFinish(): Unit = complete(out) override def onDownstreamFinish(): Unit = cancel(in)