Merge pull request #15226 from drewhk/wip-15088-concatall-take-tail-drewhk

ConcatAll and TakeAndTail
This commit is contained in:
drewhk 2014-05-23 09:48:16 +02:00
commit 37449745e0
17 changed files with 581 additions and 29 deletions

View file

@ -0,0 +1,23 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import org.reactivestreams.api.Producer
/**
* Strategy that defines how a stream of streams should be flattened into a stream of simple elements.
*/
abstract class FlattenStrategy[-T, U]
object FlattenStrategy {
/**
* Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream
* emitting its elements directly to the output until it completes and then taking the next stream. This has the
* consequence that if one of the input stream is infinite, no other streams after that will be consumed from.
*/
def concat[T]: FlattenStrategy[Producer[T], T] = Concat[T]()
private[akka] case class Concat[T]() extends FlattenStrategy[Producer[T], T]
}

View file

@ -52,6 +52,12 @@ private[akka] object Ast {
case class Tee(other: Consumer[Any]) extends AstNode {
override def name = "tee"
}
case class PrefixAndTail(n: Int) extends AstNode {
override def name = "prefixAndTail"
}
case object ConcatAll extends AstNode {
override def name = "concatFlatten"
}
trait ProducerNode[I] {
private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I]

View file

@ -23,13 +23,15 @@ private[akka] object ActorProcessor {
import Ast._
def props(settings: MaterializerSettings, op: AstNode): Props =
(op match {
case t: Transform Props(new TransformProcessorImpl(settings, t.transformer))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge Props(new MergeImpl(settings, m.other))
case z: Zip Props(new ZipImpl(settings, z.other))
case c: Concat Props(new ConcatImpl(settings, c.next))
case t: Tee Props(new TeeImpl(settings, t.other))
case t: Transform Props(new TransformProcessorImpl(settings, t.transformer))
case s: SplitWhen Props(new SplitWhenProcessorImpl(settings, s.p))
case g: GroupBy Props(new GroupByProcessorImpl(settings, g.f))
case m: Merge Props(new MergeImpl(settings, m.other))
case z: Zip Props(new ZipImpl(settings, z.other))
case c: Concat Props(new ConcatImpl(settings, c.next))
case t: Tee Props(new TeeImpl(settings, t.other))
case tt: PrefixAndTail Props(new PrefixAndTailImpl(settings, tt.n))
case ConcatAll Props(new ConcatAllImpl(settings))
}).withDispatcher(settings.dispatcher)
}
@ -159,7 +161,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
private var downstreamCompleted = false
def demandAvailable = downstreamBufferSpace > 0
override val receive = new SubReceive(waitingExposedPublisher)
override val subreceive = new SubReceive(waitingExposedPublisher)
def enqueueOutputElement(elem: Any): Unit = {
downstreamBufferSpace -= 1
@ -204,7 +206,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu
protected def waitingExposedPublisher: Actor.Receive = {
case ExposedPublisher(publisher)
exposedPublisher = publisher
receive.become(downstreamRunning)
subreceive.become(downstreamRunning)
case other
throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
}
@ -244,7 +246,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin
}
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.receive
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive
protected def onError(e: Throwable): Unit = fail(e)

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.MaterializerSettings
import org.reactivestreams.api.Producer
import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey
/**
* INTERNAL API
*/
private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends MultiStreamInputProcessor(_settings) {
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { ()
val producer = primaryInputs.dequeueInputElement().asInstanceOf[Producer[Any]]
val inputs = createSubstreamInputs(producer)
nextPhase(streamSubstream(inputs))
}
def streamSubstream(substream: SubstreamInputs): TransferPhase =
TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { ()
if (substream.inputsDepleted) nextPhase(takeNextSubstream)
else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement())
}
nextPhase(takeNextSubstream)
override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e)
}

View file

@ -9,11 +9,10 @@ import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import Ast.{ AstNode, Transform }
import akka.stream.FlowMaterializer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.Flow
import scala.util.Success
import scala.util.Failure
import akka.stream.Transformer
import org.reactivestreams.api.Consumer
import akka.stream.scaladsl.Duct
@ -210,6 +209,8 @@ private[akka] trait Builder[Out] {
override def name = "take"
})
def prefixAndTail(n: Int): Thing[(immutable.Seq[Out], Producer[Out])] = andThen(PrefixAndTail(n))
def grouped(n: Int): Thing[immutable.Seq[Out]] =
transform(new Transformer[Out, immutable.Seq[Out]] {
var buf: Vector[Out] = Vector.empty
@ -247,5 +248,10 @@ private[akka] trait Builder[Out] {
def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]]))
def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match {
case _: FlattenStrategy.Concat[Out] andThen(ConcatAll)
case _ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]")
}
}

