2015-01-28 14:19:50 +01:00
/* *
 * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.impl.Junctions._
import akka.stream.impl.GenJunctions._
import akka.stream.impl.Stages. { MaterializingStageFactory , StageModule }
import akka.stream.impl._
import akka.stream.impl.StreamLayout._
import akka.stream._
import OperationAttributes.name
import scala.collection.immutable
import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
object Merge {
  /**
   * Create a new `Merge` with the specified number of input ports.
   *
   * @param inputPorts number of input ports
   */
  def apply[T](inputPorts: Int): Merge[T] = {
    val fanIn = new UniformFanInShape[T, T](inputPorts)
    val mergeModule = new MergeModule(fanIn, OperationAttributes.name("Merge"))
    new Merge(inputPorts, fanIn, mergeModule)
  }
}
/**
 * Merge several streams, taking elements as they arrive from input streams
 * (picking randomly when several have elements ready).
 *
 * '''Emits when''' one of the inputs has an element available
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' all upstreams complete
 *
 * '''Cancels when''' downstream cancels
 */
class Merge[T] private (inputPorts: Int,
                        override val shape: UniformFanInShape[T, T],
                        private[stream] override val module: StreamLayout.Module)
  extends Graph[UniformFanInShape[T, T], Unit] {

  /** Returns a new `Merge` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Merge[T] = {
    val attributed = module.withAttributes(attr).wrap()
    new Merge(inputPorts, shape, attributed)
  }

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Merge[T] = {
    val nameAttr = OperationAttributes.name(name)
    withAttributes(nameAttr)
  }
}
2015-01-28 14:19:50 +01:00
object MergePreferred {
  import FanInShape._

  /**
   * Shape used by `MergePreferred`: a uniform fan-in shape constructed with `secondaryPorts`
   * ports, plus one additional inlet registered under the name "preferred".
   */
  final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T])
    extends UniformFanInShape[T, T](secondaryPorts, _init) {

    def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name(name))

    override protected def construct(init: Init[T]): FanInShape[T] =
      new MergePreferredShape(secondaryPorts, init)

    override def deepCopy(): MergePreferredShape[T] =
      super.deepCopy().asInstanceOf[MergePreferredShape[T]]

    val preferred = newInlet[T]("preferred")
  }

  /**
   * Create a new `MergePreferred` with the specified number of secondary input ports.
   *
   * @param secondaryPorts number of secondary input ports
   */
  def apply[T](secondaryPorts: Int): MergePreferred[T] = {
    val preferredShape = new MergePreferredShape[T](secondaryPorts, "MergePreferred")
    val preferredModule = new MergePreferredModule(preferredShape, OperationAttributes.name("MergePreferred"))
    new MergePreferred(secondaryPorts, preferredShape, preferredModule)
  }
}
/**
 * Merge several streams, taking elements as they arrive from input streams
 * (picking from preferred when several have elements ready).
 *
 * A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports.
 *
 * '''Emits when''' one of the inputs has an element available, preferring
 * a specified input if multiple have elements available
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' all upstreams complete
 *
 * '''Cancels when''' downstream cancels
 */
class MergePreferred[T] private (secondaryPorts: Int,
                                 override val shape: MergePreferred.MergePreferredShape[T],
                                 private[stream] override val module: StreamLayout.Module)
  extends Graph[MergePreferred.MergePreferredShape[T], Unit] {

  /** Returns a new `MergePreferred` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): MergePreferred[T] =
    new MergePreferred(secondaryPorts, shape, module.withAttributes(attr).wrap())

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): MergePreferred[T] =
    withAttributes(OperationAttributes.name(name))
}
2015-01-28 14:19:50 +01:00
object Broadcast {
  /**
   * Create a new `Broadcast` with the specified number of output ports.
   *
   * @param outputPorts number of output ports
   */
  def apply[T](outputPorts: Int): Broadcast[T] = {
    val fanOut = new UniformFanOutShape[T, T](outputPorts)
    val broadcastModule = new BroadcastModule(fanOut, OperationAttributes.name("Broadcast"))
    new Broadcast(outputPorts, fanOut, broadcastModule)
  }
}
/**
 * Fan-out the stream to several streams emitting each incoming upstream element to all downstream consumers.
 * It will not shut down until the subscriptions for at least two downstream subscribers have been established.
 *
 * A `Broadcast` has one `in` port and 2 or more `out` ports.
 *
 * '''Emits when''' all of the outputs stop backpressuring and there is an input element available
 *
 * '''Backpressures when''' any of the outputs backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' all downstreams cancel
 */
class Broadcast[T] private (outputPorts: Int,
                            override val shape: UniformFanOutShape[T, T],
                            private[stream] override val module: StreamLayout.Module)
  extends Graph[UniformFanOutShape[T, T], Unit] {

  /** Returns a new `Broadcast` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Broadcast[T] = {
    val attributed = module.withAttributes(attr).wrap()
    new Broadcast(outputPorts, shape, attributed)
  }

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Broadcast[T] = {
    val nameAttr = OperationAttributes.name(name)
    withAttributes(nameAttr)
  }
}
2015-01-28 14:19:50 +01:00
object Balance {
  /**
   * Create a new `Balance` with the specified number of output ports.
   *
   * @param outputPorts number of output ports
   * @param waitForAllDownstreams if you use `waitForAllDownstreams = true` it will not start emitting
   *   elements to downstream outputs until all of them have requested at least one element,
   *   default value is `false`
   */
  def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false): Balance[T] = {
    val fanOut = new UniformFanOutShape[T, T](outputPorts)
    val balanceModule = new BalanceModule(fanOut, waitForAllDownstreams, OperationAttributes.name("Balance"))
    new Balance(outputPorts, waitForAllDownstreams, fanOut, balanceModule)
  }
}
/**
 * Fan-out the stream to several streams. Each upstream element is emitted to the first available
 * downstream consumer. It will not shut down until the subscriptions for at least two downstream
 * subscribers have been established.
 *
 * A `Balance` has one `in` port and 2 or more `out` ports.
 *
 * '''Emits when''' any of the outputs stops backpressuring; emits the element to the first available output
 *
 * '''Backpressures when''' all of the outputs backpressure
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' all downstreams cancel
 */
class Balance[T] private (outputPorts: Int,
                          waitForAllDownstreams: Boolean,
                          override val shape: UniformFanOutShape[T, T],
                          private[stream] override val module: StreamLayout.Module)
  extends Graph[UniformFanOutShape[T, T], Unit] {

  /** Returns a new `Balance` with the same shape and settings whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Balance[T] = {
    val attributed = module.withAttributes(attr).wrap()
    new Balance(outputPorts, waitForAllDownstreams, shape, attributed)
  }

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Balance[T] = {
    val nameAttr = OperationAttributes.name(name)
    withAttributes(nameAttr)
  }
}
2015-01-28 14:19:50 +01:00
object Zip {
  /**
   * Create a new `Zip`.
   */
  def apply[A, B](): Zip[A, B] = {
    val zipShape = new FanInShape2[A, B, (A, B)]("Zip")
    val zipModule = new ZipWith2Module[A, B, (A, B)](zipShape, Keep.both, OperationAttributes.name("Zip"))
    new Zip(zipShape, zipModule)
  }
}
/**
 * Combine the elements of 2 streams into a stream of tuples.
 *
 * A `Zip` has a `left` and a `right` input port and one `out` port
 *
 * '''Emits when''' all of the inputs have an element available
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' any upstream completes
 *
 * '''Cancels when''' downstream cancels
 */
