Merge pull request #18833 from drewhk/wip-18735-idleInject-drewhk

+str 18735: Added keepalive inject and initial delay ops
This commit is contained in:
drewhk 2015-11-03 11:50:45 +01:00
commit aa339e41ec
12 changed files with 595 additions and 71 deletions

View file

@ -379,8 +379,6 @@ Injecting keep-alive messages into a stream of ByteStrings
**Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages
but only if this does not interfere with normal traffic.
All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words,
whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the
preferred upstream effectively giving it an absolute priority.
There is a built-in operation that allows to do this directly:
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeKeepAlive.java#inject-keepalive

View file

@ -10,68 +10,16 @@ class RecipeKeepAlive extends RecipeSpec {
"Recipe for injecting keepalive messages" must {
"work" in {
type Tick = Unit
val tickPub = TestPublisher.probe[Tick]()
val dataPub = TestPublisher.probe[ByteString]()
val sub = TestSubscriber.manualProbe[ByteString]()
val ticks = Source(tickPub)
val dataStream = Source(dataPub)
val keepaliveMessage = ByteString(11)
val sink = Sink(sub)
//#inject-keepalive
val tickToKeepAlivePacket: Flow[Tick, ByteString, Unit] = Flow[Tick]
.conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg)
val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
val unfairMerge = builder.add(MergePreferred[ByteString](1))
// If data is available then no keepalive is injected
dataStream ~> unfairMerge.preferred
ticks ~> tickToKeepAlivePacket ~> unfairMerge ~> sink
ClosedShape
})
import scala.concurrent.duration._
val injectKeepAlive: Flow[ByteString, ByteString, Unit] =
Flow[ByteString].keepAlive(1.second, () => keepaliveMessage)
//#inject-keepalive
graph.run()
val subscription = sub.expectSubscription()
// FIXME RK: remove (because I think this cannot deterministically be tested and it might also not do what it should anymore)
tickPub.sendNext(())
// pending data will overcome the keepalive
dataPub.sendNext(ByteString(1))
dataPub.sendNext(ByteString(2))
dataPub.sendNext(ByteString(3))
subscription.request(1)
sub.expectNext(ByteString(1))
subscription.request(2)
sub.expectNext(ByteString(2))
// This still gets through because there is some intrinsic fairness caused by the FIFO queue in the interpreter
// Expecting here a preferred element also only worked true accident with the old Pump.
sub.expectNext(keepaliveMessage)
subscription.request(1)
sub.expectNext(ByteString(3))
subscription.request(1)
tickPub.sendNext(())
sub.expectNext(keepaliveMessage)
dataPub.sendComplete()
tickPub.sendComplete()
sub.expectComplete()
}
// No need to test, this is a built-in stage with proper tests
}
}
}

View file

@ -368,8 +368,6 @@ Injecting keep-alive messages into a stream of ByteStrings
**Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages
but only if this does not interfere with normal traffic.
All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words,
whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the
preferred upstream effectively giving it an absolute priority.
There is a built-in operation that allows to do this directly:
.. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive

View file

@ -27,9 +27,11 @@ import scala.runtime.BoxedUnit;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class FlowTest extends StreamTest {
@ -718,4 +720,69 @@ public class FlowTest extends StreamTest {
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
@Test
public void mustBeAbleToUseInitialTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).initialTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseCompletionTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).completionTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).idleTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseKeepAlive() throws Exception {
Integer result = Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class)
.keepAlive(Duration.create(1, "second"), new Creator<Integer>() {
public Integer create() {
return 0;
}
})
)
.takeWithin(Duration.create(1500, "milliseconds"))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
assertEquals((Object) 0, result);
}
}

View file

