diff --git a/akka-docs/rst/java/http/client-side/connection-level.rst b/akka-docs/rst/java/http/client-side/connection-level.rst index eb2ff3c021..558237a521 100644 --- a/akka-docs/rst/java/http/client-side/connection-level.rst +++ b/akka-docs/rst/java/http/client-side/connection-level.rst @@ -63,7 +63,10 @@ Timeouts Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded as a more general purpose streaming infrastructure feature. -However, akka-stream should soon provide such a feature. + +It should be noted that Akka Streams provide various timeout functionality so any API that uses streams can benefit +from the stream stages such as ``idleTimeout``, ``backpressureTimeout``, ``completionTimeout``, ``initialTimeout`` +and ``throttle``. To learn more about these refer to their documentation in Akka Streams (and Java Doc). .. _http-client-layer-java: diff --git a/akka-docs/rst/java/stream/stages-overview.rst b/akka-docs/rst/java/stream/stages-overview.rst index a0a26b8d36..6373eb8e70 100644 --- a/akka-docs/rst/java/stream/stages-overview.rst +++ b/akka-docs/rst/java/stream/stages-overview.rst @@ -996,6 +996,90 @@ merging. The maximum number of merged sources has to be specified. **completes** when upstream completes and all consumed substreams complete +Time aware stages +----------------- + +Those stages operate taking time into consideration. + +initialTimeout +^^^^^^^^^^^^^^ +If the first element has not passed through this stage before the provided timeout, the stream is failed +with a ``TimeoutException``. + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before first element arrives + +**cancels** when downstream cancels + +completionTimeout +^^^^^^^^^^^^^^^^^ +If the completion of the stream does not happen until the provided timeout, the stream is failed +with a ``TimeoutException``. + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before upstream completes + +**cancels** when downstream cancels + +idleTimeout +^^^^^^^^^^^ +If the time between two processed elements exceeds the provided timeout, the stream is failed +with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between two emitted elements + +**cancels** when downstream cancels + +backpressureTimeout +^^^^^^^^^^^^^^^^^^^ +If the time between the emission of an element and the following downstream demand exceeds the provided timeout, +the stream is failed with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand. + +**cancels** when downstream cancels + +keepAlive +^^^^^^^^^ +Injects additional (configured) elements if upstream does not emit for a configured amount of time. + +**emits** when upstream emits an element or if the upstream was idle for the configured period + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +**cancels** when downstream cancels + +initialDelay +^^^^^^^^^^^^ +Delays the initial element by the specified duration. + +**emits** when upstream emits an element if the initial delay is already elapsed + +**backpressures** when downstream backpressures or initial delay is not yet elapsed + +**completes** when upstream completes + +**cancels** when downstream cancels + + Fan-in stages ------------- diff --git a/akka-docs/rst/scala/http/client-side/connection-level.rst b/akka-docs/rst/scala/http/client-side/connection-level.rst index b29ba71176..761b1b86c3 100644 --- a/akka-docs/rst/scala/http/client-side/connection-level.rst +++ b/akka-docs/rst/scala/http/client-side/connection-level.rst @@ -66,9 +66,9 @@ Timeouts Currently Akka HTTP doesn't implement client-side request timeout checking itself as this functionality can be regarded as a more general purpose streaming infrastructure feature. -It should be noted that Akka Streams provide various timeout functionality so any API that uses a streams can benefit -from the stream stages such as ``idleTimeout``, ``completionTimeout``, ``initialTimeout`` and even ``throttle``. -To learn more about these refer to their documentation in Akka Streams (and Scala Doc). +It should be noted that Akka Streams provide various timeout functionality so any API that uses streams can benefit +from the stream stages such as ``idleTimeout``, ``backpressureTimeout``, ``completionTimeout``, ``initialTimeout`` +and ``throttle``. To learn more about these refer to their documentation in Akka Streams (and Scala Doc). For more details about timeout support in Akka HTTP in general refer to :ref:`http-timeouts`. diff --git a/akka-docs/rst/scala/stream/stages-overview.rst b/akka-docs/rst/scala/stream/stages-overview.rst index 09a5288523..dcae95bc47 100644 --- a/akka-docs/rst/scala/stream/stages-overview.rst +++ b/akka-docs/rst/scala/stream/stages-overview.rst @@ -987,6 +987,90 @@ merging. The maximum number of merged sources has to be specified. **completes** when upstream completes and all consumed substreams complete +Time aware stages +----------------- + +Those stages operate taking time into consideration. + +initialTimeout +^^^^^^^^^^^^^^ +If the first element has not passed through this stage before the provided timeout, the stream is failed +with a ``TimeoutException``. + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before first element arrives + +**cancels** when downstream cancels + +completionTimeout +^^^^^^^^^^^^^^^^^ +If the completion of the stream does not happen until the provided timeout, the stream is failed +with a ``TimeoutException``. + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses before upstream completes + +**cancels** when downstream cancels + +idleTimeout +^^^^^^^^^^^ +If the time between two processed elements exceeds the provided timeout, the stream is failed +with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between two emitted elements + +**cancels** when downstream cancels + +backpressureTimeout +^^^^^^^^^^^^^^^^^^^ +If the time between the emission of an element and the following downstream demand exceeds the provided timeout, +the stream is failed with a ``TimeoutException``. The timeout is checked periodically, so the resolution of the +check is one period (equals to timeout value). + +**emits** when upstream emits an element + +**backpressures** when downstream backpressures + +**completes** when upstream completes or fails if timeout elapses between element emission and downstream demand. + +**cancels** when downstream cancels + +keepAlive +^^^^^^^^^ +Injects additional (configured) elements if upstream does not emit for a configured amount of time. + +**emits** when upstream emits an element or if the upstream was idle for the configured period + +**backpressures** when downstream backpressures + +**completes** when upstream completes + +**cancels** when downstream cancels + +initialDelay +^^^^^^^^^^^^ +Delays the initial element by the specified duration. + +**emits** when upstream emits an element if the initial delay is already elapsed + +**backpressures** when downstream backpressures or initial delay is not yet elapsed + +**completes** when upstream completes + +**cancels** when downstream cancels + + Fan-in stages ------------- diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala index b0f19d844b..a88fc8f0f4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/TimeoutsSpec.scala @@ -130,6 +130,117 @@ class TimeoutsSpec extends AkkaSpec { } + "BackpressureTimeout" must { + + "pass through elements unmodified" in assertAllStagesStopped { + Await.result(Source(1 to 100).backpressureTimeout(1.second).grouped(200).runWith(Sink.head), 3.seconds) should === (1 to 100) + } + + "succeed if subscriber demand arrives" in assertAllStagesStopped { + val subscriber = TestSubscriber.probe[Int]() + + Source(1 to 4) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + for (i ← 1 to 3) { + subscriber.requestNext(i) + subscriber.expectNoMsg(250.millis) + } + + subscriber.requestNext(4) + subscriber.expectComplete() + } + + "not throw if publisher is less frequent than timeout" in assertAllStagesStopped { + val publisher = TestPublisher.probe[String]() + val subscriber = TestSubscriber.probe[String]() + + Source.fromPublisher(publisher) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + subscriber.request(2) + + subscriber.expectNoMsg(1.second) + publisher.sendNext("Quick Msg") + subscriber.expectNext("Quick Msg") + + subscriber.expectNoMsg(3.seconds) + publisher.sendNext("Slow Msg") + subscriber.expectNext("Slow Msg") + + publisher.sendComplete() + subscriber.expectComplete() + } + + "not throw if publisher won't perform emission ever" in assertAllStagesStopped { + val publisher = TestPublisher.probe[String]() + val subscriber = TestSubscriber.probe[String]() + + Source.fromPublisher(publisher) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + subscriber.request(16) + subscriber.expectNoMsg(2.second) + + publisher.sendComplete() + subscriber.expectComplete() + } + + "throw if subscriber won't generate demand on time" in assertAllStagesStopped { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source.fromPublisher(publisher) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + subscriber.request(1) + publisher.sendNext(1) + subscriber.expectNext(1) + + Thread.sleep(3000) + + subscriber.expectError().getMessage should === ("No demand signalled in the last 1 second.") + } + + "throw if subscriber never generate demand" in assertAllStagesStopped { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source.fromPublisher(publisher) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + subscriber.expectSubscription() + + Thread.sleep(3000) + + subscriber.expectError().getMessage should === ("No demand signalled in the last 1 second.") + } + + "not throw if publisher completes without fulfilling subscriber's demand" in assertAllStagesStopped { + val publisher = TestPublisher.probe[Int]() + val subscriber = TestSubscriber.probe[Int]() + + Source.fromPublisher(publisher) + .backpressureTimeout(1.second) + .runWith(Sink.fromSubscriber(subscriber)) + + subscriber.request(2) + publisher.sendNext(1) + subscriber.expectNext(1) + + subscriber.expectNoMsg(2.second) + + publisher.sendComplete() + subscriber.expectComplete() + } + + } + "IdleTimeoutBidi" must { "not signal error in simple loopback case and pass through elements unmodified" in assertAllStagesStopped { diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index e654d45551..fad770ed7c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -62,6 +62,14 @@ private[stream] object Stages { val processorWithKey = name("processorWithKey") val identityOp = name("identityOp") + val initial = name("initial") + val completion = name("completion") + val idle = name("idle") + val idleTimeoutBidi = name("idleTimeoutBidi") + val delayInitial = name("delayInitial") + val idleInject = name("idleInject") + val backpressureTimeout = name("backpressureTimeout") + val merge = name("merge") val mergePreferred = name("mergePreferred") val flattenMerge = name("flattenMerge") diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala index e14248e50a..8f6ad1081a 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timers.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -5,11 +5,12 @@ package akka.stream.impl import java.util.concurrent.{ TimeUnit, TimeoutException } -import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream._ -import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.stage._ -import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration } +import scala.concurrent.duration.{ Duration, FiniteDuration } /** * INTERNAL API @@ -31,73 +32,110 @@ private[stream] object Timers { } final class Initial[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private var initialHasPassed = false + override def initialAttributes = DefaultAttributes.initial + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private var initialHasPassed = false + + setHandlers(in, out, this) - setHandler(in, new InHandler { override def onPush(): Unit = { initialHasPassed = true push(out, grab(in)) } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) - }) - final override protected def onTimer(key: Any): Unit = - if (!initialHasPassed) - failStage(new TimeoutException(s"The first element has not yet passed through in $timeout.")) + final override protected def onTimer(key: Any): Unit = + if (!initialHasPassed) + failStage(new TimeoutException(s"The first element has not yet passed through in $timeout.")) - override def preStart(): Unit = scheduleOnce("InitialTimeout", timeout) - } + override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout) + } override def toString = "InitialTimeoutTimer" + } final class Completion[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + override def initialAttributes = DefaultAttributes.completion + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + setHandlers(in, out, this) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - }) - setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) - }) - final override protected def onTimer(key: Any): Unit = - failStage(new TimeoutException(s"The stream has not been completed in $timeout.")) + final override protected def onTimer(key: Any): Unit = + failStage(new TimeoutException(s"The stream has not been completed in $timeout.")) - override def preStart(): Unit = scheduleOnce("CompletionTimeoutTimer", timeout) - } + override def preStart(): Unit = scheduleOnce(GraphStageLogicTimer, timeout) + } override def toString = "CompletionTimeout" + } final class Idle[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + override def initialAttributes = DefaultAttributes.idle + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private var nextDeadline: Long = System.nanoTime + timeout.toNanos + + setHandlers(in, out, this) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private var nextDeadline: Deadline = Deadline.now + timeout - setHandler(in, new InHandler { override def onPush(): Unit = { - nextDeadline = Deadline.now + timeout + nextDeadline = System.nanoTime + timeout.toNanos push(out, grab(in)) } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = pull(in) - }) - final override protected def onTimer(key: Any): Unit = - if (nextDeadline.isOverdue()) - failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + final override protected def onTimer(key: Any): Unit = + if (nextDeadline - System.nanoTime < 0) + failStage(new TimeoutException(s"No elements passed in the last $timeout.")) - override def preStart(): Unit = schedulePeriodically("IdleTimeoutCheckTimer", interval = idleTimeoutCheckInterval(timeout)) - } + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + } override def toString = "IdleTimeout" + + } + + final class BackpressureTimeout[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { + override def initialAttributes = DefaultAttributes.backpressureTimeout + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private var nextDeadline: Long = System.nanoTime + timeout.toNanos + private var waitingDemand: Boolean = true + + setHandlers(in, out, this) + + override def onPush(): Unit = { + push(out, grab(in)) + nextDeadline = System.nanoTime + timeout.toNanos + waitingDemand = true + } + + override def onPull(): Unit = { + waitingDemand = false + pull(in) + } + + final override protected def onTimer(key: Any): Unit = + if (waitingDemand && (nextDeadline - System.nanoTime < 0)) + failStage(new TimeoutException(s"No demand signalled in the last $timeout.")) + + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + } + + override def toString = "BackpressureTimeout" + } final class IdleTimeoutBidi[I, O](val timeout: FiniteDuration) extends GraphStage[BidiShape[I, I, O, O]] { @@ -105,99 +143,91 @@ private[stream] object Timers { val in2 = Inlet[O]("in2") val out1 = Outlet[I]("out1") val out2 = Outlet[O]("out2") - - override def initialAttributes = Attributes.name("IdleTimeoutBidi") val shape = BidiShape(in1, out1, in2, out2) + override def initialAttributes = DefaultAttributes.idleTimeoutBidi + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + private var nextDeadline: Long = System.nanoTime + timeout.toNanos + + setHandlers(in1, out1, new IdleBidiHandler(in1, out1)) + setHandlers(in2, out2, new IdleBidiHandler(in2, out2)) + + private def onActivity(): Unit = nextDeadline = System.nanoTime + timeout.toNanos + + final override def onTimer(key: Any): Unit = + if (nextDeadline - System.nanoTime < 0) + failStage(new TimeoutException(s"No elements passed in the last $timeout.")) + + override def preStart(): Unit = schedulePeriodically(GraphStageLogicTimer, idleTimeoutCheckInterval(timeout)) + + class IdleBidiHandler[P](in: Inlet[P], out: Outlet[P]) extends InHandler with OutHandler { + override def onPush(): Unit = { + onActivity() + push(out, grab(in)) + } + override def onPull(): Unit = pull(in) + override def onUpstreamFinish(): Unit = complete(out) + override def onDownstreamFinish(): Unit = cancel(in) + } + } + override def toString = "IdleTimeoutBidi" - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private var nextDeadline: Deadline = Deadline.now + timeout - - setHandler(in1, new InHandler { - override def onPush(): Unit = { - onActivity() - push(out1, grab(in1)) - } - override def onUpstreamFinish(): Unit = complete(out1) - }) - - setHandler(in2, new InHandler { - override def onPush(): Unit = { - onActivity() - push(out2, grab(in2)) - } - override def onUpstreamFinish(): Unit = complete(out2) - }) - - setHandler(out1, new OutHandler { - override def onPull(): Unit = pull(in1) - override def onDownstreamFinish(): Unit = cancel(in1) - }) - - setHandler(out2, new OutHandler { - override def onPull(): Unit = pull(in2) - override def onDownstreamFinish(): Unit = cancel(in2) - }) - - private def onActivity(): Unit = nextDeadline = Deadline.now + timeout - - final override def onTimer(key: Any): Unit = - if (nextDeadline.isOverdue()) - failStage(new TimeoutException(s"No elements passed in the last $timeout.")) - - override def preStart(): Unit = schedulePeriodically("IdleTimeoutCheckTimer", idleTimeoutCheckInterval(timeout)) - } } final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { val in: Inlet[T] = Inlet("IdleInject.in") val out: Outlet[T] = Outlet("IdleInject.out") - override def initialAttributes = Attributes.name("DelayInitial") override val shape: FlowShape[T, T] = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private val IdleTimer = "DelayTimer" + override def initialAttributes = DefaultAttributes.delayInitial - override def preStart(): Unit = { - if (delay == Duration.Zero) open = true - else scheduleOnce(IdleTimer, delay) - } + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private var open: Boolean = false - private var open: Boolean = false + setHandlers(in, out, this) + + override def preStart(): Unit = { + if (delay == Duration.Zero) open = true + else scheduleOnce(GraphStageLogicTimer, delay) + } - setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - }) - setHandler(out, new OutHandler { override def onPull(): Unit = if (open) pull(in) - }) - override protected def onTimer(timerKey: Any): Unit = { - open = true - if (isAvailable(out)) pull(in) + override protected def onTimer(timerKey: Any): Unit = { + open = true + if (isAvailable(out)) pull(in) + } } - } + + override def toString = "DelayTimer" + } final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () ⇒ O) extends GraphStage[FlowShape[I, O]] { val in: Inlet[I] = Inlet("IdleInject.in") val out: Outlet[O] = Outlet("IdleInject.out") - override def initialAttributes = Attributes.name("IdleInject") + + override def initialAttributes = DefaultAttributes.idleInject + override val shape: FlowShape[I, O] = FlowShape(in, out) - override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - private val IdleTimer = "IdleTimer" - private var nextDeadline = Deadline.now + timeout + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new TimerGraphStageLogic(shape) with InHandler with OutHandler { + private var nextDeadline: Long = System.nanoTime + timeout.toNanos - // Prefetching to ensure priority of actual upstream elements - override def preStart(): Unit = pull(in) + setHandlers(in, out, this) + + // Prefetching to ensure priority of actual upstream elements + override def preStart(): Unit = pull(in) - setHandler(in, new InHandler { override def onPush(): Unit = { - nextDeadline = Deadline.now + timeout - cancelTimer(IdleTimer) + nextDeadline = System.nanoTime + timeout.toNanos + cancelTimer(GraphStageLogicTimer) if (isAvailable(out)) { push(out, grab(in)) pull(in) @@ -207,31 +237,33 @@ private[stream] object Timers { override def onUpstreamFinish(): Unit = { if (!isAvailable(in)) completeStage() } - }) - setHandler(out, new OutHandler { override def onPull(): Unit = { if (isAvailable(in)) { push(out, grab(in)) if (isClosed(in)) completeStage() else pull(in) } else { - if (nextDeadline.isOverdue()) { - nextDeadline = Deadline.now + timeout + val time = System.nanoTime + if (nextDeadline - time < 0) { + nextDeadline = time + timeout.toNanos push(out, inject()) - } else scheduleOnce(IdleTimer, nextDeadline.timeLeft) + } else scheduleOnce(GraphStageLogicTimer, FiniteDuration(nextDeadline - time, TimeUnit.NANOSECONDS)) } } - }) - override protected def onTimer(timerKey: Any): Unit = { - if (nextDeadline.isOverdue() && isAvailable(out)) { - push(out, inject()) - nextDeadline = Deadline.now + timeout + override protected def onTimer(timerKey: Any): Unit = { + val time = System.nanoTime + if ((nextDeadline - time < 0) && isAvailable(out)) { + push(out, inject()) + nextDeadline = time + timeout.toNanos + } } } - } + + override def toString = "IdleTimer" } + case object GraphStageLogicTimer } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index fe3ce1dded..32524be0f5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -6,7 +6,7 @@ package akka.stream.javadsl import akka.{ NotUsed, Done } import akka.event.LoggingAdapter import akka.japi.{ function, Pair } -import akka.stream.impl.{ConstantFun, StreamLayout} +import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream._ import akka.stream.stage.Stage import org.reactivestreams.Processor @@ -1594,8 +1594,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.completionTimeout(timeout)) /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * @@ -1609,7 +1610,23 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.idleTimeout(timeout)) /** - * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.backpressureTimeout(timeout)) + + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. * * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements @@ -1726,9 +1743,9 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * Delays the initial element by the specified duration. * - * '''Emits when''' upstream emits an element if the initial delay already elapsed + * '''Emits when''' upstream emits an element if the initial delay is already elapsed * - * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed * * '''Completes when''' upstream completes * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 9d2a650a61..bfcee5976a 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -1809,8 +1809,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.completionTimeout(timeout)) /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * @@ -1824,7 +1825,23 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.idleTimeout(timeout)) /** - * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.backpressureTimeout(timeout)) + + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. * * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements @@ -1941,9 +1958,9 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * Delays the initial element by the specified duration. * - * '''Emits when''' upstream emits an element if the initial delay already elapsed + * '''Emits when''' upstream emits an element if the initial delay is already elapsed * - * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed * * '''Completes when''' upstream completes * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 76efdbc8a7..444a6c5a04 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -1100,8 +1100,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo new SubFlow(delegate.completionTimeout(timeout)) /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * @@ -1115,7 +1116,23 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo new SubFlow(delegate.idleTimeout(timeout)) /** - * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: FiniteDuration): SubFlow[In, Out, Mat] = + new SubFlow(delegate.backpressureTimeout(timeout)) + + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. * * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements @@ -1214,9 +1231,9 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo /** * Delays the initial element by the specified duration. * - * '''Emits when''' upstream emits an element if the initial delay already elapsed + * '''Emits when''' upstream emits an element if the initial delay is already elapsed * - * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed * * '''Completes when''' upstream completes * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index fcd153e2a0..5eaf3958b1 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -1097,8 +1097,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source new SubSource(delegate.completionTimeout(timeout)) /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[java.util.concurrent.TimeoutException]]. + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * @@ -1112,7 +1113,23 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source new SubSource(delegate.idleTimeout(timeout)) /** - * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[java.util.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: FiniteDuration): SubSource[Out, Mat] = + new SubSource(delegate.backpressureTimeout(timeout)) + + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. * * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements @@ -1204,9 +1221,9 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source /** * Delays the initial element by the specified duration. * - * '''Emits when''' upstream emits an element if the initial delay already elapsed + * '''Emits when''' upstream emits an element if the initial delay is already elapsed * - * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed * * '''Completes when''' upstream completes * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index bb9941d036..5d034e905d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -1438,8 +1438,9 @@ trait FlowOps[+Out, +Mat] { def completionTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Completion[Out](timeout)) /** - * If the time between two processed elements exceed the provided timeout, the stream is failed - * with a [[scala.concurrent.TimeoutException]]. + * If the time between two processed elements exceeds the provided timeout, the stream is failed + * with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). * * '''Emits when''' upstream emits an element * @@ -1452,7 +1453,22 @@ trait FlowOps[+Out, +Mat] { def idleTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.Idle[Out](timeout)) /** - * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * If the time between the emission of an element and the following downstream demand exceeds the provided timeout, + * the stream is failed with a [[scala.concurrent.TimeoutException]]. The timeout is checked periodically, + * so the resolution of the check is one period (equals to timeout value). + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between element emission and downstream demand. + * + * '''Cancels when''' downstream cancels + */ + def backpressureTimeout(timeout: FiniteDuration): Repr[Out] = via(new Timers.BackpressureTimeout[Out](timeout)) + + /** + * Injects additional elements if upstream does not emit for a configured amount of time. In other words, this * stage attempts to maintains a base rate of emitted elements towards the downstream. * * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements @@ -1551,9 +1567,9 @@ trait FlowOps[+Out, +Mat] { /** * Delays the initial element by the specified duration. * - * '''Emits when''' upstream emits an element if the initial delay already elapsed + * '''Emits when''' upstream emits an element if the initial delay is already elapsed * - * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * '''Backpressures when''' downstream backpressures or initial delay is not yet elapsed * * '''Completes when''' upstream completes * diff --git a/project/MiMa.scala b/project/MiMa.scala index dbfadd075a..083620d8d1 100644 --- a/project/MiMa.scala +++ b/project/MiMa.scala @@ -777,7 +777,10 @@ object MiMa extends AutoPlugin { // #19225 - GraphStage and removal of isTerminated ProblemFilters.exclude[DirectMissingMethodProblem]("akka.http.impl.engine.parsing.HttpMessageParser.isTerminated"), - ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.parsing.HttpMessageParser.stage") + ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.http.impl.engine.parsing.HttpMessageParser.stage"), + + // #20131 - flow combinator + ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.backpressureTimeout") ) ) }