diff --git a/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala new file mode 100644 index 0000000000..4548ff4949 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/OverflowStrategy.scala @@ -0,0 +1,55 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +/** + * Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element. + */ +sealed abstract class OverflowStrategy + +object OverflowStrategy { + + /** + * INTERNAL API + */ + private[akka] final case object DropHead extends OverflowStrategy + + /** + * INTERNAL API + */ + private[akka] final case object DropTail extends OverflowStrategy + + /** + * INTERNAL API + */ + private[akka] final case object DropBuffer extends OverflowStrategy + + /** + * INTERNAL API + */ + private[akka] final case object Backpressure extends OverflowStrategy + + /** + * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for + * the new element. + */ + def dropHead: OverflowStrategy = DropHead + + /** + * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for + * the new element. + */ + def dropTail: OverflowStrategy = DropTail + + /** + * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element. + */ + def dropBuffer: OverflowStrategy = DropBuffer + + /** + * If the buffer is full when a new element is available this strategy backpressures the upstream producer until + * space becomes available in the buffer. + */ + def backpressure: OverflowStrategy = Backpressure +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 14c95a8c2f..baff53c342 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -8,8 +8,7 @@ import scala.collection.immutable import org.reactivestreams.api.{ Consumer, Processor, Producer } import org.reactivestreams.spi.Subscriber import akka.actor.ActorRefFactory -import akka.stream.{ MaterializerSettings, FlowMaterializer } -import akka.stream.Transformer +import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer } import scala.util.Try import scala.concurrent.Future import scala.util.Success @@ -59,6 +58,18 @@ private[akka] object Ast { override def name = "concatFlatten" } + case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode { + override def name = "conflate" + } + + case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode { + override def name = "expand" + } + + case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode { + override def name = "buffer" + } + trait ProducerNode[I] { private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 201188e38e..d11108c67f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -23,13 +23,16 @@ private[akka] object ActorProcessor { import Ast._ def props(settings: MaterializerSettings, op: AstNode): Props = (op match { - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) - case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) - case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) - case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) - case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) - case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) - case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) + case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) + case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) + case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) + case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate)) + case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate)) + case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy)) case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) case ConcatAll ⇒ Props(new ConcatAllImpl(settings)) }).withDispatcher(settings.dispatcher) diff --git a/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala b/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala new file mode 100644 index 0000000000..9c3702d766 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FixedSizeBuffer.scala @@ -0,0 +1,83 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +/** + * INTERNAL API + */ +private[akka] object FixedSizeBuffer { + + /** + * INTERNAL API + * + * Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check agains overflow or + * underflow, it is the responsibility of the user to track or check the capacity of the buffer before enqueueing + * dequeueing or dropping. + * + * Returns a specialized instance for power-of-two sized buffers. + */ + def apply(size: Int): FixedSizeBuffer = + if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size) + else new ModuloFixedSizeBuffer(size) + + sealed abstract class FixedSizeBuffer(val size: Int) { + protected var readIdx = 0 + protected var writeIdx = 0 + private var remainingCapacity = size + private val buffer = Array.ofDim[Any](size) + + protected def incWriteIdx(): Unit + protected def decWriteIdx(): Unit + protected def incReadIdx(): Unit + + def isFull: Boolean = remainingCapacity == 0 + def isEmpty: Boolean = remainingCapacity == size + + def enqueue(elem: Any): Unit = { + buffer(writeIdx) = elem + incWriteIdx() + remainingCapacity -= 1 + } + + def dequeue(): Any = { + val result = buffer(readIdx) + dropHead() + result + } + + def clear(): Unit = { + java.util.Arrays.fill(buffer.asInstanceOf[Array[Object]], null) + readIdx = 0 + writeIdx = 0 + remainingCapacity = size + } + + def dropHead(): Unit = { + buffer(readIdx) = null + incReadIdx() + remainingCapacity += 1 + } + + def dropTail(): Unit = { + decWriteIdx() + //buffer(writeIdx) = null + remainingCapacity += 1 + } + } + + private final class ModuloFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) { + override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) % size + override protected def decWriteIdx(): Unit = writeIdx = (writeIdx + size - 1) % size + override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) % size + } + + private final class PowerOfTwoFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) { + private val Mask = size - 1 + override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) & Mask + override protected def decWriteIdx(): Unit = writeIdx = (writeIdx - 1) & Mask + override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) & Mask + } + +} + diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index f7416fb5fc..b2489ab05b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -9,6 +9,7 @@ import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import Ast.{ AstNode, Transform } +import akka.stream.{ OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.Flow import scala.util.Success @@ -248,6 +249,17 @@ private[akka] trait Builder[Out] { def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]])) + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Thing[S] = + andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any])) + + def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Thing[U] = + andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)])) + + def buffer(size: Int, overflowStrategy: OverflowStrategy): Thing[Out] = { + require(size > 0, s"Buffer size must be larger than zero but was [$size]") + andThen(Buffer(size, overflowStrategy)) + } + def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match { case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala new file mode 100644 index 0000000000..90d0d4b6ed --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/RateDetachedProcessors.scala @@ -0,0 +1,92 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.{ OverflowStrategy, MaterializerSettings } + +class ConflateImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends ActorProcessorImpl(_settings) { + var conflated: Any = null + + val waitNextZero: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + conflated = seed(primaryInputs.dequeueInputElement()) + nextPhase(conflateThenEmit) + } + + val conflateThenEmit: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒ + if (primaryInputs.inputsAvailable) conflated = aggregate(conflated, primaryInputs.dequeueInputElement()) + if (primaryOutputs.demandAvailable) { + primaryOutputs.enqueueOutputElement(conflated) + conflated = null + nextPhase(waitNextZero) + } + } + + nextPhase(waitNextZero) +} + +class ExpandImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends ActorProcessorImpl(_settings) { + var extrapolateState: Any = null + + val waitFirst: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + extrapolateState = seed(primaryInputs.dequeueInputElement()) + nextPhase(emitFirst) + } + + val emitFirst: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + emitExtrapolate() + nextPhase(extrapolateOrReset) + } + + val extrapolateOrReset: TransferPhase = TransferPhase(primaryInputs.NeedsInputOrComplete || primaryOutputs.NeedsDemand) { () ⇒ + if (primaryInputs.inputsDepleted) nextPhase(completedPhase) + else if (primaryInputs.inputsAvailable) { + extrapolateState = seed(primaryInputs.dequeueInputElement()) + nextPhase(emitFirst) + } else emitExtrapolate() + } + + def emitExtrapolate(): Unit = { + val (emit, nextState) = extrapolate(extrapolateState) + primaryOutputs.enqueueOutputElement(emit) + extrapolateState = nextState + } + + nextPhase(waitFirst) +} + +class BufferImpl(_settings: MaterializerSettings, size: Int, overflowStrategy: OverflowStrategy) extends ActorProcessorImpl(_settings) { + import OverflowStrategy._ + + val buffer = FixedSizeBuffer(size) + + val dropAction: () ⇒ Unit = overflowStrategy match { + case DropHead ⇒ buffer.dropHead + case DropTail ⇒ buffer.dropTail + case DropBuffer ⇒ buffer.clear + case Backpressure ⇒ () ⇒ nextPhase(bufferFull) + } + + val bufferEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + buffer.enqueue(primaryInputs.dequeueInputElement()) + nextPhase(bufferNonEmpty) + } + + val bufferNonEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.demandAvailable) { + primaryOutputs.enqueueOutputElement(buffer.dequeue()) + if (buffer.isEmpty) nextPhase(bufferEmpty) + } else { + if (buffer.isFull) dropAction() + else buffer.enqueue(primaryInputs.dequeueInputElement()) + } + } + + val bufferFull: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + primaryOutputs.enqueueOutputElement(buffer.dequeue()) + if (buffer.isEmpty) nextPhase(bufferEmpty) + else nextPhase(bufferNonEmpty) + } + + nextPhase(bufferEmpty) +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 981b05a7df..42c0c99262 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -15,7 +15,7 @@ import akka.japi.Pair import akka.japi.Predicate import akka.japi.Procedure import akka.japi.Util.immutableSeq -import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } +import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Duct ⇒ SDuct } import akka.stream.impl.Ast @@ -201,6 +201,44 @@ abstract class Duct[In, Out] { */ def append[U](duct: Duct[_ >: In, U]): Duct[In, U] + /** + * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary + * until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream producer is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): Duct[In, S] + + /** + * Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the consumer until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * consumer. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: Function[Out, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U] + + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out] + /** * Materialize this `Duct` by attaching it to the specified downstream `consumer` * and return a `Consumer` representing the input side of the `Duct`. @@ -311,6 +349,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def tee(other: Consumer[_ >: T]): Duct[In, T] = new DuctAdapter(delegate.tee(other)) + override def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, T] = + new DuctAdapter(delegate.buffer(size, overflowStrategy)) + + override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U] = + new DuctAdapter(delegate.expand(seed.apply, (s: S) ⇒ { + val p = extrapolate.apply(s) + (p.first, p.second) + })) + + override def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Duct[In, S] = + new DuctAdapter(delegate.conflate(seed.apply, aggregate.apply)) + override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] = new DuctAdapter(delegate.flatten(strategy)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5dc2c39479..8a462ca683 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -16,7 +16,7 @@ import akka.japi.Pair import akka.japi.Predicate import akka.japi.Procedure import akka.japi.Util.immutableSeq -import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } +import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Flow ⇒ SFlow } import org.reactivestreams.api.Consumer @@ -253,6 +253,44 @@ abstract class Flow[T] { */ def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] + /** + * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary + * until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream producer is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Flow[S] + + /** + * Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the consumer until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * consumer. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Flow[U] + + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] + /** * Returns a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after @@ -372,6 +410,18 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] = new FlowAdapter(delegate.flatten(strategy)) + override def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] = + new FlowAdapter(delegate.buffer(size, overflowStrategy)) + + override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Flow[U] = + new FlowAdapter(delegate.expand(seed.apply, (s: S) ⇒ { + val p = extrapolate.apply(s) + (p.first, p.second) + })) + + override def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Flow[S] = + new FlowAdapter(delegate.conflate(seed.apply, aggregate.apply)) + override def append[U](duct: Duct[_ >: T, U]): Flow[U] = new FlowAdapter(delegate.appendJava(duct)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 0caf8924dc..ef950e9275 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -8,7 +8,7 @@ import scala.collection.immutable import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer -import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } +import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.impl.DuctImpl import akka.stream.impl.Ast @@ -185,6 +185,44 @@ trait Duct[In, +Out] { */ def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U] + /** + * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary + * until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream producer is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Duct[In, S] + + /** + * Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the consumer until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * consumer. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Duct[In, U] + + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out] + /** * Append the operations of a [[Duct]] to this `Duct`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index cb5c6d2c52..9a175c1fab 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,7 +9,7 @@ import scala.concurrent.Future import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer -import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } +import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer } import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.Ast.FutureProducerNode import akka.stream.impl.FlowImpl @@ -243,6 +243,44 @@ trait Flow[+T] { */ def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] + /** + * Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary + * until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the + * upstream producer is faster. + * + * This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not + * duplicate elements. + * + * @param seed Provides the first state for a conflated value using the first unconsumed element as a start + * @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate + */ + def conflate[S](seed: T ⇒ S, aggregate: (S, T) ⇒ S): Flow[S] + + /** + * Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older + * element until new element comes from the upstream. For example an expand step might repeat the last element for + * the consumer until it receives an update from upstream. + * + * This element will never "drop" upstream elements as all elements go through at least one extrapolation step. + * This means that if the upstream is actually faster than the upstream it will be backpressured by the downstream + * consumer. + * + * @param seed Provides the first state for extrapolation using the first unconsumed element + * @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation + * state. + */ + def expand[S, U](seed: T ⇒ S, extrapolate: S ⇒ (U, S)): Flow[U] + + /** + * Adds a fixed size buffer in the flow that allows to store elements from a faster upstream until it becomes full. + * Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there is no + * space available + * + * @param size The size of the buffer in element count + * @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer + */ + def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] + /** * Append the operations of a [[Duct]] to this flow. */ diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index 8c919133e4..931220643d 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -11,6 +11,7 @@ import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; import akka.stream.FlattenStrategy; +import akka.stream.OverflowStrategy; import org.junit.ClassRule; import org.junit.Test; @@ -425,4 +426,70 @@ public class FlowTest { assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); } + @Test + public void mustBeAbleToUseBuffer() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future> future = Flow + .create(input) + .buffer(2, OverflowStrategy.backpressure()) + .grouped(4) + .toFuture(materializer); + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(input, result); + } + + @Test + public void mustBeAbleToUseConflate() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Flow + .create(input) + .conflate(new Function() { + @Override + public String apply(String s) throws Exception { + return s; + } + }, + new Function2() { + @Override + public String apply(String in, String aggr) throws Exception { + return in; + } + } + ) + .fold("", new Function2() { + @Override + public String apply(String aggr, String in) throws Exception { + return in; + } + }) + .toFuture(materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("C", result); + } + + @Test + public void mustBeAbleToUseExpand() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final List input = Arrays.asList("A", "B", "C"); + Future future = Flow + .create(input) + .expand(new Function() { + @Override + public String apply(String in) throws Exception { + return in; + } + }, + new Function>() { + @Override + public Pair apply(String in) throws Exception { + return new Pair(in, in); + } + } + ) + .toFuture(materializer); + String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals("A", result); + } } diff --git a/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala new file mode 100644 index 0000000000..2048996bfc --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowBufferSpec.scala @@ -0,0 +1,183 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.scaladsl.Flow +import scala.concurrent.Await +import scala.concurrent.duration._ +import OverflowStrategy._ + +class FlowBufferSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 1, + maximumInputBufferSize = 1, + initialFanOutBufferSize = 1, + maxFanOutBufferSize = 1, + dispatcher = "akka.test.stream-dispatcher")) + + "Buffer" must { + + "pass elements through normally in backpressured mode" in { + val future = Flow((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).toFuture(materializer) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "pass elements through normally in backpressured mode with buffer size one" in { + val future = Flow((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).toFuture(materializer) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "pass elements through a chain of backpressured buffers of different size" in { + val future = Flow((1 to 1000).iterator) + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) + .buffer(10, overflowStrategy = OverflowStrategy.backpressure) + .buffer(256, overflowStrategy = OverflowStrategy.backpressure) + .buffer(1, overflowStrategy = OverflowStrategy.backpressure) + .buffer(5, overflowStrategy = OverflowStrategy.backpressure) + .buffer(128, overflowStrategy = OverflowStrategy.backpressure) + .grouped(1001) + .toFuture(materializer) + Await.result(future, 3.seconds) should be(1 to 1000) + } + + "accept elements that fit in the buffer while downstream is silent" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + // Fill up buffer + for (i ← 1 to 100) autoProducer.sendNext(i) + + // drain + for (i ← 1 to 100) { + sub.requestMore(1) + consumer.expectNext(i) + } + sub.cancel() + } + + "drop head elements if buffer is full and configured so" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoProducer.sendNext(i) + + // drain + for (i ← 101 to 200) { + sub.requestMore(1) + consumer.expectNext(i) + } + + sub.requestMore(1) + consumer.expectNoMsg(1.seconds) + + autoProducer.sendNext(-1) + sub.requestMore(1) + consumer.expectNext(-1) + + sub.cancel() + } + + "drop tail elements if buffer is full and configured so" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoProducer.sendNext(i) + + // drain + for (i ← 1 to 99) { + sub.requestMore(1) + consumer.expectNext(i) + } + + sub.requestMore(1) + consumer.expectNext(200) + + sub.requestMore(1) + consumer.expectNoMsg(1.seconds) + + autoProducer.sendNext(-1) + sub.requestMore(1) + consumer.expectNext(-1) + + sub.cancel() + } + + "drop all elements if buffer is full and configured so" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + // Fill up buffer + for (i ← 1 to 150) autoProducer.sendNext(i) + + // drain + for (i ← 101 to 150) { + sub.requestMore(1) + consumer.expectNext(i) + } + + sub.requestMore(1) + consumer.expectNoMsg(1.seconds) + + autoProducer.sendNext(-1) + sub.requestMore(1) + consumer.expectNext(-1) + + sub.cancel() + } + + for (strategy ← List(OverflowStrategy.dropHead, OverflowStrategy.dropTail, OverflowStrategy.dropBuffer)) { + + s"work with $strategy if buffer size of one" in { + + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).buffer(1, overflowStrategy = strategy).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + // Fill up buffer + for (i ← 1 to 200) autoProducer.sendNext(i) + + sub.requestMore(1) + consumer.expectNext(200) + + sub.requestMore(1) + consumer.expectNoMsg(1.seconds) + + autoProducer.sendNext(-1) + sub.requestMore(1) + consumer.expectNext(-1) + + sub.cancel() + } + } + + } +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala new file mode 100644 index 0000000000..de04ad5c92 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowConflateSpec.scala @@ -0,0 +1,100 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.scaladsl.Flow +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowConflateSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "Conflate" must { + + "pass-through elements unchanged when there is no rate difference" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + for (i ← 1 to 100) { + sub.requestMore(1) + autoProducer.sendNext(i) + consumer.expectNext(i) + } + + sub.cancel() + } + + "conflate elements while downstream is silent" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + for (i ← 1 to 100) { + autoProducer.sendNext(i) + } + consumer.expectNoMsg(1.second) + sub.requestMore(1) + consumer.expectNext(5050) + sub.cancel() + } + + "work on a variable rate chain" in { + val future = Flow((1 to 1000).iterator) + .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .fold(0)(_ + _) + .toFuture(materializer) + + Await.result(future, 10.seconds) should be(500500) + } + + "backpressure consumer when upstream is slower" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + sub.requestMore(1) + autoProducer.sendNext(1) + consumer.expectNext(1) + + sub.requestMore(1) + consumer.expectNoMsg(1.second) + autoProducer.sendNext(2) + consumer.expectNext(2) + + autoProducer.sendNext(3) + autoProducer.sendNext(4) + sub.requestMore(1) + consumer.expectNext(7) + + sub.requestMore(1) + consumer.expectNoMsg(1.second) + sub.cancel() + + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala new file mode 100644 index 0000000000..7fb7235bcc --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowExpandSpec.scala @@ -0,0 +1,119 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.scaladsl.Flow +import scala.concurrent.forkjoin.ThreadLocalRandom +import scala.concurrent.Await +import scala.concurrent.duration._ + +class FlowExpandSpec extends AkkaSpec { + + val materializer = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "Expand" must { + + "pass-through elements unchanged when there is no rate difference" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + // Simply repeat the last element as an extrapolation step + Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + for (i ← 1 to 100) { + // Order is important here: If the request comes first it will be extrapolated! + autoProducer.sendNext(i) + sub.requestMore(1) + consumer.expectNext(i) + } + + sub.cancel() + } + + "expand elements while upstream is silent" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + // Simply repeat the last element as an extrapolation step + Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + autoProducer.sendNext(42) + + for (i ← 1 to 100) { + sub.requestMore(1) + consumer.expectNext(42) + } + + autoProducer.sendNext(-42) + sub.requestMore(1) + consumer.expectNext(-42) + + sub.cancel() + } + + "work on a variable rate chain" in { + val future = Flow((1 to 100).iterator) + .map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i } + .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)) + .fold(Set.empty[Int])(_ + _) + .toFuture(materializer) + + Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100)) + } + + "backpressure producer when consumer is slower" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[Int] + + Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer) + + val autoProducer = new StreamTestKit.AutoProducer(producer) + val sub = consumer.expectSubscription() + + autoProducer.sendNext(1) + sub.requestMore(1) + consumer.expectNext(1) + sub.requestMore(1) + consumer.expectNext(1) + + var pending = autoProducer.pendingRequests + // Deplete pending requests coming from input buffer + while (pending > 0) { + autoProducer.subscription.sendNext(2) + pending -= 1 + } + + // The above sends are absorbed in the input buffer, and will result in two one-sized batch requests + pending += autoProducer.subscription.expectRequestMore() + pending += autoProducer.subscription.expectRequestMore() + while (pending > 0) { + autoProducer.subscription.sendNext(2) + pending -= 1 + } + + producer.expectNoMsg(1.second) + + sub.requestMore(2) + consumer.expectNext(2) + consumer.expectNext(2) + + // Now production is resumed + autoProducer.subscription.expectRequestMore() + + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/impl/FixedBufferSpec.scala b/akka-stream/src/test/scala/akka/stream/impl/FixedBufferSpec.scala new file mode 100644 index 0000000000..70a0d1b3ab --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/impl/FixedBufferSpec.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.testkit.AkkaSpec + +class FixedBufferSpec extends AkkaSpec { + + for (size ← List(1, 3, 4)) { + + s"FixedSizeBuffer of size $size" must { + + "start as empty" in { + val buf = FixedSizeBuffer(size) + buf.isEmpty should be(true) + buf.isFull should be(false) + } + + "become nonempty after enqueueing" in { + val buf = FixedSizeBuffer(size) + buf.enqueue("test") + buf.isEmpty should be(false) + buf.isFull should be(size == 1) + } + + "become full after size elements are enqueued" in { + val buf = FixedSizeBuffer(size) + for (_ ← 1 to size) buf.enqueue("test") + buf.isEmpty should be(false) + buf.isFull should be(true) + } + + "become empty after enqueueing and tail drop" in { + val buf = FixedSizeBuffer(size) + buf.enqueue("test") + buf.dropTail() + buf.isEmpty should be(true) + buf.isFull should be(false) + } + + "become empty after enqueueing and head drop" in { + val buf = FixedSizeBuffer(size) + buf.enqueue("test") + buf.dropHead() + buf.isEmpty should be(true) + buf.isFull should be(false) + } + + "drop head properly" in { + val buf = FixedSizeBuffer(size) + for (elem ← 1 to size) buf.enqueue(elem) + buf.dropHead() + for (elem ← 2 to size) buf.dequeue() should be(elem) + } + + "drop tail properly" in { + val buf = FixedSizeBuffer(size) + for (elem ← 1 to size) buf.enqueue(elem) + buf.dropTail() + for (elem ← 1 to size - 1) buf.dequeue() should be(elem) + } + + "become non-full after tail dropped from full buffer" in { + val buf = FixedSizeBuffer(size) + for (_ ← 1 to size) buf.enqueue("test") + buf.dropTail() + buf.isEmpty should be(size == 1) + buf.isFull should be(false) + } + + "become non-full after head dropped from full buffer" in { + val buf = FixedSizeBuffer(size) + for (_ ← 1 to size) buf.enqueue("test") + buf.dropTail() + buf.isEmpty should be(size == 1) + buf.isFull should be(false) + } + + "work properly with full-range filling/draining cycles" in { + val buf = FixedSizeBuffer(size) + + for (_ ← 1 to 10) { + buf.isEmpty should be(true) + buf.isFull should be(false) + for (elem ← 1 to size) buf.enqueue(elem) + buf.isEmpty should be(false) + buf.isFull should be(true) + for (elem ← 1 to size) buf.dequeue() should be(elem) + } + + } + + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala index 6d5a5688a0..67d8df987e 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -142,4 +142,15 @@ object StreamTestKit { override def requestMore(elements: Int): Unit = subscriber.onComplete() override def cancel(): Unit = () } + + class AutoProducer[T](probe: ProducerProbe[T], initialPendingRequests: Int = 0) { + val subscription = probe.expectSubscription() + var pendingRequests = initialPendingRequests + + def sendNext(elem: T): Unit = { + if (pendingRequests == 0) pendingRequests = subscription.expectRequestMore() + pendingRequests -= 1 + subscription.sendNext(elem) + } + } }