View file

@ -0,0 +1,53 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.MaterializerSettings
import scala.collection.immutable
/**
* INTERNAL API
*/
private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int)
extends MultiStreamOutputProcessor(_settings) {
var taken = immutable.Vector.empty[Any]
var left = takeMax
val take = TransferPhase(primaryInputs.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { ()
if (primaryInputs.inputsDepleted) emitEmptyTail()
else {
val elem = primaryInputs.dequeueInputElement()
taken :+= elem
left -= 1
if (left <= 0) {
if (primaryInputs.inputsDepleted) emitEmptyTail()
else emitNonEmptyTail()
}
}
}
def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { ()
substream.enqueueOutputElement(primaryInputs.dequeueInputElement())
}
val takeEmpty = TransferPhase(primaryOutputs.NeedsDemand) { ()
if (primaryInputs.inputsDepleted) emitEmptyTail()
else emitNonEmptyTail()
}
def emitEmptyTail(): Unit = {
primaryOutputs.enqueueOutputElement((taken, EmptyProducer))
nextPhase(completedPhase)
}
def emitNonEmptyTail(): Unit = {
val substreamOutput = newSubstream()
primaryOutputs.enqueueOutputElement((taken, substreamOutput.processor))
primaryOutputs.complete()
nextPhase(streamTailPhase(substreamOutput))
}
if (takeMax > 0) nextPhase(take) else nextPhase(takeEmpty)
}

View file

@ -38,7 +38,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
private var completed: Boolean = false
private var demands: Int = 0
override def receive: SubReceive =
override def subreceive: SubReceive =
throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block")
val substream = context.watch(context.actorOf(
@ -112,7 +112,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.receive orElse substreamManagement
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}
/**
@ -157,7 +157,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
}
}
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.receive
override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive
other.getPublisher.subscribe(new OtherActorSubscriber(self))
@ -165,4 +165,86 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett
secondaryInputs.cancel()
super.shutdownHooks()
}
}
/**
* INTERNAL API
*/
private[akka] object MultiStreamInputProcessor {
case class SubstreamKey(id: Int)
class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] {
override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause)
override def onComplete(): Unit = impl ! SubstreamOnComplete(key)
override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element)
override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription)
}
case class SubstreamOnComplete(key: SubstreamKey)
case class SubstreamOnNext(key: SubstreamKey, element: Any)
case class SubstreamOnError(key: SubstreamKey, e: Throwable)
case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription)
}
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamInputProcessor._
var nextId = 0
private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs]
class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) {
// Not driven directly
override val subreceive = new SubReceive(Actor.emptyBehavior)
def substreamOnComplete(): Unit = onComplete()
def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription)
def substreamOnError(e: Throwable): Unit = onError(e)
def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem)
override protected def inputOnError(e: Throwable): Unit = {
super.inputOnError(e)
invalidateSubstream(key, e)
}
}
val substreamManagement: Receive = {
case SubstreamStreamOnSubscribe(key, subscription) substreamInputs(key).substreamOnSubscribe(subscription)
case SubstreamOnNext(key, element) substreamInputs(key).substreamOnNext(element)
case SubstreamOnComplete(key) {
substreamInputs(key).substreamOnComplete()
substreamInputs -= key
}
case SubstreamOnError(key, e) substreamInputs(key).substreamOnError(e)
}
def createSubstreamInputs(p: Producer[Any]): SubstreamInputs = {
val key = SubstreamKey(nextId)
val inputs = new SubstreamInputs(key)
p.getPublisher.subscribe(new SubstreamSubscriber(self, key))
substreamInputs(key) = inputs
nextId += 1
inputs
}
protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = {
substreamInputs(substream).cancel()
substreamInputs -= substream
pump()
}
override def fail(e: Throwable): Unit = {
substreamInputs.values foreach (_.cancel())
super.fail(e)
}
override def shutdownHooks(): Unit = {
substreamInputs.values foreach (_.cancel())
super.shutdownHooks()
}
override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement
}

View file

