From 400e49e88d01ed678040f553352ba2a8de00bc1e Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Thu, 4 Sep 2014 14:31:22 +0200 Subject: [PATCH] =str #15735 Prototype of FlowGraph Add documentation --- .../scala/akka/stream/scaladsl2/Flow.scala | 27 +- .../akka/stream/scaladsl2/FlowGraph.scala | 609 ++++++++++++++++++ .../scala/akka/stream/scaladsl2/Sink.scala | 6 +- .../scala/akka/stream/scaladsl2/Source.scala | 4 + .../stream/scaladsl2/CombinatorSpec.scala | 22 - .../{FlowSpec.scala => FlowCompileSpec.scala} | 3 +- .../scaladsl2/FlowGraphCompileSpec.scala | 168 +++++ 7 files changed, 802 insertions(+), 37 deletions(-) create mode 100644 akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala delete mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala rename akka-stream/src/test/scala/akka/stream/scaladsl2/{FlowSpec.scala => FlowCompileSpec.scala} (99%) create mode 100644 akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala index a3d991025f..dbab8c22a8 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Flow.scala @@ -17,20 +17,10 @@ import scala.language.higherKinds */ sealed trait Flow -/** - * Marker interface for flows that have a free (attachable) input side. - */ -sealed trait HasNoSource[-In] extends Flow - -/** - * Marker interface for flows that have a free (attachable) output side. - */ -sealed trait HasNoSink[+Out] extends Flow - /** * Operations offered by flows with a free output side: the DSL flows left-to-right only. */ -trait FlowOps[-In, +Out] extends HasNoSink[Out] { +trait FlowOps[-In, +Out] { type Repr[-I, +O] <: FlowOps[I, O] // Storing ops in reverse order @@ -49,7 +39,7 @@ trait FlowOps[-In, +Out] extends HasNoSink[Out] { /** * Flow without attached input and without attached output, can be used as a `Processor`. */ -final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In, Out] with HasNoSource[In] { +final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In, Out] { override type Repr[-I, +O] = ProcessorFlow[I, O] override protected def andThen[U](op: AstNode): Repr[In, U] = this.copy(ops = op :: ops) @@ -67,7 +57,7 @@ final case class ProcessorFlow[-In, +Out](ops: List[AstNode]) extends FlowOps[In /** * Flow with attached output, can be used as a `Subscriber`. */ -final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) extends HasNoSource[In] { +final case class FlowWithSink[-In, +Out](private[scaladsl2] val output: Sink[Out @uncheckedVariance], ops: List[AstNode]) { def withSource(in: Source[In]): RunnableFlow[In, Out] = RunnableFlow(in, output, ops) def withoutSink: ProcessorFlow[In, Out] = ProcessorFlow(ops) @@ -128,11 +118,22 @@ final case class RunnableFlow[-In, +Out](private[scaladsl2] val input: Source[In materializer.materialize(input, output, ops) } +/** + * Returned by [[RunnableFlow#run]] and can be used as parameter to the + * accessor method to retrieve the materialized `Source` or `Sink`, e.g. + * [[SubscriberSource#subscriber]] or [[PublisherSink#publisher]]. + */ class MaterializedFlow(sourceKey: AnyRef, matSource: Any, sinkKey: AnyRef, matSink: Any) extends MaterializedSource with MaterializedSink { + /** + * Do not call directly. Use accessor method in the concrete `Source`, e.g. [[SubscriberSource#subscriber]]. + */ override def getSourceFor[T](key: SourceWithKey[_, T]): T = if (key == sourceKey) matSource.asInstanceOf[T] else throw new IllegalArgumentException(s"Source key [$key] doesn't match the source [$sourceKey] of this flow") + /** + * Do not call directly. Use accessor method in the concrete `Sink`, e.g. [[PublisherSink#publisher]]. + */ def getSinkFor[T](key: SinkWithKey[_, T]): T = if (key == sinkKey) matSink.asInstanceOf[T] else throw new IllegalArgumentException(s"Sink key [$key] doesn't match the sink [$sinkKey] of this flow") diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala new file mode 100644 index 0000000000..d710474049 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -0,0 +1,609 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import scala.language.existentials +import scalax.collection.edge.LDiEdge +import scalax.collection.mutable.Graph +import scalax.collection.immutable.{ Graph ⇒ ImmutableGraph } +import org.reactivestreams.Subscriber +import akka.stream.impl.BlackholeSubscriber +import org.reactivestreams.Publisher +import org.reactivestreams.Processor + +/** + * Fan-in and fan-out vertices in the [[FlowGraph]] implements + * this marker interface. + */ +sealed trait FanOperation[T] extends FlowGraphInternal.Vertex +/** + * Fan-out vertices in the [[FlowGraph]] implements this marker interface. + */ +trait FanOutOperation[T] extends FanOperation[T] +/** + * Fan-in vertices in the [[FlowGraph]] implements this marker interface. + */ +trait FanInOperation[T] extends FanOperation[T] + +object Merge { + /** + * Create a new anonymous `Merge` vertex with the specified output type. + * Note that a `Merge` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[T]: Merge[T] = new Merge[T](None) + /** + * Create a named `Merge` vertex with the specified output type. + * Note that a `Merge` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. Calling this method several times with the same name + * returns instances that are `equal`. + */ + def apply[T](name: String): Merge[T] = new Merge[T](Some(name)) +} +/** + * Merge several streams, taking elements as they arrive from input streams + * (picking randomly when several have elements ready). + * + * When building the [[FlowGraph]] you must connect one or more input flows/sources + * and one output flow/sink to the `Merge` vertex. + */ +final class Merge[T](override val name: Option[String]) extends FanInOperation[T] with FlowGraphInternal.NamedVertex + +object Broadcast { + /** + * Create a new anonymous `Broadcast` vertex with the specified input type. + * Note that a `Broadcast` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[T]: Broadcast[T] = new Broadcast[T](None) + /** + * Create a named `Broadcast` vertex with the specified input type. + * Note that a `Broadcast` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. Calling this method several times with the same name + * returns instances that are `equal`. + */ + def apply[T](name: String): Broadcast[T] = new Broadcast[T](Some(name)) +} +/** + * Fan-out the stream to several streams. Each element is produced to + * the other streams. It will not shutdown until the subscriptions for at least + * two downstream subscribers have been established. + */ +final class Broadcast[T](override val name: Option[String]) extends FanOutOperation[T] with FlowGraphInternal.NamedVertex + +object UndefinedSink { + /** + * Create a new anonymous `UndefinedSink` vertex with the specified input type. + * Note that a `UndefinedSink` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[T]: UndefinedSink[T] = new UndefinedSink[T](None) + /** + * Create a named `UndefinedSink` vertex with the specified input type. + * Note that a `UndefinedSink` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. Calling this method several times with the same name + * returns instances that are `equal`. + */ + def apply[T](name: String): UndefinedSink[T] = new UndefinedSink[T](Some(name)) +} +/** + * It is possible to define a [[PartialFlowGraph]] with output flows that are not connected + * yet by using this placeholder instead of the real [[Sink]]. Later the placeholder can + * be replaced with [[FlowGraphBuilder#attachSink]]. + */ +final class UndefinedSink[T](override val name: Option[String]) extends FlowGraphInternal.NamedVertex + +object UndefinedSource { + /** + * Create a new anonymous `UndefinedSource` vertex with the specified input type. + * Note that a `UndefinedSource` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[T]: UndefinedSource[T] = new UndefinedSource[T](None) + /** + * Create a named `UndefinedSource` vertex with the specified output type. + * Note that a `UndefinedSource` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. Calling this method several times with the same name + * returns instances that are `equal`. + */ + def apply[T](name: String): UndefinedSource[T] = new UndefinedSource[T](Some(name)) +} +/** + * It is possible to define a [[PartialFlowGraph]] with input flows that are not connected + * yet by using this placeholder instead of the real [[Source]]. Later the placeholder can + * be replaced with [[FlowGraphBuilder#attachSource]]. + */ +final class UndefinedSource[T](override val name: Option[String]) extends FlowGraphInternal.NamedVertex + +/** + * INTERNAL API + */ +private[akka] object FlowGraphInternal { + + sealed trait Vertex + case class SourceVertex(source: Source[_]) extends Vertex { + override def toString = source.toString + } + case class SinkVertex(sink: Sink[_]) extends Vertex { + override def toString = sink.toString + } + + sealed trait NamedVertex extends Vertex { + def name: Option[String] + + final override def equals(obj: Any): Boolean = + obj match { + case other: NamedVertex ⇒ + if (name.isDefined) (getClass == other.getClass && name == other.name) else (this eq other) + case _ ⇒ false + } + + final override def hashCode: Int = name match { + case Some(n) ⇒ n.hashCode() + case None ⇒ super.hashCode() + } + + override def toString = name match { + case Some(n) ⇒ n + case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) + } + } + +} + +/** + * Builder of [[FlowGraph]] and [[PartialFlowGraph]]. + * Syntactic sugar is provided by [[FlowGraphImplicits]]. + */ +class FlowGraphBuilder private (graph: Graph[FlowGraphInternal.Vertex, LDiEdge]) { + import FlowGraphInternal._ + + private[akka] def this() = this(Graph.empty[FlowGraphInternal.Vertex, LDiEdge]) + + private[akka] def this(immutableGraph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) = + this(Graph.from(edges = immutableGraph.edges.map(e ⇒ LDiEdge(e.from.value, e.to.value)(e.label)).toIterable)) + + private implicit val edgeFactory = scalax.collection.edge.LDiEdge + + def addEdge[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + val sourceVertex = SourceVertex(source) + checkAddSourceSinkPrecondition(sourceVertex) + checkAddFanPrecondition(sink, in = true) + graph.addLEdge(sourceVertex, sink)(flow) + this + } + + def addEdge[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + checkAddSourceSinkPrecondition(source) + checkAddFanPrecondition(sink, in = true) + graph.addLEdge(source, sink)(flow) + this + } + + def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: Sink[Out]): this.type = { + val sinkVertex = SinkVertex(sink) + checkAddSourceSinkPrecondition(sinkVertex) + checkAddFanPrecondition(source, in = false) + graph.addLEdge(source, sinkVertex)(flow) + this + } + + def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: UndefinedSink[Out]): this.type = { + checkAddSourceSinkPrecondition(sink) + checkAddFanPrecondition(source, in = false) + graph.addLEdge(source, sink)(flow) + this + } + + def addEdge[In, Out](source: FanOperation[In], flow: ProcessorFlow[In, Out], sink: FanOperation[Out]): this.type = { + checkAddFanPrecondition(source, in = false) + checkAddFanPrecondition(sink, in = true) + graph.addLEdge(source, sink)(flow) + this + } + + def addEdge[In, Out](flow: FlowWithSource[In, Out], sink: FanOperation[Out]): this.type = { + addEdge(flow.input, flow.withoutSource, sink) + this + } + + def addEdge[In, Out](source: FanOperation[In], flow: FlowWithSink[In, Out]): this.type = { + addEdge(source, flow.withoutSink, flow.output) + this + } + + def attachSink[Out](token: UndefinedSink[Out], sink: Sink[Out]): this.type = { + graph.find(token) match { + case Some(existing) ⇒ + require(existing.value.isInstanceOf[UndefinedSink[_]], s"Flow already attached to a sink [${existing.value}]") + val edge = existing.incoming.head + graph.remove(existing) + graph.addLEdge(edge.from.value, SinkVertex(sink))(edge.label) + case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSink [${token}]") + } + this + } + + def attachSource[In](token: UndefinedSource[In], source: Source[In]): this.type = { + graph.find(token) match { + case Some(existing) ⇒ + require(existing.value.isInstanceOf[UndefinedSource[_]], s"Flow already attached to a source [${existing.value}]") + val edge = existing.outgoing.head + graph.remove(existing) + graph.addLEdge(SourceVertex(source), edge.to.value)(edge.label) + case None ⇒ throw new IllegalArgumentException(s"No matching UndefinedSource [${token}]") + } + this + } + + private def checkAddSourceSinkPrecondition(node: Vertex): Unit = + require(graph.find(node) == None, s"[$node] instance is already used in this flow graph") + + private def checkAddFanPrecondition(fan: FanOperation[_], in: Boolean): Unit = { + fan match { + case _: FanOutOperation[_] if in ⇒ + graph.find(fan) match { + case Some(existing) if existing.incoming.nonEmpty ⇒ + throw new IllegalArgumentException(s"Fan-out [$fan] is already attached to input [${existing.incoming.head}]") + case _ ⇒ // ok + } + case _: FanInOperation[_] if !in ⇒ + graph.find(fan) match { + case Some(existing) if existing.outgoing.nonEmpty ⇒ + throw new IllegalArgumentException(s"Fan-in [$fan] is already attached to output [${existing.outgoing.head}]") + case _ ⇒ // ok + } + case _ ⇒ // ok + } + } + + /** + * INTERNAL API + */ + private[akka] def build(): FlowGraph = { + checkPartialBuildPreconditions() + checkBuildPreconditions() + new FlowGraph(immutableGraph()) + } + + /** + * INTERNAL API + */ + private[akka] def partialBuild(): PartialFlowGraph = { + checkPartialBuildPreconditions() + new PartialFlowGraph(immutableGraph()) + } + + //convert it to an immutable.Graph + private def immutableGraph(): ImmutableGraph[Vertex, LDiEdge] = + ImmutableGraph.from(edges = graph.edges.map(e ⇒ LDiEdge(e.from.value, e.to.value)(e.label)).toIterable) + + private def checkPartialBuildPreconditions(): Unit = { + graph.findCycle match { + case None ⇒ + case Some(cycle) ⇒ throw new IllegalArgumentException("Cycle detected, not supported yet. " + cycle) + } + } + + private def checkBuildPreconditions(): Unit = { + val undefinedSourcesSinks = graph.nodes.filter { + _.value match { + case _: UndefinedSource[_] | _: UndefinedSink[_] ⇒ true + case x ⇒ false + } + } + if (undefinedSourcesSinks.nonEmpty) { + val formatted = undefinedSourcesSinks.map(n ⇒ n.value match { + case u: UndefinedSource[_] ⇒ s"$u -> ${n.outgoing.head.label} -> ${n.outgoing.head.to}" + case u: UndefinedSink[_] ⇒ s"${n.incoming.head.from} -> ${n.incoming.head.label} -> $u" + }) + throw new IllegalArgumentException("Undefined sources or sinks: " + formatted.mkString(", ")) + } + + // we will be able to relax these checks + graph.nodes.foreach { node ⇒ + node.value match { + case merge: Merge[_] ⇒ + require(node.incoming.size == 2, "Merge must have two incoming edges: " + node.incoming) + require(node.outgoing.size == 1, "Merge must have one outgoing edge: " + node.outgoing) + case bcast: Broadcast[_] ⇒ + require(node.incoming.size == 1, "Broadcast must have one incoming edge: " + node.incoming) + require(node.outgoing.size == 2, "Broadcast must have two outgoing edges: " + node.outgoing) + case _ ⇒ // no check for other node types + } + } + + require(graph.nonEmpty, "Graph must not be empty") + require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diSuccessors.isEmpty }))), + "Graph must have at least one sink") + require(graph.exists(graph having ((node = { n ⇒ n.isLeaf && n.diPredecessors.isEmpty }))), + "Graph must have at least one source") + + require(graph.isConnected, "Graph must be connected") + } + +} + +/** + * Build a [[FlowGraph]] by starting with one of the `apply` methods. + * Syntactic sugar is provided by [[FlowGraphImplicits]]. + * + * `IllegalArgumentException` is throw if the built graph is invalid. + */ +object FlowGraph { + /** + * Build a [[FlowGraph]] from scratch. + */ + def apply(block: FlowGraphBuilder ⇒ Unit): FlowGraph = + apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) + + /** + * Continue building a [[FlowGraph]] from an existing `PartialFlowGraph`. + * For example you can attach undefined sources and sinks with + * [[FlowGraphBuilder#attachSource]] and [[FlowGraphBuilder#attachSink]] + */ + def apply(partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ Unit): FlowGraph = + apply(partialFlowGraph.graph)(block) + + /** + * Continue building a [[FlowGraph]] from an existing `FlowGraph`. + * For example you can connect more output flows to a [[Broadcast]] vertex. + */ + def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): FlowGraph = + apply(flowGraph.graph)(block) + + private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder ⇒ Unit): FlowGraph = { + val builder = new FlowGraphBuilder(graph) + block(builder) + builder.build() + } +} + +/** + * Concrete flow graph that can be materialized with [[#run]]. + */ +class FlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { + import FlowGraphInternal._ + + /** + * Materialize the `FlowGraph` and attach all sinks and sources. + */ + def run()(implicit materializer: FlowMaterializer): MaterializedFlowGraph = { + import scalax.collection.GraphTraversal._ + + // FIXME remove when real materialization is done + def dummyProcessor(name: String): Processor[Any, Any] = new BlackholeSubscriber[Any](1) with Publisher[Any] with Processor[Any, Any] { + def subscribe(subscriber: Subscriber[Any]): Unit = subscriber.onComplete() + override def toString = name + } + + // start with sinks + val startingNodes = graph.nodes.filter(n ⇒ n.isLeaf && n.diSuccessors.isEmpty) + + case class Memo(visited: Set[graph.EdgeT] = Set.empty, + nodeProcessor: Map[graph.NodeT, Processor[Any, Any]] = Map.empty, + sources: Map[Source[_], FlowWithSink[Any, Any]] = Map.empty, + materializedSinks: Map[Sink[_], Any] = Map.empty) + + val result = startingNodes.foldLeft(Memo()) { + case (memo, start) ⇒ + + val traverser = graph.innerEdgeTraverser(start, parameters = Parameters(direction = Predecessors, kind = BreadthFirst), + ordering = graph.defaultEdgeOrdering) + traverser.foldLeft(memo) { + case (memo, edge) ⇒ + if (memo.visited(edge)) { + memo + } else { + val flow = edge.label.asInstanceOf[ProcessorFlow[Any, Any]] + + // returns the materialized sink, if any + def connectProcessorToDownstream(processor: Processor[Any, Any]): Option[(SinkWithKey[_, _], Any)] = { + val f = flow.withSource(PublisherSource(processor)) + edge.to.value match { + case SinkVertex(sink: SinkWithKey[_, _]) ⇒ + val mf = f.withSink(sink.asInstanceOf[Sink[Any]]).run() + Some(sink -> mf.getSinkFor(sink)) + case SinkVertex(sink) ⇒ + f.withSink(sink.asInstanceOf[Sink[Any]]).run() + None + case _ ⇒ + f.withSink(SubscriberSink(memo.nodeProcessor(edge.to))).run() + None + } + } + + edge.from.value match { + case SourceVertex(src) ⇒ + val f = flow.withSink(SubscriberSink(memo.nodeProcessor(edge.to))) + // connect the source with the flow later + memo.copy(visited = memo.visited + edge, + sources = memo.sources.updated(src, f)) + + case _: FanOperation[_] ⇒ + val processor = edge.from.value match { + case merge: Merge[_] ⇒ + // FIXME materialize Merge + dummyProcessor("merge-processor") + case bcast: Broadcast[_] ⇒ + memo.nodeProcessor.getOrElse(edge.from, { + // FIXME materialize Broadcast + dummyProcessor("bcast-processor") + }) + case other ⇒ + throw new IllegalArgumentException("Unknown fan operation: " + other) + } + val materializedSink = connectProcessorToDownstream(processor) + memo.copy( + visited = memo.visited + edge, + nodeProcessor = memo.nodeProcessor.updated(edge.from, processor), + materializedSinks = memo.materializedSinks ++ materializedSink) + } + } + + } + + } + + // connect all input sources as the last thing + val materializedSources = result.sources.foldLeft(Map.empty[Source[_], Any]) { + case (acc, (src, flow)) ⇒ + val mf = flow.withSource(src).run() + src match { + case srcKey: SourceWithKey[_, _] ⇒ acc.updated(src, mf.getSourceFor(srcKey)) + case _ ⇒ acc + } + } + + new MaterializedFlowGraph(materializedSources, result.materializedSinks) + } + +} + +/** + * Build a [[PartialFlowGraph]] by starting with one of the `apply` methods. + * Syntactic sugar is provided by [[FlowGraphImplicits]]. + * + * `IllegalArgumentException` is throw if the built graph is invalid. + */ +object PartialFlowGraph { + /** + * Build a [[PartialFlowGraph]] from scratch. + */ + def apply(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = + apply(ImmutableGraph.empty[FlowGraphInternal.Vertex, LDiEdge])(block) + + /** + * Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`. + */ + def apply(partialFlowGraph: PartialFlowGraph)(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = + apply(partialFlowGraph.graph)(block) + + /** + * Continue building a [[PartialFlowGraph]] from an existing `PartialFlowGraph`. + */ + def apply(flowGraph: FlowGraph)(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = + apply(flowGraph.graph)(block) + + private def apply(graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge])(block: FlowGraphBuilder ⇒ Unit): PartialFlowGraph = { + val builder = new FlowGraphBuilder(graph) + block(builder) + builder.partialBuild() + } + +} + +/** + * `PartialFlowGraph` may have sources and sinks that are not attach, and it can therefore not + * be `run`. + */ +class PartialFlowGraph private[akka] (private[akka] val graph: ImmutableGraph[FlowGraphInternal.Vertex, LDiEdge]) { + import FlowGraphInternal._ + + def undefinedSources: Set[UndefinedSource[_]] = + graph.nodes.collect { + case n if n.value.isInstanceOf[UndefinedSource[_]] ⇒ n.value.asInstanceOf[UndefinedSource[_]] + }(collection.breakOut) + + def undefinedSinks: Set[UndefinedSink[_]] = + graph.nodes.collect { + case n if n.value.isInstanceOf[UndefinedSink[_]] ⇒ n.value.asInstanceOf[UndefinedSink[_]] + }(collection.breakOut) + +} + +/** + * Returned by [[FlowGraph#run]] and can be used as parameter to the + * accessor method to retrieve the materialized `Source` or `Sink`, e.g. + * [[SubscriberSource#subscriber]] or [[PublisherSink#publisher]]. + */ +class MaterializedFlowGraph(materializedSources: Map[Source[_], Any], materializedSinks: Map[Sink[_], Any]) + extends MaterializedSource with MaterializedSink { + + /** + * Do not call directly. Use accessor method in the concrete `Source`, e.g. [[SubscriberSource#subscriber]]. + */ + override def getSourceFor[T](key: SourceWithKey[_, T]): T = + materializedSources.get(key) match { + case Some(matSource) ⇒ matSource.asInstanceOf[T] + case None ⇒ + throw new IllegalArgumentException(s"Source key [$key] doesn't exist in this flow graph") + } + + /** + * Do not call directly. Use accessor method in the concrete `Sink`, e.g. [[PublisherSink#publisher]]. + */ + def getSinkFor[T](key: SinkWithKey[_, T]): T = + materializedSinks.get(key) match { + case Some(matSink) ⇒ matSink.asInstanceOf[T] + case None ⇒ + throw new IllegalArgumentException(s"Sink key [$key] doesn't exist in this flow graph") + } +} + +/** + * Implicit conversions that provides syntactic sugar for building flow graphs. + */ +object FlowGraphImplicits { + implicit class SourceOps[In](val source: Source[In]) extends AnyVal { + def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): SourceNextStep[In, Out] = { + new SourceNextStep(source, flow, builder) + } + } + + class SourceNextStep[In, Out](source: Source[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { + def ~>(sink: FanOperation[Out]): FanOperation[Out] = { + builder.addEdge(source, flow, sink) + sink + } + } + + implicit class FanOps[In](val fan: FanOperation[In]) extends AnyVal { + def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): FanNextStep[In, Out] = { + new FanNextStep(fan, flow, builder) + } + } + + class FanNextStep[In, Out](fan: FanOperation[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { + def ~>(sink: FanOperation[Out]): FanOperation[Out] = { + builder.addEdge(fan, flow, sink) + sink + } + + def ~>(sink: Sink[Out]): Unit = { + builder.addEdge(fan, flow, sink) + } + + def ~>(sink: UndefinedSink[Out]): Unit = { + builder.addEdge(fan, flow, sink) + } + } + + implicit class FlowWithSourceOps[In, Out](val flow: FlowWithSource[In, Out]) extends AnyVal { + def ~>(sink: FanOperation[Out])(implicit builder: FlowGraphBuilder): FanOperation[Out] = { + builder.addEdge(flow, sink) + sink + } + } + + // FIXME add more for FlowWithSource and FlowWithSink, and shortcuts injecting identity flows + + implicit class UndefinedSourceOps[In](val source: UndefinedSource[In]) extends AnyVal { + def ~>[Out](flow: ProcessorFlow[In, Out])(implicit builder: FlowGraphBuilder): UndefinedSourceNextStep[In, Out] = { + new UndefinedSourceNextStep(source, flow, builder) + } + } + + class UndefinedSourceNextStep[In, Out](source: UndefinedSource[In], flow: ProcessorFlow[In, Out], builder: FlowGraphBuilder) { + def ~>(sink: FanOperation[Out]): FanOperation[Out] = { + builder.addEdge(source, flow, sink) + sink + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala index 2e1ca8bf88..5c2203c188 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Sink.scala @@ -19,7 +19,7 @@ import java.util.concurrent.atomic.AtomicReference * implement [[SinkWithKey]] or [[SimpleSink]], otherwise a custom [[FlowMaterializer]] * will have to be used to be able to attach them. * - * All Sinks defined in this package rely upon an [[ActorBasedFlowMaterializer]] being + * All Sinks defined in this package rely upon an [[akka.stream.impl2.ActorBasedFlowMaterializer]] being * made available to them in order to use the attach method. Other * FlowMaterializers can be used but must then implement the functionality of these * Sink nodes themselves (or construct an ActorBasedFlowMaterializer). @@ -54,6 +54,10 @@ trait SimpleSink[-Out] extends Sink[Out] { * operations. */ def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala index e3e5f3e1a0..fc66571808 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/Source.scala @@ -121,6 +121,10 @@ trait SimpleSource[+In] extends Source[In] { * operations. */ def isActive: Boolean = false + + // these are unique keys, case class equality would break them + final override def equals(other: Any): Boolean = super.equals(other) + final override def hashCode: Int = super.hashCode } /** diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala deleted file mode 100644 index 57440f2e66..0000000000 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/CombinatorSpec.scala +++ /dev/null @@ -1,22 +0,0 @@ -/** - * Copyright (C) 2014 Typesafe Inc. - */ -package akka.stream.scaladsl2 - -import org.scalatest.Matchers -import org.scalatest.WordSpec -import scala.collection.immutable -import scala.concurrent.duration._ -import scala.concurrent.Future -import akka.stream.OverflowStrategy - -class CombinatorSpec extends WordSpec with Matchers { - val f = FlowFrom[Int] - - "Linear simple combinators in Flow" should { - "map" in { - val t: ProcessorFlow[Int, Int] = f.map(_ * 2) - } - } - -} diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCompileSpec.scala similarity index 99% rename from akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala rename to akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCompileSpec.scala index 093e8adc7a..c94432a045 100644 --- a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowSpec.scala +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowCompileSpec.scala @@ -9,7 +9,7 @@ import akka.stream.testkit.AkkaSpec import scala.collection.immutable.Seq import scala.concurrent.Future -class FlowSpec extends AkkaSpec { +class FlowCompileSpec extends AkkaSpec { val intSeq = IterableSource(Seq(1, 2, 3)) val strSeq = IterableSource(Seq("a", "b", "c")) @@ -143,4 +143,5 @@ class FlowSpec extends AkkaSpec { "closed.ToFuture" shouldNot compile } } + } diff --git a/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala new file mode 100644 index 0000000000..dd6ca709a3 --- /dev/null +++ b/akka-stream/src/test/scala/akka/stream/scaladsl2/FlowGraphCompileSpec.scala @@ -0,0 +1,168 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl2 + +import akka.stream.testkit.AkkaSpec +import akka.stream.Transformer + +class FlowGraphCompileSpec extends AkkaSpec { + + implicit val mat = FlowMaterializer() + + def op[In, Out]: () ⇒ Transformer[In, Out] = { () ⇒ + new Transformer[In, Out] { + override def onNext(elem: In) = List(elem.asInstanceOf[Out]) + } + } + + val f1 = FlowFrom[String].transform("f1", op[String, String]) + val f2 = FlowFrom[String].transform("f2", op[String, String]) + val f3 = FlowFrom[String].transform("f3", op[String, String]) + val f4 = FlowFrom[String].transform("f4", op[String, String]) + val f5 = FlowFrom[String].transform("f5", op[String, String]) + val f6 = FlowFrom[String].transform("f6", op[String, String]) + + val in1 = IterableSource(List("a", "b", "c")) + val in2 = IterableSource(List("d", "e", "f")) + val out1 = PublisherSink[String] + val out2 = FutureSink[String] + + "FlowGraph" should { + "build simple merge" in { + FlowGraph { b ⇒ + val merge = Merge[String] + b. + addEdge(in1, f1, merge). + addEdge(in2, f2, merge). + addEdge(merge, f3, out1) + } + } + + "build simple broadcast" in { + FlowGraph { b ⇒ + val bcast = Broadcast[String] + b. + addEdge(in1, f1, bcast). + addEdge(bcast, f2, out1). + addEdge(bcast, f3, out2) + } + } + + "build simple merge - broadcast" in { + FlowGraph { b ⇒ + val merge = Merge[String] + val bcast = Broadcast[String] + b. + addEdge(in1, f1, merge). + addEdge(in2, f2, merge). + addEdge(merge, f3, bcast). + addEdge(bcast, f4, out1). + addEdge(bcast, f5, out2) + } + } + + "build simple merge - broadcast with implicits" in { + FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val merge = Merge[String] + val bcast = Broadcast[String] + in1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> out1 + in2 ~> f4 ~> merge + bcast ~> f5 ~> out2 + } + } + + /** + * in ---> f1 -+-> f2 -+-> f3 ---> out1 + * ^ | + * | V + * f5 <-+- f4 + * | + * V + * f6 ---> out2 + */ + "detect cycle in " in { + intercept[IllegalArgumentException] { + FlowGraph { b ⇒ + val merge = Merge[String] + val bcast1 = Broadcast[String] + val bcast2 = Broadcast[String] + b. + addEdge(in1, f1, merge). + addEdge(merge, f2, bcast1). + addEdge(bcast1, f3, out1). + addEdge(bcast1, f4, bcast2). + addEdge(bcast2, f5, merge). // cycle + addEdge(bcast2, f6, out2) + } + }.getMessage.toLowerCase should include("cycle") + + } + + "express complex topologies in a readable way" in { + intercept[IllegalArgumentException] { + FlowGraph { implicit b ⇒ + val merge = Merge[String] + val bcast1 = Broadcast[String] + val bcast2 = Broadcast[String] + import FlowGraphImplicits._ + in1 ~> f1 ~> merge ~> f2 ~> bcast1 ~> f3 ~> out1 + bcast1 ~> f4 ~> bcast2 ~> f5 ~> merge + bcast2 ~> f6 ~> out2 + } + }.getMessage.toLowerCase should include("cycle") + } + + "attachSource and attachSink" in { + val mg = FlowGraph { b ⇒ + val merge = Merge[String] + val undefinedSrc1 = UndefinedSource[String] + val undefinedSrc2 = UndefinedSource[String] + val undefinedSink1 = UndefinedSink[String] + b. + addEdge(undefinedSrc1, f1, merge). + addEdge(UndefinedSource[String]("src2"), f2, merge). + addEdge(merge, f3, undefinedSink1) + + b.attachSource(undefinedSrc1, in1) + b.attachSource(UndefinedSource[String]("src2"), in2) + b.attachSink(undefinedSink1, out1) + + }.run() + out1.publisher(mg) should not be (null) + } + + "build partial flow graphs" in { + val undefinedSrc1 = UndefinedSource[String] + val undefinedSrc2 = UndefinedSource[String] + val undefinedSink1 = UndefinedSink[String] + val bcast = Broadcast[String] + + val partial1 = PartialFlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val merge = Merge[String] + undefinedSrc1 ~> f1 ~> merge ~> f2 ~> bcast ~> f3 ~> undefinedSink1 + undefinedSrc2 ~> f4 ~> merge + + } + partial1.undefinedSources should be(Set(undefinedSrc1, undefinedSrc2)) + partial1.undefinedSinks should be(Set(undefinedSink1)) + + val partial2 = PartialFlowGraph(partial1) { implicit b ⇒ + import FlowGraphImplicits._ + b.attachSource(undefinedSrc1, in1) + b.attachSource(undefinedSrc2, in2) + bcast ~> f5 ~> UndefinedSink[String]("sink2") + } + partial2.undefinedSources should be(Set.empty) + partial2.undefinedSinks should be(Set(undefinedSink1, UndefinedSink[String]("sink2"))) + + FlowGraph(partial2) { implicit b ⇒ + b.attachSink(undefinedSink1, out1) + b.attachSink(UndefinedSink[String]("sink2"), out2) + } + } + + } +}