class Zip[A, B] private (override val shape: FanInShape2[A, B, (A, B)],
                         private[stream] override val module: StreamLayout.Module)
  extends Graph[FanInShape2[A, B, (A, B)], Unit] {

  /** Returns a new `Zip` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Zip[A, B] = {
    val attributed = module.withAttributes(attr).wrap()
    new Zip(shape, attributed)
  }

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Zip[A, B] = {
    val nameAttr = OperationAttributes.name(name)
    withAttributes(nameAttr)
  }
}
2015-04-09 16:11:16 +02:00
/* *
* Combine the elements of multiple streams into a stream of combined elements using a combiner function .
*
 * '''Emits when''' all of the inputs have an element available
*
* ''' Backpressures when ''' downstream backpressures
*
* ''' Completes when ''' any upstream completes
*
* ''' Cancels when ''' downstream cancels
*/
2015-01-28 14:19:50 +01:00
object ZipWith extends ZipWithApply
/**
 * Takes a stream of pair elements and splits each pair to two output streams.
 *
 * An `Unzip` has one `in` port and one `left` and one `right` output port.
 *
 * '''Emits when''' all of the outputs stop backpressuring and there is an input element available
 *
 * '''Backpressures when''' any of the outputs backpressures
 *
 * '''Completes when''' upstream completes
 *
 * '''Cancels when''' any downstream cancels
 */
object Unzip {
  /**
   * Create a new `Unzip`.
   */
  def apply[A, B](): Unzip[A, B] = {
    val unzipShape = new FanOutShape2[(A, B), A, B]("Unzip")
    val unzipModule = new UnzipModule(unzipShape, OperationAttributes.name("Unzip"))
    new Unzip(unzipShape, unzipModule)
  }
}
/**
 * Takes a stream of pair elements and splits each pair to two output streams.
 *
 * An `Unzip` has one `in` port and one `left` and one `right` output port.
 */
class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B],
                           private[stream] override val module: StreamLayout.Module)
  extends Graph[FanOutShape2[(A, B), A, B], Unit] {

  /** Returns a new `Unzip` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Unzip[A, B] =
    new Unzip(shape, module.withAttributes(attr).wrap())

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Unzip[A, B] =
    withAttributes(OperationAttributes.name(name))
}
object Concat {
  /**
   * Create a new `Concat`.
   */
  def apply[T](): Concat[T] = {
    val twoInputs = new UniformFanInShape[T, T](2)
    val concatModule = new ConcatModule(twoInputs, OperationAttributes.name("Concat"))
    new Concat(twoInputs, concatModule)
  }
}
/**
 * Takes two streams and outputs one stream formed from the two input streams
 * by first emitting all of the elements from the first stream and then emitting
 * all of the elements from the second stream.
 *
 * A `Concat` has one `first` port, one `second` port and one `out` port.
 *
 * '''Emits when''' the current stream has an element available; if the current input completes, it tries the next one
 *
 * '''Backpressures when''' downstream backpressures
 *
 * '''Completes when''' all upstreams complete
 *
 * '''Cancels when''' downstream cancels
 */
class Concat[T] private (override val shape: UniformFanInShape[T, T],
                         private[stream] override val module: StreamLayout.Module)
  extends Graph[UniformFanInShape[T, T], Unit] {

  /** Returns a new `Concat` with the same shape whose module carries `attr` and is wrapped. */
  override def withAttributes(attr: OperationAttributes): Concat[T] = {
    val attributed = module.withAttributes(attr).wrap()
    new Concat(shape, attributed)
  }

  /** Shorthand for `withAttributes` with a name attribute built from `name`. */
  override def named(name: String): Concat[T] = {
    val nameAttr = OperationAttributes.name(name)
    withAttributes(nameAttr)
  }
}
object FlowGraph extends GraphApply {
2015-03-30 14:22:12 +02:00
class Builder [ + M ] private [ stream ] ( ) {
2015-01-28 14:19:50 +01:00
private var moduleInProgress : Module = EmptyModule
2015-04-24 12:14:04 +02:00
/**
 * Grows the graph under construction by a carbon copy of `via`'s module, then connects
 * `from` to the first inlet of that copy and the copy's first outlet to `to`.
 */
def addEdge[A, B, M2](from: Outlet[A], via: Graph[FlowShape[A, B], M2], to: Inlet[B]): Unit = {
  val importedFlow = via.module.carbonCopy
  val withFlow = moduleInProgress.grow(importedFlow)
  val inputWired = withFlow.connect(from, importedFlow.shape.inlets.head)
  moduleInProgress = inputWired.connect(importedFlow.shape.outlets.head, to)
}
/** Connects `from` directly to `to` in the graph under construction. */
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = {
  val connected = moduleInProgress.connect(from, to)
  moduleInProgress = connected
}
/**
 * Import a graph into this module, performing a deep copy, discarding its
 * materialized value and returning the copied Ports that are now to be
 * connected.
 */
def add[S <: Shape](graph: Graph[S, _]): S = {
  if (StreamLayout.Debug) {
    graph.module.validate()
  }
  val imported = graph.module.carbonCopy
  moduleInProgress = moduleInProgress.grow(imported)
  val portsOfCopy = graph.shape.copyFromPorts(imported.shape.inlets, imported.shape.outlets)
  portsOfCopy.asInstanceOf[S]
}
2015-04-23 13:32:07 +02:00
/**
 * INTERNAL API.
 *
 * This is only used by the materialization-importing apply methods of Source,
 * Flow, Sink and Graph.
 *
 * Imports a carbon copy of `graph` whose materialized value is passed through `transform`.
 */
private[stream] def add[S <: Shape, A](graph: Graph[S, _], transform: (A) ⇒ Any): S = {
  if (StreamLayout.Debug) {
    graph.module.validate()
  }
  val imported = graph.module.carbonCopy
  val transformed = imported.transformMaterializedValue(transform.asInstanceOf[Any ⇒ Any])
  moduleInProgress = moduleInProgress.grow(transformed)
  val portsOfCopy = graph.shape.copyFromPorts(imported.shape.inlets, imported.shape.outlets)
  portsOfCopy.asInstanceOf[S]
}
2015-01-28 14:19:50 +01:00
/**
 * INTERNAL API.
 *
 * This is only used by the materialization-importing apply methods of Source,
 * Flow, Sink and Graph.
 *
 * Imports a carbon copy of `graph`, handing `combine` to `grow` together with the copy.
 */
private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) ⇒ Any): S = {
  if (StreamLayout.Debug) {
    graph.module.validate()
  }
  val imported = graph.module.carbonCopy
  moduleInProgress = moduleInProgress.grow(imported, combine)
  val portsOfCopy = graph.shape.copyFromPorts(imported.shape.inlets, imported.shape.outlets)
  portsOfCopy.asInstanceOf[S]
}
/** Imports the given `Source` and returns the outlet of the imported copy. */
def add[T](s: Source[T, _]): Outlet[T] = {
  val importedShape = add(s: Graph[SourceShape[T], _])
  importedShape.outlet
}
/** Imports the given `Sink` and returns the inlet of the imported copy. */
def add[T](s: Sink[T, _]): Inlet[T] = {
  val importedShape = add(s: Graph[SinkShape[T], _])
  importedShape.inlet
}
2015-03-30 14:22:12 +02:00
/**
 * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is materialized
 * this outlet will emit exactly one element which is the materialized value. It is possible to expose this
 * outlet as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]].
 *
 * It is possible to call this method multiple times to get multiple [[Outlet]] instances if necessary. All of
 * the outlets will emit the materialized value.
 *
 * Be careful not to feed the result of this outlet to a stage that produces the materialized value itself (for
 * example to a [[Sink#fold]] that contributes to the materialized value) since that might lead to an unresolvable
 * dependency cycle.
 *
 * @return The outlet that will emit the materialized value.
 */
def materializedValue: Outlet[M] = {
  val matValueSource = new MaterializedValueSource[Any]
  moduleInProgress = moduleInProgress.grow(matValueSource)
  val sourceOutlet = matValueSource.shape.outlet
  sourceOutlet.asInstanceOf[Outlet[M]]
}
2015-01-28 14:19:50 +01:00
/** INTERNAL API. Grows the graph by the stage `op` and connects `port` to the stage's in-port. */
private[stream] def andThen(port: OutPort, op: StageModule): Unit = {
  val withStage = moduleInProgress.grow(op)
  moduleInProgress = withStage.connect(port, op.inPort)
}
/**
 * INTERNAL API. Wraps the assembled module into a `RunnableFlow`.
 *
 * Throws `IllegalArgumentException` listing the unconnected ports if the module is not runnable.
 */
private[stream] def buildRunnable[Mat](): RunnableFlow[Mat] = {
  if (moduleInProgress.isRunnable) {
    new RunnableFlow(moduleInProgress.wrap())
  } else {
    val unconnected = (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")
    throw new IllegalArgumentException(
      "Cannot build the RunnableFlow because there are unconnected ports: " + unconnected)
  }
}
/**
 * INTERNAL API. Turns the assembled module into a `Source` exposing `outlet`.
 *
 * Throws `IllegalArgumentException` if no ports remain open, if the module does not have the
 * form of a source, or if its open out-port is not `outlet`.
 */
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {
  if (moduleInProgress.isRunnable) {
    throw new IllegalArgumentException("Cannot build the Source since no ports remain open")
  } else if (!moduleInProgress.isSource) {
    val openIns = moduleInProgress.inPorts.mkString(",")
    val openOuts = moduleInProgress.outPorts.mkString(",")
    throw new IllegalArgumentException(
      s" Cannot build Source with open inputs ( $openIns ) and outputs ( $openOuts ) ")
  } else if (moduleInProgress.outPorts.head != outlet) {
    val openOutlet = moduleInProgress.outPorts.head
    throw new IllegalArgumentException(
      s" provided Outlet $outlet does not equal the module’ s open Outlet $openOutlet ")
  } else {
    val sourceModule = moduleInProgress.replaceShape(SourceShape(outlet)).wrap()
    new Source(sourceModule)
  }
}
/**
 * INTERNAL API. Turns the assembled module into a `Flow` from `inlet` to `outlet`.
 *
 * Throws `IllegalArgumentException` if the module does not have the form of a flow, or if its
 * open out-port / in-port is not the given `outlet` / `inlet` (checked in that order).
 */
private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = {
  if (!moduleInProgress.isFlow) {
    val openIns = moduleInProgress.inPorts.mkString(",")
    val openOuts = moduleInProgress.outPorts.mkString(",")
    throw new IllegalArgumentException(
      s" Cannot build Flow with open inputs ( $openIns ) and outputs ( $openOuts ) ")
  } else if (moduleInProgress.outPorts.head != outlet) {
    val openOutlet = moduleInProgress.outPorts.head
    throw new IllegalArgumentException(
      s" provided Outlet $outlet does not equal the module’ s open Outlet $openOutlet ")
  } else if (moduleInProgress.inPorts.head != inlet) {
    val openInlet = moduleInProgress.inPorts.head
    throw new IllegalArgumentException(
      s" provided Inlet $inlet does not equal the module’ s open Inlet $openInlet ")
  } else {
    val flowModule = moduleInProgress.replaceShape(FlowShape(inlet, outlet)).wrap()
    new Flow(flowModule)
  }
}
2015-03-04 15:22:33 +01:00
/**
 * INTERNAL API. Turns the assembled module into a `BidiFlow` with the given `shape`.
 *
 * Throws `IllegalArgumentException` if the module does not have the form of a bidi flow, or if
 * the set of its open out-ports / in-ports differs from the outlets / inlets of `shape`.
 */
private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
  if (!moduleInProgress.isBidiFlow) {
    val openIns = moduleInProgress.inPorts.mkString(",")
    val openOuts = moduleInProgress.outPorts.mkString(",")
    throw new IllegalArgumentException(
      s" Cannot build BidiFlow with open inputs ( $openIns ) and outputs ( $openOuts ) ")
  } else if (moduleInProgress.outPorts.toSet != shape.outlets.toSet) {
    val provided = shape.outlets.mkString(",")
    val open = moduleInProgress.outPorts.mkString(",")
    throw new IllegalArgumentException(
      s" provided Outlets [ $provided ] does not equal the module’ s open Outlets [ $open ] ")
  } else if (moduleInProgress.inPorts.toSet != shape.inlets.toSet) {
    val provided = shape.inlets.mkString(",")
    val open = moduleInProgress.inPorts.mkString(",")
    throw new IllegalArgumentException(
      s" provided Inlets [ $provided ] does not equal the module’ s open Inlets [ $open ] ")
  } else {
    val bidiModule = moduleInProgress.replaceShape(shape).wrap()
    new BidiFlow(bidiModule)
  }
}
2015-01-28 14:19:50 +01:00
/**
 * INTERNAL API. Turns the assembled module into a `Sink` exposing `inlet`.
 *
 * Throws `IllegalArgumentException` if no ports remain open, if the module does not have the
 * form of a sink, or if its open in-port is not `inlet`.
 */
private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = {
  if (moduleInProgress.isRunnable) {
    throw new IllegalArgumentException("Cannot build the Sink since no ports remain open")
  } else if (!moduleInProgress.isSink) {
    val openIns = moduleInProgress.inPorts.mkString(",")
    val openOuts = moduleInProgress.outPorts.mkString(",")
    throw new IllegalArgumentException(
      s" Cannot build Sink with open inputs ( $openIns ) and outputs ( $openOuts ) ")
  } else if (moduleInProgress.inPorts.head != inlet) {
    val openInlet = moduleInProgress.inPorts.head
    throw new IllegalArgumentException(
      s" provided Inlet $inlet does not equal the module’ s open Inlet $openInlet ")
  } else {
    val sinkModule = moduleInProgress.replaceShape(SinkShape(inlet)).wrap()
    new Sink(sinkModule)
  }
}
/** INTERNAL API. Returns the module assembled so far by this builder. */
private [ stream ] def module : Module = moduleInProgress
2015-03-06 12:22:14 +01:00
/** Converts this Scala DSL element to its Java DSL counterpart. */
def asJava : javadsl.FlowGraph.Builder [ M ] = new javadsl . FlowGraph . Builder ( ) ( this )
2015-01-28 14:19:50 +01:00
}
object Implicits {
/**
 * Returns the first outlet of `junction`, at index `n` or later, that is not yet listed in the
 * builder module's `downstreams`.
 *
 * Throws `IllegalArgumentException` once `n` reaches the number of outlets.
 */
@tailrec
private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
  if (n == junction.outArray.length)
    throw new IllegalArgumentException(s" no more outlets free on $junction ")