@ -65,7 +65,7 @@ private[akka] trait Outputs {
def demandAvailable: Boolean
def enqueueOutputElement(elem: Any): Unit
def receive: SubReceive
def subreceive: SubReceive
def complete(): Unit
def cancel(e: Throwable): Unit

View file

@ -105,10 +105,10 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings)
def setConnection(c: ActorRef): Unit = {
connection = c
writePump.pump()
receive.become(handleWrite)
subreceive.become(handleWrite)
}
val receive = new SubReceive(Actor.emptyBehavior)
val subreceive = new SubReceive(Actor.emptyBehavior)
def handleWrite: Receive = {
case WriteAck
@ -165,7 +165,7 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings)
}
override def receive =
primaryInputs.subreceive orElse primaryOutputs.receive orElse tcpInputs.subreceive orElse tcpOutputs.receive
primaryInputs.subreceive orElse primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive
readPump.nextPhase(readPump.running)
writePump.nextPhase(writePump.running)

View file

@ -48,7 +48,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
case ExposedPublisher(publisher)
exposedPublisher = publisher
IO(Tcp) ! bindCmd.copy(handler = self)
receive.become(downstreamRunning)
subreceive.become(downstreamRunning)
case other
throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]")
}
@ -107,7 +107,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef,
}
override def receive: Actor.Receive = primaryOutputs.receive orElse incomingConnections.subreceive
override def receive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive
def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { ()
val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement()

View file

@ -13,8 +13,7 @@ import akka.japi.Function
import akka.japi.Function2
import akka.japi.Procedure
import akka.japi.Util.immutableSeq
import akka.stream.FlowMaterializer
import akka.stream.Transformer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Duct SDuct }
import akka.stream.impl.Ast
@ -125,6 +124,13 @@ abstract class Duct[In, Out] {
*/
def transform[U](transformer: Transformer[Out, U]): Duct[In, U]
/**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*/
def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[Out], Producer[Out]]]
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@ -182,6 +188,12 @@ abstract class Duct[In, Out] {
*/
def tee(other: Consumer[_ >: Out]): Duct[In, Out]
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Producer]].
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U]
/**
* Append the operations of a [[Duct]] to this `Duct`.
*/
@ -272,6 +284,13 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
override def transform[U](transformer: Transformer[T, U]): Duct[In, U] =
new DuctAdapter(delegate.transform(transformer))
/**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements.
*/
override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Producer[T]]] =
new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) })
override def groupBy[K](f: Function[T, K]): Duct[In, Pair[K, Producer[T]]] =
new DuctAdapter(delegate.groupBy(f.apply).map { case (k, p) Pair(k, p) }) // FIXME optimize to one step
@ -290,6 +309,9 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In,
override def tee(other: Consumer[_ >: T]): Duct[In, T] =
new DuctAdapter(delegate.tee(other))
override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] =
new DuctAdapter(delegate.flatten(strategy))
override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] =
new DuctAdapter(delegate.appendJava(duct))

View file

@ -14,9 +14,8 @@ import akka.japi.Function
import akka.japi.Function2
import akka.japi.Procedure
import akka.japi.Util.immutableSeq
import akka.stream.FlowMaterializer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.scaladsl.{ Flow SFlow }
import akka.stream.Transformer
import org.reactivestreams.api.Consumer
import akka.stream.impl.DuctImpl
@ -178,6 +177,13 @@ abstract class Flow[T] {
*/
def transform[U](transformer: Transformer[T, U]): Flow[U]
/**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*/
def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Producer[T]]]
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@ -240,6 +246,12 @@ abstract class Flow[T] {
*/
def append[U](duct: Duct[_ >: T, U]): Flow[U]
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Producer]].
*/
def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U]
/**
* Returns a [[scala.concurrent.Future]] that will be fulfilled with the first
* thing that is signaled to this stream, which can be either an element (after
@ -348,6 +360,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def transform[U](transformer: Transformer[T, U]): Flow[U] =
new FlowAdapter(delegate.transform(transformer))
override def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Producer[T]]] =
new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) Pair(taken.asJava, tail) })
override def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] =
new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) Pair(k, p) }) // FIXME optimize to one step
@ -366,6 +381,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] {
override def tee(other: Consumer[_ >: T]): Flow[T] =
new FlowAdapter(delegate.tee(other))
override def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] =
new FlowAdapter(delegate.flatten(strategy))
override def append[U](duct: Duct[_ >: T, U]): Flow[U] =
new FlowAdapter(delegate.appendJava(duct))

View file

@ -8,8 +8,7 @@ import scala.collection.immutable
import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import akka.stream.FlowMaterializer
import akka.stream.Transformer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.impl.DuctImpl
import akka.stream.impl.Ast
@ -116,6 +115,13 @@ trait Duct[In, +Out] {
*/
def transform[U](transformer: Transformer[Out, U]): Duct[In, U]
/**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*/
def prefixAndTail(n: Int): Duct[In, (immutable.Seq[Out], Producer[Out @uncheckedVariance])]
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@ -173,6 +179,12 @@ trait Duct[In, +Out] {
*/
def tee(other: Consumer[_ >: Out]): Duct[In, Out]
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Producer]].
*/
def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U]
/**
* Append the operations of a [[Duct]] to this `Duct`.
*/

