=str Rename Fan to Junction

This commit is contained in:
Patrik Nordwall 2014-09-10 14:40:29 +02:00
parent 5b360a76b0
commit c35f1f42e5
3 changed files with 35 additions and 35 deletions

View file

@ -62,15 +62,15 @@ private[akka] object Ast {
override def name = "buffer"
}
sealed trait FanAstNode {
sealed trait JunctionAstNode {
def name: String
}
case object Merge extends FanAstNode {
case object Merge extends JunctionAstNode {
override def name = "merge"
}
case object Broadcast extends FanAstNode {
case object Broadcast extends JunctionAstNode {
override def name = "broadcast"
}
@ -183,7 +183,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]")
}
override def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = op match {
override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = op match {
case Ast.Merge
// FIXME real impl
require(outputCount == 1)

View file

@ -16,15 +16,15 @@ import akka.stream.impl2.Ast
* Fan-in and fan-out vertices in the [[FlowGraph]] implements
* this marker interface.
*/
sealed trait FanOperation[T] extends FlowGraphInternal.Vertex
sealed trait Junction[T] extends FlowGraphInternal.Vertex
/**
* Fan-out vertices in the [[FlowGraph]] implements this marker interface.
*/
trait FanOutOperation[T] extends FanOperation[T]
trait FanOutOperation[T] extends Junction[T]
/**
* Fan-in vertices in the [[FlowGraph]] implements this marker interface.
*/
trait FanInOperation[T] extends FanOperation[T]
trait FanInOperation[T] extends Junction[T]
object Merge {
/**
@ -176,7 +176,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
private implicit val edgeFactory = scalax.collection.edge.LDiEdge
def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = {
def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
val sourceVertex = SourceVertex(source)
checkAddSourceSinkPrecondition(sourceVertex)
checkAddFanPrecondition(sink, in = true)
@ -184,14 +184,14 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
this
}
def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = {
def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
checkAddSourceSinkPrecondition(source)
checkAddFanPrecondition(sink, in = true)
graph.addLEdge(source, sink)(flow)
this
}
def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: Sink[Out]): this.type = {
def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Sink[Out]): this.type = {
val sinkVertex = SinkVertex(sink)
checkAddSourceSinkPrecondition(sinkVertex)
checkAddFanPrecondition(source, in = false)
@ -199,26 +199,26 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
this
}
def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = {
def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = {
checkAddSourceSinkPrecondition(sink)
checkAddFanPrecondition(source, in = false)
graph.addLEdge(source, sink)(flow)
this
}
def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = {
def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = {
checkAddFanPrecondition(source, in = false)
checkAddFanPrecondition(sink, in = true)
graph.addLEdge(source, sink)(flow)
this
}
def addEdge[In, Out](flow: FlowWithSource[In, Out], sink: FanOperation[Out]): this.type = {
def addEdge[In, Out](flow: FlowWithSource[In, Out], sink: Junction[Out]): this.type = {
addEdge(flow.input, flow.withoutSource, sink)
this
}
def addEdge[In, Out](source: FanOperation[In], flow: FlowWithSink[In, Out]): this.type = {
def addEdge[In, Out](source: Junction[In], flow: FlowWithSink[In, Out]): this.type = {
addEdge(source, flow.withoutSink, flow.output)
this
}
@ -250,18 +250,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge])
private def checkAddSourceSinkPrecondition(node: Vertex): Unit =
require(graph.find(node) == None, s"[$node] instance is already used in this flow graph")
private def checkAddFanPrecondition(fan: FanOperation[_], in: Boolean): Unit = {
fan match {
private def checkAddFanPrecondition(junction: Junction[_], in: Boolean): Unit = {
junction match {
case _: FanOutOperation[_] if in
graph.find(fan) match {
graph.find(junction) match {
case Some(existing) if existing.incoming.nonEmpty
throw new IllegalArgumentException(s"Fan-out [$fan] is already attached to input [${existing.incoming.head}]")
throw new IllegalArgumentException(s"Fan-out [$junction] is already attached to input [${existing.incoming.head}]")
case _ // ok
}
case _: FanInOperation[_] if !in
graph.find(fan) match {
graph.find(junction) match {
case Some(existing) if existing.outgoing.nonEmpty
throw new IllegalArgumentException(s"Fan-in [$fan] is already attached to output [${existing.outgoing.head}]")
throw new IllegalArgumentException(s"Fan-in [$junction] is already attached to output [${existing.outgoing.head}]")
case _ // ok
}
case _ // ok
@ -429,7 +429,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
case merge: Merge[_]
// one subscriber for each incoming edge of the merge vertex
val (subscribers, publishers) =
materializer.materializeFan[Any, Any](Ast.Merge, edge.from.inDegree, 1)
materializer.materializeJunction[Any, Any](Ast.Merge, edge.from.inDegree, 1)
val publisher = publishers.head
val edgeSubscribers = edge.from.incoming.zip(subscribers)
val materializedSink = connectToDownstream(publisher)
@ -448,7 +448,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
} else {
// one publisher for each outgoing edge of the broadcast vertex
val (subscribers, publishers) =
materializer.materializeFan[Any, Any](Ast.Broadcast, 1, edge.from.outDegree)
materializer.materializeJunction[Any, Any](Ast.Broadcast, 1, edge.from.outDegree)
val subscriber = subscribers.head
val edgePublishers = edge.from.outgoing.zip(publishers).toMap
val publisher = edgePublishers(edge)
@ -461,7 +461,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph
}
case other
throw new IllegalArgumentException("Unknown fan operation: " + other)
throw new IllegalArgumentException("Unknown junction operation: " + other)
}
}
@ -577,35 +577,35 @@ object FlowGraphImplicits {
}
class SourceNextStep[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
def ~>(sink: FanOperation[Out]): FanOperation[Out] = {
def ~>(sink: Junction[Out]): Junction[Out] = {
builder.addEdge(source, flow, sink)
sink
}
}
implicit class FanOps[In](val fan: FanOperation[In]) extends AnyVal {
def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): FanNextStep[In, Out] = {
new FanNextStep(fan, flow, builder)
implicit class JunctionOps[In](val junction: Junction[In]) extends AnyVal {
def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = {
new JunctionNextStep(junction, flow, builder)
}
}
class FanNextStep[In, Out](fan: FanOperation[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
def ~>(sink: FanOperation[Out]): FanOperation[Out] = {
builder.addEdge(fan, flow, sink)
class JunctionNextStep[In, Out](junction: Junction[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
def ~>(sink: Junction[Out]): Junction[Out] = {
builder.addEdge(junction, flow, sink)
sink
}
def ~>(sink: Sink[Out]): Unit = {
builder.addEdge(fan, flow, sink)
builder.addEdge(junction, flow, sink)
}
def ~>(sink: UndefinedSink[Out]): Unit = {
builder.addEdge(fan, flow, sink)
builder.addEdge(junction, flow, sink)
}
}
implicit class FlowWithSourceOps[In, Out](val flow: FlowWithSource[In, Out]) extends AnyVal {
def ~>(sink: FanOperation[Out])(implicit builder: FlowGraphBuilder): FanOperation[Out] = {
def ~>(sink: Junction[Out])(implicit builder: FlowGraphBuilder): Junction[Out] = {
builder.addEdge(flow, sink)
sink
}
@ -620,7 +620,7 @@ object FlowGraphImplicits {
}
class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) {
def ~>(sink: FanOperation[Out]): FanOperation[Out] = {
def ~>(sink: Junction[Out]): Junction[Out] = {
builder.addEdge(source, flow, sink)
sink
}

View file

@ -143,7 +143,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) {
/**
* Create publishers and subscribers for fan-in and fan-out operations.
*/
def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]])
def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]])
}