  val candidate = junction.out(n)
  if (b.module.downstreams.contains(candidate)) findOut(b, junction, n + 1)
  else candidate
}
/**
 * Returns the first inlet of `junction`, at index `n` or later, that is not yet listed in the
 * builder module's `upstreams`.
 *
 * Throws `IllegalArgumentException` once `n` reaches the number of inlets.
 */
@tailrec
private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
  if (n == junction.inArray.length)
    throw new IllegalArgumentException(s" no more inlets free on $junction ")

  val candidate = junction.in(n)
  if (b.module.upstreams.contains(candidate)) findIn(b, junction, n + 1)
  else candidate
}
/**
 * Provides the forward-wiring `~>` operators for anything that can supply an [[Outlet]]
 * through `importAndGetPort`.
 */
trait CombinerBase[T] extends Any {
  /** Supplies the outlet that the `~>` operators connect from; may import into `b` first. */
  def importAndGetPort(b: Builder[_]): Outlet[T]

  /** Connects this element's outlet to the inlet `to`. */
  def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit = {
    b.addEdge(importAndGetPort(b), to)
  }

  /** Imports `via`, connects this element's outlet to its inlet and continues from its outlet. */
  def ~>[Out](via: Graph[FlowShape[T, Out], Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    val s = b.add(via)
    b.addEdge(importAndGetPort(b), s.inlet)
    s.outlet
  }

  /**
   * Connects this element's outlet to the first free inlet of the fan-in `junction` and
   * continues from the junction's outlet.
   *
   * Throws `IllegalArgumentException` if the junction has no free inlet.
   */
  def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    // Resolve the free inlet first (shared helper instead of a private copy of the same
    // search) so that a full junction fails before `importAndGetPort` is invoked.
    val freeInlet = findIn(b, junction, 0)
    b.addEdge(importAndGetPort(b), freeInlet)
    junction.out
  }

  /**
   * Connects this element's outlet to the inlet of the fan-out `junction` and continues from
   * the junction's first free outlet. If no outlet is free the returned ops are disabled:
   * they throw the recorded message when a port is later requested from them.
   */
  def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    b.addEdge(importAndGetPort(b), junction.in)
    try findOut(b, junction, 0)
    catch {
      case e: IllegalArgumentException ⇒ new DisabledPortOps(e.getMessage)
    }
  }

  /** Connects this element's outlet to the inlet of `flow` and continues from its outlet. */
  def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    b.addEdge(importAndGetPort(b), flow.inlet)
    flow.outlet
  }

  /** Imports the sink graph `to` and connects this element's outlet to its inlet. */
  def ~>(to: Graph[SinkShape[T], _])(implicit b: Builder[_]): Unit = {
    b.addEdge(importAndGetPort(b), b.add(to).inlet)
  }

  /** Connects this element's outlet to the inlet of the sink shape `to`. */
  def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit = {
    b.addEdge(importAndGetPort(b), to.inlet)
  }
}
/**
 * Provides the reverse-wiring `<~` operators for anything that can supply an [[Inlet]]
 * through `importAndGetPortReverse`.
 */
trait ReverseCombinerBase[T] extends Any {
  /** Supplies the inlet that the `<~` operators connect to; may import into `b` first. */
  def importAndGetPortReverse(b: Builder[_]): Inlet[T]

  /** Connects the outlet `from` to this element's inlet. */
  def <~(from: Outlet[T])(implicit b: Builder[_]): Unit = {
    b.addEdge(from, importAndGetPortReverse(b))
  }

  /** Imports `via`, connects its outlet to this element's inlet and continues from its inlet. */
  def <~[In](via: Graph[FlowShape[In, T], _])(implicit b: Builder[_]): ReversePortOps[In] = {
    val s = b.add(via)
    b.addEdge(s.outlet, importAndGetPortReverse(b))
    s.inlet
  }

  /**
   * Connects the first free outlet of the fan-out `junction` to this element's inlet and
   * continues from the junction's inlet.
   *
   * Throws `IllegalArgumentException` if the junction has no free outlet.
   */
  def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    // Shared helper instead of a private copy of the same search; the outlet is still
    // resolved before `importAndGetPortReverse` is invoked.
    val freeOutlet = findOut(b, junction, 0)
    b.addEdge(freeOutlet, importAndGetPortReverse(b))
    junction.in
  }

  /**
   * Connects the outlet of the fan-in `junction` to this element's inlet and continues from
   * the junction's first free inlet. If no inlet is free the returned ops are disabled:
   * they throw the recorded message when a port is later requested from them.
   */
  def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    b.addEdge(junction.out, importAndGetPortReverse(b))
    try findIn(b, junction, 0)
    catch {
      case e: IllegalArgumentException ⇒ new DisabledReversePortOps(e.getMessage)
    }
  }

  /** Connects the outlet of `flow` to this element's inlet and continues from its inlet. */
  def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    b.addEdge(flow.outlet, importAndGetPortReverse(b))
    flow.inlet
  }

  /** Imports the source graph `from` and connects its outlet to this element's inlet. */
  def <~(from: Graph[SourceShape[T], _])(implicit b: Builder[_]): Unit = {
    b.addEdge(b.add(from).outlet, importAndGetPortReverse(b))
  }

  /** Connects the outlet of the source shape `from` to this element's inlet. */
  def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit = {
    b.addEdge(from.outlet, importAndGetPortReverse(b))
  }
}
2015-03-30 14:22:12 +02:00
/**
 * Flow operations anchored at an [[Outlet]] inside a builder: each appended stage is added
 * to the builder `b` and a new `PortOps` for the stage's outlet is returned.
 */
class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] {
  override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance

  /** Not supported on a port; always throws `UnsupportedOperationException`. */
  override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
    throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")

  override private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] = {
    b.andThen(outlet, op)
    val stageOutlet = op.shape.outlet.asInstanceOf[Outlet[U]]
    new PortOps(stageOutlet, b)
  }

  override private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] = {
    // We don't track materialization here
    b.andThen(outlet, op)
    val stageOutlet = op.shape.outlet.asInstanceOf[Outlet[U]]
    new PortOps(stageOutlet, b)
  }

  /** The wrapped outlet is already part of the graph, so nothing is imported. */
  override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet
}
/**
 * A `PortOps` without a usable outlet: asking it for its port throws
 * `IllegalArgumentException` with the message given at construction.
 */
class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
  override def importAndGetPort(b: Builder[_]): Outlet[Out] = {
    throw new IllegalArgumentException(msg)
  }
}
/** Adds the `<~` operators to a plain [[Inlet]]; the wrapped inlet is used as-is. */
implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    inlet
  }
}
/**
 * A `ReversePortOps` without a usable inlet: asking it for its port throws
 * `IllegalArgumentException` with the message given at construction.
 */
class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    throw new IllegalArgumentException(msg)
  }
}
/**
 * Adds `~>` and `<~` to a fan-in shape: forward wiring starts at the junction's outlet,
 * reverse wiring targets its first free inlet.
 */
implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out])
  extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] {

  override def importAndGetPort(b: Builder[_]): Outlet[Out] = {
    j.out
  }

  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    findIn(b, j, 0)
  }
}
/** Adds `<~` to a fan-out shape; reverse wiring targets the junction's inlet. */
implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out])
  extends AnyVal with ReverseCombinerBase[In] {

  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    j.in
  }
}
2015-04-24 12:14:04 +02:00
/** Adds `<~` to a sink graph; the graph is imported into the builder each time its port is requested. */
implicit class SinkArrow[T](val s: Graph[SinkShape[T], _]) extends AnyVal with ReverseCombinerBase[T] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = {
    val importedSink = b.add(s)
    importedSink.inlet
  }
}
/** Adds `<~` to a sink shape; its inlet is used as-is. */
implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = {
    s.inlet
  }
}
/**
 * Adds `<~` and the bidirectional `<~>` operators to a flow shape that is already part of
 * the builder.
 */
implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = f.inlet

  /** Imports `bidi`, wires this flow's outlet to its `in1` and its `out2` back to this flow's inlet. */
  def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val importedBidi = b.add(bidi)
    b.addEdge(f.outlet, importedBidi.in1)
    b.addEdge(importedBidi.out2, f.inlet)
    importedBidi
  }

  /** Wires this flow's outlet to `bidi.in1` and `bidi.out2` back to this flow's inlet. */
  def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    b.addEdge(f.outlet, bidi.in1)
    b.addEdge(bidi.out2, f.inlet)
    bidi
  }

  /** Imports `flow` and wires it in a loop with this flow: its outlet to our inlet, our outlet to its inlet. */
  def <~>[M](flow: Graph[FlowShape[O, I], M])(implicit b: Builder[_]): Unit = {
    val importedFlow = b.add(flow)
    b.addEdge(importedFlow.outlet, f.inlet)
    b.addEdge(f.outlet, importedFlow.inlet)
  }
}
2015-04-24 12:14:04 +02:00
/**
 * Adds the bidirectional `<~>` operators to a flow graph; the flow is imported into the
 * builder as part of each operation.
 */
