diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala index 173381081f..aff4a9be41 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlowSpec.scala @@ -240,6 +240,7 @@ class GraphFlowSpec extends AkkaSpec { validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100)) } } + "turned into sinks" should { "work with a Source" in { val in = UndefinedSource[Int] @@ -360,6 +361,41 @@ class GraphFlowSpec extends AkkaSpec { validateProbe(probe, stdRequests, stdResult) } + + "allow connecting source to sink directly" in { + val probe = StreamTestKit.SubscriberProbe[Int]() + val inSource = Source.subscriber[Int] + val outSink = Sink.publisher[Int] + + val source = Source[Int]() { implicit b ⇒ + import FlowGraphImplicits._ + val out = UndefinedSink[Int] + inSource ~> out + out + } + + val sink = Sink[Int]() { implicit b ⇒ + import FlowGraphImplicits._ + val in = UndefinedSource[Int] + in ~> outSink + in + } + + val mm = FlowGraph { implicit b ⇒ + import FlowGraphImplicits._ + val broadcast = Broadcast[Int] + source ~> sink + }.run() + + val subscriber = mm.get(inSource) + val publisher = mm.get(outSink) + + source1.runWith(Sink.publisher).subscribe(subscriber) + publisher.subscribe(probe) + + validateProbe(probe, 4, (0 to 3).toSet) + } + } } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 0830199446..c91c426f81 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -85,6 +85,10 @@ private[akka] object Ast { sealed trait FanInAstNode extends JunctionAstNode sealed trait FanOutAstNode extends JunctionAstNode + case object IdentityAstNode extends JunctionAstNode { + override def name = "identity" + } + case object Merge extends FanInAstNode { override def name = "merge" } @@ -274,6 +278,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]]) val subscriber = ActorSubscriber[In](impl) (List(subscriber), publishers) + + case identity @ Ast.IdentityAstNode ⇒ + val id: Processor[In, Out] = processorForNode(identityTransform, identity.name, 1).asInstanceOf[Processor[In, Out]] + (List(id), List(id)) } } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 00d6a66a17..fe4bc04ec1 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -57,6 +57,20 @@ private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOu override private[akka] def next = this } +private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex with Junction[T] { + def name: Option[String] = Some("identity") + + override private[akka] val vertex = this + override val minimumInputCount: Int = 1 + override val maximumInputCount: Int = 1 + override val minimumOutputCount: Int = 1 + override val maximumOutputCount: Int = 1 + + override private[akka] def astNode = Ast.IdentityAstNode + + final override private[scaladsl] def newInstance() = new Identity[T]() +} + object Merge { /** * Create a new anonymous `Merge` vertex with the specified output type. @@ -916,7 +930,7 @@ class FlowGraphBuilder private[akka] (_graph: DirectedGraphBuilder[FlowGraphInte private def checkAmbigiousKeyedElement(vertex: Vertex): Unit = { def warningMessage(el: Any): String = s"An `${el}` instance MUST NOT be used more than once in a `FlowGraph` to avoid ambiguity. " + - s"Use individual instances instead the same one multiple times instead. Nodes are: ${graph.nodes}" + s"Use individual instances instead of the same one multiple times. Nodes are: ${graph.nodes}" vertex match { case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] ⇒ require(!graph.contains(v), warningMessage(v.source)) @@ -1091,11 +1105,21 @@ class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[Flo if (edges.size == 1) { val edge = edges.head (edge.from.label, edge.to.label) match { - case (sourceVertex: SourceVertex, sinkVertex: SinkVertex) ⇒ + case (sourceVertex @ SourceVertex(_: ActorFlowSource[_]), sinkVertex @ SinkVertex(_: ActorFlowSink[_])) ⇒ + // Only an ActorFlow{Source,Sink} can be materialized as a Flow. val pipe = edge.label.pipe runSimple(sourceVertex, sinkVertex, pipe) + case (sourceVertex: SourceVertex, sinkVertex: SinkVertex) ⇒ + // One or two Graph{Source,Sink} must be materialized as a graph. + // Add identity Junction in-between because graph materialization + // algorithm materializes sinks when it finds a Junction. + FlowGraph { b ⇒ + val id = new Identity[Any] + b.addEdge(sourceVertex.source, id) + b.addEdge(id.asInstanceOf[Identity[Nothing]], sinkVertex.sink) + }.run() case _ ⇒ - runGraph() + throw new IllegalStateException(s"Unable to materialize FlowGraph with one edge connecting ${edge.from.label} and ${edge.to.label}.") } } else runGraph()