add maxFixedBufferSize materializer setting #19702

This commit is contained in:
Roland Kuhn 2016-02-07 14:54:48 +01:00
parent f3073b89e0
commit d0e6f46f14
13 changed files with 306 additions and 223 deletions

View file

@ -17,7 +17,7 @@ import akka.stream.{ ActorAttributes, Materializer }
import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy }
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.actor.ActorSubscriberMessage._
import akka.stream.impl.{ SeqActorName, FixedSizeBuffer }
import akka.stream.impl.{ SeqActorName, Buffer }
import akka.stream.scaladsl.{ Keep, Flow, Sink, Source }
import akka.http.impl.settings.HostConnectionPoolSetup
import akka.http.scaladsl.model._
@ -52,7 +52,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging {
import PoolInterfaceActor._
private[this] val inputBuffer = FixedSizeBuffer[PoolRequest](hcps.setup.settings.maxOpenRequests)
private[this] val inputBuffer = Buffer[PoolRequest](hcps.setup.settings.maxOpenRequests, fm)
private[this] var activeIdleTimeout: Option[Cancellable] = None
log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port)
@ -119,7 +119,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
// if we can't dispatch right now we buffer and dispatch when demand from the pool arrives
if (inputBuffer.isFull) {
x.responsePromise.failure(
new BufferOverflowException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]"))
new BufferOverflowException(s"Exceeded configured max-open-requests value of [${inputBuffer.capacity}]"))
} else inputBuffer.enqueue(x)
} else dispatchRequest(x) // if we can dispatch right now, do it
request(1) // for every incoming request we demand one response from the pool

View file

@ -4,6 +4,7 @@
package akka.stream.impl
import akka.stream.testkit.AkkaSpec
import akka.stream.ActorMaterializerSettings
class FixedBufferSpec extends AkkaSpec {
@ -94,4 +95,29 @@ class FixedBufferSpec extends AkkaSpec {
}
}
"Buffer factory" must {
val default = ActorMaterializerSettings(system)
"default to one billion for maxFixedBufferSize" in {
default.maxFixedBufferSize should ===(1000000000)
}
"produce BoundedBuffers when capacity > max-fixed-buffer-size" in {
Buffer(Int.MaxValue, default) shouldBe a[BoundedBuffer[_]]
}
"produce FixedSizeBuffers when capacity < max-fixed-buffer-size" in {
Buffer(1000, default) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
Buffer(1024, default) shouldBe a[FixedSizeBuffer.PowerOfTwoFixedSizeBuffer[_]]
}
"produce FixedSizeBuffers when max-fixed-buffer-size < BoundedBufferSize" in {
val settings = default.withMaxFixedBufferSize(9)
Buffer(5, default) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
Buffer(10, default) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
Buffer(16, default) shouldBe a[FixedSizeBuffer.PowerOfTwoFixedSizeBuffer[_]]
}
}
}

View file

@ -46,6 +46,16 @@ akka {
# desirable since it reduces the number of Actors that are created.
auto-fusing = on
# Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
# buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
# buffer upon stream materialization if the requested buffer size is less than this
# configuration parameter. The default is very high because failing early is better
# than failing under load.
#
# Buffers sized larger than this will dynamically grow/shrink and consume more memory
# per element than the fixed size buffers.
max-fixed-buffer-size = 1000000000
debug {
# Enables the fuzzing mode which increases the chance of race conditions
# by aggressively reordering events and making certain operations more

View file

@ -208,10 +208,11 @@ object ActorMaterializerSettings {
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean) =
autoFusing: Boolean,
maxFixedBufferSize: Int) =
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala).
@ -232,7 +233,8 @@ object ActorMaterializerSettings {
debugLogging = config.getBoolean("debug-logging"),
outputBurstLimit = config.getInt("output-burst-limit"),
fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
autoFusing = config.getBoolean("auto-fusing"))
autoFusing = config.getBoolean("auto-fusing"),
maxFixedBufferSize = config.getInt("max-fixed-buffer-size"))
/**
* Create [[ActorMaterializerSettings]] from individual settings (Java).
@ -246,10 +248,11 @@ object ActorMaterializerSettings {
debugLogging: Boolean,
outputBurstLimit: Int,
fuzzingMode: Boolean,
autoFusing: Boolean) =
autoFusing: Boolean,
maxFixedBufferSize: Int) =
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)
/**
* Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java).
@ -278,7 +281,8 @@ final class ActorMaterializerSettings(
val debugLogging: Boolean,
val outputBurstLimit: Int,
val fuzzingMode: Boolean,
val autoFusing: Boolean) {
val autoFusing: Boolean,
val maxFixedBufferSize: Int) {
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
@ -294,10 +298,11 @@ final class ActorMaterializerSettings(
debugLogging: Boolean = this.debugLogging,
outputBurstLimit: Int = this.outputBurstLimit,
fuzzingMode: Boolean = this.fuzzingMode,
autoFusing: Boolean = this.autoFusing) =
autoFusing: Boolean = this.autoFusing,
maxFixedBufferSize: Int = this.maxFixedBufferSize) =
new ActorMaterializerSettings(
initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
outputBurstLimit, fuzzingMode, autoFusing)
outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)
/**
* Each asynchronous piece of a materialized stream topology is executed by one Actor
@ -377,6 +382,15 @@ final class ActorMaterializerSettings(
if (enable == this.autoFusing) this
else copy(autoFusing = enable)
/**
* Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated.
* This defaults to a large value because it is usually better to fail early when
* system memory is not sufficient to hold the buffer.
*/
def withMaxFixedBufferSize(size: Int): ActorMaterializerSettings =
if (size == this.maxFixedBufferSize) this
else copy(maxFixedBufferSize = size)
/**
* Leaked publishers and subscribers are cleaned up when they are not used within a given
* deadline, configured by [[StreamSubscriptionTimeoutSettings]].

View file

@ -8,27 +8,29 @@ import akka.actor.Props
import akka.actor.Status
import akka.stream.OverflowStrategies._
import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies }
import akka.stream.ActorMaterializerSettings
/**
* INTERNAL API
*/
private[akka] object ActorRefSourceActor {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = {
def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
Props(new ActorRefSourceActor(bufferSize, overflowStrategy))
val maxFixedBufferSize = settings.maxFixedBufferSize
Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize))
}
}
/**
* INTERNAL API
*/
private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy)
private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
import akka.stream.actor.ActorPublisherMessage._
import akka.stream.OverflowStrategy._
// when bufferSize is 0 there the buffer is not used
protected val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize)
protected val buffer = if (bufferSize == 0) null else Buffer[Any](bufferSize, maxFixedBufferSize)
def receive = ({
case Cancel

View file

@ -1,104 +0,0 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.{ util ju }
/**
* INTERNAL API
*/
private[akka] trait Buffer[T] {
def used: Int
def isFull: Boolean
def isEmpty: Boolean
def nonEmpty: Boolean
def enqueue(elem: T): Unit
def dequeue(): T
def peek(): T
def clear(): Unit
def dropHead(): Unit
def dropTail(): Unit
}
/**
* INTERNAL API
*/
private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
def used: Int = q.used
def isFull: Boolean = q.isFull
def isEmpty: Boolean = q.isEmpty
def nonEmpty: Boolean = q.nonEmpty
def enqueue(elem: T): Unit = q.enqueue(elem)
def dequeue(): T = q.dequeue()
def peek(): T = q.peek()
def clear(): Unit = q.clear()
def dropHead(): Unit = q.dropHead()
def dropTail(): Unit = q.dropTail()
private final class FixedQueue extends Buffer[T] {
final val Size = 16
final val Mask = 15
private val queue = new Array[AnyRef](Size)
private var head = 0
private var tail = 0
override def used = tail - head
override def isFull = used == capacity
override def isEmpty = tail == head
override def nonEmpty = tail != head
override def enqueue(elem: T): Unit =
if (tail - head == Size) {
val queue = new DynamicQueue(head)
while (nonEmpty) {
queue.enqueue(dequeue())
}
q = queue
queue.enqueue(elem)
} else {
queue(tail & Mask) = elem.asInstanceOf[AnyRef]
tail += 1
}
override def dequeue(): T = {
val pos = head & Mask
val ret = queue(pos).asInstanceOf[T]
queue(pos) = null
head += 1
ret
}
override def peek(): T =
if (tail == head) null.asInstanceOf[T]
else queue(head & Mask).asInstanceOf[T]
override def clear(): Unit =
while (nonEmpty) {
dequeue()
}
override def dropHead(): Unit = dequeue()
override def dropTail(): Unit = {
tail -= 1
queue(tail & Mask) = null
}
}
private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
override def used = size
override def isFull = size == capacity
override def nonEmpty = !isEmpty()
override def enqueue(elem: T): Unit = add(elem)
override def dequeue(): T = remove()
override def dropHead(): Unit = remove()
override def dropTail(): Unit = removeLast()
}
private var q: Buffer[T] = new FixedQueue
}

