diff --git a/akka-stream/src/main/scala/akka/stream/Stream.scala b/akka-stream/src/main/scala/akka/stream/Stream.scala index 76ea209db0..bb51a7a857 100644 --- a/akka-stream/src/main/scala/akka/stream/Stream.scala +++ b/akka-stream/src/main/scala/akka/stream/Stream.scala @@ -44,6 +44,14 @@ trait Stream[T] { f: (S, Try[T]) ⇒ (S, immutable.Seq[U]), onComplete: S ⇒ immutable.Seq[U] = (_: S) ⇒ Nil, isComplete: S ⇒ Boolean = (_: S) ⇒ false): Stream[U] + + def groupBy[K](f: T ⇒ K): Stream[(K, Producer[T])] + def splitWhen(p: T ⇒ Boolean): Stream[Producer[T]] + + def merge(other: Producer[T]): Stream[T] + def zip[U](other: Producer[U]): Stream[(T, U)] + def concat(next: Producer[T]): Stream[T] + def toFuture(generator: ProcessorGenerator): Future[T] def consume(generator: ProcessorGenerator): Unit def toProducer(generator: ProcessorGenerator): Producer[T] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 8c058aedd8..2d6bef20b3 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -3,12 +3,11 @@ */ package akka.stream.impl -import java.util.Arrays import scala.collection.immutable import scala.util.{ Failure, Success } import scala.util.control.NonFatal import org.reactivestreams.api.Processor -import org.reactivestreams.spi.{ Subscriber, Subscription } +import org.reactivestreams.spi.Subscriber import akka.actor.{ Actor, ActorLogging, ActorRef, Props } import akka.stream.GeneratorSettings import akka.event.LoggingReceive @@ -21,6 +20,11 @@ private[akka] object ActorProcessor { def props(settings: GeneratorSettings, op: AstNode): Props = op match { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t)) case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r)) + case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) + case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) + case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) + case z: Zip ⇒ Props(new ZipImpl(settings, z.other)) + case c: Concat ⇒ Props(new ConcatImpl(settings, c.other)) } } @@ -30,7 +34,6 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi * INTERNAL API */ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) extends Actor with SubscriberManagement[Any] with ActorLogging { - import ActorProcessor._ type S = ActorSubscription[Any] override def maxBufferSize: Int = settings.maxFanOutBufferSize @@ -39,9 +42,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) override def receive = waitingExposedPublisher + protected var primaryInputs: Inputs = _ + ////////////////////// Startup phases ////////////////////// - var upstream: Subscription = _ var exposedPublisher: ActorPublisher[Any] = _ def waitingExposedPublisher: Receive = { @@ -52,20 +56,27 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) } def waitingForUpstream: Receive = downstreamManagement orElse { - case OnComplete ⇒ shutdown(completed = true) // There is nothing to flush here + case OnComplete ⇒ + // Instead of introducing an edge case, handle it in the general way + primaryInputs = EmptyInputs + transitionToRunningWhenReady() case OnSubscribe(subscription) ⇒ assert(subscription != null) - upstream = subscription - // Prime up input buffer - upstream.requestMore(inputBuffer.length) - context.become(running) + primaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize) + transitionToRunningWhenReady() case OnError(cause) ⇒ failureReceived(cause) } + def transitionToRunningWhenReady(): Unit = if (primaryInputs ne null) { + primaryInputs.prefetch() + transferState = initialTransferState + context.become(running) + } + ////////////////////// Management of subscribers ////////////////////// // All methods called here are implemented by SubscriberManagement - val downstreamManagement: Receive = { + def downstreamManagement: Receive = { case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ @@ -83,9 +94,10 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) def running: Receive = LoggingReceive(downstreamManagement orElse { case OnNext(element) ⇒ - enqueueInputElement(element) + primaryInputs.enqueueInputElement(element) pump() case OnComplete ⇒ + primaryInputs.complete() flushAndComplete() pump() case OnError(cause) ⇒ failureReceived(cause) @@ -93,12 +105,15 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Called by SubscriberManagement when all subscribers are gone. // The method shutdown() is called automatically by SubscriberManagement after it called this method. - override def cancelUpstream(): Unit = if (upstream ne null) upstream.cancel() + override def cancelUpstream(): Unit = { + if (primaryInputs ne null) primaryInputs.cancel() + PrimaryOutputs.cancel() + } // Called by SubscriberManagement whenever the output buffer is ready to accept additional elements override protected def requestFromUpstream(elements: Int): Unit = { log.debug(s"received downstream demand from buffer: $elements") - downstreamBufferSpace += elements + PrimaryOutputs.enqueueOutputDemand(elements) } def failureReceived(e: Throwable): Unit = fail(e) @@ -107,80 +122,45 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) shutdownReason = Some(e) log.error(e, "failure during processing") // FIXME: escalate to supervisor instead abortDownstream(e) - if (upstream ne null) upstream.cancel() + if (primaryInputs ne null) primaryInputs.cancel() context.stop(self) } - private var downstreamBufferSpace = 0 - private var inputBuffer = Array.ofDim[AnyRef](settings.initialInputBufferSize) - private var inputBufferElements = 0 - private var nextInputElementCursor = 0 - val IndexMask = settings.initialInputBufferSize - 1 + object PrimaryOutputs extends Outputs { + private var downstreamBufferSpace = 0 + private var downstreamCompleted = false + def demandAvailable = downstreamBufferSpace > 0 - // TODO: buffer and batch sizing heuristics - def requestBatchSize = math.max(1, inputBuffer.length / 2) - private var batchRemaining = requestBatchSize - - def dequeueInputElement(): Any = { - val elem = inputBuffer(nextInputElementCursor) - inputBuffer(nextInputElementCursor) = null - - batchRemaining -= 1 - if (batchRemaining == 0 && !upstreamCompleted) { - upstream.requestMore(requestBatchSize) - batchRemaining = requestBatchSize + def enqueueOutputDemand(demand: Int): Unit = downstreamBufferSpace += demand + def enqueueOutputElement(elem: Any): Unit = { + downstreamBufferSpace -= 1 + pushToDownstream(elem) } - inputBufferElements -= 1 - nextInputElementCursor += 1 - nextInputElementCursor &= IndexMask - elem + def complete(): Unit = downstreamCompleted = true + def cancel(): Unit = downstreamCompleted = true + def isComplete: Boolean = downstreamCompleted + override val NeedsDemand: TransferState = new TransferState { + def isReady = demandAvailable + def isCompleted = downstreamCompleted + } + override def NeedsDemandOrCancel: TransferState = new TransferState { + def isReady = demandAvailable || downstreamCompleted + def isCompleted = false + } } - def enqueueInputElement(elem: Any): Unit = { - inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] - inputBufferElements += 1 - } + def needsPrimaryInputAndDemand = primaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand - def enqueueOutputElement(elem: Any): Unit = { - downstreamBufferSpace -= 1 - pushToDownstream(elem) - } - - // States of the operation that is executed by this processor - trait TransferState { - protected def isReady: Boolean - def isCompleted: Boolean - def isExecutable = isReady && !isCompleted - def inputsAvailable = inputBufferElements > 0 - def demandAvailable = downstreamBufferSpace > 0 - def inputsDepleted = upstreamCompleted && inputBufferElements == 0 - } - object NeedsInput extends TransferState { - def isReady = inputsAvailable || inputsDepleted - def isCompleted = false - } - object NeedsDemand extends TransferState { - def isReady = demandAvailable - def isCompleted = false - } - object NeedsInputAndDemand extends TransferState { - def isReady = inputsAvailable && demandAvailable || inputsDepleted - def isCompleted = false - } - object Completed extends TransferState { - def isReady = false - def isCompleted = true - } - - var transferState: TransferState = NeedsInputAndDemand + var transferState: TransferState = _ + protected def initialTransferState: TransferState // Exchange input buffer elements and output buffer "requests" until one of them becomes empty. // Generate upstream requestMore for every Nth consumed input element protected def pump(): Unit = { try while (transferState.isExecutable) { + transferState = transfer() log.debug(s"iterating the pump with state $transferState and buffer $bufferDebug") - transferState = transfer(transferState) } catch { case NonFatal(e) ⇒ fail(e) } log.debug(s"finished iterating the pump with state $transferState and buffer $bufferDebug") @@ -188,9 +168,8 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) if (transferState.isCompleted) { if (!isShuttingDown) { log.debug("shutting down the pump") - if (!upstreamCompleted) upstream.cancel() - Arrays.fill(inputBuffer, 0, inputBuffer.length, null) - inputBufferElements = 0 + if (!primaryInputs.isCompleted) primaryInputs.cancel() + primaryInputs.clear() context.become(flushing) isShuttingDown = true } @@ -200,16 +179,11 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) // Needs to be implemented by Processor implementations. Transfers elements from the input buffer to the output // buffer. - protected def transfer(current: TransferState): TransferState + protected def transfer(): TransferState ////////////////////// Completing and Flushing ////////////////////// - var upstreamCompleted = false - - protected def flushAndComplete(): Unit = { - upstreamCompleted = true - context.become(flushing) - } + protected def flushAndComplete(): Unit = context.become(flushing) def flushing: Receive = downstreamManagement orElse { case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") @@ -219,6 +193,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) ////////////////////// Shutdown and cleanup (graceful and abort) ////////////////////// var isShuttingDown = false + var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason // Called by SubscriberManagement to signal that output buffer finished (flushed or aborted) @@ -226,6 +201,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) isShuttingDown = true if (completed) shutdownReason = None + PrimaryOutputs.complete() context.stop(self) } @@ -257,27 +233,34 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast // TODO performance improvement: mutable buffer? var emits = immutable.Seq.empty[Any] - override def transfer(current: TransferState): TransferState = { - val depleted = current.inputsDepleted + object NeedsInputAndDemandOrCompletion extends TransferState { + def isReady = primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable || primaryInputs.inputsDepleted + def isCompleted = false + } + + override def initialTransferState = NeedsInputAndDemandOrCompletion + + override def transfer(): TransferState = { + val depleted = primaryInputs.inputsDepleted if (emits.isEmpty) { isComplete = op.isComplete(state) if (depleted || isComplete) { emits = op.onComplete(state) hasOnCompleteRun = true } else { - val e = dequeueInputElement() + val e = primaryInputs.dequeueInputElement() val (nextState, newEmits) = op.f(state, e) state = nextState emits = newEmits } } else { - enqueueOutputElement(emits.head) + PrimaryOutputs.enqueueOutputElement(emits.head) emits = emits.tail } - if (emits.nonEmpty) NeedsDemand + if (emits.nonEmpty) PrimaryOutputs.NeedsDemand else if (hasOnCompleteRun) Completed - else NeedsInputAndDemand + else NeedsInputAndDemandOrCompletion } override def toString: String = s"Transformer(state=$state, isComplete=$isComplete, hasOnCompleteRun=$hasOnCompleteRun, emits=$emits)" @@ -287,12 +270,39 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast * INTERNAL API */ private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { - override def enqueueInputElement(elem: Any): Unit = { - super.enqueueInputElement(Success(elem)) + + val WrapInSuccess: Receive = { + case OnNext(elem) ⇒ + primaryInputs.enqueueInputElement(Success(elem)) + pump() } + + override def running: Receive = WrapInSuccess orElse super.running + override def failureReceived(e: Throwable): Unit = { - super.enqueueInputElement(Failure(e)) + primaryInputs.enqueueInputElement(Failure(e)) + primaryInputs.complete() flushAndComplete() pump() } } + +/** + * INTERNAL API + */ +private[akka] object IdentityProcessorImpl { + def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings)) +} + +/** + * INTERNAL API + */ +private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { + + override def initialTransferState = needsPrimaryInputAndDemand + override protected def transfer(): TransferState = { + PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + needsPrimaryInputAndDemand + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala new file mode 100644 index 0000000000..a4aac4191d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -0,0 +1,80 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import org.reactivestreams.spi.Subscription +import akka.actor.{ Terminated, Props, ActorRef } +import akka.stream.{ Stream, GeneratorSettings } +import akka.stream.impl._ + +/** + * INTERNAL API + */ +private[akka] object GroupByProcessorImpl { + + trait SubstreamElementState + case object NoPending extends SubstreamElementState + case class PendingElement(elem: Any, key: Any) extends SubstreamElementState + case class PendingElementForNewStream(elem: Any, key: Any) extends SubstreamElementState +} + +/** + * INTERNAL API + */ +private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { + import GroupByProcessorImpl._ + + var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] + + var substreamPendingState: SubstreamElementState = NoPending + def substreamsFinished: Boolean = keyToSubstreamOutputs.isEmpty + + override def initialTransferState = needsPrimaryInputAndDemand + + override def transfer(): TransferState = substreamPendingState match { + case PendingElementForNewStream(elem, key) ⇒ + if (PrimaryOutputs.isComplete) { + substreamPendingState = NoPending + // Just drop, we do not open any more substreams + primaryInputs.NeedsInput + } else { + val substreamOutput = newSubstream() + pushToDownstream((key, substreamOutput.processor)) + keyToSubstreamOutputs(key) = substreamOutput + substreamPendingState = PendingElement(elem, key) + substreamOutput.NeedsDemand + } + + case PendingElement(elem, key) ⇒ + if (!keyToSubstreamOutputs(key).isComplete) keyToSubstreamOutputs(key).enqueueOutputElement(elem) + substreamPendingState = NoPending + primaryInputs.NeedsInput + + case NoPending ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + if (keyToSubstreamOutputs.contains(key)) { + substreamPendingState = PendingElement(elem, key) + keyToSubstreamOutputs(key).NeedsDemand + } else if (!PrimaryOutputs.isComplete) { + substreamPendingState = PendingElementForNewStream(elem, key) + PrimaryOutputs.NeedsDemand + } else primaryInputs.NeedsInput + + } + + override def invalidateSubstream(substream: ActorRef): Unit = { + substreamPendingState match { + case PendingElement(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒ + transferState = primaryInputs.NeedsInput + substreamPendingState = NoPending + case PendingElementForNewStream(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒ + transferState = primaryInputs.NeedsInput + substreamPendingState = NoPending + case _ ⇒ + } + super.invalidateSubstream(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala index f1b50d3c58..52e737ec6e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala @@ -20,6 +20,11 @@ private[akka] object Ast { case class Transform(zero: Any, f: (Any, Any) ⇒ (Any, immutable.Seq[Any]), onComplete: Any ⇒ immutable.Seq[Any], isComplete: Any ⇒ Boolean) extends AstNode case class Recover(t: Transform) extends AstNode + case class GroupBy(f: Any ⇒ Any) extends AstNode + case class SplitWhen(p: Any ⇒ Boolean) extends AstNode + case class Merge(other: Producer[Any]) extends AstNode + case class Zip(other: Producer[Any]) extends AstNode + case class Concat(other: Producer[Any]) extends AstNode trait ProducerNode[I] { def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala new file mode 100644 index 0000000000..15a0aca31e --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.{ Props, ActorRef } +import org.reactivestreams.spi.Subscription +import akka.stream.impl._ +import akka.stream.{ Stream, GeneratorSettings } +import akka.actor.Terminated + +/** + * INTERNAL API + */ +private[akka] object SplitWhenProcessorImpl { + + trait SubstreamElementState + case object NoPending extends SubstreamElementState + case class PendingElement(elem: Any) extends SubstreamElementState + case class PendingElementForNewStream(elem: Any) extends SubstreamElementState +} + +/** + * INTERNAL API + */ +private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val splitPredicate: Any ⇒ Boolean) + extends MultiStreamOutputProcessor(_settings) { + import SplitWhenProcessorImpl._ + + var pendingElement: SubstreamElementState = NoPending + var started = false + var currentSubstream: SubstreamOutputs = _ + + override def initialTransferState = needsPrimaryInputAndDemand + + override def transfer(): TransferState = pendingElement match { + case NoPending ⇒ + val elem = primaryInputs.dequeueInputElement() + if (!started) { + pendingElement = PendingElementForNewStream(elem) + started = true + PrimaryOutputs.NeedsDemand + } else if (splitPredicate(elem)) { + pendingElement = PendingElementForNewStream(elem) + currentSubstream.complete() + PrimaryOutputs.NeedsDemand + } else if (!currentSubstream.isComplete) { + pendingElement = PendingElement(elem) + currentSubstream.NeedsDemand + } else primaryInputs.NeedsInput + case PendingElement(elem) ⇒ + currentSubstream.enqueueOutputElement(elem) + pendingElement = NoPending + primaryInputs.NeedsInput + case PendingElementForNewStream(elem) ⇒ + val substreamOutput = newSubstream() + pushToDownstream(substreamOutput.processor) + currentSubstream = substreamOutput + pendingElement = PendingElement(elem) + currentSubstream.NeedsDemand + } + + override def invalidateSubstream(substream: ActorRef): Unit = { + pendingElement match { + case PendingElement(_) ⇒ + transferState = primaryInputs.NeedsInput + pendingElement = NoPending + case _ ⇒ + } + super.invalidateSubstream(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala new file mode 100644 index 0000000000..1917f2fcbb --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala @@ -0,0 +1,74 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.GeneratorSettings +import org.reactivestreams.api.Producer +import scala.concurrent.forkjoin.ThreadLocalRandom + +/** + * INTERNAL API + */ +private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any]) + extends TwoStreamInputProcessor(_settings, _other) { + + def needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand + + override def initialTransferState = needsAnyInputAndDemand + override def transfer(): TransferState = { + // TODO: More flexible merging strategies are possible here. This takes a random element if we have elements + // from both upstreams. + val tieBreak = ThreadLocalRandom.current().nextBoolean() + if (primaryInputs.inputsAvailable && (!secondaryInputs.inputsAvailable || tieBreak)) { + PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + } else { + PrimaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) + } + needsAnyInputAndDemand + } + +} + +/** + * INTERNAL API + */ +private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any]) + extends TwoStreamInputProcessor(_settings, _other) { + + def needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand + + override def initialTransferState = needsBothInputAndDemand + override protected def transfer(): TransferState = { + PrimaryOutputs.enqueueOutputElement((primaryInputs.dequeueInputElement(), secondaryInputs.dequeueInputElement())) + needsBothInputAndDemand + } +} + +/** + * INTERNAL API + */ +private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any]) + extends TwoStreamInputProcessor(_settings, _other) { + + def needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand + def needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand + var processingPrimary = true + + override protected def initialTransferState: TransferState = needsPrimaryInputAndDemandWithComplete + override protected def transfer(): TransferState = { + if (processingPrimary) { + if (primaryInputs.inputsDepleted) { + processingPrimary = false + needsSecondaryInputAndDemand + } else { + PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + needsPrimaryInputAndDemandWithComplete + } + } else { + PrimaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) + needsSecondaryInputAndDemand + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala index 780f8a5411..f5d8f0c7b1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamImpl.scala @@ -61,6 +61,16 @@ private[akka] case class StreamImpl[I, O](producerNode: Ast.ProducerNode[I], ops onComplete.asInstanceOf[Any ⇒ immutable.Seq[Any]], isComplete.asInstanceOf[Any ⇒ Boolean]))) + override def zip[O2](other: Producer[O2]): Stream[(O, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) + + override def concat(next: Producer[O]): Stream[O] = andThen(Concat(next.asInstanceOf[Producer[Any]])) + + override def merge(other: Producer[O]): Stream[O] = andThen(Merge(other.asInstanceOf[Producer[Any]])) + + override def splitWhen(p: (O) ⇒ Boolean): Stream[Producer[O]] = andThen(SplitWhen(p.asInstanceOf[Any ⇒ Boolean])) + + override def groupBy[K](f: (O) ⇒ K): Stream[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) + def toFuture(generator: ProcessorGenerator): Future[O] = { val p = Promise[O]() transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, isComplete = _ == 1).consume(generator) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala new file mode 100644 index 0000000000..ce416df41d --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -0,0 +1,177 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.stream.GeneratorSettings +import akka.actor.{ Terminated, ActorRef } +import org.reactivestreams.spi.{ Subscriber, Subscription } +import org.reactivestreams.api.Producer + +/** + * INTERNAL API + */ +private[akka] object MultiStreamOutputProcessor { + case class SubstreamRequestMore(substream: ActorRef, demand: Int) + case class SubstreamCancel(substream: ActorRef) + + class SubstreamSubscription( final val parent: ActorRef, final val substream: ActorRef) extends Subscription { + override def requestMore(elements: Int): Unit = + if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") + else parent ! SubstreamRequestMore(substream, elements) + override def cancel(): Unit = parent ! SubstreamCancel(substream) + override def toString = "SubstreamSubscription" + System.identityHashCode(this) + } + +} + +/** + * INTERNAL API + */ +private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { + import MultiStreamOutputProcessor._ + + private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs] + + class SubstreamOutputs extends Outputs { + private var completed: Boolean = false + private var demands: Int = 0 + + val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings))) + val processor = new ActorProcessor[AnyRef, AnyRef](substream) + + override def isComplete: Boolean = completed + override def complete(): Unit = { + if (!completed) substream ! OnComplete + completed = true + } + + override def cancel(): Unit = completed = true + + override def enqueueOutputElement(elem: Any): Unit = { + demands -= 1 + substream ! OnNext(elem) + } + def enqueueOutputDemand(demand: Int): Unit = demands += demand + override def demandAvailable: Boolean = demands > 0 + override val NeedsDemand: TransferState = new TransferState { + override def isReady: Boolean = demandAvailable + override def isCompleted: Boolean = completed + } + override val NeedsDemandOrCancel: TransferState = new TransferState { + override def isReady: Boolean = demandAvailable || isComplete + override def isCompleted: Boolean = false + } + } + + protected def newSubstream(): SubstreamOutputs = { + val outputs = new SubstreamOutputs + outputs.substream ! OnSubscribe(new SubstreamSubscription(self, outputs.substream)) + substreamOutputs(outputs.substream) = outputs + outputs + } + + protected def invalidateSubstream(substream: ActorRef): Unit = { + substreamOutputs(substream).complete() + substreamOutputs -= substream + if ((isShuttingDown || PrimaryOutputs.isComplete) && context.children.isEmpty) context.stop(self) + pump() + } + + override def fail(e: Throwable): Unit = { + context.children foreach (_ ! OnError(e)) + super.fail(e) + } + + override def shutdown(completed: Boolean): Unit = { + // If the master stream is cancelled (no one consumes substreams) then this callback does not mean we are shutting down + // We can only shut down after all substreams are closed + //if (!PrimaryOutputs.isComplete) { + if (context.children.isEmpty) super.shutdown(completed) + //} + + } + + override def completeDownstream(): Unit = { + context.children foreach (_ ! OnComplete) + super.completeDownstream() + } + + override val downstreamManagement: Receive = super.downstreamManagement orElse { + case SubstreamRequestMore(key, demand) ⇒ + substreamOutputs(key).enqueueOutputDemand(demand) + pump() + case SubstreamCancel(key) ⇒ // FIXME: Terminated should handle this case. Maybe remove SubstreamCancel and just Poison self? + case Terminated(child) ⇒ invalidateSubstream(child) + + } +} + +/** + * INTERNAL API + */ +private[akka] object TwoStreamInputProcessor { + class OtherActorSubscriber[T]( final val impl: ActorRef) extends Subscriber[T] { + override def onError(cause: Throwable): Unit = impl ! OnError(cause) + override def onComplete(): Unit = impl ! OtherStreamOnComplete + override def onNext(element: T): Unit = impl ! OtherStreamOnNext(element) + override def onSubscribe(subscription: Subscription): Unit = impl ! OtherStreamOnSubscribe(subscription) + } + + case object OtherStreamOnComplete + case class OtherStreamOnNext(element: Any) + case class OtherStreamOnSubscribe(subscription: Subscription) +} + +/** + * INTERNAL API + */ +private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSettings, val other: Producer[Any]) + extends ActorProcessorImpl(_settings) { + import TwoStreamInputProcessor._ + + var secondaryInputs: Inputs = _ + + other.getPublisher.subscribe(new OtherActorSubscriber(self)) + + override def waitingForUpstream: Receive = super.waitingForUpstream orElse { + case OtherStreamOnComplete ⇒ + secondaryInputs = EmptyInputs + transitionToRunningWhenReady() + case OtherStreamOnSubscribe(subscription) ⇒ + assert(subscription != null) + secondaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize) + transitionToRunningWhenReady() + } + + override def running: Receive = super.running orElse { + case OtherStreamOnNext(element) ⇒ + secondaryInputs.enqueueInputElement(element) + pump() + case OtherStreamOnComplete ⇒ + secondaryInputs.complete() + flushAndComplete() + pump() + } + + override def flushAndComplete(): Unit = { + if (secondaryInputs.isCompleted && primaryInputs.isCompleted) + super.flushAndComplete() + } + + override def transitionToRunningWhenReady(): Unit = if ((primaryInputs ne null) && (secondaryInputs ne null)) { + secondaryInputs.prefetch() + super.transitionToRunningWhenReady() + } + + override def fail(cause: Throwable): Unit = { + if (secondaryInputs ne null) secondaryInputs.cancel() + super.fail(cause) + } + + override def cancelUpstream(): Unit = { + if (secondaryInputs ne null) secondaryInputs.cancel() + super.cancelUpstream() + } + +} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala new file mode 100644 index 0000000000..cbce375d7f --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -0,0 +1,138 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.stream.impl + +import org.reactivestreams.spi.Subscription +import java.util.Arrays + +trait Inputs { + def NeedsInput: TransferState + def NeedsInputOrComplete: TransferState + + def enqueueInputElement(elem: Any): Unit + def dequeueInputElement(): Any + + def cancel(): Unit + def complete(): Unit + def isCompleted: Boolean + + def prefetch(): Unit + def clear(): Unit + + def inputsDepleted: Boolean + def inputsAvailable: Boolean +} + +trait Outputs { + def NeedsDemand: TransferState + def NeedsDemandOrCancel: TransferState + + def demandAvailable: Boolean + def enqueueOutputElement(elem: Any): Unit + + def complete(): Unit + def cancel(): Unit + def isComplete: Boolean +} + +// States of the operation that is executed by this processor +trait TransferState { + def isReady: Boolean + def isCompleted: Boolean + def isExecutable = isReady && !isCompleted + + def ||(other: TransferState): TransferState = new TransferState { + def isReady: Boolean = TransferState.this.isReady || other.isReady + def isCompleted: Boolean = TransferState.this.isCompleted && other.isCompleted + } + + def &&(other: TransferState): TransferState = new TransferState { + def isReady: Boolean = TransferState.this.isReady && other.isReady + def isCompleted: Boolean = TransferState.this.isCompleted || other.isCompleted + } +} + +object Completed extends TransferState { + def isReady = false + def isCompleted = true +} + +object EmptyInputs extends Inputs { + override def inputsAvailable: Boolean = false + override def inputsDepleted: Boolean = true + override def isCompleted: Boolean = true + + override def complete(): Unit = () + override def cancel(): Unit = () + override def prefetch(): Unit = () + override def clear(): Unit = () + + override def dequeueInputElement(): Any = throw new UnsupportedOperationException("Cannot dequeue from EmptyInputs") + override def enqueueInputElement(elem: Any): Unit = throw new UnsupportedOperationException("Cannot enqueue to EmptyInputs") + + override val NeedsInputOrComplete: TransferState = new TransferState { + override def isReady: Boolean = true + override def isCompleted: Boolean = false + } + override val NeedsInput: TransferState = Completed +} + +class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends Inputs { + // TODO: buffer and batch sizing heuristics + private var inputBuffer = Array.ofDim[AnyRef](size) + private var inputBufferElements = 0 + private var nextInputElementCursor = 0 + private var upstreamCompleted = false + val IndexMask = size - 1 + + private def requestBatchSize = math.max(1, inputBuffer.length / 2) + private var batchRemaining = requestBatchSize + + def prefetch(): Unit = upstream.requestMore(inputBuffer.length) + + def dequeueInputElement(): Any = { + val elem = inputBuffer(nextInputElementCursor) + inputBuffer(nextInputElementCursor) = null + + batchRemaining -= 1 + if (batchRemaining == 0 && !upstreamCompleted) { + upstream.requestMore(requestBatchSize) + batchRemaining = requestBatchSize + } + + inputBufferElements -= 1 + nextInputElementCursor += 1 + nextInputElementCursor &= IndexMask + elem + } + + def enqueueInputElement(elem: Any): Unit = { + inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] + inputBufferElements += 1 + } + + def complete(): Unit = upstreamCompleted = true + def cancel(): Unit = { + if (!upstreamCompleted) upstream.cancel() + upstreamCompleted = true + } + def isCompleted: Boolean = upstreamCompleted + + def clear(): Unit = { + Arrays.fill(inputBuffer, 0, inputBuffer.length, null) + inputBufferElements = 0 + } + + def inputsDepleted = upstreamCompleted && inputBufferElements == 0 + def inputsAvailable = inputBufferElements > 0 + + override val NeedsInput: TransferState = new TransferState { + def isReady = inputsAvailable + def isCompleted = inputsDepleted + } + override val NeedsInputOrComplete: TransferState = new TransferState { + def isReady = inputsAvailable || inputsDepleted + def isCompleted = false + } +} diff --git a/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala new file mode 100644 index 0000000000..87ae3d90fd --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamConcatSpec.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.Stream +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec + +class StreamConcatSpec extends AkkaSpec { + import system.dispatcher + + val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + "Concat" must { + + "work in the happy case" in { + val source0 = Stream(List.empty[Int].iterator).toProducer(gen) + val source1 = Stream((1 to 4).iterator).toProducer(gen) + val source2 = Stream((5 to 10).iterator).toProducer(gen) + val p = Stream(source0).concat(source1).concat(source2).toProducer(gen) + + val probe = StreamTestKit.consumerProbe[Int] + p.produceTo(probe) + val subscription = probe.expectSubscription() + + for (i ← 1 to 10) { + subscription.requestMore(1) + probe.expectNext(i) + } + + probe.expectComplete() + } + + } +} diff --git a/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala new file mode 100644 index 0000000000..30f7b2d129 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamGroupBySpec.scala @@ -0,0 +1,165 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit._ +import akka.testkit.AkkaSpec +import org.reactivestreams.api.Producer +import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.Stream + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamGroupBySpec extends AkkaSpec { + + import system.dispatcher + + val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + case class StreamPuppet(p: Producer[Int]) { + val probe = StreamTestKit.consumerProbe[Int] + p.produceTo(probe) + val subscription = probe.expectSubscription() + + def requestMore(demand: Int): Unit = subscription.requestMore(demand) + def expectNext(elem: Int): Unit = probe.expectNext(elem) + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + def expectComplete(): Unit = probe.expectComplete() + def cancel(): Unit = subscription.cancel() + } + + class SubstreamsSupport(groupCount: Int = 2, elementCount: Int = 6) { + val source = Stream((1 to elementCount).iterator).toProducer(gen) + val groupStream = Stream(source).groupBy(_ % groupCount).toProducer(gen) + val masterConsumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] + + groupStream.produceTo(masterConsumer) + val masterSubscription = masterConsumer.expectSubscription() + + def getSubproducer(expectedKey: Int): Producer[Int] = { + masterSubscription.requestMore(1) + expectSubproducer(expectedKey: Int) + } + + def expectSubproducer(expectedKey: Int): Producer[Int] = { + val (key, substream) = masterConsumer.expectNext() + key should be(expectedKey) + substream + } + + } + + "groupBy" must { + "work in the happy case" in new SubstreamsSupport(groupCount = 2) { + val s1 = StreamPuppet(getSubproducer(1)) + masterConsumer.expectNoMsg(100.millis) + + s1.expectNoMsg(100.millis) + s1.requestMore(1) + s1.expectNext(1) + s1.expectNoMsg(100.millis) + + val s2 = StreamPuppet(getSubproducer(0)) + masterConsumer.expectNoMsg(100.millis) + + s2.expectNoMsg(100.millis) + s2.requestMore(2) + s2.expectNext(2) + s2.expectNext(4) + + s2.expectNoMsg(100.millis) + + s1.requestMore(1) + s1.expectNext(3) + + s2.requestMore(1) + s2.expectNext(6) + s2.expectComplete() + + s1.requestMore(1) + s1.expectNext(5) + s1.expectComplete() + + masterConsumer.expectComplete() + + } + + "accept cancellation of substreams" in new SubstreamsSupport(groupCount = 2) { + StreamPuppet(getSubproducer(1)).cancel() + + val substream = StreamPuppet(getSubproducer(0)) + masterConsumer.expectNoMsg(100.millis) + substream.requestMore(2) + substream.expectNext(2) + substream.expectNext(4) + substream.expectNoMsg(100.millis) + + substream.requestMore(2) + substream.expectNext(6) + substream.expectComplete() + + masterConsumer.expectComplete() + + } + + "accept cancellation of master stream when not consumed anything" in new SubstreamsSupport(groupCount = 2) { + masterSubscription.cancel() + masterConsumer.expectNoMsg(100.millis) + } + + "accept cancellation of master stream when substreams are open" in new SubstreamsSupport(groupCount = 3, elementCount = 13) { + pending + // val substream = StreamPuppet(getSubproducer(1)) + // + // substream.requestMore(1) + // substream.expectNext(1) + // + // masterSubscription.cancel() + // masterConsumer.expectNoMsg(100.millis) + // + // // Open substreams still work, others are discarded + // substream.requestMore(4) + // substream.expectNext(4) + // substream.expectNext(7) + // substream.expectNext(10) + // substream.expectNext(13) + // substream.expectComplete() + } + + "work with fanout on substreams" in new SubstreamsSupport(groupCount = 2) { + val substreamProducer = getSubproducer(1) + getSubproducer(0) + + val substreamConsumer1 = StreamPuppet(substreamProducer) + val substreamConsumer2 = StreamPuppet(substreamProducer) + + substreamConsumer1.requestMore(1) + substreamConsumer1.expectNext(1) + substreamConsumer2.requestMore(1) + substreamConsumer2.expectNext(1) + + substreamConsumer1.requestMore(1) + substreamConsumer1.expectNext(3) + substreamConsumer2.requestMore(1) + substreamConsumer2.expectNext(3) + } + + "work with fanout on master stream" in { + pending + } + + "work with fanout on substreams and master stream" in { + pending + } + + "abort on onError from upstream" in { + pending + } + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala new file mode 100644 index 0000000000..3c603368a7 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamMergeSpec.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec +import org.reactivestreams.api.Producer +import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.{ Stream, GeneratorSettings } + +class StreamMergeSpec extends AkkaSpec { + + import system.dispatcher + + val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + "merge" must { + + "work in the happy case" in { + // Different input sizes (4 and 6) + val source1 = Stream((1 to 4).iterator).toProducer(gen) + val source2 = Stream((5 to 10).iterator).toProducer(gen) + val source3 = Stream(List.empty[Int].iterator).toProducer(gen) + val p = Stream(source1).merge(source2).merge(source3).toProducer(gen) + + val probe = StreamTestKit.consumerProbe[Int] + p.produceTo(probe) + val subscription = probe.expectSubscription() + + var collected = Set.empty[Int] + for (_ ← 1 to 10) { + subscription.requestMore(1) + collected += probe.expectNext() + } + + collected should be(Set(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)) + probe.expectComplete() + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala new file mode 100644 index 0000000000..30908c580c --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamSplitWhenSpec.scala @@ -0,0 +1,108 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.stream + +import scala.concurrent.duration._ +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec +import org.reactivestreams.api.Producer +import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.{ Stream, GeneratorSettings } + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class StreamSplitWhenSpec extends AkkaSpec { + + import system.dispatcher + + val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + case class StreamPuppet(p: Producer[Int]) { + val probe = StreamTestKit.consumerProbe[Int] + p.produceTo(probe) + val subscription = probe.expectSubscription() + + def requestMore(demand: Int): Unit = subscription.requestMore(demand) + def expectNext(elem: Int): Unit = probe.expectNext(elem) + def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max) + def expectComplete(): Unit = probe.expectComplete() + def cancel(): Unit = subscription.cancel() + } + + class SubstreamsSupport(splitWhen: Int = 3, elementCount: Int = 6) { + val source = Stream((1 to elementCount).iterator).toProducer(gen) + val groupStream = Stream(source).splitWhen(_ == splitWhen).toProducer(gen) + val masterConsumer = StreamTestKit.consumerProbe[Producer[Int]] + + groupStream.produceTo(masterConsumer) + val masterSubscription = masterConsumer.expectSubscription() + + def getSubproducer(): Producer[Int] = { + masterSubscription.requestMore(1) + expectSubproducer() + } + + def expectSubproducer(): Producer[Int] = { + val substream = masterConsumer.expectNext() + substream + } + + } + + "splitWhen" must { + + "work in the happy case" in new SubstreamsSupport(elementCount = 4) { + val s1 = StreamPuppet(getSubproducer()) + masterConsumer.expectNoMsg(100.millis) + + s1.requestMore(2) + s1.expectNext(1) + s1.expectNext(2) + s1.expectComplete() + + val s2 = StreamPuppet(getSubproducer()) + masterConsumer.expectComplete() + + s2.requestMore(1) + s2.expectNext(3) + s2.expectNoMsg(100.millis) + + s2.requestMore(1) + s2.expectNext(4) + s2.expectComplete() + + } + + "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubproducer()) + s1.cancel() + val s2 = StreamPuppet(getSubproducer()) + + s2.requestMore(4) + s2.expectNext(5) + s2.expectNext(6) + s2.expectNext(7) + s2.expectNext(8) + s2.expectComplete() + + masterConsumer.expectComplete() + } + + "support cancelling the master stream" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { + val s1 = StreamPuppet(getSubproducer()) + masterSubscription.cancel() + s1.requestMore(4) + s1.expectNext(1) + s1.expectNext(2) + s1.expectNext(3) + s1.expectNext(4) + s1.expectComplete() + } + + } + +} diff --git a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala index 5dc699a2e9..a3847f5544 100644 --- a/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/StreamTransformRecoverSpec.scala @@ -179,7 +179,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { { case (s, Failure(ex)) ⇒ (s + ex.getMessage, List(ex)) }, onComplete = x ⇒ List(TE(x.size + "10"))) .toProducer(gen) - val proc = p.expectSubscription + val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Throwable] p2.produceTo(c) val s = c.expectSubscription() @@ -193,7 +193,7 @@ class StreamTransformRecoverSpec extends AkkaSpec { "forward errors when received and thrown" in { val p = StreamTestKit.producerProbe[Int] val p2 = Stream(p).transformRecover("")((_, in) ⇒ "" -> List(in.get)).toProducer(gen) - val proc = p.expectSubscription + val proc = p.expectSubscription() val c = StreamTestKit.consumerProbe[Int] p2.produceTo(c) val s = c.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala b/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala new file mode 100644 index 0000000000..b1caf9d183 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/StreamZipSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream + +import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.Stream +import akka.stream.testkit.StreamTestKit +import akka.testkit.AkkaSpec + +class StreamZipSpec extends AkkaSpec { + import system.dispatcher + + val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + initialInputBufferSize = 2, + maximumInputBufferSize = 2, + initialFanOutBufferSize = 2, + maxFanOutBufferSize = 2), system) + + "Zip" must { + + "work in the happy case" in { + // Different input sizes (4 and 6) + val source1 = Stream((1 to 4).iterator).toProducer(gen) + val source2 = Stream(List("A", "B", "C", "D", "E", "F").iterator).toProducer(gen) + val p = Stream(source1).zip(source2).toProducer(gen) + + val probe = StreamTestKit.consumerProbe[(Int, String)] + p.produceTo(probe) + val subscription = probe.expectSubscription() + + subscription.requestMore(2) + probe.expectNext((1, "A")) + probe.expectNext((2, "B")) + + subscription.requestMore(1) + probe.expectNext((3, "C")) + subscription.requestMore(1) + probe.expectNext((4, "D")) + + probe.expectComplete() + } + + } + +}