Introduced backpressure timeout (#20131) stage.

This commit is contained in:
Robert Budźko 2016-04-19 16:31:17 +02:00
parent 2420b96bc6
commit 8534adf603
13 changed files with 550 additions and 141 deletions

View file

@ -63,7 +63,10 @@ Timeouts
Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded
as a more general purpose streaming infrastructure feature. as a more general purpose streaming infrastructure feature.
However, akka-stream should soon provide such a feature.
It should be noted that Akka Streams provide various timeout functionality so any API that uses streams can benefit
from the stream stages such as ``idleTimeout``, ``backpressureTimeout``, ``completionTimeout``, ``initialTimeout``
and ``throttle``. To learn more about these refer to their documentation in Akka Streams (and Java Doc).
.. _http-client-layer-java: .. _http-client-layer-java:

View file

@ -996,6 +996,90 @@ merging. The maximum number of merged sources has to be specified.
**completes** when upstream completes and all consumed substreams complete **completes** when upstream completes and all consumed substreams complete
Time aware stages
-----------------
Those stages operate taking time into consideration.
initialTimeout
^^^^^^^^^^^^^^
If the first element has not passed through this stage before the provided timeout, the stream is failed
with a ``TimeoutException``.
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before first element arrives
**cancels** when downstream cancels
completionTimeout
^^^^^^^^^^^^^^^^^
If the completion of the stream does not happen until the provided timeout, the stream is failed
with a ``TimeoutException``.
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before upstream completes
**cancels** when downstream cancels
idleTimeout
^^^^^^^^^^^
If the time between two processed elements exceeds the provided timeout, the stream is failed
with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the
check is one period (equals to timeout value).
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between two emitted elements
**cancels** when downstream cancels
backpressureTimeout
^^^^^^^^^^^^^^^^^^^
If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
the stream is failed with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the
check is one period (equals to timeout value).
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand.
**cancels** when downstream cancels
keepAlive
^^^^^^^^^
Injects additional (configured) elements if upstream does not emit for a configured amount of time.
**emits** when upstream emits an element or if the upstream was idle for the configured period
**backpressures** when downstream backpressures
**completes** when upstream completes
**cancels** when downstream cancels
initialDelay
^^^^^^^^^^^^
Delays the initial element by the specified duration.
**emits** when upstream emits an element if the initial delay is already elapsed
**backpressures** when downstream backpressures or initial delay is not yet elapsed
**completes** when upstream completes
**cancels** when downstream cancels
Fan-in stages Fan-in stages
------------- -------------

View file

@ -66,9 +66,9 @@ Timeouts
Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded
as a more general purpose streaming infrastructure feature. as a more general purpose streaming infrastructure feature.
It should be noted that Akka Streams provide various timeout functionality so any API that uses a streams can benefit It should be noted that Akka Streams provide various timeout functionality so any API that uses streams can benefit
from the stream stages such as ``idleTimeout``, ``completionTimeout``, ``initialTimeout`` and even ``throttle``. from the stream stages such as ``idleTimeout``, ``backpressureTimeout``, ``completionTimeout``, ``initialTimeout``
To learn more about these refer to their documentation in Akka Streams (and Scala Doc). and ``throttle``. To learn more about these refer to their documentation in Akka Streams (and Scala Doc).
For more details about timeout support in Akka HTTP in general refer to :ref:`http-timeouts`. For more details about timeout support in Akka HTTP in general refer to :ref:`http-timeouts`.

View file

@ -987,6 +987,90 @@ merging. The maximum number of merged sources has to be specified.
**completes** when upstream completes and all consumed substreams complete **completes** when upstream completes and all consumed substreams complete
Time aware stages
-----------------
Those stages operate taking time into consideration.
initialTimeout
^^^^^^^^^^^^^^
If the first element has not passed through this stage before the provided timeout, the stream is failed
with a ``TimeoutException``.
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before first element arrives
**cancels** when downstream cancels
completionTimeout
^^^^^^^^^^^^^^^^^
If the completion of the stream does not happen until the provided timeout, the stream is failed
with a ``TimeoutException``.
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses before upstream completes
**cancels** when downstream cancels
idleTimeout
^^^^^^^^^^^
If the time between two processed elements exceeds the provided timeout, the stream is failed
with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the
check is one period (equals to timeout value).
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between two emitted elements
**cancels** when downstream cancels
backpressureTimeout
^^^^^^^^^^^^^^^^^^^
If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
the stream is failed with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the
check is one period (equals to timeout value).
**emits** when upstream emits an element
**backpressures** when downstream backpressures
**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand.
**cancels** when downstream cancels
keepAlive
^^^^^^^^^
Injects additional (configured) elements if upstream does not emit for a configured amount of time.
**emits** when upstream emits an element or if the upstream was idle for the configured period
**backpressures** when downstream backpressures
**completes** when upstream completes
**cancels** when downstream cancels
initialDelay
^^^^^^^^^^^^
Delays the initial element by the specified duration.
**emits** when upstream emits an element if the initial delay is already elapsed
**backpressures** when downstream backpressures or initial delay is not yet elapsed
**completes** when upstream completes
**cancels** when downstream cancels
Fan-in stages Fan-in stages
------------- -------------

View file

@ -130,6 +130,117 @@ class TimeoutsSpec extends AkkaSpec {
} }
"BackpressureTimeout" must {
"pass through elements unmodified" in assertAllStagesStopped {
Await.result(Source(1 to 100).backpressureTimeout(1.second).grouped(200).runWith(Sink.head), 3.seconds) should === (1 to 100)
}
"succeed if subscriber demand arrives" in assertAllStagesStopped {
val subscriber = TestSubscriber.probe[Int]()
Source(1 to 4)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
for (i 1 to 3) {
subscriber.requestNext(i)
subscriber.expectNoMsg(250.millis)
}
subscriber.requestNext(4)
subscriber.expectComplete()
}
"not throw if publisher is less frequent than timeout" in assertAllStagesStopped {
val publisher = TestPublisher.probe[String]()
val subscriber = TestSubscriber.probe[String]()
Source.fromPublisher(publisher)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(2)
subscriber.expectNoMsg(1.second)
publisher.sendNext("Quick Msg")
subscriber.expectNext("Quick Msg")
subscriber.expectNoMsg(3.seconds)
publisher.sendNext("Slow Msg")
subscriber.expectNext("Slow Msg")
publisher.sendComplete()
subscriber.expectComplete()
}
"not throw if publisher won't perform emission ever" in assertAllStagesStopped {
val publisher = TestPublisher.probe[String]()
val subscriber = TestSubscriber.probe[String]()
Source.fromPublisher(publisher)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(16)
subscriber.expectNoMsg(2.second)
publisher.sendComplete()
subscriber.expectComplete()
}
"throw if subscriber won't generate demand on time" in assertAllStagesStopped {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()
Source.fromPublisher(publisher)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(1)
publisher.sendNext(1)
subscriber.expectNext(1)
Thread.sleep(3000)
subscriber.expectError().getMessage should === ("No demand signalled in the last 1 second.")
}
"throw if subscriber never generate demand" in assertAllStagesStopped {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()
Source.fromPublisher(publisher)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
subscriber.expectSubscription()
Thread.sleep(3000)
subscriber.expectError().getMessage should === ("No demand signalled in the last 1 second.")
}
"not throw if publisher completes without fulfilling subscriber's demand" in assertAllStagesStopped {
val publisher = TestPublisher.probe[Int]()
val subscriber = TestSubscriber.probe[Int]()
Source.fromPublisher(publisher)
.backpressureTimeout(1.second)
.runWith(Sink.fromSubscriber(subscriber))
subscriber.request(2)
publisher.sendNext(1)
subscriber.expectNext(1)
subscriber.expectNoMsg(2.second)
publisher.sendComplete()
subscriber.expectComplete()
}
}
"IdleTimeoutBidi" must { "IdleTimeoutBidi" must {
"not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped { "not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped {

View file

@ -62,6 +62,14 @@ private[stream] object Stages {
val processorWithKey = name("processorWithKey") val processorWithKey = name("processorWithKey")
val identityOp = name("identityOp") val identityOp = name("identityOp")
val initial = name("initial")
val completion = name("completion")
val idle = name("idle")
val idleTimeoutBidi = name("idleTimeoutBidi")
val delayInitial = name("delayInitial")
val idleInject = name("idleInject")
val backpressureTimeout = name("backpressureTimeout")
val merge = name("merge") val merge = name("merge")
val mergePreferred = name("mergePreferred") val mergePreferred = name("mergePreferred")
val flattenMerge = name("flattenMerge") val flattenMerge = name("flattenMerge")

View file

@ -5,11 +5,12 @@ package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException } import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream._ import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.stage._
import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration } import scala.concurrent.duration.{ Duration, FiniteDuration }
/** /**
* INTERNAL API * INTERNAL API
@ -31,73 +32,110 @@ private[stream] object Timers {
} }
final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def initialAttributes = DefaultAttributes.initial
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var initialHasPassed = false private var initialHasPassed = false
setHandler(in, new InHandler { setHandlers(in, out, this)
override def onPush(): Unit = { override def onPush(): Unit = {
initialHasPassed = true initialHasPassed = true
push(out, grab(in)) push(out, grab(in))
} }
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
})
final override protected def onTimer(key: Any): Unit = final override protected def onTimer(key: Any): Unit =
if (!initialHasPassed) if (!initialHasPassed)
failStage(new TimeoutException(s"The first element has not yet passed through in $timeout.")) failStage(new TimeoutException(s"The first element has not yet passed through in $timeout."))
override def preStart(): Unit = scheduleOnce("InitialTimeout", timeout) override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout)
} }
override def toString = "InitialTimeoutTimer" override def toString = "InitialTimeoutTimer"
} }
final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.completion
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
setHandlers(in, out, this)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in)) override def onPush(): Unit = push(out, grab(in))
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
})
final override protected def onTimer(key: Any): Unit = final override protected def onTimer(key: Any): Unit =
failStage(new TimeoutException(s"The stream has not been completed in $timeout.")) failStage(new TimeoutException(s"The stream has not been completed in $timeout."))
override def preStart(): Unit = scheduleOnce("CompletionTimeoutTimer", timeout) override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout)
} }
override def toString = "CompletionTimeout" override def toString = "CompletionTimeout"
} }
final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.idle
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
setHandlers(in, out, this)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var nextDeadline: Deadline = Deadline.now + timeout
setHandler(in, new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
nextDeadline = Deadline.now + timeout nextDeadline = System.nanoTime + timeout.toNanos
push(out, grab(in)) push(out, grab(in))
} }
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in) override def onPull(): Unit = pull(in)
})
final override protected def onTimer(key: Any): Unit = final override protected def onTimer(key: Any): Unit =
if (nextDeadline.isOverdue()) if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout.")) failStage(new TimeoutException(s"No elements passed in the last $timeout."))
override def preStart(): Unit = schedulePeriodically("IdleTimeoutCheckTimer", interval = idleTimeoutCheckInterval(timeout)) override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout))
} }
override def toString = "IdleTimeout" override def toString = "IdleTimeout"
}
final class BackpressureTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def initialAttributes = DefaultAttributes.backpressureTimeout
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
private var waitingDemand: Boolean = true
setHandlers(in, out, this)
override def onPush(): Unit = {
push(out, grab(in))
nextDeadline = System.nanoTime + timeout.toNanos
waitingDemand = true
}
override def onPull(): Unit = {
waitingDemand = false
pull(in)
}
final override protected def onTimer(key: Any): Unit =
if (waitingDemand && (nextDeadline - System.nanoTime < 0))
failStage(new TimeoutException(s"No demand signalled in the last $timeout."))
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout))
}
override def toString = "BackpressureTimeout"
} }
final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] {
@ -105,99 +143,91 @@ private[stream] object Timers {
val in2 = Inlet[O]("in2") val in2 = Inlet[O]("in2")
val out1 = Outlet[I]("out1") val out1 = Outlet[I]("out1")
val out2 = Outlet[O]("out2") val out2 = Outlet[O]("out2")
override def initialAttributes = Attributes.name("IdleTimeoutBidi")
val shape = BidiShape(in1, out1, in2, out2) val shape = BidiShape(in1, out1, in2, out2)
override def initialAttributes = DefaultAttributes.idleTimeoutBidi
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var nextDeadline: Long = System.nanoTime + timeout.toNanos
setHandlers(in1, out1, new IdleBidiHandler(in1, out1))
setHandlers(in2, out2, new IdleBidiHandler(in2, out2))
private def onActivity(): Unit = nextDeadline = System.nanoTime + timeout.toNanos
final override def onTimer(key: Any): Unit =
if (nextDeadline - System.nanoTime < 0)
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout))
class IdleBidiHandler[P](in: Inlet[P], out: Outlet[P]) extends InHandler with OutHandler {
override def onPush(): Unit = {
onActivity()
push(out, grab(in))
}
override def onPull(): Unit = pull(in)
override def onUpstreamFinish(): Unit = complete(out)
override def onDownstreamFinish(): Unit = cancel(in)
}
}
override def toString = "IdleTimeoutBidi" override def toString = "IdleTimeoutBidi"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private var nextDeadline: Deadline = Deadline.now + timeout
setHandler(in1, new InHandler {
override def onPush(): Unit = {
onActivity()
push(out1, grab(in1))
}
override def onUpstreamFinish(): Unit = complete(out1)
})
setHandler(in2, new InHandler {
override def onPush(): Unit = {
onActivity()
push(out2, grab(in2))
}
override def onUpstreamFinish(): Unit = complete(out2)
})
setHandler(out1, new OutHandler {
override def onPull(): Unit = pull(in1)
override def onDownstreamFinish(): Unit = cancel(in1)
})
setHandler(out2, new OutHandler {
override def onPull(): Unit = pull(in2)
override def onDownstreamFinish(): Unit = cancel(in2)
})
private def onActivity(): Unit = nextDeadline = Deadline.now + timeout
final override def onTimer(key: Any): Unit =
if (nextDeadline.isOverdue())
failStage(new TimeoutException(s"No elements passed in the last $timeout."))
override def preStart(): Unit = schedulePeriodically("IdleTimeoutCheckTimer", idleTimeoutCheckInterval(timeout))
}
} }
final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in: Inlet[T] = Inlet("IdleInject.in") val in: Inlet[T] = Inlet("IdleInject.in")
val out: Outlet[T] = Outlet("IdleInject.out") val out: Outlet[T] = Outlet("IdleInject.out")
override def initialAttributes = Attributes.name("DelayInitial")
override val shape: FlowShape[T, T] = FlowShape(in, out) override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def initialAttributes = DefaultAttributes.delayInitial
private val IdleTimer = "DelayTimer"
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var open: Boolean = false
setHandlers(in, out, this)
override def preStart(): Unit = { override def preStart(): Unit = {
if (delay == Duration.Zero) open = true if (delay == Duration.Zero) open = true
else scheduleOnce(IdleTimer, delay) else scheduleOnce(GraphStageLogicTimer, delay)
} }
private var open: Boolean = false
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in)) override def onPush(): Unit = push(out, grab(in))
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (open) pull(in) override def onPull(): Unit = if (open) pull(in)
})
override protected def onTimer(timerKey: Any): Unit = { override protected def onTimer(timerKey: Any): Unit = {
open = true open = true
if (isAvailable(out)) pull(in) if (isAvailable(out)) pull(in)
} }
} }
override def toString = "DelayTimer"
} }
final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () O) extends GraphStage[FlowShape[I, O]] { final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () O) extends GraphStage[FlowShape[I, O]] {
val in: Inlet[I] = Inlet("IdleInject.in") val in: Inlet[I] = Inlet("IdleInject.in")
val out: Outlet[O] = Outlet("IdleInject.out") val out: Outlet[O] = Outlet("IdleInject.out")
override def initialAttributes = Attributes.name("IdleInject")
override def initialAttributes = DefaultAttributes.idleInject
override val shape: FlowShape[I, O] = FlowShape(in, out) override val shape: FlowShape[I, O] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
private val IdleTimer = "IdleTimer" new TimerGraphStageLogic(shape) with InHandler with OutHandler {
private var nextDeadline = Deadline.now + timeout private var nextDeadline: Long = System.nanoTime + timeout.toNanos
setHandlers(in, out, this)
// Prefetching to ensure priority of actual upstream elements // Prefetching to ensure priority of actual upstream elements
override def preStart(): Unit = pull(in) override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
nextDeadline = Deadline.now + timeout nextDeadline = System.nanoTime + timeout.toNanos
cancelTimer(IdleTimer) cancelTimer(GraphStageLogicTimer)
if (isAvailable(out)) { if (isAvailable(out)) {
push(out, grab(in)) push(out, grab(in))
pull(in) pull(in)
@ -207,31 +237,33 @@ private[stream] object Timers {
override def onUpstreamFinish(): Unit = { override def onUpstreamFinish(): Unit = {
if (!isAvailable(in)) completeStage() if (!isAvailable(in)) completeStage()
} }
})
setHandler(out, new OutHandler {
override def onPull(): Unit = { override def onPull(): Unit = {
if (isAvailable(in)) { if (isAvailable(in)) {
push(out, grab(in)) push(out, grab(in))
if (isClosed(in)) completeStage() if (isClosed(in)) completeStage()
else pull(in) else pull(in)
} else { } else {
if (nextDeadline.isOverdue()) { val time = System.nanoTime
nextDeadline = Deadline.now + timeout if (nextDeadline - time < 0) {
nextDeadline = time + timeout.toNanos
push(out, inject()) push(out, inject())
} else scheduleOnce(IdleTimer, nextDeadline.timeLeft) } else scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - time, TimeUnit.NANOSECONDS))
} }
} }
})
override protected def onTimer(timerKey: Any): Unit = { override protected def onTimer(timerKey: Any): Unit = {
if (nextDeadline.isOverdue() && isAvailable(out)) { val time = System.nanoTime
if ((nextDeadline - time < 0) && isAvailable(out)) {
push(out, inject()) push(out, inject())
nextDeadline = Deadline.now + timeout nextDeadline = time + timeout.toNanos
} }
} }
} }
override def toString = "IdleTimer"
} }
case object GraphStageLogicTimer
} }

View file

@ -6,7 +6,7 @@ package akka.stream.javadsl
import akka.{ NotUsed, Done } import akka.{ NotUsed, Done }
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair } import akka.japi.{ function, Pair }
import akka.stream.impl.{ConstantFun, StreamLayout} import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream._ import akka.stream._
import akka.stream.stage.Stage import akka.stream.stage.Stage
import org.reactivestreams.Processor import org.reactivestreams.Processor
@ -1594,8 +1594,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
new Flow(delegate.completionTimeout(timeout)) new Flow(delegate.completionTimeout(timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* *
* '''Emits when''' upstream emits an element * '''Emits when''' upstream emits an element
* *
@ -1609,7 +1610,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
new Flow(delegate.idleTimeout(timeout)) new Flow(delegate.idleTimeout(timeout))
/** /**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.backpressureTimeout(timeout))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream. * stage attempts to maintains a base rate of emitted elements towards the downstream.
* *
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
@ -1726,9 +1743,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *
* '''Emits when''' upstream emits an element if the initial delay already elapsed * '''Emits when''' upstream emits an element if the initial delay is already elapsed
* *
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
* *
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *

View file

@ -1809,8 +1809,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.completionTimeout(timeout)) new Source(delegate.completionTimeout(timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* *
* '''Emits when''' upstream emits an element * '''Emits when''' upstream emits an element
* *
@ -1824,7 +1825,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.idleTimeout(timeout)) new Source(delegate.idleTimeout(timeout))
/** /**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.backpressureTimeout(timeout))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream. * stage attempts to maintains a base rate of emitted elements towards the downstream.
* *
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
@ -1941,9 +1958,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *
* '''Emits when''' upstream emits an element if the initial delay already elapsed * '''Emits when''' upstream emits an element if the initial delay is already elapsed
* *
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
* *
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *

View file

@ -1100,8 +1100,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
new SubFlow(delegate.completionTimeout(timeout)) new SubFlow(delegate.completionTimeout(timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* *
* '''Emits when''' upstream emits an element * '''Emits when''' upstream emits an element
* *
@ -1115,7 +1116,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
new SubFlow(delegate.idleTimeout(timeout)) new SubFlow(delegate.idleTimeout(timeout))
/** /**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] =
new SubFlow(delegate.backpressureTimeout(timeout))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream. * stage attempts to maintains a base rate of emitted elements towards the downstream.
* *
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
@ -1214,9 +1231,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *
* '''Emits when''' upstream emits an element if the initial delay already elapsed * '''Emits when''' upstream emits an element if the initial delay is already elapsed
* *
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
* *
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *

View file

@ -1097,8 +1097,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
new SubSource(delegate.completionTimeout(timeout)) new SubSource(delegate.completionTimeout(timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* *
* '''Emits when''' upstream emits an element * '''Emits when''' upstream emits an element
* *
@ -1112,7 +1113,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
new SubSource(delegate.idleTimeout(timeout)) new SubSource(delegate.idleTimeout(timeout))
/** /**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat] =
new SubSource(delegate.backpressureTimeout(timeout))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream. * stage attempts to maintains a base rate of emitted elements towards the downstream.
* *
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
@ -1204,9 +1221,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *
* '''Emits when''' upstream emits an element if the initial delay already elapsed * '''Emits when''' upstream emits an element if the initial delay is already elapsed
* *
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
* *
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *

View file

@ -1438,8 +1438,9 @@ trait FlowOps[+Out, +Mat] {
def completionTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Completion[Out](timeout)) def completionTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Completion[Out](timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceeds the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]]. * with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
* *
* '''Emits when''' upstream emits an element * '''Emits when''' upstream emits an element
* *
@ -1452,7 +1453,22 @@ trait FlowOps[+Out, +Mat] {
def idleTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Idle[Out](timeout)) def idleTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Idle[Out](timeout))
/** /**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this * If the time between the emission of an element and the following downstream demand exceeds the provided timeout,
* the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically,
* so the resolution of the check is one period (equals to timeout value).
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand.
*
* '''Cancels when''' downstream cancels
*/
def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout))
/**
* Injects additional elements if upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream. * stage attempts to maintains a base rate of emitted elements towards the downstream.
* *
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
@ -1551,9 +1567,9 @@ trait FlowOps[+Out, +Mat] {
/** /**
* Delays the initial element by the specified duration. * Delays the initial element by the specified duration.
* *
* '''Emits when''' upstream emits an element if the initial delay already elapsed * '''Emits when''' upstream emits an element if the initial delay is already elapsed
* *
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed
* *
* '''Completes when''' upstream completes * '''Completes when''' upstream completes
* *

View file

@ -777,7 +777,10 @@ object MiMa extends AutoPlugin {
// #19225 - GraphStage and removal of isTerminated // #19225 - GraphStage and removal of isTerminated
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.parsing.HttpMessageParser.isTerminated"), ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.parsing.HttpMessageParser.isTerminated"),
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.parsing.HttpMessageParser.stage") ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.parsing.HttpMessageParser.stage"),
// #20131 - flow combinator
ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.backpressureTimeout")
) )
) )
} }