diff --git a/akka-stream/src/main/scala/akka/stream/Transformer.scala b/akka-stream/src/main/scala/akka/stream/Transformer.scala index 110a5319d0..688cd51423 100644 --- a/akka-stream/src/main/scala/akka/stream/Transformer.scala +++ b/akka-stream/src/main/scala/akka/stream/Transformer.scala @@ -31,16 +31,24 @@ abstract class Transformer[-T, +U] { def isComplete: Boolean = false /** - * Invoked before signaling normal completion to the downstream consumers + * Invoked before the Transformer terminates (either normal completion or after an onError) * to produce a (possibly empty) sequence of elements in response to the * end-of-stream event. + * + * This method is only called if [[Transformer#onError]] does not throw an exception. The default implementation + * of [[Transformer#onError]] throws the received cause forcing the error to propagate downstream immediately. + * + * @param e Contains a non-empty option with the error causing the termination or an empty option + * if the Transformer was completed normally */ - def onComplete(): immutable.Seq[U] = Nil + def onTermination(e: Option[Throwable]): immutable.Seq[U] = Nil /** - * Invoked when failure is signaled from upstream. + * Invoked when failure is signaled from upstream. If this method throws an exception, then onError is immediately + * propagated downstream. If this method completes normally then [[Transformer#onTermination]] is invoked as a final + * step, passing the original cause. */ - def onError(cause: Throwable): Unit = () + def onError(cause: Throwable): Unit = throw cause /** * Invoked after normal completion or error. @@ -53,23 +61,3 @@ abstract class Transformer[-T, +U] { */ def name: String = "transform" } - -/** - * General interface for stream transformation. - * @see [[akka.stream.scaladsl.Flow#transformRecover]] - * @see [[akka.stream.javadsl.Flow#transformRecover]] - * @see [[Transformer]] - */ -abstract class RecoveryTransformer[-T, +U] extends Transformer[T, U] { - /** - * Invoked when failure is signaled from upstream to emit an additional - * sequence of elements before the stream ends. - */ - def onErrorRecover(cause: Throwable): immutable.Seq[U] - - /** - * Name of this transformation step. Used as part of the actor name. - * Facilitates debugging and logging. - */ - override def name: String = "transformRecover" -} \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/extra/Log.scala b/akka-stream/src/main/scala/akka/stream/extra/Log.scala index f726cccb30..d03a96895d 100644 --- a/akka-stream/src/main/scala/akka/stream/extra/Log.scala +++ b/akka-stream/src/main/scala/akka/stream/extra/Log.scala @@ -76,7 +76,7 @@ class Log[T](override val name: String = "log") extends Transformer[T, T] with T log.info("OnNext: [{}]", element) } - final override def onComplete(): immutable.Seq[T] = { + final override def onTermination(e: Option[Throwable]): immutable.Seq[T] = { logOnComplete() Nil } @@ -85,7 +85,10 @@ class Log[T](override val name: String = "log") extends Transformer[T, T] with T log.info("OnComplete") } - final override def onError(cause: Throwable): Unit = logOnError(cause) + final override def onError(cause: Throwable): Unit = { + logOnError(cause) + throw cause + } def logOnError(cause: Throwable): Unit = { log.error(cause, "OnError") diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 69c51fcb6c..f8d58d1689 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -10,7 +10,6 @@ import org.reactivestreams.spi.Subscriber import akka.actor.ActorRefFactory import akka.stream.{ MaterializerSettings, FlowMaterializer } import akka.stream.Transformer -import akka.stream.RecoveryTransformer import scala.util.Try import scala.concurrent.Future import scala.util.Success @@ -34,9 +33,6 @@ private[akka] object Ast { case class Transform(transformer: Transformer[Any, Any]) extends AstNode { override def name = transformer.name } - case class Recover(recoveryTransformer: RecoveryTransformer[Any, Any]) extends AstNode { - override def name = recoveryTransformer.name - } case class GroupBy(f: Any ⇒ Any) extends AstNode { override def name = "groupBy" } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 6df05543b5..146ecb1034 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -8,11 +8,10 @@ import scala.util.{ Failure, Success } import scala.util.control.NonFatal import org.reactivestreams.api.Consumer import org.reactivestreams.spi.{ Subscriber, Subscription } -import Ast.{ AstNode, Recover, Transform } +import Ast.{ AstNode, Transform } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala } import akka.stream.MaterializerSettings import akka.stream.Transformer -import akka.stream.RecoveryTransformer /** * INTERNAL API @@ -43,11 +42,9 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon private[akka] object ActorConsumer { import Ast._ - def props(settings: MaterializerSettings, op: AstNode): Props = - (op match { - case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer)) - case r: Recover ⇒ Props(new RecoverActorConsumer(settings, r.recoveryTransformer)) - }).withDispatcher(settings.dispatcher) + def props(settings: MaterializerSettings, op: AstNode) = op match { + case t: Transform ⇒ Props(new TransformActorConsumer(settings, t.transformer)) withDispatcher (settings.dispatcher) + } } /** @@ -124,13 +121,15 @@ private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSet */ private[akka] class TransformActorConsumer(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends AbstractActorConsumer(_settings) with ActorLogging { + var error: Option[Throwable] = None // Null is the proper default here + var hasCleanupRun = false private var onCompleteCalled = false private def callOnComplete(): Unit = { if (!onCompleteCalled) { onCompleteCalled = true - try transformer.onComplete() - catch { case NonFatal(e) ⇒ log.error(e, "failure during onComplete") } + try transformer.onTermination(error) + catch { case NonFatal(e) ⇒ log.error(e, "failure during onTermination") } shutdown() } } @@ -142,9 +141,15 @@ private[akka] class TransformActorConsumer(_settings: MaterializerSettings, tran } override def onError(cause: Throwable): Unit = { - log.error(cause, "terminating due to onError") - transformer.onError(cause) - shutdown() + try { + transformer.onError(cause) + error = Some(cause) + onComplete() + } catch { + case NonFatal(e) ⇒ + log.error(e, "terminating due to onError") + shutdown() + } } override def onComplete(): Unit = { @@ -161,15 +166,3 @@ private[akka] class TransformActorConsumer(_settings: MaterializerSettings, tran try super.postStop() finally if (!hasCleanupRun) transformer.cleanup() } } - -/** - * INTERNAL API - */ -private[akka] class RecoverActorConsumer(_settings: MaterializerSettings, recoveryTransformer: RecoveryTransformer[Any, Any]) - extends TransformActorConsumer(_settings, recoveryTransformer) { - - override def onError(cause: Throwable): Unit = { - recoveryTransformer.onErrorRecover(cause) - onComplete() - } -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 1e648c3e37..b570c8a8a5 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -4,10 +4,12 @@ package akka.stream.impl import org.reactivestreams.api.Processor -import org.reactivestreams.spi.Subscriber +import org.reactivestreams.spi.{ Subscription, Subscriber } import akka.actor._ import akka.stream.MaterializerSettings import akka.event.LoggingReceive +import java.util.Arrays +import scala.util.control.NonFatal /** * INTERNAL API @@ -17,7 +19,6 @@ private[akka] object ActorProcessor { def props(settings: MaterializerSettings, op: AstNode): Props = (op match { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t.transformer)) - case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r.recoveryTransformer)) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) case g: GroupBy ⇒ Props(new GroupByProcessorImpl(settings, g.f)) case m: Merge ⇒ Props(new MergeImpl(settings, m.other)) @@ -35,92 +36,183 @@ private[akka] class ActorProcessor[I, O]( final val impl: ActorRef) extends Proc /** * INTERNAL API */ -private[akka] trait PrimaryInputs { - this: Actor ⇒ - // FIXME: have a NoInputs here to avoid nulls - // FIXME: make it a val and remove all lazy vals caching TransferStates - protected var primaryInputs: Inputs = _ +private[akka] abstract class BatchingInputBuffer(val size: Int, val pump: Pump) extends DefaultInputTransferStates { + require(size > 0, "buffer size cannot be zero") + require((size & (size - 1)) == 0, "buffer size must be a power of two") + // TODO: buffer and batch sizing heuristics + private var upstream: Subscription = _ + private val inputBuffer = Array.ofDim[AnyRef](size) + private var inputBufferElements = 0 + private var nextInputElementCursor = 0 + private var upstreamCompleted = false + private val IndexMask = size - 1 - def settings: MaterializerSettings + private def requestBatchSize = math.max(1, inputBuffer.length / 2) + private var batchRemaining = requestBatchSize - def waitingForUpstream: Receive = { - case OnComplete ⇒ - // Instead of introducing an edge case, handle it in the general way - primaryInputs = EmptyInputs - transitionToRunningWhenReady() - case OnSubscribe(subscription) ⇒ - assert(subscription != null) - primaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize) - transitionToRunningWhenReady() - case OnError(cause) ⇒ primaryInputOnError(cause) - } + override val subreceive: SubReceive = new SubReceive(waitingForUpstream) - def transitionToRunningWhenReady(): Unit = - if (primaryInputs ne null) { - primaryInputs.prefetch() - primaryInputsReady() + override def dequeueInputElement(): Any = { + val elem = inputBuffer(nextInputElementCursor) + inputBuffer(nextInputElementCursor) = null + + batchRemaining -= 1 + if (batchRemaining == 0 && !upstreamCompleted) { + upstream.requestMore(requestBatchSize) + batchRemaining = requestBatchSize } - def upstreamManagement: Receive = { - case OnNext(element) ⇒ - primaryInputs.enqueueInputElement(element) - pumpInputs() - case OnComplete ⇒ - primaryInputs.complete() - primaryInputOnComplete() - pumpInputs() - case OnError(cause) ⇒ primaryInputOnError(cause) + inputBufferElements -= 1 + nextInputElementCursor += 1 + nextInputElementCursor &= IndexMask + elem + } + + protected final def enqueueInputElement(elem: Any): Unit = { + if (isOpen) { + if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun") + inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] + inputBufferElements += 1 + } + pump.pump() + } + + override def cancel(): Unit = { + if (!upstreamCompleted) { + upstreamCompleted = true + if (upstream ne null) upstream.cancel() + clear() + } + } + override def isClosed: Boolean = upstreamCompleted + + private def clear(): Unit = { + Arrays.fill(inputBuffer, 0, inputBuffer.length, null) + inputBufferElements = 0 + } + + override def inputsDepleted = upstreamCompleted && inputBufferElements == 0 + override def inputsAvailable = inputBufferElements > 0 + + protected def onComplete(): Unit = { + upstreamCompleted = true + subreceive.become(completed) + pump.pump() + } + + protected def onSubscribe(subscription: Subscription): Unit = { + assert(subscription != null) + upstream = subscription + // Prefetch + upstream.requestMore(inputBuffer.length) + subreceive.become(upstreamRunning) + } + + protected def onError(e: Throwable): Unit = { + upstreamCompleted = true + subreceive.become(completed) + inputOnError(e) + } + + protected def waitingForUpstream: Actor.Receive = { + case OnComplete ⇒ onComplete() + case OnSubscribe(subscription) ⇒ onSubscribe(subscription) + case OnError(cause) ⇒ onError(cause) + } + + protected def upstreamRunning: Actor.Receive = { + case OnNext(element) ⇒ enqueueInputElement(element) + case OnComplete ⇒ onComplete() + case OnError(cause) ⇒ onError(cause) + } + + protected def completed: Actor.Receive = { + case OnSubscribe(subscription) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") + } + + protected def inputOnError(e: Throwable): Unit = { + clear() } - def pumpInputs(): Unit - def primaryInputsReady(): Unit - def primaryInputOnError(cause: Throwable): Unit - def primaryInputOnComplete(): Unit } /** * INTERNAL API */ -private[akka] trait PrimaryOutputs { - this: Actor ⇒ - // FIXME: avoid nulls and add a failing guard instance instead +private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int, self: ActorRef, val pump: Pump) + extends DefaultOutputTransferStates + with SubscriberManagement[Any] { + + override type S = ActorSubscription[Any] + override def createSubscription(subscriber: Subscriber[Any]): S = + new ActorSubscription(self, subscriber) protected var exposedPublisher: ActorPublisher[Any] = _ - def settings: MaterializerSettings + private var downstreamBufferSpace = 0 + private var downstreamCompleted = false + def demandAvailable = downstreamBufferSpace > 0 - val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize) { - override type S = ActorSubscription[Any] - override def createSubscription(subscriber: Subscriber[Any]): S = - new ActorSubscription(self, subscriber) - override def afterShutdown(completed: Boolean): Unit = primaryOutputsFinished(completed) + override val receive = new SubReceive(waitingExposedPublisher) + + def enqueueOutputElement(elem: Any): Unit = { + downstreamBufferSpace -= 1 + pushToDownstream(elem) } - def waitingExposedPublisher: Receive = { + def complete(): Unit = + if (!downstreamCompleted) { + downstreamCompleted = true + completeDownstream() + } + + def cancel(e: Throwable): Unit = { + if (!downstreamCompleted) { + downstreamCompleted = true + abortDownstream(e) + } + if (exposedPublisher ne null) exposedPublisher.shutdown(Some(e)) + } + + def isClosed: Boolean = downstreamCompleted + + def afterShutdown(): Unit + + override protected def requestFromUpstream(elements: Int): Unit = downstreamBufferSpace += elements + + private def subscribePending(): Unit = + exposedPublisher.takePendingSubscribers() foreach super.registerSubscriber + + override protected def shutdown(completed: Boolean): Unit = { + if (exposedPublisher ne null) { + if (completed) exposedPublisher.shutdown(None) + else exposedPublisher.shutdown(Some(new IllegalStateException("Cannot subscribe to shutdown publisher"))) + } + afterShutdown() + } + + override protected def cancelUpstream(): Unit = { + downstreamCompleted = true + } + + protected def waitingExposedPublisher: Actor.Receive = { case ExposedPublisher(publisher) ⇒ exposedPublisher = publisher - primaryOutputsReady() - case _ ⇒ throw new IllegalStateException("The first message must be ExposedPublisher") + receive.become(downstreamRunning) + case other ⇒ + throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") } - def downstreamManagement: Receive = { + protected def downstreamRunning: Actor.Receive = { case SubscribePending ⇒ subscribePending() case RequestMore(subscription, elements) ⇒ - primaryOutputs.handleRequest(subscription.asInstanceOf[ActorSubscription[Any]], elements) - pumpOutputs() + moreRequested(subscription.asInstanceOf[ActorSubscription[Any]], elements) + pump.pump() case Cancel(subscription) ⇒ - primaryOutputs.removeSubscription(subscription.asInstanceOf[ActorSubscription[Any]]) - pumpOutputs() + unregisterSubscription(subscription.asInstanceOf[ActorSubscription[Any]]) + pump.pump() } - private def subscribePending(): Unit = - exposedPublisher.takePendingSubscribers() foreach primaryOutputs.addSubscriber - - def primaryOutputsFinished(completed: Boolean): Unit - def primaryOutputsReady(): Unit - - def pumpOutputs(): Unit - } /** @@ -130,70 +222,51 @@ private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettin extends Actor with ActorLogging with SoftShutdown - with PrimaryInputs - with PrimaryOutputs with Pump { - override def receive = waitingExposedPublisher - - override def primaryInputOnError(e: Throwable): Unit = fail(e) - override def primaryInputOnComplete(): Unit = context.become(flushing) - override def primaryInputsReady(): Unit = { - setTransferState(initialTransferState) - context.become(running) + // FIXME: make pump a member + protected val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { + override def inputOnError(e: Throwable): Unit = ActorProcessorImpl.this.onError(e) } - override def primaryOutputsReady(): Unit = context.become(downstreamManagement orElse waitingForUpstream) - override def primaryOutputsFinished(completed: Boolean): Unit = { - isShuttingDown = true - if (completed) - shutdownReason = None - shutdown() - } + protected val primaryOutputs: Outputs = + new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, this) { + override def afterShutdown(): Unit = { + primaryOutputsShutdown = true + shutdownHooks() + } + } - def running: Receive = LoggingReceive(downstreamManagement orElse upstreamManagement) + override def receive = primaryInputs.subreceive orElse primaryOutputs.receive - def flushing: Receive = downstreamManagement orElse { - case OnSubscribe(subscription) ⇒ fail(new IllegalStateException("Cannot subscribe shutdown subscriber")) - case _ ⇒ // ignore everything else - } + protected def onError(e: Throwable): Unit = fail(e) protected def fail(e: Throwable): Unit = { - shutdownReason = Some(e) log.error(e, "failure during processing") // FIXME: escalate to supervisor instead + primaryInputs.cancel() primaryOutputs.cancel(e) - shutdown() + primaryOutputsShutdown = true + softShutdown() } - lazy val needsPrimaryInputAndDemand = primaryInputs.NeedsInput && primaryOutputs.NeedsDemand - - protected def initialTransferState: TransferState - override val pumpContext = context - override def pumpInputs(): Unit = pump() - override def pumpOutputs(): Unit = pump() override def pumpFinished(): Unit = { if (primaryInputs.isOpen) primaryInputs.cancel() - context.become(flushing) primaryOutputs.complete() } override def pumpFailed(e: Throwable): Unit = fail(e) - var isShuttingDown = false - var shutdownReason: Option[Throwable] = ActorPublisher.NormalShutdownReason - - def shutdown(): Unit = { - if (primaryInputs ne null) primaryInputs.cancel() - exposedPublisher.shutdown(shutdownReason) + protected def shutdownHooks(): Unit = { + primaryInputs.cancel() softShutdown() } + var primaryOutputsShutdown = false + override def postStop(): Unit = { - if (exposedPublisher ne null) - exposedPublisher.shutdown(shutdownReason) // Non-gracefully stopped, do our best here - if (!isShuttingDown) + if (!primaryOutputsShutdown) primaryOutputs.cancel(new IllegalStateException("Processor actor terminated abruptly")) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 3d5f3e0cb9..7133a6d080 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -8,13 +8,12 @@ import scala.concurrent.{ Future, Promise } import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer -import Ast.{ AstNode, Recover, Transform } +import Ast.{ AstNode, Transform } import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import scala.util.Success import scala.util.Failure import akka.stream.Transformer -import akka.stream.RecoveryTransformer import org.reactivestreams.api.Consumer import akka.stream.scaladsl.Duct @@ -32,12 +31,12 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def toFuture(materializer: FlowMaterializer): Future[O] = { val p = Promise[O]() - transformRecover(new RecoveryTransformer[O, Unit] { + transform(new Transformer[O, Unit] { var done = false override def onNext(in: O) = { p success in; done = true; Nil } - override def onErrorRecover(e: Throwable) = { p failure e; Nil } + override def onError(e: Throwable) = { p failure e } override def isComplete = done - override def onComplete() = { p.tryFailure(new NoSuchElementException("empty stream")); Nil } + override def onTermination(e: Option[Throwable]) = { p.tryFailure(new NoSuchElementException("empty stream")); Nil } }).consume(materializer) p.future } @@ -45,15 +44,16 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops) override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = - transformRecover(new RecoveryTransformer[O, Unit] { - var ok = true + transform(new Transformer[O, Unit] { override def onNext(in: O) = Nil - override def onErrorRecover(e: Throwable) = { + override def onError(e: Throwable) = { callback(Failure(e)) - ok = false + throw e + } + override def onTermination(e: Option[Throwable]) = { + callback(Builder.SuccessUnit) Nil } - override def onComplete() = { if (ok) callback(Builder.SuccessUnit); Nil } }).consume(materializer) override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops) @@ -79,15 +79,16 @@ private[akka] case class DuctImpl[In, Out](ops: List[Ast.AstNode]) extends Duct[ materializer.ductConsume(ops) override def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Consumer[In] = - transformRecover(new RecoveryTransformer[Out, Unit] { - var ok = true + transform(new Transformer[Out, Unit] { override def onNext(in: Out) = Nil - override def onErrorRecover(e: Throwable) = { + override def onError(e: Throwable) = { callback(Failure(e)) - ok = false + throw e + } + override def onTermination(e: Option[Throwable]) = { + callback(Builder.SuccessUnit) Nil } - override def onComplete() = { if (ok) callback(Builder.SuccessUnit); Nil } }).consume(materializer) override def build(materializer: FlowMaterializer): (Consumer[In], Producer[Out]) = @@ -145,7 +146,7 @@ private[akka] trait Builder[Out] { def foreach(c: Out ⇒ Unit): Thing[Unit] = transform(new Transformer[Out, Unit] { override def onNext(in: Out) = { c(in); Nil } - override def onComplete() = ListOfUnit + override def onTermination(e: Option[Throwable]) = ListOfUnit override def name = "foreach" }) @@ -156,7 +157,7 @@ private[akka] trait Builder[Out] { // "Parameter type in structural refinement may not refer to an abstract type defined outside that refinement" class FoldTransformer[S](var state: S, f: (S, Out) ⇒ S) extends Transformer[Out, S] { override def onNext(in: Out): immutable.Seq[S] = { state = f(state, in); Nil } - override def onComplete(): immutable.Seq[S] = List(state) + override def onTermination(e: Option[Throwable]): immutable.Seq[S] = List(state) override def name = "fold" } @@ -209,7 +210,7 @@ private[akka] trait Builder[Out] { } else Nil } - override def onComplete() = if (buf.isEmpty) Nil else List(buf) + override def onTermination(e: Option[Throwable]) = if (buf.isEmpty) Nil else List(buf) override def name = "grouped" }) @@ -222,9 +223,6 @@ private[akka] trait Builder[Out] { def transform[U](transformer: Transformer[Out, U]): Thing[U] = andThen(Transform(transformer.asInstanceOf[Transformer[Any, Any]])) - def transformRecover[U](recoveryTransformer: RecoveryTransformer[Out, U]): Thing[U] = - andThen(Recover(recoveryTransformer.asInstanceOf[RecoveryTransformer[Any, Any]])) - def zip[O2](other: Producer[O2]): Thing[(Out, O2)] = andThen(Zip(other.asInstanceOf[Producer[Any]])) def concat[U >: Out](next: Producer[U]): Thing[U] = andThen(Concat(next.asInstanceOf[Producer[Any]])) diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 29e54f038d..6aeea3d871 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -3,74 +3,63 @@ */ package akka.stream.impl -import org.reactivestreams.spi.Subscription import akka.actor.{ Terminated, Props, ActorRef } import akka.stream.MaterializerSettings -import akka.stream.impl._ - -/** - * INTERNAL API - */ -private[akka] object GroupByProcessorImpl { - - sealed trait SubstreamElementState - case object NoPending extends SubstreamElementState - case class PendingElement(elem: Any, key: Any) extends SubstreamElementState - case class PendingElementForNewStream(elem: Any, key: Any) extends SubstreamElementState -} /** * INTERNAL API */ private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { - import GroupByProcessorImpl._ - var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] - var substreamPendingState: SubstreamElementState = NoPending - override def initialTransferState = needsPrimaryInputAndDemand + var pendingSubstreamOutputs: SubstreamOutputs = _ - override def transfer(): TransferState = { - substreamPendingState match { - case PendingElementForNewStream(elem, key) ⇒ - if (primaryOutputs.isClosed) { - substreamPendingState = NoPending - // Just drop, we do not open any more substreams - } else { - val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement((key, substreamOutput.processor)) - keyToSubstreamOutputs(key) = substreamOutput - substreamPendingState = PendingElement(elem, key) - } + // No substream is open yet. If downstream cancels now, we are complete + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) + nextPhase(openSubstream(elem, key)) + } - case PendingElement(elem, key) ⇒ - if (keyToSubstreamOutputs(key).isOpen) keyToSubstreamOutputs(key).enqueueOutputElement(elem) - substreamPendingState = NoPending + // some substreams are open now. If downstream cancels, we still continue until the substreams are closed + val waitNext = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + val key = keyFor(elem) - case NoPending ⇒ - val elem = primaryInputs.dequeueInputElement() - val key = keyFor(elem) - - substreamPendingState = keyToSubstreamOutputs.get(key) match { - case Some(substream) if substream.isOpen ⇒ PendingElement(elem, key) - case None if primaryOutputs.isOpen ⇒ PendingElementForNewStream(elem, key) - case _ ⇒ NoPending - } - } - - substreamPendingState match { - case NoPending ⇒ primaryInputs.NeedsInput - case PendingElement(_, key) ⇒ keyToSubstreamOutputs(key).NeedsDemand - case PendingElementForNewStream(_, _) ⇒ primaryOutputs.NeedsDemand + keyToSubstreamOutputs.get(key) match { + case Some(substream) if substream.isOpen ⇒ nextPhase(dispatchToSubstream(elem, keyToSubstreamOutputs(key))) + case None if primaryOutputs.isOpen ⇒ nextPhase(openSubstream(elem, key)) + case _ ⇒ // stay } } + def openSubstream(elem: Any, key: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + if (primaryOutputs.isClosed) { + // Just drop, we do not open any more substreams + nextPhase(waitNext) + } else { + val substreamOutput = newSubstream() + primaryOutputs.enqueueOutputElement((key, substreamOutput.processor)) + keyToSubstreamOutputs(key) = substreamOutput + nextPhase(dispatchToSubstream(elem, substreamOutput)) + } + } + + def dispatchToSubstream(elem: Any, substream: SubstreamOutputs): TransferPhase = { + pendingSubstreamOutputs = substream + TransferPhase(substream.NeedsDemand) { () ⇒ + if (substream.isOpen) substream.enqueueOutputElement(elem) + pendingSubstreamOutputs = null + nextPhase(waitNext) + } + } + + nextPhase(waitFirst) + override def invalidateSubstream(substream: ActorRef): Unit = { - substreamPendingState match { - case PendingElement(_, key) if keyToSubstreamOutputs(key).substream == substream ⇒ - setTransferState(primaryInputs.NeedsInput) - substreamPendingState = NoPending - case _ ⇒ + if ((pendingSubstreamOutputs ne null) && substream == pendingSubstreamOutputs.substream) { + pendingSubstreamOutputs = null + nextPhase(waitNext) } super.invalidateSubstream(substream) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 9a99129c1d..3b8922776f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -7,7 +7,6 @@ import scala.collection.immutable import scala.util.{ Failure, Success } import akka.actor.Props import akka.stream.MaterializerSettings -import akka.stream.RecoveryTransformer import akka.stream.Transformer import scala.util.control.NonFatal @@ -15,45 +14,57 @@ import scala.util.control.NonFatal * INTERNAL API */ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, transformer: Transformer[Any, Any]) extends ActorProcessorImpl(_settings) { - var isComplete = false - var hasOnCompleteRun = false var hasCleanupRun = false // TODO performance improvement: mutable buffer? var emits = immutable.Seq.empty[Any] + var errorEvent: Option[Throwable] = None + + override def onError(e: Throwable): Unit = { + try { + transformer.onError(e) + errorEvent = Some(e) + pump() + } catch { case NonFatal(ex) ⇒ fail(ex) } + } object NeedsInputAndDemandOrCompletion extends TransferState { def isReady = (primaryInputs.inputsAvailable && primaryOutputs.demandAvailable) || primaryInputs.inputsDepleted def isCompleted = false } - override def initialTransferState = NeedsInputAndDemandOrCompletion - - override def transfer(): TransferState = { - val depleted = primaryInputs.inputsDepleted - if (emits.isEmpty) { - isComplete = transformer.isComplete - if (depleted || isComplete) { - emits = transformer.onComplete() - hasOnCompleteRun = true - } else { - val e = primaryInputs.dequeueInputElement() - emits = transformer.onNext(e) - } + val running: TransferPhase = TransferPhase(NeedsInputAndDemandOrCompletion) { () ⇒ + if (primaryInputs.inputsDepleted || transformer.isComplete) { + emits = transformer.onTermination(errorEvent) + emitAndThen(completedPhase) } else { - primaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail + val e = primaryInputs.dequeueInputElement() + emits = transformer.onNext(e) + emitAndThen(running) } - - if (emits.nonEmpty) primaryOutputs.NeedsDemand - else if (hasOnCompleteRun) Completed - else NeedsInputAndDemandOrCompletion } - override def toString: String = s"Transformer(isComplete=$isComplete, hasOnCompleteRun=$hasOnCompleteRun, emits=$emits, " + - s"transformer=$transformer)" + // Save previous phase we should return to in a var to avoid allocation + var phaseAfterFlush: TransferPhase = _ + + // Enters flushing phase if there are emits pending + def emitAndThen(andThen: TransferPhase): Unit = + if (emits.nonEmpty) { + phaseAfterFlush = andThen + nextPhase(emitting) + } else nextPhase(andThen) + + // Emits all pending elements, then returns to savedPhase + val emitting = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + primaryOutputs.enqueueOutputElement(emits.head) + emits = emits.tail + if (emits.isEmpty) nextPhase(phaseAfterFlush) + } + + nextPhase(running) + + override def toString: String = s"Transformer(emits=$emits, transformer=$transformer)" override def softShutdown(): Unit = { - shutdownReason foreach transformer.onError transformer.cleanup() hasCleanupRun = true // for postStop super.softShutdown() @@ -64,50 +75,6 @@ private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, tran } } -/** - * INTERNAL API - */ -private[akka] class RecoverProcessorImpl(_settings: MaterializerSettings, recoveryTransformer: RecoveryTransformer[Any, Any]) - extends TransformProcessorImpl(_settings, recoveryTransformer) { - - var error: Option[Throwable] = None - - override def transfer(): TransferState = { - val inputDrained = !primaryInputs.inputsAvailable - val depleted = primaryInputs.inputsDepleted - if (emits.isEmpty && error.isDefined && inputDrained) { - val e = error.get - error = None - emits = recoveryTransformer.onErrorRecover(e) - } else if (emits.isEmpty) { - isComplete = recoveryTransformer.isComplete - if (depleted || isComplete) { - emits = recoveryTransformer.onComplete() - hasOnCompleteRun = true - } else { - val e = primaryInputs.dequeueInputElement() - emits = recoveryTransformer.onNext(e) - } - - } else { - primaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail - } - - if (emits.nonEmpty) primaryOutputs.NeedsDemand - else if (hasOnCompleteRun) Completed - else NeedsInputAndDemandOrCompletion - } - - override def primaryInputOnError(e: Throwable): Unit = { - error = Some(e) - primaryInputs.complete() - context.become(flushing) - pump() - } - -} - /** * INTERNAL API */ @@ -120,10 +87,9 @@ private[akka] object IdentityProcessorImpl { */ private[akka] class IdentityProcessorImpl(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { - override def initialTransferState = needsPrimaryInputAndDemand - override protected def transfer(): TransferState = { + val running: TransferPhase = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) - needsPrimaryInputAndDemand } + nextPhase(running) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 83167dbe2f..d257f3ed64 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -9,67 +9,51 @@ import akka.stream.impl._ import akka.stream.MaterializerSettings import akka.actor.Terminated -/** - * INTERNAL API - */ -private[akka] object SplitWhenProcessorImpl { - - sealed trait SubstreamElementState - case object NoPending extends SubstreamElementState - case class PendingElement(elem: Any) extends SubstreamElementState - case class PendingElementForNewStream(elem: Any) extends SubstreamElementState -} - /** * INTERNAL API */ private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean) extends MultiStreamOutputProcessor(_settings) { - import SplitWhenProcessorImpl._ - var pendingElement: SubstreamElementState = NoPending - var started = false var currentSubstream: SubstreamOutputs = _ - override def initialTransferState = needsPrimaryInputAndDemand - - override def transfer(): TransferState = { - pendingElement match { - case NoPending ⇒ - val elem = primaryInputs.dequeueInputElement() - if (!started) { - pendingElement = PendingElementForNewStream(elem) - started = true - } else if (splitPredicate(elem)) { - pendingElement = PendingElementForNewStream(elem) - currentSubstream.complete() - } else if (currentSubstream.isOpen) { - pendingElement = PendingElement(elem) - } else primaryInputs.NeedsInput - case PendingElement(elem) ⇒ - currentSubstream.enqueueOutputElement(elem) - pendingElement = NoPending - case PendingElementForNewStream(elem) ⇒ - val substreamOutput = newSubstream() - primaryOutputs.enqueueOutputElement(substreamOutput.processor) - currentSubstream = substreamOutput - pendingElement = PendingElement(elem) - } - - pendingElement match { - case NoPending ⇒ primaryInputs.NeedsInput - case PendingElement(_) ⇒ currentSubstream.NeedsDemand - case PendingElementForNewStream(_) ⇒ primaryOutputs.NeedsDemand - } + val waitFirst = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + nextPhase(openSubstream(primaryInputs.dequeueInputElement())) } + def openSubstream(elem: Any): TransferPhase = TransferPhase(primaryOutputs.NeedsDemand) { () ⇒ + val substreamOutput = newSubstream() + primaryOutputs.enqueueOutputElement(substreamOutput.processor) + currentSubstream = substreamOutput + nextPhase(serveSubstreamFirst(currentSubstream, elem)) + } + + // Serving the substream is split into two phases to minimize elements "held in hand" + def serveSubstreamFirst(substream: SubstreamOutputs, elem: Any) = TransferPhase(substream.NeedsDemand) { () ⇒ + substream.enqueueOutputElement(elem) + nextPhase(serveSubstreamRest(substream)) + } + + // Note that this phase is allocated only once per _slice_ and not per element + def serveSubstreamRest(substream: SubstreamOutputs) = TransferPhase(primaryInputs.NeedsInput && substream.NeedsDemand) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + if (splitPredicate(elem)) { + currentSubstream.complete() + currentSubstream = null + nextPhase(openSubstream(elem)) + } else substream.enqueueOutputElement(elem) + } + + // Ignore elements for a cancelled substream until a new substream needs to be opened + val ignoreUntilNewSubstream = TransferPhase(primaryInputs.NeedsInput) { () ⇒ + val elem = primaryInputs.dequeueInputElement() + if (splitPredicate(elem)) nextPhase(openSubstream(elem)) + } + + nextPhase(waitFirst) + override def invalidateSubstream(substream: ActorRef): Unit = { - pendingElement match { - case PendingElement(_) if substream == currentSubstream.substream ⇒ - setTransferState(primaryInputs.NeedsInput) - pendingElement = NoPending - case _ ⇒ - } + if ((currentSubstream ne null) && substream == currentSubstream.substream) nextPhase(ignoreUntilNewSubstream) super.invalidateSubstream(substream) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala index e9bff03e1d..450edd721b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala @@ -13,21 +13,17 @@ import scala.concurrent.forkjoin.ThreadLocalRandom private[akka] class MergeImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - lazy val needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && primaryOutputs.NeedsDemand - - override def initialTransferState = needsAnyInputAndDemand - override def transfer(): TransferState = { - // TODO: More flexible merging strategies are possible here. This takes a random element if we have elements - // from both upstreams. - val tieBreak = ThreadLocalRandom.current().nextBoolean() - if (primaryInputs.inputsAvailable && (!secondaryInputs.inputsAvailable || tieBreak)) { - primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) - } else { - primaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) + val runningPhase = TransferPhase( + (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && primaryOutputs.NeedsDemand) { () ⇒ + def tieBreak = ThreadLocalRandom.current().nextBoolean() + if (primaryInputs.inputsAvailable && (!secondaryInputs.inputsAvailable || tieBreak)) { + primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + } else { + primaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) + } } - needsAnyInputAndDemand - } + nextPhase(runningPhase) } /** @@ -36,13 +32,12 @@ private[akka] class MergeImpl(_settings: MaterializerSettings, _other: Producer[ private[akka] class ZipImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && primaryOutputs.NeedsDemand - - override def initialTransferState = needsBothInputAndDemand - override protected def transfer(): TransferState = { + val runningPhase = TransferPhase(primaryInputs.NeedsInput && secondaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ primaryOutputs.enqueueOutputElement((primaryInputs.dequeueInputElement(), secondaryInputs.dequeueInputElement())) - needsBothInputAndDemand } + + nextPhase(runningPhase) + } /** @@ -51,24 +46,15 @@ private[akka] class ZipImpl(_settings: MaterializerSettings, _other: Producer[An private[akka] class ConcatImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { - lazy val needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && primaryOutputs.NeedsDemand - lazy val needsSecondaryInputAndDemand = secondaryInputs.NeedsInput && primaryOutputs.NeedsDemand - var processingPrimary = true - - override protected def initialTransferState: TransferState = needsPrimaryInputAndDemandWithComplete - override protected def transfer(): TransferState = { - if (processingPrimary) { - if (primaryInputs.inputsDepleted) { - processingPrimary = false - needsSecondaryInputAndDemand - } else { - primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) - needsPrimaryInputAndDemandWithComplete - } - } else { - primaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) - needsSecondaryInputAndDemand - } + val processingPrimary = TransferPhase(primaryInputs.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ + if (primaryInputs.inputsDepleted) nextPhase(processingSecondary) + else primaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) } + val processingSecondary = TransferPhase(secondaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ + primaryOutputs.enqueueOutputElement(secondaryInputs.dequeueInputElement()) + } + + nextPhase(processingPrimary) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala index ac63f918a0..9438058215 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanouts.scala @@ -15,58 +15,46 @@ import org.reactivestreams.spi.Subscription private[akka] class TeeImpl(_settings: MaterializerSettings, other: Consumer[Any]) extends ActorProcessorImpl(_settings) { - lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && primaryOutputs.NeedsDemand - - override def initialTransferState = needsBothInputAndDemand - - override def primaryOutputsReady(): Unit = { - primaryOutputs.addSubscriber(other.getSubscriber) - super.primaryOutputsReady() - } - - override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize) { + override val primaryOutputs = new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { var hasOtherSubscription = false var hasDownstreamSubscription = false var pendingRemoveSubscription: List[S] = Nil - override type S = ActorSubscription[Any] - override def createSubscription(subscriber: Subscriber[Any]): S = - new ActorSubscription(self, subscriber) - override def afterShutdown(completed: Boolean): Unit = { - primaryOutputsFinished(completed) - } + registerSubscriber(other.getSubscriber) - override val NeedsDemand: TransferState = new TransferState { - def isReady = demandAvailable - def isCompleted = isClosed - } - - override def addSubscriber(subscriber: Subscriber[Any]): Unit = { - super.addSubscriber(subscriber) + override def registerSubscriber(subscriber: Subscriber[Any]): Unit = { + super.registerSubscriber(subscriber) if (subscriber == other.getSubscriber) hasOtherSubscription = true else hasDownstreamSubscription = true if (pendingRemoveSubscription.nonEmpty && hasOtherSubscription && hasDownstreamSubscription) { - pendingRemoveSubscription foreach removeSubscription + pendingRemoveSubscription foreach unregisterSubscription pendingRemoveSubscription = Nil } } - override def removeSubscription(subscription: S): Unit = { + override def unregisterSubscription(subscription: S): Unit = { // make sure that we don't shutdown because of premature cancel if (hasOtherSubscription && hasDownstreamSubscription) - super.removeSubscription(subscription) + super.unregisterSubscription(subscription) else pendingRemoveSubscription :+= subscription // defer these until both subscriptions have been registered } + + override def afterShutdown(): Unit = { + primaryOutputsShutdown = true + shutdownHooks() + } } - override def transfer(): TransferState = { + var running = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ val in = primaryInputs.dequeueInputElement() primaryOutputs.enqueueOutputElement(in) - needsBothInputAndDemand } + + nextPhase(running) + } diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index f522ae999a..0011eacf8b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -4,7 +4,7 @@ package akka.stream.impl import akka.stream.MaterializerSettings -import akka.actor.{ Terminated, ActorRef } +import akka.actor.{ Actor, Terminated, ActorRef } import org.reactivestreams.spi.{ Subscriber, Subscription } import org.reactivestreams.api.Producer @@ -37,8 +37,12 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS private var completed: Boolean = false private var demands: Int = 0 - val substream = context.watch(context.actorOf(IdentityProcessorImpl.props(settings). - withDispatcher(context.props.dispatcher))) + override def receive: SubReceive = + throw new UnsupportedOperationException("Substream outputs are managed in a dedicated receive block") + + val substream = context.watch(context.actorOf( + IdentityProcessorImpl.props(settings) + .withDispatcher(context.props.dispatcher))) val processor = new ActorProcessor[AnyRef, AnyRef](substream) override def isClosed: Boolean = completed @@ -76,12 +80,12 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS outputs } - def fullyCompleted: Boolean = isShuttingDown && isPumpFinished && context.children.isEmpty + def fullyCompleted: Boolean = primaryOutputsShutdown && isPumpFinished && context.children.isEmpty protected def invalidateSubstream(substream: ActorRef): Unit = { substreamOutputs(substream).complete() substreamOutputs -= substream - if (fullyCompleted) shutdown() + shutdownHooks() pump() } @@ -90,19 +94,15 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS super.fail(e) } - override def primaryOutputsFinished(completed: Boolean): Unit = { - // If the master stream is cancelled (no one consumes substreams as elements from the master stream) - // then this callback does not mean we are shutting down - // We can only shut down after all substreams (our children) are closed - if (fullyCompleted) shutdown() - } + // FIXME: proper shutdown scheduling + override def shutdownHooks(): Unit = if (fullyCompleted) super.shutdownHooks() override def pumpFinished(): Unit = { context.children foreach (_ ! OnComplete) super.pumpFinished() } - override val downstreamManagement: Receive = super.downstreamManagement orElse { + val substreamManagement: Receive = { case SubstreamRequestMore(key, demand) ⇒ substreamOutputs(key).enqueueOutputDemand(demand) pump() @@ -110,6 +110,8 @@ private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerS case Terminated(child) ⇒ invalidateSubstream(child) } + + override def receive = primaryInputs.subreceive orElse primaryOutputs.receive orElse substreamManagement } /** @@ -135,51 +137,31 @@ private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSett extends ActorProcessorImpl(_settings) { import TwoStreamInputProcessor._ - var secondaryInputs: Inputs = _ + val secondaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, this) { + override val subreceive: SubReceive = new SubReceive(waitingForUpstream) - override def primaryOutputsReady(): Unit = { - other.getPublisher.subscribe(new OtherActorSubscriber(self)) - super.primaryOutputsReady() + override def inputOnError(e: Throwable): Unit = TwoStreamInputProcessor.this.onError(e) + + override def waitingForUpstream: Receive = { + case OtherStreamOnComplete ⇒ onComplete() + case OtherStreamOnSubscribe(subscription) ⇒ onSubscribe(subscription) + } + + override def upstreamRunning: Receive = { + case OtherStreamOnNext(element) ⇒ enqueueInputElement(element) + case OtherStreamOnComplete ⇒ onComplete() + } + override protected def completed: Actor.Receive = { + case OtherStreamOnSubscribe(_) ⇒ throw new IllegalStateException("Cannot subscribe shutdown subscriber") + } } - override def waitingForUpstream: Receive = super.waitingForUpstream orElse { - case OtherStreamOnComplete ⇒ - secondaryInputs = EmptyInputs - transitionToRunningWhenReady() - case OtherStreamOnSubscribe(subscription) ⇒ - assert(subscription != null) - secondaryInputs = new BatchingInputBuffer(subscription, settings.initialInputBufferSize) - transitionToRunningWhenReady() - } + override def receive = secondaryInputs.subreceive orElse primaryInputs.subreceive orElse primaryOutputs.receive - override def running: Receive = super.running orElse { - case OtherStreamOnNext(element) ⇒ - secondaryInputs.enqueueInputElement(element) - pump() - case OtherStreamOnComplete ⇒ - secondaryInputs.complete() - primaryInputOnComplete() - pump() - } + other.getPublisher.subscribe(new OtherActorSubscriber(self)) - override def primaryInputOnComplete(): Unit = { - if (secondaryInputs.isClosed && primaryInputs.isClosed) - super.primaryInputOnComplete() + override def shutdownHooks(): Unit = { + secondaryInputs.cancel() + super.shutdownHooks() } - - override def transitionToRunningWhenReady(): Unit = if ((primaryInputs ne null) && (secondaryInputs ne null)) { - secondaryInputs.prefetch() - super.transitionToRunningWhenReady() - } - - override def fail(cause: Throwable): Unit = { - if (secondaryInputs ne null) secondaryInputs.cancel() - super.fail(cause) - } - - override def primaryOutputsFinished(completed: Boolean) { - if (secondaryInputs ne null) secondaryInputs.cancel() - super.primaryOutputsFinished(completed) - } - } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala index da5c57ef17..cc2fd40467 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Transfer.scala @@ -6,7 +6,21 @@ package akka.stream.impl import org.reactivestreams.spi.{ Subscriber, Subscription } import java.util.Arrays import scala.util.control.NonFatal -import akka.actor.ActorRefFactory +import akka.actor.{ Actor, ActorRefFactory } + +/** + * INTERNAL API + */ +private[akka] class SubReceive(initial: Actor.Receive) extends Actor.Receive { + private var currentReceive = initial + + override def isDefinedAt(msg: Any): Boolean = currentReceive.isDefinedAt(msg) + override def apply(msg: Any): Unit = currentReceive.apply(msg) + + def become(newBehavior: Actor.Receive): Unit = { + currentReceive = newBehavior + } +} /** * INTERNAL API @@ -15,16 +29,14 @@ private[akka] trait Inputs { def NeedsInput: TransferState def NeedsInputOrComplete: TransferState - def enqueueInputElement(elem: Any): Unit def dequeueInputElement(): Any + def subreceive: SubReceive + def cancel(): Unit - def complete(): Unit def isClosed: Boolean def isOpen: Boolean = !isClosed - def prefetch(): Unit - def inputsDepleted: Boolean def inputsAvailable: Boolean } @@ -53,6 +65,8 @@ private[akka] trait Outputs { def demandAvailable: Boolean def enqueueOutputElement(elem: Any): Unit + def receive: SubReceive + def complete(): Unit def cancel(e: Throwable): Unit def isClosed: Boolean @@ -112,24 +126,7 @@ private[akka] object NotInitialized extends TransferState { /** * INTERNAL API */ -private[akka] object EmptyInputs extends Inputs { - override def inputsAvailable: Boolean = false - override def inputsDepleted: Boolean = true - override def isClosed: Boolean = true - - override def complete(): Unit = () - override def cancel(): Unit = () - override def prefetch(): Unit = () - - override def dequeueInputElement(): Any = throw new UnsupportedOperationException("Cannot dequeue from EmptyInputs") - override def enqueueInputElement(elem: Any): Unit = throw new UnsupportedOperationException("Cannot enqueue to EmptyInputs") - - override val NeedsInputOrComplete: TransferState = new TransferState { - override def isReady: Boolean = true - override def isCompleted: Boolean = false - } - override val NeedsInput: TransferState = Completed -} +private[akka] case class TransferPhase(precondition: TransferState)(val action: () ⇒ Unit) /** * INTERNAL API @@ -137,15 +134,25 @@ private[akka] object EmptyInputs extends Inputs { private[akka] trait Pump { protected def pumpContext: ActorRefFactory private var transferState: TransferState = NotInitialized - def setTransferState(t: TransferState): Unit = transferState = t + private var currentAction: () ⇒ Unit = + () ⇒ throw new IllegalStateException("Pump has been not initialized with a phase") - def isPumpFinished: Boolean = transferState.isCompleted + final def nextPhase(phase: TransferPhase): Unit = { + transferState = phase.precondition + currentAction = phase.action + } + + final def isPumpFinished: Boolean = transferState.isCompleted + + protected final val completedPhase = TransferPhase(Completed) { + () ⇒ throw new IllegalStateException("The action of completed phase must be never executed") + } // Exchange input buffer elements and output buffer "requests" until one of them becomes empty. // Generate upstream requestMore for every Nth consumed input element final def pump(): Unit = { try while (transferState.isExecutable) { - transferState = ActorBasedFlowMaterializer.withCtx(pumpContext)(transfer()) + ActorBasedFlowMaterializer.withCtx(pumpContext)(currentAction()) } catch { case NonFatal(e) ⇒ pumpFailed(e) } if (isPumpFinished) pumpFinished() @@ -154,107 +161,5 @@ private[akka] trait Pump { protected def pumpFailed(e: Throwable): Unit protected def pumpFinished(): Unit - // Needs to be implemented by Processor implementations. Transfers elements from the input buffer to the output - // buffer. - protected def transfer(): TransferState } -/** - * INTERNAL API - */ -private[akka] class BatchingInputBuffer(val upstream: Subscription, val size: Int) extends DefaultInputTransferStates { - // TODO: buffer and batch sizing heuristics - private var inputBuffer = Array.ofDim[AnyRef](size) - private var inputBufferElements = 0 - private var nextInputElementCursor = 0 - private var upstreamCompleted = false - private val IndexMask = size - 1 - - private def requestBatchSize = math.max(1, inputBuffer.length / 2) - private var batchRemaining = requestBatchSize - - override def prefetch(): Unit = upstream.requestMore(inputBuffer.length) - - override def dequeueInputElement(): Any = { - val elem = inputBuffer(nextInputElementCursor) - inputBuffer(nextInputElementCursor) = null - - batchRemaining -= 1 - if (batchRemaining == 0 && !upstreamCompleted) { - upstream.requestMore(requestBatchSize) - batchRemaining = requestBatchSize - } - - inputBufferElements -= 1 - nextInputElementCursor += 1 - nextInputElementCursor &= IndexMask - elem - } - - override def enqueueInputElement(elem: Any): Unit = - if (isOpen) { - if (inputBufferElements == size) throw new IllegalStateException("Input buffer overrun") - inputBuffer((nextInputElementCursor + inputBufferElements) & IndexMask) = elem.asInstanceOf[AnyRef] - inputBufferElements += 1 - } - - override def complete(): Unit = upstreamCompleted = true - override def cancel(): Unit = { - if (!upstreamCompleted) { - upstreamCompleted = true - upstream.cancel() - clear() - } - } - override def isClosed: Boolean = upstreamCompleted - - private def clear(): Unit = { - Arrays.fill(inputBuffer, 0, inputBuffer.length, null) - inputBufferElements = 0 - } - - override def inputsDepleted = upstreamCompleted && inputBufferElements == 0 - override def inputsAvailable = inputBufferElements > 0 - -} - -/** - * INTERNAL API - */ -private[akka] abstract class FanoutOutputs(val maxBufferSize: Int, val initialBufferSize: Int) extends DefaultOutputTransferStates with SubscriberManagement[Any] { - private var downstreamBufferSpace = 0 - private var downstreamCompleted = false - def demandAvailable = downstreamBufferSpace > 0 - - def enqueueOutputDemand(demand: Int): Unit = downstreamBufferSpace += demand - def enqueueOutputElement(elem: Any): Unit = { - downstreamBufferSpace -= 1 - pushToDownstream(elem) - } - - def complete(): Unit = - if (!downstreamCompleted) { - downstreamCompleted = true - completeDownstream() - } - - def cancel(e: Throwable): Unit = - if (!downstreamCompleted) { - downstreamCompleted = true - abortDownstream(e) - } - - def isClosed: Boolean = downstreamCompleted - - def handleRequest(subscription: S, elements: Int): Unit = super.moreRequested(subscription, elements) - def addSubscriber(subscriber: Subscriber[Any]): Unit = super.registerSubscriber(subscriber) - def removeSubscription(subscription: S): Unit = super.unregisterSubscription(subscription) - - def afterShutdown(completed: Boolean): Unit - - override protected def requestFromUpstream(elements: Int): Unit = enqueueOutputDemand(elements) - override protected def shutdown(completed: Boolean): Unit = afterShutdown(completed) - override protected def cancelUpstream(): Unit = { - downstreamCompleted = true - } -} diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala index 3baa900ec7..70b9ef6b7b 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpConnectionStream.scala @@ -28,39 +28,95 @@ private[akka] object TcpStreamActor { /** * INTERNAL API */ -private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor - with PrimaryInputs - with PrimaryOutputs { +private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) extends Actor { import TcpStreamActor._ - def connection: ActorRef - val tcpInputs = new DefaultInputTransferStates { + val primaryInputs: Inputs = new BatchingInputBuffer(settings.initialInputBufferSize, writePump) { + override def inputOnError(e: Throwable): Unit = fail(e) + } + + val primaryOutputs: Outputs = + new FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, readPump) { + override def afterShutdown(): Unit = { + tcpInputs.cancel() + TcpStreamActor.this.tryShutdown() + } + } + + object tcpInputs extends DefaultInputTransferStates { private var closed: Boolean = false private var pendingElement: ByteString = null + private var connection: ActorRef = _ + + val subreceive = new SubReceive(Actor.emptyBehavior) + + def setConnection(c: ActorRef): Unit = { + connection = c + // Prefetch + c ! ResumeReading + subreceive.become(handleRead) + readPump.pump() + } + + def handleRead: Receive = { + case Received(data) ⇒ + pendingElement = data + readPump.pump() + case Closed ⇒ + closed = true + tcpOutputs.complete() + writePump.pump() + readPump.pump() + case ConfirmedClosed ⇒ + closed = true + readPump.pump() + case PeerClosed ⇒ + closed = true + readPump.pump() + case ErrorClosed(cause) ⇒ fail(new TcpStreamException(s"The connection closed with error $cause")) + case CommandFailed(cmd) ⇒ fail(new TcpStreamException(s"Tcp command [$cmd] failed")) + case Aborted ⇒ fail(new TcpStreamException("The connection has been aborted")) + } override def inputsAvailable: Boolean = pendingElement ne null override def inputsDepleted: Boolean = closed && !inputsAvailable - override def prefetch(): Unit = connection ! ResumeReading override def isClosed: Boolean = closed - override def complete(): Unit = closed = true + override def cancel(): Unit = { closed = true pendingElement = null } + override def dequeueInputElement(): Any = { val elem = pendingElement pendingElement = null connection ! ResumeReading elem } - override def enqueueInputElement(elem: Any): Unit = pendingElement = elem.asInstanceOf[ByteString] } object tcpOutputs extends DefaultOutputTransferStates { private var closed: Boolean = false private var pendingDemand = true + private var connection: ActorRef = _ + + def setConnection(c: ActorRef): Unit = { + connection = c + writePump.pump() + receive.become(handleWrite) + } + + val receive = new SubReceive(Actor.emptyBehavior) + + def handleWrite: Receive = { + case WriteAck ⇒ + pendingDemand = true + writePump.pump() + + } + override def isClosed: Boolean = closed override def cancel(e: Throwable): Unit = { if (!closed) connection ! Abort @@ -74,90 +130,54 @@ private[akka] abstract class TcpStreamActor(val settings: MaterializerSettings) connection ! Write(elem.asInstanceOf[ByteString], WriteAck) pendingDemand = false } - def enqueueDemand(): Unit = pendingDemand = true override def demandAvailable: Boolean = pendingDemand } object writePump extends Pump { - lazy val NeedsInputAndDemand = primaryInputs.NeedsInput && tcpOutputs.NeedsDemand - override protected def transfer(): TransferState = { + + def running = TransferPhase(primaryInputs.NeedsInput && tcpOutputs.NeedsDemand) { () ⇒ var batch = ByteString.empty while (primaryInputs.inputsAvailable) batch ++= primaryInputs.dequeueInputElement().asInstanceOf[ByteString] tcpOutputs.enqueueOutputElement(batch) - NeedsInputAndDemand } - override protected def pumpFinished(): Unit = tcpOutputs.complete() + + override protected def pumpFinished(): Unit = { + tcpOutputs.complete() + tryShutdown() + } override protected def pumpFailed(e: Throwable): Unit = fail(e) override protected def pumpContext: ActorRefFactory = context } object readPump extends Pump { - lazy val NeedsInputAndDemand = tcpInputs.NeedsInput && primaryOutputs.NeedsDemand - override protected def transfer(): TransferState = { + + def running = TransferPhase(tcpInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ primaryOutputs.enqueueOutputElement(tcpInputs.dequeueInputElement()) - NeedsInputAndDemand } - override protected def pumpFinished(): Unit = primaryOutputs.complete() + + override protected def pumpFinished(): Unit = { + primaryOutputs.complete() + tryShutdown() + } override protected def pumpFailed(e: Throwable): Unit = fail(e) override protected def pumpContext: ActorRefFactory = context } - override def pumpInputs(): Unit = writePump.pump() - override def pumpOutputs(): Unit = readPump.pump() + override def receive = + primaryInputs.subreceive orElse primaryOutputs.receive orElse tcpInputs.subreceive orElse tcpOutputs.receive - override def receive = waitingExposedPublisher - - override def primaryInputOnError(e: Throwable): Unit = fail(e) - override def primaryInputOnComplete(): Unit = shutdown() - override def primaryInputsReady(): Unit = { - connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) - readPump.setTransferState(readPump.NeedsInputAndDemand) - writePump.setTransferState(writePump.NeedsInputAndDemand) - tcpInputs.prefetch() - context.become(running) - } - - override def primaryOutputsReady(): Unit = context.become(downstreamManagement orElse waitingForUpstream) - override def primaryOutputsFinished(completed: Boolean): Unit = shutdown() - - val running: Receive = upstreamManagement orElse downstreamManagement orElse { - case WriteAck ⇒ - tcpOutputs.enqueueDemand() - pumpInputs() - case Received(data) ⇒ - tcpInputs.enqueueInputElement(data) - pumpOutputs() - case Closed ⇒ - tcpInputs.complete() - tcpOutputs.complete() - writePump.pump() - readPump.pump() - case ConfirmedClosed ⇒ - tcpInputs.complete() - pumpOutputs() - case PeerClosed ⇒ - tcpInputs.complete() - pumpOutputs() - case ErrorClosed(cause) ⇒ fail(new TcpStreamException(s"The connection closed with error $cause")) - case CommandFailed(cmd) ⇒ fail(new TcpStreamException(s"Tcp command [$cmd] failed")) - case Aborted ⇒ fail(new TcpStreamException("The connection has been aborted")) - } + readPump.nextPhase(readPump.running) + writePump.nextPhase(writePump.running) def fail(e: Throwable): Unit = { tcpInputs.cancel() tcpOutputs.cancel(e) - if (primaryInputs ne null) primaryInputs.cancel() + primaryInputs.cancel() primaryOutputs.cancel(e) - exposedPublisher.shutdown(Some(e)) } - def shutdown(): Unit = { - if (tcpOutputs.isClosed && primaryOutputs.isClosed) { - context.stop(self) - exposedPublisher.shutdown(None) - } - } + def tryShutdown(): Unit = if (primaryInputs.isClosed && tcpInputs.isClosed && tcpOutputs.isClosed) context.stop(self) } @@ -168,6 +188,9 @@ private[akka] class InboundTcpStreamActor( val connection: ActorRef, _settings: MaterializerSettings) extends TcpStreamActor(_settings) { + connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) + tcpInputs.setConnection(connection) + tcpOutputs.setConnection(connection) } /** @@ -176,23 +199,26 @@ private[akka] class InboundTcpStreamActor( private[akka] class OutboundTcpStreamActor(val connectCmd: Connect, val requester: ActorRef, _settings: MaterializerSettings) extends TcpStreamActor(_settings) { import TcpStreamActor._ - var connection: ActorRef = _ import context.system - override def primaryOutputsReady(): Unit = context.become(waitingExposedProcessor) + val initSteps = new SubReceive(waitingExposedProcessor) - val waitingExposedProcessor: Receive = { + override def receive = initSteps orElse super.receive + + def waitingExposedProcessor: Receive = { case StreamTcpManager.ExposedProcessor(processor) ⇒ IO(Tcp) ! connectCmd - context.become(waitConnection(processor)) - case _ ⇒ throw new IllegalStateException("The second message must be ExposedProcessor") + initSteps.become(waitConnection(processor)) } def waitConnection(exposedProcessor: Processor[ByteString, ByteString]): Receive = { case Connected(remoteAddress, localAddress) ⇒ - connection = sender() + val connection = sender() + connection ! Register(self, keepOpenOnPeerClosed = true, useResumeWriting = false) + tcpOutputs.setConnection(connection) + tcpInputs.setConnection(connection) requester ! StreamTcp.OutgoingTcpConnection(remoteAddress, localAddress, exposedProcessor) - context.become(downstreamManagement orElse waitingForUpstream) + initSteps.become(Actor.emptyBehavior) case f: CommandFailed ⇒ val ex = new TcpStreamException("Connection failed.") requester ! Status.Failure(ex) diff --git a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala index dd6871677c..79a57ef874 100644 --- a/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala +++ b/akka-stream/src/main/scala/akka/stream/io/TcpListenStreamActor.scala @@ -34,71 +34,65 @@ private[akka] object TcpListenStreamActor { * INTERNAL API */ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, val settings: MaterializerSettings) extends Actor - with PrimaryOutputs with Pump { + with Pump { import TcpListenStreamActor._ import context.system - var listener: ActorRef = _ + object primaryOutputs extends FanoutOutputs(settings.maxFanOutBufferSize, settings.initialFanOutBufferSize, self, pump = this) { + override def afterShutdown(): Unit = { + incomingConnections.cancel() + context.stop(self) + } - override def receive: Actor.Receive = waitingExposedPublisher - override def primaryOutputsReady(): Unit = { - IO(Tcp) ! bindCmd.copy(handler = self) - context.become(waitBound) - } + override def waitingExposedPublisher: Actor.Receive = { + case ExposedPublisher(publisher) ⇒ + exposedPublisher = publisher + IO(Tcp) ! bindCmd.copy(handler = self) + receive.become(downstreamRunning) + case other ⇒ + throw new IllegalStateException(s"The first message must be ExposedPublisher but was [$other]") + } - val waitBound: Receive = { - case Bound(localAddress) ⇒ - listener = sender() - setTransferState(NeedsInputAndDemand) - incomingConnections.prefetch() - requester ! StreamTcp.TcpServerBinding( - localAddress, - ConnectionProducer(exposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]])) - context.become(running) - case f: CommandFailed ⇒ - val ex = new TcpListenStreamException("Bind failed") - requester ! Status.Failure(ex) - fail(ex) - } - - val running: Receive = downstreamManagement orElse { - case c: Connected ⇒ - incomingConnections.enqueueInputElement((c, sender())) - pump() - case f: CommandFailed ⇒ - fail(new TcpListenStreamException(s"Command [${f.cmd}] failed")) - } - - override def pumpOutputs(): Unit = pump() - - override def primaryOutputsFinished(completed: Boolean): Unit = shutdown() - - lazy val NeedsInputAndDemand = primaryOutputs.NeedsDemand && incomingConnections.NeedsInput - - override protected def transfer(): TransferState = { - val (connected, connection) = incomingConnections.dequeueInputElement().asInstanceOf[(Connected, ActorRef)] - val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, settings)) - val processor = new ActorProcessor[ByteString, ByteString](tcpStreamActor) - primaryOutputs.enqueueOutputElement(StreamTcp.IncomingTcpConnection(connected.remoteAddress, processor, processor)) - NeedsInputAndDemand + def getExposedPublisher = exposedPublisher } override protected def pumpFinished(): Unit = incomingConnections.cancel() override protected def pumpFailed(e: Throwable): Unit = fail(e) override protected def pumpContext: ActorRefFactory = context - val incomingConnections = new DefaultInputTransferStates { + val incomingConnections: Inputs = new DefaultInputTransferStates { + var listener: ActorRef = _ private var closed: Boolean = false private var pendingConnection: (Connected, ActorRef) = null + def waitBound: Receive = { + case Bound(localAddress) ⇒ + listener = sender() + nextPhase(runningPhase) + listener ! ResumeAccepting(1) + requester ! StreamTcp.TcpServerBinding( + localAddress, + ConnectionProducer(primaryOutputs.getExposedPublisher.asInstanceOf[Publisher[StreamTcp.IncomingTcpConnection]])) + subreceive.become(running) + case f: CommandFailed ⇒ + val ex = new TcpListenStreamException("Bind failed") + requester ! Status.Failure(ex) + fail(ex) + } + + def running: Receive = { + case c: Connected ⇒ + pendingConnection = (c, sender()) + pump() + case f: CommandFailed ⇒ + fail(new TcpListenStreamException(s"Command [${f.cmd}] failed")) + } + + override val subreceive = new SubReceive(waitBound) + override def inputsAvailable: Boolean = pendingConnection ne null override def inputsDepleted: Boolean = closed && !inputsAvailable - override def prefetch(): Unit = listener ! ResumeAccepting(1) override def isClosed: Boolean = closed - override def complete(): Unit = { - if (!closed && listener != null) listener ! Unbind - closed = true - } override def cancel(): Unit = { if (!closed && listener != null) listener ! Unbind closed = true @@ -110,19 +104,21 @@ private[akka] class TcpListenStreamActor(bindCmd: Tcp.Bind, requester: ActorRef, listener ! ResumeAccepting(1) elem } - override def enqueueInputElement(elem: Any): Unit = pendingConnection = elem.asInstanceOf[(Connected, ActorRef)] } + override def receive: Actor.Receive = primaryOutputs.receive orElse incomingConnections.subreceive + + def runningPhase = TransferPhase(primaryOutputs.NeedsDemand && incomingConnections.NeedsInput) { () ⇒ + val (connected: Connected, connection: ActorRef) = incomingConnections.dequeueInputElement() + val tcpStreamActor = context.actorOf(TcpStreamActor.inboundProps(connection, settings)) + val processor = new ActorProcessor[ByteString, ByteString](tcpStreamActor) + primaryOutputs.enqueueOutputElement(StreamTcp.IncomingTcpConnection(connected.remoteAddress, processor, processor)) + } + def fail(e: Throwable): Unit = { incomingConnections.cancel() primaryOutputs.cancel(e) - exposedPublisher.shutdown(Some(e)) } - def shutdown(): Unit = { - incomingConnections.complete() - primaryOutputs.complete() - exposedPublisher.shutdown(None) - } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala index 0ff434220e..0b554a4c98 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Duct.scala @@ -13,7 +13,6 @@ import akka.japi.Function2 import akka.japi.Procedure import akka.japi.Util.immutableSeq import akka.stream.FlowMaterializer -import akka.stream.RecoveryTransformer import akka.stream.Transformer import akka.stream.scaladsl.{ Duct ⇒ SDuct } @@ -124,16 +123,6 @@ abstract class Duct[In, Out] { */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] - /** - * This transformation stage works exactly like [[#transform]] with the - * change that failure signaled from upstream will invoke - * [[akka.stream.RecoveryTransformer#onError]], which can emit an additional sequence of - * elements before the stream ends. - * - * After normal completion or error the [[akka.stream.RecoveryTransformer#cleanup]] function is called. - */ - def transformRecover[U](recoveryTransformer: RecoveryTransformer[Out, U]): Duct[In, U] - /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -270,9 +259,6 @@ private[akka] class DuctAdapter[In, T](delegate: SDuct[In, T]) extends Duct[In, override def transform[U](transformer: Transformer[T, U]): Duct[In, U] = new DuctAdapter(delegate.transform(transformer)) - override def transformRecover[U](transformer: RecoveryTransformer[T, U]): Duct[In, U] = - new DuctAdapter(delegate.transformRecover(transformer)) - override def groupBy[K](f: Function[T, K]): Duct[In, Pair[K, Producer[T]]] = new DuctAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5b210333e2..37e878006e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -17,7 +17,6 @@ import akka.japi.Util.immutableSeq import akka.stream.FlowMaterializer import akka.stream.scaladsl.{ Flow ⇒ SFlow } import akka.stream.Transformer -import akka.stream.RecoveryTransformer import org.reactivestreams.api.Consumer /** @@ -178,16 +177,6 @@ abstract class Flow[T] { */ def transform[U](transformer: Transformer[T, U]): Flow[U] - /** - * This transformation stage works exactly like [[#transform]] with the - * change that failure signaled from upstream will invoke - * [[akka.stream.RecoveryTransformer#onError]], which can emit an additional sequence of - * elements before the stream ends. - * - * After normal completion or error the [[akka.stream.RecoveryTransformer#cleanup]] function is called. - */ - def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U] - /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element @@ -343,9 +332,6 @@ private[akka] class FlowAdapter[T](delegate: SFlow[T]) extends Flow[T] { override def transform[U](transformer: Transformer[T, U]): Flow[U] = new FlowAdapter(delegate.transform(transformer)) - override def transformRecover[U](transformer: RecoveryTransformer[T, U]): Flow[U] = - new FlowAdapter(delegate.transformRecover(transformer)) - override def groupBy[K](f: Function[T, K]): Flow[Pair[K, Producer[T]]] = new FlowAdapter(delegate.groupBy(f.apply).map { case (k, p) ⇒ Pair(k, p) }) // FIXME optimize to one step diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala index 9d4988e773..49a6c4960e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Duct.scala @@ -9,7 +9,6 @@ import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import akka.stream.FlowMaterializer -import akka.stream.RecoveryTransformer import akka.stream.Transformer import akka.stream.impl.DuctImpl @@ -116,17 +115,6 @@ trait Duct[In, +Out] { */ def transform[U](transformer: Transformer[Out, U]): Duct[In, U] - /** - * This transformation stage works exactly like [[#transform]] with the - * change that failure signaled from upstream will invoke - * [[RecoveryTransformer#onError]], which can emit an additional sequence of - * elements before the stream ends. - * - * After normal completion or error the [[RecoveryTransformer#cleanup]] function - * is called. - */ - def transformRecover[U](recoveryTransformer: RecoveryTransformer[Out, U]): Duct[In, U] - /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index ac3bbc24fb..bf5e71b945 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,7 +10,6 @@ import scala.util.Try import org.reactivestreams.api.Consumer import org.reactivestreams.api.Producer import akka.stream.FlowMaterializer -import akka.stream.RecoveryTransformer import akka.stream.Transformer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.Ast.FutureProducerNode @@ -175,16 +174,6 @@ trait Flow[+T] { */ def transform[U](transformer: Transformer[T, U]): Flow[U] - /** - * This transformation stage works exactly like [[#transform]] with the - * change that failure signaled from upstream will invoke - * [[akka.stream.RecoveryTransformer#onErrorRecover]], which can emit an additional sequence of - * elements before the stream ends. - * - * [[akka.stream.Transformer#onError]] is not called when failure is signaled from upstream. - */ - def transformRecover[U](recoveryTransformer: RecoveryTransformer[T, U]): Flow[U] - /** * This operation demultiplexes the incoming stream into separate output * streams, one for each element key. The key is computed for each element diff --git a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java index 06d354a393..f3daa47715 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/DuctTest.java @@ -29,7 +29,6 @@ import akka.japi.Procedure; import akka.japi.Util; import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; -import akka.stream.RecoveryTransformer; import akka.stream.Transformer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; diff --git a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java index dea2aa5ce9..fc4bef8ed6 100644 --- a/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream/src/test/java/akka/stream/javadsl/FlowTest.java @@ -17,6 +17,7 @@ import static org.junit.Assert.assertEquals; import org.reactivestreams.api.Producer; +import scala.Option; import scala.concurrent.Await; import scala.concurrent.Future; import scala.concurrent.duration.FiniteDuration; @@ -28,7 +29,6 @@ import akka.japi.Procedure; import akka.japi.Util; import akka.stream.FlowMaterializer; import akka.stream.MaterializerSettings; -import akka.stream.RecoveryTransformer; import akka.stream.Transformer; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; @@ -119,7 +119,7 @@ public class FlowTest { } @Override - public scala.collection.immutable.Seq onComplete() { + public scala.collection.immutable.Seq onTermination(Option e) { return Util.immutableSingletonSeq(sum); } @@ -156,7 +156,7 @@ public class FlowTest { else return elem + elem; } - }).transformRecover(new RecoveryTransformer() { + }).transform(new Transformer() { @Override public scala.collection.immutable.Seq onNext(Integer element) { @@ -164,20 +164,19 @@ public class FlowTest { } @Override - public scala.collection.immutable.Seq onErrorRecover(Throwable e) { - return Util.immutableSingletonSeq(e.getMessage()); + public scala.collection.immutable.Seq onTermination(Option e) { + if (e.isEmpty()) return Util.immutableSeq(new String[0]); + else return Util.immutableSingletonSeq(e.get().getMessage()); } + @Override + public void onError(Throwable e) {} + @Override public boolean isComplete() { return false; } - @Override - public scala.collection.immutable.Seq onComplete() { - return Util.immutableSeq(new String[0]); - } - @Override public void cleanup() { } diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index 4a62c28973..1e65e1bee5 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -81,18 +81,14 @@ class FlowConcatSpec extends TwoStreamsSetup { consumer1.expectErrorOrSubscriptionFollowedByError(TestException) val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) - val subscription2 = consumer2.expectSubscription() - subscription2.requestMore(5) consumer2.expectErrorOrSubscriptionFollowedByError(TestException) } "work with one delayed failed and one nonempty producer" in { val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) - val subscription1 = consumer1.expectSubscription() consumer1.expectErrorOrSubscriptionFollowedByError(TestException) val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) - val subscription2 = consumer2.expectSubscription() consumer2.expectErrorOrSubscriptionFollowedByError(TestException) } diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 2fd23ceccd..61530b9fed 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -204,9 +204,7 @@ class FlowGroupBySpec extends AkkaSpec { val consumer = StreamTestKit.consumerProbe[(Int, Producer[Int])] producer.produceTo(consumer) - val subscription = consumer.expectSubscription() - subscription.requestMore(100) - consumer.expectComplete() + consumer.expectCompletedOrSubscriptionFollowedByComplete() } "abort on onError from upstream" in { diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index e72de28a99..ae8117da84 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -15,11 +15,15 @@ import scala.util.Try import scala.util.Success object FlowTransformRecoverSpec { - abstract class TryRecoveryTransformer[T, U] extends RecoveryTransformer[T, U] { + abstract class TryRecoveryTransformer[T, U] extends Transformer[T, U] { def onNext(element: Try[T]): immutable.Seq[U] override def onNext(element: T): immutable.Seq[U] = onNext(Success(element)) - override def onErrorRecover(cause: Throwable): immutable.Seq[U] = onNext(Failure(cause)) + override def onError(cause: Throwable) = () + override def onTermination(cause: Option[Throwable]): immutable.Seq[U] = cause match { + case None ⇒ Nil + case Some(e) ⇒ onNext(Failure(e)) + } } } @@ -38,13 +42,17 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce one-to-one transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem List(tot) } - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = () + override def onTermination(e: Option[Throwable]) = e match { + case None ⇒ Nil + case Some(_) ⇒ List(-1) + } }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -62,13 +70,17 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce one-to-several transformation as expected" in { val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem Vector.fill(elem)(tot) } - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = () + override def onTermination(e: Option[Throwable]) = e match { + case None ⇒ Nil + case Some(_) ⇒ List(-1) + } }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -89,13 +101,17 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce dropping transformation as expected" in { val p = Flow(List(1, 2, 3, 4).iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { var tot = 0 override def onNext(elem: Int) = { tot += elem if (elem % 2 == 0) Nil else List(tot) } - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = () + override def onTermination(e: Option[Throwable]) = e match { + case None ⇒ Nil + case Some(_) ⇒ List(-1) + } }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -113,20 +129,24 @@ class FlowTransformRecoverSpec extends AkkaSpec { "produce multi-step transformation as expected" in { val p = Flow(List("a", "bc", "def").iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new TryRecoveryTransformer[String, Int] { + transform(new TryRecoveryTransformer[String, Int] { var concat = "" override def onNext(element: Try[String]) = { concat += element List(concat.length) } }). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { var tot = 0 override def onNext(length: Int) = { tot += length List(tot) } - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = () + override def onTermination(e: Option[Throwable]) = e match { + case None ⇒ Nil + case Some(_) ⇒ List(-1) + } }). toProducer(materializer) val c1 = StreamTestKit.consumerProbe[Int] @@ -153,13 +173,13 @@ class FlowTransformRecoverSpec extends AkkaSpec { "invoke onComplete when done" in { val p = Flow(List("a").iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new TryRecoveryTransformer[String, String] { + transform(new TryRecoveryTransformer[String, String] { var s = "" override def onNext(element: Try[String]) = { s += element Nil } - override def onComplete() = List(s + "B") + override def onTermination(e: Option[Throwable]) = List(s + "B") }). toProducer(materializer) val c = StreamTestKit.consumerProbe[String] @@ -173,7 +193,7 @@ class FlowTransformRecoverSpec extends AkkaSpec { "allow cancellation using isComplete" in { val p = StreamTestKit.producerProbe[Int] val p2 = Flow(p). - transformRecover(new TryRecoveryTransformer[Int, Int] { + transform(new TryRecoveryTransformer[Int, Int] { var s = "" override def onNext(element: Try[Int]) = { s += element @@ -197,14 +217,14 @@ class FlowTransformRecoverSpec extends AkkaSpec { "call onComplete after isComplete signaled completion" in { val p = StreamTestKit.producerProbe[Int] val p2 = Flow(p). - transformRecover(new TryRecoveryTransformer[Int, Int] { + transform(new TryRecoveryTransformer[Int, Int] { var s = "" override def onNext(element: Try[Int]) = { s += element List(element.get) } override def isComplete = s == "Success(1)" - override def onComplete() = List(s.length + 10) + override def onTermination(e: Option[Throwable]) = List(s.length + 10) }). toProducer(materializer) val proc = p.expectSubscription @@ -223,12 +243,12 @@ class FlowTransformRecoverSpec extends AkkaSpec { "report error when exception is thrown" in { val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { override def onNext(elem: Int) = { if (elem == 2) throw new IllegalArgumentException("two not allowed") else List(elem, elem) } - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -252,9 +272,13 @@ class FlowTransformRecoverSpec extends AkkaSpec { if (elem == 2) throw new IllegalArgumentException("two not allowed") else (1 to 5).map(elem * 100 + _) }. - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem) - override def onErrorRecover(e: Throwable) = List(-1, -2, -3) + override def onError(e: Throwable) = () + override def onTermination(e: Option[Throwable]) = e match { + case None ⇒ Nil + case Some(_) ⇒ List(-1, -2, -3) + } }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -290,36 +314,45 @@ class FlowTransformRecoverSpec extends AkkaSpec { case class TE(message: String) extends RuntimeException(message) with NoStackTrace - "transform errors when received" in { + "transform errors in sequence with normal messages" in { val p = StreamTestKit.producerProbe[Int] val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Throwable] { + transform(new Transformer[Int, String] { var s = "" - override def onNext(element: Int) = List(new IllegalStateException) - override def onErrorRecover(ex: Throwable) = { - s += ex.getMessage - List(ex) + override def onNext(element: Int) = { + s += element.toString + List(s) + } + override def onError(ex: Throwable) = () + override def onTermination(ex: Option[Throwable]) = { + ex match { + case None ⇒ Nil + case Some(e) ⇒ + s += e.getMessage + List(s) + } } - override def onComplete() = List(TE(s.size + "10")) }). toProducer(materializer) val proc = p.expectSubscription() - val c = StreamTestKit.consumerProbe[Throwable] + val c = StreamTestKit.consumerProbe[String] p2.produceTo(c) val s = c.expectSubscription() - s.requestMore(10) + proc.sendNext(0) proc.sendError(TE("1")) - c.expectNext(TE("1")) - c.expectNext(TE("110")) + // Request late to prove the in-sequence nature + s.requestMore(10) + c.expectNext("0") + c.expectNext("01") c.expectComplete() } "forward errors when received and thrown" in { val p = StreamTestKit.producerProbe[Int] val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { override def onNext(in: Int) = List(in) - override def onErrorRecover(e: Throwable) = throw e + override def onError(e: Throwable) = throw e }). toProducer(materializer) val proc = p.expectSubscription() @@ -336,9 +369,9 @@ class FlowTransformRecoverSpec extends AkkaSpec { "support cancel as expected" in { val p = Flow(List(1, 2, 3).iterator).toProducer(materializer) val p2 = Flow(p). - transformRecover(new RecoveryTransformer[Int, Int] { + transform(new Transformer[Int, Int] { override def onNext(elem: Int) = List(elem, elem) - override def onErrorRecover(e: Throwable) = List(-1) + override def onError(e: Throwable) = List(-1) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index f75709d0c9..555c8b9a78 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -11,6 +11,7 @@ import com.typesafe.config.ConfigFactory import akka.stream.scaladsl.Flow import akka.testkit.TestProbe import scala.util.control.NoStackTrace +import scala.collection.immutable @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.receive=off\nakka.loglevel=INFO")) { @@ -145,7 +146,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d s += element Nil } - override def onComplete() = List(s + "B") + override def onTermination(e: Option[Throwable]) = List(s + "B") }). toProducer(materializer) val c = StreamTestKit.consumerProbe[String] @@ -166,7 +167,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d s += element Nil } - override def onComplete() = List(s + "B") + override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). toProducer(materializer) @@ -209,7 +210,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(out) } } - override def onComplete() = List(s + "B") + override def onTermination(e: Option[Throwable]) = List(s + "B") override def cleanup() = cleanupProbe.ref ! s }). toProducer(materializer) @@ -258,7 +259,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d List(element) } override def isComplete = s == "1" - override def onComplete() = List(s.length + 10) + override def onTermination(e: Option[Throwable]) = List(s.length + 10) override def cleanup() = cleanupProbe.ref ! s }). toProducer(materializer) @@ -285,7 +286,6 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d if (elem == 2) throw new IllegalArgumentException("two not allowed") else List(elem, elem) } - override def onError(cause: Throwable): Unit = errProbe.ref ! cause }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] @@ -298,7 +298,6 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d consumer.expectError().getMessage should be("two not allowed") consumer.expectNoMsg(200.millis) } - errProbe.expectMsgType[IllegalArgumentException].getMessage should be("two not allowed") } "support cancel as expected" in { @@ -325,7 +324,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d val p2 = Flow(p). transform(new Transformer[Int, Int] { override def onNext(elem: Int) = Nil - override def onComplete() = List(1, 2, 3) + override def onTermination(e: Option[Throwable]) = List(1, 2, 3) }). toProducer(materializer) val consumer = StreamTestKit.consumerProbe[Int] diff --git a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index 6003113202..b6dcbde3ae 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -43,26 +43,18 @@ class FlowZipSpec extends TwoStreamsSetup { "work with one immediately completed and one nonempty producer" in { val consumer1 = setup(completedPublisher, nonemptyPublisher((1 to 4).iterator)) - val subscription1 = consumer1.expectSubscription() - subscription1.requestMore(4) - consumer1.expectComplete() + consumer1.expectCompletedOrSubscriptionFollowedByComplete() val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), completedPublisher) - val subscription2 = consumer2.expectSubscription() - subscription2.requestMore(4) - consumer2.expectComplete() + consumer2.expectCompletedOrSubscriptionFollowedByComplete() } "work with one delayed completed and one nonempty producer" in { val consumer1 = setup(soonToCompletePublisher, nonemptyPublisher((1 to 4).iterator)) - val subscription1 = consumer1.expectSubscription() - subscription1.requestMore(4) - consumer1.expectComplete() + consumer1.expectCompletedOrSubscriptionFollowedByComplete() val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToCompletePublisher) - val subscription2 = consumer2.expectSubscription() - subscription2.requestMore(4) - consumer2.expectComplete() + consumer2.expectCompletedOrSubscriptionFollowedByComplete() } "work with one immediately failed and one nonempty producer" in { @@ -70,14 +62,11 @@ class FlowZipSpec extends TwoStreamsSetup { consumer1.expectErrorOrSubscriptionFollowedByError(TestException) val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), failedPublisher) - val subscription2 = consumer2.expectSubscription() - subscription2.requestMore(4) consumer2.expectErrorOrSubscriptionFollowedByError(TestException) } "work with one delayed failed and one nonempty producer" in { val consumer1 = setup(soonToFailPublisher, nonemptyPublisher((1 to 4).iterator)) - val subscription1 = consumer1.expectSubscription() consumer1.expectErrorOrSubscriptionFollowedByError(TestException) val consumer2 = setup(nonemptyPublisher((1 to 4).iterator), soonToFailPublisher) diff --git a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala index 2cabf80be6..75d1885b3a 100644 --- a/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/TwoStreamsSetup.scala @@ -75,23 +75,17 @@ abstract class TwoStreamsSetup extends AkkaSpec { def commonTests() = { "work with two immediately completed producers" in { val consumer = setup(completedPublisher, completedPublisher) - val subscription = consumer.expectSubscription() - subscription.requestMore(1) - consumer.expectComplete() + consumer.expectCompletedOrSubscriptionFollowedByComplete() } "work with two delayed completed producers" in { val consumer = setup(soonToCompletePublisher, soonToCompletePublisher) - val subscription = consumer.expectSubscription() - subscription.requestMore(1) - consumer.expectComplete() + consumer.expectCompletedOrSubscriptionFollowedByComplete() } "work with one immediately completed and one delayed completed producer" in { val consumer = setup(completedPublisher, soonToCompletePublisher) - val subscription = consumer.expectSubscription() - subscription.requestMore(1) - consumer.expectComplete() + consumer.expectCompletedOrSubscriptionFollowedByComplete() } "work with two immediately failed producers" in { diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala b/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala index 2154926a9d..94a9af32c5 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ConsumerProbe.scala @@ -23,6 +23,7 @@ trait ConsumerProbe[I] extends Consumer[I] { def expectError(): Throwable def expectErrorOrSubscriptionFollowedByError(cause: Throwable): Unit def expectErrorOrSubscriptionFollowedByError(): Throwable + def expectCompletedOrSubscriptionFollowedByComplete() def expectComplete(): Unit def expectNoMsg(): Unit diff --git a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala index fea1c8d67a..42955ff7e7 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/StreamTestKit.scala @@ -42,6 +42,15 @@ object StreamTestKit { case OnError(cause) ⇒ cause } + def expectCompletedOrSubscriptionFollowedByComplete(): Unit = { + probe.expectMsgPF() { + case s: OnSubscribe ⇒ + s.subscription.requestMore(1) + expectComplete() + case OnComplete ⇒ + } + } + def expectNoMsg(): Unit = probe.expectNoMsg() def expectNoMsg(max: FiniteDuration): Unit = probe.expectNoMsg(max)