diff --git a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala
index da2614c773..22e0631e90 100644
--- a/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala
+++ b/akka-http-core/src/main/scala/akka/http/impl/engine/client/PoolInterfaceActor.scala
@@ -17,7 +17,7 @@ import akka.stream.Materializer
 import akka.stream.actor.{ ActorPublisher, ActorSubscriber, ZeroRequestStrategy }
 import akka.stream.actor.ActorPublisherMessage._
 import akka.stream.actor.ActorSubscriberMessage._
-import akka.stream.impl.{ SeqActorName, FixedSizeBuffer }
+import akka.stream.impl.{ SeqActorName, Buffer }
 import akka.stream.scaladsl.{ Keep, Flow, Sink, Source }
 import akka.http.impl.settings.HostConnectionPoolSetup
 import akka.http.scaladsl.model._
@@ -52,7 +52,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
   extends ActorSubscriber with ActorPublisher[RequestContext] with ActorLogging {
   import PoolInterfaceActor._

-  private[this] val inputBuffer = FixedSizeBuffer[PoolRequest](hcps.setup.settings.maxOpenRequests)
+  private[this] val inputBuffer = Buffer[PoolRequest](hcps.setup.settings.maxOpenRequests, fm)
   private[this] var activeIdleTimeout: Option[Cancellable] = None

   log.debug("(Re-)starting host connection pool to {}:{}", hcps.host, hcps.port)
@@ -119,7 +119,7 @@ private class PoolInterfaceActor(hcps: HostConnectionPoolSetup,
       // if we can't dispatch right now we buffer and dispatch when demand from the pool arrives
       if (inputBuffer.isFull) {
         x.responsePromise.failure(
-          new BufferOverflowException(s"Exceeded configured max-open-requests value of [${inputBuffer.size}]"))
+          new BufferOverflowException(s"Exceeded configured max-open-requests value of [${inputBuffer.capacity}]"))
       } else inputBuffer.enqueue(x)
     } else dispatchRequest(x) // if we can dispatch right now, do it
     request(1) // for every incoming request we demand one response from the pool
diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala
index 32978d281b..a6ce59e338 100644
--- a/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala
+++ b/akka-stream-tests/src/test/scala/akka/stream/impl/FixedBufferSpec.scala
@@ -4,6 +4,7 @@
 package akka.stream.impl

 import akka.stream.testkit.AkkaSpec
+import akka.stream.ActorMaterializerSettings

 class FixedBufferSpec extends AkkaSpec {

@@ -94,4 +95,29 @@ class FixedBufferSpec extends AkkaSpec {
     }
   }

+  "Buffer factory" must {
+    val default = ActorMaterializerSettings(system)
+
+    "default to one billion for maxFixedBufferSize" in {
+      default.maxFixedBufferSize should ===(1000000000)
+    }
+
+    "produce BoundedBuffers when capacity > max-fixed-buffer-size" in {
+      Buffer(Int.MaxValue, default) shouldBe a[BoundedBuffer[_]]
+    }
+
+    "produce FixedSizeBuffers when capacity < max-fixed-buffer-size" in {
+      Buffer(1000, default) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
+      Buffer(1024, default) shouldBe a[FixedSizeBuffer.PowerOfTwoFixedSizeBuffer[_]]
+    }
+
+    "produce FixedSizeBuffers when max-fixed-buffer-size < BoundedBufferSize" in {
+      val settings = default.withMaxFixedBufferSize(9)
+      Buffer(5, settings) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
+      Buffer(10, settings) shouldBe a[FixedSizeBuffer.ModuloFixedSizeBuffer[_]]
+      Buffer(16, settings) shouldBe a[FixedSizeBuffer.PowerOfTwoFixedSizeBuffer[_]]
+    }
+
+  }
+
 }
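Note (annotation, not part of the patch): the new test cases above pin down the selection rule of the Buffer factory that is introduced further down in this patch. A minimal sketch of that rule follows; Buffer, BoundedBuffer and FixedSizeBuffer are private[akka], so code like this is assumed to live inside the akka package or a test, and the object name below is invented for illustration.

// Sketch only: which implementation Buffer.apply(size, max) is expected to pick.
import akka.stream.impl.Buffer

object BufferSelectionSketch extends App {
  val maxFixedBufferSize = 1000000000 // same value as the new reference.conf default

  // below max-fixed-buffer-size: array-backed fixed buffers, specialised for power-of-two sizes
  println(Buffer[String](1000, maxFixedBufferSize).getClass.getSimpleName) // ModuloFixedSizeBuffer
  println(Buffer[String](1024, maxFixedBufferSize).getClass.getSimpleName) // PowerOfTwoFixedSizeBuffer

  // at or above max-fixed-buffer-size: the dynamically growing BoundedBuffer
  println(Buffer[String](Int.MaxValue, maxFixedBufferSize).getClass.getSimpleName) // BoundedBuffer
}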
diff --git a/akka-stream/src/main/resources/reference.conf b/akka-stream/src/main/resources/reference.conf
index 2c53344c75..1d4396b7e5 100644
--- a/akka-stream/src/main/resources/reference.conf
+++ b/akka-stream/src/main/resources/reference.conf
@@ -45,6 +45,16 @@ akka {
       # this may cause an initial runtime overhead, but most of the time fusing is
       # desirable since it reduces the number of Actors that are created.
       auto-fusing = on
+
+      # Those stream elements which have explicit buffers (like mapAsync, mapAsyncUnordered,
+      # buffer, flatMapMerge, Source.actorRef, Source.queue, etc.) will preallocate a fixed
+      # buffer upon stream materialization if the requested buffer size is less than this
+      # configuration parameter. The default is very high because failing early is better
+      # than failing under load.
+      #
+      # Buffers sized larger than this will dynamically grow/shrink and consume more memory
+      # per element than the fixed size buffers.
+      max-fixed-buffer-size = 1000000000

       debug {
         # Enables the fuzzing mode which increases the chance of race conditions
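Note (annotation, not part of the patch): with this addition the limit can be overridden either in configuration or through the new ActorMaterializerSettings method introduced below. A minimal sketch, assuming the key sits under akka.stream.materializer like the neighbouring settings; the object and value names are invented for illustration.

// Sketch only: two assumed ways to override the new max-fixed-buffer-size default.
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import com.typesafe.config.ConfigFactory

object MaxFixedBufferSizeSketch extends App {
  // via configuration, merged on top of the defaults
  val config = ConfigFactory.parseString(
    "akka.stream.materializer.max-fixed-buffer-size = 1000000"
  ).withFallback(ConfigFactory.load())

  implicit val system = ActorSystem("buffers", config)

  // or programmatically, using the new settings method added in this patch
  val settings = ActorMaterializerSettings(system).withMaxFixedBufferSize(1000000)
  val materializer = ActorMaterializer(settings)

  materializer.shutdown()
  system.terminate()
}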
diff --git a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
index 55d6e95a2f..887ecb9a06 100644
--- a/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
+++ b/akka-stream/src/main/scala/akka/stream/ActorMaterializer.scala
@@ -208,10 +208,11 @@ object ActorMaterializerSettings {
     debugLogging: Boolean,
     outputBurstLimit: Int,
     fuzzingMode: Boolean,
-    autoFusing: Boolean) =
+    autoFusing: Boolean,
+    maxFixedBufferSize: Int) =
     new ActorMaterializerSettings(
       initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
-      outputBurstLimit, fuzzingMode, autoFusing)
+      outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)

   /**
    * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Scala).
@@ -232,7 +233,8 @@ object ActorMaterializerSettings {
       debugLogging = config.getBoolean("debug-logging"),
       outputBurstLimit = config.getInt("output-burst-limit"),
       fuzzingMode = config.getBoolean("debug.fuzzing-mode"),
-      autoFusing = config.getBoolean("auto-fusing"))
+      autoFusing = config.getBoolean("auto-fusing"),
+      maxFixedBufferSize = config.getInt("max-fixed-buffer-size"))

   /**
    * Create [[ActorMaterializerSettings]] from individual settings (Java).
@@ -246,10 +248,11 @@ object ActorMaterializerSettings {
     debugLogging: Boolean,
     outputBurstLimit: Int,
     fuzzingMode: Boolean,
-    autoFusing: Boolean) =
+    autoFusing: Boolean,
+    maxFixedBufferSize: Int) =
     new ActorMaterializerSettings(
       initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
-      outputBurstLimit, fuzzingMode, autoFusing)
+      outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)

   /**
    * Create [[ActorMaterializerSettings]] from the settings of an [[akka.actor.ActorSystem]] (Java).
@@ -278,7 +281,8 @@ final class ActorMaterializerSettings(
   val debugLogging: Boolean,
   val outputBurstLimit: Int,
   val fuzzingMode: Boolean,
-  val autoFusing: Boolean) {
+  val autoFusing: Boolean,
+  val maxFixedBufferSize: Int) {

   require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")

@@ -294,10 +298,11 @@ final class ActorMaterializerSettings(
     debugLogging: Boolean = this.debugLogging,
     outputBurstLimit: Int = this.outputBurstLimit,
     fuzzingMode: Boolean = this.fuzzingMode,
-    autoFusing: Boolean = this.autoFusing) =
+    autoFusing: Boolean = this.autoFusing,
+    maxFixedBufferSize: Int = this.maxFixedBufferSize) =
     new ActorMaterializerSettings(
       initialInputBufferSize, maxInputBufferSize, dispatcher, supervisionDecider, subscriptionTimeoutSettings, debugLogging,
-      outputBurstLimit, fuzzingMode, autoFusing)
+      outputBurstLimit, fuzzingMode, autoFusing, maxFixedBufferSize)

   /**
    * Each asynchronous piece of a materialized stream topology is executed by one Actor
@@ -377,6 +382,15 @@ final class ActorMaterializerSettings(
     if (enable == this.autoFusing) this
     else copy(autoFusing = enable)

+  /**
+   * Configure the maximum buffer size for which a FixedSizeBuffer will be preallocated.
+   * This defaults to a large value because it is usually better to fail early when
+   * system memory is not sufficient to hold the buffer.
+   */
+  def withMaxFixedBufferSize(size: Int): ActorMaterializerSettings =
+    if (size == this.maxFixedBufferSize) this
+    else copy(maxFixedBufferSize = size)
+
   /**
    * Leaked publishers and subscribers are cleaned up when they are not used within a given
    * deadline, configured by [[StreamSubscriptionTimeoutSettings]].
diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
index c64a6e8c09..a447358b78 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/ActorRefSourceActor.scala
@@ -8,27 +8,29 @@ import akka.actor.Props
 import akka.actor.Status
 import akka.stream.OverflowStrategies._
 import akka.stream.{ BufferOverflowException, OverflowStrategy, OverflowStrategies }
+import akka.stream.ActorMaterializerSettings

 /**
  * INTERNAL API
  */
 private[akka] object ActorRefSourceActor {
-  def props(bufferSize: Int, overflowStrategy: OverflowStrategy) = {
+  def props(bufferSize: Int, overflowStrategy: OverflowStrategy, settings: ActorMaterializerSettings) = {
     require(overflowStrategy != OverflowStrategies.Backpressure, "Backpressure overflowStrategy not supported")
-    Props(new ActorRefSourceActor(bufferSize, overflowStrategy))
+    val maxFixedBufferSize = settings.maxFixedBufferSize
+    Props(new ActorRefSourceActor(bufferSize, overflowStrategy, maxFixedBufferSize))
   }
 }

 /**
  * INTERNAL API
  */
-private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy)
+private[akka] class ActorRefSourceActor(bufferSize: Int, overflowStrategy: OverflowStrategy, maxFixedBufferSize: Int)
   extends akka.stream.actor.ActorPublisher[Any] with ActorLogging {
   import akka.stream.actor.ActorPublisherMessage._
   import akka.stream.OverflowStrategy._

   // when bufferSize is 0 there the buffer is not used
-  protected val buffer = if (bufferSize == 0) null else FixedSizeBuffer[Any](bufferSize)
+  protected val buffer = if (bufferSize == 0) null else Buffer[Any](bufferSize, maxFixedBufferSize)

   def receive = ({
     case Cancel ⇒
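Note (annotation, not part of the patch): nothing changes for callers of the public API; a Source.actorRef or Source.queue buffer is simply allocated through the new factory at materialization time and therefore honours max-fixed-buffer-size. A small usage sketch with invented names:

// Sketch only: ordinary Source.actorRef usage, now backed by the Buffer factory internally.
import akka.actor.ActorSystem
import akka.stream.{ ActorMaterializer, OverflowStrategy }
import akka.stream.scaladsl.{ Sink, Source }

object ActorRefSourceSketch extends App {
  implicit val system = ActorSystem("buffers")
  implicit val materializer = ActorMaterializer()

  val ref = Source.actorRef[Int](bufferSize = 1000, overflowStrategy = OverflowStrategy.dropHead)
    .to(Sink.foreach(println))
    .run()

  (1 to 10).foreach(ref ! _)
  system.terminate()
}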
diff --git a/akka-stream/src/main/scala/akka/stream/impl/BoundedBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/BoundedBuffer.scala
deleted file mode 100644
index b0216f09c6..0000000000
--- a/akka-stream/src/main/scala/akka/stream/impl/BoundedBuffer.scala
+++ /dev/null
@@ -1,104 +0,0 @@
-/**
- * Copyright (C) 2015-2016 Typesafe Inc.
- */
-package akka.stream.impl
-
-import java.{ util ⇒ ju }
-
-/**
- * INTERNAL API
- */
-private[akka] trait Buffer[T] {
-  def used: Int
-  def isFull: Boolean
-  def isEmpty: Boolean
-  def nonEmpty: Boolean
-
-  def enqueue(elem: T): Unit
-  def dequeue(): T
-
-  def peek(): T
-  def clear(): Unit
-  def dropHead(): Unit
-  def dropTail(): Unit
-}
-
-/**
- * INTERNAL API
- */
-private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
-
-  def used: Int = q.used
-  def isFull: Boolean = q.isFull
-  def isEmpty: Boolean = q.isEmpty
-  def nonEmpty: Boolean = q.nonEmpty
-
-  def enqueue(elem: T): Unit = q.enqueue(elem)
-  def dequeue(): T = q.dequeue()
-
-  def peek(): T = q.peek()
-  def clear(): Unit = q.clear()
-  def dropHead(): Unit = q.dropHead()
-  def dropTail(): Unit = q.dropTail()
-
-  private final class FixedQueue extends Buffer[T] {
-    final val Size = 16
-    final val Mask = 15
-
-    private val queue = new Array[AnyRef](Size)
-    private var head = 0
-    private var tail = 0
-
-    override def used = tail - head
-    override def isFull = used == capacity
-    override def isEmpty = tail == head
-    override def nonEmpty = tail != head
-
-    override def enqueue(elem: T): Unit =
-      if (tail - head == Size) {
-        val queue = new DynamicQueue(head)
-        while (nonEmpty) {
-          queue.enqueue(dequeue())
-        }
-        q = queue
-        queue.enqueue(elem)
-      } else {
-        queue(tail & Mask) = elem.asInstanceOf[AnyRef]
-        tail += 1
-      }
-    override def dequeue(): T = {
-      val pos = head & Mask
-      val ret = queue(pos).asInstanceOf[T]
-      queue(pos) = null
-      head += 1
-      ret
-    }
-
-    override def peek(): T =
-      if (tail == head) null.asInstanceOf[T]
-      else queue(head & Mask).asInstanceOf[T]
-    override def clear(): Unit =
-      while (nonEmpty) {
-        dequeue()
-      }
-    override def dropHead(): Unit = dequeue()
-    override def dropTail(): Unit = {
-      tail -= 1
-      queue(tail & Mask) = null
-    }
-  }
-
-  private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
-    override def used = size
-    override def isFull = size == capacity
-    override def nonEmpty = !isEmpty()
-
-    override def enqueue(elem: T): Unit = add(elem)
-    override def dequeue(): T = remove()
-
-    override def dropHead(): Unit = remove()
-    override def dropTail(): Unit = removeLast()
-  }
-
-  private var q: Buffer[T] = new FixedQueue
-}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala
new file mode 100644
index 0000000000..184e0bf7b2
--- /dev/null
+++ b/akka-stream/src/main/scala/akka/stream/impl/Buffers.scala
@@ -0,0 +1,204 @@
+/**
+ * Copyright (C) 2015-2016 Typesafe Inc.
+ */
+package akka.stream.impl
+
+import java.{ util ⇒ ju }
+import akka.stream._
+import scala.reflect.classTag
+import scala.reflect.ClassTag
+
+/**
+ * INTERNAL API
+ */
+private[akka] trait Buffer[T] {
+  def capacity: Int
+  def used: Int
+  def isFull: Boolean
+  def isEmpty: Boolean
+  def nonEmpty: Boolean
+
+  def enqueue(elem: T): Unit
+  def dequeue(): T
+
+  def peek(): T
+  def clear(): Unit
+  def dropHead(): Unit
+  def dropTail(): Unit
+}
+
+private[akka] object Buffer {
+  val FixedQueueSize = 128
+  val FixedQueueMask = 127
+
+  def apply[T](size: Int, settings: ActorMaterializerSettings): Buffer[T] =
+    apply(size, settings.maxFixedBufferSize)
+
+  def apply[T](size: Int, materializer: Materializer): Buffer[T] =
+    materializer match {
+      case m: ActorMaterializer ⇒ apply(size, m.settings.maxFixedBufferSize)
+      case _                    ⇒ apply(size, 1000000000)
+    }
+
+  def apply[T](size: Int, max: Int): Buffer[T] =
+    if (size < FixedQueueSize || size < max) FixedSizeBuffer(size)
+    else new BoundedBuffer(size)
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] object FixedSizeBuffer {
+
+  /**
+   * INTERNAL API
+   *
+   * Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
+   * underflow, it is the responsibility of the user to track or check the capacity of the buffer before enqueueing
+   * dequeueing or dropping.
+   *
+   * Returns a specialized instance for power-of-two sized buffers.
+   */
+  def apply[T](size: Int): FixedSizeBuffer[T] =
+    if (size < 1) throw new IllegalArgumentException("size must be positive")
+    else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
+    else new ModuloFixedSizeBuffer(size)
+
+  sealed abstract class FixedSizeBuffer[T](val capacity: Int) extends Buffer[T] {
+    override def toString = s"Buffer($capacity, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
+    private val buffer = new Array[AnyRef](capacity)
+
+    protected var readIdx = 0
+    protected var writeIdx = 0
+    def used: Int = writeIdx - readIdx
+
+    def isFull: Boolean = used == capacity
+    def isEmpty: Boolean = used == 0
+    def nonEmpty: Boolean = used != 0
+
+    def enqueue(elem: T): Unit = {
+      put(writeIdx, elem)
+      writeIdx += 1
+    }
+
+    protected def toOffset(idx: Int): Int
+
+    def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
+    def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
+
+    def peek(): T = get(readIdx)
+
+    def dequeue(): T = {
+      val result = get(readIdx)
+      dropHead()
+      result
+    }
+
+    def clear(): Unit = {
+      java.util.Arrays.fill(buffer, null)
+      readIdx = 0
+      writeIdx = 0
+    }
+
+    def dropHead(): Unit = {
+      put(readIdx, null.asInstanceOf[T])
+      readIdx += 1
+    }
+
+    def dropTail(): Unit = {
+      writeIdx -= 1
+      put(writeIdx, null.asInstanceOf[T])
+    }
+  }
+
+  private[akka] final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
+    override protected def toOffset(idx: Int): Int = idx % capacity
+  }
+
+  private[akka] final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
+    private val Mask = capacity - 1
+    override protected def toOffset(idx: Int): Int = idx & Mask
+  }
+
+}
+
+/**
+ * INTERNAL API
+ */
+private[akka] final class BoundedBuffer[T](val capacity: Int) extends Buffer[T] {
+
+  def used: Int = q.used
+  def isFull: Boolean = q.isFull
+  def isEmpty: Boolean = q.isEmpty
+  def nonEmpty: Boolean = q.nonEmpty
+
+  def enqueue(elem: T): Unit = q.enqueue(elem)
+  def dequeue(): T = q.dequeue()
+
+  def peek(): T = q.peek()
+  def clear(): Unit = q.clear()
+  def dropHead(): Unit = q.dropHead()
+  def dropTail(): Unit = q.dropTail()
+
+  private final class FixedQueue extends Buffer[T] {
+    import Buffer._
+
+    private val queue = new Array[AnyRef](FixedQueueSize)
+    private var head = 0
+    private var tail = 0
+
+    override def capacity = BoundedBuffer.this.capacity
+    override def used = tail - head
+    override def isFull = used == capacity
+    override def isEmpty = tail == head
+    override def nonEmpty = tail != head
+
+    override def enqueue(elem: T): Unit =
+      if (tail - head == FixedQueueSize) {
+        val queue = new DynamicQueue(head)
+        while (nonEmpty) {
+          queue.enqueue(dequeue())
+        }
+        q = queue
+        queue.enqueue(elem)
+      } else {
+        queue(tail & FixedQueueMask) = elem.asInstanceOf[AnyRef]
+        tail += 1
+      }
+    override def dequeue(): T = {
+      val pos = head & FixedQueueMask
+      val ret = queue(pos).asInstanceOf[T]
+      queue(pos) = null
+      head += 1
+      ret
+    }
+
+    override def peek(): T =
+      if (tail == head) null.asInstanceOf[T]
+      else queue(head & FixedQueueMask).asInstanceOf[T]
+    override def clear(): Unit =
+      while (nonEmpty) {
+        dequeue()
+      }
+    override def dropHead(): Unit = dequeue()
+    override def dropTail(): Unit = {
+      tail -= 1
+      queue(tail & FixedQueueMask) = null
+    }
+  }
+
+  private final class DynamicQueue(startIdx: Int) extends ju.LinkedList[T] with Buffer[T] {
+    override def capacity = BoundedBuffer.this.capacity
+    override def used = size
+    override def isFull = size == capacity
+    override def nonEmpty = !isEmpty()
+
+    override def enqueue(elem: T): Unit = add(elem)
+    override def dequeue(): T = remove()
+
+    override def dropHead(): Unit = remove()
+    override def dropTail(): Unit = removeLast()
+  }
+
+  private var q: Buffer[T] = new FixedQueue
+}
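Note (annotation, not part of the patch): the stage changes later in this patch all apply the same pattern, replacing an eagerly constructed buffer with a var that is resolved from the materializer in preStart(), since the settings are only known once the stage is running. A rough sketch of that pattern in a custom GraphStage; PassThroughWithBuffer is an invented name, and Buffer is private[akka], so this assumes code inside the akka package.

// Sketch only: allocate the internal buffer from the materializer in preStart(),
// mirroring what this patch does to the built-in stages.
import akka.stream.{ Attributes, FlowShape, Inlet, Outlet }
import akka.stream.impl.{ Buffer ⇒ BufferImpl }
import akka.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }

final class PassThroughWithBuffer[T](size: Int) extends GraphStage[FlowShape[T, T]] {
  val in = Inlet[T]("PassThroughWithBuffer.in")
  val out = Outlet[T]("PassThroughWithBuffer.out")
  override val shape = FlowShape(in, out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // not allocated here: the materializer (and its settings) is only available once running
      private var buffer: BufferImpl[T] = _

      override def preStart(): Unit = buffer = BufferImpl(size, materializer)

      setHandler(in, new InHandler {
        override def onPush(): Unit = {
          buffer.enqueue(grab(in))
          if (isAvailable(out)) push(out, buffer.dequeue())
          if (!buffer.isFull) pull(in)
        }
        // keep draining the buffer after upstream completes
        override def onUpstreamFinish(): Unit = if (buffer.isEmpty) completeStage()
      })
      setHandler(out, new OutHandler {
        override def onPull(): Unit = {
          if (buffer.nonEmpty) push(out, buffer.dequeue())
          if (isClosed(in)) { if (buffer.isEmpty) completeStage() }
          else if (!hasBeenPulled(in) && !buffer.isFull) pull(in)
        }
      })
    }
}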
diff --git a/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala
deleted file mode 100644
index ae53622daa..0000000000
--- a/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Copyright (C) 2009-2016 Typesafe Inc.
- */
-package akka.stream.impl
-
-import scala.reflect.classTag
-import scala.reflect.ClassTag
-
-/**
- * INTERNAL API
- */
-private[akka] object FixedSizeBuffer {
-
-  /**
-   * INTERNAL API
-   *
-   * Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
-   * underflow, it is the responsibility of the user to track or check the capacity of the buffer before enqueueing
-   * dequeueing or dropping.
-   *
-   * Returns a specialized instance for power-of-two sized buffers.
-   */
-  def apply[T](size: Int): FixedSizeBuffer[T] =
-    if (size < 1) throw new IllegalArgumentException("size must be positive")
-    else if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
-    else new ModuloFixedSizeBuffer(size)
-
-  sealed abstract class FixedSizeBuffer[T](val size: Int) extends Buffer[T] {
-    override def toString = s"Buffer($size, $readIdx, $writeIdx)(${(readIdx until writeIdx).map(get).mkString(", ")})"
-    private val buffer = new Array[AnyRef](size)
-
-    protected var readIdx = 0
-    protected var writeIdx = 0
-    def used: Int = writeIdx - readIdx
-
-    def isFull: Boolean = used == size
-    def isEmpty: Boolean = used == 0
-    def nonEmpty: Boolean = used != 0
-
-    def enqueue(elem: T): Unit = {
-      put(writeIdx, elem)
-      writeIdx += 1
-    }
-
-    protected def toOffset(idx: Int): Int
-
-    def put(idx: Int, elem: T): Unit = buffer(toOffset(idx)) = elem.asInstanceOf[AnyRef]
-    def get(idx: Int): T = buffer(toOffset(idx)).asInstanceOf[T]
-
-    def peek(): T = get(readIdx)
-
-    def dequeue(): T = {
-      val result = get(readIdx)
-      dropHead()
-      result
-    }
-
-    def clear(): Unit = {
-      java.util.Arrays.fill(buffer, null)
-      readIdx = 0
-      writeIdx = 0
-    }
-
-    def dropHead(): Unit = {
-      put(readIdx, null.asInstanceOf[T])
-      readIdx += 1
-    }
-
-    def dropTail(): Unit = {
-      writeIdx -= 1
-      put(writeIdx, null.asInstanceOf[T])
-    }
-  }
-
-  private final class ModuloFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
-    override protected def toOffset(idx: Int): Int = idx % size
-  }
-
-  private final class PowerOfTwoFixedSizeBuffer[T](_size: Int) extends FixedSizeBuffer[T](_size) {
-    private val Mask = size - 1
-    override protected def toOffset(idx: Int): Int = idx & Mask
-  }
-
-}
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
index e6db823860..4109caa5c7 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Modules.scala
@@ -107,8 +107,8 @@ private[akka] final class ActorRefSource[Out](
   extends SourceModule[Out, ActorRef](shape) {

   override def create(context: MaterializationContext) = {
-    val ref = ActorMaterializer.downcast(context.materializer).actorOf(context,
-      ActorRefSourceActor.props(bufferSize, overflowStrategy))
+    val mat = ActorMaterializer.downcast(context.materializer)
+    val ref = mat.actorOf(context, ActorRefSourceActor.props(bufferSize, overflowStrategy, mat.settings))
     (akka.stream.actor.ActorPublisher[Out](ref), ref)
   }

diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
index 1d3553f374..c843b9db81 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Sinks.scala
@@ -252,17 +252,17 @@ private[akka] class QueueSink[T]() extends GraphStageWithMaterializedValue[SinkS
   override val shape: SinkShape[T] = SinkShape.of(in)

   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
-    type Received[E] = Try[Option[E]]
-
-    val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
-    require(maxBuffer > 0, "Buffer size must be greater than 0")
-
-    val buffer = FixedSizeBuffer[Received[T]](maxBuffer + 1)
-    var currentRequest: Option[Requested[T]] = None
-
     val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[Requested[T]] {
+      type Received[E] = Try[Option[E]]
+
+      val maxBuffer = module.attributes.getAttribute(classOf[InputBuffer], InputBuffer(16, 16)).max
+      require(maxBuffer > 0, "Buffer size must be greater than 0")
+
+      var buffer: Buffer[Received[T]] = _
+      var currentRequest: Option[Requested[T]] = None

       override def preStart(): Unit = {
+        buffer = Buffer(maxBuffer + 1, materializer)
         setKeepGoing(true)
         initCallback(callback.invoke)
         pull(in)
diff --git a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala
index fd3d4ae0e1..857cbe4ca9 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/Sources.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/Sources.scala
@@ -24,11 +24,14 @@ private[akka] class QueueSource[T](maxBuffer: Int, overflowStrategy: OverflowStr

   override def createLogicAndMaterializedValue(inheritedAttributes: Attributes) = {
     val stageLogic = new GraphStageLogic(shape) with CallbackWrapper[(T, Offered)] {
-      val buffer = if (maxBuffer == 0) null else FixedSizeBuffer[T](maxBuffer)
+      var buffer: Buffer[T] = _
       var pendingOffer: Option[(T, Offered)] = None
       var pulled = false

-      override def preStart(): Unit = initCallback(callback.invoke)
+      override def preStart(): Unit = {
+        if (maxBuffer > 0) buffer = Buffer(maxBuffer, materializer)
+        initCallback(callback.invoke)
+      }
       override def postStop(): Unit = stopCallback {
         case (elem, promise) ⇒ promise.failure(new IllegalStateException("Stream is terminated. SourceQueue is detached"))
       }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
index f235e82561..74dd9e9942 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala
@@ -9,7 +9,7 @@ import akka.stream.Attributes.{ InputBuffer, LogLevels }
 import akka.stream.impl.Stages.DefaultAttributes
 import akka.stream.OverflowStrategies._
 import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage
-import akka.stream.impl.{ FixedSizeBuffer, BoundedBuffer, ReactiveStreamsCompliance }
+import akka.stream.impl.{ Buffer ⇒ BufferImpl, ReactiveStreamsCompliance }
 import akka.stream.stage._
 import akka.stream.{ Supervision, _ }
 import scala.annotation.tailrec
@@ -372,7 +372,11 @@ private[akka] final case class Sliding[T](n: Int, step: Int) extends PushPullSta
  */
 private[akka] final case class Buffer[T](size: Int, overflowStrategy: OverflowStrategy) extends DetachedStage[T, T] {

-  private val buffer = FixedSizeBuffer[T](size)
+  private var buffer: BufferImpl[T] = _
+
+  override def preStart(ctx: LifecycleContext): Unit = {
+    buffer = BufferImpl(size, ctx.materializer)
+  }

   override def onPush(elem: T, ctx: DetachedContext[T]): UpstreamDirective =
     if (ctx.isHoldingDownstream) ctx.pushAndPull(elem)
@@ -639,9 +643,11 @@ private[akka] final case class MapAsync[In, Out](parallelism: Int, f: In ⇒ Fut
     //FIXME Put Supervision.stoppingDecider as a SupervisionStrategy on DefaultAttributes.mapAsync?
     val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider)

-    val buffer = new BoundedBuffer[Holder[Try[Out]]](parallelism)
+    var buffer: BufferImpl[Holder[Try[Out]]] = _
     def todo = buffer.used

+    override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
+
     @tailrec private def pushOne(): Unit =
       if (buffer.isEmpty) {
         if (isClosed(in)) completeStage()
@@ -719,9 +725,11 @@ private[akka] final case class MapAsyncUnordered[In, Out](parallelism: Int, f: I
       .map(_.decider).getOrElse(Supervision.stoppingDecider)

     var inFlight = 0
-    val buffer = new BoundedBuffer[Out](parallelism)
+    var buffer: BufferImpl[Out] = _
     def todo = inFlight + buffer.used

+    override def preStart(): Unit = buffer = BufferImpl(parallelism, materializer)
+
     def failOrPull(ex: Throwable) =
       if (decider(ex) == Supervision.Stop) failStage(ex)
       else if (isClosed(in) && todo == 0) completeStage()
@@ -958,9 +966,11 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
       case Some(InputBuffer(min, max)) ⇒ max
     }

-    val buffer = FixedSizeBuffer[(Long, T)](size) // buffer has pairs timestamp with upstream element
+    var buffer: BufferImpl[(Long, T)] = _ // buffer has pairs timestamp with upstream element
     var willStop = false

+    override def preStart(): Unit = buffer = BufferImpl(size, materializer)
+
     setHandler(in, handler = new InHandler {
       //FIXME rewrite into distinct strategy functions to avoid matching on strategy for every input when full
       override def onPush(): Unit = {
@@ -989,7 +999,7 @@ private[stream] final class Delay[T](d: FiniteDuration, strategy: DelayOverflowS
             case Backpressure ⇒ throw new IllegalStateException("Delay buffer must never overflow in Backpressure mode")
           }
         } else {
-          grabAndPull(strategy != Backpressure || buffer.size < size - 1)
+          grabAndPull(strategy != Backpressure || buffer.capacity < size - 1)
           if (!isTimerActive(timerName)) scheduleOnce(timerName, d)
         }
       }
diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
index ef2a19eb45..7e423a1309 100644
--- a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
+++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala
@@ -21,7 +21,7 @@ import akka.stream.impl.MultiStreamOutputProcessor.SubstreamSubscriptionTimeout
 import scala.annotation.tailrec
 import akka.stream.impl.PublisherSource
 import akka.stream.impl.CancellingSubscriber
-import akka.stream.impl.BoundedBuffer
+import akka.stream.impl.{ Buffer ⇒ BufferImpl }

 /**
  * INTERNAL API
@@ -38,7 +38,9 @@ final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Graph[
     var sources = Set.empty[SubSinkInlet[T]]
     def activeSources = sources.size

-    val q = new BoundedBuffer[SubSinkInlet[T]](breadth)
+    var q: BufferImpl[SubSinkInlet[T]] = _
+
+    override def preStart(): Unit = q = BufferImpl(breadth, materializer)

     def pushOut(): Unit = {
       val src = q.dequeue()