From 5aa83594fae4ba8ab42996559ec3ed33a618f63d Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Wed, 2 Dec 2015 14:58:30 -0500 Subject: [PATCH] +str #18556 add delay combinator --- .../src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala | 2 +- .../src/main/scala/akka/stream/impl/fusing/Ops.scala | 6 +++--- akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala | 2 ++ akka-stream/src/main/scala/akka/stream/javadsl/Source.scala | 2 ++ akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala | 2 ++ 5 files changed, 10 insertions(+), 4 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala index 8099575a6e..72939734b4 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowDelaySpec.scala @@ -120,8 +120,8 @@ class FlowDelaySpec extends AkkaSpec { c.expectNoMsg(300.millis) pSub.sendNext(17) c.expectNext(100.millis, 1) + //fail will terminate despite of non empty internal buffer pSub.sendError(new RuntimeException() with NoStackTrace) - c.expectError() } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index d98be5f3e0..cb2b9f0c14 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -899,7 +899,7 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg } def grabAndPull(pullCondition: Boolean): Unit = { - buffer.enqueue((System.currentTimeMillis(), grab(in))) + buffer.enqueue((System.nanoTime(), grab(in))) if (pullCondition) pull(in) } @@ -921,13 +921,13 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage() - def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1) + def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000 final override protected def onTimer(key: Any): Unit = { push(out, buffer.dequeue()._2) if (!buffer.isEmpty) { val waitTime = nextElementWaitTime() - if (waitTime > 0) scheduleOnce(timerName, waitTime.millis) + if (waitTime > 10) scheduleOnce(timerName, waitTime.millis) } completeIfReady() } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index fc64d85b8f..44573d56d6 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -509,6 +509,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index a7340b5c0e..d230c64616 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -715,6 +715,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index b44ed25960..50a215070b 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -722,6 +722,8 @@ trait FlowOps[+Out, +Mat] { * [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if * there is no space available in the buffer. * + * Delay precession is 10ms to avoid unnecessary timer scheduling cycles + * * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)` * * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed