Merge pull request #19703 from akka/wip-19702-maxFixedBufferSize-RK

add maxFixedBufferSize materializer setting #19702
This commit is contained in:
Roland Kuhn 2016-02-11 08:41:29 +01:00
commit c136c06f76
13 changed files with 306 additions and 223 deletions

View file

@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
import akka.stream.impl.{ Buffer BufferImpl, ReactiveStreamsCompliance }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
@ -372,7 +372,11 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta
*/
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
private val buffer = FixedSizeBuffer[T](size)
private var buffer: BufferImpl[T] = _
override def preStart(ctx: LifecycleContext): Unit = {
buffer = BufferImpl(size, ctx.materializer)
}
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
if (ctx.isHoldingDownstream) ctx.pushAndPull(elem)
@ -639,9 +643,11 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)
val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
var buffer: BufferImpl[Holder[Try[Out]]] = _
def todo = buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
@tailrec private def pushOne(): Unit =
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
@ -719,9 +725,11 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
.map(_.decider).getOrElse(Supervision.stoppingDecider)
var inFlight = 0
val buffer = new BoundedBuffer[Out](parallelism)
var buffer: BufferImpl[Out] = _
def todo = inFlight + buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
def failOrPull(ex: Throwable) =
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isClosed(in) && todo == 0) completeStage()
@ -958,9 +966,11 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
case Some(InputBuffer(min, max)) max
}
val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
var willStop = false
override def preStart(): Unit = buffer = BufferImpl(size, materializer)
setHandler(in, handler = new InHandler {
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
override def onPush(): Unit = {
@ -989,7 +999,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
else {
grabAndPull(strategy != Backpressure || buffer.size < size - 1)
grabAndPull(strategy != Backpressure || buffer.capacity < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
}
}