2014-03-30 09:27:19 +02:00
|
|
|
/**
|
2016-02-23 12:58:39 +01:00
|
|
|
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
|
2014-03-30 09:27:19 +02:00
|
|
|
*/
|
|
|
|
|
package akka.stream.impl
|
|
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
import java.util.concurrent.atomic.AtomicBoolean
|
2015-12-14 17:02:00 +01:00
|
|
|
import java.{ util ⇒ ju }
|
2016-05-03 18:58:26 -07:00
|
|
|
|
2016-01-20 10:00:37 +02:00
|
|
|
import akka.NotUsed
|
2014-11-17 22:50:15 +01:00
|
|
|
import akka.actor._
|
2016-05-03 18:58:26 -07:00
|
|
|
import akka.event.{ Logging, LoggingAdapter }
|
2015-01-28 14:19:50 +01:00
|
|
|
import akka.dispatch.Dispatchers
|
|
|
|
|
import akka.pattern.ask
|
2015-10-30 22:13:10 -04:00
|
|
|
import akka.stream._
|
2016-05-03 18:58:26 -07:00
|
|
|
import akka.stream.impl.StreamLayout.{ AtomicModule, Module }
|
2015-10-31 14:46:10 +01:00
|
|
|
import akka.stream.impl.fusing.{ ActorGraphInterpreter, GraphModule }
|
2016-02-16 18:19:30 +01:00
|
|
|
import akka.stream.impl.io.TLSActor
|
|
|
|
|
import akka.stream.impl.io.TlsModule
|
2015-01-28 14:19:50 +01:00
|
|
|
import org.reactivestreams._
|
2016-05-03 18:58:26 -07:00
|
|
|
|
2015-09-11 15:50:17 +02:00
|
|
|
import scala.concurrent.duration.FiniteDuration
|
2015-02-26 11:58:29 +01:00
|
|
|
import scala.concurrent.{ Await, ExecutionContextExecutor }
|
2015-12-14 17:02:00 +01:00
|
|
|
import akka.stream.impl.fusing.GraphStageModule
|
|
|
|
|
import akka.stream.impl.fusing.GraphInterpreter.GraphAssembly
|
|
|
|
|
import akka.stream.impl.fusing.Fusing
|
2015-12-17 13:35:37 +01:00
|
|
|
import akka.stream.impl.fusing.GraphInterpreterShell
|
2014-03-30 09:27:19 +02:00
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
/**
|
|
|
|
|
* ExtendedActorMaterializer used by subtypes which materialize using GraphInterpreterShell
|
|
|
|
|
*/
|
|
|
|
|
abstract class ExtendedActorMaterializer extends ActorMaterializer {

  override def withNamePrefix(name: String): ExtendedActorMaterializer

  /**
   * INTERNAL API
   *
   * Materializes the given closed graph, handing `subflowFuser` to the implementation.
   */
  def materialize[Mat](
    _runnableGraph: Graph[ClosedShape, Mat],
    subflowFuser: GraphInterpreterShell ⇒ ActorRef): Mat

  /**
   * INTERNAL API
   *
   * Same as the two-argument variant, but with explicitly supplied initial attributes.
   */
  def materialize[Mat](
    _runnableGraph: Graph[ClosedShape, Mat],
    subflowFuser: GraphInterpreterShell ⇒ ActorRef,
    initialAttributes: Attributes): Mat

  /**
   * INTERNAL API
   *
   * Creates a stage actor named after the context's stage name. When the props carry no
   * explicit dispatcher, the dispatcher comes from the settings derived from the context's
   * effective attributes; otherwise the dispatcher of the props is used.
   */
  override def actorOf(context: MaterializationContext, props: Props): ActorRef = {
    val dispatcher = props.deploy.dispatcher match {
      case Deploy.NoDispatcherGiven ⇒ effectiveSettings(context.effectiveAttributes).dispatcher
      case _                        ⇒ props.dispatcher
    }
    actorOf(props, context.stageName, dispatcher)
  }

  /**
   * INTERNAL API
   *
   * Creates a child of the stream supervisor running on the given dispatcher.
   *
   * A local or already started repointable supervisor gets the child attached directly to
   * its cell. A repointable supervisor that is not started yet is asked to create the child
   * via [[StreamSupervisor.Materialize]]; this call blocks on the reply for at most the
   * actor system's creation timeout.
   *
   * Throws IllegalStateException if the supervisor is any other kind of actor reference.
   */
  protected def actorOf(props: Props, name: String, dispatcher: String): ActorRef = {
    // evaluated only in the branch that actually creates a child
    def childProps: Props = props.withDispatcher(dispatcher)

    supervisor match {
      case local: LocalActorRef ⇒
        local.underlying.attachChild(childProps, name, systemService = false)

      case repointable: RepointableActorRef if repointable.isStarted ⇒
        repointable.underlying.asInstanceOf[ActorCell].attachChild(childProps, name, systemService = false)

      case repointable: RepointableActorRef ⇒
        implicit val timeout = repointable.system.settings.CreationTimeout
        val created = (supervisor ? StreamSupervisor.Materialize(childProps, name)).mapTo[ActorRef]
        Await.result(created, timeout.duration)

      case unknown ⇒
        throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${unknown.getClass.getName}]")
    }
  }

  /**
   * INTERNAL API
   */
  override def logger: LoggingAdapter

  /**
   * INTERNAL API
   */
  override def supervisor: ActorRef

}
|
|
|
|
|
|
2014-03-30 09:27:19 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-06-02 14:06:57 +02:00
|
|
|
private[akka] case class ActorMaterializerImpl(
  system: ActorSystem,
  override val settings: ActorMaterializerSettings,
  dispatchers: Dispatchers,
  supervisor: ActorRef,
  haveShutDown: AtomicBoolean,
  flowNames: SeqActorName) extends ExtendedActorMaterializer {
  import akka.stream.impl.Stages._

  private val _logger = Logging.getLogger(system, this)
  override def logger: LoggingAdapter = _logger

  // Warn at construction time when fuzzing is on, unless the config contains the opt-out path.
  if (settings.fuzzingMode && !system.settings.config.hasPath("akka.stream.secret-test-fuzzing-warning-disable")) {
    _logger.warning("Fuzzing mode is enabled on this system. If you see this warning on your production system then " +
      "set akka.stream.materializer.debug.fuzzing-mode to off.")
  }

  /**
   * Flips the shared shutdown flag and sends a PoisonPill to the supervisor.
   * Only the call that actually flips the flag sends the PoisonPill.
   */
  override def shutdown(): Unit =
    if (haveShutDown.compareAndSet(false, true)) supervisor ! PoisonPill

  override def isShutdown: Boolean = haveShutDown.get()

  /** Returns a copy of this materializer whose flow name sequence is `flowNames.copy(name)`. */
  override def withNamePrefix(name: String): ActorMaterializerImpl = this.copy(flowNames = flowNames.copy(name))

  private[this] def createFlowName(): String = flowNames.next()

  // Attributes used when the caller of materialize() does not supply any: input buffer sizes,
  // dispatcher and supervision decider taken from this materializer's settings.
  private val defaultInitialAttributes = Attributes(
    Attributes.InputBuffer(settings.initialInputBufferSize, settings.maxInputBufferSize) ::
      ActorAttributes.Dispatcher(settings.dispatcher) ::
      ActorAttributes.SupervisionStrategy(settings.supervisionDecider) ::
      Nil)

  /**
   * Derives settings for one operator by folding its attribute list over this materializer's
   * settings: InputBuffer, Dispatcher and SupervisionStrategy attributes replace the
   * corresponding setting, every other attribute leaves the settings untouched.
   */
  override def effectiveSettings(opAttr: Attributes): ActorMaterializerSettings = {
    import Attributes._
    import ActorAttributes._
    opAttr.attributeList.foldLeft(settings) { (s, attr) ⇒
      attr match {
        case InputBuffer(initial, max)    ⇒ s.withInputBuffer(initial, max)
        case Dispatcher(dispatcher)       ⇒ s.withDispatcher(dispatcher)
        case SupervisionStrategy(decider) ⇒ s.withSupervisionStrategy(decider)
        case _                            ⇒ s
      }
    }
  }

  /** Schedules `task` repeatedly on the system scheduler, run on this materializer's execution context. */
  override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.schedule(initialDelay, interval, task)(executionContext)

  /** Schedules `task` once on the system scheduler, run on this materializer's execution context. */
  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    system.scheduler.scheduleOnce(delay, task)(executionContext)

  override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat]): Mat =
    materialize(_runnableGraph, null, defaultInitialAttributes)

  override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
    materialize(_runnableGraph, null, initialAttributes)

  override def materialize[Mat](_runnableGraph: Graph[ClosedShape, Mat], subflowFuser: (GraphInterpreterShell) ⇒ ActorRef): Mat =
    materialize(_runnableGraph, subflowFuser, defaultInitialAttributes)

  /**
   * Materializes the graph and returns its materialized value.
   *
   * @param _runnableGraph    the closed graph to run; fused first when `settings.autoFusing` is set
   * @param subflowFuser      may be null; when non-null, graph modules without an async boundary
   *                          attribute hand their interpreter shell to this function instead of
   *                          getting an actor of their own
   * @param initialAttributes attributes the materialization session starts from
   *
   * Throws IllegalStateException if this materializer has already been shut down.
   */
  override def materialize[Mat](
    _runnableGraph: Graph[ClosedShape, Mat],
    subflowFuser: GraphInterpreterShell ⇒ ActorRef,
    initialAttributes: Attributes
  ): Mat = {
    // Fail fast: reject the call before spending any work on fusing the graph.
    if (haveShutDown.get())
      throw new IllegalStateException("Attempted to call materialize() after the ActorMaterializer has been shut down.")

    val runnableGraph =
      if (settings.autoFusing) Fusing.aggressive(_runnableGraph)
      else _runnableGraph

    if (StreamLayout.Debug) StreamLayout.validate(runnableGraph.module)

    val session = new MaterializerSession(runnableGraph.module, initialAttributes) {
      private val flowName = createFlowName()
      private var nextId = 0

      // Builds "<flowName>-<id>-<attribute name>" and advances the id, so every call
      // within this session yields a different name.
      private def stageName(attr: Attributes): String = {
        val name = s"$flowName-$nextId-${attr.nameOrDefault()}"
        nextId += 1
        name
      }

      override protected def materializeAtomic(atomic: AtomicModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
        if (MaterializerSession.Debug) println(s"materializing $atomic")

        def newMaterializationContext() =
          new MaterializationContext(ActorMaterializerImpl.this, effectiveAttributes, stageName(effectiveAttributes))

        atomic match {
          case sink: SinkModule[_, _] ⇒
            val (sub, mat) = sink.create(newMaterializationContext())
            assignPort(sink.shape.in, sub)
            matVal.put(atomic, mat)

          case source: SourceModule[_, _] ⇒
            val (pub, mat) = source.create(newMaterializationContext())
            assignPort(source.shape.out, pub.asInstanceOf[Publisher[Any]])
            matVal.put(atomic, mat)

          case stage: ProcessorModule[_, _, _] ⇒
            // the same processor object serves as subscriber on the in port and publisher on the out port
            val (processor, mat) = stage.createProcessor()
            assignPort(stage.inPort, processor)
            assignPort(stage.outPort, processor.asInstanceOf[Publisher[Any]])
            matVal.put(atomic, mat)

          case tls: TlsModule ⇒ // TODO solve this so TlsModule doesn't need special treatment here
            val es = effectiveSettings(effectiveAttributes)
            val props =
              TLSActor.props(es, tls.sslContext, tls.sslConfig, tls.firstSession, tls.role, tls.closing, tls.hostInfo)
            val impl = actorOf(props, stageName(effectiveAttributes), es.dispatcher)
            def factory(id: Int) = new ActorPublisher[Any](impl) {
              override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
            }
            // two publishers backed by the TLS actor, one per output, announced to the actor
            // before the ports are assigned
            val publishers = Vector.tabulate(2)(factory)
            impl ! FanOut.ExposedPublishers(publishers)

            assignPort(tls.plainOut, publishers(TLSActor.UserOut))
            assignPort(tls.cipherOut, publishers(TLSActor.TransportOut))

            assignPort(tls.plainIn, FanIn.SubInput[Any](impl, TLSActor.UserIn))
            assignPort(tls.cipherIn, FanIn.SubInput[Any](impl, TLSActor.TransportIn))

            matVal.put(atomic, NotUsed)

          case graph: GraphModule ⇒
            matGraph(graph, effectiveAttributes, matVal)

          case stage: GraphStageModule ⇒
            // wrap the single stage into a one-stage graph module and materialize it like any other
            val graph =
              GraphModule(
                GraphAssembly(stage.shape.inlets, stage.shape.outlets, stage.stage),
                stage.shape, stage.attributes, Array(stage))
            matGraph(graph, effectiveAttributes, matVal)
        }
      }

      // Materializes the assembly into an interpreter shell, obtains the actor that runs it
      // (from subflowFuser, or a newly created interpreter actor) and wires the shell's
      // boundary subscribers/publishers to the graph's inlets and outlets.
      private def matGraph(graph: GraphModule, effectiveAttributes: Attributes, matVal: ju.Map[Module, Any]): Unit = {
        val calculatedSettings = effectiveSettings(effectiveAttributes)
        val (connections, logics) = graph.assembly.materialize(effectiveAttributes, graph.matValIDs, matVal, registerSrc)

        val shell = new GraphInterpreterShell(graph.assembly, connections, logics, graph.shape,
          calculatedSettings, ActorMaterializerImpl.this)

        val impl =
          if (subflowFuser != null && !effectiveAttributes.contains(Attributes.AsyncBoundary)) {
            subflowFuser(shell)
          } else {
            val props = ActorGraphInterpreter.props(shell)
            actorOf(props, stageName(effectiveAttributes), calculatedSettings.dispatcher)
          }

        for ((inlet, i) ← graph.shape.inlets.iterator.zipWithIndex) {
          val subscriber = new ActorGraphInterpreter.BoundarySubscriber(impl, shell, i)
          assignPort(inlet, subscriber)
        }
        for ((outlet, i) ← graph.shape.outlets.iterator.zipWithIndex) {
          val publisher = new ActorGraphInterpreter.BoundaryPublisher(impl, shell, i)
          // the publisher is announced to the actor before the port is assigned
          impl ! ActorGraphInterpreter.ExposedPublisher(shell, i, publisher)
          assignPort(outlet, publisher)
        }
      }

    }

    session.materialize().asInstanceOf[Mat]
  }

  override def makeLogger(logSource: Class[_]): LoggingAdapter =
    Logging(system, logSource)

  // Looked up lazily; when the settings carry no dispatcher the default dispatcher id is used.
  override lazy val executionContext: ExecutionContextExecutor = dispatchers.lookup(settings.dispatcher match {
    case Deploy.NoDispatcherGiven ⇒ Dispatchers.DefaultDispatcherId
    case other                    ⇒ other
  })

}
|
2014-05-08 19:34:58 +02:00
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
/**
 * INTERNAL API
 *
 * Materializer that forwards every operation to `delegate`, passing `registerShell` along
 * as the subflow fuser on each materialization.
 */
