From c35f1f42e5e60b054bdd0eeb43771ed8d169a1ee Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 10 Sep 2014 14:40:29 +0200 Subject: [PATCH] =str Rename Fan to Junction --- .../impl2/ActorBasedFlowMaterializer.scala | 8 +-- .../akka/stream/scaladsl2/FlowGraph.scala | 60 +++++++++---------- .../stream/scaladsl2/FlowMaterializer.scala | 2 +- 3 files changed, 35 insertions(+), 35 deletions(-) diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 962d94d3a1..b41f497546 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -62,15 +62,15 @@ private[akka] object Ast { override def name = "buffer" } - sealed trait FanAstNode { + sealed trait JunctionAstNode { def name: String } - case object Merge extends FanAstNode { + case object Merge extends JunctionAstNode { override def name = "merge" } - case object Broadcast extends FanAstNode { + case object Broadcast extends JunctionAstNode { override def name = "broadcast" } @@ -183,7 +183,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") } - override def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = op match { + override def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = op match { case Ast.Merge ⇒ // FIXME real impl require(outputCount == 1) diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 4f5efbc90e..5de26ce57c 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -16,15 +16,15 @@ import akka.stream.impl2.Ast * Fan-in and fan-out vertices in the [[FlowGraph]] implements * this marker interface. */ -sealed trait FanOperation[T] extends FlowGraphInternal.Vertex +sealed trait Junction[T] extends FlowGraphInternal.Vertex /** * Fan-out vertices in the [[FlowGraph]] implements this marker interface. */ -trait FanOutOperation[T] extends FanOperation[T] +trait FanOutOperation[T] extends Junction[T] /** * Fan-in vertices in the [[FlowGraph]] implements this marker interface. */ -trait FanInOperation[T] extends FanOperation[T] +trait FanInOperation[T] extends Junction[T] object Merge { /** @@ -176,7 +176,7 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) private implicit val edgeFactory = scalax.collection.edge.LDiEdge - def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { val sourceVertex = SourceVertex(source) checkAddSourceSinkPrecondition(sourceVertex) checkAddFanPrecondition(sink, in = true) @@ -184,14 +184,14 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) this } - def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { checkAddSourceSinkPrecondition(source) checkAddFanPrecondition(sink, in = true) graph.addLEdge(source, sink)(flow) this } - def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: Sink[Out]): this.type = { + def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Sink[Out]): this.type = { val sinkVertex = SinkVertex(sink) checkAddSourceSinkPrecondition(sinkVertex) checkAddFanPrecondition(source, in = false) @@ -199,26 +199,26 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) this } - def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = { + def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = { checkAddSourceSinkPrecondition(sink) checkAddFanPrecondition(source, in = false) graph.addLEdge(source, sink)(flow) this } - def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + def addEdge[In, Out](source: Junction[In], flow: ProcessorFlow[In, Out], sink: Junction[Out]): this.type = { checkAddFanPrecondition(source, in = false) checkAddFanPrecondition(sink, in = true) graph.addLEdge(source, sink)(flow) this } - def addEdge[In, Out](flow: FlowWithSource[In, Out], sink: FanOperation[Out]): this.type = { + def addEdge[In, Out](flow: FlowWithSource[In, Out], sink: Junction[Out]): this.type = { addEdge(flow.input, flow.withoutSource, sink) this } - def addEdge[In, Out](source: FanOperation[In], flow: FlowWithSink[In, Out]): this.type = { + def addEdge[In, Out](source: Junction[In], flow: FlowWithSink[In, Out]): this.type = { addEdge(source, flow.withoutSink, flow.output) this } @@ -250,18 +250,18 @@ class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) private def checkAddSourceSinkPrecondition(node: Vertex): Unit = require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") - private def checkAddFanPrecondition(fan: FanOperation[_], in: Boolean): Unit = { - fan match { + private def checkAddFanPrecondition(junction: Junction[_], in: Boolean): Unit = { + junction match { case _: FanOutOperation[_] if in ⇒ - graph.find(fan) match { + graph.find(junction) match { case Some(existing) if existing.incoming.nonEmpty ⇒ - throw new IllegalArgumentException(s"Fan-out [$fan] is already attached to input [${existing.incoming.head}]") + throw new IllegalArgumentException(s"Fan-out [$junction] is already attached to input [${existing.incoming.head}]") case _ ⇒ // ok } case _: FanInOperation[_] if !in ⇒ - graph.find(fan) match { + graph.find(junction) match { case Some(existing) if existing.outgoing.nonEmpty ⇒ - throw new IllegalArgumentException(s"Fan-in [$fan] is already attached to output [${existing.outgoing.head}]") + throw new IllegalArgumentException(s"Fan-in [$junction] is already attached to output [${existing.outgoing.head}]") case _ ⇒ // ok } case _ ⇒ // ok @@ -429,7 +429,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph case merge: Merge[_] ⇒ // one subscriber for each incoming edge of the merge vertex val (subscribers, publishers) = - materializer.materializeFan[Any, Any](Ast.Merge, edge.from.inDegree, 1) + materializer.materializeJunction[Any, Any](Ast.Merge, edge.from.inDegree, 1) val publisher = publishers.head val edgeSubscribers = edge.from.incoming.zip(subscribers) val materializedSink = connectToDownstream(publisher) @@ -448,7 +448,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } else { // one publisher for each outgoing edge of the broadcast vertex val (subscribers, publishers) = - materializer.materializeFan[Any, Any](Ast.Broadcast, 1, edge.from.outDegree) + materializer.materializeJunction[Any, Any](Ast.Broadcast, 1, edge.from.outDegree) val subscriber = subscribers.head val edgePublishers = edge.from.outgoing.zip(publishers).toMap val publisher = edgePublishers(edge) @@ -461,7 +461,7 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph } case other ⇒ - throw new IllegalArgumentException("Unknown fan operation: " + other) + throw new IllegalArgumentException("Unknown junction operation: " + other) } } @@ -577,35 +577,35 @@ object FlowGraphImplicits { } class SourceNextStep[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { - def ~>(sink: FanOperation[Out]): FanOperation[Out] = { + def ~>(sink: Junction[Out]): Junction[Out] = { builder.addEdge(source, flow, sink) sink } } - implicit class FanOps[In](val fan: FanOperation[In]) extends AnyVal { - def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): FanNextStep[In, Out] = { - new FanNextStep(fan, flow, builder) + implicit class JunctionOps[In](val junction: Junction[In]) extends AnyVal { + def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): JunctionNextStep[In, Out] = { + new JunctionNextStep(junction, flow, builder) } } - class FanNextStep[In, Out](fan: FanOperation[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { - def ~>(sink: FanOperation[Out]): FanOperation[Out] = { - builder.addEdge(fan, flow, sink) + class JunctionNextStep[In, Out](junction: Junction[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { + def ~>(sink: Junction[Out]): Junction[Out] = { + builder.addEdge(junction, flow, sink) sink } def ~>(sink: Sink[Out]): Unit = { - builder.addEdge(fan, flow, sink) + builder.addEdge(junction, flow, sink) } def ~>(sink: UndefinedSink[Out]): Unit = { - builder.addEdge(fan, flow, sink) + builder.addEdge(junction, flow, sink) } } implicit class FlowWithSourceOps[In, Out](val flow: FlowWithSource[In, Out]) extends AnyVal { - def ~>(sink: FanOperation[Out])(implicit builder: FlowGraphBuilder): FanOperation[Out] = { + def ~>(sink: Junction[Out])(implicit builder: FlowGraphBuilder): Junction[Out] = { builder.addEdge(flow, sink) sink } @@ -620,7 +620,7 @@ object FlowGraphImplicits { } class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { - def ~>(sink: FanOperation[Out]): FanOperation[Out] = { + def ~>(sink: Junction[Out]): Junction[Out] = { builder.addEdge(source, flow, sink) sink } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index ca2a3ee6d1..013501cbff 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -143,7 +143,7 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { /** * Create publishers and subscribers for fan-in and fan-out operations. */ - def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) + def materializeJunction[In, Out](op: Ast.JunctionAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) }