+str 18735: Added keepalive inject and initial delay ops

Also, improved documentation of timeout operations
Added missing Java DSL smoke tests
This commit is contained in:
Endre Sándor Varga 2015-11-02 15:30:10 +01:00
parent fb3dd99eb3
commit 8e62c0d9d7
12 changed files with 595 additions and 71 deletions

View file

@ -379,8 +379,6 @@ Injecting keep-alive messages into a stream of ByteStrings
**Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages **Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages
but only if this does not interfere with normal traffic. but only if this does not interfere with normal traffic.
All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words, There is a built-in operation that allows to do this directly:
whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the
preferred upstream effectively giving it an absolute priority.
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeKeepAlive.java#inject-keepalive .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/cookbook/RecipeKeepAlive.java#inject-keepalive

View file

@ -10,68 +10,16 @@ class RecipeKeepAlive extends RecipeSpec {
"Recipe for injecting keepalive messages" must { "Recipe for injecting keepalive messages" must {
"work" in { "work" in {
type Tick = Unit
val tickPub = TestPublisher.probe[Tick]()
val dataPub = TestPublisher.probe[ByteString]()
val sub = TestSubscriber.manualProbe[ByteString]()
val ticks = Source(tickPub)
val dataStream = Source(dataPub)
val keepaliveMessage = ByteString(11) val keepaliveMessage = ByteString(11)
val sink = Sink(sub)
//#inject-keepalive //#inject-keepalive
val tickToKeepAlivePacket: Flow[Tick, ByteString, Unit] = Flow[Tick] import scala.concurrent.duration._
.conflate(seed = (tick) => keepaliveMessage)((msg, newTick) => msg) val injectKeepAlive: Flow[ByteString, ByteString, Unit] =
Flow[ByteString].keepAlive(1.second, () => keepaliveMessage)
val graph = RunnableGraph.fromGraph(FlowGraph.create() { implicit builder =>
import FlowGraph.Implicits._
val unfairMerge = builder.add(MergePreferred[ByteString](1))
// If data is available then no keepalive is injected
dataStream ~> unfairMerge.preferred
ticks ~> tickToKeepAlivePacket ~> unfairMerge ~> sink
ClosedShape
})
//#inject-keepalive //#inject-keepalive
graph.run() // No need to test, this is a built-in stage with proper tests
val subscription = sub.expectSubscription()
// FIXME RK: remove (because I think this cannot deterministically be tested and it might also not do what it should anymore)
tickPub.sendNext(())
// pending data will overcome the keepalive
dataPub.sendNext(ByteString(1))
dataPub.sendNext(ByteString(2))
dataPub.sendNext(ByteString(3))
subscription.request(1)
sub.expectNext(ByteString(1))
subscription.request(2)
sub.expectNext(ByteString(2))
// This still gets through because there is some intrinsic fairness caused by the FIFO queue in the interpreter
// Expecting here a preferred element also only worked true accident with the old Pump.
sub.expectNext(keepaliveMessage)
subscription.request(1)
sub.expectNext(ByteString(3))
subscription.request(1)
tickPub.sendNext(())
sub.expectNext(keepaliveMessage)
dataPub.sendComplete()
tickPub.sendComplete()
sub.expectComplete()
} }
} }
} }

View file

@ -368,8 +368,6 @@ Injecting keep-alive messages into a stream of ByteStrings
**Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages **Situation:** Given a communication channel expressed as a stream of ByteStrings we want to inject keep-alive messages
but only if this does not interfere with normal traffic. but only if this does not interfere with normal traffic.
All this recipe needs is the ``MergePreferred`` element which is a version of a merge that is not fair. In other words, There is a built-in operation that allows to do this directly:
whenever the merge can choose because multiple upstream producers have elements to produce it will always choose the
preferred upstream effectively giving it an absolute priority.
.. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive .. includecode:: code/docs/stream/cookbook/RecipeKeepAlive.scala#inject-keepalive

View file

@ -27,9 +27,11 @@ import scala.runtime.BoxedUnit;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class FlowTest extends StreamTest { public class FlowTest extends StreamTest {
@ -718,4 +720,69 @@ public class FlowTest extends StreamTest {
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
} }
@Test
public void mustBeAbleToUseInitialTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).initialTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseCompletionTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).completionTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleTimeout() throws Exception {
try {
Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class).idleTimeout(Duration.create(1, "second")))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseKeepAlive() throws Exception {
Integer result = Await.result(
Source.<Integer>maybe()
.via(Flow.of(Integer.class)
.keepAlive(Duration.create(1, "second"), new Creator<Integer>() {
public Integer create() {
return 0;
}
})
)
.takeWithin(Duration.create(1500, "milliseconds"))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
assertEquals((Object) 0, result);
}
} }

View file

