pekko/akka-stream/src/main/scala/akka/stream/FlowMaterializer.scala

231 lines
10 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream
import akka.actor._
import akka.stream.impl.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor }
import com.typesafe.config.Config
import org.reactivestreams.{ Publisher, Subscriber }
object FlowMaterializer {

  /**
   * Scala API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * When `materializerSettings` is `None`, the [[akka.stream.MaterializerSettings]] are
   * obtained from the configuration of the `context`'s underlying [[akka.actor.ActorSystem]].
   *
   * The `namePrefix` is used as the first part of the names of the actors running
   * the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
   * `namePrefix-flowNumber-flowStepNumber-stepName`.
   *
   * @throws IllegalArgumentException if `context` is `null` or is neither an ActorSystem
   *                                  nor an ActorContext
   */
  def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
    // `getOrElse` takes its argument by name, so the actor system is only resolved when the
    // settings really have to be read from configuration. The context itself is still
    // validated by the two-argument `apply` this method delegates to.
    val settings = materializerSettings.getOrElse(MaterializerSettings(actorSystemOf(context)))
    apply(settings, namePrefix.getOrElse("flow"))(context)
  }

  /**
   * Scala API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * The `namePrefix` is used as the first part of the names of the actors running
   * the processing steps. The actor names are built up of
   * `namePrefix-flowNumber-flowStepNumber-stepName`.
   *
   * @throws IllegalArgumentException if `context` is `null` or is neither an ActorSystem
   *                                  nor an ActorContext
   */
  def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = {
    // Resolve (and thereby validate) the actor system before any actor is created.
    val system = actorSystemOf(context)
    new ActorBasedFlowMaterializer(
      materializerSettings,
      context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)),
      FlowNameCounter(system).counter,
      namePrefix)
  }

  /**
   * Scala API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * The default `namePrefix` `"flow"` is used as the first part of the names of the actors
   * running the processing steps. The actor names are built up of
   * `namePrefix-flowNumber-flowStepNumber-stepName`.
   */
  def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer =
    apply(Some(materializerSettings), None)

  /**
   * Java API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
   * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
   */
  def create(context: ActorRefFactory): FlowMaterializer =
    apply()(context)

  /**
   * Java API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * A `null` `settings` argument falls back to the settings read from the configuration of
   * the `context`'s actor system. The actor name prefix defaults to `"flow"`.
   */
  def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
    apply(Option(settings), None)(context)

  /**
   * Java API: Creates a FlowMaterializer which will execute every step of a transformation
   * pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
   * (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
   * will be used to create these actors, therefore it is *forbidden* to pass this object
   * to another actor if the factory is an ActorContext.
   *
   * The `namePrefix` is used as the first part of the names of the actors running
   * the processing steps. The default `namePrefix` (used when `null` is passed) is `"flow"`.
   * The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
   * A `null` `settings` argument falls back to the settings read from configuration.
   */
  def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer =
    apply(Option(settings), Option(namePrefix))(context)

  /**
   * Resolves the [[akka.actor.ActorSystem]] behind the given factory.
   *
   * @throws IllegalArgumentException if `context` is `null` or is neither an
   *                                  ExtendedActorSystem nor an ActorContext
   */
  private def actorSystemOf(context: ActorRefFactory): ActorSystem =
    context match {
      case s: ExtendedActorSystem => s
      case c: ActorContext        => c.system
      // Type patterns never match null, so the null case is reached even though it comes third.
      case null                   => throw new IllegalArgumentException("ActorRefFactory context must be defined")
      case _ =>
        throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, got [${context.getClass.getName}]")
    }
}
2014-04-01 19:35:56 +02:00
/**
 * Materializes the list of transformations that make up a
 * [[akka.stream.scaladsl.Flow]] as [[org.reactivestreams.Processor]]
 * instances. The way the transformation steps are divided into
 * asynchronous regions is left to the implementation.
 *
 * @param settings the [[MaterializerSettings]] this materializer was created with
 */
abstract class FlowMaterializer(val settings: MaterializerSettings) {

  /**
   * The given `name` is used as the first part of the names of the actors
   * running the processing steps.
   */
  def withNamePrefix(name: String): FlowMaterializer

  /**
   * INTERNAL API
   *
   * Note that `ops` are stored in reverse order.
   */
  private[akka] def toPublisher[I, O](publisherNode: Ast.PublisherNode[I], ops: List[Ast.AstNode]): Publisher[O]

  /**
   * INTERNAL API
   */
  private[akka] def ductProduceTo[In, Out](subscriber: Subscriber[Out], ops: List[Ast.AstNode]): Subscriber[In]

  /**
   * INTERNAL API
   */
  private[akka] def ductBuild[In, Out](ops: List[Ast.AstNode]): (Subscriber[In], Publisher[Out])
}
object MaterializerSettings {

