+str #18556 add delay combinator

This commit is contained in:
Alexander Golubev 2015-11-25 21:29:35 -05:00
parent 270ef41359
commit 83d3143236
10 changed files with 277 additions and 60 deletions

View file

@ -263,6 +263,16 @@ object TestSubscriber {
self self
} }
/**
* Fluent DSL
*
* Expect a stream element during specified time, then timeout.
*/
def expectNext(d: FiniteDuration, element: I): Self = {
probe.expectMsg(d, OnNext(element))
self
}
/** /**
* Fluent DSL * Fluent DSL
* *

View file

@ -10,7 +10,6 @@ import akka.stream.testkit.scaladsl._
import akka.actor.Actor import akka.actor.Actor
import akka.actor.ActorRef import akka.actor.ActorRef
import akka.actor.Props import akka.actor.Props
import akka.stream.OverflowStrategy
object ActorRefSinkSpec { object ActorRefSinkSpec {
case class Fw(ref: ActorRef) extends Actor { case class Fw(ref: ActorRef) extends Actor {

View file

@ -3,40 +3,116 @@
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl
import akka.stream.ActorMaterializer import akka.stream.Attributes._
import akka.stream.testkit.Utils._
import akka.stream.testkit.scaladsl.TestSink import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber } import akka.stream.testkit.{ AkkaSpec, TestPublisher, TestSubscriber }
import akka.stream.testkit.Utils._ import akka.stream.{ DelayOverflowStrategy, ActorMaterializer }
import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class FlowDelaySpec extends AkkaSpec { class FlowDelaySpec extends AkkaSpec {
implicit val materializer = ActorMaterializer() implicit val materializer = ActorMaterializer()
"A Delay" must { "A Delay" must {
"deliver elements with some time shift" in {
Await.result(
Source(1 to 10).delay(1.seconds).grouped(100).runWith(Sink.head),
1200.millis) should ===(1 to 10)
}
"deliver element after time passed" in { "deliver element after time passed from actual receiving element" in {
Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int]) Source(1 to 3).delay(300.millis).runWith(TestSink.probe[Int])
.request(3) .request(2)
.expectNoMsg(100.millis) .expectNoMsg(200.millis) //delay
.expectNext(1) .expectNext(200.millis, 1) //delayed element
.expectNoMsg(100.millis) .expectNext(100.millis, 2) //buffered element
.expectNext(2) .expectNoMsg(200.millis)
.expectNoMsg(100.millis) .request(1)
.expectNext(3) .expectNext(3) //buffered element
.expectComplete() .expectComplete()
} }
"deliver buffered elements onComplete before the timeout" in assertAllStagesStopped { "deliver elements with delay for slow stream" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]() val c = TestSubscriber.manualProbe[Int]()
Source(1 to 3).delay(300.millis).to(Sink(c)).run() val p = TestPublisher.manualProbe[Int]()
Source(p).delay(300.millis).to(Sink(c)).run()
val cSub = c.expectSubscription() val cSub = c.expectSubscription()
c.expectNoMsg(200.millis) val pSub = p.expectSubscription()
cSub.request(100) cSub.request(100)
(1 to 3) foreach { n c.expectNext(n) } pSub.sendNext(1)
c.expectComplete()
c.expectNoMsg(200.millis) c.expectNoMsg(200.millis)
c.expectNext(1)
pSub.sendNext(2)
c.expectNoMsg(200.millis)
c.expectNext(2)
pSub.sendComplete()
c.expectComplete()
}
"drop tail for internal buffer if it's full in DropTail mode" in assertAllStagesStopped {
Await.result(
Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropTail).withAttributes(inputBuffer(16, 16))
.grouped(100)
.runWith(Sink.head),
1200.millis) should ===((1 to 15).toList :+ 20)
}
"drop head for internal buffer if it's full in DropHead mode" in assertAllStagesStopped {
Await.result(
Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropHead).withAttributes(inputBuffer(16, 16))
.grouped(100)
.runWith(Sink.head),
1200.millis) should ===(5 to 20)
}
"clear all for internal buffer if it's full in DropBuffer mode" in assertAllStagesStopped {
Await.result(
Source(1 to 20).delay(1.seconds, DelayOverflowStrategy.dropBuffer).withAttributes(inputBuffer(16, 16))
.grouped(100)
.runWith(Sink.head),
1200.millis) should ===(17 to 20)
}
"pass elements with delay through normally in backpressured mode" in assertAllStagesStopped {
Source(1 to 3).delay(300.millis, DelayOverflowStrategy.backpressure).runWith(TestSink.probe[Int])
.request(5)
.expectNoMsg(200.millis)
.expectNext(200.millis, 1)
.expectNoMsg(200.millis)
.expectNext(200.millis, 2)
.expectNoMsg(200.millis)
.expectNext(200.millis, 3)
}
"fail on overflow in Fail mode" in assertAllStagesStopped {
Source(1 to 20).delay(300.millis, DelayOverflowStrategy.fail)
.withAttributes(inputBuffer(16, 16))
.runWith(TestSink.probe[Int])
.request(100)
.expectError(new DelayOverflowStrategy.Fail.BufferOverflowException("Buffer overflow for delay combinator (max capacity was: 16)!"))
}
"emit early when buffer is full and in EmitEarly mode" in assertAllStagesStopped {
val c = TestSubscriber.manualProbe[Int]()
val p = TestPublisher.manualProbe[Int]()
Source(p).delay(10.seconds, DelayOverflowStrategy.emitEarly).withAttributes(inputBuffer(16, 16)).to(Sink(c)).run()
val cSub = c.expectSubscription()
val pSub = p.expectSubscription()
cSub.request(20)
for (i 1 to 16) pSub.sendNext(i)
c.expectNoMsg(300.millis)
pSub.sendNext(17)
c.expectNext(100.millis, 1)
pSub.sendError(new RuntimeException() with NoStackTrace)
c.expectError()
} }
} }
} }

View file

@ -6,42 +6,45 @@ package akka.stream
/** /**
* Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element.
*/ */
sealed abstract class OverflowStrategy sealed abstract class OverflowStrategy extends Serializable
sealed trait DelayOverflowStrategy extends Serializable
object OverflowStrategy { private[akka] trait BaseOverflowStrategy {
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object DropHead extends OverflowStrategy private[akka] case object DropHead extends OverflowStrategy with DelayOverflowStrategy
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object DropTail extends OverflowStrategy private[akka] case object DropTail extends OverflowStrategy with DelayOverflowStrategy
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object DropBuffer extends OverflowStrategy private[akka] case object DropBuffer extends OverflowStrategy with DelayOverflowStrategy
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object DropNew extends OverflowStrategy private[akka] case object DropNew extends OverflowStrategy with DelayOverflowStrategy
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object Backpressure extends OverflowStrategy private[akka] case object Backpressure extends OverflowStrategy with DelayOverflowStrategy
/** /**
* INTERNAL API * INTERNAL API
*/ */
private[akka] final case object Fail extends OverflowStrategy { private[akka] case object Fail extends OverflowStrategy with DelayOverflowStrategy {
final case class BufferOverflowException(msg: String) extends RuntimeException(msg) final case class BufferOverflowException(msg: String) extends RuntimeException(msg)
} }
}
object OverflowStrategy extends BaseOverflowStrategy {
/** /**
* If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
* the new element. * the new element.
@ -75,3 +78,48 @@ object OverflowStrategy {
*/ */
def fail: OverflowStrategy = Fail def fail: OverflowStrategy = Fail
} }
object DelayOverflowStrategy extends BaseOverflowStrategy {
/**
* INTERNAL API
*/
private[akka] case object EmitEarly extends DelayOverflowStrategy
/**
* If the buffer is full when a new element is available this strategy send next element downstream without waiting
*/
def emitEarly: DelayOverflowStrategy = EmitEarly
/**
* If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
* the new element.
*/
def dropHead: DelayOverflowStrategy = DropHead
/**
* If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
* the new element.
*/
def dropTail: DelayOverflowStrategy = DropTail
/**
* If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
*/
def dropBuffer: DelayOverflowStrategy = DropBuffer
/**
* If the buffer is full when a new element arrives, drops the new element.
*/
def dropNew: DelayOverflowStrategy = DropNew
/**
* If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
* space becomes available in the buffer.
*/
def backpressure: DelayOverflowStrategy = Backpressure
/**
* If the buffer is full when a new element is available this strategy completes the stream with failure.
*/
def fail: DelayOverflowStrategy = Fail
}