View file

@ -9,8 +9,7 @@ import scala.concurrent.Future
import scala.util.Try
import org.reactivestreams.api.Consumer
import org.reactivestreams.api.Producer
import akka.stream.FlowMaterializer
import akka.stream.Transformer
import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer }
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.Ast.FutureProducerNode
import akka.stream.impl.FlowImpl
@ -174,6 +173,13 @@ trait Flow[+T] {
*/
def transform[U](transformer: Transformer[T, U]): Flow[U]
/**
* Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element
* and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair
* of an empty collection and a stream containing the whole upstream unchanged.
*/
def prefixAndTail(n: Int): Flow[(immutable.Seq[T], Producer[T @uncheckedVariance])]
/**
* This operation demultiplexes the incoming stream into separate output
* streams, one for each element key. The key is computed for each element
@ -231,6 +237,12 @@ trait Flow[+T] {
*/
def tee(other: Consumer[_ >: T]): Flow[T]
/**
* Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy.
* This operation can be used on a stream of element type [[Producer]].
*/
def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U]
/**
* Append the operations of a [[Duct]] to this flow.
*/

View file

@ -10,6 +10,7 @@ import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import akka.stream.FlattenStrategy;
import org.junit.ClassRule;
import org.junit.Test;
@ -387,4 +388,39 @@ public class FlowTest {
assertEquals("A", result);
}
@Test
public void mustBeAbleToUsePrefixAndTail() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input = Arrays.asList(1, 2, 3, 4, 5, 6);
Future<Pair<List<Integer>, Producer<Integer>>> future = Flow.create(input).prefixAndTail(3).toFuture(materializer);
Pair<List<Integer>, Producer<Integer>> result =
Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(1, 2, 3), result.first());
Future<List<Integer>> tailFuture = Flow.create(result.second()).grouped(4).toFuture(materializer);
List<Integer> tailResult =
Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(4, 5, 6), tailResult);
}
@Test
public void mustBeAbleToUseConcatAll() throws Exception {
final JavaTestKit probe = new JavaTestKit(system);
final java.lang.Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
final java.lang.Iterable<Integer> input2 = Arrays.asList(4, 5);
final List<Producer<Integer>> mainInputs = Arrays.asList(
Flow.create(input1).toProducer(materializer),
Flow.create(input2).toProducer(materializer)
);
Future<List<Integer>> future =
Flow.create(mainInputs).<Integer>flatten(FlattenStrategy.<Integer>concat()).grouped(6).toFuture(materializer);
List<Integer> result =
Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
}
}

View file

@ -0,0 +1,104 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.scaladsl.Flow
import scala.concurrent.duration._
import scala.concurrent.Await
import org.reactivestreams.api.Producer
import scala.util.control.NoStackTrace
class FlowConcatAllSpec extends AkkaSpec {
val m = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2,
dispatcher = "akka.test.stream-dispatcher"))
"ConcatAll" must {
val testException = new Exception("test") with NoStackTrace
"work in the happy case" in {
val s1 = Flow((1 to 2).iterator).toProducer(m)
val s2 = Flow(List.empty[Int]).toProducer(m)
val s3 = Flow(List(3)).toProducer(m)
val s4 = Flow((4 to 6).iterator).toProducer(m)
val s5 = Flow((7 to 10).iterator).toProducer(m)
val main: Flow[Producer[Int]] = Flow(List(s1, s2, s3, s4, s5))
Await.result(main.flatten(FlattenStrategy.concat).grouped(10).toFuture(m), 3.seconds) should be(1 to 10)
}
"work together with SplitWhen" in {
Await.result(
Flow((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).grouped(10).toFuture(m),
3.seconds) should be(1 to 10)
}
"on onError on master stream cancel the current open substream and signal error" in {
val producer = StreamTestKit.producerProbe[Producer[Int]]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1000)
val substreamProducer = StreamTestKit.producerProbe[Int]
upstream.expectRequestMore()
upstream.sendNext(substreamProducer)
val subUpstream = substreamProducer.expectSubscription()
upstream.sendError(testException)
consumer.expectError(testException)
subUpstream.expectCancellation()
}
"on onError on open substream, cancel the master stream and signal error " in {
val producer = StreamTestKit.producerProbe[Producer[Int]]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1000)
val substreamProducer = StreamTestKit.producerProbe[Int]
upstream.expectRequestMore()
upstream.sendNext(substreamProducer)
val subUpstream = substreamProducer.expectSubscription()
subUpstream.sendError(testException)
consumer.expectError(testException)
upstream.expectCancellation()
}
"on cancellation cancel the current open substream and the master stream" in {
val producer = StreamTestKit.producerProbe[Producer[Int]]
val consumer = StreamTestKit.consumerProbe[Int]
Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1000)
val substreamProducer = StreamTestKit.producerProbe[Int]
upstream.expectRequestMore()
upstream.sendNext(substreamProducer)
val subUpstream = substreamProducer.expectSubscription()
downstream.cancel()
subUpstream.expectCancellation()
upstream.expectCancellation()
}
}
}

View file

@ -0,0 +1,146 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.stream.testkit.{ StreamTestKit, AkkaSpec }
import akka.stream.scaladsl.Flow
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.stream.impl.EmptyProducer
import org.reactivestreams.api.Producer
import scala.util.control.NoStackTrace
class FlowPrefixAndTailSpec extends AkkaSpec {
val m = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,
maxFanOutBufferSize = 2,
dispatcher = "akka.test.stream-dispatcher"))
"PrefixAndTail" must {
val testException = new Exception("test") with NoStackTrace
"work on empty input" in {
Await.result(Flow(Nil).prefixAndTail(10).toFuture(m), 3.seconds) should be((Nil, EmptyProducer))
}
"work on short input" in {
Await.result(Flow(List(1, 2, 3)).prefixAndTail(10).toFuture(m), 3.seconds) should be((List(1, 2, 3), EmptyProducer))
}
"work on longer inputs" in {
val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(5).toFuture(m), 3.seconds)
takes should be(1 to 5)
Await.result(Flow(tail).grouped(6).toFuture(m), 3.seconds) should be(6 to 10)
}
"handle zero take count" in {
val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(0).toFuture(m), 3.seconds)
takes should be(Nil)
Await.result(Flow(tail).grouped(11).toFuture(m), 3.seconds) should be(1 to 10)
}
"work if size of take is equals to stream size" in {
val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(10).toFuture(m), 3.seconds)
takes should be(1 to 10)
val consumer = StreamTestKit.consumerProbe[Int]
Flow(tail).produceTo(m, consumer)
consumer.expectCompletedOrSubscriptionFollowedByComplete()
}
"handle onError when no substream open" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])]
Flow(producer).prefixAndTail(3).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1)
upstream.expectRequestMore()
upstream.sendNext(1)
upstream.sendError(testException)
consumer.expectError(testException)
}
"handle onError when substream is open" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])]
Flow(producer).prefixAndTail(1).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1000)
upstream.expectRequestMore()
upstream.sendNext(1)
val (head, tail) = consumer.expectNext()
head should be(List(1))
consumer.expectComplete()
val substreamConsumer = StreamTestKit.consumerProbe[Int]
Flow(tail).produceTo(m, substreamConsumer)
val subUpstream = substreamConsumer.expectSubscription()
upstream.sendError(testException)
substreamConsumer.expectError(testException)
}
"handle master stream cancellation" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])]
Flow(producer).prefixAndTail(3).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1)
upstream.expectRequestMore()
upstream.sendNext(1)
downstream.cancel()
upstream.expectCancellation()
}
"handle substream cancellation" in {
val producer = StreamTestKit.producerProbe[Int]
val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])]
Flow(producer).prefixAndTail(1).produceTo(m, consumer)
val upstream = producer.expectSubscription()
val downstream = consumer.expectSubscription()
downstream.requestMore(1000)
upstream.expectRequestMore()
upstream.sendNext(1)
val (head, tail) = consumer.expectNext()
head should be(List(1))
consumer.expectComplete()
val substreamConsumer = StreamTestKit.consumerProbe[Int]
Flow(tail).produceTo(m, substreamConsumer)
substreamConsumer.expectSubscription().cancel()
upstream.expectCancellation()
}
}
}