@ -31,10 +31,12 @@ import scala.util.Try;
import java.util.*; import java.util.*;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
import static akka.stream.testkit.TestPublisher.ManualProbe; import static akka.stream.testkit.TestPublisher.ManualProbe;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
@SuppressWarnings("serial") @SuppressWarnings("serial")
public class SourceTest extends StreamTest { public class SourceTest extends StreamTest {
@ -653,4 +655,62 @@ public class SourceTest extends StreamTest {
probe.expectMsgAllOf("A", "B", "C", "D", "E", "F"); probe.expectMsgAllOf("A", "B", "C", "D", "E", "F");
} }
@Test
public void mustBeAbleToUseInitialTimeout() throws Exception {
try {
Await.result(
Source.maybe().initialTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseCompletionTimeout() throws Exception {
try {
Await.result(
Source.maybe().completionTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleTimeout() throws Exception {
try {
Await.result(
Source.maybe().idleTimeout(Duration.create(1, "second")).runWith(Sink.head(), materializer),
Duration.create(3, "second")
);
fail("A TimeoutException was expected");
} catch(TimeoutException e) {
// expected
}
}
@Test
public void mustBeAbleToUseIdleInject() throws Exception {
Integer result = Await.result(
Source.maybe()
.keepAlive(Duration.create(1, "second"), new Creator<Integer>() {
public Integer create() {
return 0;
}
})
.takeWithin(Duration.create(1500, "milliseconds"))
.runWith(Sink.<Integer>head(), materializer),
Duration.create(3, "second")
);
assertEquals((Object) 0, result);
}
} }

View file

@ -0,0 +1,149 @@
package akka.stream.scaladsl
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ TestSubscriber, TestPublisher, Utils, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowIdleInjectSpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"keepAlive" must {
"not emit additional elements if upstream is fast enough" in Utils.assertAllStagesStopped {
Await.result(
Source(1 to 10).keepAlive(1.second, () 0).grouped(1000).runWith(Sink.head),
3.seconds) should ===(1 to 10)
}
"emit elements periodically after silent periods" in Utils.assertAllStagesStopped {
val sourceWithIdleGap = Source(1 to 5) ++ Source(6 to 10).initialDelay(2.second)
val result = Await.result(
sourceWithIdleGap.keepAlive(0.6.seconds, () 0).grouped(1000).runWith(Sink.head),
3.seconds) should ===(List(1, 2, 3, 4, 5, 0, 0, 0, 6, 7, 8, 9, 10))
}
"immediately pull upstream" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"immediately pull upstream after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.request(1)
upstream.sendNext(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"work if timer fires before initial request" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.ensureSubscription()
downstream.expectNoMsg(1.5.second)
downstream.request(1)
downstream.expectNext(0)
upstream.sendComplete()
downstream.expectComplete()
}
"work if timer fires before initial request after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.expectNoMsg(1.5.second)
downstream.request(1)
downstream.expectNext(0)
upstream.sendComplete()
downstream.expectComplete()
}
"prefer upstream element over injected" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.ensureSubscription()
downstream.expectNoMsg(1.5.second)
upstream.sendNext(1)
downstream.expectNoMsg(0.5.second)
downstream.request(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"prefer upstream element over injected after busy period" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
(Source(1 to 10) ++ Source(upstream)).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(10)
downstream.expectNextN(1 to 10)
downstream.expectNoMsg(1.5.second)
upstream.sendNext(1)
downstream.expectNoMsg(0.5.second)
downstream.request(1)
downstream.expectNext(1)
upstream.sendComplete()
downstream.expectComplete()
}
"reset deadline properly after injected element" in {
val upstream = TestPublisher.probe[Int]()
val downstream = TestSubscriber.probe[Int]()
Source(upstream).keepAlive(1.second, () 0).runWith(Sink(downstream))
downstream.request(2)
downstream.expectNoMsg(0.5.second)
downstream.expectNext(0)
downstream.expectNoMsg(0.5 second)
downstream.expectNext(0)
}
}
}

View file

@ -0,0 +1,52 @@
package akka.stream.scaladsl
import java.util.concurrent.TimeoutException
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.testkit.{ Utils, TestSubscriber, AkkaSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowInitialDelaySpec extends AkkaSpec {
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
"Flow initialDelay" must {
"work with zero delay" in Utils.assertAllStagesStopped {
Await.result(
Source(1 to 10).initialDelay(Duration.Zero).grouped(100).runWith(Sink.head),
1.second) should ===(1 to 10)
}
"delay elements by the specified time but not more" in Utils.assertAllStagesStopped {
a[TimeoutException] shouldBe thrownBy {
Await.result(
Source(1 to 10).initialDelay(2.seconds).initialTimeout(1.second).runWith(Sink.ignore),
2.seconds)
}
Await.ready(
Source(1 to 10).initialDelay(1.seconds).initialTimeout(2.second).runWith(Sink.ignore),
2.seconds)
}
"properly ignore timer while backpressured" in Utils.assertAllStagesStopped {
val probe = TestSubscriber.probe[Int]()
Source(1 to 10).initialDelay(0.5.second).runWith(Sink(probe))
probe.ensureSubscription()
probe.expectNoMsg(1.5.second)
probe.request(20)
probe.expectNextN(1 to 10)
probe.expectComplete()
}
}
}

View file

@ -3,10 +3,10 @@ package akka.stream.impl
import java.util.concurrent.{ TimeUnit, TimeoutException } import java.util.concurrent.{ TimeUnit, TimeoutException }
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.{ BidiShape, Inlet, Outlet, Attributes } import akka.stream._
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic } import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler, TimerGraphStageLogic }
import scala.concurrent.duration.{ Deadline, FiniteDuration } import scala.concurrent.duration.{ Duration, Deadline, FiniteDuration }
/** /**
* INTERNAL API * INTERNAL API
@ -19,7 +19,7 @@ import scala.concurrent.duration.{ Deadline, FiniteDuration }
* - if the timer fires before the event happens, these stages all fail the stream * - if the timer fires before the event happens, these stages all fail the stream
* - otherwise, these streams do not interfere with the element flow, ordinary completion or failure * - otherwise, these streams do not interfere with the element flow, ordinary completion or failure
*/ */
private[stream] object Timeouts { private[stream] object Timers {
private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = { private def idleTimeoutCheckInterval(timeout: FiniteDuration): FiniteDuration = {
import scala.concurrent.duration._ import scala.concurrent.duration._
FiniteDuration( FiniteDuration(
@ -145,4 +145,81 @@ private[stream] object Timeouts {
} }
} }
final class DelayInitial[T](val delay: FiniteDuration) extends GraphStage[FlowShape[T, T]] {
val in: Inlet[T] = Inlet("IdleInject.in")
val out: Outlet[T] = Outlet("IdleInject.out")
override val shape: FlowShape[T, T] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val IdleTimer = "DelayTimer"
override def preStart(): Unit = {
if (delay == Duration.Zero) open = true
else scheduleOnce(IdleTimer, delay)
}
private var open: Boolean = false
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
})
setHandler(out, new OutHandler {
override def onPull(): Unit = if (open) pull(in)
})
override protected def onTimer(timerKey: Any): Unit = {
open = true
if (isAvailable(out)) pull(in)
}
}
}
final class IdleInject[I, O >: I](val timeout: FiniteDuration, inject: () O) extends GraphStage[FlowShape[I, O]] {
val in: Inlet[I] = Inlet("IdleInject.in")
val out: Outlet[O] = Outlet("IdleInject.out")
override val shape: FlowShape[I, O] = FlowShape(in, out)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
private val IdleTimer = "IdleTimer"
private var nextDeadline = Deadline.now + timeout
// Prefetching to ensure priority of actual upstream elements
override def preStart(): Unit = pull(in)
setHandler(in, new InHandler {
override def onPush(): Unit = {
nextDeadline = Deadline.now + timeout
cancelTimer(IdleTimer)
if (isAvailable(out)) {
push(out, grab(in))
pull(in)
}
}
})
setHandler(out, new OutHandler {
override def onPull(): Unit = {
if (isAvailable(in)) {
push(out, grab(in))
pull(in)
} else {
if (nextDeadline.isOverdue()) {
nextDeadline = Deadline.now + timeout
push(out, inject())
} else scheduleOnce(IdleTimer, nextDeadline.timeLeft)
}
}
})
override protected def onTimer(timerKey: Any): Unit = {
if (nextDeadline.isOverdue() && isAvailable(out)) {
push(out, inject())
nextDeadline = Deadline.now + timeout
}
}
}
}
} }