View file

@ -53,7 +53,7 @@ private[akka] class AcknowledgePublisher(bufferSize: Int, overflowStrategy: Over
sendAck(false) sendAck(false)
} else if (!buffer.isFull) } else if (!buffer.isFull)
enqueueAndSendAck(elem) enqueueAndSendAck(elem)
else overflowStrategy match { else (overflowStrategy: @unchecked) match {
case DropHead case DropHead
log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]")
buffer.dropHead() buffer.dropHead()

View file

@ -58,7 +58,7 @@ private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: Overf
log.debug("Dropping element because there is no downstream demand: [{}]", elem) log.debug("Dropping element because there is no downstream demand: [{}]", elem)
else if (!buffer.isFull) else if (!buffer.isFull)
buffer.enqueue(elem) buffer.enqueue(elem)
else overflowStrategy match { else (overflowStrategy: @unchecked) match {
case DropHead case DropHead
log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]") log.debug("Dropping the head element because buffer is full and overflowStrategy is: [DropHead]")
buffer.dropHead() buffer.dropHead()

View file

@ -5,7 +5,8 @@ package akka.stream.impl.fusing
import akka.event.Logging.LogLevel import akka.event.Logging.LogLevel
import akka.event.{ LogSource, Logging, LoggingAdapter } import akka.event.{ LogSource, Logging, LoggingAdapter }
import akka.stream.Attributes.LogLevels import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.DelayOverflowStrategy.EmitEarly
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance } import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
import akka.stream.stage._ import akka.stream.stage._
@ -14,10 +15,10 @@ import scala.annotation.tailrec
import scala.collection.immutable import scala.collection.immutable
import scala.collection.immutable.VectorBuilder import scala.collection.immutable.VectorBuilder
import scala.concurrent.Future import scala.concurrent.Future
import scala.concurrent.duration.FiniteDuration
import scala.util.control.NonFatal import scala.util.control.NonFatal
import scala.util.{ Failure, Success, Try } import scala.util.{ Failure, Success, Try }
import akka.stream.ActorAttributes.SupervisionStrategy import akka.stream.ActorAttributes.SupervisionStrategy
import scala.concurrent.duration.{ FiniteDuration, _ }
/** /**
* INTERNAL API * INTERNAL API
@ -398,7 +399,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
else ctx.absorbTermination() else ctx.absorbTermination()
val enqueueAction: (DetachedContext[T], T) UpstreamDirective = { val enqueueAction: (DetachedContext[T], T) UpstreamDirective = {
overflowStrategy match { (overflowStrategy: @unchecked) match {
case DropHead (ctx, elem) case DropHead (ctx, elem)
if (buffer.isFull) buffer.dropHead() if (buffer.isFull) buffer.dropHead()
buffer.enqueue(elem) buffer.enqueue(elem)
@ -857,31 +858,78 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
} }
} }
private[stream] class Delay[T](d: FiniteDuration) extends SimpleLinearGraphStage[T] { private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
var element: T = _ val maxBuffer = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
val buffer = FixedSizeBuffer[(Long, T)](maxBuffer) // buffer has pairs timestamp with upstream element
val timerName = "DelayedTimer" val timerName = "DelayedTimer"
var willStop = false var willStop = false
setHandler(in, new InHandler { setHandler(in, handler = new InHandler {
override def onPush(): Unit = { override def onPush(): Unit = {
element = grab(in) if (buffer.isFull) (strategy: @unchecked) match {
scheduleOnce("DelayedTimer", d) case EmitEarly
if (!isTimerActive(timerName))
push(out, buffer.dequeue()._2)
else {
cancelTimer(timerName)
onTimer(timerName)
}
case DelayOverflowStrategy.DropHead
buffer.dropHead()
grabElementAndSchedule(true)
case DelayOverflowStrategy.DropTail
buffer.dropTail()
grabElementAndSchedule(true)
case DelayOverflowStrategy.DropNew
grab(in)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
case DelayOverflowStrategy.DropBuffer
buffer.clear()
grabElementAndSchedule(true)
case DelayOverflowStrategy.Fail
failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $maxBuffer)!"))
case DelayOverflowStrategy.Backpressure //do nothing here
}
else {
grabElementAndSchedule(strategy != DelayOverflowStrategy.Backpressure)
}
} }
override def onUpstreamFinish(): Unit =
def grabElementAndSchedule(pullCondition: Boolean): Unit = {
buffer.enqueue((System.currentTimeMillis(), grab(in)))
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
if (pullCondition) pull(in)
}
override def onUpstreamFinish(): Unit = {
if (isAvailable(out) && isTimerActive(timerName)) willStop = true if (isAvailable(out) && isTimerActive(timerName)) willStop = true
else completeStage() else completeStage()
}
}) })
setHandler(out, new OutHandler { setHandler(out, new OutHandler {
override def onPull(): Unit = pull(in) override def onPull(): Unit = {
if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0)
push(out, buffer.dequeue()._2)
if (!willStop && !hasBeenPulled(in)) pull(in)
completeIfReady()
}
}) })
def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()
def nextElementWaitTime(): Long = d.toMillis - (System.currentTimeMillis() - buffer.peek()._1)
final override protected def onTimer(key: Any): Unit = { final override protected def onTimer(key: Any): Unit = {
push(out, element) push(out, buffer.dequeue()._2)
if (willStop) if (!buffer.isEmpty) {
completeStage() val waitTime = nextElementWaitTime()
if (waitTime > 0) scheduleOnce(timerName, waitTime.millis)
}
completeIfReady()
} }
} }

View file

@ -504,18 +504,30 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step new Flow(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
/** /**
* Shifts emissions in time by a specified amount * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
* *
* '''Emits when''' upstream emitted and configured time elapsed * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
* *
* '''Backpressures when''' downstream backpressures * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
* *
* '''Completes when''' upstream completes * '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
* *
* '''Cancels when''' downstream completes * '''Completes when''' upstream completes and buffered elements has been drained
*
* '''Cancels when''' downstream cancels
*
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def delay(of: FiniteDuration): javadsl.Flow[In, Out, Mat] = def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Flow[In, Out, Mat] =
new Flow(delegate.delay(of)) new Flow(delegate.delay(of, strategy))
/** /**
* Discard the given number of elements at the beginning of the stream. * Discard the given number of elements at the beginning of the stream.

View file

@ -710,18 +710,30 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step new Source(delegate.groupedWithin(n, d).map(_.asJava)) // TODO optimize to one step
/** /**
* Shifts emissions in time by a specified amount * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
* *
* '''Emits when''' upstream emitted and configured time elapsed * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
* *
* '''Backpressures when''' downstream backpressures * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
* *
* '''Completes when''' upstream completes * '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
* *
* '''Cancels when''' downstream completes * '''Completes when''' upstream completes and buffered elements has been drained
*
* '''Cancels when''' downstream cancels
*
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def delay(of: FiniteDuration): javadsl.Source[Out, Mat] = def delay(of: FiniteDuration, strategy: DelayOverflowStrategy): javadsl.Source[Out, Mat] =
new Source(delegate.delay(of)) new Source(delegate.delay(of, strategy))
/** /**
* Discard the given number of elements at the beginning of the stream. * Discard the given number of elements at the beginning of the stream.

View file

@ -717,18 +717,30 @@ trait FlowOps[+Out, +Mat] {
} }
/** /**
* Shifts emissions in time by a specified amount * Shifts elements emission in time by a specified amount. It allows to store elements
* in internal buffer while waiting for next element to be emitted. Depending on the defined
* [[akka.stream.DelayOverflowStrategy]] it might drop elements or backpressure the upstream if
* there is no space available in the buffer.
* *
* '''Emits when''' upstream emitted and configured time elapsed * Internal buffer has default capacity 16. You can set buffer size by calling `withAttributes(inputBuffer)`
* *
* '''Backpressures when''' downstream backpressures * '''Emits when''' there is a pending element in the buffer and configured time for this element elapsed
* * EmitEarly - strategy do not wait to emit element if buffer is full
* *
* '''Completes when''' upstream completes * '''Backpressures when''' depending on OverflowStrategy
* * Backpressure - backpressures when buffer is full
* * DropHead, DropTail, DropBuffer - never backpressures
* * Fail - fails the stream if buffer gets full
* *
* '''Cancels when''' downstream completes * '''Completes when''' upstream completes and buffered elements has been drained
*
* '''Cancels when''' downstream cancels
*
* @param of time to shift all messages
* @param strategy Strategy that is used when incoming elements cannot fit inside the buffer
*/ */
def delay(of: FiniteDuration): Repr[Out, Mat] = def delay(of: FiniteDuration, strategy: DelayOverflowStrategy = DelayOverflowStrategy.dropTail): Repr[Out, Mat] =
via(new Delay[Out](of).withAttributes(name("delay"))) via(new Delay[Out](of, strategy).withAttributes(name("delay")))
/** /**
* Discard the given number of elements at the beginning of the stream. * Discard the given number of elements at the beginning of the stream.