!str Rename ProcessGenerator to FlowMaterializer

Patrik Nordwall 2014-04-08 13:37:55 +02:00 committed by Roland Kuhn
parent 907765fc24
commit a318676f4a
41 changed files with 116 additions and 116 deletions

View file

@ -5,31 +5,31 @@ package akka.stream
import scala.concurrent.duration.FiniteDuration
import akka.actor.ActorRefFactory
import akka.stream.impl.ActorBasedProcessorGenerator
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.impl.Ast
import org.reactivestreams.api.Producer
import scala.concurrent.duration._
object ProcessorGenerator {
object FlowMaterializer {
/**
* Creates a ProcessorGenerator which will execute every step of a transformation
* Creates a FlowMaterializer which will execute every step of a transformation
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
* will be used to create these actors, therefore it is *forbidden* to pass this object
* to another actor if the factory is an ActorContext.
*/
def apply(settings: GeneratorSettings)(implicit context: ActorRefFactory): ProcessorGenerator =
new ActorBasedProcessorGenerator(settings, context)
def apply(settings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer =
new ActorBasedFlowMaterializer(settings, context)
}
/**
* A ProcessorGenerator takes the list of transformations comprising a
* A FlowMaterializer takes the list of transformations comprising a
* [[akka.stream.scaladsl.Flow]] and materializes them in the form of
* [[org.reactivestreams.api.Processor]] instances. How transformation
* steps are split up into asynchronous regions is implementation
* dependent.
*/
trait ProcessorGenerator {
trait FlowMaterializer {
/**
* INTERNAL API
* ops are stored in reverse order
@ -48,7 +48,7 @@ trait ProcessorGenerator {
*
* This will likely be replaced in the future by auto-tuning these values at runtime.
*/
case class GeneratorSettings(
case class MaterializerSettings(
initialFanOutBufferSize: Int = 4,
maxFanOutBufferSize: Int = 16,
initialInputBufferSize: Int = 4,
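
For orientation, a minimal sketch of how the renamed entry point is used after this commit. It assumes an ActorSystem in scope as the implicit ActorRefFactory; FlowMaterializer, MaterializerSettings, Flow and consume are taken from this diff, while the object and system names are illustrative only.

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object MaterializerExample extends App {
  // The ActorRefFactory may be an ActorSystem or an ActorContext; an ActorSystem is used here.
  implicit val system = ActorSystem("demo")

  // Default settings, constructed exactly as the test suites in this commit do.
  val materializer = FlowMaterializer(MaterializerSettings())

  // Materializing the flow creates the per-step Processor actors and starts the stream.
  Flow(List(1, 2, 3)).consume(materializer)
}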

View file

@ -10,7 +10,7 @@ import org.reactivestreams.api.{ Consumer, Processor, Producer }
import org.reactivestreams.spi.Subscriber
import akka.actor.ActorRefFactory
import akka.stream.{ GeneratorSettings, ProcessorGenerator }
import akka.stream.{ MaterializerSettings, FlowMaterializer }
/**
* INTERNAL API
@ -33,25 +33,25 @@ private[akka] object Ast {
case class Concat(next: Producer[Any]) extends AstNode
trait ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I]
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I]
}
case class ExistingProducer[I](producer: Producer[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory) = producer
def createProducer(settings: MaterializerSettings, context: ActorRefFactory) = producer
}
case class IteratorProducerNode[I](iterator: Iterator[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] =
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
if (iterator.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](context.actorOf(IteratorProducer.props(iterator, settings)))
}
case class IterableProducerNode[I](iterable: immutable.Iterable[I]) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] =
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
if (iterable.isEmpty) EmptyProducer.asInstanceOf[Producer[I]]
else new ActorProducer[I](context.actorOf(IterableProducer.props(iterable, settings)))
}
case class ThunkProducerNode[I](f: () ⇒ I) extends ProducerNode[I] {
def createProducer(settings: GeneratorSettings, context: ActorRefFactory): Producer[I] =
def createProducer(settings: MaterializerSettings, context: ActorRefFactory): Producer[I] =
new ActorProducer(context.actorOf(ActorProducer.props(settings, f)))
}
}
@ -59,7 +59,7 @@ private[akka] object Ast {
/**
* INTERNAL API
*/
private[akka] object ActorBasedProcessorGenerator {
private[akka] object ActorBasedFlowMaterializer {
val ctx = new ThreadLocal[ActorRefFactory]
@ -75,9 +75,9 @@ private[akka] object ActorBasedProcessorGenerator {
/**
* INTERNAL API
*/
private[akka] class ActorBasedProcessorGenerator(settings: GeneratorSettings, _context: ActorRefFactory) extends ProcessorGenerator {
private[akka] class ActorBasedFlowMaterializer(settings: MaterializerSettings, _context: ActorRefFactory) extends FlowMaterializer {
import Ast._
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
private def context = ctx.get() match {
case null ⇒ _context

View file

@ -12,7 +12,7 @@ import org.reactivestreams.spi.{ Subscriber, Subscription }
import Ast.{ AstNode, Recover, Transform }
import akka.actor.{ Actor, ActorLogging, ActorRef, Props, actorRef2Scala }
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
/**
* INTERNAL API
@ -43,7 +43,7 @@ private[akka] class ActorConsumer[T]( final val impl: ActorRef) extends ActorCon
private[akka] object ActorConsumer {
import Ast._
def props(gen: GeneratorSettings, op: AstNode) = op match {
def props(gen: MaterializerSettings, op: AstNode) = op match {
case t: Transform ⇒ Props(new TransformActorConsumer(gen, t))
case r: Recover ⇒ Props(new RecoverActorConsumer(gen, r))
}
@ -52,9 +52,9 @@ private[akka] object ActorConsumer {
/**
* INTERNAL API
*/
private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettings) extends Actor with SoftShutdown {
private[akka] abstract class AbstractActorConsumer(val settings: MaterializerSettings) extends Actor with SoftShutdown {
import ActorProcessor._
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
/**
* Consume one element synchronously: the Actor mailbox is the queue.
@ -121,7 +121,7 @@ private[akka] abstract class AbstractActorConsumer(val settings: GeneratorSettin
/**
* INTERNAL API
*/
private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging {
private[akka] class TransformActorConsumer(_settings: MaterializerSettings, op: Ast.Transform) extends AbstractActorConsumer(_settings) with ActorLogging {
private var state = op.zero
private var onCompleteCalled = false
@ -155,7 +155,7 @@ private[akka] class TransformActorConsumer(_settings: GeneratorSettings, op: Ast
/**
* INTERNAL API
*/
private[akka] class RecoverActorConsumer(_settings: GeneratorSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) {
private[akka] class RecoverActorConsumer(_settings: MaterializerSettings, op: Ast.Recover) extends TransformActorConsumer(_settings, op.t) {
override def onNext(elem: Any): Unit = {
super.onNext(Success(elem))
}

View file

@ -9,7 +9,7 @@ import scala.util.control.NonFatal
import org.reactivestreams.api.Processor
import org.reactivestreams.spi.Subscriber
import akka.actor.{ Actor, ActorLogging, ActorRef, Props }
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.event.LoggingReceive
/**
@ -17,7 +17,7 @@ import akka.event.LoggingReceive
*/
private[akka] object ActorProcessor {
import Ast._
def props(settings: GeneratorSettings, op: AstNode): Props = op match {
def props(settings: MaterializerSettings, op: AstNode): Props = op match {
case t: Transform ⇒ Props(new TransformProcessorImpl(settings, t))
case r: Recover ⇒ Props(new RecoverProcessorImpl(settings, r))
case s: SplitWhen ⇒ Props(new SplitWhenProcessorImpl(settings, s.p))
@ -33,13 +33,13 @@ class ActorProcessor[I, O]( final val impl: ActorRef) extends Processor[I, O] wi
/**
* INTERNAL API
*/
private[akka] abstract class ActorProcessorImpl(val settings: GeneratorSettings)
private[akka] abstract class ActorProcessorImpl(val settings: MaterializerSettings)
extends Actor
with SubscriberManagement[Any]
with ActorLogging
with SoftShutdown {
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
type S = ActorSubscription[Any]

View file

@ -9,7 +9,7 @@ import scala.collection.immutable
import org.reactivestreams.api.{ Consumer, Producer }
import org.reactivestreams.spi.{ Publisher, Subscriber }
import akka.actor.ActorRef
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.actor.ActorLogging
import akka.actor.Actor
import scala.concurrent.duration.Duration
@ -41,7 +41,7 @@ class ActorProducer[T]( final val impl: ActorRef) extends ActorProducerLike[T]
* INTERNAL API
*/
private[akka] object ActorProducer {
def props[T](settings: GeneratorSettings, f: () ⇒ T): Props =
def props[T](settings: MaterializerSettings, f: () ⇒ T): Props =
Props(new ActorProducerImpl(f, settings))
}
@ -142,14 +142,14 @@ private[akka] object ActorProducerImpl {
/**
* INTERNAL API
*/
private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: GeneratorSettings)
private[akka] class ActorProducerImpl[T](f: () ⇒ T, settings: MaterializerSettings)
extends Actor
with ActorLogging
with SubscriberManagement[T]
with SoftShutdown {
import ActorProducerImpl._
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
type S = ActorSubscription[T]
var pub: ActorPublisher[T] = _

View file

@ -8,7 +8,7 @@ import scala.concurrent.{ Future, Promise }
import scala.util.Try
import org.reactivestreams.api.Producer
import Ast.{ AstNode, Recover, Transform }
import akka.stream.ProcessorGenerator
import akka.stream.FlowMaterializer
import akka.stream.scaladsl.Flow
import scala.util.Success
import scala.util.Failure
@ -86,17 +86,17 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
override def groupBy[K](f: (O) ⇒ K): Flow[(K, Producer[O])] = andThen(GroupBy(f.asInstanceOf[Any ⇒ Any]))
override def toFuture(generator: ProcessorGenerator): Future[O] = {
override def toFuture(materializer: FlowMaterializer): Future[O] = {
val p = Promise[O]()
transformRecover(0)((x, in) ⇒ { p complete in; 1 -> Nil },
onComplete = _ ⇒ { p.tryFailure(new NoSuchElementException("empty stream")); Nil },
isComplete = _ == 1).consume(generator)
isComplete = _ == 1).consume(materializer)
p.future
}
override def consume(generator: ProcessorGenerator): Unit = generator.consume(producerNode, ops)
override def consume(materializer: FlowMaterializer): Unit = materializer.consume(producerNode, ops)
def onComplete(generator: ProcessorGenerator)(callback: Try[Unit] ⇒ Unit): Unit =
def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit =
transformRecover(true)(
f = {
case (_, fail @ Failure(ex)) ⇒
@ -104,8 +104,8 @@ private[akka] case class FlowImpl[I, O](producerNode: Ast.ProducerNode[I], ops:
OnCompleteFailureToken
case _ ⇒ OnCompleteSuccessToken
},
onComplete = ok ⇒ { if (ok) callback(SuccessUnit); Nil }).consume(generator)
onComplete = ok ⇒ { if (ok) callback(SuccessUnit); Nil }).consume(materializer)
override def toProducer(generator: ProcessorGenerator): Producer[O] = generator.toProducer(producerNode, ops)
override def toProducer(materializer: FlowMaterializer): Producer[O] = materializer.toProducer(producerNode, ops)
}

View file

@ -5,7 +5,7 @@ package akka.stream.impl
import org.reactivestreams.spi.Subscription
import akka.actor.{ Terminated, Props, ActorRef }
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.stream.impl._
/**
@ -22,7 +22,7 @@ private[akka] object GroupByProcessorImpl {
/**
* INTERNAL API
*/
private[akka] class GroupByProcessorImpl(settings: GeneratorSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) {
private[akka] class GroupByProcessorImpl(settings: MaterializerSettings, val keyFor: Any ⇒ Any) extends MultiStreamOutputProcessor(settings) {
import GroupByProcessorImpl._
var keyToSubstreamOutputs = collection.mutable.Map.empty[Any, SubstreamOutputs]

View file

@ -13,14 +13,14 @@ import akka.actor.ActorRef
import akka.actor.Props
import akka.actor.SupervisorStrategy
import akka.actor.Terminated
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import scala.concurrent.duration.Duration
/**
* INTERNAL API
*/
private[akka] object IterableProducer {
def props(iterable: immutable.Iterable[Any], settings: GeneratorSettings): Props =
def props(iterable: immutable.Iterable[Any], settings: MaterializerSettings): Props =
Props(new IterableProducer(iterable, settings))
object BasicActorSubscription {
@ -46,10 +46,10 @@ private[akka] object IterableProducer {
* makes use of its own iterable, i.e. each consumer will receive the elements from the
* beginning of the iterable and it can consume the elements in its own pace.
*/
private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: GeneratorSettings) extends Actor with SoftShutdown {
private[akka] class IterableProducer(iterable: immutable.Iterable[Any], settings: MaterializerSettings) extends Actor with SoftShutdown {
import IterableProducer.BasicActorSubscription
import IterableProducer.BasicActorSubscription.Cancel
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
require(iterable.nonEmpty, "Use EmptyProducer for empty iterable")
@ -139,7 +139,7 @@ private[akka] class IterableProducerWorker(iterator: Iterator[Any], subscriber:
extends Actor with SoftShutdown {
import IterableProducerWorker._
import IterableProducer.BasicActorSubscription._
import ActorBasedProcessorGenerator._
import ActorBasedFlowMaterializer._
require(iterator.hasNext, "Iterator must not be empty")
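
The comment on IterableProducer above states that every consumer receives the elements from the beginning of the iterable, at its own pace. A hedged sketch of that behaviour, reusing only API that appears elsewhere in this diff (toProducer and consume); the setup and element values are illustrative.

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object IterableFanOutSketch extends App {
  implicit val system = ActorSystem("demo")
  val materializer = FlowMaterializer(MaterializerSettings())

  // A single producer backed by an immutable iterable.
  val producer = Flow(Vector(1, 2, 3)).toProducer(materializer)

  // Per the doc comment above, each attached consumer is served from the
  // beginning of the iterable and may progress at its own pace.
  Flow(producer).consume(materializer)
  Flow(producer).consume(materializer)
}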

View file

@ -4,14 +4,14 @@
package akka.stream.impl
import akka.actor.Props
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.stream.Stop
/**
* INTERNAL API
*/
private[akka] object IteratorProducer {
def props(iterator: Iterator[Any], settings: GeneratorSettings): Props = {
def props(iterator: Iterator[Any], settings: MaterializerSettings): Props = {
def f(): Any = {
if (!iterator.hasNext) throw Stop
iterator.next()
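
IteratorProducer.props above wraps the iterator in a thunk that ends the stream by throwing Stop, the same pattern the Flow(() ⇒ ...) factory is exercised with in the ActorProducerTest later in this diff. A small sketch; the surrounding setup is illustrative.

import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings, Stop }
import akka.stream.scaladsl.Flow

object ThunkProducerSketch extends App {
  implicit val system = ActorSystem("demo")
  val materializer = FlowMaterializer(MaterializerSettings())

  val iter = Iterator.range(1, 10)
  // The thunk is invoked for each requested element; throwing Stop completes the stream.
  val producer = Flow(() ⇒ if (iter.hasNext) iter.next() else throw Stop).toProducer(materializer)
}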

View file

@ -7,12 +7,12 @@ import scala.collection.immutable
import scala.util.{ Failure, Success }
import akka.actor.Props
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
/**
* INTERNAL API
*/
private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) {
private[akka] class TransformProcessorImpl(_settings: MaterializerSettings, op: Ast.Transform) extends ActorProcessorImpl(_settings) {
var state = op.zero
var isComplete = false
var hasOnCompleteRun = false
@ -65,7 +65,7 @@ private[akka] class TransformProcessorImpl(_settings: GeneratorSettings, op: Ast
/**
* INTERNAL API
*/
private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) {
private[akka] class RecoverProcessorImpl(_settings: MaterializerSettings, _op: Ast.Recover) extends TransformProcessorImpl(_settings, _op.t) {
val wrapInSuccess: Receive = {
case OnNext(elem) ⇒
@ -87,13 +87,13 @@ private[akka] class RecoverProcessorImpl(_settings: GeneratorSettings, _op: Ast.
* INTERNAL API
*/
private[akka] object IdentityProcessorImpl {
def props(settings: GeneratorSettings): Props = Props(new IdentityProcessorImpl(settings))
def props(settings: MaterializerSettings): Props = Props(new IdentityProcessorImpl(settings))
}
/**
* INTERNAL API
*/
private[akka] class IdentityProcessorImpl(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) {
private[akka] class IdentityProcessorImpl(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
override def initialTransferState = needsPrimaryInputAndDemand
override protected def transfer(): TransferState = {

View file

@ -6,7 +6,7 @@ package akka.stream.impl
import akka.actor.{ Props, ActorRef }
import org.reactivestreams.spi.Subscription
import akka.stream.impl._
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.actor.Terminated
/**
@ -23,7 +23,7 @@ private[akka] object SplitWhenProcessorImpl {
/**
* INTERNAL API
*/
private[akka] class SplitWhenProcessorImpl(_settings: GeneratorSettings, val splitPredicate: Any ⇒ Boolean)
private[akka] class SplitWhenProcessorImpl(_settings: MaterializerSettings, val splitPredicate: Any ⇒ Boolean)
extends MultiStreamOutputProcessor(_settings) {
import SplitWhenProcessorImpl._

View file

@ -3,14 +3,14 @@
*/
package akka.stream.impl
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import org.reactivestreams.api.Producer
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
* INTERNAL API
*/
private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any])
private[akka] class MergeImpl(_settings: MaterializerSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
lazy val needsAnyInputAndDemand = (primaryInputs.NeedsInput || secondaryInputs.NeedsInput) && PrimaryOutputs.NeedsDemand
@ -33,7 +33,7 @@ private[akka] class MergeImpl(_settings: GeneratorSettings, _other: Producer[Any
/**
* INTERNAL API
*/
private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any])
private[akka] class ZipImpl(_settings: MaterializerSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
lazy val needsBothInputAndDemand = primaryInputs.NeedsInput && secondaryInputs.NeedsInput && PrimaryOutputs.NeedsDemand
@ -48,7 +48,7 @@ private[akka] class ZipImpl(_settings: GeneratorSettings, _other: Producer[Any])
/**
* INTERNAL API
*/
private[akka] class ConcatImpl(_settings: GeneratorSettings, _other: Producer[Any])
private[akka] class ConcatImpl(_settings: MaterializerSettings, _other: Producer[Any])
extends TwoStreamInputProcessor(_settings, _other) {
lazy val needsPrimaryInputAndDemandWithComplete = primaryInputs.NeedsInputOrComplete && PrimaryOutputs.NeedsDemand

View file

@ -3,7 +3,7 @@
*/
package akka.stream.impl
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
import akka.actor.{ Terminated, ActorRef }
import org.reactivestreams.spi.{ Subscriber, Subscription }
import org.reactivestreams.api.Producer
@ -28,7 +28,7 @@ private[akka] object MultiStreamOutputProcessor {
/**
* INTERNAL API
*/
private[akka] abstract class MultiStreamOutputProcessor(_settings: GeneratorSettings) extends ActorProcessorImpl(_settings) {
private[akka] abstract class MultiStreamOutputProcessor(_settings: MaterializerSettings) extends ActorProcessorImpl(_settings) {
import MultiStreamOutputProcessor._
private val substreamOutputs = collection.mutable.Map.empty[ActorRef, SubstreamOutputs]
@ -125,7 +125,7 @@ private[akka] object TwoStreamInputProcessor {
/**
* INTERNAL API
*/
private[akka] abstract class TwoStreamInputProcessor(_settings: GeneratorSettings, val other: Producer[Any])
private[akka] abstract class TwoStreamInputProcessor(_settings: MaterializerSettings, val other: Producer[Any])
extends ActorProcessorImpl(_settings) {
import TwoStreamInputProcessor._

View file

@ -11,7 +11,7 @@ import scala.util.control.NoStackTrace
import org.reactivestreams.api.Producer
import akka.stream.ProcessorGenerator
import akka.stream.FlowMaterializer
import akka.stream.impl.Ast.{ ExistingProducer, IterableProducerNode, IteratorProducerNode, ThunkProducerNode }
import akka.stream.impl.FlowImpl
@ -75,7 +75,7 @@ object Flow {
*
* By default every operation is executed within its own [[akka.actor.Actor]]
* to enable full pipelining of the chained set of computations. This behavior
* is determined by the [[akka.stream.ProcessorGenerator]] which is required
* is determined by the [[akka.stream.FlowMaterializer]] which is required
* by those methods that materialize the Flow into a series of
* [[org.reactivestreams.api.Processor]] instances. The returned reactive stream
* is fully started and active.
@ -219,19 +219,19 @@ trait Flow[+T] {
* (failing the Future with a NoSuchElementException). *This operation
* materializes the flow and initiates its execution.*
*
* The given ProcessorGenerator decides how the flows logical structure is
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def toFuture(generator: ProcessorGenerator): Future[T]
def toFuture(materializer: FlowMaterializer): Future[T]
/**
* Attaches a consumer to this stream which will just discard all received
* elements. *This will materialize the flow and initiate its execution.*
*
* The given ProcessorGenerator decides how the flows logical structure is
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def consume(generator: ProcessorGenerator): Unit
def consume(materializer: FlowMaterializer): Unit
/**
* When this flow is completed, either through an error or normal
@ -240,7 +240,7 @@ trait Flow[+T] {
*
* *This operation materializes the flow and initiates its execution.*
*/
def onComplete(generator: ProcessorGenerator)(callback: Try[Unit] ⇒ Unit): Unit
def onComplete(materializer: FlowMaterializer)(callback: Try[Unit] ⇒ Unit): Unit
/**
* Materialize this flow and return the downstream-most
@ -249,10 +249,10 @@ trait Flow[+T] {
* elements to fill the internal buffers it will assert back-pressure until
* a consumer connects and creates demand for elements to be emitted.
*
* The given ProcessorGenerator decides how the flows logical structure is
* The given FlowMaterializer decides how the flows logical structure is
* broken down into individual processing steps.
*/
def toProducer(generator: ProcessorGenerator): Producer[T @uncheckedVariance]
def toProducer(materializer: FlowMaterializer): Producer[T @uncheckedVariance]
}
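
Taken together, the renamed methods of the Flow trait are used roughly as follows. A sketch only: the method names and parameter types come from the trait above, while the element values and surrounding setup are assumptions.

import scala.util.{ Failure, Success }
import akka.actor.ActorSystem
import akka.stream.{ FlowMaterializer, MaterializerSettings }
import akka.stream.scaladsl.Flow

object FlowUsageSketch extends App {
  implicit val system = ActorSystem("demo")
  val materializer = FlowMaterializer(MaterializerSettings())

  // toFuture: materializes the flow and completes with its first element
  // (or fails with NoSuchElementException on an empty stream).
  val first = Flow(List(1, 2, 3)).toFuture(materializer)

  // onComplete: materializes the flow and reports normal or failed termination.
  Flow(List(1, 2, 3)).onComplete(materializer) {
    case Success(_)  ⇒ println("stream completed")
    case Failure(ex) ⇒ println("stream failed: " + ex)
  }

  // toProducer: materializes the flow into a Producer that asserts back-pressure
  // until a consumer connects and creates demand.
  val producer = Flow(List(1, 2, 3)).toProducer(materializer)

  // consume: attaches a consumer that discards all received elements, starting the stream.
  Flow(producer).consume(materializer)
}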

View file

@ -6,26 +6,26 @@ package akka.stream
import org.scalatest.testng.TestNGSuiteLike
import org.reactivestreams.spi.Publisher
import org.reactivestreams.tck.PublisherVerification
import akka.stream.impl.ActorBasedProcessorGenerator
import akka.stream.impl.ActorBasedFlowMaterializer
import org.reactivestreams.api.Producer
import akka.stream.scaladsl.Flow
class ActorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
import system.dispatcher
private val factory = ProcessorGenerator(GeneratorSettings())
private val materializer = FlowMaterializer(MaterializerSettings())
private def createProducer(elements: Int): Producer[Int] = {
val iter = Iterator from 1000
val iter2 = if (elements > 0) iter take elements else iter
Flow(() ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(factory)
Flow(() ⇒ if (iter2.hasNext) iter2.next() else throw Stop).toProducer(materializer)
}
def createPublisher(elements: Int): Publisher[Int] = createProducer(elements).getPublisher
override def createCompletedStatePublisher(): Publisher[Int] = {
val pub = createProducer(1)
Flow(pub).consume(factory)
Flow(pub).consume(materializer)
Thread.sleep(100)
pub.getPublisher
}

View file

@ -3,14 +3,14 @@
*/
package akka.stream
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
class FlowConcatSpec extends AkkaSpec {
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamDropSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamFilterSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamFoldSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class FlowForeachSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -7,13 +7,13 @@ import scala.concurrent.duration._
import akka.stream.testkit._
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamGroupBySpec extends AkkaSpec {
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -10,7 +10,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamGroupedSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -16,7 +16,7 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIterableSpec extends AkkaSpec {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))
"A Flow based on an iterable" must {

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowIteratorSpec extends AkkaSpec {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 4,

View file

@ -8,7 +8,7 @@ import akka.stream.testkit.ScriptedTest
class FlowMapConcatSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -9,7 +9,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom.{ current ⇒ random }
class StreamMapSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -7,12 +7,12 @@ import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
class FlowMergeSpec extends AkkaSpec {
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -18,7 +18,7 @@ import scala.util.Success
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowOnCompleteSpec extends AkkaSpec with ScriptedTest {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -16,7 +16,7 @@ class FlowSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.debug.rece
import system.dispatcher
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -7,13 +7,13 @@ import scala.concurrent.duration._
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import org.reactivestreams.api.Producer
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class StreamSplitWhenSpec extends AkkaSpec {
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -12,7 +12,7 @@ import akka.stream.impl.RequestMore
class StreamTakeSpec extends AkkaSpec with ScriptedTest {
val genSettings = GeneratorSettings(
val genSettings = MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
class FlowToFutureSpec extends AkkaSpec with ScriptedTest {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 16,
initialFanOutBufferSize = 1,

View file

@ -14,7 +14,7 @@ import akka.stream.scaladsl.Flow
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class FlowTransformRecoverSpec extends AkkaSpec {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -16,7 +16,7 @@ class FlowTransformSpec extends AkkaSpec(ConfigFactory.parseString("akka.actor.d
import system.dispatcher
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -3,14 +3,14 @@
*/
package akka.stream
import akka.stream.impl.{ IteratorProducer, ActorBasedProcessorGenerator }
import akka.stream.impl.{ IteratorProducer, ActorBasedFlowMaterializer }
import akka.stream.testkit.StreamTestKit
import akka.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
class FlowZipSpec extends AkkaSpec {
val gen = new ActorBasedProcessorGenerator(GeneratorSettings(
val gen = new ActorBasedFlowMaterializer(MaterializerSettings(
initialInputBufferSize = 2,
maximumInputBufferSize = 2,
initialFanOutBufferSize = 2,

View file

@ -13,7 +13,7 @@ import akka.stream.impl.TransformProcessorImpl
import akka.stream.impl.Ast
import akka.testkit.TestEvent
import akka.testkit.EventFilter
import akka.stream.impl.ActorBasedProcessorGenerator
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.stream.scaladsl.Flow
class IdentityProcessorTest extends IdentityProcessorVerification[Int] with WithActorSystem with TestNGSuiteLike {
@ -24,21 +24,21 @@ class IdentityProcessorTest extends IdentityProcessorVerification[Int] with With
val fanoutSize = maxBufferSize / 2
val inputSize = maxBufferSize - fanoutSize
val factory = new ActorBasedProcessorGenerator(
GeneratorSettings(
val materializer = new ActorBasedFlowMaterializer(
MaterializerSettings(
initialInputBufferSize = inputSize,
maximumInputBufferSize = inputSize,
initialFanOutBufferSize = fanoutSize,
maxFanOutBufferSize = fanoutSize),
system)
val processor = factory.processorForNode(Ast.Transform(Unit, (_, in: Any) ⇒ (Unit, List(in)), _ ⇒ Nil, _ ⇒ false, _ ⇒ ()))
val processor = materializer.processorForNode(Ast.Transform(Unit, (_, in: Any) ⇒ (Unit, List(in)), _ ⇒ Nil, _ ⇒ false, _ ⇒ ()))
processor.asInstanceOf[Processor[Int, Int]]
}
def createHelperPublisher(elements: Int): Publisher[Int] = {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
val iter = Iterator from 1000
Flow(if (elements > 0) iter take elements else iter).toProducer(gen).getPublisher

View file

@ -11,7 +11,7 @@ import akka.stream.scaladsl.Flow
class IterableProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {

View file

@ -10,7 +10,7 @@ import akka.stream.scaladsl.Flow
class IteratorProducerTest extends PublisherVerification[Int] with WithActorSystem with TestNGSuiteLike {
val gen = ProcessorGenerator(GeneratorSettings(
val gen = FlowMaterializer(MaterializerSettings(
maximumInputBufferSize = 512))(system)
def createPublisher(elements: Int): Publisher[Int] = {

View file

@ -5,7 +5,7 @@ package akka.stream
import akka.testkit.AkkaSpec
import akka.stream.scaladsl.Flow
import akka.stream.impl.ActorBasedProcessorGenerator
import akka.stream.impl.ActorBasedFlowMaterializer
import akka.actor.ActorContext
import scala.concurrent.Await
import scala.concurrent.duration._
@ -15,11 +15,11 @@ import scala.util.control.NonFatal
class ProcessorHierarchySpec extends AkkaSpec("akka.actor.debug.lifecycle=off\nakka.loglevel=INFO") {
val gen = ProcessorGenerator(GeneratorSettings())
val gen = FlowMaterializer(MaterializerSettings())
def self = ActorBasedProcessorGenerator.ctx.get().asInstanceOf[ActorContext].self
def self = ActorBasedFlowMaterializer.ctx.get().asInstanceOf[ActorContext].self
"An ActorBasedProcessorGenerator" must {
"An ActorBasedFlowMaterializer" must {
"generate the right level of descendants" in {
val f = Flow(() ⇒ {

View file

@ -3,16 +3,16 @@
*/
package akka.stream.testkit
import akka.stream.{ GeneratorSettings, ProcessorGenerator }
import akka.stream.{ MaterializerSettings, FlowMaterializer }
import akka.actor.ActorSystem
import akka.stream.scaladsl.Flow
class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: GeneratorSettings)(implicit val system: ActorSystem) {
class ChainSetup[I, O](stream: Flow[I] ⇒ Flow[O], val settings: MaterializerSettings)(implicit val system: ActorSystem) {
val upstream = StreamTestKit.producerProbe[I]()
val downstream = StreamTestKit.consumerProbe[O]()
private val s = stream(Flow(upstream))
val producer = s.toProducer(ProcessorGenerator(settings))
val producer = s.toProducer(FlowMaterializer(settings))
val upstreamSubscription = upstream.expectSubscription()
producer.produceTo(downstream)
val downstreamSubscription = downstream.expectSubscription()

View file

@ -11,7 +11,7 @@ import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.concurrent.duration._
import scala.util.control.NonFatal
import akka.stream.scaladsl.Flow
import akka.stream.GeneratorSettings
import akka.stream.MaterializerSettings
trait ScriptedTest extends ShouldMatchers {
@ -78,7 +78,7 @@ trait ScriptedTest extends ShouldMatchers {
class ScriptRunner[In, Out](
op: Flow[In] ⇒ Flow[Out],
gen: GeneratorSettings,
gen: MaterializerSettings,
script: Script[In, Out],
maximumOverrun: Int,
maximumRequest: Int,
@ -186,7 +186,7 @@ trait ScriptedTest extends ShouldMatchers {
}
def runScript[In, Out](script: Script[In, Out], gen: GeneratorSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
def runScript[In, Out](script: Script[In, Out], gen: MaterializerSettings, maximumOverrun: Int = 3, maximumRequest: Int = 3, maximumBuffer: Int = 3)(
op: Flow[In] ⇒ Flow[Out])(implicit system: ActorSystem): Unit = {
new ScriptRunner(op, gen, script, maximumOverrun, maximumRequest, maximumBuffer).run()
}