View file

@ -5,6 +5,7 @@ package akka.stream.javadsl
import akka.event.LoggingAdapter import akka.event.LoggingAdapter
import akka.japi.{ function, Pair } import akka.japi.{ function, Pair }
import akka.stream.impl.Timers.{ DelayInitial, IdleInject }
import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.{ scaladsl, _ } import akka.stream.{ scaladsl, _ }
import akka.stream.stage.Stage import akka.stream.stage.Stage
@ -999,6 +1000,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* If the first element has not passed through this stage before the provided timeout, the stream is failed * If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/ */
def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = def initialTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialTimeout(timeout)) new Flow(delegate.initialTimeout(timeout))
@ -1006,6 +1015,14 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* If the completion of the stream does not happen until the provided timeout, the stream is failed * If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/ */
def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = def completionTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.completionTimeout(timeout)) new Flow(delegate.completionTimeout(timeout))
@ -1013,10 +1030,52 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/ */
def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] = def idleTimeout(timeout: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.idleTimeout(timeout)) new Flow(delegate.idleTimeout(timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Flow[In, U, Mat] =
new Flow(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.initialDelay(delay))
override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.withAttributes(attr)) new Flow(delegate.withAttributes(attr))

View file

@ -849,6 +849,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* If the first element has not passed through this stage before the provided timeout, the stream is failed * If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/ */
def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = def initialTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialTimeout(timeout)) new Source(delegate.initialTimeout(timeout))
@ -856,6 +864,14 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* If the completion of the stream does not happen until the provided timeout, the stream is failed * If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/ */
def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = def completionTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.completionTimeout(timeout)) new Source(delegate.completionTimeout(timeout))
@ -863,10 +879,52 @@ class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[Sour
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[java.util.concurrent.TimeoutException]]. * with a [[java.util.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/ */
def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] = def idleTimeout(timeout: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.idleTimeout(timeout)) new Source(delegate.idleTimeout(timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: function.Creator[U]): javadsl.Source[U, Mat] =
new Source(delegate.keepAlive(maxIdle, () injectedElem.create()))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.initialDelay(delay))
override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] = override def withAttributes(attr: Attributes): javadsl.Source[Out, Mat] =
new Source(delegate.withAttributes(attr)) new Source(delegate.withAttributes(attr))

