diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index c997b6f4c7..04773f199d 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -79,8 +79,6 @@ object InterpreterBenchmark { if (expected > 0) pull(in) // Otherwise do nothing, it will exit the interpreter } - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) def requestOne(): Unit = pull(in) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 326e697f2e..42c0fffac4 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -266,6 +266,16 @@ object TestSubscriber { self } + /** + * Fluent DSL + * + * Expect a stream element during specified time or timeout. + */ + def expectNext(d: FiniteDuration, element: I): Self = { + probe.expectMsg(d, OnNext(element)) + self + } + /** * Fluent DSL * diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 447f83efb2..44a4849bfb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -10,7 +10,6 @@ import akka.stream.testkit.scaladsl._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props -import akka.stream.OverflowStrategy object ActorRefSinkSpec { case class Fw(ref: ActorRef) extends Actor { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala new file mode 100644 index 0000000000..72939734b4 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -0,0 +1,127 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.Attributes._ +import akka.stream.testkit.Utils._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } +import akka.stream.{ DelayOverflowStrategy, ActorMaterializer } + +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.util.control.NoStackTrace + +class FlowDelaySpec extends AkkaSpec { + + implicit val materializer = ActorMaterializer() + + "A Delay" must { + "deliver elements with some time shift" in { + Await.result( + Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head), + 1200.millis) should ===(1 to 10) + } + + "add delay to initialDelay if exists upstream" in { + Source(1 to 10).initialDelay(1.second).delay(1.second).runWith(TestSink.probe[Int]) + .request(10) + .expectNoMsg(1800.millis) + .expectNext(300.millis, 1) + .expectNextN(2 to 10) + .expectComplete() + } + + "deliver element after time passed from actual receiving element" in { + Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) + .request(2) + .expectNoMsg(200.millis) //delay + .expectNext(200.millis, 1) //delayed element + .expectNext(100.millis, 2) //buffered element + .expectNoMsg(200.millis) + .request(1) + .expectNext(3) //buffered element + .expectComplete() + } + + "deliver elements with delay for slow stream" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(300.millis).to(Sink(c)).run() + val cSub = c.expectSubscription() + val pSub = p.expectSubscription() + cSub.request(100) + pSub.sendNext(1) + c.expectNoMsg(200.millis) + c.expectNext(1) + pSub.sendNext(2) + c.expectNoMsg(200.millis) + c.expectNext(2) + pSub.sendComplete() + c.expectComplete() + } + + "drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===((1 to 15).toList :+ 20) + } + + "drop head for internal buffer if it's full in DropHead mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropHead).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(5 to 20) + } + + "clear all for internal buffer if it's full in DropBuffer mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropBuffer).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(17 to 20) + } + + "pass elements with delay through normally in backpressured mode" in assertAllStagesStopped { + Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int]) + .request(5) + .expectNoMsg(200.millis) + .expectNext(200.millis, 1) + .expectNoMsg(200.millis) + .expectNext(200.millis, 2) + .expectNoMsg(200.millis) + .expectNext(200.millis, 3) + } + + "fail on overflow in Fail mode" in assertAllStagesStopped { + Source(1 to 20).delay(300.millis, DelayOverflowStrategy.fail) + .withAttributes(inputBuffer(16, 16)) + .runWith(TestSink.probe[Int]) + .request(100) + .expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!")) + + } + + "emit early when buffer is full and in EmitEarly mode" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(10.seconds, DelayOverflowStrategy.emitEarly).withAttributes(inputBuffer(16, 16)).to(Sink(c)).run() + val cSub = c.expectSubscription() + val pSub = p.expectSubscription() + cSub.request(20) + + for (i ← 1 to 16) pSub.sendNext(i) + c.expectNoMsg(300.millis) + pSub.sendNext(17) + c.expectNext(100.millis, 1) + //fail will terminate despite of non empty internal buffer + pSub.sendError(new RuntimeException() with NoStackTrace) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 536fb9855f..25f63ea9ad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -37,7 +37,7 @@ class FlowTakeWithinSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "deliver bufferd elements onComplete before the timeout" in assertAllStagesStopped { + "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() Source(1 to 3).takeWithin(1.second).to(Sink(c)).run() val cSub = c.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index dd1e860c84..25f0fb5c89 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -6,42 +6,45 @@ package akka.stream /** * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. */ -sealed abstract class OverflowStrategy +sealed abstract class OverflowStrategy extends Serializable +sealed trait DelayOverflowStrategy extends Serializable -object OverflowStrategy { +private[akka] trait BaseOverflowStrategy { /** * INTERNAL API */ - private[akka] final case object DropHead extends OverflowStrategy + private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropTail extends OverflowStrategy + private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropBuffer extends OverflowStrategy + private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropNew extends OverflowStrategy + private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Backpressure extends OverflowStrategy + private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Fail extends OverflowStrategy { + private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy { final case class BufferOverflowException(msg: String) extends RuntimeException(msg) } +} +object OverflowStrategy extends BaseOverflowStrategy { /** * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. @@ -75,3 +78,48 @@ object OverflowStrategy { */ def fail: OverflowStrategy = Fail } + +object DelayOverflowStrategy extends BaseOverflowStrategy { + /** + * INTERNAL API + */ + private[akka] case object EmitEarly extends DelayOverflowStrategy + + /** + * If the buffer is full when a new element is available this strategy send next element downstream without waiting + */ + def emitEarly: DelayOverflowStrategy = EmitEarly + + /** + * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for + * the new element. + */ + def dropHead: DelayOverflowStrategy = DropHead + + /** + * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for + * the new element. + */ + def dropTail: DelayOverflowStrategy = DropTail + + /** + * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. + */ + def dropBuffer: DelayOverflowStrategy = DropBuffer + + /** + * If the buffer is full when a new element arrives, drops the new element. + */ + def dropNew: DelayOverflowStrategy = DropNew + + /** + * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until + * space becomes available in the buffer. + */ + def backpressure: DelayOverflowStrategy = Backpressure + + /** + * If the buffer is full when a new element is available this strategy completes the stream with failure. + */ + def fail: DelayOverflowStrategy = Fail +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala index eb3772ca61..1f7ed0e1e1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala @@ -53,7 +53,7 @@ private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: Over sendAck(false) } else if (!buffer.isFull) enqueueAndSendAck(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 897e4a6978..1e9c35ed4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -58,7 +58,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf log.debug("Dropping element because there is no downstream demand: [{}]", elem) else if (!buffer.isFull) buffer.enqueue(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 2dc1437da3..b2d8cfe8c6 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -5,7 +5,8 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } -import akka.stream.Attributes.LogLevels +import akka.stream.Attributes.{ InputBuffer, LogLevels } +import akka.stream.DelayOverflowStrategy.EmitEarly import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ @@ -14,10 +15,10 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy +import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API @@ -398,7 +399,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt else ctx.absorbTermination() val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { - overflowStrategy match { + (overflowStrategy: @unchecked) match { case DropHead ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropHead() buffer.enqueue(elem) @@ -857,13 +858,89 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } +private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element + val timerName = "DelayedTimer" + var willStop = false + + setHandler(in, handler = new InHandler { + override def onPush(): Unit = { + if (buffer.isFull) (strategy: @unchecked) match { + case EmitEarly ⇒ + if (!isTimerActive(timerName)) + push(out, buffer.dequeue()._2) + else { + cancelTimer(timerName) + onTimer(timerName) + } + case DelayOverflowStrategy.DropHead ⇒ + buffer.dropHead() + grabAndPull(true) + case DelayOverflowStrategy.DropTail ⇒ + buffer.dropTail() + grabAndPull(true) + case DelayOverflowStrategy.DropNew ⇒ + grab(in) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + case DelayOverflowStrategy.DropBuffer ⇒ + buffer.clear() + grabAndPull(true) + case DelayOverflowStrategy.Fail ⇒ + failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) + case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") + } + else { + grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + } + } + + def grabAndPull(pullCondition: Boolean): Unit = { + buffer.enqueue((System.nanoTime(), grab(in))) + if (pullCondition) pull(in) + } + + override def onUpstreamFinish(): Unit = { + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + } + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) + push(out, buffer.dequeue()._2) + + if (!willStop && !hasBeenPulled(in)) pull(in) + completeIfReady() + } + }) + + def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() + + def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000 + + final override protected def onTimer(key: Any): Unit = { + push(out, buffer.dequeue()._2) + if (!buffer.isEmpty) { + val waitTime = nextElementWaitTime() + if (waitTime > 10) scheduleOnce(timerName, waitTime.millis) + } + completeIfReady() + } + } + + override def toString = "Delay" +} + private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { @@ -888,8 +965,6 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea override def onPush(): Unit = if (allow) push(out, grab(in)) else pull(in) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index f1913fe19a..a4bda1c594 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -541,6 +541,34 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): Flow[In, Out, Mat] = + new Flow(delegate.delay(of, strategy)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 1f583f5e01..3f72020851 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,7 +8,9 @@ import java.io.{ OutputStream, InputStream, File } import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } +import akka.stream.Attributes._ import akka.stream._ +import akka.stream.impl.fusing.Delay import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.stage.Stage import akka.util.ByteString @@ -903,6 +905,34 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): Source[Out, Mat] = + new Source(delegate.delay(of, strategy)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 70f7c43cc3..96cf1711fe 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -386,6 +386,34 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo def groupedWithin(n: Int, d: FiniteDuration): SubFlow[In, java.util.List[Out @uncheckedVariance], Mat] = new SubFlow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubFlow[In, Out, Mat] = + new SubFlow(delegate.delay(of, strategy)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 3200688900..264cc3299b 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -448,6 +448,34 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source def dropWhile(p: function.Predicate[Out]): SubSource[Out, Mat] = new SubSource(delegate.dropWhile(p.test)) + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): SubSource[Out, Mat] = + new SubSource(delegate.delay(of, strategy)) + /** * Recover allows to send last element on failure and gracefully complete the stream * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 15e0939782..4e09889c3e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,7 +9,7 @@ import akka.stream._ import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } import akka.stream.impl._ -import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUnordered, TakeWithin } +import akka.stream.impl.fusing._ import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } @@ -719,6 +719,34 @@ trait FlowOps[+Out, +Mat] { via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin"))) } + /** + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. + * + * Delay precision is 10ms to avoid unnecessary timer scheduling cycles + * + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` + * + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full + * + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full + * + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out] = + via(new Delay[Out](of, strategy).withAttributes(name("delay"))) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -881,7 +909,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' prefix elements has been consumed and substream has been consumed * * '''Cancels when''' downstream cancels or substream cancels - * */ def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, Unit])] = deprecatedAndThen(PrefixAndTail(n))