implicit class FlowArrow[I, O, M](val f: Graph[FlowShape[I, O], M]) extends AnyVal {
  /** Imports `bidi` and then this flow, wiring flow outlet to `in1` and `out2` to flow inlet. */
  def <~>[I2, O2, Mat](bidi: Graph[BidiShape[O, O2, I2, I], Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val importedBidi = b.add(bidi)
    val importedFlow = b.add(f)
    b.addEdge(importedFlow.outlet, importedBidi.in1)
    b.addEdge(importedBidi.out2, importedFlow.inlet)
    importedBidi
  }

  /** Imports this flow, wiring its outlet to `bidi.in1` and `bidi.out2` to its inlet. */
  def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val importedFlow = b.add(f)
    b.addEdge(importedFlow.outlet, bidi.in1)
    b.addEdge(bidi.out2, importedFlow.inlet)
    bidi
  }

  /** Imports `flow` and then this flow, and wires the two in a loop. */
  def <~>[M2](flow: Graph[FlowShape[O, I], M2])(implicit b: Builder[_]): Unit = {
    val importedOther = b.add(flow)
    val importedSelf = b.add(f)
    b.addEdge(importedOther.outlet, importedSelf.inlet)
    b.addEdge(importedSelf.outlet, importedOther.inlet)
  }
}
/**
 * Adds the bidirectional `<~>` operators to a bidi shape that is already part of the builder:
 * `out1` is wired to the other side's input and the other side's output back to `in2`.
 */
implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal {
  /** Wires `bidi.out1` to `other.in1` and `other.out2` to `bidi.in2`; returns `other`. */
  def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
    b.addEdge(bidi.out1, other.in1)
    b.addEdge(other.out2, bidi.in2)
    other
  }

  /** Imports `otherFlow` and wires it like the shape-based variant; returns the imported shape. */
  def <~>[I3, O3, M](otherFlow: Graph[BidiShape[O1, O3, I3, I2], M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
    val importedOther = b.add(otherFlow)
    b.addEdge(bidi.out1, importedOther.in1)
    b.addEdge(importedOther.out2, bidi.in2)
    importedOther
  }

  /** Closes the bidi with `flow`: `bidi.out1` to the flow's inlet, the flow's outlet to `bidi.in2`. */
  def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = {
    b.addEdge(bidi.out1, flow.inlet)
    b.addEdge(flow.outlet, bidi.in2)
  }

  /** Imports `f` and closes the bidi with the imported flow. */
  def <~>[M](f: Graph[FlowShape[O1, I2], M])(implicit b: Builder[_]): Unit = {
    val importedFlow = b.add(f)
    b.addEdge(bidi.out1, importedFlow.inlet)
    b.addEdge(importedFlow.outlet, bidi.in2)
  }
}
import scala.language.implicitConversions
2015-03-30 14:22:12 +02:00
/** Wraps an [[Outlet]] in `PortOps` bound to the implicit builder. */
implicit def port2flow[T](from: Outlet[T])(implicit b: Builder[_]): PortOps[T, Unit] = {
  new PortOps(from, b)
}
2015-03-30 14:22:12 +02:00
/** Wraps the first free outlet of the fan-out shape `j` in `PortOps` bound to the implicit builder. */
implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = {
  val freeOutlet = findOut(b, j, 0)
  new PortOps(freeOutlet, b)
}
2015-03-30 14:22:12 +02:00
/** Wraps the outlet of the flow shape `f` in `PortOps` bound to the implicit builder. */
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = {
  new PortOps(f.outlet, b)
}
2015-04-24 12:14:04 +02:00
/** Adds `~>` to a source graph; the graph is imported into the builder each time its port is requested. */
implicit class SourceArrow[T](val s: Graph[SourceShape[T], _]) extends AnyVal with CombinerBase[T] {
  override def importAndGetPort(b: Builder[_]): Outlet[T] = {
    val importedSource = b.add(s)
    importedSource.outlet
  }
}
/** Adds `~>` to a source shape; its outlet is used as-is. */
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
  override def importAndGetPort(b: Builder[_]): Outlet[T] = {
    s.outlet
  }
}
}
}