Merge pull request #15244 from drewhk/wip-15081-rate-detach-ops-drewhk

+str #15081 Rate detached ops
This commit is contained in:
drewhk 2014-05-23 15:18:53 +02:00
commit e0b217fe21
16 changed files with 1022 additions and 13 deletions

View file

@@ -0,0 +1,55 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
/**
* Represents a strategy that decides how to deal with a buffer that is full but is about to receive a new element.
*/
sealed abstract class OverflowStrategy
object OverflowStrategy {
/**
* INTERNAL API
*/
private[akka] final case object DropHead extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] final case object DropTail extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] final case object DropBuffer extends OverflowStrategy
/**
* INTERNAL API
*/
private[akka] final case object Backpressure extends OverflowStrategy
/**
* If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
* the new element.
*/
def dropHead: OverflowStrategy = DropHead
/**
* If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
* the new element.
*/
def dropTail: OverflowStrategy = DropTail
/**
* If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
*/
def dropBuffer: OverflowStrategy = DropBuffer
/**
* If the buffer is full when a new element is available this strategy backpressures the upstream producer until
* space becomes available in the buffer.
*/
def backpressure: OverflowStrategy = Backpressure
}
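
For orientation, a minimal usage sketch of the strategies above together with the `buffer` operator introduced later in this change. `elements` (a `Producer[Int]`), `consumer` and `materializer` are placeholders assumed to be in scope:

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow

// Slow consumer only ever sees the most recent 100 elements: the oldest buffered entry is discarded on overflow.
Flow(elements).buffer(100, OverflowStrategy.dropHead).produceTo(materializer, consumer)

// Nothing is dropped: once 100 elements are buffered, the upstream producer is backpressured instead.
Flow(elements).buffer(100, OverflowStrategy.backpressure).produceTo(materializer, consumer)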

View file

@@ -8,8 +8,7 @@ import scala.collection.immutable
import org.reactivestreams.api.{ Consumer, Processor, Producer }
import org.reactivestreams.spi.Subscriber
import akka.actor.ActorRefFactory
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.stream.Transformer
import akka.stream.{ OverflowStrategy, MaterializerSettings, FlowMaterializer, Transformer }
import scala.util.Try
import scala.concurrent.Future
import scala.util.Success
@@ -59,6 +58,18 @@ private[akka] object Ast {
override def name = "concatFlatten"
}
case class Conflate(seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends AstNode {
override def name = "conflate"
}
case class Expand(seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends AstNode {
override def name = "expand"
}
case class Buffer(size: Int, overflowStrategy: OverflowStrategy) extends AstNode {
override def name = "buffer"
}
trait ProducerNode[I] {
private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I]
}

View file

@@ -23,13 +23,16 @@ private[akka] object ActorProcessor {
import Ast._
def props(settings: MaterializerSettings, op: AstNode): Props =
(op match {
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge ⇒ Props(new MergeImpl(settings, m.other))
case z: Zip ⇒ Props(new ZipImpl(settings, z.other))
case c: Concat ⇒ Props(new ConcatImpl(settings, c.next))
case t: Tee ⇒ Props(new TeeImpl(settings, t.other))
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge ⇒ Props(new MergeImpl(settings, m.other))
case z: Zip ⇒ Props(new ZipImpl(settings, z.other))
case c: Concat ⇒ Props(new ConcatImpl(settings, c.next))
case t: Tee ⇒ Props(new TeeImpl(settings, t.other))
case cf: Conflate ⇒ Props(new ConflateImpl(settings, cf.seed, cf.aggregate))
case ex: Expand ⇒ Props(new ExpandImpl(settings, ex.seed, ex.extrapolate))
case bf: Buffer ⇒ Props(new BufferImpl(settings, bf.size, bf.overflowStrategy))
case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n))
case ConcatAll ⇒ Props(new ConcatAllImpl(settings))
}).withDispatcher(settings.dispatcher)

View file

@@ -0,0 +1,83 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
/**
* INTERNAL API
*/
private[akka] object FixedSizeBuffer {
/**
* INTERNAL API
*
* Returns a fixed size buffer backed by an array. The buffer implementation DOES NOT check against overflow or
* underflow; it is the responsibility of the user to track or check the capacity of the buffer before enqueueing,
* dequeueing or dropping.
*
* Returns a specialized instance for power-of-two sized buffers.
*/
def apply(size: Int): FixedSizeBuffer =
if (((size - 1) & size) == 0) new PowerOfTwoFixedSizeBuffer(size)
else new ModuloFixedSizeBuffer(size)
sealed abstract class FixedSizeBuffer(val size: Int) {
protected var readIdx = 0
protected var writeIdx = 0
private var remainingCapacity = size
private val buffer = Array.ofDim[Any](size)
protected def incWriteIdx(): Unit
protected def decWriteIdx(): Unit
protected def incReadIdx(): Unit
def isFull: Boolean = remainingCapacity == 0
def isEmpty: Boolean = remainingCapacity == size
def enqueue(elem: Any): Unit = {
buffer(writeIdx) = elem
incWriteIdx()
remainingCapacity -= 1
}
def dequeue(): Any = {
val result = buffer(readIdx)
dropHead()
result
}
def clear(): Unit = {
java.util.Arrays.fill(buffer.asInstanceOf[Array[Object]], null)
readIdx = 0
writeIdx = 0
remainingCapacity = size
}
def dropHead(): Unit = {
buffer(readIdx) = null
incReadIdx()
remainingCapacity += 1
}
def dropTail(): Unit = {
decWriteIdx()
//buffer(writeIdx) = null
remainingCapacity += 1
}
}
private final class ModuloFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) % size
override protected def decWriteIdx(): Unit = writeIdx = (writeIdx + size - 1) % size
override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) % size
}
private final class PowerOfTwoFixedSizeBuffer(_size: Int) extends FixedSizeBuffer(_size) {
private val Mask = size - 1
override protected def incReadIdx(): Unit = readIdx = (readIdx + 1) & Mask
override protected def decWriteIdx(): Unit = writeIdx = (writeIdx - 1) & Mask
override protected def incWriteIdx(): Unit = writeIdx = (writeIdx + 1) & Mask
}
}
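
The contract stated in the doc comment above (no overflow/underflow checks, the caller tracks capacity) as a short illustrative sketch. `FixedSizeBuffer` is `private[akka]`, so this only compiles inside the akka.stream packages and is shown purely for illustration:

val buf = FixedSizeBuffer(4)   // power-of-two size selects the mask-based implementation
buf.enqueue("a")
buf.enqueue("b")
buf.isFull                     // false: two of four slots are used
buf.dropHead()                 // drops "a"; the caller must know the buffer is non-empty, nothing is checked
buf.dequeue()                  // returns "b"
buf.isEmpty                    // true: full capacity is available again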

View file

@@ -9,6 +9,7 @@ import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import Ast.{ AstNode, Transform }
import akka.stream.{ OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.Flow
import scala.util.Success
@@ -248,6 +249,17 @@ private[akka] trait Builder[Out] {
def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]]))
def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Thing[S] =
andThen(Conflate(seed.asInstanceOf[Any ⇒ Any], aggregate.asInstanceOf[(Any, Any) ⇒ Any]))
def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Thing[U] =
andThen(Expand(seed.asInstanceOf[Any ⇒ Any], extrapolate.asInstanceOf[Any ⇒ (Any, Any)]))
def buffer(size: Int, overflowStrategy: OverflowStrategy): Thing[Out] = {
require(size > 0, s"Buffer size must be larger than zero but was [$size]")
andThen(Buffer(size, overflowStrategy))
}
def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match {
case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll)
case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")

View file

@@ -0,0 +1,92 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.{ OverflowStrategy, MaterializerSettings }
class ConflateImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, aggregate: (Any, Any) ⇒ Any) extends ActorProcessorImpl(_settings) {
var conflated: Any = null
val waitNextZero: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
conflated = seed(primaryInputs.dequeueInputElement())
nextPhase(conflateThenEmit)
}
val conflateThenEmit: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsAvailable) conflated = aggregate(conflated, primaryInputs.dequeueInputElement())
if (primaryOutputs.demandAvailable) {
primaryOutputs.enqueueOutputElement(conflated)
conflated = null
nextPhase(waitNextZero)
}
}
nextPhase(waitNextZero)
}
class ExpandImpl(_settings: MaterializerSettings, seed: Any ⇒ Any, extrapolate: Any ⇒ (Any, Any)) extends ActorProcessorImpl(_settings) {
var extrapolateState: Any = null
val waitFirst: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
}
val emitFirst: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
emitExtrapolate()
nextPhase(extrapolateOrReset)
}
val extrapolateOrReset: TransferPhase = TransferPhase(primaryInputs.NeedsInputOrComplete || primaryOutputs.NeedsDemand) { () ⇒
if (primaryInputs.inputsDepleted) nextPhase(completedPhase)
else if (primaryInputs.inputsAvailable) {
extrapolateState = seed(primaryInputs.dequeueInputElement())
nextPhase(emitFirst)
} else emitExtrapolate()
}
def emitExtrapolate(): Unit = {
val (emit, nextState) = extrapolate(extrapolateState)
primaryOutputs.enqueueOutputElement(emit)
extrapolateState = nextState
}
nextPhase(waitFirst)
}
class BufferImpl(_settings: MaterializerSettings, size: Int, overflowStrategy: OverflowStrategy) extends ActorProcessorImpl(_settings) {
import OverflowStrategy._
val buffer = FixedSizeBuffer(size)
val dropAction: () ⇒ Unit = overflowStrategy match {
case DropHead ⇒ buffer.dropHead
case DropTail ⇒ buffer.dropTail
case DropBuffer ⇒ buffer.clear
case Backpressure ⇒ () ⇒ nextPhase(bufferFull)
}
val bufferEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput) { () ⇒
buffer.enqueue(primaryInputs.dequeueInputElement())
nextPhase(bufferNonEmpty)
}
val bufferNonEmpty: TransferPhase = TransferPhase(primaryInputs.NeedsInput || primaryOutputs.NeedsDemand) { () ⇒
if (primaryOutputs.demandAvailable) {
primaryOutputs.enqueueOutputElement(buffer.dequeue())
if (buffer.isEmpty) nextPhase(bufferEmpty)
} else {
if (buffer.isFull) dropAction()
else buffer.enqueue(primaryInputs.dequeueInputElement())
}
}
val bufferFull: TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒
primaryOutputs.enqueueOutputElement(buffer.dequeue())
if (buffer.isEmpty) nextPhase(bufferEmpty)
else nextPhase(bufferNonEmpty)
}
nextPhase(bufferEmpty)
}
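
How these three processors surface in the user API, as a sketch based on the specs added further down in this change; `fastProducer`, `slowProducer`, `consumer` and `materializer` are placeholders:

import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Flow

// ConflateImpl: while the consumer is not requesting, fold incoming numbers into a running sum.
Flow(fastProducer)
  .conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i)
  .produceTo(materializer, consumer)

// ExpandImpl: while the producer is silent, keep repeating the last element that was seen.
Flow(slowProducer)
  .expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i))
  .produceTo(materializer, consumer)

// BufferImpl: hold up to 256 elements and drop the whole buffer when a new element finds it full.
Flow(fastProducer)
  .buffer(256, OverflowStrategy.dropBuffer)
  .produceTo(materializer, consumer)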

View file

@@ -15,7 +15,7 @@ import akka.japi.Pair
import akka.japi.Predicate
import akka.japi.Procedure
import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Duct ⇒ SDuct }
import akka.stream.impl.Ast
@@ -201,6 +201,44 @@ abstract class Duct[In, Out] {
*/
def append[U](duct: Duct[_ >: In, U]): Duct[In, U]
/**
* Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
* until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream producer is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Function[Out, S], aggregate: Function2[S, Out, S]): Duct[In, S]
/**
* Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older
* element until a new element comes from the upstream. For example an expand step might repeat the last element for
* the consumer until it receives an update from upstream.
*
* This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* consumer.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
def expand[S, U](seed: Function[Out, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U]
/**
* Adds a fixed size buffer in the flow that allows storing elements from a faster upstream until the buffer becomes
* full. Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there
* is no space available.
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out]
/**
* Materialize this `Duct` by attaching it to the specified downstream `consumer`
* and return a `Consumer` representing the input side of the `Duct`.
@@ -311,6 +349,18 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
override def tee(other: Consumer[_ >: T]): Duct[In, T] =
new DuctAdapter(delegate.tee(other))
override def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, T] =
new DuctAdapter(delegate.buffer(size, overflowStrategy))
override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U] =
new DuctAdapter(delegate.expand(seed.apply, (s: S) ⇒ {
val p = extrapolate.apply(s)
(p.first, p.second)
}))
override def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Duct[In, S] =
new DuctAdapter(delegate.conflate(seed.apply, aggregate.apply))
override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] =
new DuctAdapter(delegate.flatten(strategy))

View file

@@ -16,7 +16,7 @@ import akka.japi.Pair
import akka.japi.Predicate
import akka.japi.Procedure
import akka.japi.Util.immutableSeq
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Flow ⇒ SFlow }
import org.reactivestreams.api.Consumer
@@ -253,6 +253,44 @@ abstract class Flow[T] {
*/
def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U]
/**
* Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
* until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream producer is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Flow[S]
/**
* Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older
* element until a new element comes from the upstream. For example an expand step might repeat the last element for
* the consumer until it receives an update from upstream.
*
* This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* consumer.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Flow[U]
/**
* Adds a fixed size buffer in the flow that allows storing elements from a faster upstream until the buffer becomes
* full. Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there
* is no space available.
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T]
/**
* Returns a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after
@@ -372,6 +410,18 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] =
new FlowAdapter(delegate.flatten(strategy))
override def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] =
new FlowAdapter(delegate.buffer(size, overflowStrategy))
override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Flow[U] =
new FlowAdapter(delegate.expand(seed.apply, (s: S) ⇒ {
val p = extrapolate.apply(s)
(p.first, p.second)
}))
override def conflate[S](seed: Function[T, S], aggregate: Function2[S, T, S]): Flow[S] =
new FlowAdapter(delegate.conflate(seed.apply, aggregate.apply))
override def append[U](duct: Duct[_ >: T, U]): Flow[U] =
new FlowAdapter(delegate.appendJava(duct))

View file

@@ -8,7 +8,7 @@ import scala.collection.immutable
import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.impl.DuctImpl
import akka.stream.impl.Ast
@@ -185,6 +185,44 @@ trait Duct[In, +Out] {
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U]
/**
* Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
* until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream producer is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: Out ⇒ S, aggregate: (S, Out) ⇒ S): Duct[In, S]
/**
* Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older
* element until a new element comes from the upstream. For example an expand step might repeat the last element for
* the consumer until it receives an update from upstream.
*
* This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* consumer.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
def expand[S, U](seed: Out ⇒ S, extrapolate: S ⇒ (U, S)): Duct[In, U]
/**
* Adds a fixed size buffer in the flow that allows storing elements from a faster upstream until the buffer becomes
* full. Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there
* is no space available.
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out]
/**
* Append the operations of a [[Duct]] to this `Duct`.
*/

View file

@@ -9,7 +9,7 @@ import scala.concurrent.Future
import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.{ FlattenStrategy, OverflowStrategy, FlowMaterializer, Transformer }
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.Ast.FutureProducerNode
import akka.stream.impl.FlowImpl
@@ -243,6 +243,44 @@ trait Flow[+T] {
*/
def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U]
/**
* Allows a faster upstream to progress independently of a slower consumer by conflating elements into a summary
* until the consumer is ready to accept them. For example a conflate step might average incoming numbers if the
* upstream producer is faster.
*
* This element only rolls up elements if the upstream is faster, but if the downstream is faster it will not
* duplicate elements.
*
* @param seed Provides the first state for a conflated value using the first unconsumed element as a start
* @param aggregate Takes the currently aggregated value and the current pending element to produce a new aggregate
*/
def conflate[S](seed: T ⇒ S, aggregate: (S, T) ⇒ S): Flow[S]
/**
* Allows a faster downstream to progress independently of a slower producer by extrapolating elements from an older
* element until a new element comes from the upstream. For example an expand step might repeat the last element for
* the consumer until it receives an update from upstream.
*
* This element will never "drop" upstream elements as all elements go through at least one extrapolation step.
* This means that if the upstream is actually faster than the downstream it will be backpressured by the downstream
* consumer.
*
* @param seed Provides the first state for extrapolation using the first unconsumed element
* @param extrapolate Takes the current extrapolation state to produce an output element and the next extrapolation
* state.
*/
def expand[S, U](seed: T ⇒ S, extrapolate: S ⇒ (U, S)): Flow[U]
/**
* Adds a fixed size buffer in the flow that allows storing elements from a faster upstream until the buffer becomes
* full. Depending on the defined [[OverflowStrategy]] it might drop elements or backpressure the upstream if there
* is no space available.
*
* @param size The size of the buffer in element count
* @param overflowStrategy Strategy that is used when incoming elements cannot fit inside the buffer
*/
def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T]
/**
* Append the operations of a [[Duct]] to this flow.
*/

View file

@@ -11,6 +11,7 @@ import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import akka.stream.FlattenStrategy;
import akka.stream.OverflowStrategy;
import org.junit.ClassRule;
import org.junit.Test;
@@ -425,4 +426,70 @@ public class FlowTest {
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
}
@Test
public void mustBeAbleToUseBuffer() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<List<String>> future = Flow
.create(input)
.buffer(2, OverflowStrategy.backpressure())
.grouped(4)
.toFuture(materializer);
List<String> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(input, result);
}
@Test
public void mustBeAbleToUseConflate() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.conflate(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s;
}
},
new Function2<String, String, String>() {
@Override
public String apply(String in, String aggr) throws Exception {
return in;
}
}
)
.fold("", new Function2<String, String, String>() {
@Override
public String apply(String aggr, String in) throws Exception {
return in;
}
})
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("C", result);
}
@Test
public void mustBeAbleToUseExpand() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final List<String> input = Arrays.asList("A", "B", "C");
Future<String> future = Flow
.create(input)
.expand(new Function<String, String>() {
@Override
public String apply(String in) throws Exception {
return in;
}
},
new Function<String, Pair<String, String>>() {
@Override
public Pair<String, String> apply(String in) throws Exception {
return new Pair<String, String>(in, in);
}
}
)
.toFuture(materializer);
String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals("A", result);
}
}

View file

@@ -0,0 +1,183 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.scaladsl.Flow
import scala.concurrent.Await
import scala.concurrent.duration._
import OverflowStrategy._
class FlowBufferSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 1,
maximumInputBufferSize = 1,
initialFanOutBufferSize = 1,
maxFanOutBufferSize = 1,
dispatcher = "akka.test.stream-dispatcher"))
"Buffer" must {
"pass elements through normally in backpressured mode" in {
val future = Flow((1 to 1000).iterator).buffer(100, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).toFuture(materializer)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through normally in backpressured mode with buffer size one" in {
val future = Flow((1 to 1000).iterator).buffer(1, overflowStrategy = OverflowStrategy.backpressure).grouped(1001).toFuture(materializer)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"pass elements through a chain of backpressured buffers of different size" in {
val future = Flow((1 to 1000).iterator)
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.buffer(10, overflowStrategy = OverflowStrategy.backpressure)
.buffer(256, overflowStrategy = OverflowStrategy.backpressure)
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
.buffer(5, overflowStrategy = OverflowStrategy.backpressure)
.buffer(128, overflowStrategy = OverflowStrategy.backpressure)
.grouped(1001)
.toFuture(materializer)
Await.result(future, 3.seconds) should be(1 to 1000)
}
"accept elements that fit in the buffer while downstream is silent" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.backpressure).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
// Fill up buffer
for (i ← 1 to 100) autoProducer.sendNext(i)
// drain
for (i ← 1 to 100) {
sub.requestMore(1)
consumer.expectNext(i)
}
sub.cancel()
}
"drop head elements if buffer is full and configured so" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropHead).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
// Fill up buffer
for (i ← 1 to 200) autoProducer.sendNext(i)
// drain
for (i ← 101 to 200) {
sub.requestMore(1)
consumer.expectNext(i)
}
sub.requestMore(1)
consumer.expectNoMsg(1.seconds)
autoProducer.sendNext(-1)
sub.requestMore(1)
consumer.expectNext(-1)
sub.cancel()
}
"drop tail elements if buffer is full and configured so" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropTail).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
// Fill up buffer
for (i ← 1 to 200) autoProducer.sendNext(i)
// drain
for (i ← 1 to 99) {
sub.requestMore(1)
consumer.expectNext(i)
}
sub.requestMore(1)
consumer.expectNext(200)
sub.requestMore(1)
consumer.expectNoMsg(1.seconds)
autoProducer.sendNext(-1)
sub.requestMore(1)
consumer.expectNext(-1)
sub.cancel()
}
"drop all elements if buffer is full and configured so" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).buffer(100, overflowStrategy = OverflowStrategy.dropBuffer).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
// Fill up buffer
for (i ← 1 to 150) autoProducer.sendNext(i)
// drain
for (i ← 101 to 150) {
sub.requestMore(1)
consumer.expectNext(i)
}
sub.requestMore(1)
consumer.expectNoMsg(1.seconds)
autoProducer.sendNext(-1)
sub.requestMore(1)
consumer.expectNext(-1)
sub.cancel()
}
for (strategy ← List(OverflowStrategy.dropHead, OverflowStrategy.dropTail, OverflowStrategy.dropBuffer)) {
s"work with $strategy if buffer size of one" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).buffer(1, overflowStrategy = strategy).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
// Fill up buffer
for (i ← 1 to 200) autoProducer.sendNext(i)
sub.requestMore(1)
consumer.expectNext(200)
sub.requestMore(1)
consumer.expectNoMsg(1.seconds)
autoProducer.sendNext(-1)
sub.requestMore(1)
consumer.expectNext(-1)
sub.cancel()
}
}
}
}

View file

@@ -0,0 +1,100 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.scaladsl.Flow
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowConflateSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2,
dispatcher = "akka.test.stream-dispatcher"))
"Conflate" must {
"pass-through elements unchanged when there is no rate difference" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
for (i ← 1 to 100) {
sub.requestMore(1)
autoProducer.sendNext(i)
consumer.expectNext(i)
}
sub.cancel()
}
"conflate elements while downstream is silent" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
for (i ← 1 to 100) {
autoProducer.sendNext(i)
}
consumer.expectNoMsg(1.second)
sub.requestMore(1)
consumer.expectNext(5050)
sub.cancel()
}
"work on a variable rate chain" in {
val future = Flow((1 to 1000).iterator)
.conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.fold(0)(_ + _)
.toFuture(materializer)
Await.result(future, 10.seconds) should be(500500)
}
"backpressure consumer when upstream is slower" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).conflate[Int](seed = i ⇒ i, aggregate = (sum, i) ⇒ sum + i).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
sub.requestMore(1)
autoProducer.sendNext(1)
consumer.expectNext(1)
sub.requestMore(1)
consumer.expectNoMsg(1.second)
autoProducer.sendNext(2)
consumer.expectNext(2)
autoProducer.sendNext(3)
autoProducer.sendNext(4)
sub.requestMore(1)
consumer.expectNext(7)
sub.requestMore(1)
consumer.expectNoMsg(1.second)
sub.cancel()
}
}
}

View file

@@ -0,0 +1,119 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.scaladsl.Flow
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.Await
import scala.concurrent.duration._
class FlowExpandSpec extends AkkaSpec {
val materializer = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2,
dispatcher = "akka.test.stream-dispatcher"))
"Expand" must {
"pass-through elements unchanged when there is no rate difference" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
// Simply repeat the last element as an extrapolation step
Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
for (i ← 1 to 100) {
// Order is important here: If the request comes first it will be extrapolated!
autoProducer.sendNext(i)
sub.requestMore(1)
consumer.expectNext(i)
}
sub.cancel()
}
"expand elements while upstream is silent" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
// Simply repeat the last element as an extrapolation step
Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
autoProducer.sendNext(42)
for (i ← 1 to 100) {
sub.requestMore(1)
consumer.expectNext(42)
}
autoProducer.sendNext(-42)
sub.requestMore(1)
consumer.expectNext(-42)
sub.cancel()
}
"work on a variable rate chain" in {
val future = Flow((1 to 100).iterator)
.map { i ⇒ if (ThreadLocalRandom.current().nextBoolean()) Thread.sleep(10); i }
.expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i))
.fold(Set.empty[Int])(_ + _)
.toFuture(materializer)
Await.result(future, 10.seconds) should be(Set.empty[Int] ++ (1 to 100))
}
"backpressure producer when consumer is slower" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).expand[Int, Int](seed = i ⇒ i, extrapolate = i ⇒ (i, i)).produceTo(materializer, consumer)
val autoProducer = new StreamTestKit.AutoProducer(producer)
val sub = consumer.expectSubscription()
autoProducer.sendNext(1)
sub.requestMore(1)
consumer.expectNext(1)
sub.requestMore(1)
consumer.expectNext(1)
var pending = autoProducer.pendingRequests
// Deplete pending requests coming from input buffer
while (pending > 0) {
autoProducer.subscription.sendNext(2)
pending -= 1
}
// The above sends are absorbed in the input buffer, and will result in two one-sized batch requests
pending += autoProducer.subscription.expectRequestMore()
pending += autoProducer.subscription.expectRequestMore()
while (pending > 0) {
autoProducer.subscription.sendNext(2)
pending -= 1
}
producer.expectNoMsg(1.second)
sub.requestMore(2)
consumer.expectNext(2)
consumer.expectNext(2)
// Now production is resumed
autoProducer.subscription.expectRequestMore()
}
}
}

View file

@@ -0,0 +1,97 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.testkit.AkkaSpec
class FixedBufferSpec extends AkkaSpec {
for (size ← List(1, 3, 4)) {
s"FixedSizeBuffer of size $size" must {
"start as empty" in {
val buf = FixedSizeBuffer(size)
buf.isEmpty should be(true)
buf.isFull should be(false)
}
"become nonempty after enqueueing" in {
val buf = FixedSizeBuffer(size)
buf.enqueue("test")
buf.isEmpty should be(false)
buf.isFull should be(size == 1)
}
"become full after size elements are enqueued" in {
val buf = FixedSizeBuffer(size)
for (_ 1 to size) buf.enqueue("test")
buf.isEmpty should be(false)
buf.isFull should be(true)
}
"become empty after enqueueing and tail drop" in {
val buf = FixedSizeBuffer(size)
buf.enqueue("test")
buf.dropTail()
buf.isEmpty should be(true)
buf.isFull should be(false)
}
"become empty after enqueueing and head drop" in {
val buf = FixedSizeBuffer(size)
buf.enqueue("test")
buf.dropHead()
buf.isEmpty should be(true)
buf.isFull should be(false)
}
"drop head properly" in {
val buf = FixedSizeBuffer(size)
for (elem ← 1 to size) buf.enqueue(elem)
buf.dropHead()
for (elem ← 2 to size) buf.dequeue() should be(elem)
}
"drop tail properly" in {
val buf = FixedSizeBuffer(size)
for (elem ← 1 to size) buf.enqueue(elem)
buf.dropTail()
for (elem ← 1 to size - 1) buf.dequeue() should be(elem)
}
"become non-full after tail dropped from full buffer" in {
val buf = FixedSizeBuffer(size)
for (_ 1 to size) buf.enqueue("test")
buf.dropTail()
buf.isEmpty should be(size == 1)
buf.isFull should be(false)
}
"become non-full after head dropped from full buffer" in {
val buf = FixedSizeBuffer(size)
for (_ 1 to size) buf.enqueue("test")
buf.dropHead()
buf.isEmpty should be(size == 1)
buf.isFull should be(false)
}
"work properly with full-range filling/draining cycles" in {
val buf = FixedSizeBuffer(size)
for (_ ← 1 to 10) {
buf.isEmpty should be(true)
buf.isFull should be(false)
for (elem ← 1 to size) buf.enqueue(elem)
buf.isEmpty should be(false)
buf.isFull should be(true)
for (elem ← 1 to size) buf.dequeue() should be(elem)
}
}
}
}
}

View file

@@ -142,4 +142,15 @@ object StreamTestKit {
override def requestMore(elements: Int): Unit = subscriber.onComplete()
override def cancel(): Unit = ()
}
class AutoProducer[T](probe: ProducerProbe[T], initialPendingRequests: Int = 0) {
val subscription = probe.expectSubscription()
var pendingRequests = initialPendingRequests
def sendNext(elem: T): Unit = {
if (pendingRequests == 0) pendingRequests = subscription.expectRequestMore()
pendingRequests -= 1
subscription.sendNext(elem)
}
}
}