2015-01-28 14:19:50 +01:00
/**
 * Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
 */
package akka.stream.scaladsl
import akka.stream.impl.Junctions._
import akka.stream.impl.GenJunctions._
import akka.stream.impl.Stages. { MaterializingStageFactory , StageModule }
import akka.stream.impl._
import akka.stream.impl.StreamLayout._
import akka.stream._
import OperationAttributes.name
import scala.collection.immutable
import scala.annotation.unchecked.uncheckedVariance
import scala.annotation.tailrec
/**
 * Fan-in junction that merges several input streams into a single output,
 * forwarding elements as they arrive (picking randomly when several inputs
 * have elements ready).
 *
 * A `Merge` has one `out` port and one or more `in` ports.
 */
object Merge {

  /**
   * Builds a `Merge` graph.
   *
   * @param inputPorts number of input ports
   * @param attributes optional attributes, combined with the default name `"Merge"`
   */
  def apply[T](inputPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanInShape[T, T], Unit] = {
    val ports = new UniformFanInShape[T, T](inputPorts)
    val backing = new MergeModule(ports, OperationAttributes.name("Merge") and attributes)
    new Graph[UniformFanInShape[T, T], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Fan-in junction that merges several input streams into a single output,
 * forwarding elements as they arrive (picking from the preferred input when
 * several inputs have elements ready).
 *
 * A `MergePreferred` has one `out` port, one `preferred` input port and 0 or more secondary `in` ports.
 */
object MergePreferred {
  import FanInShape._

  /**
   * Shape of a `MergePreferred`: a uniform fan-in shape created with
   * `secondaryPorts` inlets plus one extra inlet registered as `"preferred"`.
   */
  final class MergePreferredShape[T](val secondaryPorts: Int, _init: Init[T])
    extends UniformFanInShape[T, T](secondaryPorts, _init) {

    /** The additional inlet, created under the name `"preferred"`. */
    val preferred = newInlet[T]("preferred")

    /** Convenience constructor wrapping `name` in a `Name` initializer. */
    def this(secondaryPorts: Int, name: String) = this(secondaryPorts, Name(name))

    override protected def construct(init: Init[T]): FanInShape[T] =
      new MergePreferredShape(secondaryPorts, init)

    // Narrows the static result type of the inherited copy operation.
    override def deepCopy(): MergePreferredShape[T] =
      super.deepCopy().asInstanceOf[MergePreferredShape[T]]
  }

  /**
   * Builds a `MergePreferred` graph.
   *
   * @param secondaryPorts number of secondary input ports
   * @param attributes optional attributes, combined with the default name `"MergePreferred"`
   */
  def apply[T](secondaryPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[MergePreferredShape[T], Unit] = {
    val ports = new MergePreferredShape[T](secondaryPorts, "MergePreferred")
    val backing = new MergePreferredModule(ports, OperationAttributes.name("MergePreferred") and attributes)
    new Graph[MergePreferredShape[T], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Fan-out junction: every incoming element is produced to each of the output
 * streams. It will not shut down until the subscriptions for at least two
 * downstream subscribers have been established.
 *
 * A `Broadcast` has one `in` port and 2 or more `out` ports.
 */
object Broadcast {

  /**
   * Builds a `Broadcast` graph.
   *
   * @param outputPorts number of output ports
   * @param attributes optional attributes, combined with the default name `"Broadcast"`
   */
  def apply[T](outputPorts: Int, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanOutShape[T, T], Unit] = {
    val ports = new UniformFanOutShape[T, T](outputPorts)
    val backing = new BroadcastModule(ports, OperationAttributes.name("Broadcast") and attributes)
    new Graph[UniformFanOutShape[T, T], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Fan-out junction: every incoming element is produced to exactly one of the
 * output streams. It will not shut down until the subscriptions for at least
 * two downstream subscribers have been established.
 *
 * A `Balance` has one `in` port and 2 or more `out` ports.
 */
object Balance {

  /**
   * Builds a `Balance` graph.
   *
   * @param outputPorts number of output ports
   * @param waitForAllDownstreams when `true`, no element is emitted to any downstream output
   *   until all of them have requested at least one element; defaults to `false`
   * @param attributes optional attributes, combined with the default name `"Balance"`
   */
  def apply[T](outputPorts: Int, waitForAllDownstreams: Boolean = false, attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanOutShape[T, T], Unit] = {
    val ports = new UniformFanOutShape[T, T](outputPorts)
    val backing = new BalanceModule(ports, waitForAllDownstreams, OperationAttributes.name("Balance") and attributes)
    new Graph[UniformFanOutShape[T, T], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Combines the elements of 2 streams into a stream of tuples.
 *
 * A `Zip` has a `left` and a `right` input port and one `out` port.
 */
object Zip {

  /**
   * Builds a `Zip` graph; pairs are formed with `Keep.both`.
   *
   * @param attributes optional attributes, combined with the default name `"Zip"`
   */
  def apply[A, B](attributes: OperationAttributes = OperationAttributes.none): Graph[FanInShape2[A, B, (A, B)], Unit] = {
    val ports = new FanInShape2[A, B, (A, B)]("Zip")
    val backing = new ZipWith2Module[A, B, (A, B)](ports, Keep.both, OperationAttributes.name("Zip") and attributes)
    new Graph[FanInShape2[A, B, (A, B)], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Combine the elements of multiple streams into a stream of the combined elements.
 *
 * This object declares no members of its own; everything it offers is inherited
 * from `ZipWithApply` (not shown in this file).
 */
object ZipWith extends ZipWithApply
/**
 * Takes a stream of pair elements and splits each pair across two output streams.
 *
 * An `Unzip` has one `in` port and one `left` and one `right` output port.
 */
object Unzip {

  /**
   * Builds an `Unzip` graph.
   *
   * @param attributes optional attributes, combined with the default name `"Unzip"`
   */
  def apply[A, B](attributes: OperationAttributes = OperationAttributes.none): Graph[FanOutShape2[(A, B), A, B], Unit] = {
    val ports = new FanOutShape2[(A, B), A, B]("Unzip")
    val backing = new UnzipModule(ports, OperationAttributes.name("Unzip") and attributes)
    new Graph[FanOutShape2[(A, B), A, B], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
/**
 * Takes two streams and outputs one stream formed by first emitting all of the
 * elements of the first stream and then all of the elements of the second stream.
 *
 * The shape is a `UniformFanInShape` created with two inlets, plus one `out` port.
 */
object Concat {

  /**
   * Builds a `Concat` graph.
   *
   * @param attributes optional attributes, combined with the default name `"Concat"`
   */
  def apply[A](attributes: OperationAttributes = OperationAttributes.none): Graph[UniformFanInShape[A, A], Unit] = {
    val ports = new UniformFanInShape[A, A](2)
    val backing = new ConcatModule(ports, OperationAttributes.name("Concat") and attributes)
    new Graph[UniformFanInShape[A, A], Unit] {
      val shape = ports
      val module = backing
    }
  }
}
object FlowGraph extends GraphApply {
2015-03-30 14:22:12 +02:00
class Builder [ + M ] private [ stream ] ( ) {
2015-01-28 14:19:50 +01:00
private var moduleInProgress : Module = EmptyModule
2015-03-30 14:22:12 +02:00
/**
 * Connects `from` to `to` through a fresh copy of `via`.
 *
 * The flow's module is carbon-copied, grown into the module under construction,
 * and then wired on both sides using the copy's first inlet and first outlet.
 */
def addEdge[A, B, M2](from: Outlet[A], via: Flow[A, B, M2], to: Inlet[B]): Unit = {
  val copied = via.module.carbonCopy
  val grown = moduleInProgress.grow(copied)
  val upstreamWired = grown.connect(from, copied.shape.inlets.head)
  moduleInProgress = upstreamWired.connect(copied.shape.outlets.head, to)
}
/** Connects `from` directly to `to` inside the module under construction. */
def addEdge[T](from: Outlet[T], to: Inlet[T]): Unit = {
  val connected = moduleInProgress.connect(from, to)
  moduleInProgress = connected
}
/**
 * Imports a graph into this module by carbon-copying its module, discarding
 * its materialized value.
 *
 * When `StreamLayout.Debug` is set the graph's module is validated first.
 *
 * @return a shape of the same kind as `graph.shape`, rebuilt from the ports of
 *         the copy; these are the ports that are now to be connected
 */
def add[S <: Shape](graph: Graph[S, _]): S = {
  if (StreamLayout.Debug) {
    graph.module.validate()
  }
  val imported = graph.module.carbonCopy
  moduleInProgress = moduleInProgress.grow(imported)
  val rebuilt = graph.shape.copyFromPorts(imported.shape.inlets, imported.shape.outlets)
  rebuilt.asInstanceOf[S]
}
/**
 * INTERNAL API.
 *
 * Same import as the public `add`, except that `combine` is handed to `grow`
 * together with the copied module. This is only used by the
 * materialization-importing apply methods of Source, Flow, Sink and Graph.
 *
 * @return a shape of the same kind as `graph.shape`, rebuilt from the ports of the copy
 */
private[stream] def add[S <: Shape, A, B](graph: Graph[S, _], combine: (A, B) ⇒ Any): S = {
  if (StreamLayout.Debug) {
    graph.module.validate()
  }
  val imported = graph.module.carbonCopy
  moduleInProgress = moduleInProgress.grow(imported, combine)
  val rebuilt = graph.shape.copyFromPorts(imported.shape.inlets, imported.shape.outlets)
  rebuilt.asInstanceOf[S]
}
/** Imports `s` through the generic graph import and returns the outlet of the copy. */
def add[T](s: Source[T, _]): Outlet[T] = {
  // Widen the static type so the generic `add(graph)` overload is selected.
  val asGraph: Graph[SourceShape[T], _] = s
  add(asGraph).outlet
}
/** Imports `s` through the generic graph import and returns the inlet of the copy. */
def add[T](s: Sink[T, _]): Inlet[T] = {
  // Widen the static type so the generic `add(graph)` overload is selected.
  val asGraph: Graph[SinkShape[T], _] = s
  add(asGraph).inlet
}
2015-03-30 14:22:12 +02:00
/**
 * Returns an [[Outlet]] that gives access to the materialized value of this graph. Once the graph is
 * materialized this outlet will emit exactly one element, which is the materialized value. The outlet
 * may be exposed as an externally accessible outlet of a [[Source]], [[Sink]], [[Flow]] or [[BidiFlow]].
 *
 * The method may be called several times to obtain several [[Outlet]] instances; each call adds a new
 * source module to the graph, and all of the outlets will emit the materialized value.
 *
 * Be careful not to feed the result of this outlet to a stage that produces the materialized value
 * itself (for example to a [[Sink#fold]] that contributes to the materialized value), since that might
 * lead to an unresolvable dependency cycle.
 *
 * @return The outlet that will emit the materialized value.
 */
def matValue: Outlet[M] = {
  val valueSource = new MaterializedValueSource[Any]
  moduleInProgress = moduleInProgress.grow(valueSource)
  val emitted = valueSource.shape.outlet
  emitted.asInstanceOf[Outlet[M]]
}
2015-01-28 14:19:50 +01:00
/**
 * INTERNAL API.
 *
 * Grows the module under construction with the stage `op` and connects `port`
 * to the stage's input port.
 */
private[stream] def andThen(port: OutPort, op: StageModule): Unit = {
  val withStage = moduleInProgress.grow(op)
  moduleInProgress = withStage.connect(port, op.inPort)
}
/**
 * INTERNAL API.
 *
 * Wraps the module built so far into a `RunnableFlow`.
 *
 * @throws IllegalArgumentException if the module is not runnable; the message
 *         lists the open output and input ports
 */
private[stream] def buildRunnable[Mat](): RunnableFlow[Mat] = {
  if (moduleInProgress.isRunnable) new RunnableFlow(moduleInProgress.wrap())
  else {
    val unconnected = (moduleInProgress.outPorts ++ moduleInProgress.inPorts).mkString(", ")
    throw new IllegalArgumentException(
      "Cannot build the RunnableFlow because there are unconnected ports: " + unconnected)
  }
}
/**
 * INTERNAL API.
 *
 * Turns the module built so far into a `Source` that exposes `outlet`.
 *
 * @param outlet the outlet that must be the module's open outlet
 * @throws IllegalArgumentException if no ports remain open, if the module is not
 *         source-shaped, or if `outlet` differs from the module's open outlet
 */
private[stream] def buildSource[T, Mat](outlet: Outlet[T]): Source[T, Mat] = {
  val built = moduleInProgress
  if (built.isRunnable)
    throw new IllegalArgumentException("Cannot build the Source since no ports remain open")
  else if (!built.isSource)
    throw new IllegalArgumentException(
      s" Cannot build Source with open inputs ( ${built.inPorts.mkString(",")} ) and outputs ( ${built.outPorts.mkString(",")} ) ")
  else if (built.outPorts.head != outlet)
    throw new IllegalArgumentException(
      s" provided Outlet $outlet does not equal the module’ s open Outlet ${built.outPorts.head} ")
  else
    new Source(built.replaceShape(SourceShape(outlet)).wrap())
}
/**
 * INTERNAL API.
 *
 * Turns the module built so far into a `Flow` from `inlet` to `outlet`.
 *
 * @param inlet  the inlet that must be the module's open inlet
 * @param outlet the outlet that must be the module's open outlet
 * @throws IllegalArgumentException if the module is not flow-shaped, or if either
 *         port differs from the module's corresponding open port (the outlet is
 *         checked before the inlet)
 */
private[stream] def buildFlow[In, Out, Mat](inlet: Inlet[In], outlet: Outlet[Out]): Flow[In, Out, Mat] = {
  val built = moduleInProgress
  if (!built.isFlow)
    throw new IllegalArgumentException(
      s" Cannot build Flow with open inputs ( ${built.inPorts.mkString(",")} ) and outputs ( ${built.outPorts.mkString(",")} ) ")
  else if (built.outPorts.head != outlet)
    throw new IllegalArgumentException(
      s" provided Outlet $outlet does not equal the module’ s open Outlet ${built.outPorts.head} ")
  else if (built.inPorts.head != inlet)
    throw new IllegalArgumentException(
      s" provided Inlet $inlet does not equal the module’ s open Inlet ${built.inPorts.head} ")
  else
    new Flow(built.replaceShape(FlowShape(inlet, outlet)).wrap())
}
2015-03-04 15:22:33 +01:00
/**
 * INTERNAL API.
 *
 * Turns the module built so far into a `BidiFlow` with the given `shape`.
 *
 * @param shape the bidirectional shape whose ports must match the open ports
 * @throws IllegalArgumentException if the module is not bidi-flow-shaped, or if the
 *         set of outlets or the set of inlets of `shape` differs from the module's
 *         open ports (outlets are checked before inlets)
 */
private[stream] def buildBidiFlow[I1, O1, I2, O2, Mat](shape: BidiShape[I1, O1, I2, O2]): BidiFlow[I1, O1, I2, O2, Mat] = {
  val built = moduleInProgress
  if (!built.isBidiFlow)
    throw new IllegalArgumentException(
      s" Cannot build BidiFlow with open inputs ( ${built.inPorts.mkString(",")} ) and outputs ( ${built.outPorts.mkString(",")} ) ")
  else if (built.outPorts.toSet != shape.outlets.toSet)
    throw new IllegalArgumentException(
      s" provided Outlets [ ${shape.outlets.mkString(",")} ] does not equal the module’ s open Outlets [ ${built.outPorts.mkString(",")} ] ")
  else if (built.inPorts.toSet != shape.inlets.toSet)
    throw new IllegalArgumentException(
      s" provided Inlets [ ${shape.inlets.mkString(",")} ] does not equal the module’ s open Inlets [ ${built.inPorts.mkString(",")} ] ")
  else
    new BidiFlow(built.replaceShape(shape).wrap())
}
2015-01-28 14:19:50 +01:00
/**
 * INTERNAL API.
 *
 * Turns the module built so far into a `Sink` that exposes `inlet`.
 *
 * @param inlet the inlet that must be the module's open inlet
 * @throws IllegalArgumentException if no ports remain open, if the module is not
 *         sink-shaped, or if `inlet` differs from the module's open inlet
 */
private[stream] def buildSink[T, Mat](inlet: Inlet[T]): Sink[T, Mat] = {
  val built = moduleInProgress
  if (built.isRunnable)
    throw new IllegalArgumentException("Cannot build the Sink since no ports remain open")
  else if (!built.isSink)
    throw new IllegalArgumentException(
      s" Cannot build Sink with open inputs ( ${built.inPorts.mkString(",")} ) and outputs ( ${built.outPorts.mkString(",")} ) ")
  else if (built.inPorts.head != inlet)
    throw new IllegalArgumentException(
      s" provided Inlet $inlet does not equal the module’ s open Inlet ${built.inPorts.head} ")
  else
    new Sink(built.replaceShape(SinkShape(inlet)).wrap())
}
/** INTERNAL API: the module assembled by this builder so far. */
private[stream] def module: Module = {
  moduleInProgress
}
2015-03-06 12:22:14 +01:00
/** Converts this Scala DSL element to its Java DSL counterpart, which delegates to this builder. */
def asJava: javadsl.FlowGraph.Builder[M] = {
  val javaBuilder: javadsl.FlowGraph.Builder[M] = new javadsl.FlowGraph.Builder()(this)
  javaBuilder
}
2015-01-28 14:19:50 +01:00
}
object Implicits {
/**
 * Returns the first outlet of `junction`, scanning from index `n`, that is not
 * yet present in the downstream connections of the builder's module.
 *
 * @throws IllegalArgumentException once `n` reaches the number of outlets
 */
@tailrec
private[stream] def findOut[I, O](b: Builder[_], junction: UniformFanOutShape[I, O], n: Int): Outlet[O] = {
  if (n == junction.outArray.length)
    throw new IllegalArgumentException(s" no more outlets free on $junction ")
  val candidate = junction.out(n)
  if (b.module.downstreams.contains(candidate)) findOut(b, junction, n + 1)
  else candidate
}
/**
 * Returns the first inlet of `junction`, scanning from index `n`, that is not
 * yet present in the upstream connections of the builder's module.
 *
 * @throws IllegalArgumentException once `n` reaches the number of inlets
 */
@tailrec
private[stream] def findIn[I, O](b: Builder[_], junction: UniformFanInShape[I, O], n: Int): Inlet[I] = {
  if (n == junction.inArray.length)
    throw new IllegalArgumentException(s" no more inlets free on $junction ")
  val candidate = junction.in(n)
  if (b.module.upstreams.contains(candidate)) findIn(b, junction, n + 1)
  else candidate
}
/**
 * Left-to-right wiring operators (`~>`) shared by everything that can supply an
 * [[Outlet]] of `T`. Implementations only provide `importAndGetPort`; every
 * operator obtains the outlet through it and adds the edge to the implicit builder.
 */
trait CombinerBase[T] extends Any {

  /** Supplies the outlet to wire from; implementations may first import their element into `b`. */
  def importAndGetPort(b: Builder[_]): Outlet[T]

  /** Connects this element's outlet to `to`. */
  def ~>(to: Inlet[T])(implicit b: Builder[_]): Unit =
    b.addEdge(importAndGetPort(b), to)

  /**
   * Imports `via` into the builder, connects this element's outlet to its inlet
   * and returns operations on the imported flow's outlet.
   */
  def ~>[Out](via: Flow[T, Out, Any])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    val imported = b.add(via)
    b.addEdge(importAndGetPort(b), imported.inlet)
    new PortOps[Out, Unit](imported.outlet, b)
  }

  /**
   * Connects this element's outlet to the first free inlet of `junction` and
   * returns operations on the junction's outlet.
   *
   * @throws IllegalArgumentException if every inlet of `junction` is already connected
   */
  def ~>[Out](junction: UniformFanInShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    // Uses the shared `findIn` search instead of a private copy of the same loop.
    // The free inlet is located BEFORE `importAndGetPort` runs, so nothing is
    // imported into the builder when the junction turns out to be full.
    val freeInlet = findIn(b, junction, 0)
    b.addEdge(importAndGetPort(b), freeInlet)
    new PortOps[Out, Unit](junction.out, b)
  }

  /**
   * Connects this element's outlet to the inlet of `junction` and returns
   * operations on the junction's first free outlet. When no outlet is free, a
   * [[DisabledPortOps]] carrying the error message is returned instead.
   */
  def ~>[Out](junction: UniformFanOutShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    b.addEdge(importAndGetPort(b), junction.in)
    try new PortOps[Out, Unit](findOut(b, junction, 0), b)
    catch {
      case e: IllegalArgumentException ⇒ new DisabledPortOps[Out, Unit](e.getMessage)
    }
  }

  /** Connects this element's outlet to the inlet of `flow` and returns operations on its outlet. */
  def ~>[Out](flow: FlowShape[T, Out])(implicit b: Builder[_]): PortOps[Out, Unit] = {
    b.addEdge(importAndGetPort(b), flow.inlet)
    new PortOps[Out, Unit](flow.outlet, b)
  }

  /** Imports `to` into the builder and connects this element's outlet to it. */
  def ~>(to: Sink[T, _])(implicit b: Builder[_]): Unit =
    b.addEdge(importAndGetPort(b), b.add(to))

  /** Connects this element's outlet to the inlet of the already imported `to`. */
  def ~>(to: SinkShape[T])(implicit b: Builder[_]): Unit =
    b.addEdge(importAndGetPort(b), to.inlet)
}
/**
 * Right-to-left wiring operators (`<~`) shared by everything that can supply an
 * [[Inlet]] of `T`. Implementations only provide `importAndGetPortReverse`; every
 * operator obtains the inlet through it and adds the edge to the implicit builder.
 */
trait ReverseCombinerBase[T] extends Any {

  /** Supplies the inlet to wire into; implementations may first import their element into `b`. */
  def importAndGetPortReverse(b: Builder[_]): Inlet[T]

  /** Connects `from` to this element's inlet. */
  def <~(from: Outlet[T])(implicit b: Builder[_]): Unit =
    b.addEdge(from, importAndGetPortReverse(b))

  /**
   * Imports `via` into the builder, connects its outlet to this element's inlet
   * and returns reverse operations on the imported flow's inlet.
   */
  def <~[In](via: Flow[In, T, _])(implicit b: Builder[_]): ReversePortOps[In] = {
    val imported = b.add(via)
    b.addEdge(imported.outlet, importAndGetPortReverse(b))
    new ReversePortOps[In](imported.inlet)
  }

  /**
   * Connects the first free outlet of `junction` to this element's inlet and
   * returns reverse operations on the junction's inlet.
   *
   * @throws IllegalArgumentException if every outlet of `junction` is already connected
   */
  def <~[In](junction: UniformFanOutShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    // Uses the shared `findOut` search instead of a private copy of the same loop.
    // The free outlet is located BEFORE `importAndGetPortReverse` runs, so nothing
    // is imported into the builder when the junction turns out to be full.
    val freeOutlet = findOut(b, junction, 0)
    b.addEdge(freeOutlet, importAndGetPortReverse(b))
    new ReversePortOps[In](junction.in)
  }

  /**
   * Connects the outlet of `junction` to this element's inlet and returns
   * reverse operations on the junction's first free inlet. When no inlet is
   * free, a [[DisabledReversePortOps]] carrying the error message is returned instead.
   */
  def <~[In](junction: UniformFanInShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    b.addEdge(junction.out, importAndGetPortReverse(b))
    try new ReversePortOps[In](findIn(b, junction, 0))
    catch {
      case e: IllegalArgumentException ⇒ new DisabledReversePortOps[In](e.getMessage)
    }
  }

  /** Connects the outlet of `flow` to this element's inlet and returns reverse operations on its inlet. */
  def <~[In](flow: FlowShape[In, T])(implicit b: Builder[_]): ReversePortOps[In] = {
    b.addEdge(flow.outlet, importAndGetPortReverse(b))
    new ReversePortOps[In](flow.inlet)
  }

  /** Imports `from` into the builder and connects it to this element's inlet. */
  def <~(from: Source[T, _])(implicit b: Builder[_]): Unit =
    b.addEdge(b.add(from), importAndGetPortReverse(b))

  /** Connects the outlet of the already imported `from` to this element's inlet. */
  def <~(from: SourceShape[T])(implicit b: Builder[_]): Unit =
    b.addEdge(from.outlet, importAndGetPortReverse(b))
}
2015-03-30 14:22:12 +02:00
/**
 * Flow operations rooted at an outlet that already lives inside a builder.
 * Appending a stage wires it behind `outlet` in the builder and yields a new
 * `PortOps` positioned on the stage's outlet.
 */
class PortOps[Out, Mat](val outlet: Outlet[Out], b: Builder[_]) extends FlowOps[Out, Mat] with CombinerBase[Out] {
  override type Repr[+O, +M] = PortOps[O, M] @uncheckedVariance

  /** Not supported here: always throws `UnsupportedOperationException`. */
  override def withAttributes(attr: OperationAttributes): Repr[Out, Mat] =
    throw new UnsupportedOperationException("Cannot set attributes on chained ops from a junction output port")

  override private[scaladsl] def andThen[U](op: StageModule): Repr[U, Mat] =
    wireStage[U, Mat](op)

  // We don't track materialization here
  override private[scaladsl] def andThenMat[U, Mat2](op: MaterializingStageFactory): Repr[U, Mat2] =
    wireStage[U, Mat2](op)

  /** The outlet is already part of the builder, so there is nothing to import. */
  override def importAndGetPort(b: Builder[_]): Outlet[Out] = outlet

  // Shared by both andThen variants: attach `op` behind the current outlet and
  // continue from the stage's own outlet.
  private def wireStage[U, M2](op: StageModule): PortOps[U, M2] = {
    b.andThen(outlet, op)
    new PortOps[U, M2](op.shape.outlet.asInstanceOf[Outlet[U]], b)
  }
}
/**
 * Placeholder `PortOps` constructed without an outlet or builder (both `null`).
 * Asking it for its port fails with an `IllegalArgumentException` carrying `msg`.
 */
class DisabledPortOps[Out, Mat](msg: String) extends PortOps[Out, Mat](null, null) {
  override def importAndGetPort(b: Builder[_]): Outlet[Out] = {
    throw new IllegalArgumentException(msg)
  }
}
/** Adds the `<~` operators to a plain [[Inlet]]; the inlet itself is the port to wire into. */
implicit class ReversePortOps[In](val inlet: Inlet[In]) extends ReverseCombinerBase[In] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    inlet
  }
}
/**
 * Placeholder `ReversePortOps` constructed without an inlet (`null`). Asking it
 * for its port fails with an `IllegalArgumentException` carrying `msg`.
 */
class DisabledReversePortOps[In](msg: String) extends ReversePortOps[In](null) {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    throw new IllegalArgumentException(msg)
  }
}
/**
 * Adds both `~>` and `<~` to a uniform fan-in shape: wiring out of it uses its
 * `out` port, wiring into it uses its first free inlet.
 */
implicit class FanInOps[In, Out](val j: UniformFanInShape[In, Out]) extends AnyVal with CombinerBase[Out] with ReverseCombinerBase[In] {
  override def importAndGetPort(b: Builder[_]): Outlet[Out] = {
    j.out
  }

  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    findIn(b, j, 0)
  }
}
/** Adds the `<~` operators to a uniform fan-out shape; wiring into it uses its `in` port. */
implicit class FanOutOps[In, Out](val j: UniformFanOutShape[In, Out]) extends AnyVal with ReverseCombinerBase[In] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[In] = {
    j.in
  }
}
/** Adds the `<~` operators to a [[Sink]]; the sink is added to the builder each time its port is requested. */
implicit class SinkArrow[T](val s: Sink[T, _]) extends AnyVal with ReverseCombinerBase[T] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = {
    b.add(s)
  }
}
/** Adds the `<~` operators to a `SinkShape`; wiring into it uses its `inlet`. */
implicit class SinkShapeArrow[T](val s: SinkShape[T]) extends AnyVal with ReverseCombinerBase[T] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[T] = {
    s.inlet
  }
}
/**
 * Adds `<~` and the bidirectional `<~>` operators to a `FlowShape`. For `<~`
 * the port to wire into is the shape's `inlet`.
 */
implicit class FlowShapeArrow[I, O](val f: FlowShape[I, O]) extends AnyVal with ReverseCombinerBase[I] {
  override def importAndGetPortReverse(b: Builder[_]): Inlet[I] = {
    f.inlet
  }

  /**
   * Imports `bidi`, connects this flow's outlet to its `in1` and its `out2`
   * back to this flow's inlet, and returns the imported shape.
   */
  def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val imported = b.add(bidi)
    b.addEdge(f.outlet, imported.in1)
    b.addEdge(imported.out2, f.inlet)
    imported
  }

  /**
   * Connects this flow's outlet to `bidi.in1` and `bidi.out2` back to this
   * flow's inlet, and returns `bidi` unchanged.
   */
  def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    b.addEdge(f.outlet, bidi.in1)
    b.addEdge(bidi.out2, f.inlet)
    bidi
  }

  /** Imports `flow` and joins it with this flow in both directions. */
  def <~>[M](flow: Flow[O, I, M])(implicit b: Builder[_]): Unit = {
    val imported = b.add(flow)
    b.addEdge(imported.outlet, f.inlet)
    b.addEdge(f.outlet, imported.inlet)
  }
}
/**
 * Adds the bidirectional `<~>` operators to a not yet imported [[Flow]]. Every
 * operator adds the flow to the builder before wiring it.
 */
implicit class FlowArrow[I, O, M](val f: Flow[I, O, M]) extends AnyVal {

  /**
   * Imports `bidi` and then this flow, connects the flow's outlet to the bidi's
   * `in1` and the bidi's `out2` back to the flow's inlet, and returns the
   * imported bidi shape.
   */
  def <~>[I2, O2, Mat](bidi: BidiFlow[O, O2, I2, I, Mat])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val bidiShape = b.add(bidi)
    val self = b.add(f)
    b.addEdge(self.outlet, bidiShape.in1)
    b.addEdge(bidiShape.out2, self.inlet)
    bidiShape
  }

  /**
   * Imports this flow, connects its outlet to `bidi.in1` and `bidi.out2` back
   * to its inlet, and returns `bidi` unchanged.
   */
  def <~>[I2, O2](bidi: BidiShape[O, O2, I2, I])(implicit b: Builder[_]): BidiShape[O, O2, I2, I] = {
    val self = b.add(f)
    b.addEdge(self.outlet, bidi.in1)
    b.addEdge(bidi.out2, self.inlet)
    bidi
  }

  /** Imports `flow` and then this flow, and joins the two in both directions. */
  def <~>[M2](flow: Flow[O, I, M2])(implicit b: Builder[_]): Unit = {
    val other = b.add(flow)
    val self = b.add(f)
    b.addEdge(other.outlet, self.inlet)
    b.addEdge(self.outlet, other.inlet)
  }
}
/**
 * Adds the bidirectional `<~>` operators to a `BidiShape`. In every variant
 * this shape's `out1` feeds the right-hand side and the right-hand side's
 * return path feeds this shape's `in2`.
 */
implicit class BidiFlowShapeArrow[I1, O1, I2, O2](val bidi: BidiShape[I1, O1, I2, O2]) extends AnyVal {

  /** Stacks the already imported `other` on this shape and returns `other`. */
  def <~>[I3, O3](other: BidiShape[O1, O3, I3, I2])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
    b.addEdge(bidi.out1, other.in1)
    b.addEdge(other.out2, bidi.in2)
    other
  }

  /** Imports `otherFlow`, stacks it on this shape and returns the imported shape. */
  def <~>[I3, O3, M](otherFlow: BidiFlow[O1, O3, I3, I2, M])(implicit b: Builder[_]): BidiShape[O1, O3, I3, I2] = {
    val imported = b.add(otherFlow)
    b.addEdge(bidi.out1, imported.in1)
    b.addEdge(imported.out2, bidi.in2)
    imported
  }

  /** Closes this shape with the already imported `flow`: `out1` into the flow, the flow into `in2`. */
  def <~>(flow: FlowShape[O1, I2])(implicit b: Builder[_]): Unit = {
    b.addEdge(bidi.out1, flow.inlet)
    b.addEdge(flow.outlet, bidi.in2)
  }

  /** Imports `f` and closes this shape with it: `out1` into the flow, the flow into `in2`. */
  def <~>[M](f: Flow[O1, I2, M])(implicit b: Builder[_]): Unit = {
    val imported = b.add(f)
    b.addEdge(bidi.out1, imported.inlet)
    b.addEdge(imported.outlet, bidi.in2)
  }
}
import scala.language.implicitConversions
2015-03-30 14:22:12 +02:00
/** Implicitly wraps an [[Outlet]] in a [[PortOps]] bound to the builder in scope. */
implicit def port2flow[T](from: Outlet[T])(implicit b: Builder[_]): PortOps[T, Unit] = {
  new PortOps[T, Unit](from, b)
}
2015-03-30 14:22:12 +02:00
/**
 * Implicitly wraps the first free outlet of a uniform fan-out shape in a
 * [[PortOps]] bound to the builder in scope.
 *
 * @throws IllegalArgumentException if every outlet of `j` is already connected
 */
implicit def fanOut2flow[I, O](j: UniformFanOutShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = {
  val freeOutlet = findOut(b, j, 0)
  new PortOps[O, Unit](freeOutlet, b)
}
2015-03-30 14:22:12 +02:00
/** Implicitly wraps the outlet of a `FlowShape` in a [[PortOps]] bound to the builder in scope. */
implicit def flow2flow[I, O](f: FlowShape[I, O])(implicit b: Builder[_]): PortOps[O, Unit] = {
  new PortOps[O, Unit](f.outlet, b)
}
/** Adds the `~>` operators to a [[Source]]; the source is added to the builder each time its port is requested. */
implicit class SourceArrow[T](val s: Source[T, _]) extends AnyVal with CombinerBase[T] {
  override def importAndGetPort(b: Builder[_]): Outlet[T] = {
    b.add(s)
  }
}
/** Adds the `~>` operators to a `SourceShape`; wiring out of it uses its `outlet`. */
implicit class SourceShapeArrow[T](val s: SourceShape[T]) extends AnyVal with CombinerBase[T] {
  override def importAndGetPort(b: Builder[_]): Outlet[T] = {
    s.outlet
  }
}
}
}