diff --git a/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala new file mode 100644 index 0000000000..0ec792f31d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/FlattenStrategy.scala @@ -0,0 +1,23 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import org.reactivestreams.api.Producer + +/** + * Strategy that defines how a stream of streams should be flattened into a stream of simple elements. + */ +abstract class FlattenStrategy[-T, U] + +object FlattenStrategy { + + /** + * Strategy that flattens a stream of streams by concatenating them. This means taking an incoming stream + * emitting its elements directly to the output until it completes and then taking the next stream. This has the + * consequence that if one of the input stream is infinite, no other streams after that will be consumed from. + */ + def concat[T]: FlattenStrategy[Producer[T], T] = Concat[T]() + + private[akka] case class Concat[T]() extends FlattenStrategy[Producer[T], T] +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index aeb3aa97f6..14c95a8c2f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -52,6 +52,12 @@ private[akka] object Ast { case class Tee(other: Consumer[Any]) extends AstNode { override def name = "tee" } + case class PrefixAndTail(n: Int) extends AstNode { + override def name = "prefixAndTail" + } + case object ConcatAll extends AstNode { + override def name = "concatFlatten" + } trait ProducerNode[I] { private[akka] def createProducer(materializer: ActorBasedFlowMaterializer, flowName: String): Producer[I] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 2c700e7440..201188e38e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -23,13 +23,15 @@ private[akka] object ActorProcessor { import Ast._ def props(settings: MaterializerSettings, op: AstNode): Props = (op match { - case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) - case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) - case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) - case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) - case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) - case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) - case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) + case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) + case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) + case c: Concat ⇒ Props(new ConcatImpl(settings, c.next)) + case t: Tee ⇒ Props(new TeeImpl(settings, t.other)) + case tt: PrefixAndTail ⇒ Props(new PrefixAndTailImpl(settings, tt.n)) + case ConcatAll ⇒ Props(new ConcatAllImpl(settings)) }).withDispatcher(settings.dispatcher) } @@ -159,7 +161,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu private var downstreamCompleted = false def demandAvailable = downstreamBufferSpace > 0 - override val receive = new SubReceive(waitingExposedPublisher) + override val subreceive = new SubReceive(waitingExposedPublisher) def enqueueOutputElement(elem: Any): Unit = { downstreamBufferSpace -= 1 @@ -204,7 +206,7 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu protected def waitingExposedPublisher: Actor.Receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher - receive.become(downstreamRunning) + subreceive.become(downstreamRunning) case other ⇒ throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") } @@ -244,7 +246,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin } } - override def receive = primaryInputs.subreceive orElse primaryOutputs.receive + override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive protected def onError(e: Throwable): Unit = fail(e) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala new file mode 100644 index 0000000000..1bf44792e2 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.MaterializerSettings +import org.reactivestreams.api.Producer +import akka.stream.impl.MultiStreamInputProcessor.SubstreamKey + +/** + * INTERNAL API + */ +private[akka] class ConcatAllImpl(_settings: MaterializerSettings) extends MultiStreamInputProcessor(_settings) { + + val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val producer = primaryInputs.dequeueInputElement().asInstanceOf[Producer[Any]] + val inputs = createSubstreamInputs(producer) + nextPhase(streamSubstream(inputs)) + } + + def streamSubstream(substream: SubstreamInputs): TransferPhase = + TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ + if (substream.inputsDepleted) nextPhase(takeNextSubstream) + else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement()) + } + + nextPhase(takeNextSubstream) + + override def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = fail(e) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index e700a23741..f7416fb5fc 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -9,11 +9,10 @@ import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import Ast.{ AstNode, Transform } -import akka.stream.FlowMaterializer +import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.Flow import scala.util.Success import scala.util.Failure -import akka.stream.Transformer import org.reactivestreams.api.Consumer import akka.stream.scaladsl.Duct @@ -210,6 +209,8 @@ private[akka] trait Builder[Out] { override def name = "take" }) + def prefixAndTail(n: Int): Thing[(immutable.Seq[Out], Producer[Out])] = andThen(PrefixAndTail(n)) + def grouped(n: Int): Thing[immutable.Seq[Out]] = transform(new Transformer[Out, immutable.Seq[Out]] { var buf: Vector[Out] = Vector.empty @@ -247,5 +248,10 @@ private[akka] trait Builder[Out] { def tee(other: Consumer[_ >: Out]): Thing[Out] = andThen(Tee(other.asInstanceOf[Consumer[Any]])) + def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match { + case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) + case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") + } + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala new file mode 100644 index 0000000000..7488c37852 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala @@ -0,0 +1,53 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.MaterializerSettings +import scala.collection.immutable + +/** + * INTERNAL API + */ +private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeMax: Int) + extends MultiStreamOutputProcessor(_settings) { + + var taken = immutable.Vector.empty[Any] + var left = takeMax + + val take = TransferPhase(primaryInputs.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ + if (primaryInputs.inputsDepleted) emitEmptyTail() + else { + val elem = primaryInputs.dequeueInputElement() + taken :+= elem + left -= 1 + if (left <= 0) { + if (primaryInputs.inputsDepleted) emitEmptyTail() + else emitNonEmptyTail() + } + } + } + + def streamTailPhase(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(primaryInputs.dequeueInputElement()) + } + + val takeEmpty = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryInputs.inputsDepleted) emitEmptyTail() + else emitNonEmptyTail() + } + + def emitEmptyTail(): Unit = { + primaryOutputs.enqueueOutputElement((taken, EmptyProducer)) + nextPhase(completedPhase) + } + + def emitNonEmptyTail(): Unit = { + val substreamOutput = newSubstream() + primaryOutputs.enqueueOutputElement((taken, substreamOutput.processor)) + primaryOutputs.complete() + nextPhase(streamTailPhase(substreamOutput)) + } + + if (takeMax > 0) nextPhase(take) else nextPhase(takeEmpty) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index 23aa1a704f..fbecebd2f5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -38,7 +38,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS private var completed: Boolean = false private var demands: Int = 0 - override def receive: SubReceive = + override def subreceive: SubReceive = throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") val substream = context.watch(context.actorOf( @@ -112,7 +112,7 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS } - override def receive = primaryInputs.subreceive orElse primaryOutputs.receive orElse substreamManagement + override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement } /** @@ -157,7 +157,7 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett } } - override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.receive + override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.subreceive other.getPublisher.subscribe(new OtherActorSubscriber(self)) @@ -165,4 +165,86 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett secondaryInputs.cancel() super.shutdownHooks() } +} + +/** + * INTERNAL API + */ +private[akka] object MultiStreamInputProcessor { + case class SubstreamKey(id: Int) + + class SubstreamSubscriber[T](val impl: ActorRef, key: SubstreamKey) extends Subscriber[T] { + override def onError(cause: Throwable): Unit = impl ! SubstreamOnError(key, cause) + override def onComplete(): Unit = impl ! SubstreamOnComplete(key) + override def onNext(element: T): Unit = impl ! SubstreamOnNext(key, element) + override def onSubscribe(subscription: Subscription): Unit = impl ! SubstreamStreamOnSubscribe(key, subscription) + } + + case class SubstreamOnComplete(key: SubstreamKey) + case class SubstreamOnNext(key: SubstreamKey, element: Any) + case class SubstreamOnError(key: SubstreamKey, e: Throwable) + case class SubstreamStreamOnSubscribe(key: SubstreamKey, subscription: Subscription) +} + +/** + * INTERNAL API + */ +private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { + import MultiStreamInputProcessor._ + var nextId = 0 + + private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs] + + class SubstreamInputs(val key: SubstreamKey) extends BatchingInputBuffer(settings.initialInputBufferSize, pump = this) { + // Not driven directly + override val subreceive = new SubReceive(Actor.emptyBehavior) + + def substreamOnComplete(): Unit = onComplete() + def substreamOnSubscribe(subscription: Subscription): Unit = onSubscribe(subscription) + def substreamOnError(e: Throwable): Unit = onError(e) + def substreamOnNext(elem: Any): Unit = enqueueInputElement(elem) + + override protected def inputOnError(e: Throwable): Unit = { + super.inputOnError(e) + invalidateSubstream(key, e) + } + } + + val substreamManagement: Receive = { + case SubstreamStreamOnSubscribe(key, subscription) ⇒ substreamInputs(key).substreamOnSubscribe(subscription) + case SubstreamOnNext(key, element) ⇒ substreamInputs(key).substreamOnNext(element) + case SubstreamOnComplete(key) ⇒ { + substreamInputs(key).substreamOnComplete() + substreamInputs -= key + } + case SubstreamOnError(key, e) ⇒ substreamInputs(key).substreamOnError(e) + + } + + def createSubstreamInputs(p: Producer[Any]): SubstreamInputs = { + val key = SubstreamKey(nextId) + val inputs = new SubstreamInputs(key) + p.getPublisher.subscribe(new SubstreamSubscriber(self, key)) + substreamInputs(key) = inputs + nextId += 1 + inputs + } + + protected def invalidateSubstream(substream: SubstreamKey, e: Throwable): Unit = { + substreamInputs(substream).cancel() + substreamInputs -= substream + pump() + } + + override def fail(e: Throwable): Unit = { + substreamInputs.values foreach (_.cancel()) + super.fail(e) + } + + override def shutdownHooks(): Unit = { + substreamInputs.values foreach (_.cancel()) + super.shutdownHooks() + } + + override def receive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index cc2fd40467..b4a1b7b518 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -65,7 +65,7 @@ private[akka] trait Outputs { def demandAvailable: Boolean def enqueueOutputElement(elem: Any): Unit - def receive: SubReceive + def subreceive: SubReceive def complete(): Unit def cancel(e: Throwable): Unit diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index 70b9ef6b7b..f8f997b52e 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -105,10 +105,10 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) def setConnection(c: ActorRef): Unit = { connection = c writePump.pump() - receive.become(handleWrite) + subreceive.become(handleWrite) } - val receive = new SubReceive(Actor.emptyBehavior) + val subreceive = new SubReceive(Actor.emptyBehavior) def handleWrite: Receive = { case WriteAck ⇒ @@ -165,7 +165,7 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) } override def receive = - primaryInputs.subreceive orElse primaryOutputs.receive orElse tcpInputs.subreceive orElse tcpOutputs.receive + primaryInputs.subreceive orElse primaryOutputs.subreceive orElse tcpInputs.subreceive orElse tcpOutputs.subreceive readPump.nextPhase(readPump.running) writePump.nextPhase(writePump.running) diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index 79a57ef874..6cf2b5142b 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -48,7 +48,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher IO(Tcp) ! bindCmd.copy(handler = self) - receive.become(downstreamRunning) + subreceive.become(downstreamRunning) case other ⇒ throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") } @@ -107,7 +107,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, } - override def receive: Actor.Receive = primaryOutputs.receive orElse incomingConnections.subreceive + override def receive: Actor.Receive = primaryOutputs.subreceive orElse incomingConnections.subreceive def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒ val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 1cf9c500b3..54f6e147c3 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -13,8 +13,7 @@ import akka.japi.Function import akka.japi.Function2 import akka.japi.Procedure import akka.japi.Util.immutableSeq -import akka.stream.FlowMaterializer -import akka.stream.Transformer +import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Duct ⇒ SDuct } import akka.stream.impl.Ast @@ -125,6 +124,13 @@ abstract class Duct[In, Out] { */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] + /** + * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[Out], Producer[Out]]] + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -182,6 +188,12 @@ abstract class Duct[In, Out] { */ def tee(other: Consumer[_ >: Out]): Duct[In, Out] + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Producer]]. + */ + def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U] + /** * Append the operations of a [[Duct]] to this `Duct`. */ @@ -272,6 +284,13 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def transform[U](transformer: Transformer[T, U]): Duct[In, U] = new DuctAdapter(delegate.transform(transformer)) + /** + * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. + */ + override def prefixAndTail(n: Int): Duct[In, Pair[java.util.List[T], Producer[T]]] = + new DuctAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail) }) + override def groupBy[K](f: Function[T, K]): Duct[In, Pair[K, Producer[T]]] = new DuctAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step @@ -290,6 +309,9 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def tee(other: Consumer[_ >: T]): Duct[In, T] = new DuctAdapter(delegate.tee(other)) + override def flatten[U](strategy: FlattenStrategy[T, U]): Duct[In, U] = + new DuctAdapter(delegate.flatten(strategy)) + override def append[U](duct: Duct[_ >: In, U]): Duct[In, U] = new DuctAdapter(delegate.appendJava(duct)) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index b7b8d8a87f..69e9794696 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -14,9 +14,8 @@ import akka.japi.Function import akka.japi.Function2 import akka.japi.Procedure import akka.japi.Util.immutableSeq -import akka.stream.FlowMaterializer +import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.scaladsl.{ Flow ⇒ SFlow } -import akka.stream.Transformer import org.reactivestreams.api.Consumer import akka.stream.impl.DuctImpl @@ -178,6 +177,13 @@ abstract class Flow[T] { */ def transform[U](transformer: Transformer[T, U]): Flow[U] + /** + * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Producer[T]]] + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -240,6 +246,12 @@ abstract class Flow[T] { */ def append[U](duct: Duct[_ >: T, U]): Flow[U] + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Producer]]. + */ + def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] + /** * Returns a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after @@ -348,6 +360,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def transform[U](transformer: Transformer[T, U]): Flow[U] = new FlowAdapter(delegate.transform(transformer)) + override def prefixAndTail(n: Int): Flow[Pair[java.util.List[T], Producer[T]]] = + new FlowAdapter(delegate.prefixAndTail(n).map { case (taken, tail) ⇒ Pair(taken.asJava, tail) }) + override def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] = new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step @@ -366,6 +381,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def tee(other: Consumer[_ >: T]): Flow[T] = new FlowAdapter(delegate.tee(other)) + override def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] = + new FlowAdapter(delegate.flatten(strategy)) + override def append[U](duct: Duct[_ >: T, U]): Flow[U] = new FlowAdapter(delegate.appendJava(duct)) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 61fada98ba..0caf8924dc 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -8,8 +8,7 @@ import scala.collection.immutable import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer -import akka.stream.FlowMaterializer -import akka.stream.Transformer +import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.impl.DuctImpl import akka.stream.impl.Ast @@ -116,6 +115,13 @@ trait Duct[In, +Out] { */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] + /** + * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): Duct[In, (immutable.Seq[Out], Producer[Out @uncheckedVariance])] + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -173,6 +179,12 @@ trait Duct[In, +Out] { */ def tee(other: Consumer[_ >: Out]): Duct[In, Out] + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Producer]]. + */ + def flatten[U](strategy: FlattenStrategy[Out, U]): Duct[In, U] + /** * Append the operations of a [[Duct]] to this `Duct`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 13f65f1693..cb5c6d2c52 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -9,8 +9,7 @@ import scala.concurrent.Future import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer -import akka.stream.FlowMaterializer -import akka.stream.Transformer +import akka.stream.{ FlattenStrategy, FlowMaterializer, Transformer } import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.Ast.FutureProducerNode import akka.stream.impl.FlowImpl @@ -174,6 +173,13 @@ trait Flow[+T] { */ def transform[U](transformer: Transformer[T, U]): Flow[U] + /** + * Takes up to n elements from the stream and returns a pair containing a strict sequence of the taken element + * and a stream representing the remaining elements. If ''n'' is zero or negative, then this will return a pair + * of an empty collection and a stream containing the whole upstream unchanged. + */ + def prefixAndTail(n: Int): Flow[(immutable.Seq[T], Producer[T @uncheckedVariance])] + /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -231,6 +237,12 @@ trait Flow[+T] { */ def tee(other: Consumer[_ >: T]): Flow[T] + /** + * Transforms a stream of streams into a contiguous stream of elements using the provided flattening strategy. + * This operation can be used on a stream of element type [[Producer]]. + */ + def flatten[U](strategy: FlattenStrategy[T, U]): Flow[U] + /** * Append the operations of a [[Duct]] to this flow. */ diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index fc4bef8ed6..a76c4ad065 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -10,6 +10,7 @@ import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.TimeUnit; +import akka.stream.FlattenStrategy; import org.junit.ClassRule; import org.junit.Test; @@ -387,4 +388,39 @@ public class FlowTest { assertEquals("A", result); } + @Test + public void mustBeAbleToUsePrefixAndTail() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); + Future, Producer>> future = Flow.create(input).prefixAndTail(3).toFuture(materializer); + Pair, Producer> result = + Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(Arrays.asList(1, 2, 3), result.first()); + + Future> tailFuture = Flow.create(result.second()).grouped(4).toFuture(materializer); + List tailResult = + Await.result(tailFuture, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + assertEquals(Arrays.asList(4, 5, 6), tailResult); + } + + @Test + public void mustBeAbleToUseConcatAll() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input1 = Arrays.asList(1, 2, 3); + final java.lang.Iterable input2 = Arrays.asList(4, 5); + + final List> mainInputs = Arrays.asList( + Flow.create(input1).toProducer(materializer), + Flow.create(input2).toProducer(materializer) + ); + + Future> future = + Flow.create(mainInputs).flatten(FlattenStrategy.concat()).grouped(6).toFuture(materializer); + + List result = + Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + + assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); + } + } diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala new file mode 100644 index 0000000000..1f5a8a9e6a --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatAllSpec.scala @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.scaladsl.Flow +import scala.concurrent.duration._ +import scala.concurrent.Await +import org.reactivestreams.api.Producer +import scala.util.control.NoStackTrace + +class FlowConcatAllSpec extends AkkaSpec { + + val m = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "ConcatAll" must { + + val testException = new Exception("test") with NoStackTrace + + "work in the happy case" in { + val s1 = Flow((1 to 2).iterator).toProducer(m) + val s2 = Flow(List.empty[Int]).toProducer(m) + val s3 = Flow(List(3)).toProducer(m) + val s4 = Flow((4 to 6).iterator).toProducer(m) + val s5 = Flow((7 to 10).iterator).toProducer(m) + + val main: Flow[Producer[Int]] = Flow(List(s1, s2, s3, s4, s5)) + + Await.result(main.flatten(FlattenStrategy.concat).grouped(10).toFuture(m), 3.seconds) should be(1 to 10) + } + + "work together with SplitWhen" in { + Await.result( + Flow((1 to 10).iterator).splitWhen(_ % 2 == 0).flatten(FlattenStrategy.concat).grouped(10).toFuture(m), + 3.seconds) should be(1 to 10) + } + + "on onError on master stream cancel the current open substream and signal error" in { + val producer = StreamTestKit.producerProbe[Producer[Int]] + val consumer = StreamTestKit.consumerProbe[Int] + Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + downstream.requestMore(1000) + + val substreamProducer = StreamTestKit.producerProbe[Int] + upstream.expectRequestMore() + upstream.sendNext(substreamProducer) + val subUpstream = substreamProducer.expectSubscription() + + upstream.sendError(testException) + consumer.expectError(testException) + subUpstream.expectCancellation() + } + + "on onError on open substream, cancel the master stream and signal error " in { + val producer = StreamTestKit.producerProbe[Producer[Int]] + val consumer = StreamTestKit.consumerProbe[Int] + Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + downstream.requestMore(1000) + + val substreamProducer = StreamTestKit.producerProbe[Int] + upstream.expectRequestMore() + upstream.sendNext(substreamProducer) + val subUpstream = substreamProducer.expectSubscription() + + subUpstream.sendError(testException) + consumer.expectError(testException) + upstream.expectCancellation() + } + + "on cancellation cancel the current open substream and the master stream" in { + val producer = StreamTestKit.producerProbe[Producer[Int]] + val consumer = StreamTestKit.consumerProbe[Int] + Flow(producer).flatten(FlattenStrategy.concat).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + downstream.requestMore(1000) + + val substreamProducer = StreamTestKit.producerProbe[Int] + upstream.expectRequestMore() + upstream.sendNext(substreamProducer) + val subUpstream = substreamProducer.expectSubscription() + + downstream.cancel() + + subUpstream.expectCancellation() + upstream.expectCancellation() + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala new file mode 100644 index 0000000000..6a1d756e3b --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala @@ -0,0 +1,146 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.testkit.{ StreamTestKit, AkkaSpec } +import akka.stream.scaladsl.Flow +import scala.concurrent.Await +import scala.concurrent.duration._ +import akka.stream.impl.EmptyProducer +import org.reactivestreams.api.Producer +import scala.util.control.NoStackTrace + +class FlowPrefixAndTailSpec extends AkkaSpec { + + val m = FlowMaterializer(MaterializerSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2, + dispatcher = "akka.test.stream-dispatcher")) + + "PrefixAndTail" must { + + val testException = new Exception("test") with NoStackTrace + + "work on empty input" in { + Await.result(Flow(Nil).prefixAndTail(10).toFuture(m), 3.seconds) should be((Nil, EmptyProducer)) + } + + "work on short input" in { + Await.result(Flow(List(1, 2, 3)).prefixAndTail(10).toFuture(m), 3.seconds) should be((List(1, 2, 3), EmptyProducer)) + } + + "work on longer inputs" in { + val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(5).toFuture(m), 3.seconds) + takes should be(1 to 5) + Await.result(Flow(tail).grouped(6).toFuture(m), 3.seconds) should be(6 to 10) + } + + "handle zero take count" in { + val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(0).toFuture(m), 3.seconds) + takes should be(Nil) + Await.result(Flow(tail).grouped(11).toFuture(m), 3.seconds) should be(1 to 10) + } + + "work if size of take is equals to stream size" in { + val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(10).toFuture(m), 3.seconds) + takes should be(1 to 10) + val consumer = StreamTestKit.consumerProbe[Int] + Flow(tail).produceTo(m, consumer) + consumer.expectCompletedOrSubscriptionFollowedByComplete() + } + + "handle onError when no substream open" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])] + + Flow(producer).prefixAndTail(3).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + + downstream.requestMore(1) + + upstream.expectRequestMore() + upstream.sendNext(1) + upstream.sendError(testException) + + consumer.expectError(testException) + } + + "handle onError when substream is open" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])] + + Flow(producer).prefixAndTail(1).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + + downstream.requestMore(1000) + + upstream.expectRequestMore() + upstream.sendNext(1) + + val (head, tail) = consumer.expectNext() + head should be(List(1)) + consumer.expectComplete() + + val substreamConsumer = StreamTestKit.consumerProbe[Int] + Flow(tail).produceTo(m, substreamConsumer) + val subUpstream = substreamConsumer.expectSubscription() + + upstream.sendError(testException) + substreamConsumer.expectError(testException) + + } + + "handle master stream cancellation" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])] + + Flow(producer).prefixAndTail(3).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + + downstream.requestMore(1) + + upstream.expectRequestMore() + upstream.sendNext(1) + + downstream.cancel() + upstream.expectCancellation() + } + + "handle substream cancellation" in { + val producer = StreamTestKit.producerProbe[Int] + val consumer = StreamTestKit.consumerProbe[(Seq[Int], Producer[Int])] + + Flow(producer).prefixAndTail(1).produceTo(m, consumer) + + val upstream = producer.expectSubscription() + val downstream = consumer.expectSubscription() + + downstream.requestMore(1000) + + upstream.expectRequestMore() + upstream.sendNext(1) + + val (head, tail) = consumer.expectNext() + head should be(List(1)) + consumer.expectComplete() + + val substreamConsumer = StreamTestKit.consumerProbe[Int] + Flow(tail).produceTo(m, substreamConsumer) + substreamConsumer.expectSubscription().cancel() + + upstream.expectCancellation() + + } + + } + +}