View file

@ -5,7 +5,7 @@ package akka.stream.scaladsl
import akka.stream._ import akka.stream._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.Timeouts import akka.stream.impl.Timers
import scala.concurrent.duration.FiniteDuration import scala.concurrent.duration.FiniteDuration
@ -207,5 +207,5 @@ object BidiFlow {
* the *joint* frequencies of the elements in both directions. * the *joint* frequencies of the elements in both directions.
*/ */
def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] = def bidirectionalIdleTimeout[I, O](timeout: FiniteDuration): BidiFlow[I, I, O, O, Unit] =
fromGraph(new Timeouts.IdleBidi(timeout)) fromGraph(new Timers.IdleBidi(timeout))
} }

View file

@ -9,8 +9,9 @@ import akka.stream._
import akka.stream.impl.SplitDecision._ import akka.stream.impl.SplitDecision._
import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage } import akka.stream.impl.Stages.{ SymbolicGraphStage, StageModule, DirectProcessor, SymbolicStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.Timers
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered }
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timeouts } import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage }
import akka.stream.stage._ import akka.stream.stage._
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
@ -1021,20 +1022,77 @@ trait FlowOps[+Out, +Mat] {
/** /**
* If the first element has not passed through this stage before the provided timeout, the stream is failed * If the first element has not passed through this stage before the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]]. * with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before first element arrives
*
* '''Cancels when''' downstream cancels
*/ */
def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Initial[Out](timeout)) def initialTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Initial[Out](timeout))
/** /**
* If the completion of the stream does not happen until the provided timeout, the stream is failed * If the completion of the stream does not happen until the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]]. * with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses before upstream completes
*
* '''Cancels when''' downstream cancels
*/ */
def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Completion[Out](timeout)) def completionTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Completion[Out](timeout))
/** /**
* If the time between two processed elements exceed the provided timeout, the stream is failed * If the time between two processed elements exceed the provided timeout, the stream is failed
* with a [[scala.concurrent.TimeoutException]]. * with a [[scala.concurrent.TimeoutException]].
*
* '''Emits when''' upstream emits an element
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes or fails if timeout elapses between two emitted elements
*
* '''Cancels when''' downstream cancels
*/ */
def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timeouts.Idle[Out](timeout)) def idleTimeout(timeout: FiniteDuration): Repr[Out, Mat] = via(new Timers.Idle[Out](timeout))
/**
* Injects additional elements if the upstream does not emit for a configured amount of time. In other words, this
* stage attempts to maintains a base rate of emitted elements towards the downstream.
*
* If the downstream backpressures then no element is injected until downstream demand arrives. Injected elements
* do not accumulate during this period.
*
* Upstream elements are always preferred over injected elements.
*
* '''Emits when''' upstream emits an element or if the upstream was idle for the configured period
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def keepAlive[U >: Out](maxIdle: FiniteDuration, injectedElem: () U): Repr[U, Mat] =
via(new Timers.IdleInject[Out, U](maxIdle, injectedElem))
/**
* Delays the initial element by the specified duration.
*
* '''Emits when''' upstream emits an element if the initial delay already elapsed
*
* '''Backpressures when''' downstream backpressures or initial delay not yet elapsed
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels
*/
def initialDelay(delay: FiniteDuration): Repr[Out, Mat] = via(new Timers.DelayInitial[Out](delay))
/** /**
* Logs elements flowing through the stream as well as completion and erroring. * Logs elements flowing through the stream as well as completion and erroring.