diff --git a/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala similarity index 85% rename from akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala rename to akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala index e8e08067c8..0a7ba084f4 100644 --- a/akka-stream/src/main/scala/akka/stream/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala @@ -5,31 +5,31 @@ package akka.stream import scala.concurrent.duration.FiniteDuration import akka.actor.ActorRefFactory -import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.impl.Ast import org.reactivestreams.api.Producer import scala.concurrent.duration._ -object ProcessorGenerator { +object FlowMaterializer { /** - * Creates a ProcessorGenerator which will execute every step of a transformation + * Creates a FlowMaterializer which will execute every step of a transformation * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]] * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]]) * will be used to create these actors, therefore it is *forbidden* to pass this object * to another actor if the factory is an ActorContext. */ - def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator = - new ActorBasedProcessorGenerator(settings, context) + def apply(settings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer = + new ActorBasedFlowMaterializer(settings, context) } /** - * A ProcessorGenerator takes the list of transformations comprising a + * A FlowMaterializer takes the list of transformations comprising a * [[akka.stream.scaladsl.Flow]] and materializes them in the form of * [[org.reactivestreams.api.Processor]] instances. How transformation * steps are split up into asynchronous regions is implementation * dependent. */ -trait ProcessorGenerator { +trait FlowMaterializer { /** * INTERNAL API * ops are stored in reverse order @@ -48,7 +48,7 @@ trait ProcessorGenerator { * * This will likely be replaced in the future by auto-tuning these values at runtime. */ -case class GeneratorSettings( +case class MaterializerSettings( initialFanOutBufferSize: Int = 4, maxFanOutBufferSize: Int = 16, initialInputBufferSize: Int = 4, diff --git a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala similarity index 82% rename from akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala rename to akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index aeae838092..9ebf7f9f01 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ProcessorGenerator.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -10,7 +10,7 @@ import org.reactivestreams.api.{ Consumer, Processor, Producer } import org.reactivestreams.spi.Subscriber import akka.actor.ActorRefFactory -import akka.stream.{ GeneratorSettings, ProcessorGenerator } +import akka.stream.{ MaterializerSettings, FlowMaterializer } /** * INTERNAL API @@ -33,25 +33,25 @@ private[akka] object Ast { case class Concat(next: Producer[Any]) extends AstNode trait ProducerNode[I] { - def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] } case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] { - def createProducer(settings: GeneratorSettings, context: ActorRefFactory) = producer + def createProducer(settings: MaterializerSettings, context: ActorRefFactory) = producer } case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] { - def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] = + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] else new ActorProducer[I](context.actorOf(IteratorProducer.props(iterator, settings))) } case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] { - def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] = + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]] else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings))) } case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] { - def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] = + def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] = new ActorProducer(context.actorOf(ActorProducer.props(settings, f))) } } @@ -59,7 +59,7 @@ private[akka] object Ast { /** * INTERNAL API */ -private[akka] object ActorBasedProcessorGenerator { +private[akka] object ActorBasedFlowMaterializer { val ctx = new ThreadLocal[ActorRefFactory] @@ -75,9 +75,9 @@ private[akka] object ActorBasedProcessorGenerator { /** * INTERNAL API */ -private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, _context: ActorRefFactory) extends ProcessorGenerator { +private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _context: ActorRefFactory) extends FlowMaterializer { import Ast._ - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ private def context = ctx.get() match { case null ⇒ _context diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala index 0d080d0832..90ea63d3ce 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorConsumer.scala @@ -12,7 +12,7 @@ import org.reactivestreams.spi.{ Subscriber, Subscription } import Ast.{ AstNode, Recover, Transform } import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala } -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings /** * INTERNAL API @@ -43,7 +43,7 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon private[akka] object ActorConsumer { import Ast._ - def props(gen: GeneratorSettings, op: AstNode) = op match { + def props(gen: MaterializerSettings, op: AstNode) = op match { case t: Transform ⇒ Props(new TransformActorConsumer(gen, t)) case r: Recover ⇒ Props(new RecoverActorConsumer(gen, r)) } @@ -52,9 +52,9 @@ private[akka] object ActorConsumer { /** * INTERNAL API */ -private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor with SoftShutdown { +private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSettings) extends Actor with SoftShutdown { import ActorProcessor._ - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ /** * Consume one element synchronously: the Actor mailbox is the queue. @@ -121,7 +121,7 @@ private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettin /** * INTERNAL API */ -private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging { +private[akka] class TransformActorConsumer(_settings: MaterializerSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging { private var state = op.zero private var onCompleteCalled = false @@ -155,7 +155,7 @@ private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast /** * INTERNAL API */ -private[akka] class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) { +private[akka] class RecoverActorConsumer(_settings: MaterializerSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) { override def onNext(elem: Any): Unit = { super.onNext(Success(elem)) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala index 9cfd63579d..9c40e2eb28 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProcessor.scala @@ -9,7 +9,7 @@ import scala.util.control.NonFatal import org.reactivestreams.api.Processor import org.reactivestreams.spi.Subscriber import akka.actor.{ Actor, ActorLogging, ActorRef, Props } -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.event.LoggingReceive /** @@ -17,7 +17,7 @@ import akka.event.LoggingReceive */ private[akka] object ActorProcessor { import Ast._ - def props(settings: GeneratorSettings, op: AstNode): Props = op match { + def props(settings: MaterializerSettings, op: AstNode): Props = op match { case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t)) case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r)) case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p)) @@ -33,13 +33,13 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi /** * INTERNAL API */ -private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings) +private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettings) extends Actor with SubscriberManagement[Any] with ActorLogging with SoftShutdown { - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ type S = ActorSubscription[Any] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala index c3ea2a57e1..fe4b3c02fd 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorProducer.scala @@ -9,7 +9,7 @@ import scala.collection.immutable import org.reactivestreams.api.{ Consumer, Producer } import org.reactivestreams.spi.{ Publisher, Subscriber } import akka.actor.ActorRef -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.actor.ActorLogging import akka.actor.Actor import scala.concurrent.duration.Duration @@ -41,7 +41,7 @@ class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T] * INTERNAL API */ private[akka] object ActorProducer { - def props[T](settings: GeneratorSettings, f: () ⇒ T): Props = + def props[T](settings: MaterializerSettings, f: () ⇒ T): Props = Props(new ActorProducerImpl(f, settings)) } @@ -142,14 +142,14 @@ private[akka] object ActorProducerImpl { /** * INTERNAL API */ -private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings) +private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: MaterializerSettings) extends Actor with ActorLogging with SubscriberManagement[T] with SoftShutdown { import ActorProducerImpl._ - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ type S = ActorSubscription[T] var pub: ActorPublisher[T] = _ diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala index 04fa584266..aefd7fe63b 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlowImpl.scala @@ -8,7 +8,7 @@ import scala.concurrent.{ Future, Promise } import scala.util.Try import org.reactivestreams.api.Producer import Ast.{ AstNode, Recover, Transform } -import akka.stream.ProcessorGenerator +import akka.stream.FlowMaterializer import akka.stream.scaladsl.Flow import scala.util.Success import scala.util.Failure @@ -86,17 +86,17 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any])) - override def toFuture(generator: ProcessorGenerator): Future[O] = { + override def toFuture(materializer: FlowMaterializer): Future[O] = { val p = Promise[O]() transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil }, onComplete = _ ⇒ { p.tryFailure(new NoSuchElementException("empty stream")); Nil }, - isComplete = _ == 1).consume(generator) + isComplete = _ == 1).consume(materializer) p.future } - override def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops) + override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops) - def onComplete(generator: ProcessorGenerator)(callback: Try[Unit] ⇒ Unit): Unit = + def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit = transformRecover(true)( f = { case (_, fail @ Failure(ex)) ⇒ @@ -104,8 +104,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops: OnCompleteFailureToken case _ ⇒ OnCompleteSuccessToken }, - onComplete = ok ⇒ { if (ok) callback(SuccessUnit); Nil }).consume(generator) + onComplete = ok ⇒ { if (ok) callback(SuccessUnit); Nil }).consume(materializer) - override def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops) + override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala index 047e607c3d..302e725976 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/GroupByProcessorImpl.scala @@ -5,7 +5,7 @@ package akka.stream.impl import org.reactivestreams.spi.Subscription import akka.actor.{ Terminated, Props, ActorRef } -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.stream.impl._ /** @@ -22,7 +22,7 @@ private[akka] object GroupByProcessorImpl { /** * INTERNAL API */ -private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { +private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) { import GroupByProcessorImpl._ var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs] diff --git a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala index 1f4cfee6c9..4a14757c3f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IterableProducer.scala @@ -13,14 +13,14 @@ import akka.actor.ActorRef import akka.actor.Props import akka.actor.SupervisorStrategy import akka.actor.Terminated -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import scala.concurrent.duration.Duration /** * INTERNAL API */ private[akka] object IterableProducer { - def props(iterable: immutable.Iterable[Any], settings: GeneratorSettings): Props = + def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props = Props(new IterableProducer(iterable, settings)) object BasicActorSubscription { @@ -46,10 +46,10 @@ private[akka] object IterableProducer { * makes use of its own iterable, i.e. each consumer will receive the elements from the * beginning of the iterable and it can consume the elements in its own pace. */ -private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: GeneratorSettings) extends Actor with SoftShutdown { +private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown { import IterableProducer.BasicActorSubscription import IterableProducer.BasicActorSubscription.Cancel - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ require(iterable.nonEmpty, "Use EmptyProducer for empty iterable") @@ -139,7 +139,7 @@ private[akka] class IterableProducerWorker(iterator: Iterator[Any], subscriber: extends Actor with SoftShutdown { import IterableProducerWorker._ import IterableProducer.BasicActorSubscription._ - import ActorBasedProcessorGenerator._ + import ActorBasedFlowMaterializer._ require(iterator.hasNext, "Iterator must not be empty") diff --git a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala index ff9389f3b0..2c5a3f6885 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/IteratorProducer.scala @@ -4,14 +4,14 @@ package akka.stream.impl import akka.actor.Props -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.stream.Stop /** * INTERNAL API */ private[akka] object IteratorProducer { - def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = { + def props(iterator: Iterator[Any], settings: MaterializerSettings): Props = { def f(): Any = { if (!iterator.hasNext) throw Stop iterator.next() diff --git a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala index 8f6e47e81a..6ae8783c9e 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SingleStreamProcessors.scala @@ -7,12 +7,12 @@ import scala.collection.immutable import scala.util.{ Failure, Success } import akka.actor.Props -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings /** * INTERNAL API */ -private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) { +private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) { var state = op.zero var isComplete = false var hasOnCompleteRun = false @@ -65,7 +65,7 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast /** * INTERNAL API */ -private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { +private[akka] class RecoverProcessorImpl(_settings: MaterializerSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) { val wrapInSuccess: Receive = { case OnNext(elem) ⇒ @@ -87,13 +87,13 @@ private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast. * INTERNAL API */ private[akka] object IdentityProcessorImpl { - def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings)) + def props(settings: MaterializerSettings): Props = Props(new IdentityProcessorImpl(settings)) } /** * INTERNAL API */ -private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { +private[akka] class IdentityProcessorImpl(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { override def initialTransferState = needsPrimaryInputAndDemand override protected def transfer(): TransferState = { diff --git a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala index 4165fd22f6..ce92e038df 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/SplitWhenProcessorImpl.scala @@ -6,7 +6,7 @@ package akka.stream.impl import akka.actor.{ Props, ActorRef } import org.reactivestreams.spi.Subscription import akka.stream.impl._ -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.actor.Terminated /** @@ -23,7 +23,7 @@ private[akka] object SplitWhenProcessorImpl { /** * INTERNAL API */ -private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val splitPredicate: Any ⇒ Boolean) +private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean) extends MultiStreamOutputProcessor(_settings) { import SplitWhenProcessorImpl._ diff --git a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala index 0e8632add5..6375489989 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StaticFanins.scala @@ -3,14 +3,14 @@ */ package akka.stream.impl -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import org.reactivestreams.api.Producer import scala.concurrent.forkjoin.ThreadLocalRandom /** * INTERNAL API */ -private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any]) +private[akka] class MergeImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { lazy val needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand @@ -33,7 +33,7 @@ private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any /** * INTERNAL API */ -private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any]) +private[akka] class ZipImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand @@ -48,7 +48,7 @@ private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any]) /** * INTERNAL API */ -private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any]) +private[akka] class ConcatImpl(_settings: MaterializerSettings, _other: Producer[Any]) extends TwoStreamInputProcessor(_settings, _other) { lazy val needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand diff --git a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala index dfb38aa661..8be2106302 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/StreamOfStreamProcessors.scala @@ -3,7 +3,7 @@ */ package akka.stream.impl -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings import akka.actor.{ Terminated, ActorRef } import org.reactivestreams.spi.{ Subscriber, Subscription } import org.reactivestreams.api.Producer @@ -28,7 +28,7 @@ private[akka] object MultiStreamOutputProcessor { /** * INTERNAL API */ -private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) { +private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) { import MultiStreamOutputProcessor._ private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs] @@ -125,7 +125,7 @@ private[akka] object TwoStreamInputProcessor { /** * INTERNAL API */ -private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSettings, val other: Producer[Any]) +private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSettings, val other: Producer[Any]) extends ActorProcessorImpl(_settings) { import TwoStreamInputProcessor._ diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index f1125c236e..f00975e4e6 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -11,7 +11,7 @@ import scala.util.control.NoStackTrace import org.reactivestreams.api.Producer -import akka.stream.ProcessorGenerator +import akka.stream.FlowMaterializer import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode } import akka.stream.impl.FlowImpl @@ -75,7 +75,7 @@ object Flow { * * By default every operation is executed within its own [[akka.actor.Actor]] * to enable full pipelining of the chained set of computations. This behavior - * is determined by the [[akka.stream.ProcessorGenerator]] which is required + * is determined by the [[akka.stream.FlowMaterializer]] which is required * by those methods that materialize the Flow into a series of * [[org.reactivestreams.api.Processor]] instances. The returned reactive stream * is fully started and active. @@ -219,19 +219,19 @@ trait Flow[+T] { * (failing the Future with a NoSuchElementException). *This operation * materializes the flow and initiates its execution.* * - * The given ProcessorGenerator decides how the flow’s logical structure is + * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def toFuture(generator: ProcessorGenerator): Future[T] + def toFuture(materializer: FlowMaterializer): Future[T] /** * Attaches a consumer to this stream which will just discard all received * elements. *This will materialize the flow and initiate its execution.* * - * The given ProcessorGenerator decides how the flow’s logical structure is + * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def consume(generator: ProcessorGenerator): Unit + def consume(materializer: FlowMaterializer): Unit /** * When this flow is completed, either through an error or normal @@ -240,7 +240,7 @@ trait Flow[+T] { * * *This operation materializes the flow and initiates its execution.* */ - def onComplete(generator: ProcessorGenerator)(callback: Try[Unit] ⇒ Unit): Unit + def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit /** * Materialize this flow and return the downstream-most @@ -249,10 +249,10 @@ trait Flow[+T] { * elements to fill the internal buffers it will assert back-pressure until * a consumer connects and creates demand for elements to be emitted. * - * The given ProcessorGenerator decides how the flow’s logical structure is + * The given FlowMaterializer decides how the flow’s logical structure is * broken down into individual processing steps. */ - def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance] + def toProducer(materializer: FlowMaterializer): Producer[T @uncheckedVariance] } diff --git a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala index 7791bd1b32..0ea082d6e2 100644 --- a/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/ActorProducerTest.scala @@ -6,26 +6,26 @@ package akka.stream import org.scalatest.testng.TestNGSuiteLike import org.reactivestreams.spi.Publisher import org.reactivestreams.tck.PublisherVerification -import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.ActorBasedFlowMaterializer import org.reactivestreams.api.Producer import akka.stream.scaladsl.Flow class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { import system.dispatcher - private val factory = ProcessorGenerator(GeneratorSettings()) + private val materializer = FlowMaterializer(MaterializerSettings()) private def createProducer(elements: Int): Producer[Int] = { val iter = Iterator from 1000 val iter2 = if (elements > 0) iter take elements else iter - Flow(() ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(factory) + Flow(() ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(materializer) } def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher override def createCompletedStatePublisher(): Publisher[Int] = { val pub = createProducer(1) - Flow(pub).consume(factory) + Flow(pub).consume(materializer) Thread.sleep(100) pub.getPublisher } diff --git a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala index 57e06d6698..d86020755f 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowConcatSpec.scala @@ -3,14 +3,14 @@ */ package akka.stream -import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.scaladsl.Flow class FlowConcatSpec extends AkkaSpec { - val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + val gen = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala index 9a9b129b4e..0af23bd7ad 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowDropSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamDropSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala index 5907f66b66..a8d84ee7e8 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFilterSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamFilterSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala index 7f8f5fd817..5a0ad97820 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowFoldSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamFoldSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala index 13788f1bf7..ecd8a855ee 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowForeachTest.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class FlowForeachSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala index 5b0efb45ba..e9cbde1933 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupBySpec.scala @@ -7,13 +7,13 @@ import scala.concurrent.duration._ import akka.stream.testkit._ import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamGroupBySpec extends AkkaSpec { - val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + val gen = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala index 39aea8d1e1..052eee5dd1 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowGroupedSpec.scala @@ -10,7 +10,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamGroupedSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala index cc973a1df5..3f38b9d329 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIterableSpec.scala @@ -16,7 +16,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIterableSpec extends AkkaSpec { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512)) "A Flow based on an iterable" must { diff --git a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala index e8458aba84..2132e2f05a 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowIteratorSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowIteratorSpec extends AkkaSpec { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 4, diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala index f0b8270b0b..2b00514900 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapConcatSpec.scala @@ -8,7 +8,7 @@ import akka.stream.testkit.ScriptedTest class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala index 3f27c94058..3a1cc1fda2 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMapSpec.scala @@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random } class StreamMapSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala index 84c654d484..e6731dd999 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowMergeSpec.scala @@ -7,12 +7,12 @@ import scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow class FlowMergeSpec extends AkkaSpec { - val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + val gen = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala index 8b9e9bb5a4..121d4a3d1f 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowOnCompleteSpec.scala @@ -18,7 +18,7 @@ import scala.util.Success @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala index a4a0ae2acc..cd42c961d6 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSpec.scala @@ -16,7 +16,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece import system.dispatcher - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala index 64a6e2a7c8..6d88657a05 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowSplitWhenSpec.scala @@ -7,13 +7,13 @@ import scala.concurrent.duration._ import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import org.reactivestreams.api.Producer -import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class StreamSplitWhenSpec extends AkkaSpec { - val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + val gen = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala index b60b8b2c65..6a93254ef5 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTakeSpec.scala @@ -12,7 +12,7 @@ import akka.stream.impl.RequestMore class StreamTakeSpec extends AkkaSpec with ScriptedTest { - val genSettings = GeneratorSettings( + val genSettings = MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala index 9b6664fd48..9ecd802570 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowToFutureSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow class FlowToFutureSpec extends AkkaSpec with ScriptedTest { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 16, initialFanOutBufferSize = 1, diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala index ced11a4b1d..0490274732 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformRecoverSpec.scala @@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class FlowTransformRecoverSpec extends AkkaSpec { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala index 8884a4ca4e..859e357a4e 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowTransformSpec.scala @@ -16,7 +16,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d import system.dispatcher - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala index 8083bc11a1..c41c4e7865 100644 --- a/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/FlowZipSpec.scala @@ -3,14 +3,14 @@ */ package akka.stream -import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator } +import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer } import akka.stream.testkit.StreamTestKit import akka.testkit.AkkaSpec import akka.stream.scaladsl.Flow class FlowZipSpec extends AkkaSpec { - val gen = new ActorBasedProcessorGenerator(GeneratorSettings( + val gen = new ActorBasedFlowMaterializer(MaterializerSettings( initialInputBufferSize = 2, maximumInputBufferSize = 2, initialFanOutBufferSize = 2, diff --git a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala index d5a81257d9..59932587fb 100644 --- a/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IdentityProcessorTest.scala @@ -13,7 +13,7 @@ import akka.stream.impl.TransformProcessorImpl import akka.stream.impl.Ast import akka.testkit.TestEvent import akka.testkit.EventFilter -import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.ActorBasedFlowMaterializer import akka.stream.scaladsl.Flow class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike { @@ -24,21 +24,21 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With val fanoutSize = maxBufferSize / 2 val inputSize = maxBufferSize - fanoutSize - val factory = new ActorBasedProcessorGenerator( - GeneratorSettings( + val materializer = new ActorBasedFlowMaterializer( + MaterializerSettings( initialInputBufferSize = inputSize, maximumInputBufferSize = inputSize, initialFanOutBufferSize = fanoutSize, maxFanOutBufferSize = fanoutSize), system) - val processor = factory.processorForNode(Ast.Transform(Unit, (_, in: Any) ⇒ (Unit, List(in)), _ ⇒ Nil, _ ⇒ false, _ ⇒ ())) + val processor = materializer.processorForNode(Ast.Transform(Unit, (_, in: Any) ⇒ (Unit, List(in)), _ ⇒ Nil, _ ⇒ false, _ ⇒ ())) processor.asInstanceOf[Processor[Int, Int]] } def createHelperPublisher(elements: Int): Publisher[Int] = { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) val iter = Iterator from 1000 Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher diff --git a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala index a12b53e843..751714def2 100644 --- a/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IterableProducerTest.scala @@ -11,7 +11,7 @@ import akka.stream.scaladsl.Flow class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) def createPublisher(elements: Int): Publisher[Int] = { diff --git a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala index f1a06a424d..cab1922c5e 100644 --- a/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala +++ b/akka-stream/src/test/scala/akka/stream/IteratorProducerTest.scala @@ -10,7 +10,7 @@ import akka.stream.scaladsl.Flow class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike { - val gen = ProcessorGenerator(GeneratorSettings( + val gen = FlowMaterializer(MaterializerSettings( maximumInputBufferSize = 512))(system) def createPublisher(elements: Int): Publisher[Int] = { diff --git a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala index cc235754ce..44cd3a7530 100644 --- a/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala +++ b/akka-stream/src/test/scala/akka/stream/ProcessorHierarchySpec.scala @@ -5,7 +5,7 @@ package akka.stream import akka.testkit.AkkaSpec import akka.stream.scaladsl.Flow -import akka.stream.impl.ActorBasedProcessorGenerator +import akka.stream.impl.ActorBasedFlowMaterializer import akka.actor.ActorContext import scala.concurrent.Await import scala.concurrent.duration._ @@ -15,11 +15,11 @@ import scala.util.control.NonFatal class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") { - val gen = ProcessorGenerator(GeneratorSettings()) + val gen = FlowMaterializer(MaterializerSettings()) - def self = ActorBasedProcessorGenerator.ctx.get().asInstanceOf[ActorContext].self + def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self - "An ActorBasedProcessorGenerator" must { + "An ActorBasedFlowMaterializer" must { "generate the right level of descendants" in { val f = Flow(() ⇒ { diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala index 6052b62078..f984edcb8c 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ChainSetup.scala @@ -3,16 +3,16 @@ */ package akka.stream.testkit -import akka.stream.{ GeneratorSettings, ProcessorGenerator } +import akka.stream.{ MaterializerSettings, FlowMaterializer } import akka.actor.ActorSystem import akka.stream.scaladsl.Flow -class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) { +class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: MaterializerSettings)(implicit val system: ActorSystem) { val upstream = StreamTestKit.producerProbe[I]() val downstream = StreamTestKit.consumerProbe[O]() private val s = stream(Flow(upstream)) - val producer = s.toProducer(ProcessorGenerator(settings)) + val producer = s.toProducer(FlowMaterializer(settings)) val upstreamSubscription = upstream.expectSubscription() producer.produceTo(downstream) val downstreamSubscription = downstream.expectSubscription() diff --git a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala index 7ecb1245ca..8ff3d85426 100644 --- a/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala +++ b/akka-stream/src/test/scala/akka/stream/testkit/ScriptedTest.scala @@ -11,7 +11,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom import scala.concurrent.duration._ import scala.util.control.NonFatal import akka.stream.scaladsl.Flow -import akka.stream.GeneratorSettings +import akka.stream.MaterializerSettings trait ScriptedTest extends ShouldMatchers { @@ -78,7 +78,7 @@ trait ScriptedTest extends ShouldMatchers { class ScriptRunner[In, Out]( op: Flow[In] ⇒ Flow[Out], - gen: GeneratorSettings, + gen: MaterializerSettings, script: Script[In, Out], maximumOverrun: Int, maximumRequest: Int, @@ -186,7 +186,7 @@ trait ScriptedTest extends ShouldMatchers { } - def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( + def runScript[In, Out](script: Script[In, Out], gen: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)( op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = { new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run() }