diff --git a/akka-docs-dev/rst/java/stream-cookbook.rst b/akka-docs-dev/rst/java/stream-cookbook.rst index 14151d9fda..9de4be0abf 100644 --- a/akka-docs-dev/rst/java/stream-cookbook.rst +++ b/akka-docs-dev/rst/java/stream-cookbook.rst @@ -379,8 +379,6 @@ Injecting keep-alive messages into a stream of ByteStrings **Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages but only if this does not interfere with normal traffic. -All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words, -whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the -preferred upstream effectively giving it an absolute priority. +There is a built-in operation that allows to do this directly: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeKeepAlive.java#inject-keepalive diff --git a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala index c7ac25835e..9a205b977a 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/cookbook/RecipeKeepAlive.scala @@ -10,68 +10,16 @@ class RecipeKeepAlive extends RecipeSpec { "Recipe for injecting keepalive messages" must { "work" in { - - type Tick = Unit - - val tickPub = TestPublisher.probe[Tick]() - val dataPub = TestPublisher.probe[ByteString]() - val sub = TestSubscriber.manualProbe[ByteString]() - val ticks = Source(tickPub) - - val dataStream = Source(dataPub) val keepaliveMessage = ByteString(11) - val sink = Sink(sub) //#inject-keepalive - val tickToKeepAlivePacket: Flow[Tick, ByteString, Unit] = Flow[Tick] - .conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg) - - val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder => - import FlowGraph.Implicits._ - val unfairMerge = builder.add(MergePreferred[ByteString](1)) - - // If data is available then no keepalive is injected - dataStream ~> unfairMerge.preferred - ticks ~> tickToKeepAlivePacket ~> unfairMerge ~> sink - ClosedShape - }) + import scala.concurrent.duration._ + val injectKeepAlive: Flow[ByteString, ByteString, Unit] = + Flow[ByteString].keepAlive(1.second, () => keepaliveMessage) //#inject-keepalive - graph.run() - - val subscription = sub.expectSubscription() - - // FIXME RK: remove (because I think this cannot deterministically be tested and it might also not do what it should anymore) - - tickPub.sendNext(()) - - // pending data will overcome the keepalive - dataPub.sendNext(ByteString(1)) - dataPub.sendNext(ByteString(2)) - dataPub.sendNext(ByteString(3)) - - subscription.request(1) - sub.expectNext(ByteString(1)) - subscription.request(2) - sub.expectNext(ByteString(2)) - // This still gets through because there is some intrinsic fairness caused by the FIFO queue in the interpreter - // Expecting here a preferred element also only worked true accident with the old Pump. - sub.expectNext(keepaliveMessage) - - subscription.request(1) - sub.expectNext(ByteString(3)) - - subscription.request(1) - tickPub.sendNext(()) - sub.expectNext(keepaliveMessage) - - dataPub.sendComplete() - tickPub.sendComplete() - - sub.expectComplete() - + // No need to test, this is a built-in stage with proper tests } - } } diff --git a/akka-docs-dev/rst/scala/stream-cookbook.rst b/akka-docs-dev/rst/scala/stream-cookbook.rst index e863806115..f8c5ec5115 100644 --- a/akka-docs-dev/rst/scala/stream-cookbook.rst +++ b/akka-docs-dev/rst/scala/stream-cookbook.rst @@ -368,8 +368,6 @@ Injecting keep-alive messages into a stream of ByteStrings **Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages but only if this does not interfere with normal traffic. -All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words, -whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the -preferred upstream effectively giving it an absolute priority. +There is a built-in operation that allows to do this directly: .. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 4ad1b7cffc..b309b5c56e 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -27,9 +27,11 @@ import scala.runtime.BoxedUnit; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @SuppressWarnings("serial") public class FlowTest extends StreamTest { @@ -718,4 +720,69 @@ public class FlowTest extends StreamTest { probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); } + @Test + public void mustBeAbleToUseInitialTimeout() throws Exception { + try { + Await.result( + Source.maybe() + .via(Flow.of(Integer.class).initialTimeout(Duration.create(1, "second"))) + .runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + + @Test + public void mustBeAbleToUseCompletionTimeout() throws Exception { + try { + Await.result( + Source.maybe() + .via(Flow.of(Integer.class).completionTimeout(Duration.create(1, "second"))) + .runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + @Test + public void mustBeAbleToUseIdleTimeout() throws Exception { + try { + Await.result( + Source.maybe() + .via(Flow.of(Integer.class).idleTimeout(Duration.create(1, "second"))) + .runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + @Test + public void mustBeAbleToUseKeepAlive() throws Exception { + Integer result = Await.result( + Source.maybe() + .via(Flow.of(Integer.class) + .keepAlive(Duration.create(1, "second"), new Creator() { + public Integer create() { + return 0; + } + }) + ) + .takeWithin(Duration.create(1500, "milliseconds")) + .runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + + assertEquals((Object) 0, result); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 80b3c301a3..2f19c93dd5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -31,10 +31,12 @@ import scala.util.Try; import java.util.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.TestPublisher.ManualProbe; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; @SuppressWarnings("serial") public class SourceTest extends StreamTest { @@ -653,4 +655,62 @@ public class SourceTest extends StreamTest { probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); } + + @Test + public void mustBeAbleToUseInitialTimeout() throws Exception { + try { + Await.result( + Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + + @Test + public void mustBeAbleToUseCompletionTimeout() throws Exception { + try { + Await.result( + Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + @Test + public void mustBeAbleToUseIdleTimeout() throws Exception { + try { + Await.result( + Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + fail("A TimeoutException was expected"); + } catch(TimeoutException e) { + // expected + } + } + + @Test + public void mustBeAbleToUseIdleInject() throws Exception { + Integer result = Await.result( + Source.maybe() + .keepAlive(Duration.create(1, "second"), new Creator() { + public Integer create() { + return 0; + } + }) + .takeWithin(Duration.create(1500, "milliseconds")) + .runWith(Sink.head(), materializer), + Duration.create(3, "second") + ); + + assertEquals((Object) 0, result); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala new file mode 100644 index 0000000000..968e3e8271 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowIdleInjectSpec.scala @@ -0,0 +1,149 @@ +package akka.stream.scaladsl + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.{ TestSubscriber, TestPublisher, Utils, AkkaSpec } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowIdleInjectSpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorMaterializer(settings) + + "keepAlive" must { + + "not emit additional elements if upstream is fast enough" in Utils.assertAllStagesStopped { + Await.result( + Source(1 to 10).keepAlive(1.second, () ⇒ 0).grouped(1000).runWith(Sink.head), + 3.seconds) should ===(1 to 10) + } + + "emit elements periodically after silent periods" in Utils.assertAllStagesStopped { + val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second) + + val result = Await.result( + sourceWithIdleGap.keepAlive(0.6.seconds, () ⇒ 0).grouped(1000).runWith(Sink.head), + 3.seconds) should ===(List(1, 2, 3, 4, 5, 0, 0, 0, 6, 7, 8, 9, 10)) + } + + "immediately pull upstream" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.request(1) + + upstream.sendNext(1) + downstream.expectNext(1) + + upstream.sendComplete() + downstream.expectComplete() + } + + "immediately pull upstream after busy period" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + (Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.request(10) + downstream.expectNextN(1 to 10) + + downstream.request(1) + + upstream.sendNext(1) + downstream.expectNext(1) + + upstream.sendComplete() + downstream.expectComplete() + } + + "work if timer fires before initial request" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.ensureSubscription() + downstream.expectNoMsg(1.5.second) + downstream.request(1) + downstream.expectNext(0) + + upstream.sendComplete() + downstream.expectComplete() + } + + "work if timer fires before initial request after busy period" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + (Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.request(10) + downstream.expectNextN(1 to 10) + + downstream.expectNoMsg(1.5.second) + downstream.request(1) + downstream.expectNext(0) + + upstream.sendComplete() + downstream.expectComplete() + } + + "prefer upstream element over injected" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.ensureSubscription() + downstream.expectNoMsg(1.5.second) + upstream.sendNext(1) + downstream.expectNoMsg(0.5.second) + downstream.request(1) + downstream.expectNext(1) + + upstream.sendComplete() + downstream.expectComplete() + } + + "prefer upstream element over injected after busy period" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + (Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.request(10) + downstream.expectNextN(1 to 10) + + downstream.expectNoMsg(1.5.second) + upstream.sendNext(1) + downstream.expectNoMsg(0.5.second) + downstream.request(1) + downstream.expectNext(1) + + upstream.sendComplete() + downstream.expectComplete() + } + + "reset deadline properly after injected element" in { + val upstream = TestPublisher.probe[Int]() + val downstream = TestSubscriber.probe[Int]() + + Source(upstream).keepAlive(1.second, () ⇒ 0).runWith(Sink(downstream)) + + downstream.request(2) + downstream.expectNoMsg(0.5.second) + downstream.expectNext(0) + + downstream.expectNoMsg(0.5 second) + downstream.expectNext(0) + } + + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala new file mode 100644 index 0000000000..c6b278c322 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowInitialDelaySpec.scala @@ -0,0 +1,52 @@ +package akka.stream.scaladsl + +import java.util.concurrent.TimeoutException + +import akka.stream.{ ActorMaterializer, ActorMaterializerSettings } +import akka.stream.testkit.{ Utils, TestSubscriber, AkkaSpec } + +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowInitialDelaySpec extends AkkaSpec { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + + implicit val materializer = ActorMaterializer(settings) + + "Flow initialDelay" must { + + "work with zero delay" in Utils.assertAllStagesStopped { + Await.result( + Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head), + 1.second) should ===(1 to 10) + } + + "delay elements by the specified time but not more" in Utils.assertAllStagesStopped { + a[TimeoutException] shouldBe thrownBy { + Await.result( + Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore), + 2.seconds) + } + + Await.ready( + Source(1 to 10).initialDelay(1.seconds).initialTimeout(2.second).runWith(Sink.ignore), + 2.seconds) + } + + "properly ignore timer while backpressured" in Utils.assertAllStagesStopped { + val probe = TestSubscriber.probe[Int]() + Source(1 to 10).initialDelay(0.5.second).runWith(Sink(probe)) + + probe.ensureSubscription() + probe.expectNoMsg(1.5.second) + probe.request(20) + probe.expectNextN(1 to 10) + + probe.expectComplete() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala similarity index 66% rename from akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala rename to akka-stream/src/main/scala/akka/stream/impl/Timers.scala index 7744619ff6..aa0d4160f7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Timeouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Timers.scala @@ -3,10 +3,10 @@ package akka.stream.impl import java.util.concurrent.{ TimeUnit, TimeoutException } import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.{ BidiShape, Inlet, Outlet, Attributes } +import akka.stream._ import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } -import scala.concurrent.duration.{ Deadline, FiniteDuration } +import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration } /** * INTERNAL API @@ -19,7 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration } * - if the timer fires before the event happens, these stages all fail the stream * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure */ -private[stream] object Timeouts { +private[stream] object Timers { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { import scala.concurrent.duration._ FiniteDuration( @@ -145,4 +145,81 @@ private[stream] object Timeouts { } } + final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] { + val in: Inlet[T] = Inlet("IdleInject.in") + val out: Outlet[T] = Outlet("IdleInject.out") + override val shape: FlowShape[T, T] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + private val IdleTimer = "DelayTimer" + + override def preStart(): Unit = { + if (delay == Duration.Zero) open = true + else scheduleOnce(IdleTimer, delay) + } + + private var open: Boolean = false + + setHandler(in, new InHandler { + override def onPush(): Unit = push(out, grab(in)) + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = if (open) pull(in) + }) + + override protected def onTimer(timerKey: Any): Unit = { + open = true + if (isAvailable(out)) pull(in) + } + } + } + + final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () ⇒ O) extends GraphStage[FlowShape[I, O]] { + val in: Inlet[I] = Inlet("IdleInject.in") + val out: Outlet[O] = Outlet("IdleInject.out") + override val shape: FlowShape[I, O] = FlowShape(in, out) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + private val IdleTimer = "IdleTimer" + private var nextDeadline = Deadline.now + timeout + + // Prefetching to ensure priority of actual upstream elements + override def preStart(): Unit = pull(in) + + setHandler(in, new InHandler { + override def onPush(): Unit = { + nextDeadline = Deadline.now + timeout + cancelTimer(IdleTimer) + if (isAvailable(out)) { + push(out, grab(in)) + pull(in) + } + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (isAvailable(in)) { + push(out, grab(in)) + pull(in) + } else { + if (nextDeadline.isOverdue()) { + nextDeadline = Deadline.now + timeout + push(out, inject()) + } else scheduleOnce(IdleTimer, nextDeadline.timeLeft) + } + } + }) + + override protected def onTimer(timerKey: Any): Unit = { + if (nextDeadline.isOverdue() && isAvailable(out)) { + push(out, inject()) + nextDeadline = Deadline.now + timeout + } + } + } + + } + } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 701d05e91b..513ceb285c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -5,6 +5,7 @@ package akka.stream.javadsl import akka.event.LoggingAdapter import akka.japi.{ function, Pair } +import akka.stream.impl.Timers.{ DelayInitial, IdleInject } import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.{ scaladsl, _ } import akka.stream.stage.Stage @@ -998,6 +999,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels */ def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.initialTimeout(timeout)) @@ -1005,6 +1014,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels */ def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.completionTimeout(timeout)) @@ -1012,10 +1029,52 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends /** * If the time between two processed elements exceed the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels */ def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = new Flow(delegate.idleTimeout(timeout)) + /** + * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] = + new Flow(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.initialDelay(delay)) + override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = new Flow(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 924bcbd799..68d96219fa 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -849,6 +849,14 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels */ def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.initialTimeout(timeout)) @@ -856,6 +864,14 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels */ def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.completionTimeout(timeout)) @@ -863,10 +879,52 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap /** * If the time between two processed elements exceed the provided timeout, the stream is failed * with a [[java.util.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels */ def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = new Source(delegate.idleTimeout(timeout)) + /** + * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] = + new Source(delegate.keepAlive(maxIdle, () ⇒ injectedElem.create())) + + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.initialDelay(delay)) + override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = new Source(delegate.withAttributes(attr)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala index beaea9a013..b3c165ee1d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/BidiFlow.scala @@ -5,7 +5,7 @@ package akka.stream.scaladsl import akka.stream._ import akka.stream.impl.StreamLayout.Module -import akka.stream.impl.Timeouts +import akka.stream.impl.Timers import scala.concurrent.duration.FiniteDuration @@ -207,5 +207,5 @@ object BidiFlow { * the *joint* frequencies of the elements in both directions. */ def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] = - fromGraph(new Timeouts.IdleBidi(timeout)) + fromGraph(new Timers.IdleBidi(timeout)) } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 9fe04c863e..d9eaff2d14 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,8 +9,9 @@ import akka.stream._ import akka.stream.impl.SplitDecision._ import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } +import akka.stream.impl.Timers import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } -import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timeouts } +import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -1016,20 +1017,77 @@ trait FlowOps[+Out, +Mat] { /** * If the first element has not passed through this stage before the provided timeout, the stream is failed * with a [[scala.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before first element arrives + * + * '''Cancels when''' downstream cancels */ - def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Initial[Out](timeout)) + def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Initial[Out](timeout)) /** * If the completion of the stream does not happen until the provided timeout, the stream is failed * with a [[scala.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses before upstream completes + * + * '''Cancels when''' downstream cancels */ - def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Completion[Out](timeout)) + def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Completion[Out](timeout)) /** * If the time between two processed elements exceed the provided timeout, the stream is failed * with a [[scala.concurrent.TimeoutException]]. + * + * '''Emits when''' upstream emits an element + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements + * + * '''Cancels when''' downstream cancels */ - def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Idle[Out](timeout)) + def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Idle[Out](timeout)) + + /** + * Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this + * stage attempts to maintains a base rate of emitted elements towards the downstream. + * + * If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements + * do not accumulate during this period. + * + * Upstream elements are always preferred over injected elements. + * + * '''Emits when''' upstream emits an element or if the upstream was idle for the configured period + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () ⇒ U): Repr[U, Mat] = + via(new Timers.IdleInject[Out, U](maxIdle, injectedElem)) + + /** + * Delays the initial element by the specified duration. + * + * '''Emits when''' upstream emits an element if the initial delay already elapsed + * + * '''Backpressures when''' downstream backpressures or initial delay not yet elapsed + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + */ + def initialDelay(delay: FiniteDuration): Repr[Out, Mat] = via(new Timers.DelayInitial[Out](delay)) /** * Logs elements flowing through the stream as well as completion and erroring.