+str #18556 add delay combinator

This commit is contained in:
Alexander Golubev 2015-12-02 14:58:30 -05:00
parent d5cae10a67
commit 5aa83594fa
5 changed files with 10 additions and 4 deletions

View file

@ -120,8 +120,8 @@ class FlowDelaySpec extends AkkaSpec {
c.expectNoMsg(300.millis)
pSub.sendNext(17)
c.expectNext(100.millis, 1)
//fail will terminate despite of non empty internal buffer
pSub.sendError(new RuntimeException() with NoStackTrace)
c.expectError()
}
}
}

View file

@ -899,7 +899,7 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg
}
def grabAndPull(pullCondition: Boolean): Unit = {
buffer.enqueue((System.currentTimeMillis(), grab(in)))
buffer.enqueue((System.nanoTime(), grab(in)))
if (pullCondition) pull(in)
}
@ -921,13 +921,13 @@ private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrateg
def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()
def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1)
def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000
final override protected def onTimer(key: Any): Unit = {
push(out, buffer.dequeue()._2)
if (!buffer.isEmpty) {
val waitTime = nextElementWaitTime()
if (waitTime > 0) scheduleOnce(timerName, waitTime.millis)
if (waitTime > 10) scheduleOnce(timerName, waitTime.millis)
}
completeIfReady()
}

View file

@ -509,6 +509,8 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
*
* Delay precession is 10ms to avoid unnecessary timer scheduling cycles
*
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed

View file

@ -715,6 +715,8 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
*
* Delay precession is 10ms to avoid unnecessary timer scheduling cycles
*
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed

View file

@ -722,6 +722,8 @@ trait FlowOps[+Out, +Mat] {
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
*
* Delay precession is 10ms to avoid unnecessary timer scheduling cycles
*
* Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
*
* '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed