diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index cd94b89ffa..fb3781c411 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -24,6 +24,9 @@ private[akka] object Ast { def name: String } + case class FanoutBox(initialBufferSize: Int, maximumBufferSize: Int) extends AstNode { + override def name = "fanoutBox" + } case class Transform(name: String, mkTransformer: () ⇒ Transformer[Any, Any]) extends AstNode case class TimerTransform(name: String, mkTransformer: () ⇒ TimerTransformer[Any, Any]) extends AstNode case class MapFuture(f: Any ⇒ Future[Any]) extends AstNode { diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index b084e06348..60cd276a59 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -19,6 +19,7 @@ private[akka] object ActorProcessor { import Ast._ def props(settings: MaterializerSettings, op: AstNode): Props = (op match { + case fb: FanoutBox ⇒ Props(new FanoutProcessorImpl(settings, fb.initialBufferSize, fb.maximumBufferSize)) case t: TimerTransform ⇒ Props(new TimerTransformerProcessorsImpl(settings, t.mkTransformer())) case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.mkTransformer())) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) @@ -159,61 +160,48 @@ private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) /** * INTERNAL API */ -private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, val pump: Pump) - extends DefaultOutputTransferStates - with SubscriberManagement[Any] { - - override type S = ActorSubscription[Any] - override def createSubscription(subscriber: Subscriber[Any]): S = - new ActorSubscription(self, subscriber) +private[akka] class SimpleOutputs(self: ActorRef, val pump: Pump) extends DefaultOutputTransferStates { protected var exposedPublisher: ActorPublisher[Any] = _ - private var downstreamBufferSpace = 0 - private var downstreamCompleted = false - override def demandAvailable = downstreamBufferSpace > 0 - def demandCount: Int = downstreamBufferSpace + protected var subscriber: Subscriber[Any] = _ + protected var downstreamDemand: Long = 0L + protected var downstreamCompleted = false + override def demandAvailable = downstreamDemand > 0 + override def demandCount: Long = downstreamDemand - override val subreceive = new SubReceive(waitingExposedPublisher) + override def subreceive = _subreceive + private val _subreceive = new SubReceive(waitingExposedPublisher) def enqueueOutputElement(elem: Any): Unit = { - downstreamBufferSpace -= 1 - pushToDownstream(elem) + downstreamDemand -= 1 + subscriber.onNext(elem) } - def complete(): Unit = + def complete(): Unit = { if (!downstreamCompleted) { downstreamCompleted = true - completeDownstream() + if (subscriber ne null) subscriber.onComplete() + if (exposedPublisher ne null) exposedPublisher.shutdown(None) } + } def cancel(e: Throwable): Unit = { if (!downstreamCompleted) { downstreamCompleted = true - abortDownstream(e) + if (subscriber ne null) subscriber.onError(e) + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } - if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) } def isClosed: Boolean = downstreamCompleted - def afterShutdown(): Unit - - override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements - - private def subscribePending(): Unit = - exposedPublisher.takePendingSubscribers() foreach registerSubscriber - - override protected def shutdown(completed: Boolean): Unit = { - if (exposedPublisher ne null) { - if (completed) exposedPublisher.shutdown(None) - else exposedPublisher.shutdown(Some(new IllegalStateException("Cannot subscribe to shutdown publisher"))) + private def subscribePending(subscribers: Seq[Subscriber[Any]]): Unit = + subscribers foreach { sub ⇒ + if (subscriber eq null) { + subscriber = sub + subscriber.onSubscribe(new ActorSubscription(self, subscriber)) + } else sub.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) } - afterShutdown() - } - - override protected def cancelUpstream(): Unit = { - downstreamCompleted = true - } protected def waitingExposedPublisher: Actor.Receive = { case ExposedPublisher(publisher) ⇒ @@ -225,12 +213,13 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu protected def downstreamRunning: Actor.Receive = { case SubscribePending ⇒ - subscribePending() + subscribePending(exposedPublisher.takePendingSubscribers()) case RequestMore(subscription, elements) ⇒ - moreRequested(subscription.asInstanceOf[ActorSubscription[Any]], elements) + downstreamDemand += elements pump.pump() case Cancel(subscription) ⇒ - unregisterSubscription(subscription.asInstanceOf[ActorSubscription[Any]]) + downstreamCompleted = true + exposedPublisher.shutdown(Some(new ActorPublisher.NormalShutdownException)) pump.pump() } @@ -242,7 +231,6 @@ private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBu private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettings) extends Actor with ActorLogging - with SoftShutdown with Pump with Stash { @@ -251,13 +239,7 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) } - protected val primaryOutputs: FanoutOutputs = - new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, this) { - override def afterShutdown(): Unit = { - primaryOutputsShutdown = true - shutdownHooks() - } - } + protected val primaryOutputs: Outputs = new SimpleOutputs(self, this) /** * Subclass may override [[#activeReceive]] @@ -279,29 +261,20 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin log.error(e, "failure during processing") // FIXME: escalate to supervisor instead primaryInputs.cancel() primaryOutputs.cancel(e) - primaryOutputsShutdown = true - softShutdown() + context.stop(self) } - override val pumpContext = context - override def pumpFinished(): Unit = { - if (primaryInputs.isOpen) primaryInputs.cancel() + primaryInputs.cancel() primaryOutputs.complete() + context.stop(self) } + override def pumpFailed(e: Throwable): Unit = fail(e) - protected def shutdownHooks(): Unit = { - primaryInputs.cancel() - softShutdown() - } - - var primaryOutputsShutdown = false - override def postStop(): Unit = { - // Non-gracefully stopped, do our best here - if (!primaryOutputsShutdown) - primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly")) + primaryInputs.cancel() + primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly")) } override def preRestart(reason: Throwable, message: Option[Any]): Unit = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala new file mode 100644 index 0000000000..fa2285fca2 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/FanoutProcessor.scala @@ -0,0 +1,119 @@ +package akka.stream.impl + +import akka.actor.{ Actor, ActorRef } +import akka.stream.MaterializerSettings +import org.reactivestreams.Subscriber + +/** + * INTERNAL API + */ +private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, val pump: Pump) + extends DefaultOutputTransferStates + with SubscriberManagement[Any] { + + override type S = ActorSubscription[Any] + override def createSubscription(subscriber: Subscriber[Any]): S = + new ActorSubscription(self, subscriber) + protected var exposedPublisher: ActorPublisher[Any] = _ + + private var downstreamBufferSpace = 0 + private var downstreamCompleted = false + override def demandAvailable = downstreamBufferSpace > 0 + override def demandCount: Long = downstreamBufferSpace + + override val subreceive = new SubReceive(waitingExposedPublisher) + + def enqueueOutputElement(elem: Any): Unit = { + downstreamBufferSpace -= 1 + pushToDownstream(elem) + } + + def complete(): Unit = + if (!downstreamCompleted) { + downstreamCompleted = true + completeDownstream() + } + + def cancel(e: Throwable): Unit = { + if (!downstreamCompleted) { + downstreamCompleted = true + abortDownstream(e) + } + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) + } + + def isClosed: Boolean = downstreamCompleted + + def afterShutdown(): Unit + + override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements + + private def subscribePending(): Unit = + exposedPublisher.takePendingSubscribers() foreach registerSubscriber + + override protected def shutdown(completed: Boolean): Unit = { + if (exposedPublisher ne null) { + if (completed) exposedPublisher.shutdown(None) + else exposedPublisher.shutdown(Some(new IllegalStateException("Cannot subscribe to shutdown publisher"))) + } + afterShutdown() + } + + override protected def cancelUpstream(): Unit = { + downstreamCompleted = true + } + + protected def waitingExposedPublisher: Actor.Receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + subreceive.become(downstreamRunning) + case other ⇒ + throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") + } + + protected def downstreamRunning: Actor.Receive = { + case SubscribePending ⇒ + subscribePending() + case RequestMore(subscription, elements) ⇒ + moreRequested(subscription.asInstanceOf[ActorSubscription[Any]], elements) + pump.pump() + case Cancel(subscription) ⇒ + unregisterSubscription(subscription.asInstanceOf[ActorSubscription[Any]]) + pump.pump() + } + +} + +/** + * INTERNAL API + */ +private[akka] class FanoutProcessorImpl( + _settings: MaterializerSettings, + initialFanoutBufferSize: Int, + maximumFanoutBufferSize: Int) extends ActorProcessorImpl(_settings) { + + override val primaryOutputs: FanoutOutputs = + new FanoutOutputs(maximumFanoutBufferSize, initialFanoutBufferSize, self, this) { + override def afterShutdown(): Unit = afterFlush() + } + + val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + } + + override def fail(e: Throwable): Unit = { + log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + primaryInputs.cancel() + primaryOutputs.cancel(e) + // Stopping will happen after flush + } + + override def pumpFinished(): Unit = { + primaryInputs.cancel() + primaryOutputs.complete() + } + + def afterFlush(): Unit = context.stop(self) + + nextPhase(running) +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 56dc9dbb80..07fb747190 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -334,6 +334,10 @@ private[akka] trait Builder[Out] { andThen(Buffer(size, overflowStrategy)) } + def fanout(initialBufferSize: Int, maximumBufferSize: Int): Thing[Out] = { + andThen(FanoutBox(initialBufferSize, maximumBufferSize)) + } + def flatten[U](strategy: FlattenStrategy[Out, U]): Thing[U] = strategy match { case _: FlattenStrategy.Concat[Out] ⇒ andThen(ConcatAll) case _ ⇒ throw new IllegalArgumentException(s"Unsupported flattening strategy [${strategy.getClass.getSimpleName}]") diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 6aeea3d871..6ec5287d57 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -1,67 +1,68 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.stream.impl - -import akka.actor.{ Terminated, Props, ActorRef } -import akka.stream.MaterializerSettings - -/** - * INTERNAL API - */ -private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { - var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] - - var pendingSubstreamOutputs: SubstreamOutputs = _ - - // No substream is open yet. If downstream cancels now, we are complete - val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - nextPhase(openSubstream(elem, key)) - } - - // some substreams are open now. If downstream cancels, we still continue until the substreams are closed - val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - - keyToSubstreamOutputs.get(key) match { - case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) - case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) - case _ ⇒ // stay - } - } - - def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ - if (primaryOutputs.isClosed) { - // Just drop, we do not open any more substreams - nextPhase(waitNext) - } else { - val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement((key, substreamOutput.processor)) - keyToSubstreamOutputs(key) = substreamOutput - nextPhase(dispatchToSubstream(elem, substreamOutput)) - } - } - - def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { - pendingSubstreamOutputs = substream - TransferPhase(substream.NeedsDemand) { () ⇒ - if (substream.isOpen) substream.enqueueOutputElement(elem) - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - } - - nextPhase(waitFirst) - - override def invalidateSubstream(substream: ActorRef): Unit = { - if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.substream) { - pendingSubstreamOutputs = null - nextPhase(waitNext) - } - super.invalidateSubstream(substream) - } - -} +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.{ Terminated, Props, ActorRef } +import akka.stream.MaterializerSettings +import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey + +/** + * INTERNAL API + */ +private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { + var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] + + var pendingSubstreamOutputs: SubstreamOutputs = _ + + // No substream is open yet. If downstream cancels now, we are complete + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + nextPhase(openSubstream(elem, key)) + } + + // some substreams are open now. If downstream cancels, we still continue until the substreams are closed + val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + + keyToSubstreamOutputs.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay + } + } + + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.isClosed) { + // Just drop, we do not open any more substreams + nextPhase(waitNext) + } else { + val substreamOutput = newSubstream() + primaryOutputs.enqueueOutputElement((key, substreamOutput)) + keyToSubstreamOutputs(key) = substreamOutput + nextPhase(dispatchToSubstream(elem, substreamOutput)) + } + } + + def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { + pendingSubstreamOutputs = substream + TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + pendingSubstreamOutputs = null + nextPhase(waitNext) + } + } + + nextPhase(waitFirst) + + override def invalidateSubstream(substream: SubstreamKey): Unit = { + if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.key) { + pendingSubstreamOutputs = null + nextPhase(waitNext) + } + super.invalidateSubstream(substream) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala index 58365761a4..5567c15d86 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/PrefixAndTailImpl.scala @@ -44,7 +44,7 @@ private[akka] class PrefixAndTailImpl(_settings: MaterializerSettings, val takeM def emitNonEmptyTail(): Unit = { val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement((taken, substreamOutput.processor)) + primaryOutputs.enqueueOutputElement((taken, substreamOutput)) primaryOutputs.complete() nextPhase(streamTailPhase(substreamOutput)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 1d20373176..a9bb2555ed 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -15,7 +15,6 @@ import scala.util.control.NonFatal private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: TransformerLike[Any, Any]) extends ActorProcessorImpl(_settings) { - var hasCleanupRun = false // TODO performance improvement: mutable buffer? var emits = immutable.Seq.empty[Any] var errorEvent: Option[Throwable] = None @@ -35,7 +34,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran object NeedsInputAndDemandOrCompletion extends TransferState { def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted - def isCompleted = false + def isCompleted = primaryOutputs.isClosed } private val runningPhase: TransferPhase = TransferPhase(NeedsInputAndDemandOrCompletion) { () ⇒ @@ -73,15 +72,7 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" - override def softShutdown(): Unit = { - transformer.cleanup() - hasCleanupRun = true // for postStop - super.softShutdown() - } - - override def postStop(): Unit = { - try super.postStop() finally if (!hasCleanupRun) transformer.cleanup() - } + override def postStop(): Unit = try super.postStop() finally transformer.cleanup() } /** diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index d0f499a924..1b30da06fb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -3,8 +3,8 @@ */ package akka.stream.impl -import akka.actor.ActorRef import akka.stream.MaterializerSettings +import akka.stream.impl.MultiStreamOutputProcessor.SubstreamKey /** * INTERNAL API @@ -20,7 +20,7 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement(substreamOutput.processor) + primaryOutputs.enqueueOutputElement(substreamOutput) currentSubstream = substreamOutput nextPhase(serveSubstreamFirst(currentSubstream, elem)) } @@ -49,8 +49,8 @@ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val nextPhase(waitFirst) - override def invalidateSubstream(substream: ActorRef): Unit = { - if ((currentSubstream ne null) && substream == currentSubstream.substream) nextPhase(ignoreUntilNewSubstream) + override def invalidateSubstream(substream: SubstreamKey): Unit = { + if ((currentSubstream ne null) && substream == currentSubstream.key) nextPhase(ignoreUntilNewSubstream) super.invalidateSubstream(substream) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala index 6374e6db07..ab95a235c1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -23,12 +23,23 @@ private[akka] class BroadcastImpl(_settings: MaterializerSettings, other: Subscr super.registerSubscriber(subscriber) } - override def afterShutdown(): Unit = { - primaryOutputsShutdown = true - shutdownHooks() - } + override def afterShutdown(): Unit = afterFlush() } + override def fail(e: Throwable): Unit = { + log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + primaryInputs.cancel() + primaryOutputs.cancel(e) + // Stopping will happen after flush + } + + override def pumpFinished(): Unit = { + primaryInputs.cancel() + primaryOutputs.complete() + } + + def afterFlush(): Unit = context.stop(self) + val running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val in = primaryInputs.dequeueInputElement() primaryOutputs.enqueueOutputElement(in) diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index c336b820d4..43c48f5603 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -3,25 +3,27 @@ */ package akka.stream.impl +import java.util.concurrent.atomic.AtomicReference + +import akka.actor.{ Actor, ActorRef } import akka.stream.MaterializerSettings -import akka.actor.{ Actor, Terminated, ActorRef } import org.reactivestreams.{ Publisher, Subscriber, Subscription } -import akka.stream.actor.ActorSubscriber.OnSubscribe -import akka.stream.actor.ActorSubscriberMessage.{ OnNext, OnError, OnComplete } -import akka.actor.Stash +import scala.collection.mutable /** * INTERNAL API */ private[akka] object MultiStreamOutputProcessor { - case class SubstreamRequestMore(substream: ActorRef, demand: Int) - case class SubstreamCancel(substream: ActorRef) + case class SubstreamKey(id: Long) + case class SubstreamRequestMore(substream: SubstreamKey, demand: Int) + case class SubstreamCancel(substream: SubstreamKey) + case class SubstreamSubscribe(substream: SubstreamKey, subscriber: Subscriber[Any]) - class SubstreamSubscription(val parent: ActorRef, val substream: ActorRef) extends Subscription { + class SubstreamSubscription(val parent: ActorRef, val substreamKey: SubstreamKey) extends Subscription { override def request(elements: Int): Unit = if (elements <= 0) throw new IllegalArgumentException("The number of requested elements must be > 0") - else parent ! SubstreamRequestMore(substream, elements) - override def cancel(): Unit = parent ! SubstreamCancel(substream) + else parent ! SubstreamRequestMore(substreamKey, elements) + override def cancel(): Unit = parent ! SubstreamCancel(substreamKey) override def toString = "SubstreamSubscription" + System.identityHashCode(this) } @@ -32,62 +34,85 @@ private[akka] object MultiStreamOutputProcessor { */ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { import MultiStreamOutputProcessor._ + private var nextId = 0 + private val substreamOutputs = mutable.Map.empty[SubstreamKey, SubstreamOutputs] - private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs] + class SubstreamOutputs(val key: SubstreamKey) extends SimpleOutputs(self, this) with Publisher[Any] { - class SubstreamOutputs extends Outputs { - private var completed: Boolean = false - private var demands: Int = 0 + sealed trait PublisherState + sealed trait CompletedState extends PublisherState + case object Open extends PublisherState + final case class Attached(sub: Subscriber[Any]) extends PublisherState + case object Completed extends CompletedState + final case class Failed(e: Throwable) extends CompletedState + + private val subscription = new SubstreamSubscription(self, key) + private val state = new AtomicReference[PublisherState](Open) override def subreceive: SubReceive = throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") - val substream = context.watch(context.actorOf( - IdentityProcessorImpl.props(settings) - .withDispatcher(context.props.dispatcher))) - val processor = ActorProcessor[AnyRef, AnyRef](substream) - - override def isClosed: Boolean = completed - override def complete(): Unit = { - if (!completed) substream ! OnComplete - completed = true + def enqueueOutputDemand(demand: Int): Unit = { + downstreamDemand += demand + pump.pump() } override def cancel(e: Throwable): Unit = { - if (!completed) substream ! OnError(e) - completed = true + if (!downstreamCompleted) { + closePublisher(Failed(e)) + downstreamCompleted = true + } } - override def enqueueOutputElement(elem: Any): Unit = { - demands -= 1 - substream ! OnNext(elem) + override def complete(): Unit = { + if (!downstreamCompleted) { + closePublisher(Completed) + downstreamCompleted = true + } } - def enqueueOutputDemand(demand: Int): Unit = demands += demand - override def demandAvailable: Boolean = demands > 0 - override val NeedsDemand: TransferState = new TransferState { - override def isReady: Boolean = demandAvailable - override def isCompleted: Boolean = completed + private def closePublisher(withState: CompletedState): Unit = { + state.getAndSet(withState) match { + case Attached(sub) ⇒ closeSubscriber(sub, withState) + case _: CompletedState ⇒ throw new IllegalStateException("Attempted to double shutdown publisher") + case Open ⇒ // No action needed + } } - override val NeedsDemandOrCancel: TransferState = new TransferState { - override def isReady: Boolean = demandAvailable || isClosed - override def isCompleted: Boolean = false + + private def closeSubscriber(s: Subscriber[Any], withState: CompletedState): Unit = withState match { + case Completed ⇒ s.onComplete() + case Failed(e) ⇒ s.onError(e) } + + override def subscribe(s: Subscriber[Any]): Unit = { + if (state.compareAndSet(Open, Attached(s))) self ! SubstreamSubscribe(key, s) + else { + state.get() match { + case _: Attached ⇒ s.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) + case c: CompletedState ⇒ closeSubscriber(s, c) + case Open ⇒ throw new IllegalStateException("Publisher cannot become open after being used before") + } + } + } + + def attachSubscriber(s: Subscriber[Any]): Unit = + if (subscriber eq null) { + subscriber = s + subscriber.onSubscribe(subscription) + } else subscriber.onError(new IllegalStateException("Cannot subscribe two or more Subscribers to this Publisher")) } protected def newSubstream(): SubstreamOutputs = { - val outputs = new SubstreamOutputs - outputs.substream ! OnSubscribe(new SubstreamSubscription(self, outputs.substream)) - substreamOutputs(outputs.substream) = outputs + val id = SubstreamKey(nextId) + nextId += 1 + val outputs = new SubstreamOutputs(id) + substreamOutputs(outputs.key) = outputs outputs } - def fullyCompleted: Boolean = primaryOutputsShutdown && isPumpFinished && context.children.isEmpty - - protected def invalidateSubstream(substream: ActorRef): Unit = { + protected def invalidateSubstream(substream: SubstreamKey): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream - shutdownHooks() pump() } @@ -96,21 +121,15 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS super.fail(e) } - // FIXME: proper shutdown scheduling - override def shutdownHooks(): Unit = if (fullyCompleted) super.shutdownHooks() - override def pumpFinished(): Unit = { - context.children foreach (_ ! OnComplete) + substreamOutputs.values foreach (_.complete()) super.pumpFinished() } val substreamManagement: Receive = { - case SubstreamRequestMore(key, demand) ⇒ - substreamOutputs(key).enqueueOutputDemand(demand) - pump() - case SubstreamCancel(key) ⇒ // FIXME: Terminated should handle this case. Maybe remove SubstreamCancel and just Poison self? - case Terminated(child) ⇒ invalidateSubstream(child) - + case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) + case SubstreamCancel(key) ⇒ invalidateSubstream(key) + case SubstreamSubscribe(key, subscriber) ⇒ substreamOutputs(key).attachSubscriber(subscriber) } override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement @@ -138,7 +157,7 @@ private[akka] object TwoStreamInputProcessor { */ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSettings, val other: Publisher[Any]) extends ActorProcessorImpl(_settings) { - import TwoStreamInputProcessor._ + import akka.stream.impl.TwoStreamInputProcessor._ val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { override val subreceive: SubReceive = new SubReceive(waitingForUpstream) @@ -166,10 +185,11 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett other.subscribe(new OtherActorSubscriber(self)) - override def shutdownHooks(): Unit = { + override def pumpFinished(): Unit = { secondaryInputs.cancel() - super.shutdownHooks() + super.pumpFinished() } + } /** @@ -195,7 +215,7 @@ private[akka] object MultiStreamInputProcessor { * INTERNAL API */ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { - import MultiStreamInputProcessor._ + import akka.stream.impl.MultiStreamInputProcessor._ var nextId = 0 private val substreamInputs = collection.mutable.Map.empty[SubstreamKey, SubstreamInputs] @@ -246,9 +266,9 @@ private[akka] abstract class MultiStreamInputProcessor(_settings: MaterializerSe super.fail(e) } - override def shutdownHooks(): Unit = { + override def pumpFinished(): Unit = { substreamInputs.values foreach (_.cancel()) - super.shutdownHooks() + super.pumpFinished() } override def activeReceive = primaryInputs.subreceive orElse primaryOutputs.subreceive orElse substreamManagement diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index 6eb6390d57..6a1a948514 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -65,6 +65,9 @@ private[akka] trait Outputs { def subreceive: SubReceive + // FIXME: This is completely unnecessary, refactor MapFutureProcessorImpl + def demandCount: Long = -1L + def complete(): Unit def cancel(e: Throwable): Unit def isClosed: Boolean @@ -138,7 +141,6 @@ private[akka] case class TransferPhase(precondition: TransferState)(val action: * INTERNAL API */ private[akka] trait Pump { - protected def pumpContext: ActorRefFactory private var transferState: TransferState = NotInitialized private var currentAction: () ⇒ Unit = () ⇒ throw new IllegalStateException("Pump has been not initialized with a phase") diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index dfdd0fca1b..5902f5daf9 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -37,13 +37,7 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) override def inputOnError(e: Throwable): Unit = fail(e) } - val primaryOutputs: Outputs = - new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, readPump) { - override def afterShutdown(): Unit = { - tcpInputs.cancel() - TcpStreamActor.this.tryShutdown() - } - } + val primaryOutputs: Outputs = new SimpleOutputs(self, readPump) object tcpInputs extends DefaultInputTransferStates { private var closed: Boolean = false @@ -150,7 +144,6 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) tryShutdown() } override protected def pumpFailed(e: Throwable): Unit = fail(e) - override protected def pumpContext: ActorRefFactory = context } object readPump extends Pump { @@ -160,11 +153,11 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) } override protected def pumpFinished(): Unit = { + tcpInputs.cancel() primaryOutputs.complete() tryShutdown() } override protected def pumpFailed(e: Throwable): Unit = fail(e) - override protected def pumpContext: ActorRefFactory = context } final override def receive = { diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index c851e7ac8a..f93c8bfc27 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -33,11 +33,7 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, import akka.stream.io.TcpListenStreamActor._ import context.system - object primaryOutputs extends FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { - override def afterShutdown(): Unit = { - incomingConnections.cancel() - context.stop(self) - } + object primaryOutputs extends SimpleOutputs(self, pump = this) { override def waitingExposedPublisher: Actor.Receive = { case ExposedPublisher(publisher) ⇒ @@ -51,9 +47,12 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, def getExposedPublisher = exposedPublisher } - override protected def pumpFinished(): Unit = incomingConnections.cancel() + override protected def pumpFinished(): Unit = { + incomingConnections.cancel() + context.stop(self) + } + override protected def pumpFailed(e: Throwable): Unit = fail(e) - override protected def pumpContext: ActorRefFactory = context val incomingConnections: Inputs = new DefaultInputTransferStates { var listener: ActorRef = _ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 84c4fc0be9..b6135868b0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -303,6 +303,8 @@ abstract class Duct[In, Out] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out] + def fanout(initialBufferSize: Int, maximumBufferSize: Int): Duct[In, Out] + /** * Materialize this `Duct` by attaching it to the specified downstream `subscriber` * and return a `Subscriber` representing the input side of the `Duct`. @@ -439,6 +441,9 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, T] = new DuctAdapter(delegate.buffer(size, overflowStrategy)) + override def fanout(initialBufferSize: Int, maximumBufferSize: Int): Duct[In, T] = + new DuctAdapter(delegate.fanout(initialBufferSize, maximumBufferSize)) + override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Duct[In, U] = new DuctAdapter(delegate.expand(seed.apply, (s: S) ⇒ { val p = extrapolate.apply(s) diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 88b1298596..74ec5d2782 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -367,6 +367,8 @@ abstract class Flow[T] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] + def fanout(initialBufferSize: Int, maximumBufferSize: Int): Flow[T] + /** * Returns a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after @@ -510,6 +512,9 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] = new FlowAdapter(delegate.buffer(size, overflowStrategy)) + override def fanout(initialBufferSize: Int, maximumBufferSize: Int): Flow[T] = + new FlowAdapter(delegate.fanout(initialBufferSize, maximumBufferSize)) + override def expand[S, U](seed: Function[T, S], extrapolate: Function[S, Pair[U, S]]): Flow[U] = new FlowAdapter(delegate.expand(seed.apply, (s: S) ⇒ { val p = extrapolate.apply(s) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index d2971da57e..49975a04ac 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -287,6 +287,8 @@ trait Duct[In, +Out] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Duct[In, Out] + def fanout(initialBufferSize: Int, maximumBufferSize: Int): Duct[In, Out] + /** * Append the operations of a [[Duct]] to this `Duct`. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 13c5b57bed..c7b1b8d533 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -357,6 +357,8 @@ trait Flow[+T] { */ def buffer(size: Int, overflowStrategy: OverflowStrategy): Flow[T] + def fanout(initialBufferSize: Int, maximumBufferSize: Int): Flow[T] + /** * Append the operations of a [[Duct]] to this flow. */ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index 23bdd9167c..a3d991025f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -3,23 +3,12 @@ */ package akka.stream.scaladsl2 -import scala.language.higherKinds -import scala.collection.immutable -import scala.concurrent.Future import akka.stream._ -import akka.stream.impl.BlackholeSubscriber import akka.stream.impl2.Ast._ -import scala.annotation.unchecked.uncheckedVariance -import akka.stream.impl.BlackholeSubscriber -import scala.concurrent.Promise -import akka.stream.impl.EmptyPublisher -import akka.stream.impl.IterablePublisher -import akka.stream.impl2.ActorBasedFlowMaterializer import org.reactivestreams._ -import scala.concurrent.duration.FiniteDuration -import scala.util.Try -import scala.util.Failure -import scala.util.Success + +import scala.annotation.unchecked.uncheckedVariance +import scala.language.higherKinds /** * This is the interface from which all concrete Flows inherit. No generic @@ -113,6 +102,12 @@ final case class FlowWithSource[-In, +Out](private[scaladsl2] val input: Source[ pubOut.publisher(mf) } + def toFanoutPublisher(initialBufferSize: Int, maximumBufferSize: Int)(implicit materializer: FlowMaterializer): Publisher[Out @uncheckedVariance] = { + val pubOut = PublisherSink.withFanout[Out](initialBufferSize, maximumBufferSize) + val mf = withSink(pubOut).run() + pubOut.publisher(mf) + } + def publishTo(subscriber: Subscriber[Out @uncheckedVariance])(implicit materializer: FlowMaterializer): Unit = toPublisher().subscribe(subscriber) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 81ac1cf31f..2e1ca8bf88 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -3,13 +3,15 @@ */ package akka.stream.scaladsl2 +import akka.actor.Props + import scala.annotation.unchecked.uncheckedVariance import scala.concurrent.{ Future, Promise } import scala.util.{ Failure, Success, Try } import org.reactivestreams.{ Publisher, Subscriber, Subscription } import akka.stream.Transformer -import akka.stream.impl.BlackholeSubscriber -import akka.stream.impl2.ActorBasedFlowMaterializer +import akka.stream.impl.{ FanoutProcessorImpl, BlackholeSubscriber } +import akka.stream.impl2.{ ActorProcessorFactory, ActorBasedFlowMaterializer } import java.util.concurrent.atomic.AtomicReference /** @@ -99,15 +101,30 @@ trait SinkWithKey[-Out, T] extends Sink[Out] { object PublisherSink { private val instance = new PublisherSink[Nothing] def apply[T]: PublisherSink[T] = instance.asInstanceOf[PublisherSink[T]] + def withFanout[T](initialBufferSize: Int, maximumBufferSize: Int): FanoutPublisherSink[T] = + new FanoutPublisherSink[T](initialBufferSize, maximumBufferSize) } -class PublisherSink[Out]() extends SinkWithKey[Out, Publisher[Out]] { +class PublisherSink[Out] extends SinkWithKey[Out, Publisher[Out]] { def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] = flowPublisher def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) override def toString: String = "PublisherSink" } +class FanoutPublisherSink[Out](initialBufferSize: Int, maximumBufferSize: Int) extends SinkWithKey[Out, Publisher[Out]] { + def publisher(m: MaterializedSink): Publisher[Out] = m.getSinkFor(this) + override def attach(flowPublisher: Publisher[Out], materializer: ActorBasedFlowMaterializer, flowName: String): Publisher[Out] = { + val fanoutActor = materializer.actorOf( + Props(new FanoutProcessorImpl(materializer.settings, initialBufferSize, maximumBufferSize)), "fanout") + val fanoutProcessor = ActorProcessorFactory[Out, Out](fanoutActor) + flowPublisher.subscribe(fanoutProcessor) + fanoutProcessor + } + + override def toString: String = "Fanout" +} + /** * Holds a [[scala.concurrent.Future]] that will be fulfilled with the first * thing that is signaled to this stream, which can be either an element (after diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index a7306a2127..8e2f2a46bb 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -69,18 +69,21 @@ class FlowGroupBySpec extends AkkaSpec { s2.expectNoMsg(100.millis) s2.request(2) s2.expectNext(2) + + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) s2.expectNext(4) s2.expectNoMsg(100.millis) - s1.request(1) s1.expectNext(3) s2.request(1) + // Important to request here on the OTHER stream because the buffer space is exactly one without the fanout box + s1.request(1) s2.expectNext(6) s2.expectComplete() - s1.request(1) s1.expectNext(5) s1.expectComplete() @@ -137,67 +140,6 @@ class FlowGroupBySpec extends AkkaSpec { // substream.expectComplete() } - "work with fanout on substreams" in new SubstreamsSupport(groupCount = 2) { - val substreamPublisher = getSubPublisher(1) - getSubPublisher(0) - - val substreamSubscriber1 = StreamPuppet(substreamPublisher) - val substreamSubscriber2 = StreamPuppet(substreamPublisher) - - substreamSubscriber1.request(1) - substreamSubscriber1.expectNext(1) - substreamSubscriber2.request(1) - substreamSubscriber2.expectNext(1) - - substreamSubscriber1.request(1) - substreamSubscriber1.expectNext(3) - substreamSubscriber2.request(1) - substreamSubscriber2.expectNext(3) - } - - "work with fanout on master stream" in { - val source = Flow((1 to 4).iterator).toPublisher() - val groupStream = Flow(source).groupBy(_ % 2).toPublisher() - val masterSubscriber1 = StreamTestKit.SubscriberProbe[(Int, Publisher[Int])]() - val masterSubscriber2 = StreamTestKit.SubscriberProbe[(Int, Publisher[Int])]() - - groupStream.subscribe(masterSubscriber1) - groupStream.subscribe(masterSubscriber2) - - val masterSubscription1 = masterSubscriber1.expectSubscription() - val masterSubscription2 = masterSubscriber2.expectSubscription() - - masterSubscription1.request(2) - masterSubscription2.request(1) - - val (key11, substream11) = masterSubscriber1.expectNext() - key11 should be(1) - val (key21, substream21) = masterSubscriber2.expectNext() - key21 should be(1) - - val puppet11 = StreamPuppet(substream11) - val puppet21 = StreamPuppet(substream21) - - puppet11.request(2) - puppet11.expectNext(1) - puppet11.expectNext(3) - - puppet21.request(1) - puppet21.expectNext(1) - puppet21.cancel() - - masterSubscription2.cancel() - - val (key12, substream12) = masterSubscriber1.expectNext() - key12 should be(0) - - val puppet12 = StreamPuppet(substream12) - puppet12.request(1) - puppet12.expectNext(2) - puppet12.cancel() - masterSubscription1.cancel() - } - "work with empty input stream" in { val publisher = Flow(List.empty[Int]).groupBy(_ % 2).toPublisher() val subscriber = StreamTestKit.SubscriberProbe[(Int, Publisher[Int])]() diff --git a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala index 4c97da6bc4..291e8e1593 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowPrefixAndTailSpec.scala @@ -50,7 +50,7 @@ class FlowPrefixAndTailSpec extends AkkaSpec { Await.result(Flow(tail).grouped(11).toFuture(), 3.seconds) should be(1 to 10) } - "work if size of take is equals to stream size" in { + "work if size of take is equal to stream size" in { val (takes, tail) = Await.result(Flow((1 to 10).iterator).prefixAndTail(10).toFuture(), 3.seconds) takes should be(1 to 10) val subscriber = StreamTestKit.SubscriberProbe[Int]() diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index 863612d5c9..ff9c9666bf 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -29,6 +29,8 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece val identity: Flow[Any] ⇒ Flow[Any] = in ⇒ in.map(e ⇒ e) val identity2: Flow[Any] ⇒ Flow[Any] = in ⇒ identity(in) + def identityWithFanout(initialBufferSize: Int, maximumBufferSize: Int): Flow[Any] ⇒ Flow[Any] = + in ⇒ in.fanout(initialBufferSize, maximumBufferSize) "A Flow" must { @@ -107,11 +109,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece "A Flow with multiple subscribers (FanOutBox)" must { "adapt speed to the currently slowest subscriber" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 1, maximumBufferSize = 1), settings.copy(initialInputBufferSize = 1)) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -137,11 +135,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "support slow subscriber with fan-out 2" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = 2, maxSize = 2) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 2, maximumBufferSize = 2), settings.copy(initialInputBufferSize = 1)) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -180,11 +174,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "incoming subscriber while elements were requested before" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 1, maximumBufferSize = 1), settings.copy(initialInputBufferSize = 1)) { downstreamSubscription.request(5) upstream.expectRequest(upstreamSubscription, 1) upstreamSubscription.sendNext("a1") @@ -221,11 +211,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "blocking subscriber cancels subscription" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 1, maximumBufferSize = 1), settings.copy(initialInputBufferSize = 1)) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() publisher.subscribe(downstream2) val downstream2Subscription = downstream2.expectSubscription() @@ -260,11 +246,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream was completed future subscribers' onComplete should be called instead of onSubscribed" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 1, maximumBufferSize = 1), settings.copy(initialInputBufferSize = 1)) { val downstream2 = StreamTestKit.SubscriberProbe[Any]() // don't link it just yet @@ -303,15 +285,12 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "after initial upstream reported an error future subscribers' onError should be called instead of onSubscribed" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = 1) - - new ChainSetup[Int, String](_.map(_ ⇒ throw TestException), tweakedSettings) { + new ChainSetup[Int, String]( + _.map(_ ⇒ throw TestException).fanout(initialBufferSize = 1, maximumBufferSize = 1), settings.copy(initialInputBufferSize = 1)) { downstreamSubscription.request(1) upstreamSubscription.expectRequest(1) - EventFilter[TestException.type](occurrences = 1) intercept { + EventFilter[TestException.type](occurrences = 2) intercept { upstreamSubscription.sendNext(5) upstreamSubscription.expectRequest(1) upstreamSubscription.expectCancellation() @@ -325,11 +304,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece } "when all subscriptions were cancelled future subscribers' onError should be called" in { - val tweakedSettings = settings - .withInputBuffer(initialSize = 1, maxSize = settings.maxInputBufferSize) - .withFanOutBuffer(initialSize = settings.initialFanOutBufferSize, maxSize = settings.maxFanOutBufferSize) - - new ChainSetup(identity, tweakedSettings) { + new ChainSetup(identityWithFanout(initialBufferSize = 1, maximumBufferSize = 16), settings.copy(initialInputBufferSize = 1)) { upstreamSubscription.expectRequest(1) downstreamSubscription.cancel() upstreamSubscription.expectCancellation() diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 98044f7002..489d33ceec 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -59,10 +59,10 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.request(2) s1.expectNext(1) s1.expectNext(2) + s1.request(1) s1.expectComplete() val s2 = StreamPuppet(getSubPublisher()) - masterSubscriber.expectComplete() s2.request(1) s2.expectNext(3) @@ -70,8 +70,10 @@ class FlowSplitWhenSpec extends AkkaSpec { s2.request(1) s2.expectNext(4) + s2.request(1) s2.expectComplete() + masterSubscriber.expectComplete() } "support cancelling substreams" in new SubstreamsSupport(splitWhen = 5, elementCount = 8) { @@ -84,8 +86,10 @@ class FlowSplitWhenSpec extends AkkaSpec { s2.expectNext(6) s2.expectNext(7) s2.expectNext(8) + s2.request(1) s2.expectComplete() + masterSubscription.request(1) masterSubscriber.expectComplete() } @@ -97,6 +101,7 @@ class FlowSplitWhenSpec extends AkkaSpec { s1.expectNext(2) s1.expectNext(3) s1.expectNext(4) + s1.request(1) s1.expectComplete() } diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index cd02e32a3e..1fe59fc710 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -146,7 +146,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { case None ⇒ Nil case Some(_) ⇒ List(-1) } - }). + }).fanout(1, 1). toPublisher() val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 9796f4e4e6..f2a60ff558 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -114,7 +114,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d tot += length List(tot) } - }). + }).fanout(2, 2). toPublisher() val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala index a182038769..ef9ecab700 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowTransformSpec.scala @@ -115,7 +115,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(tot) } }). - toPublisher() + toFanoutPublisher(2, 2) val c1 = StreamTestKit.SubscriberProbe[Int]() p2.subscribe(c1) val sub1 = c1.expectSubscription()