!str #15564 Add stream supervisor

* this is the first step, to enable safe FlowMaterializer that can be shared
* had to remove Log because of missing context (we need another solution for log)
* naming of the actors have changed, should be revisited
This commit is contained in:
Patrik Nordwall 2014-08-21 12:35:38 +02:00
parent 35f455d26d
commit e0bc669bc5
12 changed files with 97 additions and 392 deletions

View file

@ -10,6 +10,10 @@ import akka.stream.impl.Ast
import org.reactivestreams.{ Publisher, Subscriber }
import scala.concurrent.duration._
import akka.actor.Deploy
import akka.actor.ExtendedActorSystem
import akka.actor.ActorContext
import akka.stream.impl.StreamSupervisor
import akka.stream.impl.FlowNameCounter
object FlowMaterializer {
@ -24,8 +28,21 @@ object FlowMaterializer {
* the processing steps. The default `namePrefix` is `"flow"`. The actor names are built up of
* `namePrefix-flowNumber-flowStepNumber-stepName`.
*/
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer =
new ActorBasedFlowMaterializer(settings, context, namePrefix.getOrElse("flow"))
def apply(settings: MaterializerSettings, namePrefix: Option[String] = None)(implicit context: ActorRefFactory): FlowMaterializer = {
val system = context match {
case s: ExtendedActorSystem s
case c: ActorContext c.system
case null throw new IllegalArgumentException("ActorRefFactory context must be defined")
case _ throw new IllegalArgumentException(s"ActorRefFactory context must be a ActorSystem or ActorContext, " +
"got [${_contex.getClass.getName}]")
}
new ActorBasedFlowMaterializer(
settings,
context.actorOf(StreamSupervisor.props(settings).withDispatcher(settings.dispatcher)),
FlowNameCounter(system).counter,
namePrefix.getOrElse("flow"))
}
/**
* Java API: Creates a FlowMaterializer which will execute every step of a transformation