diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
new file mode 100644
index 0000000000..bdb7312ce7
--- /dev/null
+++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/delayWith.md
@@ -0,0 +1,30 @@
+# delayWith
+
+Delay every element passed through with a duration that can be controlled dynamically.
+
+@ref[Timer driven operators](../index.md#timer-driven-operators)
+
+@@@div { .group-scala }
+
+## Signature
+
+@@signature [Flow.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala) { #delayWith }
+
+@@@
+
+## Description
+
+Delay every element passed through with a duration that can be controlled dynamically, individually for each elements (via the `DelayStrategy`).
+
+
+@@@div { .callout }
+
+**emits** there is a pending element in the buffer and configured time for this element elapsed
+
+**backpressures** differs, depends on `OverflowStrategy` set
+
+**completes** when upstream completes and buffered elements has been drained
+
+
+@@@
+
diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md
index c0a0ec1d1f..7e8c01eab4 100644
--- a/akka-docs/src/main/paradox/stream/operators/index.md
+++ b/akka-docs/src/main/paradox/stream/operators/index.md
@@ -208,6 +208,7 @@ These operators process elements using timers, delaying, dropping or grouping el
| |Operator|Description|
|--|--|--|
|Source/Flow|@ref[delay](Source-or-Flow/delay.md)|Delay every element passed through with a specific duration.|
+|Source/Flow|@ref[delayWith](Source-or-Flow/delayWith.md)|Delay every element passed through with a duration that can be controlled dynamically.|
|Source/Flow|@ref[dropWithin](Source-or-Flow/dropWithin.md)|Drop elements until a timeout has fired|
|Source/Flow|@ref[groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)|Chunk up this stream into groups of elements received within a time window, or limited by the weight of the elements, whatever happens first.|
|Source/Flow|@ref[groupedWithin](Source-or-Flow/groupedWithin.md)|Chunk up this stream into groups of elements received within a time window, or limited by the number of the elements, whatever happens first.|
@@ -424,6 +425,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [groupedWithin](Source-or-Flow/groupedWithin.md)
* [groupedWeightedWithin](Source-or-Flow/groupedWeightedWithin.md)
* [delay](Source-or-Flow/delay.md)
+* [delayWith](Source-or-Flow/delayWith.md)
* [drop](Source-or-Flow/drop.md)
* [dropWithin](Source-or-Flow/dropWithin.md)
* [takeWhile](Source-or-Flow/takeWhile.md)
diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala
index 07934844ab..e909576d52 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala
@@ -14,6 +14,7 @@ import akka.stream.testkit.StreamSpec
import akka.stream.testkit.TestPublisher
import akka.stream.testkit.TestSubscriber
import akka.testkit.TimingTest
+import akka.testkit.TestDuration
import org.scalatest.concurrent.PatienceConfiguration
import org.scalatest.time.Milliseconds
import org.scalatest.time.Span
@@ -267,5 +268,73 @@ class FlowDelaySpec extends StreamSpec {
result should ===((1 to 9).toSeq)
}
+
+ "work with empty source" in {
+ Source.empty[Int].delay(Duration.Zero).runWith(TestSink.probe).request(1).expectComplete()
+ }
+
+ "work with fixed delay" in {
+
+ val fixedDelay = 1.second
+
+ val elems = 1 to 10
+
+ val probe = Source(elems)
+ .map(_ => System.nanoTime())
+ .delay(fixedDelay)
+ .map(start => System.nanoTime() - start)
+ .runWith(TestSink.probe)
+
+ elems.foreach(_ => {
+ val next = probe.request(1).expectNext(fixedDelay + fixedDelay.dilated)
+ next should be >= fixedDelay.toNanos
+ })
+
+ probe.expectComplete()
+
+ }
+
+ "work without delay" in {
+
+ val elems = Vector(1, 2, 3, 4, 5, 6, 7, 8, 9, 0)
+
+ Source(elems).delay(Duration.Zero).runWith(TestSink.probe).request(elems.size).expectNextN(elems).expectComplete()
+ }
+
+ "work with linear increasing delay" taggedAs TimingTest in {
+
+ val elems = 1 to 10
+ val step = 1.second
+ val initial = 1.second
+ val max = 5.seconds
+
+ def incWhile(i: (Int, Long)): Boolean = i._1 < 7
+
+ val probe = Source(elems)
+ .map(e => (e, System.nanoTime()))
+ .delayWith(
+ () => DelayStrategy.linearIncreasingDelay(step, incWhile, initial, max),
+ OverflowStrategy.backpressure)
+ .map(start => System.nanoTime() - start._2)
+ .runWith(TestSink.probe)
+
+ elems.foreach(e =>
+ if (incWhile((e, 1L))) {
+ val afterIncrease = initial + e * step
+ val delay = if (afterIncrease < max) {
+ afterIncrease
+ } else {
+ max
+ }
+ val next = probe.request(1).expectNext(delay + delay.dilated)
+ next should be >= delay.toNanos
+ } else {
+ val next = probe.request(1).expectNext(initial + initial.dilated)
+ next should be >= initial.toNanos
+ })
+
+ probe.expectComplete()
+
+ }
}
}
diff --git a/akka-stream/src/main/mima-filters/2.6.0.backwards.excludes/pr-25000-delaywith.excludes b/akka-stream/src/main/mima-filters/2.6.0.backwards.excludes/pr-25000-delaywith.excludes
new file mode 100644
index 0000000000..a8c8b6dfa4
--- /dev/null
+++ b/akka-stream/src/main/mima-filters/2.6.0.backwards.excludes/pr-25000-delaywith.excludes
@@ -0,0 +1,6 @@
+# DelayWith (internal api changes)
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Delay.strategy")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Delay.d")
+ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.impl.fusing.Delay.DelayPrecisionMS")
+ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.stream.impl.fusing.Delay.this")
+ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.stream.scaladsl.FlowOps.delayWith")
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index 0e5aa0228f..1b6d48eecc 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -15,7 +15,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl }
-import akka.stream.scaladsl.{ Flow, Keep, Source }
+import akka.stream.scaladsl.{ DelayStrategy, Flow, Keep, Source }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
@@ -1700,34 +1700,38 @@ private[stream] object Collect {
/**
* INTERNAL API
*/
-@InternalApi private[akka] final class Delay[T](val d: FiniteDuration, val strategy: DelayOverflowStrategy)
+@InternalApi private[akka] final class Delay[T](
+ private[this] val delayStrategySupplier: () => DelayStrategy[_ >: T],
+ private[this] val overflowStrategy: DelayOverflowStrategy)
extends SimpleLinearGraphStage[T] {
private[this] def timerName = "DelayedTimer"
- final val DelayPrecisionMS = 10
+ private[this] val DelayPrecisionMS = 10
override def initialAttributes: Attributes = DefaultAttributes.delay
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new TimerGraphStageLogic(shape) with InHandler with OutHandler {
- val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
+ private[this] val size = inheritedAttributes.mandatoryAttribute[InputBuffer].max
- val delayMillis = d.toMillis
+ private[this] val delayStrategy = delayStrategySupplier()
- var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
+ private[this] var buffer
+ : BufferImpl[(Long, T)] = _ // buffer has pairs timestamp of expected push with upstream element
override def preStart(): Unit = buffer = BufferImpl(size, inheritedAttributes)
- val onPushWhenBufferFull: () => Unit = strategy match {
+ private[this] val onPushWhenBufferFull: () => Unit = overflowStrategy match {
case EmitEarly =>
() => {
- if (!isTimerActive(timerName))
+ if (isAvailable(out)) {
+ if (isTimerActive(timerName))
+ cancelTimer(timerName)
+
push(out, buffer.dequeue()._2)
- else {
- cancelTimer(timerName)
- onTimer(timerName)
+ grabAndPull()
+ completeIfReady()
}
- grabAndPull()
}
case _: DropHead =>
() => {
@@ -1742,7 +1746,7 @@ private[stream] object Collect {
case _: DropNew =>
() => {
grab(in)
- pull(in)
+ if (pullCondition) pull(in)
}
case _: DropBuffer =>
() => {
@@ -1751,7 +1755,7 @@ private[stream] object Collect {
}
case _: Fail =>
() => {
- failStage(BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
+ failStage(new BufferOverflowException(s"Buffer overflow for delay operator (max capacity was: $size)!"))
}
case _: Backpressure =>
() => {
@@ -1765,28 +1769,22 @@ private[stream] object Collect {
else {
grabAndPull()
if (!isTimerActive(timerName)) {
- // schedule a timer for the full-delay `d` only if the buffer is empty, because otherwise a
- // full-length timer will starve subsequent `onPull` callbacks, preventing overdue elements
- // to be discharged.
- if (buffer.isEmpty)
- scheduleOnce(timerName, d)
- else
- scheduleOnce(timerName, Math.max(DelayPrecisionMS, nextElementWaitTime()).millis)
+ val waitTime = nextElementWaitTime()
+ if (waitTime <= DelayPrecisionMS && isAvailable(out)) {
+ push(out, buffer.dequeue()._2)
+ completeIfReady()
+ } else
+ scheduleOnce(timerName, waitTime.millis)
}
}
}
- def pullCondition: Boolean = strategy match {
- case EmitEarly =>
- // when buffer is full we can only emit early if out is available
- buffer.used < size || isAvailable(out)
- case _ =>
- !strategy.isBackpressure || buffer.used < size
- }
+ private def pullCondition: Boolean =
+ !overflowStrategy.isBackpressure || buffer.used < size
- def grabAndPull(): Unit = {
- if (buffer.used == size) throw new IllegalStateException("Trying to enqueue but buffer is full")
- buffer.enqueue((System.nanoTime(), grab(in)))
+ private def grabAndPull(): Unit = {
+ val element = grab(in)
+ buffer.enqueue((System.nanoTime() + delayStrategy.nextDelay(element).toNanos, element))
if (pullCondition) pull(in)
}
@@ -1796,11 +1794,10 @@ private[stream] object Collect {
def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty) {
val waitTime = nextElementWaitTime()
- if (waitTime < 0) {
+ if (waitTime <= DelayPrecisionMS)
push(out, buffer.dequeue()._2)
- } else {
- scheduleOnce(timerName, Math.max(DelayPrecisionMS, waitTime).millis)
- }
+ else
+ scheduleOnce(timerName, waitTime.millis)
}
if (!isClosed(in) && !hasBeenPulled(in) && pullCondition)
@@ -1814,19 +1811,14 @@ private[stream] object Collect {
def completeIfReady(): Unit = if (isClosed(in) && buffer.isEmpty) completeStage()
- def nextElementWaitTime(): Long = {
- delayMillis - NANOSECONDS.toMillis(System.nanoTime() - buffer.peek()._1)
+ private def nextElementWaitTime(): Long = {
+ NANOSECONDS.toMillis(buffer.peek()._1 - System.nanoTime())
}
final override protected def onTimer(key: Any): Unit = {
if (isAvailable(out))
push(out, buffer.dequeue()._2)
- if (!buffer.isEmpty) {
- val waitTime = nextElementWaitTime()
- if (waitTime > DelayPrecisionMS)
- scheduleOnce(timerName, waitTime.millis)
- }
completeIfReady()
}
}
@@ -2266,7 +2258,6 @@ private[stream] object Collect {
matVal
}
-
}
(stageLogic, matPromise.future)
}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/DelayStrategy.scala b/akka-stream/src/main/scala/akka/stream/javadsl/DelayStrategy.scala
new file mode 100644
index 0000000000..3f972e0bf9
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/DelayStrategy.scala
@@ -0,0 +1,108 @@
+/*
+ * Copyright (C) 2018-2019 Lightbend Inc.
+ */
+
+package akka.stream.javadsl
+
+import akka.annotation.InternalApi
+import akka.stream.scaladsl
+import akka.util.JavaDurationConverters.JavaDurationOps
+
+import scala.concurrent.duration.FiniteDuration
+
+/**
+ * Allows to manage delay and can be stateful to compute delay for any sequence of elements,
+ * all elements go through nextDelay() updating state and returning delay for each element
+ */
+trait DelayStrategy[T] {
+
+ /**
+ * Returns delay for ongoing element, `Duration.Zero` means passing without delay
+ */
+ def nextDelay(elem: T): java.time.Duration
+
+}
+
+object DelayStrategy {
+
+ /** INTERNAL API */
+ @InternalApi
+ private[javadsl] def asScala[T](delayStrategy: DelayStrategy[T]) = new scaladsl.DelayStrategy[T] {
+ override def nextDelay(elem: T): FiniteDuration = delayStrategy.nextDelay(elem).asScala
+ }
+
+ /**
+ * Fixed delay strategy, always returns constant delay for any element.
+ * @param delay value of the delay
+ */
+ def fixedDelay[T](delay: java.time.Duration): DelayStrategy[T] = new DelayStrategy[T] {
+ override def nextDelay(elem: T): java.time.Duration = delay
+ }
+
+ /**
+ * Strategy with linear increasing delay.
+ * It starts with zero delay for each element,
+ * increases by `increaseStep` every time when `needsIncrease` returns `true`,
+ * when `needsIncrease` returns `false` it resets to `initialDelay`.
+ * @param increaseStep step by which delay is increased
+ * @param needsIncrease if `true` delay increases, if `false` delay resets to `initialDelay`
+ */
+ def linearIncreasingDelay[T](increaseStep: java.time.Duration, needsIncrease: T => Boolean): DelayStrategy[T] =
+ linearIncreasingDelay(increaseStep, needsIncrease, java.time.Duration.ZERO)
+
+ /**
+ * Strategy with linear increasing delay.
+ * It starts with `initialDelay` for each element,
+ * increases by `increaseStep` every time when `needsIncrease` returns `true`.
+ * when `needsIncrease` returns `false` it resets to `initialDelay`.
+ * @param increaseStep step by which delay is increased
+ * @param needsIncrease if `true` delay increases, if `false` delay resets to `initialDelay`
+ * @param initialDelay initial delay for each of elements
+ */
+ def linearIncreasingDelay[T](
+ increaseStep: java.time.Duration,
+ needsIncrease: T => Boolean,
+ initialDelay: java.time.Duration): DelayStrategy[T] =
+ linearIncreasingDelay(increaseStep, needsIncrease, initialDelay, java.time.Duration.ofNanos(Long.MaxValue))
+
+ /**
+ * Strategy with linear increasing delay.
+ * It starts with `initialDelay` for each element,
+ * increases by `increaseStep` every time when `needsIncrease` returns `true` up to `maxDelay`,
+ * when `needsIncrease` returns `false` it resets to `initialDelay`.
+ * @param increaseStep step by which delay is increased
+ * @param needsIncrease if `true` delay increases, if `false` delay resets to `initialDelay`
+ * @param initialDelay initial delay for each of elements
+ * @param maxDelay limits maximum delay
+ */
+ def linearIncreasingDelay[T](
+ increaseStep: java.time.Duration,
+ needsIncrease: T => Boolean,
+ initialDelay: java.time.Duration,
+ maxDelay: java.time.Duration): DelayStrategy[T] = {
+ require(increaseStep.compareTo(java.time.Duration.ZERO) > 0, "Increase step must be positive")
+ require(maxDelay.compareTo(initialDelay) >= 0, "Initial delay may not exceed max delay")
+
+ new DelayStrategy[T] {
+
+ private[this] var delay = initialDelay
+
+ override def nextDelay(elem: T): java.time.Duration = {
+ if (needsIncrease(elem)) {
+ val next = delay.plus(increaseStep)
+ if (next.compareTo(maxDelay) < 0) {
+ delay = next
+ } else {
+ delay = maxDelay
+ }
+ } else {
+ delay = initialDelay
+ }
+ delay
+ }
+
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
index 3c80776150..a8e7d38c7b 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala
@@ -1348,6 +1348,42 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
delay(of.asScala, strategy)
+ /**
+ * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.
+ * It allows to store elements in internal buffer while waiting for next element to be emitted. Depending on the defined
+ * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
+ * there is no space available in the buffer.
+ *
+ * It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
+ *
+ * Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor,
+ * it will still have to wait for the preceding element before being emitted.
+ * It is also important to notice that [[DelayStrategy]] can be stateful.
+ *
+ * Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
+ *
+ * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
+ *
+ * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
+ * * EmitEarly - strategy do not wait to emit element if buffer is full
+ *
+ * '''Backpressures when''' depending on OverflowStrategy
+ * * Backpressure - backpressures when buffer is full
+ * * DropHead, DropTail, DropBuffer - never backpressures
+ * * Fail - fails the stream if buffer gets full
+ *
+ * '''Completes when''' upstream completes and buffered elements have been drained
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
+ */
+ def delayWith(
+ delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ overFlowStrategy: DelayOverflowStrategy): Flow[In, Out, Mat] =
+ new Flow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
index 89a356f65d..bfe997d2ee 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala
@@ -2656,6 +2656,42 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): Source[Out, Mat] =
delay(of.asScala, strategy)
+ /**
+ * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.
+ * It allows to store elements in internal buffer while waiting for next element to be emitted. Depending on the defined
+ * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
+ * there is no space available in the buffer.
+ *
+ * It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
+ *
+ * Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor,
+ * it will still have to wait for the preceding element before being emitted.
+ * It is also important to notice that [[DelayStrategy]] can be stateful.
+ *
+ * Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
+ *
+ * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
+ *
+ * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
+ * * EmitEarly - strategy do not wait to emit element if buffer is full
+ *
+ * '''Backpressures when''' depending on OverflowStrategy
+ * * Backpressure - backpressures when buffer is full
+ * * DropHead, DropTail, DropBuffer - never backpressures
+ * * Fail - fails the stream if buffer gets full
+ *
+ * '''Completes when''' upstream completes and buffered elements have been drained
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
+ */
+ def delayWith(
+ delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ overFlowStrategy: DelayOverflowStrategy): Source[Out, Mat] =
+ new Source(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
index 582c511752..dd6588785e 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala
@@ -15,6 +15,7 @@ import akka.util.ccompat.JavaConverters._
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import java.util.Comparator
+import java.util.function.Supplier
import scala.compat.java8.FutureConverters._
import java.util.concurrent.CompletionStage
@@ -791,6 +792,42 @@ class SubFlow[In, Out, Mat](
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
delay(of.asScala, strategy)
+ /**
+ * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.
+ * It allows to store elements in internal buffer while waiting for next element to be emitted. Depending on the defined
+ * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
+ * there is no space available in the buffer.
+ *
+ * It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
+ *
+ * Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor,
+ * it will still have to wait for the preceding element before being emitted.
+ * It is also important to notice that [[DelayStrategy]] can be stateful.
+ *
+ * Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
+ *
+ * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
+ *
+ * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
+ * * EmitEarly - strategy do not wait to emit element if buffer is full
+ *
+ * '''Backpressures when''' depending on OverflowStrategy
+ * * Backpressure - backpressures when buffer is full
+ * * DropHead, DropTail, DropBuffer - never backpressures
+ * * Fail - fails the stream if buffer gets full
+ *
+ * '''Completes when''' upstream completes and buffered elements have been drained
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
+ */
+ def delayWith(
+ delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ overFlowStrategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] =
+ new SubFlow(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
index 1d0c321014..5cc3d6be8c 100755
--- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
+++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala
@@ -16,6 +16,7 @@ import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.duration.FiniteDuration
import java.util.Comparator
import java.util.concurrent.CompletionStage
+import java.util.function.Supplier
import com.github.ghik.silencer.silent
@@ -883,6 +884,42 @@ class SubSource[Out, Mat](
def delay(of: java.time.Duration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] =
delay(of.asScala, strategy)
+ /**
+ * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.
+ * It allows to store elements in internal buffer while waiting for next element to be emitted. Depending on the defined
+ * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
+ * there is no space available in the buffer.
+ *
+ * It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
+ *
+ * Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor,
+ * it will still have to wait for the preceding element before being emitted.
+ * It is also important to notice that [[DelayStrategy]] can be stateful.
+ *
+ * Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
+ *
+ * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
+ *
+ * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
+ * * EmitEarly - strategy do not wait to emit element if buffer is full
+ *
+ * '''Backpressures when''' depending on OverflowStrategy
+ * * Backpressure - backpressures when buffer is full
+ * * DropHead, DropTail, DropBuffer - never backpressures
+ * * Fail - fails the stream if buffer gets full
+ *
+ * '''Completes when''' upstream completes and buffered elements have been drained
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
+ */
+ def delayWith(
+ delayStrategySupplier: Supplier[DelayStrategy[Out]],
+ overFlowStrategy: DelayOverflowStrategy): SubSource[Out, Mat] =
+ new SubSource(delegate.delayWith(() => DelayStrategy.asScala(delayStrategySupplier.get), overFlowStrategy))
+
/**
* Recover allows to send last element on failure and gracefully complete the stream
* Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements.
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/DelayStrategy.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/DelayStrategy.scala
new file mode 100644
index 0000000000..dd622a7823
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/DelayStrategy.scala
@@ -0,0 +1,70 @@
+/*
+ * Copyright (C) 2018-2019 Lightbend Inc.
+ */
+
+package akka.stream.scaladsl
+
+import scala.concurrent.duration.{ Duration, FiniteDuration }
+
+/**
+ * Allows to manage delay. Can be stateful to compute delay for any sequence
+ * of elements, as instances are not shared among running streams and all
+ * elements go through nextDelay(), updating state and returning delay for that
+ * element.
+ */
+trait DelayStrategy[-T] {
+
+ /**
+ * Returns delay for ongoing element, `Duration.Zero` means passing without delay
+ */
+ def nextDelay(elem: T): FiniteDuration
+
+}
+
+object DelayStrategy {
+
+ /**
+ * Fixed delay strategy, always returns constant delay for any element.
+ * @param delay value of the delay
+ */
+ def fixedDelay(delay: FiniteDuration): DelayStrategy[Any] = new DelayStrategy[Any] {
+ override def nextDelay(elem: Any): FiniteDuration = delay
+ }
+
+ /**
+ * Strategy with linear increasing delay.
+ * It starts with `initialDelay` for each element,
+ * increases by `increaseStep` every time when `needsIncrease` returns `true` up to `maxDelay`,
+ * when `needsIncrease` returns `false` it resets to `initialDelay`.
+ * @param increaseStep step by which delay is increased
+ * @param needsIncrease if `true` delay increases, if `false` delay resets to `initialDelay`
+ * @param initialDelay initial delay for each of elements
+ * @param maxDelay limits maximum delay
+ */
+ def linearIncreasingDelay[T](
+ increaseStep: FiniteDuration,
+ needsIncrease: T => Boolean,
+ initialDelay: FiniteDuration = Duration.Zero,
+ maxDelay: Duration = Duration.Inf): DelayStrategy[T] = {
+ require(increaseStep > Duration.Zero, "Increase step must be positive")
+ require(maxDelay > initialDelay, "Max delay must be bigger than initial delay")
+
+ new DelayStrategy[T] {
+
+ private[this] var delay: FiniteDuration = initialDelay
+
+ override def nextDelay(elem: T): FiniteDuration = {
+ if (needsIncrease(elem)) {
+ // minimum of a finite and an infinite duration is finite
+ delay = Seq(delay + increaseStep, maxDelay).min.asInstanceOf[FiniteDuration]
+ } else {
+ delay = initialDelay
+ }
+ delay
+ }
+
+ }
+
+ }
+
+}
diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
index aebbc30a27..ecf5acd6bc 100755
--- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
+++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala
@@ -1602,7 +1602,41 @@ trait FlowOps[+Out, +Mat] {
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] =
- via(new Delay[Out](of, strategy))
+ via(new Delay[Out](() => DelayStrategy.fixedDelay(of), strategy))
+
+ /**
+ * Shifts elements emission in time by an amount individually determined through delay strategy a specified amount.
+ * It allows to store elements in internal buffer while waiting for next element to be emitted. Depending on the defined
+ * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
+ * there is no space available in the buffer.
+ *
+ * It determines delay for each ongoing element invoking `DelayStrategy.nextDelay(elem: T): FiniteDuration`.
+ *
+ * Note that elements are not re-ordered: if an element is given a delay much shorter than its predecessor,
+ * it will still have to wait for the preceding element before being emitted.
+ * It is also important to notice that [[scaladsl.DelayStrategy]] can be stateful.
+ *
+ * Delay precision is 10ms to avoid unnecessary timer scheduling cycles.
+ *
+ * Internal buffer has default capacity 16. You can set buffer size by calling `addAttributes(inputBuffer)`
+ *
+ * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
+ * * EmitEarly - strategy do not wait to emit element if buffer is full
+ *
+ * '''Backpressures when''' depending on OverflowStrategy
+ * * Backpressure - backpressures when buffer is full
+ * * DropHead, DropTail, DropBuffer - never backpressures
+ * * Fail - fails the stream if buffer gets full
+ *
+ * '''Completes when''' upstream completes and buffered elements have been drained
+ *
+ * '''Cancels when''' downstream cancels
+ *
+ * @param delayStrategySupplier creates new [[DelayStrategy]] object for each materialization
+ * @param overFlowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
+ */
+ def delayWith(delayStrategySupplier: () => DelayStrategy[Out], overFlowStrategy: DelayOverflowStrategy): Repr[Out] =
+ via(new Delay[Out](delayStrategySupplier, overFlowStrategy))
/**
* Discard the given number of elements at the beginning of the stream.