From 289c03d1a1324fb11964950fa9f479a0a699a2a4 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 1 Apr 2014 14:58:54 +0200 Subject: [PATCH] !str move to SingleStreamProcessors.scala --- .../akka/stream/impl/ActorProcessor.scala | 84 ----------------- .../stream/impl/SingleStreamProcessors.scala | 94 +++++++++++++++++++ ...ducer.scala => SubscriberManagement.scala} | 0 3 files changed, 94 insertions(+), 84 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala rename akka-stream/src/main/scala/akka/stream/impl/{AbstractProducer.scala => SubscriberManagement.scala} (100%) diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 6ff7769370..a2be10fd3d 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -228,87 +228,3 @@ private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) } } - -/** - * INTERNAL API - */ -private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) { - var state = op.zero - var isComplete = false - var hasOnCompleteRun = false - // TODO performance improvement: mutable buffer? - var emits = immutable.Seq.empty[Any] - - object NeedsInputAndDemandOrCompletion extends TransferState { - def isReady = (primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable) || primaryInputs.inputsDepleted - def isCompleted = false - } - - override def initialTransferState = NeedsInputAndDemandOrCompletion - - override def transfer(): TransferState = { - val depleted = primaryInputs.inputsDepleted - if (emits.isEmpty) { - isComplete = op.isComplete(state) - if (depleted || isComplete) { - emits = op.onComplete(state) - hasOnCompleteRun = true - } else { - val e = primaryInputs.dequeueInputElement() - val (nextState, newEmits) = op.f(state, e) - state = nextState - emits = newEmits - } - } else { - PrimaryOutputs.enqueueOutputElement(emits.head) - emits = emits.tail - } - - if (emits.nonEmpty) PrimaryOutputs.NeedsDemand - else if (hasOnCompleteRun) Completed - else NeedsInputAndDemandOrCompletion - } - - override def toString: String = s"Transformer(state=$state, isComplete=$isComplete, hasOnCompleteRun=$hasOnCompleteRun, emits=$emits)" -} - -/** - * INTERNAL API - */ -private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { - - val wrapInSuccess: Receive = { - case OnNext(elem) ⇒ - primaryInputs.enqueueInputElement(Success(elem)) - pump() - } - - override def running: Receive = wrapInSuccess orElse super.running - - override def failureReceived(e: Throwable): Unit = { - primaryInputs.enqueueInputElement(Failure(e)) - primaryInputs.complete() - flushAndComplete() - pump() - } -} - -/** - * INTERNAL API - */ -private[akka] object IdentityProcessorImpl { - def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings)) -} - -/** - * INTERNAL API - */ -private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { - - override def initialTransferState = needsPrimaryInputAndDemand - override protected def transfer(): TransferState = { - PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) - needsPrimaryInputAndDemand - } - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala new file mode 100644 index 0000000000..698df96370 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -0,0 +1,94 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import scala.collection.immutable +import scala.util.{Failure, Success} + +import akka.actor.Props +import akka.stream.GeneratorSettings + +/** + * INTERNAL API + */ +private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) { + var state = op.zero + var isComplete = false + var hasOnCompleteRun = false + // TODO performance improvement: mutable buffer? + var emits = immutable.Seq.empty[Any] + + object NeedsInputAndDemandOrCompletion extends TransferState { + def isReady = (primaryInputs.inputsAvailable && PrimaryOutputs.demandAvailable) || primaryInputs.inputsDepleted + def isCompleted = false + } + + override def initialTransferState = NeedsInputAndDemandOrCompletion + + override def transfer(): TransferState = { + val depleted = primaryInputs.inputsDepleted + if (emits.isEmpty) { + isComplete = op.isComplete(state) + if (depleted || isComplete) { + emits = op.onComplete(state) + hasOnCompleteRun = true + } else { + val e = primaryInputs.dequeueInputElement() + val (nextState, newEmits) = op.f(state, e) + state = nextState + emits = newEmits + } + } else { + PrimaryOutputs.enqueueOutputElement(emits.head) + emits = emits.tail + } + + if (emits.nonEmpty) PrimaryOutputs.NeedsDemand + else if (hasOnCompleteRun) Completed + else NeedsInputAndDemandOrCompletion + } + + override def toString: String = s"Transformer(state=$state, isComplete=$isComplete, hasOnCompleteRun=$hasOnCompleteRun, emits=$emits)" +} + +/** + * INTERNAL API + */ +private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { + + val wrapInSuccess: Receive = { + case OnNext(elem) ⇒ + primaryInputs.enqueueInputElement(Success(elem)) + pump() + } + + override def running: Receive = wrapInSuccess orElse super.running + + override def failureReceived(e: Throwable): Unit = { + primaryInputs.enqueueInputElement(Failure(e)) + primaryInputs.complete() + flushAndComplete() + pump() + } +} + +/** + * INTERNAL API + */ +private[akka] object IdentityProcessorImpl { + def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings)) +} + +/** + * INTERNAL API + */ +private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { + + override def initialTransferState = needsPrimaryInputAndDemand + override protected def transfer(): TransferState = { + PrimaryOutputs.enqueueOutputElement(primaryInputs.dequeueInputElement()) + needsPrimaryInputAndDemand + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala similarity index 100% rename from akka-stream/src/main/scala/akka/stream/impl/AbstractProducer.scala rename to akka-stream/src/main/scala/akka/stream/impl/SubscriberManagement.scala