Merge pull request #16222 from 2m/wip-16163-graph-source-to-graph-sink

=str #16163 allow connecting GraphSource to GraphSink directly
This commit is contained in:
Martynas Mickevičius 2014-11-06 12:57:53 +02:00
commit a4a3cd2635
3 changed files with 71 additions and 3 deletions

View file

@ -240,6 +240,7 @@ class GraphFlowSpec extends AkkaSpec {
validateProbe(probe, 10, Set(2, 4, 6, 8, 10, 20, 40, 60, 80, 100))
}
}
"turned into sinks" should {
"work with a Source" in {
val in = UndefinedSource[Int]
@ -360,6 +361,41 @@ class GraphFlowSpec extends AkkaSpec {
validateProbe(probe, stdRequests, stdResult)
}
"allow connecting source to sink directly" in {
val probe = StreamTestKit.SubscriberProbe[Int]()
val inSource = Source.subscriber[Int]
val outSink = Sink.publisher[Int]
val source = Source[Int]() { implicit b
import FlowGraphImplicits._
val out = UndefinedSink[Int]
inSource ~> out
out
}
val sink = Sink[Int]() { implicit b
import FlowGraphImplicits._
val in = UndefinedSource[Int]
in ~> outSink
in
}
val mm = FlowGraph { implicit b
import FlowGraphImplicits._
val broadcast = Broadcast[Int]
source ~> sink
}.run()
val subscriber = mm.get(inSource)
val publisher = mm.get(outSink)
source1.runWith(Sink.publisher).subscribe(subscriber)
publisher.subscribe(probe)
validateProbe(probe, 4, (0 to 3).toSet)
}
}
}
}

View file

@ -85,6 +85,10 @@ private[akka] object Ast {
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
case object IdentityAstNode extends JunctionAstNode {
override def name = "identity"
}
case object Merge extends FanInAstNode {
override def name = "merge"
}
@ -274,6 +278,10 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
impl ! FanOut.ExposedPublishers(publishers.asInstanceOf[immutable.Seq[ActorPublisher[Any]]])
val subscriber = ActorSubscriber[In](impl)
(List(subscriber), publishers)
case identity @ Ast.IdentityAstNode
val id: Processor[In, Out] = processorForNode(identityTransform, identity.name, 1).asInstanceOf[Processor[In, Out]]
(List(id), List(id))
}
}

View file

@ -57,6 +57,20 @@ private[akka] sealed trait Junction[T] extends JunctionInPort[T] with JunctionOu
override private[akka] def next = this
}
private[akka] final class Identity[T]() extends FlowGraphInternal.InternalVertex with Junction[T] {
def name: Option[String] = Some("identity")
override private[akka] val vertex = this
override val minimumInputCount: Int = 1
override val maximumInputCount: Int = 1
override val minimumOutputCount: Int = 1
override val maximumOutputCount: Int = 1
override private[akka] def astNode = Ast.IdentityAstNode
final override private[scaladsl] def newInstance() = new Identity[T]()
}
object Merge {
/**
* Create a new anonymous `Merge` vertex with the specified output type.
@ -916,7 +930,7 @@ class FlowGraphBuilder private[akka] (_graph: DirectedGraphBuilder[FlowGraphInte
private def checkAmbigiousKeyedElement(vertex: Vertex): Unit = {
def warningMessage(el: Any): String =
s"An `${el}` instance MUST NOT be used more than once in a `FlowGraph` to avoid ambiguity. " +
s"Use individual instances instead the same one multiple times instead. Nodes are: ${graph.nodes}"
s"Use individual instances instead of the same one multiple times. Nodes are: ${graph.nodes}"
vertex match {
case v: SourceVertex if v.source.isInstanceOf[KeyedSource[_]] require(!graph.contains(v), warningMessage(v.source))
@ -1091,11 +1105,21 @@ class FlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[Flo
if (edges.size == 1) {
val edge = edges.head
(edge.from.label, edge.to.label) match {
case (sourceVertex: SourceVertex, sinkVertex: SinkVertex)
case (sourceVertex @ SourceVertex(_: ActorFlowSource[_]), sinkVertex @ SinkVertex(_: ActorFlowSink[_]))
// Only an ActorFlow{Source,Sink} can be materialized as a Flow.
val pipe = edge.label.pipe
runSimple(sourceVertex, sinkVertex, pipe)
case (sourceVertex: SourceVertex, sinkVertex: SinkVertex)
// One or two Graph{Source,Sink} must be materialized as a graph.
// Add identity Junction in-between because graph materialization
// algorithm materializes sinks when it finds a Junction.
FlowGraph { b
val id = new Identity[Any]
b.addEdge(sourceVertex.source, id)
b.addEdge(id.asInstanceOf[Identity[Nothing]], sinkVertex.sink)
}.run()
case _
runGraph()
throw new IllegalStateException(s"Unable to materialize FlowGraph with one edge connecting ${edge.from.label} and ${edge.to.label}.")
}
} else
runGraph()