diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index 8b07ce5dea..962d94d3a1 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -20,6 +20,7 @@ import akka.stream.impl.ConflateImpl import akka.stream.impl.ExpandImpl import akka.stream.impl.BufferImpl import akka.stream.impl.FanoutProcessorImpl +import akka.stream.impl.BlackholeSubscriber /** * INTERNAL API @@ -61,6 +62,18 @@ private[akka] object Ast { override def name = "buffer" } + sealed trait FanAstNode { + def name: String + } + + case object Merge extends FanAstNode { + override def name = "merge" + } + + case object Broadcast extends FanAstNode { + override def name = "broadcast" + } + } /** @@ -170,6 +183,23 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting throw new IllegalStateException(s"Stream supervisor must be a local actor, was [${supervisor.getClass.getName}]") } + override def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) = op match { + case Ast.Merge ⇒ + // FIXME real impl + require(outputCount == 1) + (Vector.fill(inputCount)(dummySubscriber[In]), List(dummyPublisher[Out])) + case Ast.Broadcast ⇒ + // FIXME real impl + require(inputCount == 1) + (List(dummySubscriber[In]), Vector.fill(outputCount)(dummyPublisher[Out])) + } + + // FIXME remove + private def dummySubscriber[T]: Subscriber[T] = new BlackholeSubscriber[T](1) + private def dummyPublisher[T]: Publisher[T] = new Publisher[T] { + def subscribe(subscriber: Subscriber[_ >: T]): Unit = subscriber.onComplete() + } + } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 97ae52210e..4bb2bd7e36 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -10,7 +10,7 @@ import scalax.collection.immutable.{ Graph ⇒ ImmutableGraph } import org.reactivestreams.Subscriber import akka.stream.impl.BlackholeSubscriber import org.reactivestreams.Publisher -import org.reactivestreams.Processor +import akka.stream.impl2.Ast /** * Fan-in and fan-out vertices in the [[FlowGraph]] implements @@ -376,17 +376,12 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { import scalax.collection.GraphTraversal._ - // FIXME remove when real materialization is done - def dummyProcessor(name: String): Processor[Any, Any] = new BlackholeSubscriber[Any](1) with Publisher[Any] with Processor[Any, Any] { - def subscribe(subscriber: Subscriber[_ >: Any]): Unit = subscriber.onComplete() - override def toString = name - } - // start with sinks val startingNodes = graph.nodes.filter(n ⇒ n.isLeaf && n.diSuccessors.isEmpty) case class Memo(visited: Set[graph.EdgeT] = Set.empty, - nodeProcessor: Map[graph.NodeT, Processor[Any, Any]] = Map.empty, + downstreamSubscriber: Map[graph.EdgeT, Subscriber[Any]] = Map.empty, + upstreamPublishers: Map[graph.EdgeT, Publisher[Any]] = Map.empty, sources: Map[Source[_], FlowWithSink[Any, Any]] = Map.empty, materializedSinks: Map[Sink[_], Any] = Map.empty) @@ -403,8 +398,8 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph val flow = edge.label.asInstanceOf[ProcessorFlow[Any, Any]] // returns the materialized sink, if any - def connectProcessorToDownstream(processor: Processor[Any, Any]): Option[(SinkWithKey[_, _], Any)] = { - val f = flow.withSource(PublisherSource(processor)) + def connectToDownstream(publisher: Publisher[Any]): Option[(SinkWithKey[_, _], Any)] = { + val f = flow.withSource(PublisherSource(publisher)) edge.to.value match { case SinkVertex(sink: SinkWithKey[_, _]) ⇒ val mf = f.withSink(sink.asInstanceOf[Sink[Any]]).run() @@ -413,36 +408,55 @@ class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraph f.withSink(sink.asInstanceOf[Sink[Any]]).run() None case _ ⇒ - f.withSink(SubscriberSink(memo.nodeProcessor(edge.to))).run() + f.withSink(SubscriberSink(memo.downstreamSubscriber(edge))).run() None } } edge.from.value match { case SourceVertex(src) ⇒ - val f = flow.withSink(SubscriberSink(memo.nodeProcessor(edge.to))) + val f = flow.withSink(SubscriberSink(memo.downstreamSubscriber(edge))) // connect the source with the flow later memo.copy(visited = memo.visited + edge, sources = memo.sources.updated(src, f)) - case _: FanOperation[_] ⇒ - val processor = edge.from.value match { - case merge: Merge[_] ⇒ - // FIXME materialize Merge - dummyProcessor("merge-processor") - case bcast: Broadcast[_] ⇒ - memo.nodeProcessor.getOrElse(edge.from, { - // FIXME materialize Broadcast - dummyProcessor("bcast-processor") - }) - case other ⇒ - throw new IllegalArgumentException("Unknown fan operation: " + other) - } - val materializedSink = connectProcessorToDownstream(processor) + case merge: Merge[_] ⇒ + // one subscriber for each incoming edge of the merge vertex + val (subscribers, publishers) = + materializer.materializeFan[Any, Any](Ast.Merge, edge.from.inDegree, 1) + val publisher = publishers.head + val edgeSubscribers = edge.from.incoming.zip(subscribers) + val materializedSink = connectToDownstream(publisher) memo.copy( visited = memo.visited + edge, - nodeProcessor = memo.nodeProcessor.updated(edge.from, processor), + downstreamSubscriber = memo.downstreamSubscriber ++ edgeSubscribers, materializedSinks = memo.materializedSinks ++ materializedSink) + + case bcast: Broadcast[_] ⇒ + if (memo.upstreamPublishers.contains(edge)) { + // broadcast vertex already materialized + val materializedSink = connectToDownstream(memo.upstreamPublishers(edge)) + memo.copy( + visited = memo.visited + edge, + materializedSinks = memo.materializedSinks ++ materializedSink) + } else { + // one publisher for each outgoing edge of the broadcast vertex + val (subscribers, publishers) = + materializer.materializeFan[Any, Any](Ast.Broadcast, 1, edge.from.outDegree) + val subscriber = subscribers.head + val edgePublishers = edge.from.outgoing.zip(publishers).toMap + val publisher = edgePublishers(edge) + val materializedSink = connectToDownstream(publisher) + memo.copy( + visited = memo.visited + edge, + downstreamSubscriber = memo.downstreamSubscriber + (edge.from.incoming.head -> subscriber), + upstreamPublishers = memo.upstreamPublishers ++ edgePublishers, + materializedSinks = memo.materializedSinks ++ materializedSink) + } + + case other ⇒ + throw new IllegalArgumentException("Unknown fan operation: " + other) + } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala index f444f0ed94..ca2a3ee6d1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowMaterializer.scala @@ -3,9 +3,12 @@ */ package akka.stream.scaladsl2 +import scala.collection.immutable import akka.actor.{ ActorContext, ActorRefFactory, ActorSystem, ExtendedActorSystem } import akka.stream.MaterializerSettings import akka.stream.impl2.{ ActorBasedFlowMaterializer, Ast, FlowNameCounter, StreamSupervisor } +import org.reactivestreams.Subscriber +import org.reactivestreams.Publisher object FlowMaterializer { @@ -137,6 +140,11 @@ abstract class FlowMaterializer(val settings: MaterializerSettings) { */ def materialize[In, Out](source: Source[In], sink: Sink[Out], ops: List[Ast.AstNode]): MaterializedFlow + /** + * Create publishers and subscribers for fan-in and fan-out operations. + */ + def materializeFan[In, Out](op: Ast.FanAstNode, inputCount: Int, outputCount: Int): (immutable.Seq[Subscriber[In]], immutable.Seq[Publisher[Out]]) + } /** diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala index dd6ca709a3..b6e2192d56 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -36,7 +36,7 @@ class FlowGraphCompileSpec extends AkkaSpec { addEdge(in1, f1, merge). addEdge(in2, f2, merge). addEdge(merge, f3, out1) - } + }.run() } "build simple broadcast" in { @@ -46,7 +46,7 @@ class FlowGraphCompileSpec extends AkkaSpec { addEdge(in1, f1, bcast). addEdge(bcast, f2, out1). addEdge(bcast, f3, out2) - } + }.run() } "build simple merge - broadcast" in { @@ -59,7 +59,7 @@ class FlowGraphCompileSpec extends AkkaSpec { addEdge(merge, f3, bcast). addEdge(bcast, f4, out1). addEdge(bcast, f5, out2) - } + }.run() } "build simple merge - broadcast with implicits" in { @@ -70,7 +70,7 @@ class FlowGraphCompileSpec extends AkkaSpec { in1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> out1 in2 ~> f4 ~> merge bcast ~> f5 ~> out2 - } + }.run() } /** @@ -161,7 +161,7 @@ class FlowGraphCompileSpec extends AkkaSpec { FlowGraph(partial2) { implicit b ⇒ b.attachSink(undefinedSink1, out1) b.attachSink(UndefinedSink[String]("sink2"), out2) - } + }.run() } }