2014-03-30 09:27:19 +02:00
/* *
* Copyright ( C ) 2014 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.stream.impl
2014-05-08 19:34:58 +02:00
import java.util.concurrent.atomic.AtomicLong
2014-08-22 11:42:05 +02:00
2014-11-17 22:50:15 +01:00
import akka.dispatch.Dispatchers
2014-11-09 21:09:50 +01:00
import akka.event.Logging
2014-11-12 10:43:39 +01:00
import akka.stream.impl.fusing.ActorInterpreter
2014-08-22 11:42:05 +02:00
import scala.annotation.tailrec
import scala.collection.immutable
2014-11-17 22:50:15 +01:00
import scala.concurrent. { ExecutionContext , Await , Future }
import akka.actor._
2014-11-12 10:43:39 +01:00
import akka.stream. { FlowMaterializer , MaterializerSettings , OverflowStrategy , TimerTransformer }
2014-10-27 14:35:41 +01:00
import akka.stream.MaterializationException
import akka.stream.actor.ActorSubscriber
import akka.stream.impl.Zip.ZipAs
import akka.stream.scaladsl._
2014-11-12 10:43:39 +01:00
import akka.stream.stage._
2014-10-27 14:35:41 +01:00
import akka.pattern.ask
import org.reactivestreams. { Processor , Publisher , Subscriber }
2014-03-30 09:27:19 +02:00
/* *
* INTERNAL API
*/
private [ akka ] object Ast {
2014-11-09 21:09:50 +01:00
sealed abstract class AstNode {
2014-05-08 19:34:58 +02:00
def name : String
}
2014-03-30 09:27:19 +02:00
2014-11-12 10:43:39 +01:00
final case class TimerTransform ( mkStage : ( ) ⇒ TimerTransformer [ Any , Any ] , override val name : String ) extends AstNode
final case class StageFactory ( mkStage : ( ) ⇒ Stage [ _ , _ ] , override val name : String ) extends AstNode
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
object Fused {
2014-11-12 10:43:39 +01:00
def apply ( ops : immutable.Seq [ Stage [ _ , _ ] ] ) : Fused =
2014-11-09 21:09:50 +01:00
Fused ( ops , ops . map ( x ⇒ Logging . simpleName ( x ) . toLowerCase ) . mkString ( "+" ) ) //FIXME change to something more performant for name
}
2014-11-12 10:43:39 +01:00
final case class Fused ( ops : immutable.Seq [ Stage [ _ , _ ] ] , override val name : String ) extends AstNode
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
final case class Map ( f : Any ⇒ Any ) extends AstNode { override def name = "map" }
2014-10-08 18:16:57 +02:00
2014-11-09 21:09:50 +01:00
final case class Filter ( p : Any ⇒ Boolean ) extends AstNode { override def name = "filter" }
final case class Collect ( pf : PartialFunction [ Any , Any ] ) extends AstNode { override def name = "collect" }
// FIXME Replace with OperateAsync
final case class MapAsync ( f : Any ⇒ Future [ Any ] ) extends AstNode { override def name = "mapAsync" }
//FIXME Should be OperateUnorderedAsync
final case class MapAsyncUnordered ( f : Any ⇒ Future [ Any ] ) extends AstNode { override def name = "mapAsyncUnordered" }
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
final case class Grouped ( n : Int ) extends AstNode {
require ( n > 0 , "n must be greater than 0" )
override def name = "grouped"
2014-10-27 14:35:41 +01:00
}
2014-11-09 21:09:50 +01:00
//FIXME should be `n: Long`
final case class Take ( n : Int ) extends AstNode {
override def name = "take"
2014-05-08 19:34:58 +02:00
}
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
//FIXME should be `n: Long`
final case class Drop ( n : Int ) extends AstNode {
override def name = "drop"
2014-05-16 14:21:15 +02:00
}
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
final case class Scan ( zero : Any , f : ( Any , Any ) ⇒ Any ) extends AstNode { override def name = "scan" }
final case class Buffer ( size : Int , overflowStrategy : OverflowStrategy ) extends AstNode {
require ( size > 0 , s" Buffer size must be larger than zero but was [ $size ] " )
override def name = "buffer"
}
final case class Conflate ( seed : Any ⇒ Any , aggregate : ( Any , Any ) ⇒ Any ) extends AstNode {
override def name = "conflate"
}
final case class Expand ( seed : Any ⇒ Any , extrapolate : Any ⇒ ( Any , Any ) ) extends AstNode {
override def name = "expand"
2014-10-27 14:35:41 +01:00
}
2014-11-09 21:09:50 +01:00
final case class MapConcat ( f : Any ⇒ immutable . Seq [ Any ] ) extends AstNode {
override def name = "mapConcat"
}
final case class GroupBy ( f : Any ⇒ Any ) extends AstNode { override def name = "groupBy" }
final case class PrefixAndTail ( n : Int ) extends AstNode { override def name = "prefixAndTail" }
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
final case class SplitWhen ( p : Any ⇒ Boolean ) extends AstNode { override def name = "splitWhen" }
final case object ConcatAll extends AstNode {
2014-05-16 14:21:15 +02:00
override def name = "concatFlatten"
}
2014-03-28 15:44:18 +01:00
2014-10-27 14:35:41 +01:00
sealed trait JunctionAstNode {
def name : String
2014-03-28 15:44:18 +01:00
}
2014-10-27 14:35:41 +01:00
// FIXME: Try to eliminate these
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
2014-11-09 21:09:50 +01:00
// FIXME Why do we need this?
2014-11-06 12:38:04 +02:00
case object IdentityAstNode extends JunctionAstNode {
override def name = "identity"
}
2014-10-27 14:35:41 +01:00
case object Merge extends FanInAstNode {
override def name = "merge"
2014-03-28 15:44:18 +01:00
}
2014-10-27 14:35:41 +01:00
case object MergePreferred extends FanInAstNode {
override def name = "mergePreferred"
2014-03-28 15:44:18 +01:00
}
2014-10-27 14:35:41 +01:00
case object Broadcast extends FanOutAstNode {
override def name = "broadcast"
2014-03-28 15:44:18 +01:00
}
2014-10-27 14:35:41 +01:00
2014-10-29 10:17:03 +01:00
case class Balance ( waitForAllDownstreams : Boolean ) extends FanOutAstNode {
2014-10-27 14:35:41 +01:00
override def name = "balance"
2014-04-02 08:07:05 +02:00
}
2014-10-27 14:35:41 +01:00
final case class Zip ( as : ZipAs ) extends FanInAstNode {
override def name = "zip"
2014-04-29 15:16:05 +02:00
}
2014-10-27 14:35:41 +01:00
case object Unzip extends FanOutAstNode {
override def name = "unzip"
2014-05-22 20:58:38 +02:00
}
2014-10-27 14:35:41 +01:00
case object Concat extends FanInAstNode {
override def name = "concat"
}
2014-11-05 20:12:24 +01:00
case class FlexiMergeNode ( factory : FlexiMergeImpl.MergeLogicFactory [ Any ] ) extends FanInAstNode {
override def name = factory . name . getOrElse ( "flexiMerge" )
2014-10-30 09:13:25 +01:00
}
2014-11-05 20:12:24 +01:00
case class FlexiRouteNode ( factory : FlexiRouteImpl.RouteLogicFactory [ Any ] ) extends FanOutAstNode {
override def name = factory . name . getOrElse ( "flexiRoute" )
2014-10-27 14:35:41 +01:00
}
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
/* *
* INTERNAL API
*/
final object Optimizations {
val none : Optimizations = Optimizations ( collapsing = false , elision = false , simplification = false , fusion = false )
val all : Optimizations = Optimizations ( collapsing = true , elision = true , simplification = true , fusion = true )
}
/* *
* INTERNAL API
*/
final case class Optimizations ( collapsing : Boolean , elision : Boolean , simplification : Boolean , fusion : Boolean ) {
def isEnabled : Boolean = collapsing || elision || simplification || fusion
}
2014-03-30 09:27:19 +02:00
/* *
* INTERNAL API
*/
2014-10-27 14:35:41 +01:00
case class ActorBasedFlowMaterializer ( override val settings : MaterializerSettings ,
2014-11-17 22:50:15 +01:00
dispatchers : Dispatchers , // FIXME is this the right choice for loading an EC?
2014-10-27 14:35:41 +01:00
supervisor : ActorRef ,
flowNameCounter : AtomicLong ,
2014-11-09 21:09:50 +01:00
namePrefix : String ,
optimizations : Optimizations )
2014-05-22 08:40:41 +02:00
extends FlowMaterializer ( settings ) {
2014-10-27 14:35:41 +01:00
import Ast.AstNode
2014-04-02 09:03:59 +02:00
2014-08-21 12:35:38 +02:00
def withNamePrefix ( name : String ) : FlowMaterializer = this . copy ( namePrefix = name )
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
private [ this ] def nextFlowNameCount ( ) : Long = flowNameCounter . incrementAndGet ( )
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
private [ this ] def createFlowName ( ) : String = s" $namePrefix - ${ nextFlowNameCount ( ) } "
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
@tailrec private [ this ] def processorChain ( topProcessor : Processor [ _ , _ ] ,
ops : List [ AstNode ] ,
flowName : String ,
n : Int ) : Processor [ _ , _ ] =
2014-03-30 09:27:19 +02:00
ops match {
case op : : tail ⇒
2014-11-09 21:09:50 +01:00
val opProcessor = processorForNode [ Any , Any ] ( op , flowName , n )
opProcessor . subscribe ( topProcessor . asInstanceOf [ Subscriber [ Any ] ] )
2014-05-08 19:34:58 +02:00
processorChain ( opProcessor , tail , flowName , n - 1 )
2014-11-09 21:09:50 +01:00
case Nil ⇒
topProcessor
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
//FIXME Optimize the implementation of the optimizer (no joke)
// AstNodes are in reverse order, Fusable Ops are in order
private [ this ] final def optimize ( ops : List [ Ast . AstNode ] ) : ( List [ Ast . AstNode ] , Int ) = {
2014-11-12 10:43:39 +01:00
@tailrec def analyze ( rest : List [ Ast . AstNode ] , optimized : List [ Ast . AstNode ] , fuseCandidates : List [ Stage [ _ , _ ] ] ) : ( List [ Ast . AstNode ] , Int ) = {
2014-11-09 21:09:50 +01:00
//The `verify` phase
def verify ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case ( f : Ast . Fused ) : : _ ⇒ throw new IllegalStateException ( "Fused AST nodes not allowed to be present in the input to the optimizer: " + f )
//TODO Ast.Take(-Long.MaxValue..0) == stream doesn't do anything. Perhaps output warning for that?
case noMatch ⇒ noMatch
}
// The `elide` phase
// TODO / FIXME : This phase could be pulled out to be executed incrementally when building the Ast
def elide ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . elision || ( noMatch ne orig ) ⇒ orig
//Collapses consecutive Take's into one
case ( t1 @ Ast . Take ( t1n ) ) : : ( t2 @ Ast . Take ( t2n ) ) :: rest ⇒ ( if ( t1n < t2n ) t1 else t2 ) : : rest
//Collapses consecutive Drop's into one
case ( d1 @ Ast . Drop ( d1n ) ) : : ( d2 @ Ast . Drop ( d2n ) ) :: rest ⇒ new Ast . Drop ( d1n + d2n ) : : rest
case Ast . Drop ( n ) : : rest if n < 1 ⇒ rest // a 0 or negative drop is a NoOp
case noMatch ⇒ noMatch
}
// The `simplify` phase
def simplify ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . simplification || ( noMatch ne orig ) ⇒ orig
// Two consecutive maps is equivalent to one pipelined map
2014-11-20 17:01:49 +02:00
case Ast . Map ( second ) : : Ast . Map ( first ) :: rest ⇒ Ast . Map ( first andThen second ) : : rest
2014-11-09 21:09:50 +01:00
case noMatch ⇒ noMatch
}
// the `Collapse` phase
def collapse ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . collapsing || ( noMatch ne orig ) ⇒ orig
// Collapses a filter and a map into a collect
case Ast . Map ( f ) : : Ast . Filter ( p ) :: rest ⇒ Ast . Collect ( { case i if p ( i ) ⇒ f ( i ) } ) : : rest
case noMatch ⇒ noMatch
}
// Tries to squeeze AstNode into a single fused pipeline
2014-11-12 10:43:39 +01:00
def ast2op ( head : Ast . AstNode , prev : List [ Stage [ _ , _ ] ] ) : List [ Stage [ _ , _ ] ] =
2014-11-09 21:09:50 +01:00
head match {
// Always-on below
2014-11-12 10:43:39 +01:00
case Ast . StageFactory ( mkStage , _ ) ⇒ mkStage ( ) : : prev
2014-11-09 21:09:50 +01:00
// Optimizations below
case noMatch if ! optimizations . fusion ⇒ prev
case Ast . Map ( f ) ⇒ fusing . Map ( f ) : : prev
2014-11-17 22:50:15 +01:00
case Ast . Filter ( p ) ⇒ fusing . Filter ( p ) : : prev
case Ast . Drop ( n ) ⇒ fusing . Drop ( n ) : : prev
case Ast . Take ( n ) ⇒ fusing . Take ( n ) : : prev
2014-11-09 21:09:50 +01:00
case Ast . Collect ( pf ) ⇒ fusing . Collect ( pf ) : : prev
2014-11-17 22:50:15 +01:00
case Ast . Scan ( z , f ) ⇒ fusing . Scan ( z , f ) : : prev
case Ast . Expand ( s , f ) ⇒ fusing . Expand ( s , f ) : : prev
case Ast . Conflate ( s , f ) ⇒ fusing . Conflate ( s , f ) : : prev
case Ast . Buffer ( n , s ) ⇒ fusing . Buffer ( n , s ) : : prev
case Ast . MapConcat ( f ) ⇒ fusing . MapConcat ( f ) : : prev
case Ast . Grouped ( n ) ⇒ fusing . Grouped ( n ) : : prev
2014-11-09 21:09:50 +01:00
//FIXME Add more fusion goodies here
case _ ⇒ prev
}
// First verify, then try to elide, then try to simplify, then try to fuse
collapse ( rest , simplify ( rest , elide ( rest , verify ( rest , rest ) ) ) ) match {
case Nil ⇒
if ( fuseCandidates . isEmpty ) ( optimized . reverse , optimized . length ) // End of optimization run without fusion going on, wrap up
else ( ( Ast . Fused ( fuseCandidates ) : : optimized ) . reverse , optimized . length + 1 ) // End of optimization run with fusion going on, so add it to the optimized stack
// If the Ast was changed this pass simply recur
case modified if modified ne rest ⇒ analyze ( modified , optimized , fuseCandidates )
// No changes to the Ast, lets try to see if we can squeeze the current head Ast node into a fusion pipeline
case head : : rest ⇒
ast2op ( head , fuseCandidates ) match {
case Nil ⇒ analyze ( rest , head : : optimized , Nil )
case `fuseCandidates` ⇒ analyze ( rest , head : : Ast . Fused ( fuseCandidates ) :: optimized , Nil )
case newFuseCandidates ⇒ analyze ( rest , optimized , newFuseCandidates )
}
}
}
val result = analyze ( ops , Nil , Nil )
//println(s"before: $ops")
//println(s"after: ${result._1}")
result
2014-03-30 09:27:19 +02:00
}
// Ops come in reverse order
2014-11-09 21:09:50 +01:00
override def materialize [ In , Out ] ( source : Source [ In ] , sink : Sink [ Out ] , rawOps : List [ Ast . AstNode ] ) : MaterializedMap = {
val flowName = createFlowName ( ) //FIXME: Creates Id even when it is not used in all branches below
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
def throwUnknownType ( typeName : String , s : AnyRef ) : Nothing =
throw new MaterializationException ( s" unknown $typeName type ${ s . getClass } " )
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
def attachSink ( pub : Publisher [ Out ] , flowName : String ) = sink match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSink [ Out ] ⇒ s . attach ( pub , this , flowName )
case s ⇒ throwUnknownType ( "Sink" , s )
}
2014-11-09 21:09:50 +01:00
def attachSource ( sub : Subscriber [ In ] , flowName : String ) = source match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSource [ In ] ⇒ s . attach ( sub , this , flowName )
case s ⇒ throwUnknownType ( "Source" , s )
}
2014-11-09 21:09:50 +01:00
def createSink ( flowName : String ) = sink match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSink [ In ] ⇒ s . create ( this , flowName )
case s ⇒ throwUnknownType ( "Sink" , s )
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
def createSource ( flowName : String ) = source match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSource [ Out ] ⇒ s . create ( this , flowName )
case s ⇒ throwUnknownType ( "Source" , s )
}
def isActive ( s : AnyRef ) = s match {
case s : ActorFlowSource [ _ ] ⇒ s . isActive
case s : ActorFlowSink [ _ ] ⇒ s . isActive
case s : Source [ _ ] ⇒ throwUnknownType ( "Source" , s )
case s : Sink [ _ ] ⇒ throwUnknownType ( "Sink" , s )
}
val ( sourceValue , sinkValue ) =
2014-11-09 21:09:50 +01:00
if ( rawOps . isEmpty ) {
2014-10-27 14:35:41 +01:00
if ( isActive ( sink ) ) {
2014-11-09 21:09:50 +01:00
val ( sub , value ) = createSink ( flowName )
( attachSource ( sub , flowName ) , value )
2014-10-27 14:35:41 +01:00
} else if ( isActive ( source ) ) {
2014-11-09 21:09:50 +01:00
val ( pub , value ) = createSource ( flowName )
( value , attachSink ( pub , flowName ) )
2014-10-27 14:35:41 +01:00
} else {
2014-11-12 10:43:39 +01:00
val id = processorForNode [ In , Out ] ( identityStageNode , flowName , 1 )
2014-11-09 21:09:50 +01:00
( attachSource ( id , flowName ) , attachSink ( id , flowName ) )
2014-10-27 14:35:41 +01:00
}
} else {
2014-11-09 21:09:50 +01:00
val ( ops , opsSize ) = if ( optimizations . isEnabled ) optimize ( rawOps ) else ( rawOps , rawOps . length )
val last = processorForNode [ Any , Out ] ( ops . head , flowName , opsSize )
2014-10-27 14:35:41 +01:00
val first = processorChain ( last , ops . tail , flowName , opsSize - 1 ) . asInstanceOf [ Processor [ In , Any ] ]
2014-11-09 21:09:50 +01:00
( attachSource ( first , flowName ) , attachSink ( last , flowName ) )
2014-10-27 14:35:41 +01:00
}
new MaterializedPipe ( source , sourceValue , sink , sinkValue )
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
//FIXME Should this be a dedicated AstNode?
2014-11-12 10:43:39 +01:00
private [ this ] val identityStageNode = Ast . StageFactory ( ( ) ⇒ FlowOps . identityStage [ Any ] , "identity" )
2014-05-07 15:56:02 +02:00
2014-11-17 22:50:15 +01:00
def executionContext : ExecutionContext = dispatchers . lookup ( settings . dispatcher match {
case Deploy . NoDispatcherGiven ⇒ Dispatchers . DefaultDispatcherId
case other ⇒ other
} )
2014-10-27 14:35:41 +01:00
/* *
* INTERNAL API
*/
2014-11-09 21:09:50 +01:00
private [ akka ] def processorForNode [ In , Out ] ( op : AstNode , flowName : String , n : Int ) : Processor [ In , Out ] =
ActorProcessorFactory [ In , Out ] ( actorOf ( ActorProcessorFactory . props ( this , op ) , s" $flowName - $n - ${ op . name } " ) )
2014-08-21 12:35:38 +02:00
def actorOf ( props : Props , name : String ) : ActorRef = supervisor match {
case ref : LocalActorRef ⇒
2014-11-09 21:09:50 +01:00
ref . underlying . attachChild ( props . withDispatcher ( settings . dispatcher ) , name , systemService = false )
2014-08-21 12:35:38 +02:00
case ref : RepointableActorRef ⇒
if ( ref . isStarted )
2014-11-09 21:09:50 +01:00
ref . underlying . asInstanceOf [ ActorCell ] . attachChild ( props . withDispatcher ( settings . dispatcher ) , name , systemService = false )
2014-08-21 12:35:38 +02:00
else {
implicit val timeout = ref . system . settings . CreationTimeout
2014-11-09 21:09:50 +01:00
val f = ( supervisor ? StreamSupervisor . Materialize ( props . withDispatcher ( settings . dispatcher ) , name ) ) . mapTo [ ActorRef ]
2014-08-21 12:35:38 +02:00
Await . result ( f , timeout . duration )
}
2014-11-09 21:09:50 +01:00
case unknown ⇒
throw new IllegalStateException ( s" Stream supervisor must be a local actor, was [ ${ unknown . getClass . getName } ] " )
2014-08-21 12:35:38 +02:00
}
2014-11-09 21:09:50 +01:00
// FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction`
2014-10-27 14:35:41 +01:00
override def materializeJunction [ In , Out ] ( op : Ast . JunctionAstNode , inputCount : Int , outputCount : Int ) : ( immutable.Seq [ Subscriber [ In ] ] , immutable . Seq [ Publisher [ Out ] ] ) = {
2014-11-09 21:09:50 +01:00
val actorName = s" ${ createFlowName ( ) } - ${ op . name } "
2014-10-27 14:35:41 +01:00
op match {
case fanin : Ast . FanInAstNode ⇒
2014-10-30 09:13:25 +01:00
val impl = fanin match {
2014-11-09 21:09:50 +01:00
case Ast . Merge ⇒ actorOf ( FairMerge . props ( settings , inputCount ) , actorName )
case Ast . MergePreferred ⇒ actorOf ( UnfairMerge . props ( settings , inputCount ) , actorName )
case zip : Ast . Zip ⇒ actorOf ( Zip . props ( settings , zip . as ) , actorName )
case Ast . Concat ⇒ actorOf ( Concat . props ( settings ) , actorName )
case Ast . FlexiMergeNode ( merger ) ⇒ actorOf ( FlexiMergeImpl . props ( settings , inputCount , merger . createMergeLogic ( ) ) , actorName )
2014-10-27 14:35:41 +01:00
}
2014-10-31 08:53:27 +01:00
val publisher = new ActorPublisher [ Out ] ( impl )
2014-10-27 14:35:41 +01:00
impl ! ExposedPublisher ( publisher . asInstanceOf [ ActorPublisher [ Any ] ] )
val subscribers = Vector . tabulate ( inputCount ) ( FanIn . SubInput [ In ] ( impl , _ ) )
( subscribers , List ( publisher ) )
case fanout : Ast . FanOutAstNode ⇒
2014-10-30 09:13:25 +01:00
val impl = fanout match {
2014-11-09 21:09:50 +01:00
case Ast . Broadcast ⇒ actorOf ( Broadcast . props ( settings , outputCount ) , actorName )
case Ast . Balance ( waitForAllDownstreams ) ⇒ actorOf ( Balance . props ( settings , outputCount , waitForAllDownstreams ) , actorName )
case Ast . Unzip ⇒ actorOf ( Unzip . props ( settings ) , actorName )
case Ast . FlexiRouteNode ( route ) ⇒ actorOf ( FlexiRouteImpl . props ( settings , outputCount , route . createRouteLogic ( ) ) , actorName )
2014-10-27 14:35:41 +01:00
}
2014-10-31 08:53:27 +01:00
val publishers = Vector . tabulate ( outputCount ) ( id ⇒ new ActorPublisher [ Out ] ( impl ) {
2014-10-27 14:35:41 +01:00
override val wakeUpMsg = FanOut . SubstreamSubscribePending ( id )
} )
impl ! FanOut . ExposedPublishers ( publishers . asInstanceOf [ immutable . Seq [ ActorPublisher [ Any ] ] ] )
val subscriber = ActorSubscriber [ In ] ( impl )
( List ( subscriber ) , publishers )
2014-11-06 12:38:04 +02:00
2014-11-09 21:09:50 +01:00
case identity @ Ast . IdentityAstNode ⇒ // FIXME Why is IdentityAstNode a JunctionAStNode?
2014-11-12 10:43:39 +01:00
val id = List ( processorForNode [ In , Out ] ( identityStageNode , identity . name , 1 ) ) // FIXME is `identity.name` appropriate/unique here?
2014-11-09 21:09:50 +01:00
( id , id )
2014-05-07 15:56:02 +02:00
}
2014-10-27 14:35:41 +01:00
2014-05-07 15:56:02 +02:00
}
2014-03-30 09:27:19 +02:00
}
2014-05-08 19:34:58 +02:00
/* *
* INTERNAL API
*/
private [ akka ] object FlowNameCounter extends ExtensionId [ FlowNameCounter ] with ExtensionIdProvider {
override def get ( system : ActorSystem ) : FlowNameCounter = super . get ( system )
override def lookup = FlowNameCounter
override def createExtension ( system : ExtendedActorSystem ) : FlowNameCounter = new FlowNameCounter
}
/* *
* INTERNAL API
*/
private [ akka ] class FlowNameCounter extends Extension {
val counter = new AtomicLong ( 0 )
2014-08-21 12:35:38 +02:00
}
/* *
* INTERNAL API
*/
private [ akka ] object StreamSupervisor {
def props ( settings : MaterializerSettings ) : Props = Props ( new StreamSupervisor ( settings ) )
case class Materialize ( props : Props , name : String )
}
private [ akka ] class StreamSupervisor ( settings : MaterializerSettings ) extends Actor {
import StreamSupervisor._
2014-10-27 14:35:41 +01:00
override def supervisorStrategy = SupervisorStrategy . stoppingStrategy
2014-08-21 12:35:38 +02:00
def receive = {
case Materialize ( props , name ) ⇒
val impl = context . actorOf ( props , name )
sender ( ) ! impl
}
2014-10-27 14:35:41 +01:00
}
/* *
* INTERNAL API
*/
private [ akka ] object ActorProcessorFactory {
import Ast._
def props ( materializer : FlowMaterializer , op : AstNode ) : Props = {
2014-11-09 21:09:50 +01:00
val settings = materializer . settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
2014-10-27 14:35:41 +01:00
( op match {
2014-11-12 10:43:39 +01:00
case Fused ( ops , _ ) ⇒ ActorInterpreter . props ( settings , ops )
case Map ( f ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Map ( f ) ) )
case Filter ( p ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Filter ( p ) ) )
case Drop ( n ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Drop ( n ) ) )
case Take ( n ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Take ( n ) ) )
case Collect ( pf ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Collect ( pf ) ) )
case Scan ( z , f ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Scan ( z , f ) ) )
case Expand ( s , f ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Expand ( s , f ) ) )
case Conflate ( s , f ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Conflate ( s , f ) ) )
case Buffer ( n , s ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Buffer ( n , s ) ) )
case MapConcat ( f ) ⇒ ActorInterpreter . props ( settings , List ( fusing . MapConcat ( f ) ) )
case MapAsync ( f ) ⇒ MapAsyncProcessorImpl . props ( settings , f )
case MapAsyncUnordered ( f ) ⇒ MapAsyncUnorderedProcessorImpl . props ( settings , f )
case Grouped ( n ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Grouped ( n ) ) )
case GroupBy ( f ) ⇒ GroupByProcessorImpl . props ( settings , f )
case PrefixAndTail ( n ) ⇒ PrefixAndTailImpl . props ( settings , n )
case SplitWhen ( p ) ⇒ SplitWhenProcessorImpl . props ( settings , p )
case ConcatAll ⇒ ConcatAllImpl . props ( materializer ) //FIXME closes over the materializer, is this good?
case StageFactory ( mkStage , _ ) ⇒ ActorInterpreter . props ( settings , List ( mkStage ( ) ) )
case TimerTransform ( mkStage , _ ) ⇒ TimerTransformerProcessorsImpl . props ( settings , mkStage ( ) )
2014-10-27 14:35:41 +01:00
} ) . withDispatcher ( settings . dispatcher )
}
def apply [ I , O ] ( impl : ActorRef ) : ActorProcessor [ I , O ] = {
val p = new ActorProcessor [ I , O ] ( impl )
impl ! ExposedPublisher ( p . asInstanceOf [ ActorPublisher [ Any ] ] )
p
}
}