@ -31,10 +31,12 @@ import scala.util.Try;
import java.util.*;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static akka.stream.testkit.TestPublisher.ManualProbe;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@SuppressWarnings("serial")
public class SourceTest extends StreamTest {
@ -653,4 +655,62 @@ public class SourceTest extends StreamTest {
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
}
@Test
public void mustBeAbleToUseInitialTimeout() throws Exception {
try {
Await.result(
Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseCompletionTimeout() throws Exception {
try {
Await.result(
Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleTimeout() throws Exception {
try {
Await.result(
Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleInject() throws Exception {
Integer result = Await.result(
Source.maybe()
.keepAlive(Duration.create(1, "second"), new Creator<Integer>() {
public Integer create() {
return 0;
}
})
.takeWithin(Duration.create(1500, "milliseconds"))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
assertEquals((Object) 0, result);
}
}

View file

@ -0,0 +1,149 @@
package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ TestSubscriber, TestPublisher, Utils, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowIdleInjectSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"keepAlive" must {
"not emit additional elements if upstream is fast enough" in Utils.assertAllStagesStopped {
Await.result(
Source(1 to 10).keepAlive(1.second, () 0).grouped(1000).runWith(Sink.head),
3.seconds) should ===(1 to 10)
}
"emit elements periodically after silent periods" in Utils.assertAllStagesStopped {
val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second)
val result = Await.result(
sourceWithIdleGap.keepAlive(0.6.seconds, () 0).grouped(1000).runWith(Sink.head),
3.seconds) should ===(List(1, 2, 3, 4, 5, 0, 0, 0, 6, 7, 8, 9, 10))
}
"immediately pull upstream" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"immediately pull upstream after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"work if timer fires before initial request" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.ensureSubscription()
downstream.expectNoMsg(1.5.second)
downstream.request(1)
downstream.expectNext(0)
upstream.sendComplete()
downstream.expectComplete()
}
"work if timer fires before initial request after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.expectNoMsg(1.5.second)
downstream.request(1)
downstream.expectNext(0)
upstream.sendComplete()
downstream.expectComplete()
}
"prefer upstream element over injected" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.ensureSubscription()
downstream.expectNoMsg(1.5.second)
upstream.sendNext(1)
downstream.expectNoMsg(0.5.second)
downstream.request(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"prefer upstream element over injected after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.expectNoMsg(1.5.second)
upstream.sendNext(1)
downstream.expectNoMsg(0.5.second)
downstream.request(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"reset deadline properly after injected element" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(2)
downstream.expectNoMsg(0.5.second)
downstream.expectNext(0)
downstream.expectNoMsg(0.5 second)
downstream.expectNext(0)
}
}
}

View file

@ -0,0 +1,52 @@
package akka.stream.scaladsl
import java.util.concurrent.TimeoutException
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ Utils, TestSubscriber, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowInitialDelaySpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"Flow initialDelay" must {
"work with zero delay" in Utils.assertAllStagesStopped {
Await.result(
Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head),
1.second) should ===(1 to 10)
}
"delay elements by the specified time but not more" in Utils.assertAllStagesStopped {
a[TimeoutException] shouldBe thrownBy {
Await.result(
Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore),
2.seconds)
}
Await.ready(
Source(1 to 10).initialDelay(1.seconds).initialTimeout(2.second).runWith(Sink.ignore),
2.seconds)
}
"properly ignore timer while backpressured" in Utils.assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
Source(1 to 10).initialDelay(0.5.second).runWith(Sink(probe))
probe.ensureSubscription()
probe.expectNoMsg(1.5.second)
probe.request(20)
probe.expectNextN(1 to 10)
probe.expectComplete()
}
}
}

View file

@ -3,10 +3,10 @@ package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ BidiShape, Inlet, Outlet, Attributes }
import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
import scala.concurrent.duration.{ Deadline, FiniteDuration }
import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration }
/**
* INTERNAL API
@ -19,7 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration }
* - if the timer fires before the event happens, these stages all fail the stream
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/
private[stream] object Timeouts {
private[stream] object Timers {
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._
FiniteDuration(
@ -145,4 +145,81 @@ private[stream] object Timeouts {
}
}
final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in: Inlet[T] = Inlet("IdleInject.in")
val out: Outlet[T] = Outlet("IdleInject.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val IdleTimer = "DelayTimer"
override def preStart(): Unit = {
if (delay == Duration.Zero) open = true
else scheduleOnce(IdleTimer, delay)
}
private var open: Boolean = false
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (open) pull(in)
})
override protected def onTimer(timerKey: Any): Unit = {
open = true
if (isAvailable(out)) pull(in)
}
}
}
final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () O) extends GraphStage[FlowShape[I, O]] {
val in: Inlet[I] = Inlet("IdleInject.in")
val out: Outlet[O] = Outlet("IdleInject.out")
override val shape: FlowShape[I, O] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val IdleTimer = "IdleTimer"
private var nextDeadline = Deadline.now + timeout
// Prefetching to ensure priority of actual upstream elements
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
nextDeadline = Deadline.now + timeout
cancelTimer(IdleTimer)
if (isAvailable(out)) {
push(out, grab(in))
pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isAvailable(in)) {
push(out, grab(in))
pull(in)
} else {
if (nextDeadline.isOverdue()) {
nextDeadline = Deadline.now + timeout
push(out, inject())
} else scheduleOnce(IdleTimer, nextDeadline.timeLeft)
}
}
})
override protected def onTimer(timerKey: Any): Unit = {
if (nextDeadline.isOverdue() && isAvailable(out)) {
push(out, inject())
nextDeadline = Deadline.now + timeout
}
}
}
}
}

View file

@ -5,6 +5,7 @@ package akka.stream.javadsl
import akka.event.LoggingAdapter
import akka.japi.{ function, Pair }
import akka.stream.impl.Timers.{ DelayInitial, IdleInject }
import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.{ scaladsl, _ }
import akka.stream.stage.Stage
@ -998,6 +999,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialTimeout(timeout))
@ -1005,6 +1014,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.completionTimeout(timeout))
@ -1012,10 +1029,52 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.idleTimeout(timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] =
new Flow(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialDelay(delay))
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr))

View file

@ -849,6 +849,14 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialTimeout(timeout))
@ -856,6 +864,14 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.completionTimeout(timeout))
@ -863,10 +879,52 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.idleTimeout(timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] =
new Source(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialDelay(delay))
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr))

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.stream._
import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.Timeouts
import akka.stream.impl.Timers
import scala.concurrent.duration.FiniteDuration
@ -207,5 +207,5 @@ object BidiFlow {
* the *joint* frequencies of the elements in both directions.
*/
def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] =
fromGraph(new Timeouts.IdleBidi(timeout))
fromGraph(new Timers.IdleBidi(timeout))
}

View file

@ -9,8 +9,9 @@ import akka.stream._
import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.Timers
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered }
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timeouts }
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage }
import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -1016,20 +1017,77 @@ trait FlowOps[+Out, +Mat] {
/**
* If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/
def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Initial[Out](timeout))
def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Initial[Out](timeout))
/**
* If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/
def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Completion[Out](timeout))
def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Completion[Out](timeout))
/**
* If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/
def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Idle[Out](timeout))
def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Idle[Out](timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () U): Repr[U, Mat] =
via(new Timers.IdleInject[Out, U](maxIdle, injectedElem))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): Repr[Out, Mat] = via(new Timers.DelayInitial[Out](delay))
/**
* Logs elements flowing through the stream as well as completion and erroring.