!str #15851 Rename FlowMaterializer and settings
* FlowMaterializer is now the actor independent interface * ActorFlowMaterializer is the actor based interface * MaterializerSettings renamed to ActorFlowMaterializerSettings * impl.ActorBasedFlowMaterializer renamed to impl.ActorFlowMaterializerImpl * Optimizations included in ActorFlowMaterializerSettings * Note that http is using FlowMaterializer in api, but I suspect that it will currently only run with a ActorFlowMaterializer
This commit is contained in:
parent
5b72928439
commit
cd9d503b03
146 changed files with 601 additions and 595 deletions
|
|
@ -5,12 +5,9 @@ package akka.stream
|
|||
|
||||
import java.util.Locale
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import akka.stream.impl._
|
||||
import akka.stream.scaladsl.Key
|
||||
|
||||
import scala.collection.immutable
|
||||
|
||||
import akka.actor.ActorContext
|
||||
import akka.actor.ActorRefFactory
|
||||
import akka.actor.ActorSystem
|
||||
|
|
@ -18,33 +15,34 @@ import akka.actor.ExtendedActorSystem
|
|||
import com.typesafe.config.Config
|
||||
import org.reactivestreams.Publisher
|
||||
import org.reactivestreams.Subscriber
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Props
|
||||
import akka.actor.ActorRef
|
||||
|
||||
object FlowMaterializer {
|
||||
object ActorFlowMaterializer {
|
||||
|
||||
/**
|
||||
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Scala API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create one actor that in turn creates actors for the transformation steps.
|
||||
*
|
||||
* The materializer's [[akka.stream.MaterializerSettings]] will be obtained from the
|
||||
* The materializer's [[akka.stream.ActorFlowMaterializerSettings]] will be obtained from the
|
||||
* configuration of the `context`'s underlying [[akka.actor.ActorSystem]].
|
||||
*
|
||||
* The `namePrefix` is used as the first part of the names of the actors running
|
||||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def apply(materializerSettings: Option[MaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
|
||||
def apply(materializerSettings: Option[ActorFlowMaterializerSettings] = None, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): ActorFlowMaterializer = {
|
||||
val system = actorSystemOf(context)
|
||||
|
||||
val settings = materializerSettings getOrElse MaterializerSettings(system)
|
||||
val settings = materializerSettings getOrElse ActorFlowMaterializerSettings(system)
|
||||
apply(settings, namePrefix.getOrElse("flow"))(context)
|
||||
}
|
||||
|
||||
/**
|
||||
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Scala API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
|
|
@ -54,20 +52,19 @@ object FlowMaterializer {
|
|||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def apply(materializerSettings: MaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): FlowMaterializer = {
|
||||
def apply(materializerSettings: ActorFlowMaterializerSettings, namePrefix: String)(implicit context: ActorRefFactory): ActorFlowMaterializer = {
|
||||
val system = actorSystemOf(context)
|
||||
|
||||
new ActorBasedFlowMaterializer(
|
||||
new ActorFlowMaterializerImpl(
|
||||
materializerSettings,
|
||||
system.dispatchers,
|
||||
context.actorOf(StreamSupervisor.props(materializerSettings).withDispatcher(materializerSettings.dispatcher)),
|
||||
FlowNameCounter(system).counter,
|
||||
namePrefix,
|
||||
optimizations = Optimizations.none)
|
||||
namePrefix)
|
||||
}
|
||||
|
||||
/**
|
||||
* Scala API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Scala API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
|
|
@ -77,11 +74,11 @@ object FlowMaterializer {
|
|||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def apply(materializerSettings: MaterializerSettings)(implicit context: ActorRefFactory): FlowMaterializer =
|
||||
def apply(materializerSettings: ActorFlowMaterializerSettings)(implicit context: ActorRefFactory): ActorFlowMaterializer =
|
||||
apply(Some(materializerSettings), None)
|
||||
|
||||
/**
|
||||
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Java API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
|
|
@ -90,20 +87,20 @@ object FlowMaterializer {
|
|||
* Defaults the actor name prefix used to name actors running the processing steps to `"flow"`.
|
||||
* The actor names are built up of `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create(context: ActorRefFactory): FlowMaterializer =
|
||||
def create(context: ActorRefFactory): ActorFlowMaterializer =
|
||||
apply()(context)
|
||||
|
||||
/**
|
||||
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Java API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create one actor that in turn creates actors for the transformation steps.
|
||||
*/
|
||||
def create(settings: MaterializerSettings, context: ActorRefFactory): FlowMaterializer =
|
||||
def create(settings: ActorFlowMaterializerSettings, context: ActorRefFactory): ActorFlowMaterializer =
|
||||
apply(Option(settings), None)(context)
|
||||
|
||||
/**
|
||||
* Java API: Creates a FlowMaterializer which will execute every step of a transformation
|
||||
* Java API: Creates a ActorFlowMaterializer which will execute every step of a transformation
|
||||
* pipeline within its own [[akka.actor.Actor]]. The required [[akka.actor.ActorRefFactory]]
|
||||
* (which can be either an [[akka.actor.ActorSystem]] or an [[akka.actor.ActorContext]])
|
||||
* will be used to create these actors, therefore it is *forbidden* to pass this object
|
||||
|
|
@ -113,7 +110,7 @@ object FlowMaterializer {
|
|||
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
|
||||
* `namePrefix-flowNumber-flowStepNumber-stepName`.
|
||||
*/
|
||||
def create(settings: MaterializerSettings, context: ActorRefFactory, namePrefix: String): FlowMaterializer =
|
||||
def create(settings: ActorFlowMaterializerSettings, context: ActorRefFactory, namePrefix: String): ActorFlowMaterializer =
|
||||
apply(Option(settings), Option(namePrefix))(context)
|
||||
|
||||
private def actorSystemOf(context: ActorRefFactory): ActorSystem = {
|
||||
|
|
@ -130,13 +127,24 @@ object FlowMaterializer {
|
|||
}
|
||||
|
||||
/**
|
||||
* A FlowMaterializer takes the list of transformations comprising a
|
||||
* A ActorFlowMaterializer takes the list of transformations comprising a
|
||||
* [[akka.stream.scaladsl.Flow]] and materializes them in the form of
|
||||
* [[org.reactivestreams.Processor]] instances. How transformation
|
||||
* steps are split up into asynchronous regions is implementation
|
||||
* dependent.
|
||||
*/
|
||||
abstract class FlowMaterializer(val settings: MaterializerSettings) {
|
||||
abstract class ActorFlowMaterializer extends FlowMaterializer {
|
||||
|
||||
def settings: ActorFlowMaterializerSettings
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] def actorOf(props: Props, name: String): ActorRef
|
||||
|
||||
}
|
||||
|
||||
abstract class FlowMaterializer {
|
||||
|
||||
/**
|
||||
* The `namePrefix` shall be used for deriving the names of processing
|
||||
|
|
@ -161,69 +169,71 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
|
|||
|
||||
}
|
||||
|
||||
object MaterializerSettings {
|
||||
/**
|
||||
* Create [[MaterializerSettings]].
|
||||
*
|
||||
* You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
|
||||
* [[MaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def apply(system: ActorSystem): MaterializerSettings =
|
||||
apply(system.settings.config.getConfig("akka.stream.materializer"))
|
||||
|
||||
/**
|
||||
* Create [[MaterializerSettings]].
|
||||
*
|
||||
* You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
|
||||
* [[MaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def apply(config: Config): MaterializerSettings =
|
||||
MaterializerSettings(
|
||||
config.getInt("initial-input-buffer-size"),
|
||||
config.getInt("max-input-buffer-size"),
|
||||
config.getString("dispatcher"),
|
||||
StreamSubscriptionTimeoutSettings(config),
|
||||
config.getString("file-io-dispatcher"),
|
||||
config.getBoolean("debug-logging"))
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*
|
||||
* You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
|
||||
* [[MaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def create(system: ActorSystem): MaterializerSettings =
|
||||
apply(system)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*
|
||||
* You can refine the configuration based settings using [[MaterializerSettings#withInputBuffer]],
|
||||
* [[MaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def create(config: Config): MaterializerSettings =
|
||||
apply(config)
|
||||
}
|
||||
|
||||
/**
|
||||
* This exception or subtypes thereof should be used to signal materialization
|
||||
* failures.
|
||||
*/
|
||||
class MaterializationException(msg: String, cause: Throwable = null) extends RuntimeException(msg, cause)
|
||||
|
||||
object ActorFlowMaterializerSettings {
|
||||
/**
|
||||
* Create [[ActorFlowMaterializerSettings]].
|
||||
*
|
||||
* You can refine the configuration based settings using [[ActorFlowMaterializerSettings#withInputBuffer]],
|
||||
* [[ActorFlowMaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def apply(system: ActorSystem): ActorFlowMaterializerSettings =
|
||||
apply(system.settings.config.getConfig("akka.stream.materializer"))
|
||||
|
||||
/**
|
||||
* Create [[ActorFlowMaterializerSettings]].
|
||||
*
|
||||
* You can refine the configuration based settings using [[ActorFlowMaterializerSettings#withInputBuffer]],
|
||||
* [[ActorFlowMaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def apply(config: Config): ActorFlowMaterializerSettings =
|
||||
ActorFlowMaterializerSettings(
|
||||
initialInputBufferSize = config.getInt("initial-input-buffer-size"),
|
||||
maxInputBufferSize = config.getInt("max-input-buffer-size"),
|
||||
dispatcher = config.getString("dispatcher"),
|
||||
subscriptionTimeoutSettings = StreamSubscriptionTimeoutSettings(config),
|
||||
fileIODispatcher = config.getString("file-io-dispatcher"),
|
||||
debugLogging = config.getBoolean("debug-logging"),
|
||||
optimizations = Optimizations.none)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*
|
||||
* You can refine the configuration based settings using [[ActorFlowMaterializerSettings#withInputBuffer]],
|
||||
* [[ActorFlowMaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def create(system: ActorSystem): ActorFlowMaterializerSettings =
|
||||
apply(system)
|
||||
|
||||
/**
|
||||
* Java API
|
||||
*
|
||||
* You can refine the configuration based settings using [[ActorFlowMaterializerSettings#withInputBuffer]],
|
||||
* [[ActorFlowMaterializerSettings#withDispatcher]]
|
||||
*/
|
||||
def create(config: Config): ActorFlowMaterializerSettings =
|
||||
apply(config)
|
||||
}
|
||||
|
||||
/**
|
||||
* The buffers employed by the generated Processors can be configured by
|
||||
* creating an appropriate instance of this class.
|
||||
*
|
||||
* This will likely be replaced in the future by auto-tuning these values at runtime.
|
||||
*/
|
||||
final case class MaterializerSettings(
|
||||
final case class ActorFlowMaterializerSettings(
|
||||
initialInputBufferSize: Int,
|
||||
maxInputBufferSize: Int,
|
||||
dispatcher: String,
|
||||
subscriptionTimeoutSettings: StreamSubscriptionTimeoutSettings,
|
||||
fileIODispatcher: String, // FIXME Why does this exist?!
|
||||
debugLogging: Boolean) {
|
||||
debugLogging: Boolean,
|
||||
optimizations: Optimizations) {
|
||||
|
||||
require(initialInputBufferSize > 0, "initialInputBufferSize must be > 0")
|
||||
|
||||
|
|
@ -231,15 +241,18 @@ final case class MaterializerSettings(
|
|||
require(isPowerOfTwo(maxInputBufferSize), "maxInputBufferSize must be a power of two")
|
||||
require(initialInputBufferSize <= maxInputBufferSize, s"initialInputBufferSize($initialInputBufferSize) must be <= maxInputBufferSize($maxInputBufferSize)")
|
||||
|
||||
def withInputBuffer(initialSize: Int, maxSize: Int): MaterializerSettings =
|
||||
def withInputBuffer(initialSize: Int, maxSize: Int): ActorFlowMaterializerSettings =
|
||||
copy(initialInputBufferSize = initialSize, maxInputBufferSize = maxSize)
|
||||
|
||||
def withDispatcher(dispatcher: String): MaterializerSettings =
|
||||
def withDispatcher(dispatcher: String): ActorFlowMaterializerSettings =
|
||||
copy(dispatcher = dispatcher)
|
||||
|
||||
def withDebugLogging(enable: Boolean): MaterializerSettings =
|
||||
def withDebugLogging(enable: Boolean): ActorFlowMaterializerSettings =
|
||||
copy(debugLogging = enable)
|
||||
|
||||
def withOptimizations(optimizations: Optimizations): ActorFlowMaterializerSettings =
|
||||
copy(optimizations = optimizations)
|
||||
|
||||
private def isPowerOfTwo(n: Integer): Boolean = (n & (n - 1)) == 0 // FIXME this considers 0 a power of 2
|
||||
}
|
||||
|
||||
|
|
@ -278,3 +291,12 @@ object StreamSubscriptionTimeoutTerminationMode {
|
|||
def cancel = CancelTermination
|
||||
|
||||
}
|
||||
|
||||
final object Optimizations {
|
||||
val none: Optimizations = Optimizations(collapsing = false, elision = false, simplification = false, fusion = false)
|
||||
val all: Optimizations = Optimizations(collapsing = true, elision = true, simplification = true, fusion = true)
|
||||
}
|
||||
|
||||
final case class Optimizations(collapsing: Boolean, elision: Boolean, simplification: Boolean, fusion: Boolean) {
|
||||
def isEnabled: Boolean = collapsing || elision || simplification || fusion
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue