=str #18556 add delay combinator

This commit is contained in:
Alexander Golubev 2015-11-21 13:48:10 -05:00
parent ce2d666c06
commit 270ef41359
7 changed files with 119 additions and 11 deletions

View file

@ -79,8 +79,6 @@ object InterpreterBenchmark {
if (expected > 0) pull(in)
// Otherwise do nothing, it will exit the interpreter
}
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
def requestOne(): Unit = pull(in)

View file

@ -0,0 +1,42 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.ActorMaterializer
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils._
import scala.concurrent.duration._
class FlowDelaySpec extends AkkaSpec {
implicit val materializer = ActorMaterializer()
"A Delay" must {
"deliver element after time passed" in {
Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int])
.request(3)
.expectNoMsg(100.millis)
.expectNext(1)
.expectNoMsg(100.millis)
.expectNext(2)
.expectNoMsg(100.millis)
.expectNext(3)
.expectComplete()
}
"deliver buffered elements onComplete before the timeout" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]()
Source(1 to 3).delay(300.millis).to(Sink(c)).run()
val cSub = c.expectSubscription()
c.expectNoMsg(200.millis)
cSub.request(100)
(1 to 3) foreach { n c.expectNext(n) }
c.expectComplete()
c.expectNoMsg(200.millis)
}
}
}

View file

@ -37,7 +37,7 @@ class FlowTakeWithinSpec extends AkkaSpec {
c.expectNoMsg(200.millis)
}
"deliver bufferd elements onComplete before the timeout" in assertAllStagesStopped {
"deliver buffered elements onComplete before the timeout" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]()
Source(1 to 3).takeWithin(1.second).to(Sink(c)).run()
val cSub = c.expectSubscription()

View file

@ -857,13 +857,42 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
}
}
private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
var element: T = _
val timerName = "DelayedTimer"
var willStop = false
setHandler(in, new InHandler {
override def onPush(): Unit = {
element = grab(in)
scheduleOnce("DelayedTimer", d)
}
override def onUpstreamFinish(): Unit =
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage()
})
setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in)
})
final override protected def onTimer(key: Any): Unit = {
push(out, element)
if (willStop)
completeStage()
}
}
override def toString = "Delay"
}
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
setHandler(in, new InHandler {
override def onPush(): Unit = push(out, grab(in))
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {
@ -887,8 +916,6 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea
override def onPush(): Unit =
if (allow) push(out, grab(in))
else pull(in)
override def onUpstreamFinish(): Unit = completeStage()
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
})
setHandler(out, new OutHandler {

View file

@ -503,6 +503,20 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
def groupedWithin(n: Int, d: FiniteDuration): javadsl.Flow[In, java.util.List[Out @uncheckedVariance], Mat] =
new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
/**
* Shifts emissions in time by a specified amount
*
* '''Emits when''' upstream emitted and configured time elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream completes
*/
def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.delay(of))
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.

View file

@ -8,7 +8,9 @@ import java.io.{ OutputStream, InputStream, File }
import akka.actor.{ ActorRef, Cancellable, Props }
import akka.event.LoggingAdapter
import akka.japi.{ Pair, Util, function }
import akka.stream.Attributes._
import akka.stream._
import akka.stream.impl.fusing.Delay
import akka.stream.impl.{ ConstantFun, StreamLayout }
import akka.stream.stage.Stage
import akka.util.ByteString
@ -707,6 +709,20 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
def groupedWithin(n: Int, d: FiniteDuration): javadsl.Source[java.util.List[Out @uncheckedVariance], Mat] =
new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
/**
* Shifts emissions in time by a specified amount
*
* '''Emits when''' upstream emitted and configured time elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream completes
*/
def delay(of: FiniteDuration): javadsl.Source[Out, Mat] =
new Source(delegate.delay(of))
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.

View file

@ -8,7 +8,7 @@ import akka.stream.Attributes._
import akka.stream._
import akka.stream.impl.Stages.{ DirectProcessor, StageModule, SymbolicGraphStage }
import akka.stream.impl.StreamLayout.{ EmptyModule, Module }
import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, TakeWithin, MapAsync, MapAsyncUnordered }
import akka.stream.impl.fusing._
import akka.stream.impl.{ ReactiveStreamsCompliance, ConstantFun, Stages, StreamLayout, Timers }
import akka.stream.stage.AbstractStage.{ PushPullGraphStageWithMaterializedValue, PushPullGraphStage }
import akka.stream.stage._
@ -716,6 +716,20 @@ trait FlowOps[+Out, +Mat] {
via(new GroupedWithin[Out](n, d).withAttributes(name("groupedWithin")))
}
/**
* Shifts emissions in time by a specified amount
*
* '''Emits when''' upstream emitted and configured time elapsed
*
* '''Backpressures when''' downstream backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream completes
*/
def delay(of: FiniteDuration): Repr[Out, Mat] =
via(new Delay[Out](of).withAttributes(name("delay")))
/**
* Discard the given number of elements at the beginning of the stream.
* No elements will be dropped if `n` is zero or negative.
@ -881,7 +895,6 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' prefix elements has been consumed and substream has been consumed
*
* '''Cancels when''' downstream cancels or substream cancels
*
*/
def prefixAndTail[U >: Out](n: Int): Repr[(immutable.Seq[Out], Source[U, Unit]), Mat] =
deprecatedAndThen(PrefixAndTail(n))
@ -956,7 +969,6 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes
*
* '''Cancels when''' downstream cancels and substreams cancel
*
*/
def splitWhen[U >: Out](p: Out Boolean): Repr[Source[U, Unit], Mat] =
deprecatedAndThen(Split.when(p.asInstanceOf[Any Boolean]))
@ -1008,7 +1020,6 @@ trait FlowOps[+Out, +Mat] {
* '''Completes when''' upstream completes and all consumed substreams complete
*
* '''Cancels when''' downstream cancels
*
*/
def flatMapConcat[T](f: Out Source[T, _]): Repr[T, Mat] =
deprecatedAndThen(ConcatAll(f.asInstanceOf[Any Source[Any, _]]))