2014-03-30 09:27:19 +02:00
/* *
* Copyright ( C ) 2014 Typesafe Inc . < http : //www.typesafe.com>
*/
package akka.stream.impl
2014-05-08 19:34:58 +02:00
import java.util.concurrent.atomic.AtomicLong
2014-08-22 11:42:05 +02:00
2014-11-17 22:50:15 +01:00
import akka.dispatch.Dispatchers
2014-11-09 21:09:50 +01:00
import akka.event.Logging
2014-11-12 10:43:39 +01:00
import akka.stream.impl.fusing.ActorInterpreter
2014-12-01 20:07:55 +02:00
import akka.stream.scaladsl.OperationAttributes._
2014-08-22 11:42:05 +02:00
import scala.annotation.tailrec
import scala.collection.immutable
2014-11-28 10:41:57 +01:00
import scala.concurrent. { Promise , ExecutionContext , Await , Future }
2014-11-17 22:50:15 +01:00
import akka.actor._
2014-11-12 10:43:39 +01:00
import akka.stream. { FlowMaterializer , MaterializerSettings , OverflowStrategy , TimerTransformer }
2014-10-27 14:35:41 +01:00
import akka.stream.MaterializationException
import akka.stream.actor.ActorSubscriber
import akka.stream.scaladsl._
2014-11-12 10:43:39 +01:00
import akka.stream.stage._
2014-10-27 14:35:41 +01:00
import akka.pattern.ask
import org.reactivestreams. { Processor , Publisher , Subscriber }
2014-03-30 09:27:19 +02:00
/* *
* INTERNAL API
*/
private [ akka ] object Ast {
2014-12-01 20:07:55 +02:00
2014-11-09 21:09:50 +01:00
sealed abstract class AstNode {
2014-12-01 20:07:55 +02:00
def attributes : OperationAttributes
def withAttributes ( attributes : OperationAttributes ) : AstNode
}
2014-11-20 21:59:33 +01:00
// FIXME Fix the name `Defaults` is waaaay too opaque. How about "Names"?
2014-12-01 20:07:55 +02:00
object Defaults {
val timerTransform = name ( "timerTransform" )
val stageFactory = name ( "stageFactory" )
val fused = name ( "fused" )
val map = name ( "map" )
val filter = name ( "filter" )
val collect = name ( "collect" )
val mapAsync = name ( "mapAsync" )
val mapAsyncUnordered = name ( "mapAsyncUnordered" )
val grouped = name ( "grouped" )
val take = name ( "take" )
val drop = name ( "drop" )
val scan = name ( "scan" )
val buffer = name ( "buffer" )
val conflate = name ( "conflate" )
val expand = name ( "expand" )
val mapConcat = name ( "mapConcat" )
val groupBy = name ( "groupBy" )
val prefixAndTail = name ( "prefixAndTail" )
val splitWhen = name ( "splitWhen" )
val concatAll = name ( "concatAll" )
val processor = name ( "processor" )
val processorWithKey = name ( "processorWithKey" )
val identityOp = name ( "identityOp" )
val merge = name ( "merge" )
val mergePreferred = name ( "mergePreferred" )
val broadcast = name ( "broadcast" )
val balance = name ( "balance" )
val zip = name ( "zip" )
val unzip = name ( "unzip" )
val concat = name ( "concat" )
val flexiMerge = name ( "flexiMerge" )
val flexiRoute = name ( "flexiRoute" )
val identityJunction = name ( "identityJunction" )
2014-05-08 19:34:58 +02:00
}
2014-03-30 09:27:19 +02:00
2014-12-01 20:07:55 +02:00
import Defaults._
2014-11-12 10:43:39 +01:00
2014-12-01 20:07:55 +02:00
final case class TimerTransform ( mkStage : ( ) ⇒ TimerTransformer [ Any , Any ] , attributes : OperationAttributes = timerTransform ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
final case class StageFactory ( mkStage : ( ) ⇒ Stage [ _ , _ ] , attributes : OperationAttributes = stageFactory ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
object Fused {
2014-11-12 10:43:39 +01:00
def apply ( ops : immutable.Seq [ Stage [ _ , _ ] ] ) : Fused =
2014-12-01 20:07:55 +02:00
Fused ( ops , name ( ops . map ( x ⇒ Logging . simpleName ( x ) . toLowerCase ) . mkString ( "+" ) ) ) //FIXME change to something more performant for name
}
final case class Fused ( ops : immutable.Seq [ Stage [ _ , _ ] ] , attributes : OperationAttributes = fused ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-09 21:09:50 +01:00
}
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class Map ( f : Any ⇒ Any , attributes : OperationAttributes = map ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-10-08 18:16:57 +02:00
2014-12-01 20:07:55 +02:00
final case class Filter ( p : Any ⇒ Boolean , attributes : OperationAttributes = filter ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-09 21:09:50 +01:00
2014-12-01 20:07:55 +02:00
final case class Collect ( pf : PartialFunction [ Any , Any ] , attributes : OperationAttributes = collect ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-09 21:09:50 +01:00
// FIXME Replace with OperateAsync
2014-12-01 20:07:55 +02:00
final case class MapAsync ( f : Any ⇒ Future [ Any ] , attributes : OperationAttributes = mapAsync ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-09 21:09:50 +01:00
//FIXME Should be OperateUnorderedAsync
2014-12-01 20:07:55 +02:00
final case class MapAsyncUnordered ( f : Any ⇒ Future [ Any ] , attributes : OperationAttributes = mapAsyncUnordered ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class Grouped ( n : Int , attributes : OperationAttributes = grouped ) extends AstNode {
2014-11-09 21:09:50 +01:00
require ( n > 0 , "n must be greater than 0" )
2014-12-01 20:07:55 +02:00
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-10-27 14:35:41 +01:00
}
2014-11-09 21:09:50 +01:00
//FIXME should be `n: Long`
2014-12-01 20:07:55 +02:00
final case class Take ( n : Int , attributes : OperationAttributes = take ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-05-08 19:34:58 +02:00
}
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
//FIXME should be `n: Long`
2014-12-01 20:07:55 +02:00
final case class Drop ( n : Int , attributes : OperationAttributes = drop ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-05-16 14:21:15 +02:00
}
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class Scan ( zero : Any , f : ( Any , Any ) ⇒ Any , attributes : OperationAttributes = scan ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-09 21:09:50 +01:00
2014-12-01 20:07:55 +02:00
final case class Buffer ( size : Int , overflowStrategy : OverflowStrategy , attributes : OperationAttributes = buffer ) extends AstNode {
2014-11-09 21:09:50 +01:00
require ( size > 0 , s" Buffer size must be larger than zero but was [ $size ] " )
2014-12-01 20:07:55 +02:00
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-09 21:09:50 +01:00
}
2014-12-01 20:07:55 +02:00
final case class Conflate ( seed : Any ⇒ Any , aggregate : ( Any , Any ) ⇒ Any , attributes : OperationAttributes = conflate ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-09 21:09:50 +01:00
}
2014-12-01 20:07:55 +02:00
final case class Expand ( seed : Any ⇒ Any , extrapolate : Any ⇒ ( Any , Any ) , attributes : OperationAttributes = expand ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-10-27 14:35:41 +01:00
}
2014-12-01 20:07:55 +02:00
final case class MapConcat ( f : Any ⇒ immutable . Seq [ Any ] , attributes : OperationAttributes = mapConcat ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-09 21:09:50 +01:00
}
2014-12-01 20:07:55 +02:00
final case class GroupBy ( f : Any ⇒ Any , attributes : OperationAttributes = groupBy ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class PrefixAndTail ( n : Int , attributes : OperationAttributes = prefixAndTail ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-09 21:09:50 +01:00
2014-12-01 20:07:55 +02:00
final case class SplitWhen ( p : Any ⇒ Boolean , attributes : OperationAttributes = splitWhen ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-05-16 14:21:15 +02:00
}
2014-03-28 15:44:18 +01:00
2014-12-01 20:07:55 +02:00
final case class ConcatAll ( attributes : OperationAttributes = concatAll ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-28 10:41:57 +01:00
}
2014-12-01 20:07:55 +02:00
case class DirectProcessor ( p : ( ) ⇒ Processor [ Any , Any ] , attributes : OperationAttributes = processor ) extends AstNode {
2014-11-20 21:59:33 +01:00
override def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-12-01 20:07:55 +02:00
}
2014-11-20 21:59:33 +01:00
2014-12-02 16:38:14 +01:00
case class DirectProcessorWithKey ( p : ( ) ⇒ ( Processor [ Any , Any ] , Any ) , key : Key [ _ ] , attributes : OperationAttributes = processorWithKey ) extends AstNode {
2014-11-20 21:59:33 +01:00
def withAttributes ( attributes : OperationAttributes ) = copy ( attributes = attributes )
2014-11-28 10:41:57 +01:00
}
2014-12-01 20:07:55 +02:00
2014-10-27 14:35:41 +01:00
sealed trait JunctionAstNode {
2014-12-01 20:07:55 +02:00
def attributes : OperationAttributes
2014-03-28 15:44:18 +01:00
}
2014-10-27 14:35:41 +01:00
// FIXME: Try to eliminate these
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
2014-12-06 14:51:40 +01:00
/* *
* INTERNAL API
* `f` MUST be implemented as value of type `scala.FunctionN`
*/
sealed trait ZipWith extends FanInAstNode {
/* * MUST be implemented as type of FunctionN */
def f : Any
}
final case class Zip2With [ T1 , T2 ] ( f : Function2 [ T1 , T2 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip3With [ T1 , T2 , T3 ] ( f : Function3 [ T1 , T2 , T3 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip4With [ T1 , T2 , T3 , T4 ] ( f : Function4 [ T1 , T2 , T3 , T4 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip5With [ T1 , T2 , T3 , T4 , T5 ] ( f : Function5 [ T1 , T2 , T3 , T4 , T5 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip6With [ T1 , T2 , T3 , T4 , T5 , T6 ] ( f : Function6 [ T1 , T2 , T3 , T4 , T5 , T6 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip7With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 ] ( f : Function7 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip8With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 ] ( f : Function8 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip9With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 ] ( f : Function9 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip10With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 ] ( f : Function10 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip11With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 ] ( f : Function11 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip12With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 ] ( f : Function12 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip13With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 ] ( f : Function13 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip14With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 ] ( f : Function14 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip15With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 ] ( f : Function15 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip16With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 ] ( f : Function16 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip17With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 ] ( f : Function17 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip18With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 ] ( f : Function18 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip19With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 ] ( f : Function19 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip20With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 ] ( f : Function20 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip21With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 , T21 ] ( f : Function21 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 , T21 , Any ] , attributes : OperationAttributes ) extends ZipWith
final case class Zip22With [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 , T21 , T22 ] ( f : Function22 [ T1 , T2 , T3 , T4 , T5 , T6 , T7 , T8 , T9 , T10 , T11 , T12 , T13 , T14 , T15 , T16 , T17 , T18 , T19 , T20 , T21 , T22 , Any ] , attributes : OperationAttributes ) extends ZipWith
2014-11-20 21:59:33 +01:00
2014-11-09 21:09:50 +01:00
// FIXME Why do we need this?
2014-12-01 20:07:55 +02:00
case class IdentityAstNode ( attributes : OperationAttributes ) extends JunctionAstNode
2014-11-06 12:38:04 +02:00
2014-12-01 20:07:55 +02:00
final case class Merge ( attributes : OperationAttributes ) extends FanInAstNode
final case class MergePreferred ( attributes : OperationAttributes ) extends FanInAstNode
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class Broadcast ( attributes : OperationAttributes ) extends FanOutAstNode
final case class Balance ( waitForAllDownstreams : Boolean , attributes : OperationAttributes ) extends FanOutAstNode
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class Unzip ( attributes : OperationAttributes ) extends FanOutAstNode
2014-10-30 09:13:25 +01:00
2014-12-01 20:07:55 +02:00
final case class Concat ( attributes : OperationAttributes ) extends FanInAstNode
2014-10-27 14:35:41 +01:00
2014-12-01 20:07:55 +02:00
final case class FlexiMergeNode ( factory : FlexiMergeImpl.MergeLogicFactory [ Any ] , attributes : OperationAttributes ) extends FanInAstNode
final case class FlexiRouteNode ( factory : FlexiRouteImpl.RouteLogicFactory [ Any ] , attributes : OperationAttributes ) extends FanOutAstNode
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
/* *
* INTERNAL API
*/
final object Optimizations {
val none : Optimizations = Optimizations ( collapsing = false , elision = false , simplification = false , fusion = false )
val all : Optimizations = Optimizations ( collapsing = true , elision = true , simplification = true , fusion = true )
}
/* *
* INTERNAL API
*/
final case class Optimizations ( collapsing : Boolean , elision : Boolean , simplification : Boolean , fusion : Boolean ) {
def isEnabled : Boolean = collapsing || elision || simplification || fusion
}
2014-03-30 09:27:19 +02:00
/* *
* INTERNAL API
*/
2014-10-27 14:35:41 +01:00
case class ActorBasedFlowMaterializer ( override val settings : MaterializerSettings ,
2014-11-17 22:50:15 +01:00
dispatchers : Dispatchers , // FIXME is this the right choice for loading an EC?
2014-10-27 14:35:41 +01:00
supervisor : ActorRef ,
flowNameCounter : AtomicLong ,
2014-11-09 21:09:50 +01:00
namePrefix : String ,
optimizations : Optimizations )
2014-05-22 08:40:41 +02:00
extends FlowMaterializer ( settings ) {
2014-10-27 14:35:41 +01:00
import Ast.AstNode
2014-04-02 09:03:59 +02:00
2014-08-21 12:35:38 +02:00
def withNamePrefix ( name : String ) : FlowMaterializer = this . copy ( namePrefix = name )
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
private [ this ] def nextFlowNameCount ( ) : Long = flowNameCounter . incrementAndGet ( )
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
private [ this ] def createFlowName ( ) : String = s" $namePrefix - ${ nextFlowNameCount ( ) } "
2014-05-08 19:34:58 +02:00
2014-11-09 21:09:50 +01:00
@tailrec private [ this ] def processorChain ( topProcessor : Processor [ _ , _ ] ,
ops : List [ AstNode ] ,
flowName : String ,
2014-11-28 10:41:57 +01:00
n : Int ,
materializedMap : MaterializedMap ) : ( Processor [ _ , _ ] , MaterializedMap ) =
2014-03-30 09:27:19 +02:00
ops match {
case op : : tail ⇒
2014-11-28 10:41:57 +01:00
val ( opProcessor , opMap ) = processorForNode [ Any , Any ] ( op , flowName , n )
2014-11-09 21:09:50 +01:00
opProcessor . subscribe ( topProcessor . asInstanceOf [ Subscriber [ Any ] ] )
2014-11-28 10:41:57 +01:00
processorChain ( opProcessor , tail , flowName , n - 1 , materializedMap . merge ( opMap ) )
2014-11-09 21:09:50 +01:00
case Nil ⇒
2014-11-28 10:41:57 +01:00
( topProcessor , materializedMap )
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
//FIXME Optimize the implementation of the optimizer (no joke)
// AstNodes are in reverse order, Fusable Ops are in order
2014-11-28 10:41:57 +01:00
private [ this ] final def optimize ( ops : List [ Ast . AstNode ] , mmFuture : Future [ MaterializedMap ] ) : ( List [ Ast . AstNode ] , Int ) = {
2014-11-12 10:43:39 +01:00
@tailrec def analyze ( rest : List [ Ast . AstNode ] , optimized : List [ Ast . AstNode ] , fuseCandidates : List [ Stage [ _ , _ ] ] ) : ( List [ Ast . AstNode ] , Int ) = {
2014-11-09 21:09:50 +01:00
//The `verify` phase
def verify ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case ( f : Ast . Fused ) : : _ ⇒ throw new IllegalStateException ( "Fused AST nodes not allowed to be present in the input to the optimizer: " + f )
//TODO Ast.Take(-Long.MaxValue..0) == stream doesn't do anything. Perhaps output warning for that?
case noMatch ⇒ noMatch
}
// The `elide` phase
// TODO / FIXME : This phase could be pulled out to be executed incrementally when building the Ast
def elide ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . elision || ( noMatch ne orig ) ⇒ orig
//Collapses consecutive Take's into one
2014-12-01 20:07:55 +02:00
case ( t1 : Ast . Take ) : : ( t2 : Ast . Take ) :: rest ⇒ ( if ( t1 . n < t2 . n ) t1 else t2 ) : : rest
2014-11-09 21:09:50 +01:00
//Collapses consecutive Drop's into one
2014-12-01 20:07:55 +02:00
case ( d1 : Ast . Drop ) : : ( d2 : Ast . Drop ) :: rest ⇒ new Ast . Drop ( d1 . n + d2 . n , d1 . attributes and d2 . attributes ) : : rest
2014-11-09 21:09:50 +01:00
2014-12-01 20:07:55 +02:00
case Ast . Drop ( n , _ ) : : rest if n < 1 ⇒ rest // a 0 or negative drop is a NoOp
2014-11-09 21:09:50 +01:00
case noMatch ⇒ noMatch
}
// The `simplify` phase
def simplify ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . simplification || ( noMatch ne orig ) ⇒ orig
// Two consecutive maps is equivalent to one pipelined map
2014-12-06 11:23:44 +01:00
case Ast . Map ( second , secondAttributes ) : : Ast . Map ( first , firstAttributes ) : : rest ⇒
Ast . Map ( first andThen second , firstAttributes and secondAttributes ) : : rest
2014-11-09 21:09:50 +01:00
case noMatch ⇒ noMatch
}
// the `Collapse` phase
def collapse ( rest : List [ Ast . AstNode ] , orig : List [ Ast . AstNode ] ) : List [ Ast . AstNode ] =
rest match {
case noMatch if ! optimizations . collapsing || ( noMatch ne orig ) ⇒ orig
// Collapses a filter and a map into a collect
2014-12-06 11:23:44 +01:00
case Ast . Map ( mapFn , mapAttributes ) : : Ast . Filter ( filFn , filAttributes ) : : rest ⇒
Ast . Collect ( { case i if filFn ( i ) ⇒ mapFn ( i ) } , filAttributes and mapAttributes ) : : rest
2014-11-09 21:09:50 +01:00
case noMatch ⇒ noMatch
}
// Tries to squeeze AstNode into a single fused pipeline
2014-11-12 10:43:39 +01:00
def ast2op ( head : Ast . AstNode , prev : List [ Stage [ _ , _ ] ] ) : List [ Stage [ _ , _ ] ] =
2014-11-09 21:09:50 +01:00
head match {
// Always-on below
2014-11-12 10:43:39 +01:00
case Ast . StageFactory ( mkStage , _ ) ⇒ mkStage ( ) : : prev
2014-11-09 21:09:50 +01:00
// Optimizations below
case noMatch if ! optimizations . fusion ⇒ prev
2014-12-01 20:07:55 +02:00
case Ast . Map ( f , _ ) ⇒ fusing . Map ( f ) : : prev
case Ast . Filter ( p , _ ) ⇒ fusing . Filter ( p ) : : prev
case Ast . Drop ( n , _ ) ⇒ fusing . Drop ( n ) : : prev
case Ast . Take ( n , _ ) ⇒ fusing . Take ( n ) : : prev
case Ast . Collect ( pf , _ ) ⇒ fusing . Collect ( pf ) : : prev
case Ast . Scan ( z , f , _ ) ⇒ fusing . Scan ( z , f ) : : prev
case Ast . Expand ( s , f , _ ) ⇒ fusing . Expand ( s , f ) : : prev
case Ast . Conflate ( s , f , _ ) ⇒ fusing . Conflate ( s , f ) : : prev
case Ast . Buffer ( n , s , _ ) ⇒ fusing . Buffer ( n , s ) : : prev
case Ast . MapConcat ( f , _ ) ⇒ fusing . MapConcat ( f ) : : prev
case Ast . Grouped ( n , _ ) ⇒ fusing . Grouped ( n ) : : prev
2014-11-09 21:09:50 +01:00
//FIXME Add more fusion goodies here
case _ ⇒ prev
}
// First verify, then try to elide, then try to simplify, then try to fuse
collapse ( rest , simplify ( rest , elide ( rest , verify ( rest , rest ) ) ) ) match {
case Nil ⇒
if ( fuseCandidates . isEmpty ) ( optimized . reverse , optimized . length ) // End of optimization run without fusion going on, wrap up
else ( ( Ast . Fused ( fuseCandidates ) : : optimized ) . reverse , optimized . length + 1 ) // End of optimization run with fusion going on, so add it to the optimized stack
// If the Ast was changed this pass simply recur
case modified if modified ne rest ⇒ analyze ( modified , optimized , fuseCandidates )
// No changes to the Ast, lets try to see if we can squeeze the current head Ast node into a fusion pipeline
case head : : rest ⇒
ast2op ( head , fuseCandidates ) match {
case Nil ⇒ analyze ( rest , head : : optimized , Nil )
case `fuseCandidates` ⇒ analyze ( rest , head : : Ast . Fused ( fuseCandidates ) :: optimized , Nil )
case newFuseCandidates ⇒ analyze ( rest , optimized , newFuseCandidates )
}
}
}
val result = analyze ( ops , Nil , Nil )
result
2014-03-30 09:27:19 +02:00
}
// Ops come in reverse order
2014-12-02 16:38:14 +01:00
override def materialize [ In , Out ] ( source : Source [ In ] , sink : Sink [ Out ] , rawOps : List [ Ast . AstNode ] , keys : List [ Key [ _ ] ] ) : MaterializedMap = {
2014-11-09 21:09:50 +01:00
val flowName = createFlowName ( ) //FIXME: Creates Id even when it is not used in all branches below
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
def throwUnknownType ( typeName : String , s : AnyRef ) : Nothing =
throw new MaterializationException ( s" unknown $typeName type ${ s . getClass } " )
2014-10-27 14:35:41 +01:00
2014-11-09 21:09:50 +01:00
def attachSink ( pub : Publisher [ Out ] , flowName : String ) = sink match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSink [ Out ] ⇒ s . attach ( pub , this , flowName )
case s ⇒ throwUnknownType ( "Sink" , s )
}
2014-11-09 21:09:50 +01:00
def attachSource ( sub : Subscriber [ In ] , flowName : String ) = source match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSource [ In ] ⇒ s . attach ( sub , this , flowName )
case s ⇒ throwUnknownType ( "Source" , s )
}
2014-11-09 21:09:50 +01:00
def createSink ( flowName : String ) = sink match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSink [ In ] ⇒ s . create ( this , flowName )
case s ⇒ throwUnknownType ( "Sink" , s )
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
def createSource ( flowName : String ) = source match {
2014-10-27 14:35:41 +01:00
case s : ActorFlowSource [ Out ] ⇒ s . create ( this , flowName )
case s ⇒ throwUnknownType ( "Source" , s )
}
def isActive ( s : AnyRef ) = s match {
case s : ActorFlowSource [ _ ] ⇒ s . isActive
case s : ActorFlowSink [ _ ] ⇒ s . isActive
case s : Source [ _ ] ⇒ throwUnknownType ( "Source" , s )
case s : Sink [ _ ] ⇒ throwUnknownType ( "Sink" , s )
}
2014-12-02 16:38:14 +01:00
def addIfKeyed ( m : Materializable , v : Any , map : MaterializedMap ) = m match {
case km : KeyedMaterializable [ _ ] ⇒ map . updated ( km , v )
case _ ⇒ map
}
2014-10-27 14:35:41 +01:00
2014-11-28 10:41:57 +01:00
val mmPromise = Promise [ MaterializedMap ]
val mmFuture = mmPromise . future
val ( sourceValue , sinkValue , pipeMap ) =
2014-11-09 21:09:50 +01:00
if ( rawOps . isEmpty ) {
2014-10-27 14:35:41 +01:00
if ( isActive ( sink ) ) {
2014-11-09 21:09:50 +01:00
val ( sub , value ) = createSink ( flowName )
2014-11-28 10:41:57 +01:00
( attachSource ( sub , flowName ) , value , MaterializedMap . empty )
2014-10-27 14:35:41 +01:00
} else if ( isActive ( source ) ) {
2014-11-09 21:09:50 +01:00
val ( pub , value ) = createSource ( flowName )
2014-11-28 10:41:57 +01:00
( value , attachSink ( pub , flowName ) , MaterializedMap . empty )
2014-10-27 14:35:41 +01:00
} else {
2014-11-28 10:41:57 +01:00
val ( id , empty ) = processorForNode [ In , Out ] ( identityStageNode , flowName , 1 )
( attachSource ( id , flowName ) , attachSink ( id , flowName ) , empty )
2014-10-27 14:35:41 +01:00
}
} else {
2014-11-28 10:41:57 +01:00
val ( ops , opsSize ) = if ( optimizations . isEnabled ) optimize ( rawOps , mmFuture ) else ( rawOps , rawOps . length )
val ( last , lastMap ) = processorForNode [ Any , Out ] ( ops . head , flowName , opsSize )
val ( first , map ) = processorChain ( last , ops . tail , flowName , opsSize - 1 , lastMap )
( attachSource ( first . asInstanceOf [ Processor [ In , Any ] ] , flowName ) , attachSink ( last , flowName ) , map )
2014-10-27 14:35:41 +01:00
}
2014-12-02 16:38:14 +01:00
val sourceSinkMap = addIfKeyed ( sink , sinkValue , addIfKeyed ( source , sourceValue , pipeMap ) )
2014-11-28 10:41:57 +01:00
if ( keys . isEmpty ) sourceSinkMap
else ( sourceSinkMap /: keys ) {
case ( mm , k ) ⇒ mm . updated ( k , k . materialize ( mm ) )
}
2014-03-30 09:27:19 +02:00
}
2014-11-09 21:09:50 +01:00
//FIXME Should this be a dedicated AstNode?
2014-12-01 20:07:55 +02:00
private [ this ] val identityStageNode = Ast . StageFactory ( ( ) ⇒ FlowOps . identityStage [ Any ] , Ast . Defaults . identityOp )
2014-05-07 15:56:02 +02:00
2014-11-17 22:50:15 +01:00
def executionContext : ExecutionContext = dispatchers . lookup ( settings . dispatcher match {
case Deploy . NoDispatcherGiven ⇒ Dispatchers . DefaultDispatcherId
case other ⇒ other
} )
2014-10-27 14:35:41 +01:00
/* *
* INTERNAL API
*/
2014-11-28 10:41:57 +01:00
private [ akka ] def processorForNode [ In , Out ] ( op : AstNode , flowName : String , n : Int ) : ( Processor [ In , Out ] , MaterializedMap ) = op match {
// FIXME #16376 should probably be replaced with an ActorFlowProcessor similar to ActorFlowSource/Sink
2014-12-01 20:07:55 +02:00
case Ast . DirectProcessor ( p , _ ) ⇒ ( p ( ) . asInstanceOf [ Processor [ In , Out ] ] , MaterializedMap . empty )
case Ast . DirectProcessorWithKey ( p , key , _ ) ⇒
2014-11-28 10:41:57 +01:00
val ( processor , value ) = p ( )
( processor . asInstanceOf [ Processor [ In , Out ] ] , MaterializedMap . empty . updated ( key , value ) )
case _ ⇒
2014-12-01 20:07:55 +02:00
( ActorProcessorFactory [ In , Out ] ( actorOf ( ActorProcessorFactory . props ( this , op ) , s" $flowName - $n - ${ op . attributes . name } " , op ) ) , MaterializedMap . empty )
2014-11-28 10:41:57 +01:00
}
2014-08-21 12:35:38 +02:00
2014-12-01 20:07:55 +02:00
private [ akka ] def actorOf ( props : Props , name : String ) : ActorRef =
actorOf ( props , name , settings . dispatcher )
private [ akka ] def actorOf ( props : Props , name : String , ast : Ast . JunctionAstNode ) : ActorRef =
actorOf ( props , name , ast . attributes . settings ( settings ) . dispatcher )
private [ akka ] def actorOf ( props : Props , name : String , ast : AstNode ) : ActorRef =
actorOf ( props , name , ast . attributes . settings ( settings ) . dispatcher )
private [ akka ] def actorOf ( props : Props , name : String , dispatcher : String ) : ActorRef = supervisor match {
2014-08-21 12:35:38 +02:00
case ref : LocalActorRef ⇒
2014-12-01 20:07:55 +02:00
ref . underlying . attachChild ( props . withDispatcher ( dispatcher ) , name , systemService = false )
2014-08-21 12:35:38 +02:00
case ref : RepointableActorRef ⇒
if ( ref . isStarted )
2014-12-01 20:07:55 +02:00
ref . underlying . asInstanceOf [ ActorCell ] . attachChild ( props . withDispatcher ( dispatcher ) , name , systemService = false )
2014-08-21 12:35:38 +02:00
else {
implicit val timeout = ref . system . settings . CreationTimeout
2014-12-01 20:07:55 +02:00
val f = ( supervisor ? StreamSupervisor . Materialize ( props . withDispatcher ( dispatcher ) , name ) ) . mapTo [ ActorRef ]
2014-08-21 12:35:38 +02:00
Await . result ( f , timeout . duration )
}
2014-11-09 21:09:50 +01:00
case unknown ⇒
throw new IllegalStateException ( s" Stream supervisor must be a local actor, was [ ${ unknown . getClass . getName } ] " )
2014-08-21 12:35:38 +02:00
}
2014-11-09 21:09:50 +01:00
// FIXME Investigate possibility of using `enableOperationsFusion` in `materializeJunction`
2014-10-27 14:35:41 +01:00
override def materializeJunction [ In , Out ] ( op : Ast . JunctionAstNode , inputCount : Int , outputCount : Int ) : ( immutable.Seq [ Subscriber [ In ] ] , immutable . Seq [ Publisher [ Out ] ] ) = {
2014-12-01 20:07:55 +02:00
val actorName = s" ${ createFlowName ( ) } - ${ op . attributes . name } "
val transformedSettings = op . attributes . settings ( settings )
2014-10-27 14:35:41 +01:00
op match {
case fanin : Ast . FanInAstNode ⇒
2014-12-01 20:07:55 +02:00
val props = fanin match {
case Ast . Merge ( _ ) ⇒ FairMerge . props ( transformedSettings , inputCount )
case Ast . MergePreferred ( _ ) ⇒ UnfairMerge . props ( transformedSettings , inputCount )
2014-12-06 14:51:40 +01:00
case z : Ast . ZipWith ⇒ ZipWith . props ( transformedSettings , z . f )
2014-12-01 20:07:55 +02:00
case Ast . Concat ( _ ) ⇒ Concat . props ( transformedSettings )
case Ast . FlexiMergeNode ( merger , _ ) ⇒ FlexiMergeImpl . props ( transformedSettings , inputCount , merger . createMergeLogic ( ) )
2014-10-27 14:35:41 +01:00
}
2014-12-01 20:07:55 +02:00
val impl = actorOf ( props , actorName , fanin )
2014-10-27 14:35:41 +01:00
2014-10-31 08:53:27 +01:00
val publisher = new ActorPublisher [ Out ] ( impl )
2014-10-27 14:35:41 +01:00
impl ! ExposedPublisher ( publisher . asInstanceOf [ ActorPublisher [ Any ] ] )
2014-11-20 21:59:33 +01:00
val subscribers = Vector . tabulate ( inputCount ) ( FanIn . SubInput [ In ] ( impl , _ ) ) // FIXME switch to List.tabulate for inputCount < 8?
2014-10-27 14:35:41 +01:00
( subscribers , List ( publisher ) )
case fanout : Ast . FanOutAstNode ⇒
2014-12-01 20:07:55 +02:00
val props = fanout match {
case Ast . Broadcast ( _ ) ⇒ Broadcast . props ( transformedSettings , outputCount )
case Ast . Balance ( waitForAllDownstreams , _ ) ⇒ Balance . props ( transformedSettings , outputCount , waitForAllDownstreams )
case Ast . Unzip ( _ ) ⇒ Unzip . props ( transformedSettings )
case Ast . FlexiRouteNode ( route , _ ) ⇒ FlexiRouteImpl . props ( transformedSettings , outputCount , route . createRouteLogic ( ) )
2014-10-27 14:35:41 +01:00
}
2014-12-01 20:07:55 +02:00
val impl = actorOf ( props , actorName , fanout )
2014-10-27 14:35:41 +01:00
2014-11-20 21:59:33 +01:00
val publishers = Vector . tabulate ( outputCount ) ( id ⇒ new ActorPublisher [ Out ] ( impl ) { // FIXME switch to List.tabulate for inputCount < 8?
2014-10-27 14:35:41 +01:00
override val wakeUpMsg = FanOut . SubstreamSubscribePending ( id )
} )
impl ! FanOut . ExposedPublishers ( publishers . asInstanceOf [ immutable . Seq [ ActorPublisher [ Any ] ] ] )
val subscriber = ActorSubscriber [ In ] ( impl )
( List ( subscriber ) , publishers )
2014-11-06 12:38:04 +02:00
2014-12-01 20:07:55 +02:00
case identity @ Ast . IdentityAstNode ( attr ) ⇒ // FIXME Why is IdentityAstNode a JunctionAStNode?
2014-11-28 10:41:57 +01:00
// We can safely ignore the materialized map that gets created here since it will be empty
2014-12-01 20:07:55 +02:00
val id = List ( processorForNode [ In , Out ] ( identityStageNode , attr . name , 1 ) . _1 ) // FIXME is `identity.name` appropriate/unique here?
2014-11-09 21:09:50 +01:00
( id , id )
2014-05-07 15:56:02 +02:00
}
2014-10-27 14:35:41 +01:00
2014-05-07 15:56:02 +02:00
}
2014-03-30 09:27:19 +02:00
}
2014-05-08 19:34:58 +02:00
/* *
* INTERNAL API
*/
private [ akka ] object FlowNameCounter extends ExtensionId [ FlowNameCounter ] with ExtensionIdProvider {
override def get ( system : ActorSystem ) : FlowNameCounter = super . get ( system )
override def lookup = FlowNameCounter
override def createExtension ( system : ExtendedActorSystem ) : FlowNameCounter = new FlowNameCounter
}
/* *
* INTERNAL API
*/
private [ akka ] class FlowNameCounter extends Extension {
val counter = new AtomicLong ( 0 )
2014-08-21 12:35:38 +02:00
}
/* *
* INTERNAL API
*/
private [ akka ] object StreamSupervisor {
def props ( settings : MaterializerSettings ) : Props = Props ( new StreamSupervisor ( settings ) )
case class Materialize ( props : Props , name : String )
}
private [ akka ] class StreamSupervisor ( settings : MaterializerSettings ) extends Actor {
import StreamSupervisor._
2014-10-27 14:35:41 +01:00
override def supervisorStrategy = SupervisorStrategy . stoppingStrategy
2014-08-21 12:35:38 +02:00
def receive = {
case Materialize ( props , name ) ⇒
val impl = context . actorOf ( props , name )
sender ( ) ! impl
}
2014-10-27 14:35:41 +01:00
}
/* *
* INTERNAL API
*/
private [ akka ] object ActorProcessorFactory {
import Ast._
def props ( materializer : FlowMaterializer , op : AstNode ) : Props = {
2014-11-09 21:09:50 +01:00
val settings = materializer . settings // USE THIS TO AVOID CLOSING OVER THE MATERIALIZER BELOW
2014-12-01 20:07:55 +02:00
op match {
2014-11-12 10:43:39 +01:00
case Fused ( ops , _ ) ⇒ ActorInterpreter . props ( settings , ops )
2014-12-01 20:07:55 +02:00
case Map ( f , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Map ( f ) ) )
case Filter ( p , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Filter ( p ) ) )
case Drop ( n , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Drop ( n ) ) )
case Take ( n , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Take ( n ) ) )
case Collect ( pf , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Collect ( pf ) ) )
case Scan ( z , f , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Scan ( z , f ) ) )
case Expand ( s , f , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Expand ( s , f ) ) )
case Conflate ( s , f , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Conflate ( s , f ) ) )
case Buffer ( n , s , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Buffer ( n , s ) ) )
case MapConcat ( f , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . MapConcat ( f ) ) )
case MapAsync ( f , _ ) ⇒ MapAsyncProcessorImpl . props ( settings , f )
case MapAsyncUnordered ( f , _ ) ⇒ MapAsyncUnorderedProcessorImpl . props ( settings , f )
case Grouped ( n , _ ) ⇒ ActorInterpreter . props ( settings , List ( fusing . Grouped ( n ) ) )
case GroupBy ( f , _ ) ⇒ GroupByProcessorImpl . props ( settings , f )
case PrefixAndTail ( n , _ ) ⇒ PrefixAndTailImpl . props ( settings , n )
case SplitWhen ( p , _ ) ⇒ SplitWhenProcessorImpl . props ( settings , p )
case ConcatAll ( _ ) ⇒ ConcatAllImpl . props ( materializer ) //FIXME closes over the materializer, is this good?
2014-11-12 10:43:39 +01:00
case StageFactory ( mkStage , _ ) ⇒ ActorInterpreter . props ( settings , List ( mkStage ( ) ) )
case TimerTransform ( mkStage , _ ) ⇒ TimerTransformerProcessorsImpl . props ( settings , mkStage ( ) )
2014-12-01 20:07:55 +02:00
}
2014-10-27 14:35:41 +01:00
}
def apply [ I , O ] ( impl : ActorRef ) : ActorProcessor [ I , O ] = {
val p = new ActorProcessor [ I , O ] ( impl )
impl ! ExposedPublisher ( p . asInstanceOf [ ActorPublisher [ Any ] ] )
p
}
}