  /**
   * Create [[MaterializerSettings]] from the `akka.stream.materializer` section of the
   * given actor system's configuration.
   *
   * You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
   * [[MaterializerSettings#withFanOutBuffer]], [[MaterializerSettings#withDispatcher]]
   */
  def apply(system: ActorSystem): MaterializerSettings =
    apply(system.settings.config.getConfig("akka.stream.materializer"))

  /**
   * Create [[MaterializerSettings]] from a configuration section that provides the keys
   * `initial-input-buffer-size`, `max-input-buffer-size`, `initial-fan-out-buffer-size`,
   * `max-fan-out-buffer-size` and `dispatcher`.
   *
   * You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
   * [[MaterializerSettings#withFanOutBuffer]], [[MaterializerSettings#withDispatcher]]
   */
  def apply(config: Config): MaterializerSettings =
    // Named arguments guard against silently swapping the four adjacent Int parameters.
    MaterializerSettings(
      initialInputBufferSize = config.getInt("initial-input-buffer-size"),
      maxInputBufferSize = config.getInt("max-input-buffer-size"),
      initialFanOutBufferSize = config.getInt("initial-fan-out-buffer-size"),
      maxFanOutBufferSize = config.getInt("max-fan-out-buffer-size"),
      dispatcher = config.getString("dispatcher"))

  /**
   * Java API: same as `apply(system)`.
   *
   * You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
   * [[MaterializerSettings#withFanOutBuffer]], [[MaterializerSettings#withDispatcher]]
   */
  def create(system: ActorSystem): MaterializerSettings =
    apply(system)

  /**
   * Java API: same as `apply(config)`.
   *
   * You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
   * [[MaterializerSettings#withFanOutBuffer]], [[MaterializerSettings#withDispatcher]]
   */
  def create(config: Config): MaterializerSettings =
    apply(config)
}
2014-04-01 19:35:56 +02:00
/**
 * The buffers employed by the generated Processors can be configured by
 * creating an appropriate instance of this class.
 *
 * This will likely be replaced in the future by auto-tuning these values at runtime.
 *
 * Construction fails with an `IllegalArgumentException` (via `require`) unless all four
 * sizes are positive, both maximum sizes are powers of two, and each initial size does not
 * exceed its corresponding maximum.
 *
 * @param initialInputBufferSize  initial input buffer size; must be > 0 and <= `maxInputBufferSize`
 * @param maxInputBufferSize      maximum input buffer size; must be a positive power of two
 * @param initialFanOutBufferSize initial fan-out buffer size; must be > 0 and <= `maxFanOutBufferSize`
 * @param maxFanOutBufferSize     maximum fan-out buffer size; must be a positive power of two
 * @param dispatcher              dispatcher identifier carried by these settings
 */
final case class MaterializerSettings(
  initialInputBufferSize: Int,
  maxInputBufferSize: Int,
  initialFanOutBufferSize: Int,
  maxFanOutBufferSize: Int,
  dispatcher: String) {

  require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")

  // The positivity check runs before the power-of-two check because the bit trick below
  // would also accept 0.
  require(maxInputBufferSize > 0, "maxInputBufferSize must be > 0")
  require(isPowerOfTwo(maxInputBufferSize), "maxInputBufferSize must be a power of two")
  require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")

  require(initialFanOutBufferSize > 0, "initialFanOutBufferSize must be > 0")

  require(maxFanOutBufferSize > 0, "maxFanOutBufferSize must be > 0")
  require(isPowerOfTwo(maxFanOutBufferSize), "maxFanOutBufferSize must be a power of two")
  require(initialFanOutBufferSize <= maxFanOutBufferSize, s"initialFanOutBufferSize($initialFanOutBufferSize) must be <= maxFanOutBufferSize($maxFanOutBufferSize)")

  /** Returns a copy with the given input buffer sizes; the copy is re-validated on construction. */
  def withInputBuffer(initialSize: Int, maxSize: Int): MaterializerSettings =
    copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)

  /** Returns a copy with the given fan-out buffer sizes; the copy is re-validated on construction. */
  def withFanOutBuffer(initialSize: Int, maxSize: Int): MaterializerSettings =
    copy(initialFanOutBufferSize = initialSize, maxFanOutBufferSize = maxSize)

  /** Returns a copy that carries the given dispatcher identifier. */
  def withDispatcher(dispatcher: String): MaterializerSettings =
    copy(dispatcher = dispatcher)

  // Takes a primitive Int (not java.lang.Integer) so no boxing/implicit unboxing is involved.
  // A power of two has exactly one bit set, so clearing the lowest set bit yields 0.
  private def isPowerOfTwo(n: Int): Boolean = (n & (n - 1)) == 0
}