From 83d3143236bc61a6555394a36834e626601aeff2 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Wed, 25 Nov 2015 21:29:35 -0500 Subject: [PATCH] +str #18556 add delay combinator --- .../akka/stream/testkit/StreamTestKit.scala | 10 ++ .../stream/scaladsl/ActorRefSinkSpec.scala | 1 - .../akka/stream/scaladsl/FlowDelaySpec.scala | 106 +++++++++++++++--- .../scala/akka/stream/OverflowStrategy.scala | 64 +++++++++-- .../stream/impl/AcknowledgePublisher.scala | 2 +- .../stream/impl/ActorRefSourceActor.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 74 +++++++++--- .../main/scala/akka/stream/javadsl/Flow.scala | 26 +++-- .../scala/akka/stream/javadsl/Source.scala | 26 +++-- .../scala/akka/stream/scaladsl/Flow.scala | 26 +++-- 10 files changed, 277 insertions(+), 60 deletions(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 94d39c368b..50117066b0 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -263,6 +263,16 @@ object TestSubscriber { self } + /** + * Fluent DSL + * + * Expect a stream element during specified time, then timeout. + */ + def expectNext(d: FiniteDuration, element: I): Self = { + probe.expectMsg(d, OnNext(element)) + self + } + /** * Fluent DSL * diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 447f83efb2..44a4849bfb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -10,7 +10,6 @@ import akka.stream.testkit.scaladsl._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props -import akka.stream.OverflowStrategy object ActorRefSinkSpec { case class Fw(ref: ActorRef) extends Actor { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 6d2bf2b4b3..02680ad0c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -3,40 +3,116 @@ */ package akka.stream.scaladsl -import akka.stream.ActorMaterializer +import akka.stream.Attributes._ +import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } -import akka.stream.testkit.Utils._ +import akka.stream.{ DelayOverflowStrategy, ActorMaterializer } +import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.control.NoStackTrace class FlowDelaySpec extends AkkaSpec { implicit val materializer = ActorMaterializer() "A Delay" must { + "deliver elements with some time shift" in { + Await.result( + Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head), + 1200.millis) should ===(1 to 10) + } - "deliver element after time passed" in { + "deliver element after time passed from actual receiving element" in { Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) - .request(3) - .expectNoMsg(100.millis) - .expectNext(1) - .expectNoMsg(100.millis) - .expectNext(2) - .expectNoMsg(100.millis) - .expectNext(3) + .request(2) + .expectNoMsg(200.millis) //delay + .expectNext(200.millis, 1) //delayed element + .expectNext(100.millis, 2) //buffered element + .expectNoMsg(200.millis) + .request(1) + .expectNext(3) //buffered element .expectComplete() } - "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { + "deliver elements with delay for slow stream" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() - Source(1 to 3).delay(300.millis).to(Sink(c)).run() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(300.millis).to(Sink(c)).run() val cSub = c.expectSubscription() - c.expectNoMsg(200.millis) + val pSub = p.expectSubscription() cSub.request(100) - (1 to 3) foreach { n ⇒ c.expectNext(n) } - c.expectComplete() + pSub.sendNext(1) c.expectNoMsg(200.millis) + c.expectNext(1) + pSub.sendNext(2) + c.expectNoMsg(200.millis) + c.expectNext(2) + pSub.sendComplete() + c.expectComplete() + } + + "drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===((1 to 15).toList :+ 20) + } + + "drop head for internal buffer if it's full in DropHead mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropHead).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(5 to 20) + } + + "clear all for internal buffer if it's full in DropBuffer mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropBuffer).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(17 to 20) + } + + "pass elements with delay through normally in backpressured mode" in assertAllStagesStopped { + Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int]) + .request(5) + .expectNoMsg(200.millis) + .expectNext(200.millis, 1) + .expectNoMsg(200.millis) + .expectNext(200.millis, 2) + .expectNoMsg(200.millis) + .expectNext(200.millis, 3) + } + + "fail on overflow in Fail mode" in assertAllStagesStopped { + Source(1 to 20).delay(300.millis, DelayOverflowStrategy.fail) + .withAttributes(inputBuffer(16, 16)) + .runWith(TestSink.probe[Int]) + .request(100) + .expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!")) + + } + + "emit early when buffer is full and in EmitEarly mode" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(10.seconds, DelayOverflowStrategy.emitEarly).withAttributes(inputBuffer(16, 16)).to(Sink(c)).run() + val cSub = c.expectSubscription() + val pSub = p.expectSubscription() + cSub.request(20) + + for (i ← 1 to 16) pSub.sendNext(i) + c.expectNoMsg(300.millis) + pSub.sendNext(17) + c.expectNext(100.millis, 1) + pSub.sendError(new RuntimeException() with NoStackTrace) + c.expectError() } } } diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index dd1e860c84..25f0fb5c89 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -6,42 +6,45 @@ package akka.stream /** * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. */ -sealed abstract class OverflowStrategy +sealed abstract class OverflowStrategy extends Serializable +sealed trait DelayOverflowStrategy extends Serializable -object OverflowStrategy { +private[akka] trait BaseOverflowStrategy { /** * INTERNAL API */ - private[akka] final case object DropHead extends OverflowStrategy + private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropTail extends OverflowStrategy + private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropBuffer extends OverflowStrategy + private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropNew extends OverflowStrategy + private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Backpressure extends OverflowStrategy + private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Fail extends OverflowStrategy { + private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy { final case class BufferOverflowException(msg: String) extends RuntimeException(msg) } +} +object OverflowStrategy extends BaseOverflowStrategy { /** * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. @@ -75,3 +78,48 @@ object OverflowStrategy { */ def fail: OverflowStrategy = Fail } + +object DelayOverflowStrategy extends BaseOverflowStrategy { + /** + * INTERNAL API + */ + private[akka] case object EmitEarly extends DelayOverflowStrategy + + /** + * If the buffer is full when a new element is available this strategy send next element downstream without waiting + */ + def emitEarly: DelayOverflowStrategy = EmitEarly + + /** + * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for + * the new element. + */ + def dropHead: DelayOverflowStrategy = DropHead + + /** + * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for + * the new element. + */ + def dropTail: DelayOverflowStrategy = DropTail + + /** + * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. + */ + def dropBuffer: DelayOverflowStrategy = DropBuffer + + /** + * If the buffer is full when a new element arrives, drops the new element. + */ + def dropNew: DelayOverflowStrategy = DropNew + + /** + * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until + * space becomes available in the buffer. + */ + def backpressure: DelayOverflowStrategy = Backpressure + + /** + * If the buffer is full when a new element is available this strategy completes the stream with failure. + */ + def fail: DelayOverflowStrategy = Fail +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala index eb3772ca61..1f7ed0e1e1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala @@ -53,7 +53,7 @@ private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: Over sendAck(false) } else if (!buffer.isFull) enqueueAndSendAck(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 897e4a6978..1e9c35ed4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -58,7 +58,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf log.debug("Dropping element because there is no downstream demand: [{}]", elem) else if (!buffer.isFull) buffer.enqueue(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 8a9ffc5949..cf2155696b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -5,7 +5,8 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } -import akka.stream.Attributes.LogLevels +import akka.stream.Attributes.{ InputBuffer, LogLevels } +import akka.stream.DelayOverflowStrategy.EmitEarly import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ @@ -14,10 +15,10 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy +import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API @@ -398,7 +399,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt else ctx.absorbTermination() val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { - overflowStrategy match { + (overflowStrategy: @unchecked) match { case DropHead ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropHead() buffer.enqueue(elem) @@ -857,31 +858,78 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } -private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] { +private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - var element: T = _ + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element val timerName = "DelayedTimer" var willStop = false - setHandler(in, new InHandler { + setHandler(in, handler = new InHandler { override def onPush(): Unit = { - element = grab(in) - scheduleOnce("DelayedTimer", d) + if (buffer.isFull) (strategy: @unchecked) match { + case EmitEarly ⇒ + if (!isTimerActive(timerName)) + push(out, buffer.dequeue()._2) + else { + cancelTimer(timerName) + onTimer(timerName) + } + case DelayOverflowStrategy.DropHead ⇒ + buffer.dropHead() + grabElementAndSchedule(true) + case DelayOverflowStrategy.DropTail ⇒ + buffer.dropTail() + grabElementAndSchedule(true) + case DelayOverflowStrategy.DropNew ⇒ + grab(in) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + case DelayOverflowStrategy.DropBuffer ⇒ + buffer.clear() + grabElementAndSchedule(true) + case DelayOverflowStrategy.Fail ⇒ + failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!")) + case DelayOverflowStrategy.Backpressure ⇒ //do nothing here + } + else { + grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure) + } } - override def onUpstreamFinish(): Unit = + + def grabElementAndSchedule(pullCondition: Boolean): Unit = { + buffer.enqueue((System.currentTimeMillis(), grab(in))) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + if (pullCondition) pull(in) + } + + override def onUpstreamFinish(): Unit = { if (isAvailable(out) && isTimerActive(timerName)) willStop = true else completeStage() + } }) setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) + override def onPull(): Unit = { + if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) + push(out, buffer.dequeue()._2) + + if (!willStop && !hasBeenPulled(in)) pull(in) + completeIfReady() + } }) + def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() + + def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1) + final override protected def onTimer(key: Any): Unit = { - push(out, element) - if (willStop) - completeStage() + push(out, buffer.dequeue()._2) + if (!buffer.isEmpty) { + val waitTime = nextElementWaitTime() + if (waitTime > 0) scheduleOnce(timerName, waitTime.millis) + } + completeIfReady() } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6cdf1f2890..fc64d85b8f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -504,18 +504,30 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] = - new Flow(delegate.delay(of)) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.delay(of, strategy)) /** * Discard the given number of elements at the beginning of the stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index cdc4a5ea84..a7340b5c0e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -710,18 +710,30 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): javadsl.Source[Out, Mat] = - new Source(delegate.delay(of)) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Source[Out, Mat] = + new Source(delegate.delay(of, strategy)) /** * Discard the given number of elements at the beginning of the stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f9bd47a448..b44ed25960 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -717,18 +717,30 @@ trait FlowOps[+Out, +Mat] { } /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): Repr[Out, Mat] = - via(new Delay[Out](of).withAttributes(name("delay"))) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out, Mat] = + via(new Delay[Out](of, strategy).withAttributes(name("delay"))) /** * Discard the given number of elements at the beginning of the stream.