diff --git a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala index c997b6f4c7..04773f199d 100644 --- a/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala +++ b/akka-bench-jmh-dev/src/main/scala/akka/stream/InterpreterBenchmark.scala @@ -79,8 +79,6 @@ object InterpreterBenchmark { if (expected > 0) pull(in) // Otherwise do nothing, it will exit the interpreter } - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) def requestOne(): Unit = pull(in) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala new file mode 100644 index 0000000000..6d2bf2b4b3 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -0,0 +1,42 @@ +/** + * Copyright (C) 2015 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.ActorMaterializer +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } +import akka.stream.testkit.Utils._ + +import scala.concurrent.duration._ + +class FlowDelaySpec extends AkkaSpec { + + implicit val materializer = ActorMaterializer() + + "A Delay" must { + + "deliver element after time passed" in { + Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) + .request(3) + .expectNoMsg(100.millis) + .expectNext(1) + .expectNoMsg(100.millis) + .expectNext(2) + .expectNoMsg(100.millis) + .expectNext(3) + .expectComplete() + } + + "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { + val c = TestSubscriber.manualProbe[Int]() + Source(1 to 3).delay(300.millis).to(Sink(c)).run() + val cSub = c.expectSubscription() + c.expectNoMsg(200.millis) + cSub.request(100) + (1 to 3) foreach { n ⇒ c.expectNext(n) } + c.expectComplete() + c.expectNoMsg(200.millis) + } + } +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala index 536fb9855f..25f63ea9ad 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowTakeWithinSpec.scala @@ -37,7 +37,7 @@ class FlowTakeWithinSpec extends AkkaSpec { c.expectNoMsg(200.millis) } - "deliver bufferd elements onComplete before the timeout" in assertAllStagesStopped { + "deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { val c = TestSubscriber.manualProbe[Int]() Source(1 to 3).takeWithin(1.second).to(Sink(c)).run() val cSub = c.expectSubscription() diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 9fcff50e7a..8a9ffc5949 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -857,13 +857,42 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS } } +private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] { + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { + var element: T = _ + val timerName = "DelayedTimer" + var willStop = false + + setHandler(in, new InHandler { + override def onPush(): Unit = { + element = grab(in) + scheduleOnce("DelayedTimer", d) + } + override def onUpstreamFinish(): Unit = + if (isAvailable(out) && isTimerActive(timerName)) willStop = true + else completeStage() + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = pull(in) + }) + + final override protected def onTimer(key: Any): Unit = { + push(out, element) + if (willStop) + completeStage() + } + } + + override def toString = "Delay" +} + private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { setHandler(in, new InHandler { override def onPush(): Unit = push(out, grab(in)) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { @@ -887,8 +916,6 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea override def onPush(): Unit = if (allow) push(out, grab(in)) else pull(in) - override def onUpstreamFinish(): Unit = completeStage() - override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex) }) setHandler(out, new OutHandler { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 4b939b5bdb..6cdf1f2890 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -503,6 +503,20 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] = new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] = + new Flow(delegate.delay(of)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index b0f98bc9fd..cdc4a5ea84 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,7 +8,9 @@ import java.io.{ OutputStream, InputStream, File } import akka.actor.{ ActorRef, Cancellable, Props } import akka.event.LoggingAdapter import akka.japi.{ Pair, Util, function } +import akka.stream.Attributes._ import akka.stream._ +import akka.stream.impl.fusing.Delay import akka.stream.impl.{ ConstantFun, StreamLayout } import akka.stream.stage.Stage import akka.util.ByteString @@ -707,6 +709,20 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] = new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): javadsl.Source[Out, Mat] = + new Source(delegate.delay(of)) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b5708b9308..f9bd47a448 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -8,7 +8,7 @@ import akka.stream.Attributes._ import akka.stream._ import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage } import akka.stream.impl.StreamLayout.{ EmptyModule, Module } -import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered } +import akka.stream.impl.fusing._ import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers } import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage } import akka.stream.stage._ @@ -716,6 +716,20 @@ trait FlowOps[+Out, +Mat] { via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin"))) } + /** + * Shifts emissions in time by a specified amount + * + * '''Emits when''' upstream emitted and configured time elapsed + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream completes + */ + def delay(of: FiniteDuration): Repr[Out, Mat] = + via(new Delay[Out](of).withAttributes(name("delay"))) + /** * Discard the given number of elements at the beginning of the stream. * No elements will be dropped if `n` is zero or negative. @@ -881,7 +895,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' prefix elements has been consumed and substream has been consumed * * '''Cancels when''' downstream cancels or substream cancels - * */ def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, Unit]), Mat] = deprecatedAndThen(PrefixAndTail(n)) @@ -956,7 +969,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes * * '''Cancels when''' downstream cancels and substreams cancel - * */ def splitWhen[U >: Out](p: Out ⇒ Boolean): Repr[Source[U, Unit], Mat] = deprecatedAndThen(Split.when(p.asInstanceOf[Any ⇒ Boolean])) @@ -1008,7 +1020,6 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes and all consumed substreams complete * * '''Cancels when''' downstream cancels - * */ def flatMapConcat[T](f: Out ⇒ Source[T, _]): Repr[T, Mat] = deprecatedAndThen(ConcatAll(f.asInstanceOf[Any ⇒ Source[Any, _]]))