View file

@ -0,0 +1,204 @@
/**
* Copyright (C) 2015-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import java.{ util ju }
import akka.stream._
import scala.reflect.classTag
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
private[akka] trait Buffer[T] {
def capacity: Int
def used: Int
def isFull: Boolean
def isEmpty: Boolean
def nonEmpty: Boolean
def enqueue(elem: T): Unit
def dequeue(): T
def peek(): T
def clear(): Unit
def dropHead(): Unit
def dropTail(): Unit
}
private[akka] object Buffer {
val FixedQueueSize = 128
val FixedQueueMask = 127
def apply[T](size: Int, settings: ActorMaterializerSettings): Buffer[T] =
apply(size, settings.maxFixedBufferSize)
def apply[T](size: Int, materializer: Materializer): Buffer[T] =
materializer match {
case m: ActorMaterializer apply(size, m.settings.maxFixedBufferSize)
case _ apply(size, 1000000000)
}
def apply[T](size: Int, max: Int): Buffer[T] =
if (size < FixedQueueSize || size < max) FixedSizeBuffer(size)
else new BoundedBuffer(size)
}
/**
* INTERNAL API
*/
private[akka] object FixedSizeBuffer {
/**
* INTERNAL API
*
* Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
* underflow, it is the responsibility of the user to track or check the capacity of the buffer before enqueueing
* dequeueing or dropping.
*
* Returns a specialized instance for power-of-two sized buffers.
*/
def apply[T](size: Int): FixedSizeBuffer[T] =
if (size < 1) throw new IllegalArgumentException("size must be positive")
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
sealed abstract class FixedSizeBuffer[T](val capacity: Int) extends Buffer[T] {
override def toString = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
private val buffer = new Array[AnyRef](capacity)
protected var readIdx = 0
protected var writeIdx = 0
def used: Int = writeIdx - readIdx
def isFull: Boolean = used == capacity
def isEmpty: Boolean = used == 0
def nonEmpty: Boolean = used != 0
def enqueue(elem: T): Unit = {
put(writeIdx, elem)
writeIdx += 1
}
protected def toOffset(idx: Int): Int
def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
def peek(): T = get(readIdx)
def dequeue(): T = {
val result = get(readIdx)
dropHead()
result
}
def clear(): Unit = {
java.util.Arrays.fill(buffer, null)
readIdx = 0
writeIdx = 0
}
def dropHead(): Unit = {
put(readIdx, null.asInstanceOf[T])
readIdx += 1
}
def dropTail(): Unit = {
writeIdx -= 1
put(writeIdx, null.asInstanceOf[T])
}
}
private[akka] final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
override protected def toOffset(idx: Int): Int = idx % capacity
}
private[akka] final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
private val Mask = capacity - 1
override protected def toOffset(idx: Int): Int = idx & Mask
}
}
/**
* INTERNAL API
*/
private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
def used: Int = q.used
def isFull: Boolean = q.isFull
def isEmpty: Boolean = q.isEmpty
def nonEmpty: Boolean = q.nonEmpty
def enqueue(elem: T): Unit = q.enqueue(elem)
def dequeue(): T = q.dequeue()
def peek(): T = q.peek()
def clear(): Unit = q.clear()
def dropHead(): Unit = q.dropHead()
def dropTail(): Unit = q.dropTail()
private final class FixedQueue extends Buffer[T] {
import Buffer._
private val queue = new Array[AnyRef](FixedQueueSize)
private var head = 0
private var tail = 0
override def capacity = BoundedBuffer.this.capacity
override def used = tail - head
override def isFull = used == capacity
override def isEmpty = tail == head
override def nonEmpty = tail != head
override def enqueue(elem: T): Unit =
if (tail - head == FixedQueueSize) {
val queue = new DynamicQueue(head)
while (nonEmpty) {
queue.enqueue(dequeue())
}
q = queue
queue.enqueue(elem)
} else {
queue(tail & FixedQueueMask) = elem.asInstanceOf[AnyRef]
tail += 1
}
override def dequeue(): T = {
val pos = head & FixedQueueMask
val ret = queue(pos).asInstanceOf[T]
queue(pos) = null
head += 1
ret
}
override def peek(): T =
if (tail == head) null.asInstanceOf[T]
else queue(head & FixedQueueMask).asInstanceOf[T]
override def clear(): Unit =
while (nonEmpty) {
dequeue()
}
override def dropHead(): Unit = dequeue()
override def dropTail(): Unit = {
tail -= 1
queue(tail & FixedQueueMask) = null
}
}
private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
override def capacity = BoundedBuffer.this.capacity
override def used = size
override def isFull = size == capacity
override def nonEmpty = !isEmpty()
override def enqueue(elem: T): Unit = add(elem)
override def dequeue(): T = remove()
override def dropHead(): Unit = remove()
override def dropTail(): Unit = removeLast()
}
private var q: Buffer[T] = new FixedQueue
}

