From 40371ebde4a9fc19d5705718ace76313b1c3d236 Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Wed, 5 Nov 2014 20:12:24 +0100 Subject: [PATCH] +str #15205 Java API for FlexiRoute --- .../akka/stream/javadsl/FlexiRouteTest.java | 237 +++++++++++ .../stream/scaladsl/GraphFlexiRouteSpec.scala | 8 +- .../impl/ActorBasedFlowMaterializer.scala | 11 +- .../akka/stream/impl/FlexiRouteImpl.scala | 5 + .../akka/stream/javadsl/FlexiRoute.scala | 379 ++++++++++++++++++ .../akka/stream/scaladsl/FlexiRoute.scala | 13 +- 6 files changed, 636 insertions(+), 17 deletions(-) create mode 100644 akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java create mode 100644 akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java new file mode 100644 index 0000000000..21a43d1cee --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiRouteTest.java @@ -0,0 +1,237 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl; + +import java.util.Arrays; +import java.util.List; +import org.junit.ClassRule; +import org.junit.Test; +import static org.junit.Assert.assertEquals; +import java.util.concurrent.TimeUnit; + +import akka.actor.ActorSystem; +import akka.stream.FlowMaterializer; +import akka.stream.testkit.AkkaSpec; +import akka.stream.javadsl.FlexiRoute; +import akka.japi.Pair; + +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; + +public class FlexiRouteTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlexiRouteTest", + AkkaSpec.testConf()); + + final ActorSystem system = actorSystemResource.getSystem(); + + final FlowMaterializer materializer = FlowMaterializer.create(system); + + final Source in = Source.from(Arrays.asList("a", "b", "c", "d", "e")); + + final KeyedSink, Future>> out1 = Sink.>future(); + final KeyedSink, Future>> out2 = Sink.>future(); + + @Test + public void mustBuildSimpleFairRoute() throws Exception { + Fair route = new Fair(); + + MaterializedMap m = FlowGraph.builder().addEdge(in, route.in()) + .addEdge(route.output1(), Flow.of(String.class).grouped(100), out1) + .addEdge(route.output2(), Flow.of(String.class).grouped(100), out2).run(materializer); + + final List result1 = Await.result(m.get(out1), Duration.apply(3, TimeUnit.SECONDS)); + final List result2 = Await.result(m.get(out2), Duration.apply(3, TimeUnit.SECONDS)); + assertEquals(Arrays.asList("a", "c", "e"), result1); + assertEquals(Arrays.asList("b", "d"), result2); + } + + @Test + public void mustBuildSimpleRoundRobinRoute() throws Exception { + StrictRoundRobin route = new StrictRoundRobin(); + + MaterializedMap m = FlowGraph.builder().addEdge(in, route.in()) + .addEdge(route.output1(), Flow.of(String.class).grouped(100), out1) + .addEdge(route.output2(), Flow.of(String.class).grouped(100), out2).run(materializer); + + final List result1 = Await.result(m.get(out1), Duration.apply(3, TimeUnit.SECONDS)); + final List result2 = Await.result(m.get(out2), Duration.apply(3, TimeUnit.SECONDS)); + assertEquals(Arrays.asList("a", "c", "e"), result1); + assertEquals(Arrays.asList("b", "d"), result2); + } + + @Test + public void mustBuildSimpleUnzip() throws Exception { + Unzip unzip = new Unzip(); + + @SuppressWarnings({ "unchecked", "rawtypes" }) + Source> input = Source.from(Arrays.>asList(new Pair(1, "A"), new Pair( + 2, "B"), new Pair(3, "C"), new Pair(4, "D"))); + + final KeyedSink, Future>> outA = Sink.>future(); + final KeyedSink, Future>> outB = Sink.>future(); + + MaterializedMap m = FlowGraph.builder().addEdge(input, unzip.in()) + .addEdge(unzip.outputA, Flow.of(Integer.class).grouped(100), outA) + .addEdge(unzip.outputB, Flow.of(String.class).grouped(100), outB).run(materializer); + + final List result1 = Await.result(m.get(outA), Duration.apply(3, TimeUnit.SECONDS)); + final List result2 = Await.result(m.get(outB), Duration.apply(3, TimeUnit.SECONDS)); + assertEquals(Arrays.asList(1, 2, 3, 4), result1); + assertEquals(Arrays.asList("A", "B", "C", "D"), result2); + } + + /** + * This is fair in that sense that after enqueueing to an output it yields to + * other output if they are have requested elements. Or in other words, if all + * outputs have demand available at the same time then in finite steps all + * elements are enqueued to them. + */ + static public class Fair extends FlexiRoute { + + private final OutputPort output1 = createOutputPort(); + private final OutputPort output2 = createOutputPort(); + + public Fair() { + super("fairRoute"); + } + + public OutputPort output1() { + return output1; + } + + public OutputPort output2() { + return output2; + } + + @Override + public RouteLogic createRouteLogic() { + return new RouteLogic() { + @Override + public List outputHandles(int outputCount) { + return Arrays.asList(output1.handle(), output2.handle()); + } + + private State emitToAnyWithDemand = new State(demandFromAny(output1, output2)) { + @Override + public State onInput(RouteLogicContext ctx, OutputHandle preferred, T element) { + ctx.emit(preferred, element); + return sameState(); + } + }; + + @Override + public State initialState() { + return new State(demandFromAny(output1, output2)) { + @Override + public State onInput(RouteLogicContext ctx, OutputHandle preferred, T element) { + ctx.emit(preferred, element); + return emitToAnyWithDemand; + } + }; + } + }; + } + } + + /** + * It never skips an output while cycling but waits on it instead (closed + * outputs are skipped though). The fair route above is a non-strict + * round-robin (skips currently unavailable outputs). + */ + static public class StrictRoundRobin extends FlexiRoute { + + private final OutputPort output1 = createOutputPort(); + private final OutputPort output2 = createOutputPort(); + + public StrictRoundRobin() { + super("roundRobinRoute"); + } + + public OutputPort output1() { + return output1; + } + + public OutputPort output2() { + return output2; + } + + @Override + public RouteLogic createRouteLogic() { + return new RouteLogic() { + @Override + public List outputHandles(int outputCount) { + return Arrays.asList(output1.handle(), output2.handle()); + } + + private State toOutput1 = new State(demandFrom(output1)) { + @Override + public State onInput(RouteLogicContext ctx, OutputHandle preferred, T element) { + ctx.emit(output1, element); + return toOutput2; + } + }; + + private State toOutput2 = new State(demandFrom(output2)) { + @Override + public State onInput(RouteLogicContext ctx, OutputHandle preferred, T element) { + ctx.emit(output2, element); + return toOutput1; + } + }; + + @Override + public State initialState() { + return toOutput1; + } + + }; + } + } + + static public class Unzip extends FlexiRoute, Object> { + + public final OutputPort, A> outputA = createOutputPort(); + public final OutputPort, B> outputB = createOutputPort(); + + public Unzip() { + super("unzip"); + } + + @Override + public RouteLogic, Object> createRouteLogic() { + return new RouteLogic, Object>() { + + @Override + public List outputHandles(int outputCount) { + if (outputCount != 2) + throw new IllegalArgumentException("Unzip must have two connected outputs, was " + outputCount); + return Arrays.asList(outputA.handle(), outputB.handle()); + } + + @Override + public State, Object> initialState() { + return new State, Object>(demandFromAll(outputA, outputB)) { + @Override + public State, Object> onInput(RouteLogicContext, Object> ctx, OutputHandle preferred, + Pair element) { + ctx.emit(outputA, element.first()); + ctx.emit(outputB, element.second()); + return sameState(); + } + }; + } + + @Override + public CompletionHandling> initialCompletionHandling() { + return eagerClose(); + } + + }; + } + } + +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala index f9868db865..1b58460088 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphFlexiRouteSpec.scala @@ -11,7 +11,7 @@ import akka.stream.testkit.StreamTestKit.PublisherProbe import akka.stream.testkit.StreamTestKit.SubscriberProbe import akka.actor.ActorSystem -object GraphRouteSpec { +object GraphFlexiRouteSpec { /** * This is fair in that sense that after enqueueing to an output it yields to other output if @@ -72,8 +72,6 @@ object GraphRouteSpec { val outB = createOutputPort[B]() override def createRouteLogic() = new RouteLogic[(A, B)] { - var lastInA: Option[A] = None - var lastInB: Option[B] = None override def outputHandles(outputCount: Int) = { require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount") @@ -164,8 +162,8 @@ object GraphRouteSpec { } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class GraphRouteSpec extends AkkaSpec { - import GraphRouteSpec._ +class GraphFlexiRouteSpec extends AkkaSpec { + import GraphFlexiRouteSpec._ implicit val materializer = FlowMaterializer() diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 55cec396e7..4247c5525c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -109,13 +109,12 @@ private[akka] object Ast { override def name = "concat" } - case class FlexiMergeNode(merger: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode { - override def name = merger.name.getOrElse("flexiMerge") - + case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode { + override def name = factory.name.getOrElse("flexiMerge") } - case class RouteNode(route: FlexiRoute[Any]) extends FanOutAstNode { - override def name = route.name.getOrElse("route") + case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any]) extends FanOutAstNode { + override def name = factory.name.getOrElse("flexiRoute") } } @@ -260,7 +259,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting actorOf(Balance.props(settings, outputCount, waitForAllDownstreams).withDispatcher(settings.dispatcher), actorName) case Ast.Unzip ⇒ actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName) - case Ast.RouteNode(route) ⇒ + case Ast.FlexiRouteNode(route) ⇒ actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()). withDispatcher(settings.dispatcher), actorName) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala index 3746159b56..2693b090be 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiRouteImpl.scala @@ -15,6 +15,11 @@ import akka.stream.impl.FanOut.OutputBunch private[akka] object FlexiRouteImpl { def props(settings: MaterializerSettings, outputCount: Int, routeLogic: FlexiRoute.RouteLogic[Any]): Props = Props(new FlexiRouteImpl(settings, outputCount, routeLogic)) + + trait RouteLogicFactory[In] { + def name: Option[String] + def createRouteLogic(): FlexiRoute.RouteLogic[In] + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala new file mode 100644 index 0000000000..2ecbc074ea --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala @@ -0,0 +1,379 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import scala.annotation.varargs +import akka.stream.scaladsl +import scala.collection.immutable +import java.util.{ List ⇒ JList } +import akka.japi.Util.immutableIndexedSeq +import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory + +object FlexiRoute { + + /** + * @see [[OutputPort]] + */ + sealed trait OutputHandle extends scaladsl.FlexiRoute.OutputHandle + + /** + * An `OutputPort` can be connected to a [[Sink]] with the [[FlowGraphBuilder]]. + * The `OutputPort` is also an [[OutputHandle]] which you use to define to which + * downstream output to emit an element. + */ + class OutputPort[In, Out] private[akka] (val port: Int, parent: FlexiRoute[In, _]) + extends JunctionOutPort[Out] with OutputHandle { + + def handle: OutputHandle = this + + override val asScala: scaladsl.JunctionOutPort[Out] = new scaladsl.JunctionOutPort[Out] { + override def port: Int = OutputPort.this.port + override def vertex = parent.vertex + } + + /** + * INTERNAL API + */ + override private[akka] def portIndex: Int = port + + override def toString: String = s"OutputPort($port)" + } + + sealed trait DemandCondition + + /** + * Demand condition for the [[State]] that will be + * fulfilled when there are requests for elements from one specific downstream + * output. + * + * It is not allowed to use a handle that has been cancelled or + * has been completed. `IllegalArgumentException` is thrown if + * that is not obeyed. + */ + class DemandFrom(val output: OutputHandle) extends DemandCondition + + /** + * Demand condition for the [[State]] that will be + * fulfilled when there are requests for elements from any of the given downstream + * outputs. + * + * Cancelled and completed inputs are not used, i.e. it is allowed + * to specify them in the list of `outputs`. + */ + class DemandFromAny(val outputs: JList[OutputHandle]) extends DemandCondition + + /** + * Demand condition for the [[State]] that will be + * fulfilled when there are requests for elements from all of the given downstream + * outputs. + * + * Cancelled and completed outputs are not used, i.e. it is allowed + * to specify them in the list of `outputs`. + */ + class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition + + /** + * Context that is passed to the functions of [[State]] and [[CompletionHandling]]. + * The context provides means for performing side effects, such as emitting elements + * downstream. + */ + trait RouteLogicContext[In, Out] { + /** + * @return `true` if at least one element has been requested by the given downstream (output). + */ + def isDemandAvailable(output: OutputHandle): Boolean + + /** + * Emit one element downstream. It is only allowed to `emit` when + * [[#isDemandAvailable]] is `true` for the given `output`, otherwise + * `IllegalArgumentException` is thrown. + */ + def emit(output: OutputHandle, elem: Out): Unit + + /** + * Complete the given downstream successfully. + */ + def complete(output: OutputHandle): Unit + + /** + * Complete all downstreams successfully and cancel upstream. + */ + def complete(): Unit + + /** + * Complete the given downstream with failure. + */ + def error(output: OutputHandle, cause: Throwable): Unit + + /** + * Complete all downstreams with failure and cancel upstream. + */ + def error(cause: Throwable): Unit + + /** + * Replace current [[CompletionHandling]]. + */ + def changeCompletionHandling(completion: CompletionHandling[In]): Unit + } + + /** + * How to handle completion or error from upstream input and how to + * handle cancel from downstream output. + * + * The `onComplete` method is called the upstream input was completed successfully. + * It returns next behavior or [[#SameState]] to keep current behavior. + * + * The `onError` method is called when the upstream input was completed with failure. + * It returns next behavior or [[#SameState]] to keep current behavior. + * + * The `onCancel` method is called when a downstream output cancels. + * It returns next behavior or [[#SameState]] to keep current behavior. + */ + abstract class CompletionHandling[In] { + def onComplete(ctx: RouteLogicContext[In, Any]): Unit + def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit + def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] + } + + /** + * Definition of which outputs that must have requested elements and how to act + * on the read elements. When an element has been read [[#onInput]] is called and + * then it is ensured that the specified downstream outputs have requested at least + * one element, i.e. it is allowed to emit at least one element downstream with + * [[RouteLogicContext#emit]]. + * + * The `onInput` method is called when an `element` was read from upstream. + * The function returns next behavior or [[#SameState]] to keep current behavior. + */ + abstract class State[In, Out](val condition: DemandCondition) { + def onInput(ctx: RouteLogicContext[In, Out], preferredOutput: OutputHandle, element: In): State[In, _] + } + + /** + * The possibly stateful logic that reads from the input and enables emitting to downstream + * via the defined [[State]]. Handles completion, error and cancel via the defined + * [[CompletionHandling]]. + * + * Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]]. + */ + abstract class RouteLogic[In, Out] { + def outputHandles(outputCount: Int): JList[OutputHandle] + def initialState: State[In, Out] + def initialCompletionHandling: CompletionHandling[In] = defaultCompletionHandling + + /** + * Return this from [[State]] `onInput` to use same state for next element. + */ + def sameState[A]: State[In, A] = FlexiRoute.sameStateInstance.asInstanceOf[State[In, A]] + + /** + * Convenience to create a [[DemandFromAny]] condition. + */ + @varargs def demandFromAny(outputs: OutputHandle*): DemandFromAny = { + import scala.collection.JavaConverters._ + new DemandFromAny(outputs.asJava) + } + + /** + * Convenience to create a [[DemandFromAll]] condition. + */ + @varargs def demandFromAll(outputs: OutputHandle*): DemandFromAll = { + import scala.collection.JavaConverters._ + new DemandFromAll(outputs.asJava) + } + + /** + * Convenience to create a [[DemandFrom]] condition. + */ + def demandFrom(output: OutputHandle): DemandFrom = new DemandFrom(output) + + /** + * When an output cancels it continues with remaining outputs. + * Error or completion from upstream are immediately propagated. + */ + def defaultCompletionHandling: CompletionHandling[In] = + new CompletionHandling[In] { + override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () + override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () + override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = + sameState + } + + /** + * Completes as soon as any output cancels. + * Error or completion from upstream are immediately propagated. + */ + def eagerClose[A]: CompletionHandling[In] = + new CompletionHandling[In] { + override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = () + override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = () + override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = { + ctx.complete() + sameState + } + } + } + + private val sameStateInstance = new State[Any, Any](new DemandFromAny(java.util.Collections.emptyList[OutputHandle])) { + override def onInput(ctx: RouteLogicContext[Any, Any], output: OutputHandle, element: Any): State[Any, Any] = + throw new UnsupportedOperationException("SameState.onInput should not be called") + + override def toString: String = "SameState" + } + + /** + * INTERNAL API + */ + private[akka] object Internal { + class RouteLogicWrapper[In](delegate: RouteLogic[In, _]) extends scaladsl.FlexiRoute.RouteLogic[In] { + override def outputHandles(outputCount: Int): immutable.IndexedSeq[scaladsl.FlexiRoute.OutputHandle] = + immutableIndexedSeq(delegate.outputHandles(outputCount)) + + override def initialState: this.State[_] = wrapState(delegate.initialState) + + override def initialCompletionHandling: this.CompletionHandling = + wrapCompletionHandling(delegate.initialCompletionHandling) + + private def wrapState[Out](delegateState: FlexiRoute.State[In, Out]): State[Out] = + if (sameStateInstance == delegateState) + SameState + else + State(convertDemandCondition(delegateState.condition)) { (ctx, outputHandle, elem) ⇒ + val newDelegateState = + delegateState.onInput(new RouteLogicContextWrapper(ctx), asJava(outputHandle), elem) + wrapState(newDelegateState) + } + + private def wrapCompletionHandling[Out]( + delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling = + CompletionHandling( + onComplete = ctx ⇒ { + delegateCompletionHandling.onComplete(new RouteLogicContextWrapper(ctx)) + }, + onError = (ctx, cause) ⇒ { + delegateCompletionHandling.onError(new RouteLogicContextWrapper(ctx), cause) + }, + onCancel = (ctx, outputHandle) ⇒ { + val newDelegateState = delegateCompletionHandling.onCancel( + new RouteLogicContextWrapper(ctx), asJava(outputHandle)) + wrapState(newDelegateState) + }) + + private def asJava(outputHandle: scaladsl.FlexiRoute.OutputHandle): OutputHandle = + outputHandle.asInstanceOf[OutputHandle] + + class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] { + override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output) + override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem) + override def complete(): Unit = delegate.complete() + override def complete(output: OutputHandle): Unit = delegate.complete(output) + override def error(cause: Throwable): Unit = delegate.error(cause) + override def error(output: OutputHandle, cause: Throwable): Unit = delegate.error(output, cause) + override def changeCompletionHandling(completion: FlexiRoute.CompletionHandling[In]): Unit = + delegate.changeCompletionHandling(wrapCompletionHandling(completion)) + } + + } + + def convertDemandCondition(condition: DemandCondition): scaladsl.FlexiRoute.DemandCondition = + condition match { + case c: DemandFromAny ⇒ scaladsl.FlexiRoute.DemandFromAny(immutableIndexedSeq(c.outputs)) + case c: DemandFromAll ⇒ scaladsl.FlexiRoute.DemandFromAll(immutableIndexedSeq(c.outputs)) + case c: DemandFrom ⇒ scaladsl.FlexiRoute.DemandFrom(c.output) + } + + } +} + +/** + * Base class for implementing custom route junctions. + * Such a junction always has one [[#in]] port and one or more output ports. + * The output ports are to be defined in the concrete subclass and are created with + * [[#createOutputPort]]. + * + * The concrete subclass must implement [[#createRouteLogic]] to define the [[FlexiRoute#RouteLogic]] + * that will be used when reading input elements and emitting output elements. + * The [[FlexiRoute#RouteLogic]] instance may be stateful, but the ``FlexiRoute`` instance + * must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` + * instances. + * + * Note that a `FlexiRoute` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. If the `name` is not specified the `FlexiRoute` instance can only + * be used at one place (one vertex) in the `FlowGraph`. + * + * @param name optional name of the junction in the [[FlowGraph]], + */ +abstract class FlexiRoute[In, Out](val name: Option[String]) { + import FlexiRoute._ + import scaladsl.FlowGraphInternal + import akka.stream.impl.Ast + + def this() = this(None) + def this(name: String) = this(Option(name)) + + private var outputCount = 0 + + // hide the internal vertex things from subclass, and make it possible to create new instance + private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + override def minimumInputCount = 1 + override def maximumInputCount = 1 + override def minimumOutputCount = 2 + override def maximumOutputCount = outputCount + + override private[akka] val astNode = { + val factory = new RouteLogicFactory[Any] { + override def name: Option[String] = vertexName + override def createRouteLogic(): scaladsl.FlexiRoute.RouteLogic[Any] = + new Internal.RouteLogicWrapper(FlexiRoute.this.createRouteLogic().asInstanceOf[RouteLogic[Any, Any]]) + } + Ast.FlexiRouteNode(factory) + } + + override def name = vertexName + + final override def newInstance() = new RouteVertex(None) + } + + /** + * INTERNAL API + */ + private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name) + + /** + * Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output + * with the [[FlowGraphBuilder]]. + */ + val in: JunctionInPort[In] = new JunctionInPort[In] { + override val asScala: scaladsl.JunctionInPort[In] = new scaladsl.JunctionInPort[In] { + override def vertex = FlexiRoute.this.vertex + type NextT = Nothing + override def next = scaladsl.NoNext + } + } + + /** + * Concrete subclass is supposed to define one or more output ports and + * they are created by calling this method. Each [[FlexiRoute.OutputPort]] can be + * connected to a [[Sink]] with the [[FlowGraphBuilder]]. + * The `OutputPort` is also an [[FlexiRoute.OutputHandle]] which you use to define to which + * downstream output to emit an element. + */ + protected final def createOutputPort[T](): OutputPort[In, T] = { + val port = outputCount + outputCount += 1 + new OutputPort(port, parent = this) + } + + /** + * Create the stateful logic that will be used when reading input elements + * and emitting output elements. Create a new instance every time. + */ + def createRouteLogic(): RouteLogic[In, Out] + + override def toString = name match { + case Some(n) ⇒ n + case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode()) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala index 9e968286f2..241ad7712e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiRoute.scala @@ -5,13 +5,14 @@ package akka.stream.scaladsl import scala.collection.immutable import akka.stream.impl.Ast +import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory object FlexiRoute { /** * @see [[OutputPort]] */ - sealed trait OutputHandle { + trait OutputHandle { private[akka] def portIndex: Int } @@ -51,7 +52,7 @@ object FlexiRoute { * fulfilled when there are requests for elements from any of the given downstream * outputs. * - * Cancelled and completed inputs are not used, i.e. it is allowed + * Cancelled and completed outputs are not used, i.e. it is allowed * to specify them in the list of `outputs`. */ final case class DemandFromAny(outputs: OutputHandle*) extends DemandCondition @@ -64,7 +65,7 @@ object FlexiRoute { * fulfilled when there are requests for elements from all of the given downstream * outputs. * - * Cancelled and completed inputs are not used, i.e. it is allowed + * Cancelled and completed outputs are not used, i.e. it is allowed * to specify them in the list of `outputs`. */ final case class DemandFromAll(outputs: OutputHandle*) extends DemandCondition @@ -210,7 +211,7 @@ object FlexiRoute { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiRoute[In](val name: Option[String]) { +abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactory[In] { import FlexiRoute._ def this(name: String) = this(Some(name)) @@ -225,7 +226,7 @@ abstract class FlexiRoute[In](val name: Option[String]) { override def minimumOutputCount = 2 override def maximumOutputCount = outputCount - override private[akka] val astNode = Ast.RouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]]) + override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]]) override def name = vertexName final override private[scaladsl] def newInstance() = new RouteVertex(None) @@ -260,7 +261,7 @@ abstract class FlexiRoute[In](val name: Option[String]) { * Create the stateful logic that will be used when reading input elements * and emitting output elements. Create a new instance every time. */ - def createRouteLogic(): RouteLogic[In] + override def createRouteLogic(): RouteLogic[In] override def toString = name match { case Some(n) ⇒ n