private[akka] class SubFusingActorMaterializerImpl(
  val delegate: ExtendedActorMaterializer,
  registerShell: GraphInterpreterShell ⇒ ActorRef)
  extends Materializer {

  override def executionContext: ExecutionContextExecutor =
    delegate.executionContext

  override def materialize[Mat](runnable: Graph[ClosedShape, Mat]): Mat =
    delegate.materialize(runnable, registerShell)

  override def materialize[Mat](runnable: Graph[ClosedShape, Mat], initialAttributes: Attributes): Mat =
    delegate.materialize(runnable, registerShell, initialAttributes)

  override def scheduleOnce(delay: FiniteDuration, task: Runnable): Cancellable =
    delegate.scheduleOnce(delay, task)

  override def schedulePeriodically(initialDelay: FiniteDuration, interval: FiniteDuration, task: Runnable): Cancellable =
    delegate.schedulePeriodically(initialDelay, interval, task)

  /** Wraps the renamed delegate in a new instance that keeps the same `registerShell`. */
  override def withNamePrefix(name: String): SubFusingActorMaterializerImpl = {
    val renamedDelegate = delegate.withNamePrefix(name)
    new SubFusingActorMaterializerImpl(renamedDelegate, registerShell)
  }

}
|
|
|
|
|
|
2014-05-08 19:34:58 +02:00
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
object FlowNames extends ExtensionId[FlowNames] with ExtensionIdProvider {

  // Narrows the inherited accessor's result type to FlowNames.
  override def get(system: ActorSystem): FlowNames =
    super.get(system)

  // This object is its own extension id.
  override def lookup(): FlowNames.type =
    FlowNames

  // A fresh FlowNames is created for every call.
  override def createExtension(system: ExtendedActorSystem): FlowNames =
    new FlowNames

}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
class FlowNames extends Extension {

  // Name sequence created with the prefix "Flow".
  val name = SeqActorName("Flow")

}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* INTERNAL API
|
|
|
|
|
*/
|
2016-05-03 18:58:26 -07:00
|
|
|
object StreamSupervisor {

  /** Props for a [[StreamSupervisor]] with the given arguments, deployed with `Deploy.local`. */
  def props(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean): Props = {
    val supervisorProps = Props(new StreamSupervisor(settings, haveShutDown))
    supervisorProps.withDeploy(Deploy.local)
  }

  private[stream] val baseName = "StreamSupervisor"
  private val actorName = SeqActorName(baseName)

  /** Next name from the sequence based on `baseName`. */
  def nextName(): String =
    actorName.next()

  /** Request to create a child from `props` under `name`; the supervisor replies with the child's ActorRef. */
  final case class Materialize(props: Props, name: String)
    extends DeadLetterSuppression
    with NoSerializationVerificationNeeded

  /** Testing purpose */
  case object GetChildren

  /** Testing purpose */
  final case class Children(children: Set[ActorRef])

  /** Testing purpose */
  case object StopChildren

  /** Testing purpose */
  case object StoppedChildren

  /** Testing purpose */
  case object PrintDebugDump

}
|
|
|
|
|
|
2016-05-03 18:58:26 -07:00
|
|
|
class StreamSupervisor(settings: ActorMaterializerSettings, haveShutDown: AtomicBoolean) extends Actor {
  import akka.stream.impl.StreamSupervisor._

  // Children are supervised with the stopping strategy.
  override def supervisorStrategy: SupervisorStrategy =
    SupervisorStrategy.stoppingStrategy

  def receive: Receive = {
    // create the requested child, then reply with its reference
    case Materialize(props, name) ⇒
      val child = context.actorOf(props, name)
      sender() ! child

    // reply with a snapshot of the current children
    case GetChildren ⇒
      sender() ! Children(context.children.toSet)

    // stop every child, then acknowledge
    case StopChildren ⇒
      context.children.foreach(child ⇒ context.stop(child))
      sender() ! StoppedChildren
  }

  // Once this actor has stopped, the shared flag reports the materializer as shut down.
  override def postStop(): Unit =
    haveShutDown.set(true)

}
|
|
|
|
|
|