View file

@ -1,84 +0,0 @@
/**
* Copyright (C) 2009-2016 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import scala.reflect.classTag
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
private[akka] object FixedSizeBuffer {
/**
* INTERNAL API
*
* Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
* underflow, it is the responsibility of the user to track or check the capacity of the buffer before enqueueing
* dequeueing or dropping.
*
* Returns a specialized instance for power-of-two sized buffers.
*/
def apply[T](size: Int): FixedSizeBuffer[T] =
if (size < 1) throw new IllegalArgumentException("size must be positive")
else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
sealed abstract class FixedSizeBuffer[T](val size: Int) extends Buffer[T] {
override def toString = s"Buffer($size, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
private val buffer = new Array[AnyRef](size)
protected var readIdx = 0
protected var writeIdx = 0
def used: Int = writeIdx - readIdx
def isFull: Boolean = used == size
def isEmpty: Boolean = used == 0
def nonEmpty: Boolean = used != 0
def enqueue(elem: T): Unit = {
put(writeIdx, elem)
writeIdx += 1
}
protected def toOffset(idx: Int): Int
def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
def peek(): T = get(readIdx)
def dequeue(): T = {
val result = get(readIdx)
dropHead()
result
}
def clear(): Unit = {
java.util.Arrays.fill(buffer, null)
readIdx = 0
writeIdx = 0
}
def dropHead(): Unit = {
put(readIdx, null.asInstanceOf[T])
readIdx += 1
}
def dropTail(): Unit = {
writeIdx -= 1
put(writeIdx, null.asInstanceOf[T])
}
}
private final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
override protected def toOffset(idx: Int): Int = idx % size
}
private final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
private val Mask = size - 1
override protected def toOffset(idx: Int): Int = idx & Mask
}
}

View file

@ -107,8 +107,8 @@ private[akka] final class ActorRefSource[Out](
extends SourceModule[Out, ActorRef](shape) {
override def create(context: MaterializationContext) = {
val ref = ActorMaterializer.downcast(context.materializer).actorOf(context,
ActorRefSourceActor.props(bufferSize, overflowStrategy))
val mat = ActorMaterializer.downcast(context.materializer)
val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
(akka.stream.actor.ActorPublisher[Out](ref), ref)
}

View file

@ -252,17 +252,17 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
override val shape: SinkShape[T] = SinkShape.of(in)
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
type Received[E] = Try[Option[E]]
val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
require(maxBuffer > 0, "Buffer size must be greater than 0")
val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1)
var buffer: Buffer[Received[T]] = _
var currentRequest: Option[Requested[T]] = None
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
override def preStart(): Unit = {
buffer = Buffer(maxBuffer + 1, materializer)
setKeepGoing(true)
initCallback(callback.invoke)
pull(in)

View file

@ -24,11 +24,14 @@ private[akka] class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStr
override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] {
val buffer = if (maxBuffer == 0) null else FixedSizeBuffer[T](maxBuffer)
var buffer: Buffer[T] = _
var pendingOffer: Option[(T, Offered)] = None
var pulled = false
override def preStart(): Unit = initCallback(callback.invoke)
override def preStart(): Unit = {
if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer)
initCallback(callback.invoke)
}
override def postStop(): Unit = stopCallback {
case (elem, promise) promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached"))
}

View file

@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
import akka.stream.impl.Stages.DefaultAttributes
import akka.stream.OverflowStrategies._
import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
import akka.stream.impl.{ Buffer BufferImpl, ReactiveStreamsCompliance }
import akka.stream.stage._
import akka.stream.{ Supervision, _ }
import scala.annotation.tailrec
@ -372,7 +372,11 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta
*/
private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {
private val buffer = FixedSizeBuffer[T](size)
private var buffer: BufferImpl[T] = _
override def preStart(ctx: LifecycleContext): Unit = {
buffer = BufferImpl(size, ctx.materializer)
}
override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
if (ctx.isHoldingDownstream) ctx.pushAndPull(elem)
@ -634,9 +638,11 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
//FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
val decider = inheritedAttributes.getAttribute(classOf[SupervisionStrategy]).map(_.decider).getOrElse(Supervision.stoppingDecider)
val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
var buffer: BufferImpl[Holder[Try[Out]]] = _
def todo = buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
@tailrec private def pushOne(): Unit =
if (buffer.isEmpty) {
if (isClosed(in)) completeStage()
@ -714,9 +720,11 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
.map(_.decider).getOrElse(Supervision.stoppingDecider)
var inFlight = 0
val buffer = new BoundedBuffer[Out](parallelism)
var buffer: BufferImpl[Out] = _
def todo = inFlight + buffer.used
override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
def failOrPull(ex: Throwable) =
if (decider(ex) == Supervision.Stop) failStage(ex)
else if (isClosed(in) && todo == 0) completeStage()
@ -953,9 +961,11 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
case Some(InputBuffer(min, max)) max
}
val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
var willStop = false
override def preStart(): Unit = buffer = BufferImpl(size, materializer)
setHandler(in, handler = new InHandler {
//FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
override def onPush(): Unit = {
@ -984,7 +994,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
case Backpressure throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
}
else {
grabAndPull(strategy != Backpressure || buffer.size < size - 1)
grabAndPull(strategy != Backpressure || buffer.capacity < size - 1)
if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
}
}

View file

@ -21,7 +21,7 @@ import akka.stream.impl.MultiStreamOutputProcessor.SubstreamSubscriptionTimeout
import scala.annotation.tailrec
import akka.stream.impl.PublisherSource
import akka.stream.impl.CancellingSubscriber
import akka.stream.impl.BoundedBuffer
import akka.stream.impl.{ Buffer BufferImpl }
/**
* INTERNAL API
@ -38,7 +38,9 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
var sources = Set.empty[SubSinkInlet[T]]
def activeSources = sources.size
val q = new BoundedBuffer[SubSinkInlet[T]](breadth)
var q: BufferImpl[SubSinkInlet[T]] = _
override def preStart(): Unit = q = BufferImpl(breadth, materializer)
def pushOut(): Unit = {
val src = q.dequeue()