many compile fixes and FlowMaterializer

The DSLs MUST NOT rely upon ActorFlowMaterializer, otherwise we cannot
add other materializers later.
This commit is contained in:
Roland Kuhn 2015-02-26 22:42:34 +01:00
parent a4f4cb298a
commit e870c7cbd2
15 changed files with 131 additions and 108 deletions

View file

@ -19,6 +19,17 @@ import org.reactivestreams._
import scala.concurrent.{ Await, ExecutionContextExecutor }
object ActorFlowMaterializerImpl {
import OperationAttributes._
private[akka] def calcSettings(opAttr: OperationAttributes)(settings: ActorFlowMaterializerSettings): ActorFlowMaterializerSettings =
opAttr.attributes.collect {
case InputBuffer(initial, max) (s: ActorFlowMaterializerSettings) s.withInputBuffer(initial, max)
case Dispatcher(dispatcher) (s: ActorFlowMaterializerSettings) s.withDispatcher(dispatcher)
case SupervisionStrategy(decider) (s: ActorFlowMaterializerSettings)
s.withSupervisionStrategy(decider)
}.reduceOption(_ andThen _).getOrElse((x: ActorFlowMaterializerSettings) x)(settings) // FIXME is this the optimal way of encoding this?
}
/**
* INTERNAL API
*/
@ -29,6 +40,7 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
namePrefix: String,
optimizations: Optimizations)
extends ActorFlowMaterializer {
import ActorFlowMaterializerImpl._
import akka.stream.impl.Stages._
def withNamePrefix(name: String): FlowMaterializer = this.copy(namePrefix = name)
@ -49,60 +61,66 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
name
}
override protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any = atomic match {
case sink: SinkModule[_, _]
val (sub, mat) = sink.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes))
assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]])
mat
case source: SourceModule[_, _]
val (pub, mat) = source.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes))
assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]])
mat
override protected def materializeAtomic(atomic: Module, effectiveAttributes: OperationAttributes): Any = {
case stage: StageModule
val (processor, mat) = processorFor(stage, effectiveAttributes)
assignPort(stage.inPort, processor)
assignPort(stage.outPort, processor)
mat
atomic match {
case sink: SinkModule[_, _]
val (sub, mat) = sink.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes))
assignPort(sink.shape.inlet, sub.asInstanceOf[Subscriber[Any]])
mat
case source: SourceModule[_, _]
val (pub, mat) = source.create(ActorFlowMaterializerImpl.this, stageName(effectiveAttributes))
assignPort(source.shape.outlet, pub.asInstanceOf[Publisher[Any]])
mat
case junction: JunctionModule materializeJunction(junction, effectiveAttributes)
case stage: StageModule
val (processor, mat) = processorFor(stage, effectiveAttributes, calcSettings(effectiveAttributes)(settings))
assignPort(stage.inPort, processor)
assignPort(stage.outPort, processor)
mat
case junction: JunctionModule materializeJunction(junction, effectiveAttributes, calcSettings(effectiveAttributes)(settings))
}
}
private def processorFor(op: StageModule, effectiveAttributes: OperationAttributes): (Processor[Any, Any], Any) = op match {
private def processorFor(op: StageModule,
effectiveAttributes: OperationAttributes,
effectiveSettings: ActorFlowMaterializerSettings): (Processor[Any, Any], Any) = op match {
case DirectProcessor(processorFactory, _) processorFactory()
case _
val (opprops, mat) = ActorProcessorFactory.props(ActorFlowMaterializerImpl.this, op, effectiveAttributes)
val processor = ActorProcessorFactory[Any, Any](actorOf(
opprops,
stageName(effectiveAttributes),
effectiveAttributes.settings(settings).dispatcher))
effectiveSettings.dispatcher))
processor -> mat
}
private def materializeJunction(op: JunctionModule, effectiveAttributes: OperationAttributes): Unit = {
private def materializeJunction(op: JunctionModule,
effectiveAttributes: OperationAttributes,
effectiveSettings: ActorFlowMaterializerSettings): Unit = {
op match {
case fanin: FanInModule
val (props, inputs, output) = fanin match {
case MergeModule(shape, _)
(FairMerge.props(effectiveAttributes.settings(settings), shape.inArray.size), shape.inArray.toSeq, shape.out)
(FairMerge.props(effectiveSettings, shape.inArray.size), shape.inArray.toSeq, shape.out)
case f: FlexiMergeModule[t, p]
val flexi = f.flexi(f.shape)
(FlexiMerge.props(effectiveAttributes.settings(settings), f.shape, flexi), f.shape.inlets, f.shape.outlets.head)
(FlexiMerge.props(effectiveSettings, f.shape, flexi), f.shape.inlets, f.shape.outlets.head)
case MergePreferredModule(shape, _)
(UnfairMerge.props(effectiveAttributes.settings(settings), shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out)
(UnfairMerge.props(effectiveSettings, shape.inlets.size), shape.preferred +: shape.inArray.toSeq, shape.out)
case ConcatModule(shape, _)
require(shape.inArray.size == 2, "currently only supporting concatenation of exactly two inputs") // FIXME
(Concat.props(effectiveAttributes.settings(settings)), shape.inArray.toSeq, shape.out)
(Concat.props(effectiveSettings), shape.inArray.toSeq, shape.out)
case zip: ZipWithModule
(zip.props(effectiveAttributes.settings(settings)), zip.shape.inlets, zip.outPorts.head)
(zip.props(effectiveSettings), zip.shape.inlets, zip.outPorts.head)
}
val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher)
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
val publisher = new ActorPublisher[Any](impl)
impl ! ExposedPublisher(publisher)
for ((in, id) inputs.zipWithIndex) {
@ -115,18 +133,18 @@ case class ActorFlowMaterializerImpl(override val settings: ActorFlowMaterialize
case r: FlexiRouteModule[t, p]
val flexi = r.flexi(r.shape)
(FlexiRoute.props(effectiveAttributes.settings(settings), r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets)
(FlexiRoute.props(effectiveSettings, r.shape, flexi), r.shape.inlets.head: InPort, r.shape.outlets)
case BroadcastModule(shape, _)
(Broadcast.props(effectiveAttributes.settings(settings), shape.outArray.size), shape.in, shape.outArray.toSeq)
(Broadcast.props(effectiveSettings, shape.outArray.size), shape.in, shape.outArray.toSeq)
case BalanceModule(shape, waitForDownstreams, _)
(Balance.props(effectiveAttributes.settings(settings), shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
(Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
case UnzipModule(shape, _)
(Unzip.props(effectiveAttributes.settings(settings)), shape.in, shape.outlets)
(Unzip.props(effectiveSettings), shape.in, shape.outlets)
}
val impl = actorOf(props, stageName(effectiveAttributes), effectiveAttributes.settings(settings).dispatcher)
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
val size = outs.size
def factory(id: Int) = new ActorPublisher[Any](impl) {
override val wakeUpMsg = FanOut.SubstreamSubscribePending(id)
@ -215,25 +233,26 @@ private[akka] class StreamSupervisor(settings: ActorFlowMaterializerSettings) ex
*/
private[akka] object ActorProcessorFactory {
import akka.stream.impl.Stages._
import ActorFlowMaterializerImpl._
def props(materializer: ActorFlowMaterializerImpl, op: StageModule, parentAttributes: OperationAttributes): (Props, Any) = {
val att = parentAttributes and op.attributes
// USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
// Also, otherwise the attributes will not affect the settings properly!
val settings = att.settings(materializer.settings)
val settings = calcSettings(att)(materializer.settings)
op match {
case Identity(_) (ActorInterpreter.props(settings, List(fusing.Map({ x: Any x }, att.settings(settings).supervisionDecider))), ())
case Identity(_) (ActorInterpreter.props(settings, List(fusing.Map({ x: Any x }, settings.supervisionDecider))), ())
case Fused(ops, _) (ActorInterpreter.props(settings, ops), ())
case Map(f, _) (ActorInterpreter.props(settings, List(fusing.Map(f, att.settings(settings).supervisionDecider))), ())
case Filter(p, _) (ActorInterpreter.props(settings, List(fusing.Filter(p, att.settings(settings).supervisionDecider))), ())
case Map(f, _) (ActorInterpreter.props(settings, List(fusing.Map(f, settings.supervisionDecider))), ())
case Filter(p, _) (ActorInterpreter.props(settings, List(fusing.Filter(p, settings.supervisionDecider))), ())
case Drop(n, _) (ActorInterpreter.props(settings, List(fusing.Drop(n))), ())
case Take(n, _) (ActorInterpreter.props(settings, List(fusing.Take(n))), ())
case Collect(pf, _) (ActorInterpreter.props(settings, List(fusing.Collect(att.settings(settings).supervisionDecider)(pf))), ())
case Scan(z, f, _) (ActorInterpreter.props(settings, List(fusing.Scan(z, f, att.settings(settings).supervisionDecider))), ())
case Collect(pf, _) (ActorInterpreter.props(settings, List(fusing.Collect(settings.supervisionDecider)(pf))), ())
case Scan(z, f, _) (ActorInterpreter.props(settings, List(fusing.Scan(z, f, settings.supervisionDecider))), ())
case Expand(s, f, _) (ActorInterpreter.props(settings, List(fusing.Expand(s, f))), ())
case Conflate(s, f, _) (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, att.settings(settings).supervisionDecider))), ())
case Conflate(s, f, _) (ActorInterpreter.props(settings, List(fusing.Conflate(s, f, settings.supervisionDecider))), ())
case Buffer(n, s, _) (ActorInterpreter.props(settings, List(fusing.Buffer(n, s))), ())
case MapConcat(f, _) (ActorInterpreter.props(settings, List(fusing.MapConcat(f, att.settings(settings).supervisionDecider))), ())
case MapConcat(f, _) (ActorInterpreter.props(settings, List(fusing.MapConcat(f, settings.supervisionDecider))), ())
case MapAsync(f, _) (MapAsyncProcessorImpl.props(settings, f), ())
case MapAsyncUnordered(f, _) (MapAsyncUnorderedProcessorImpl.props(settings, f), ())
case Grouped(n, _) (ActorInterpreter.props(settings, List(fusing.Grouped(n))), ())