From 270ef4135973af672dfaaa8e6f9054ec4f4bbd39 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Sat, 21 Nov 2015 13:48:10 -0500 Subject: [PATCH 1/4] =str #18556 add delay combinator --- .../akka/stream/InterpreterBenchmark.scala | 2 - .../akka/stream/scaladsl/FlowDelaySpec.scala | 42 +++++++++++++++++++ .../stream/scaladsl/FlowTakeWithinSpec.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 35 ++++++++++++++-- .../main/scala/akka/stream/javadsl/Flow.scala | 14 +++++++ .../scala/akka/stream/javadsl/Source.scala | 16 +++++++ .../scala/akka/stream/scaladsl/Flow.scala | 19 +++++++-- 7 files changed, 119 insertions(+), 11 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index c997b6f4c7..04773f199d 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -79,8 +79,6 @@ object InterpreterBenchmark { if (expected > 0) pull(in) // Otherwise do nothing, it will exit the interpreter } - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) def requestOne(): Unit = pull(in) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala new file mode 100644 index 0000000000..6d2bf2b4b3 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorMaterializer +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } +import akka.stream.testkit.Utils._ + +import scala.concurrent.duration._ + +class FlowDelaySpec extends AkkaSpec { + + implicit val materializer = ActorMaterializer() + + "A Delay" must { + + "deliver element after time passed" in { + Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) + .request(3) + .expectNoMsg(100.millis) + .expectNext(1) + .expectNoMsg(100.millis) + .expectNext(2) + .expectNoMsg(100.millis) + .expectNext(3) + .expectComplete() + } + + "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + Source(1 to 3).delay(300.millis).to(Sink(c)).run() + val cSub = c.expectSubscription() + c.expectNoMsg(200.millis) + cSub.request(100) + (1 to 3) foreach { n ⇒ c.expectNext(n) } + c.expectComplete() + c.expectNoMsg(200.millis) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 536fb9855f..25f63ea9ad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -37,7 +37,7 @@ class FlowTakeWithinSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "deliver bufferd elements onComplete before the timeout" in assertAllStagesStopped { + "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() Source(1 to 3).takeWithin(1.second).to(Sink(c)).run() val cSub = c.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9fcff50e7a..8a9ffc5949 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -857,13 +857,42 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } +private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + var element: T = _ + val timerName = "DelayedTimer" + var willStop = false + + setHandler(in, new InHandler { + override def onPush(): Unit = { + element = grab(in) + scheduleOnce("DelayedTimer", d) + } + override def onUpstreamFinish(): Unit = + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + + final override protected def onTimer(key: Any): Unit = { + push(out, element) + if (willStop) + completeStage() + } + } + + override def toString = "Delay" +} + private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { @@ -887,8 +916,6 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea override def onPush(): Unit = if (allow) push(out, grab(in)) else pull(in) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4b939b5bdb..6cdf1f2890 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -503,6 +503,20 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.delay(of)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b0f98bc9fd..cdc4a5ea84 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,7 +8,9 @@ import java.io.{ OutputStream, InputStream, File } import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } +import akka.stream.Attributes._ import akka.stream._ +import akka.stream.impl.fusing.Delay import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.stage.Stage import akka.util.ByteString @@ -707,6 +709,20 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.delay(of)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b5708b9308..f9bd47a448 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -8,7 +8,7 @@ import akka.stream.Attributes._ import akka.stream._ import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } +import akka.stream.impl.fusing._ import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage._ @@ -716,6 +716,20 @@ trait FlowOps[+Out, +Mat] { via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin"))) } + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): Repr[Out, Mat] = + via(new Delay[Out](of).withAttributes(name("delay"))) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -881,7 +895,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' prefix elements has been consumed and substream has been consumed * * '''Cancels when''' downstream cancels or substream cancels - * */ def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, Unit]), Mat] = deprecatedAndThen(PrefixAndTail(n)) @@ -956,7 +969,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels and substreams cancel - * */ def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U, Unit], Mat] = deprecatedAndThen(Split.when(p.asInstanceOf[Any ⇒ Boolean])) @@ -1008,7 +1020,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes and all consumed substreams complete * * '''Cancels when''' downstream cancels - * */ def flatMapConcat[T](f: Out ⇒ Source[T, _]): Repr[T, Mat] = deprecatedAndThen(ConcatAll(f.asInstanceOf[Any ⇒ Source[Any, _]])) From 83d3143236bc61a6555394a36834e626601aeff2 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Wed, 25 Nov 2015 21:29:35 -0500 Subject: [PATCH 2/4] +str #18556 add delay combinator --- .../akka/stream/testkit/StreamTestKit.scala | 10 ++ .../stream/scaladsl/ActorRefSinkSpec.scala | 1 - .../akka/stream/scaladsl/FlowDelaySpec.scala | 106 +++++++++++++++--- .../scala/akka/stream/OverflowStrategy.scala | 64 +++++++++-- .../stream/impl/AcknowledgePublisher.scala | 2 +- .../stream/impl/ActorRefSourceActor.scala | 2 +- .../scala/akka/stream/impl/fusing/Ops.scala | 74 +++++++++--- .../main/scala/akka/stream/javadsl/Flow.scala | 26 +++-- .../scala/akka/stream/javadsl/Source.scala | 26 +++-- .../scala/akka/stream/scaladsl/Flow.scala | 26 +++-- 10 files changed, 277 insertions(+), 60 deletions(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 94d39c368b..50117066b0 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -263,6 +263,16 @@ object TestSubscriber { self } + /** + * Fluent DSL + * + * Expect a stream element during specified time, then timeout. + */ + def expectNext(d: FiniteDuration, element: I): Self = { + probe.expectMsg(d, OnNext(element)) + self + } + /** * Fluent DSL * diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala index 447f83efb2..44a4849bfb 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/ActorRefSinkSpec.scala @@ -10,7 +10,6 @@ import akka.stream.testkit.scaladsl._ import akka.actor.Actor import akka.actor.ActorRef import akka.actor.Props -import akka.stream.OverflowStrategy object ActorRefSinkSpec { case class Fw(ref: ActorRef) extends Actor { diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 6d2bf2b4b3..02680ad0c3 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -3,40 +3,116 @@ */ package akka.stream.scaladsl -import akka.stream.ActorMaterializer +import akka.stream.Attributes._ +import akka.stream.testkit.Utils._ import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } -import akka.stream.testkit.Utils._ +import akka.stream.{ DelayOverflowStrategy, ActorMaterializer } +import scala.concurrent.Await import scala.concurrent.duration._ +import scala.util.control.NoStackTrace class FlowDelaySpec extends AkkaSpec { implicit val materializer = ActorMaterializer() "A Delay" must { + "deliver elements with some time shift" in { + Await.result( + Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head), + 1200.millis) should ===(1 to 10) + } - "deliver element after time passed" in { + "deliver element after time passed from actual receiving element" in { Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) - .request(3) - .expectNoMsg(100.millis) - .expectNext(1) - .expectNoMsg(100.millis) - .expectNext(2) - .expectNoMsg(100.millis) - .expectNext(3) + .request(2) + .expectNoMsg(200.millis) //delay + .expectNext(200.millis, 1) //delayed element + .expectNext(100.millis, 2) //buffered element + .expectNoMsg(200.millis) + .request(1) + .expectNext(3) //buffered element .expectComplete() } - "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { + "deliver elements with delay for slow stream" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() - Source(1 to 3).delay(300.millis).to(Sink(c)).run() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(300.millis).to(Sink(c)).run() val cSub = c.expectSubscription() - c.expectNoMsg(200.millis) + val pSub = p.expectSubscription() cSub.request(100) - (1 to 3) foreach { n ⇒ c.expectNext(n) } - c.expectComplete() + pSub.sendNext(1) c.expectNoMsg(200.millis) + c.expectNext(1) + pSub.sendNext(2) + c.expectNoMsg(200.millis) + c.expectNext(2) + pSub.sendComplete() + c.expectComplete() + } + + "drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===((1 to 15).toList :+ 20) + } + + "drop head for internal buffer if it's full in DropHead mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropHead).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(5 to 20) + } + + "clear all for internal buffer if it's full in DropBuffer mode" in assertAllStagesStopped { + Await.result( + Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropBuffer).withAttributes(inputBuffer(16, 16)) + .grouped(100) + .runWith(Sink.head), + 1200.millis) should ===(17 to 20) + } + + "pass elements with delay through normally in backpressured mode" in assertAllStagesStopped { + Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int]) + .request(5) + .expectNoMsg(200.millis) + .expectNext(200.millis, 1) + .expectNoMsg(200.millis) + .expectNext(200.millis, 2) + .expectNoMsg(200.millis) + .expectNext(200.millis, 3) + } + + "fail on overflow in Fail mode" in assertAllStagesStopped { + Source(1 to 20).delay(300.millis, DelayOverflowStrategy.fail) + .withAttributes(inputBuffer(16, 16)) + .runWith(TestSink.probe[Int]) + .request(100) + .expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!")) + + } + + "emit early when buffer is full and in EmitEarly mode" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + val p = TestPublisher.manualProbe[Int]() + + Source(p).delay(10.seconds, DelayOverflowStrategy.emitEarly).withAttributes(inputBuffer(16, 16)).to(Sink(c)).run() + val cSub = c.expectSubscription() + val pSub = p.expectSubscription() + cSub.request(20) + + for (i ← 1 to 16) pSub.sendNext(i) + c.expectNoMsg(300.millis) + pSub.sendNext(17) + c.expectNext(100.millis, 1) + pSub.sendError(new RuntimeException() with NoStackTrace) + c.expectError() } } } diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala index dd1e860c84..25f0fb5c89 100644 --- a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -6,42 +6,45 @@ package akka.stream /** * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. */ -sealed abstract class OverflowStrategy +sealed abstract class OverflowStrategy extends Serializable +sealed trait DelayOverflowStrategy extends Serializable -object OverflowStrategy { +private[akka] trait BaseOverflowStrategy { /** * INTERNAL API */ - private[akka] final case object DropHead extends OverflowStrategy + private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropTail extends OverflowStrategy + private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropBuffer extends OverflowStrategy + private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object DropNew extends OverflowStrategy + private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Backpressure extends OverflowStrategy + private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy /** * INTERNAL API */ - private[akka] final case object Fail extends OverflowStrategy { + private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy { final case class BufferOverflowException(msg: String) extends RuntimeException(msg) } +} +object OverflowStrategy extends BaseOverflowStrategy { /** * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * the new element. @@ -75,3 +78,48 @@ object OverflowStrategy { */ def fail: OverflowStrategy = Fail } + +object DelayOverflowStrategy extends BaseOverflowStrategy { + /** + * INTERNAL API + */ + private[akka] case object EmitEarly extends DelayOverflowStrategy + + /** + * If the buffer is full when a new element is available this strategy send next element downstream without waiting + */ + def emitEarly: DelayOverflowStrategy = EmitEarly + + /** + * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for + * the new element. + */ + def dropHead: DelayOverflowStrategy = DropHead + + /** + * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for + * the new element. + */ + def dropTail: DelayOverflowStrategy = DropTail + + /** + * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. + */ + def dropBuffer: DelayOverflowStrategy = DropBuffer + + /** + * If the buffer is full when a new element arrives, drops the new element. + */ + def dropNew: DelayOverflowStrategy = DropNew + + /** + * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until + * space becomes available in the buffer. + */ + def backpressure: DelayOverflowStrategy = Backpressure + + /** + * If the buffer is full when a new element is available this strategy completes the stream with failure. + */ + def fail: DelayOverflowStrategy = Fail +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala index eb3772ca61..1f7ed0e1e1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/AcknowledgePublisher.scala @@ -53,7 +53,7 @@ private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: Over sendAck(false) } else if (!buffer.isFull) enqueueAndSendAck(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala index 897e4a6978..1e9c35ed4e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala @@ -58,7 +58,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf log.debug("Dropping element because there is no downstream demand: [{}]", elem) else if (!buffer.isFull) buffer.enqueue(elem) - else overflowStrategy match { + else (overflowStrategy: @unchecked) match { case DropHead ⇒ log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") buffer.dropHead() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 8a9ffc5949..cf2155696b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -5,7 +5,8 @@ package akka.stream.impl.fusing import akka.event.Logging.LogLevel import akka.event.{ LogSource, Logging, LoggingAdapter } -import akka.stream.Attributes.LogLevels +import akka.stream.Attributes.{ InputBuffer, LogLevels } +import akka.stream.DelayOverflowStrategy.EmitEarly import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.stage._ @@ -14,10 +15,10 @@ import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future -import scala.concurrent.duration.FiniteDuration import scala.util.control.NonFatal import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy +import scala.concurrent.duration.{ FiniteDuration, _ } /** * INTERNAL API @@ -398,7 +399,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt else ctx.absorbTermination() val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = { - overflowStrategy match { + (overflowStrategy: @unchecked) match { case DropHead ⇒ (ctx, elem) ⇒ if (buffer.isFull) buffer.dropHead() buffer.enqueue(elem) @@ -857,31 +858,78 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } -private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] { +private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - var element: T = _ + val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element val timerName = "DelayedTimer" var willStop = false - setHandler(in, new InHandler { + setHandler(in, handler = new InHandler { override def onPush(): Unit = { - element = grab(in) - scheduleOnce("DelayedTimer", d) + if (buffer.isFull) (strategy: @unchecked) match { + case EmitEarly ⇒ + if (!isTimerActive(timerName)) + push(out, buffer.dequeue()._2) + else { + cancelTimer(timerName) + onTimer(timerName) + } + case DelayOverflowStrategy.DropHead ⇒ + buffer.dropHead() + grabElementAndSchedule(true) + case DelayOverflowStrategy.DropTail ⇒ + buffer.dropTail() + grabElementAndSchedule(true) + case DelayOverflowStrategy.DropNew ⇒ + grab(in) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + case DelayOverflowStrategy.DropBuffer ⇒ + buffer.clear() + grabElementAndSchedule(true) + case DelayOverflowStrategy.Fail ⇒ + failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!")) + case DelayOverflowStrategy.Backpressure ⇒ //do nothing here + } + else { + grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure) + } } - override def onUpstreamFinish(): Unit = + + def grabElementAndSchedule(pullCondition: Boolean): Unit = { + buffer.enqueue((System.currentTimeMillis(), grab(in))) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) + if (pullCondition) pull(in) + } + + override def onUpstreamFinish(): Unit = { if (isAvailable(out) && isTimerActive(timerName)) willStop = true else completeStage() + } }) setHandler(out, new OutHandler { - override def onPull(): Unit = pull(in) + override def onPull(): Unit = { + if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0) + push(out, buffer.dequeue()._2) + + if (!willStop && !hasBeenPulled(in)) pull(in) + completeIfReady() + } }) + def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() + + def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1) + final override protected def onTimer(key: Any): Unit = { - push(out, element) - if (willStop) - completeStage() + push(out, buffer.dequeue()._2) + if (!buffer.isEmpty) { + val waitTime = nextElementWaitTime() + if (waitTime > 0) scheduleOnce(timerName, waitTime.millis) + } + completeIfReady() } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 6cdf1f2890..fc64d85b8f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -504,18 +504,30 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] = - new Flow(delegate.delay(of)) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.delay(of, strategy)) /** * Discard the given number of elements at the beginning of the stream. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index cdc4a5ea84..a7340b5c0e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -710,18 +710,30 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): javadsl.Source[Out, Mat] = - new Source(delegate.delay(of)) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Source[Out, Mat] = + new Source(delegate.delay(of, strategy)) /** * Discard the given number of elements at the beginning of the stream. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f9bd47a448..b44ed25960 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -717,18 +717,30 @@ trait FlowOps[+Out, +Mat] { } /** - * Shifts emissions in time by a specified amount + * Shifts elements emission in time by a specified amount. It allows to store elements + * in internal buffer while waiting for next element to be emitted. Depending on the defined + * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if + * there is no space available in the buffer. * - * '''Emits when''' upstream emitted and configured time elapsed + * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * - * '''Backpressures when''' downstream backpressures + * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed + * * EmitEarly - strategy do not wait to emit element if buffer is full * - * '''Completes when''' upstream completes + * '''Backpressures when''' depending on OverflowStrategy + * * Backpressure - backpressures when buffer is full + * * DropHead, DropTail, DropBuffer - never backpressures + * * Fail - fails the stream if buffer gets full * - * '''Cancels when''' downstream completes + * '''Completes when''' upstream completes and buffered elements has been drained + * + * '''Cancels when''' downstream cancels + * + * @param of time to shift all messages + * @param strategy Strategy that is used when incoming elements cannot fit inside the buffer */ - def delay(of: FiniteDuration): Repr[Out, Mat] = - via(new Delay[Out](of).withAttributes(name("delay"))) + def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out, Mat] = + via(new Delay[Out](of, strategy).withAttributes(name("delay"))) /** * Discard the given number of elements at the beginning of the stream. From d5cae10a67625807c60255ae8ba1bb7686dd3a9b Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Fri, 27 Nov 2015 15:46:35 -0500 Subject: [PATCH 3/4] +str #18556 add delay combinator --- .../akka/stream/testkit/StreamTestKit.scala | 2 +- .../akka/stream/scaladsl/FlowDelaySpec.scala | 9 +++++++++ .../scala/akka/stream/impl/fusing/Ops.scala | 20 +++++++++---------- 3 files changed, 20 insertions(+), 11 deletions(-) diff --git a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala index 50117066b0..dba0ed61b8 100644 --- a/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream-testkit/src/main/scala/akka/stream/testkit/StreamTestKit.scala @@ -266,7 +266,7 @@ object TestSubscriber { /** * Fluent DSL * - * Expect a stream element during specified time, then timeout. + * Expect a stream element during specified time or timeout. */ def expectNext(d: FiniteDuration, element: I): Self = { probe.expectMsg(d, OnNext(element)) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 02680ad0c3..8099575a6e 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -24,6 +24,15 @@ class FlowDelaySpec extends AkkaSpec { 1200.millis) should ===(1 to 10) } + "add delay to initialDelay if exists upstream" in { + Source(1 to 10).initialDelay(1.second).delay(1.second).runWith(TestSink.probe[Int]) + .request(10) + .expectNoMsg(1800.millis) + .expectNext(300.millis, 1) + .expectNextN(2 to 10) + .expectComplete() + } + "deliver element after time passed from actual receiving element" in { Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) .request(2) diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index cf2155696b..d98be5f3e0 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -861,8 +861,8 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { - val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max - val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element + val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max + val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element val timerName = "DelayedTimer" var willStop = false @@ -878,28 +878,28 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg } case DelayOverflowStrategy.DropHead ⇒ buffer.dropHead() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.DropTail ⇒ buffer.dropTail() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.DropNew ⇒ grab(in) if (!isTimerActive(timerName)) scheduleOnce(timerName, d) case DelayOverflowStrategy.DropBuffer ⇒ buffer.clear() - grabElementAndSchedule(true) + grabAndPull(true) case DelayOverflowStrategy.Fail ⇒ - failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!")) - case DelayOverflowStrategy.Backpressure ⇒ //do nothing here + failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!")) + case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode") } else { - grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure) + grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1) + if (!isTimerActive(timerName)) scheduleOnce(timerName, d) } } - def grabElementAndSchedule(pullCondition: Boolean): Unit = { + def grabAndPull(pullCondition: Boolean): Unit = { buffer.enqueue((System.currentTimeMillis(), grab(in))) - if (!isTimerActive(timerName)) scheduleOnce(timerName, d) if (pullCondition) pull(in) } From 5aa83594fae4ba8ab42996559ec3ed33a618f63d Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Wed, 2 Dec 2015 14:58:30 -0500 Subject: [PATCH 4/4] +str #18556 add delay combinator --- .../src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala | 2 +- .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 6 +++--- akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala | 2 ++ akka-stream/src/main/scala/akka/stream/javadsl/Source.scala | 2 ++ akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala | 2 ++ 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 8099575a6e..72939734b4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -120,8 +120,8 @@ class FlowDelaySpec extends AkkaSpec { c.expectNoMsg(300.millis) pSub.sendNext(17) c.expectNext(100.millis, 1) + //fail will terminate despite of non empty internal buffer pSub.sendError(new RuntimeException() with NoStackTrace) - c.expectError() } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d98be5f3e0..cb2b9f0c14 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -899,7 +899,7 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg } def grabAndPull(pullCondition: Boolean): Unit = { - buffer.enqueue((System.currentTimeMillis(), grab(in))) + buffer.enqueue((System.nanoTime(), grab(in))) if (pullCondition) pull(in) } @@ -921,13 +921,13 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() - def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1) + def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000 final override protected def onTimer(key: Any): Unit = { push(out, buffer.dequeue()._2) if (!buffer.isEmpty) { val waitTime = nextElementWaitTime() - if (waitTime > 0) scheduleOnce(timerName, waitTime.millis) + if (waitTime > 10) scheduleOnce(timerName, waitTime.millis) } completeIfReady() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index fc64d85b8f..44573d56d6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -509,6 +509,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index a7340b5c0e..d230c64616 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -715,6 +715,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b44ed25960..50a215070b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -722,6 +722,8 @@ trait FlowOps[+Out, +Mat] { * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed