+str #15205 Add FlexiRoute junction

This commit is contained in:
Patrik Nordwall 2014-10-30 09:13:25 +01:00
parent 8003332542
commit 5f75af7059
7 changed files with 923 additions and 18 deletions

View file

@ -114,7 +114,11 @@ private[akka] object Ast {
}
case class FlexiMergeNode(merger: FlexiMerge[Any]) extends FanInAstNode {
override def name = merger.name.getOrElse("")
override def name = merger.name.getOrElse("flexMerge")
}
case class RouteNode(route: FlexiRoute[Any]) extends FanOutAstNode {
override def name = route.name.getOrElse("route")
}
}
@ -232,7 +236,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
op match {
case fanin: Ast.FanInAstNode
val impl = op match {
val impl = fanin match {
case Ast.Merge
actorOf(FairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.MergePreferred
@ -252,13 +256,16 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
(subscribers, List(publisher))
case fanout: Ast.FanOutAstNode
val impl = op match {
val impl = fanout match {
case Ast.Broadcast
actorOf(Broadcast.props(settings, outputCount).withDispatcher(settings.dispatcher), actorName)
case Ast.Balance(waitForAllDownstreams)
actorOf(Balance.props(settings, outputCount, waitForAllDownstreams).withDispatcher(settings.dispatcher), actorName)
case Ast.Unzip
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
case Ast.RouteNode(route)
actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()).
withDispatcher(settings.dispatcher), actorName)
}
val publishers = Vector.tabulate(outputCount)(id new ActorPublisher[Out](impl) {