Merge remote-tracking branch 'pr/18985' into release-2.3-dev
This commit is contained in:
commit
819c1ef504
14 changed files with 421 additions and 23 deletions
|
|
@ -5,7 +5,8 @@ package akka.stream.impl.fusing
|
|||
|
||||
import akka.event.Logging.LogLevel
|
||||
import akka.event.{ LogSource, Logging, LoggingAdapter }
|
||||
import akka.stream.Attributes.LogLevels
|
||||
import akka.stream.Attributes.{ InputBuffer, LogLevels }
|
||||
import akka.stream.DelayOverflowStrategy.EmitEarly
|
||||
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
|
||||
import akka.stream.impl.{ FixedSizeBuffer, ReactiveStreamsCompliance }
|
||||
import akka.stream.stage._
|
||||
|
|
@ -14,10 +15,10 @@ import scala.annotation.tailrec
|
|||
import scala.collection.immutable
|
||||
import scala.collection.immutable.VectorBuilder
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.util.control.NonFatal
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import akka.stream.ActorAttributes.SupervisionStrategy
|
||||
import scala.concurrent.duration.{ FiniteDuration, _ }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -398,7 +399,7 @@ private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowSt
|
|||
else ctx.absorbTermination()
|
||||
|
||||
val enqueueAction: (DetachedContext[T], T) ⇒ UpstreamDirective = {
|
||||
overflowStrategy match {
|
||||
(overflowStrategy: @unchecked) match {
|
||||
case DropHead ⇒ (ctx, elem) ⇒
|
||||
if (buffer.isFull) buffer.dropHead()
|
||||
buffer.enqueue(elem)
|
||||
|
|
@ -857,13 +858,89 @@ private[stream] class GroupedWithin[T](n: Int, d: FiniteDuration) extends GraphS
|
|||
}
|
||||
}
|
||||
|
||||
private[stream] class Delay[T](d: FiniteDuration, strategy: DelayOverflowStrategy) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
val size = inheritedAttributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
|
||||
val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
|
||||
val timerName = "DelayedTimer"
|
||||
var willStop = false
|
||||
|
||||
setHandler(in, handler = new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
if (buffer.isFull) (strategy: @unchecked) match {
|
||||
case EmitEarly ⇒
|
||||
if (!isTimerActive(timerName))
|
||||
push(out, buffer.dequeue()._2)
|
||||
else {
|
||||
cancelTimer(timerName)
|
||||
onTimer(timerName)
|
||||
}
|
||||
case DelayOverflowStrategy.DropHead ⇒
|
||||
buffer.dropHead()
|
||||
grabAndPull(true)
|
||||
case DelayOverflowStrategy.DropTail ⇒
|
||||
buffer.dropTail()
|
||||
grabAndPull(true)
|
||||
case DelayOverflowStrategy.DropNew ⇒
|
||||
grab(in)
|
||||
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
|
||||
case DelayOverflowStrategy.DropBuffer ⇒
|
||||
buffer.clear()
|
||||
grabAndPull(true)
|
||||
case DelayOverflowStrategy.Fail ⇒
|
||||
failStage(new DelayOverflowStrategy.Fail.BufferOverflowException(s"Buffer overflow for delay combinator (max capacity was: $size)!"))
|
||||
case DelayOverflowStrategy.Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
|
||||
}
|
||||
else {
|
||||
grabAndPull(strategy != DelayOverflowStrategy.Backpressure || buffer.size < size - 1)
|
||||
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
|
||||
}
|
||||
}
|
||||
|
||||
def grabAndPull(pullCondition: Boolean): Unit = {
|
||||
buffer.enqueue((System.nanoTime(), grab(in)))
|
||||
if (pullCondition) pull(in)
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = {
|
||||
if (isAvailable(out) && isTimerActive(timerName)) willStop = true
|
||||
else completeStage()
|
||||
}
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
if (!isTimerActive(timerName) && !buffer.isEmpty && nextElementWaitTime() < 0)
|
||||
push(out, buffer.dequeue()._2)
|
||||
|
||||
if (!willStop && !hasBeenPulled(in)) pull(in)
|
||||
completeIfReady()
|
||||
}
|
||||
})
|
||||
|
||||
def completeIfReady(): Unit = if (willStop && buffer.isEmpty) completeStage()
|
||||
|
||||
def nextElementWaitTime(): Long = d.toMillis - (System.nanoTime() - buffer.peek()._1) * 1000 * 1000
|
||||
|
||||
final override protected def onTimer(key: Any): Unit = {
|
||||
push(out, buffer.dequeue()._2)
|
||||
if (!buffer.isEmpty) {
|
||||
val waitTime = nextElementWaitTime()
|
||||
if (waitTime > 10) scheduleOnce(timerName, waitTime.millis)
|
||||
}
|
||||
completeIfReady()
|
||||
}
|
||||
}
|
||||
|
||||
override def toString = "Delay"
|
||||
}
|
||||
|
||||
private[stream] class TakeWithin[T](timeout: FiniteDuration) extends SimpleLinearGraphStage[T] {
|
||||
|
||||
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new TimerGraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = push(out, grab(in))
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
|
|
@ -888,8 +965,6 @@ private[stream] class DropWithin[T](timeout: FiniteDuration) extends SimpleLinea
|
|||
override def onPush(): Unit =
|
||||
if (allow) push(out, grab(in))
|
||||
else pull(in)
|
||||
override def onUpstreamFinish(): Unit = completeStage()
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = failStage(ex)
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue