Adds test and caps the timeout check period to 1s #24616

This commit is contained in:
Ignasi Marimon-Clos 2018-03-01 11:47:02 +01:00 committed by Johan Andrén
parent c9ff3cbf80
commit 3de7a4c27c
2 changed files with 39 additions and 8 deletions

View file

@ -4,12 +4,15 @@
package akka.stream.impl package akka.stream.impl
import java.util.concurrent.TimeoutException import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.Done import akka.Done
import akka.stream.scaladsl._ import akka.stream.scaladsl._
import akka.stream.testkit.Utils._ import akka.stream.testkit.Utils._
import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ StreamSpec, TestPublisher, TestSubscriber }
import akka.stream._ import akka.stream._
import org.scalatest.{ Matchers, WordSpecLike }
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.{ Await, Future } import scala.concurrent.{ Await, Future }
@ -354,3 +357,24 @@ class TimeoutsSpec extends StreamSpec {
} }
} }
class TimeoutChecksSpec extends WordSpecLike with Matchers {
"Timeout check interval" must {
import scala.concurrent.duration.{ Duration, FiniteDuration }
"run twice for timeouts under 800ms" in {
Timers.timeoutCheckInterval(800.millis) should ===(100.millis)
}
"run every 8th of the value for timeouts for timeouts under 1s" in {
Timers.timeoutCheckInterval(999.millis).toNanos should ===(999.millis.toNanos / 8)
}
"run every 1s for timeouts over 1s" in {
Timers.timeoutCheckInterval(1001.millis) should ===(1.second)
}
}
}

View file

@ -25,11 +25,17 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/ */
@InternalApi private[akka] object Timers { @InternalApi private[akka] object Timers {
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
/**
* Given a timeout computes how often the check should be run without causing
* excessive load or loosing timeout precision.
*/
private[akka] def timeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._ import scala.concurrent.duration._
FiniteDuration( if (timeout > 1.second) 1.second
math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2), else {
TimeUnit.NANOSECONDS) FiniteDuration(math.min(math.max(timeout.toNanos / 8, 100.millis.toNanos), timeout.toNanos / 2), TimeUnit.NANOSECONDS)
}
} }
final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Initial[T](val timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
@ -100,7 +106,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
if (nextDeadline - System.nanoTime < 0) if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout.")) failStage(new TimeoutException(s"No elements passed in the last $timeout."))
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
} }
override def toString = "IdleTimeout" override def toString = "IdleTimeout"
@ -132,7 +138,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
if (waitingDemand && (nextDeadline - System.nanoTime < 0)) if (waitingDemand && (nextDeadline - System.nanoTime < 0))
failStage(new TimeoutException(s"No demand signalled in the last $timeout.")) failStage(new TimeoutException(s"No demand signalled in the last $timeout."))
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
} }
override def toString = "BackpressureTimeout" override def toString = "BackpressureTimeout"
@ -160,13 +166,14 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
if (nextDeadline - System.nanoTime < 0) if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout.")) failStage(new TimeoutException(s"No elements passed in the last $timeout."))
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, timeoutCheckInterval(timeout))
class IdleBidiHandler[P](in: Inlet[P], out: Outlet[P]) extends InHandler with OutHandler { class IdleBidiHandler[P](in: Inlet[P], out: Outlet[P]) extends InHandler with OutHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
onActivity() onActivity()
push(out, grab(in)) push(out, grab(in))
} }
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = complete(out) override def onUpstreamFinish(): Unit = complete(out)
override def onDownstreamFinish(): Unit = cancel(in) override def onDownstreamFinish(): Unit = cancel(in)