Merge pull request #16227 from akka/wip-15205-route-java-patriknw
+str #15205 Java API for FlexiRoute
This commit is contained in:
commit
d036fb106b
6 changed files with 636 additions and 17 deletions
|
|
@ -0,0 +1,237 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.actor.ActorSystem;
|
||||
import akka.stream.FlowMaterializer;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.stream.javadsl.FlexiRoute;
|
||||
import akka.japi.Pair;
|
||||
|
||||
import scala.concurrent.Await;
|
||||
import scala.concurrent.Future;
|
||||
import scala.concurrent.duration.Duration;
|
||||
|
||||
public class FlexiRouteTest {
|
||||
|
||||
@ClassRule
|
||||
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlexiRouteTest",
|
||||
AkkaSpec.testConf());
|
||||
|
||||
final ActorSystem system = actorSystemResource.getSystem();
|
||||
|
||||
final FlowMaterializer materializer = FlowMaterializer.create(system);
|
||||
|
||||
final Source<String> in = Source.from(Arrays.asList("a", "b", "c", "d", "e"));
|
||||
|
||||
final KeyedSink<List<String>, Future<List<String>>> out1 = Sink.<List<String>>future();
|
||||
final KeyedSink<List<String>, Future<List<String>>> out2 = Sink.<List<String>>future();
|
||||
|
||||
@Test
|
||||
public void mustBuildSimpleFairRoute() throws Exception {
|
||||
Fair<String> route = new Fair<String>();
|
||||
|
||||
MaterializedMap m = FlowGraph.builder().addEdge(in, route.in())
|
||||
.addEdge(route.output1(), Flow.of(String.class).grouped(100), out1)
|
||||
.addEdge(route.output2(), Flow.of(String.class).grouped(100), out2).run(materializer);
|
||||
|
||||
final List<String> result1 = Await.result(m.get(out1), Duration.apply(3, TimeUnit.SECONDS));
|
||||
final List<String> result2 = Await.result(m.get(out2), Duration.apply(3, TimeUnit.SECONDS));
|
||||
assertEquals(Arrays.asList("a", "c", "e"), result1);
|
||||
assertEquals(Arrays.asList("b", "d"), result2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBuildSimpleRoundRobinRoute() throws Exception {
|
||||
StrictRoundRobin<String> route = new StrictRoundRobin<String>();
|
||||
|
||||
MaterializedMap m = FlowGraph.builder().addEdge(in, route.in())
|
||||
.addEdge(route.output1(), Flow.of(String.class).grouped(100), out1)
|
||||
.addEdge(route.output2(), Flow.of(String.class).grouped(100), out2).run(materializer);
|
||||
|
||||
final List<String> result1 = Await.result(m.get(out1), Duration.apply(3, TimeUnit.SECONDS));
|
||||
final List<String> result2 = Await.result(m.get(out2), Duration.apply(3, TimeUnit.SECONDS));
|
||||
assertEquals(Arrays.asList("a", "c", "e"), result1);
|
||||
assertEquals(Arrays.asList("b", "d"), result2);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBuildSimpleUnzip() throws Exception {
|
||||
Unzip<Integer, String> unzip = new Unzip<Integer, String>();
|
||||
|
||||
@SuppressWarnings({ "unchecked", "rawtypes" })
|
||||
Source<Pair<Integer, String>> input = Source.from(Arrays.<Pair<Integer, String>>asList(new Pair(1, "A"), new Pair(
|
||||
2, "B"), new Pair(3, "C"), new Pair(4, "D")));
|
||||
|
||||
final KeyedSink<List<Integer>, Future<List<Integer>>> outA = Sink.<List<Integer>>future();
|
||||
final KeyedSink<List<String>, Future<List<String>>> outB = Sink.<List<String>>future();
|
||||
|
||||
MaterializedMap m = FlowGraph.builder().addEdge(input, unzip.in())
|
||||
.addEdge(unzip.outputA, Flow.of(Integer.class).grouped(100), outA)
|
||||
.addEdge(unzip.outputB, Flow.of(String.class).grouped(100), outB).run(materializer);
|
||||
|
||||
final List<Integer> result1 = Await.result(m.get(outA), Duration.apply(3, TimeUnit.SECONDS));
|
||||
final List<String> result2 = Await.result(m.get(outB), Duration.apply(3, TimeUnit.SECONDS));
|
||||
assertEquals(Arrays.asList(1, 2, 3, 4), result1);
|
||||
assertEquals(Arrays.asList("A", "B", "C", "D"), result2);
|
||||
}
|
||||
|
||||
/**
|
||||
* This is fair in that sense that after enqueueing to an output it yields to
|
||||
* other output if they are have requested elements. Or in other words, if all
|
||||
* outputs have demand available at the same time then in finite steps all
|
||||
* elements are enqueued to them.
|
||||
*/
|
||||
static public class Fair<T> extends FlexiRoute<T, T> {
|
||||
|
||||
private final OutputPort<T, T> output1 = createOutputPort();
|
||||
private final OutputPort<T, T> output2 = createOutputPort();
|
||||
|
||||
public Fair() {
|
||||
super("fairRoute");
|
||||
}
|
||||
|
||||
public OutputPort<T, T> output1() {
|
||||
return output1;
|
||||
}
|
||||
|
||||
public OutputPort<T, T> output2() {
|
||||
return output2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RouteLogic<T, T> createRouteLogic() {
|
||||
return new RouteLogic<T, T>() {
|
||||
@Override
|
||||
public List<OutputHandle> outputHandles(int outputCount) {
|
||||
return Arrays.asList(output1.handle(), output2.handle());
|
||||
}
|
||||
|
||||
private State<T, T> emitToAnyWithDemand = new State<T, T>(demandFromAny(output1, output2)) {
|
||||
@Override
|
||||
public State<T, T> onInput(RouteLogicContext<T, T> ctx, OutputHandle preferred, T element) {
|
||||
ctx.emit(preferred, element);
|
||||
return sameState();
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public State<T, T> initialState() {
|
||||
return new State<T, T>(demandFromAny(output1, output2)) {
|
||||
@Override
|
||||
public State<T, T> onInput(RouteLogicContext<T, T> ctx, OutputHandle preferred, T element) {
|
||||
ctx.emit(preferred, element);
|
||||
return emitToAnyWithDemand;
|
||||
}
|
||||
};
|
||||
}
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It never skips an output while cycling but waits on it instead (closed
|
||||
* outputs are skipped though). The fair route above is a non-strict
|
||||
* round-robin (skips currently unavailable outputs).
|
||||
*/
|
||||
static public class StrictRoundRobin<T> extends FlexiRoute<T, T> {
|
||||
|
||||
private final OutputPort<T, T> output1 = createOutputPort();
|
||||
private final OutputPort<T, T> output2 = createOutputPort();
|
||||
|
||||
public StrictRoundRobin() {
|
||||
super("roundRobinRoute");
|
||||
}
|
||||
|
||||
public OutputPort<T, T> output1() {
|
||||
return output1;
|
||||
}
|
||||
|
||||
public OutputPort<T, T> output2() {
|
||||
return output2;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RouteLogic<T, T> createRouteLogic() {
|
||||
return new RouteLogic<T, T>() {
|
||||
@Override
|
||||
public List<OutputHandle> outputHandles(int outputCount) {
|
||||
return Arrays.asList(output1.handle(), output2.handle());
|
||||
}
|
||||
|
||||
private State<T, T> toOutput1 = new State<T, T>(demandFrom(output1)) {
|
||||
@Override
|
||||
public State<T, T> onInput(RouteLogicContext<T, T> ctx, OutputHandle preferred, T element) {
|
||||
ctx.emit(output1, element);
|
||||
return toOutput2;
|
||||
}
|
||||
};
|
||||
|
||||
private State<T, T> toOutput2 = new State<T, T>(demandFrom(output2)) {
|
||||
@Override
|
||||
public State<T, T> onInput(RouteLogicContext<T, T> ctx, OutputHandle preferred, T element) {
|
||||
ctx.emit(output2, element);
|
||||
return toOutput1;
|
||||
}
|
||||
};
|
||||
|
||||
@Override
|
||||
public State<T, T> initialState() {
|
||||
return toOutput1;
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
static public class Unzip<A, B> extends FlexiRoute<Pair<A, B>, Object> {
|
||||
|
||||
public final OutputPort<Pair<A, B>, A> outputA = createOutputPort();
|
||||
public final OutputPort<Pair<A, B>, B> outputB = createOutputPort();
|
||||
|
||||
public Unzip() {
|
||||
super("unzip");
|
||||
}
|
||||
|
||||
@Override
|
||||
public RouteLogic<Pair<A, B>, Object> createRouteLogic() {
|
||||
return new RouteLogic<Pair<A, B>, Object>() {
|
||||
|
||||
@Override
|
||||
public List<OutputHandle> outputHandles(int outputCount) {
|
||||
if (outputCount != 2)
|
||||
throw new IllegalArgumentException("Unzip must have two connected outputs, was " + outputCount);
|
||||
return Arrays.asList(outputA.handle(), outputB.handle());
|
||||
}
|
||||
|
||||
@Override
|
||||
public State<Pair<A, B>, Object> initialState() {
|
||||
return new State<Pair<A, B>, Object>(demandFromAll(outputA, outputB)) {
|
||||
@Override
|
||||
public State<Pair<A, B>, Object> onInput(RouteLogicContext<Pair<A, B>, Object> ctx, OutputHandle preferred,
|
||||
Pair<A, B> element) {
|
||||
ctx.emit(outputA, element.first());
|
||||
ctx.emit(outputB, element.second());
|
||||
return sameState();
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
@Override
|
||||
public CompletionHandling<Pair<A, B>> initialCompletionHandling() {
|
||||
return eagerClose();
|
||||
}
|
||||
|
||||
};
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -11,7 +11,7 @@ import akka.stream.testkit.StreamTestKit.PublisherProbe
|
|||
import akka.stream.testkit.StreamTestKit.SubscriberProbe
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
object GraphRouteSpec {
|
||||
object GraphFlexiRouteSpec {
|
||||
|
||||
/**
|
||||
* This is fair in that sense that after enqueueing to an output it yields to other output if
|
||||
|
|
@ -72,8 +72,6 @@ object GraphRouteSpec {
|
|||
val outB = createOutputPort[B]()
|
||||
|
||||
override def createRouteLogic() = new RouteLogic[(A, B)] {
|
||||
var lastInA: Option[A] = None
|
||||
var lastInB: Option[B] = None
|
||||
|
||||
override def outputHandles(outputCount: Int) = {
|
||||
require(outputCount == 2, s"Unzip must have two connected outputs, was $outputCount")
|
||||
|
|
@ -164,8 +162,8 @@ object GraphRouteSpec {
|
|||
}
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class GraphRouteSpec extends AkkaSpec {
|
||||
import GraphRouteSpec._
|
||||
class GraphFlexiRouteSpec extends AkkaSpec {
|
||||
import GraphFlexiRouteSpec._
|
||||
|
||||
implicit val materializer = FlowMaterializer()
|
||||
|
||||
|
|
|
|||
|
|
@ -109,13 +109,12 @@ private[akka] object Ast {
|
|||
override def name = "concat"
|
||||
}
|
||||
|
||||
case class FlexiMergeNode(merger: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode {
|
||||
override def name = merger.name.getOrElse("flexiMerge")
|
||||
|
||||
case class FlexiMergeNode(factory: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode {
|
||||
override def name = factory.name.getOrElse("flexiMerge")
|
||||
}
|
||||
|
||||
case class RouteNode(route: FlexiRoute[Any]) extends FanOutAstNode {
|
||||
override def name = route.name.getOrElse("route")
|
||||
case class FlexiRouteNode(factory: FlexiRouteImpl.RouteLogicFactory[Any]) extends FanOutAstNode {
|
||||
override def name = factory.name.getOrElse("flexiRoute")
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -260,7 +259,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
|
|||
actorOf(Balance.props(settings, outputCount, waitForAllDownstreams).withDispatcher(settings.dispatcher), actorName)
|
||||
case Ast.Unzip ⇒
|
||||
actorOf(Unzip.props(settings).withDispatcher(settings.dispatcher), actorName)
|
||||
case Ast.RouteNode(route) ⇒
|
||||
case Ast.FlexiRouteNode(route) ⇒
|
||||
actorOf(FlexiRouteImpl.props(settings, outputCount, route.createRouteLogic()).
|
||||
withDispatcher(settings.dispatcher), actorName)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -15,6 +15,11 @@ import akka.stream.impl.FanOut.OutputBunch
|
|||
private[akka] object FlexiRouteImpl {
|
||||
def props(settings: MaterializerSettings, outputCount: Int, routeLogic: FlexiRoute.RouteLogic[Any]): Props =
|
||||
Props(new FlexiRouteImpl(settings, outputCount, routeLogic))
|
||||
|
||||
trait RouteLogicFactory[In] {
|
||||
def name: Option[String]
|
||||
def createRouteLogic(): FlexiRoute.RouteLogic[In]
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
379
akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala
Normal file
379
akka-stream/src/main/scala/akka/stream/javadsl/FlexiRoute.scala
Normal file
|
|
@ -0,0 +1,379 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import scala.annotation.varargs
|
||||
import akka.stream.scaladsl
|
||||
import scala.collection.immutable
|
||||
import java.util.{ List ⇒ JList }
|
||||
import akka.japi.Util.immutableIndexedSeq
|
||||
import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory
|
||||
|
||||
object FlexiRoute {
|
||||
|
||||
/**
|
||||
* @see [[OutputPort]]
|
||||
*/
|
||||
sealed trait OutputHandle extends scaladsl.FlexiRoute.OutputHandle
|
||||
|
||||
/**
|
||||
* An `OutputPort` can be connected to a [[Sink]] with the [[FlowGraphBuilder]].
|
||||
* The `OutputPort` is also an [[OutputHandle]] which you use to define to which
|
||||
* downstream output to emit an element.
|
||||
*/
|
||||
class OutputPort[In, Out] private[akka] (val port: Int, parent: FlexiRoute[In, _])
|
||||
extends JunctionOutPort[Out] with OutputHandle {
|
||||
|
||||
def handle: OutputHandle = this
|
||||
|
||||
override val asScala: scaladsl.JunctionOutPort[Out] = new scaladsl.JunctionOutPort[Out] {
|
||||
override def port: Int = OutputPort.this.port
|
||||
override def vertex = parent.vertex
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
override private[akka] def portIndex: Int = port
|
||||
|
||||
override def toString: String = s"OutputPort($port)"
|
||||
}
|
||||
|
||||
sealed trait DemandCondition
|
||||
|
||||
/**
|
||||
* Demand condition for the [[State]] that will be
|
||||
* fulfilled when there are requests for elements from one specific downstream
|
||||
* output.
|
||||
*
|
||||
* It is not allowed to use a handle that has been cancelled or
|
||||
* has been completed. `IllegalArgumentException` is thrown if
|
||||
* that is not obeyed.
|
||||
*/
|
||||
class DemandFrom(val output: OutputHandle) extends DemandCondition
|
||||
|
||||
/**
|
||||
* Demand condition for the [[State]] that will be
|
||||
* fulfilled when there are requests for elements from any of the given downstream
|
||||
* outputs.
|
||||
*
|
||||
* Cancelled and completed inputs are not used, i.e. it is allowed
|
||||
* to specify them in the list of `outputs`.
|
||||
*/
|
||||
class DemandFromAny(val outputs: JList[OutputHandle]) extends DemandCondition
|
||||
|
||||
/**
|
||||
* Demand condition for the [[State]] that will be
|
||||
* fulfilled when there are requests for elements from all of the given downstream
|
||||
* outputs.
|
||||
*
|
||||
* Cancelled and completed outputs are not used, i.e. it is allowed
|
||||
* to specify them in the list of `outputs`.
|
||||
*/
|
||||
class DemandFromAll(val outputs: JList[OutputHandle]) extends DemandCondition
|
||||
|
||||
/**
|
||||
* Context that is passed to the functions of [[State]] and [[CompletionHandling]].
|
||||
* The context provides means for performing side effects, such as emitting elements
|
||||
* downstream.
|
||||
*/
|
||||
trait RouteLogicContext[In, Out] {
|
||||
/**
|
||||
* @return `true` if at least one element has been requested by the given downstream (output).
|
||||
*/
|
||||
def isDemandAvailable(output: OutputHandle): Boolean
|
||||
|
||||
/**
|
||||
* Emit one element downstream. It is only allowed to `emit` when
|
||||
* [[#isDemandAvailable]] is `true` for the given `output`, otherwise
|
||||
* `IllegalArgumentException` is thrown.
|
||||
*/
|
||||
def emit(output: OutputHandle, elem: Out): Unit
|
||||
|
||||
/**
|
||||
* Complete the given downstream successfully.
|
||||
*/
|
||||
def complete(output: OutputHandle): Unit
|
||||
|
||||
/**
|
||||
* Complete all downstreams successfully and cancel upstream.
|
||||
*/
|
||||
def complete(): Unit
|
||||
|
||||
/**
|
||||
* Complete the given downstream with failure.
|
||||
*/
|
||||
def error(output: OutputHandle, cause: Throwable): Unit
|
||||
|
||||
/**
|
||||
* Complete all downstreams with failure and cancel upstream.
|
||||
*/
|
||||
def error(cause: Throwable): Unit
|
||||
|
||||
/**
|
||||
* Replace current [[CompletionHandling]].
|
||||
*/
|
||||
def changeCompletionHandling(completion: CompletionHandling[In]): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* How to handle completion or error from upstream input and how to
|
||||
* handle cancel from downstream output.
|
||||
*
|
||||
* The `onComplete` method is called the upstream input was completed successfully.
|
||||
* It returns next behavior or [[#SameState]] to keep current behavior.
|
||||
*
|
||||
* The `onError` method is called when the upstream input was completed with failure.
|
||||
* It returns next behavior or [[#SameState]] to keep current behavior.
|
||||
*
|
||||
* The `onCancel` method is called when a downstream output cancels.
|
||||
* It returns next behavior or [[#SameState]] to keep current behavior.
|
||||
*/
|
||||
abstract class CompletionHandling[In] {
|
||||
def onComplete(ctx: RouteLogicContext[In, Any]): Unit
|
||||
def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit
|
||||
def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _]
|
||||
}
|
||||
|
||||
/**
|
||||
* Definition of which outputs that must have requested elements and how to act
|
||||
* on the read elements. When an element has been read [[#onInput]] is called and
|
||||
* then it is ensured that the specified downstream outputs have requested at least
|
||||
* one element, i.e. it is allowed to emit at least one element downstream with
|
||||
* [[RouteLogicContext#emit]].
|
||||
*
|
||||
* The `onInput` method is called when an `element` was read from upstream.
|
||||
* The function returns next behavior or [[#SameState]] to keep current behavior.
|
||||
*/
|
||||
abstract class State[In, Out](val condition: DemandCondition) {
|
||||
def onInput(ctx: RouteLogicContext[In, Out], preferredOutput: OutputHandle, element: In): State[In, _]
|
||||
}
|
||||
|
||||
/**
|
||||
* The possibly stateful logic that reads from the input and enables emitting to downstream
|
||||
* via the defined [[State]]. Handles completion, error and cancel via the defined
|
||||
* [[CompletionHandling]].
|
||||
*
|
||||
* Concrete instance is supposed to be created by implementing [[FlexiRoute#createRouteLogic]].
|
||||
*/
|
||||
abstract class RouteLogic[In, Out] {
|
||||
def outputHandles(outputCount: Int): JList[OutputHandle]
|
||||
def initialState: State[In, Out]
|
||||
def initialCompletionHandling: CompletionHandling[In] = defaultCompletionHandling
|
||||
|
||||
/**
|
||||
* Return this from [[State]] `onInput` to use same state for next element.
|
||||
*/
|
||||
def sameState[A]: State[In, A] = FlexiRoute.sameStateInstance.asInstanceOf[State[In, A]]
|
||||
|
||||
/**
|
||||
* Convenience to create a [[DemandFromAny]] condition.
|
||||
*/
|
||||
@varargs def demandFromAny(outputs: OutputHandle*): DemandFromAny = {
|
||||
import scala.collection.JavaConverters._
|
||||
new DemandFromAny(outputs.asJava)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience to create a [[DemandFromAll]] condition.
|
||||
*/
|
||||
@varargs def demandFromAll(outputs: OutputHandle*): DemandFromAll = {
|
||||
import scala.collection.JavaConverters._
|
||||
new DemandFromAll(outputs.asJava)
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience to create a [[DemandFrom]] condition.
|
||||
*/
|
||||
def demandFrom(output: OutputHandle): DemandFrom = new DemandFrom(output)
|
||||
|
||||
/**
|
||||
* When an output cancels it continues with remaining outputs.
|
||||
* Error or completion from upstream are immediately propagated.
|
||||
*/
|
||||
def defaultCompletionHandling: CompletionHandling[In] =
|
||||
new CompletionHandling[In] {
|
||||
override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = ()
|
||||
override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
|
||||
override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] =
|
||||
sameState
|
||||
}
|
||||
|
||||
/**
|
||||
* Completes as soon as any output cancels.
|
||||
* Error or completion from upstream are immediately propagated.
|
||||
*/
|
||||
def eagerClose[A]: CompletionHandling[In] =
|
||||
new CompletionHandling[In] {
|
||||
override def onComplete(ctx: RouteLogicContext[In, Any]): Unit = ()
|
||||
override def onError(ctx: RouteLogicContext[In, Any], cause: Throwable): Unit = ()
|
||||
override def onCancel(ctx: RouteLogicContext[In, Any], output: OutputHandle): State[In, _] = {
|
||||
ctx.complete()
|
||||
sameState
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private val sameStateInstance = new State[Any, Any](new DemandFromAny(java.util.Collections.emptyList[OutputHandle])) {
|
||||
override def onInput(ctx: RouteLogicContext[Any, Any], output: OutputHandle, element: Any): State[Any, Any] =
|
||||
throw new UnsupportedOperationException("SameState.onInput should not be called")
|
||||
|
||||
override def toString: String = "SameState"
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object Internal {
|
||||
class RouteLogicWrapper[In](delegate: RouteLogic[In, _]) extends scaladsl.FlexiRoute.RouteLogic[In] {
|
||||
override def outputHandles(outputCount: Int): immutable.IndexedSeq[scaladsl.FlexiRoute.OutputHandle] =
|
||||
immutableIndexedSeq(delegate.outputHandles(outputCount))
|
||||
|
||||
override def initialState: this.State[_] = wrapState(delegate.initialState)
|
||||
|
||||
override def initialCompletionHandling: this.CompletionHandling =
|
||||
wrapCompletionHandling(delegate.initialCompletionHandling)
|
||||
|
||||
private def wrapState[Out](delegateState: FlexiRoute.State[In, Out]): State[Out] =
|
||||
if (sameStateInstance == delegateState)
|
||||
SameState
|
||||
else
|
||||
State(convertDemandCondition(delegateState.condition)) { (ctx, outputHandle, elem) ⇒
|
||||
val newDelegateState =
|
||||
delegateState.onInput(new RouteLogicContextWrapper(ctx), asJava(outputHandle), elem)
|
||||
wrapState(newDelegateState)
|
||||
}
|
||||
|
||||
private def wrapCompletionHandling[Out](
|
||||
delegateCompletionHandling: FlexiRoute.CompletionHandling[In]): CompletionHandling =
|
||||
CompletionHandling(
|
||||
onComplete = ctx ⇒ {
|
||||
delegateCompletionHandling.onComplete(new RouteLogicContextWrapper(ctx))
|
||||
},
|
||||
onError = (ctx, cause) ⇒ {
|
||||
delegateCompletionHandling.onError(new RouteLogicContextWrapper(ctx), cause)
|
||||
},
|
||||
onCancel = (ctx, outputHandle) ⇒ {
|
||||
val newDelegateState = delegateCompletionHandling.onCancel(
|
||||
new RouteLogicContextWrapper(ctx), asJava(outputHandle))
|
||||
wrapState(newDelegateState)
|
||||
})
|
||||
|
||||
private def asJava(outputHandle: scaladsl.FlexiRoute.OutputHandle): OutputHandle =
|
||||
outputHandle.asInstanceOf[OutputHandle]
|
||||
|
||||
class RouteLogicContextWrapper[Out](delegate: RouteLogicContext[Out]) extends FlexiRoute.RouteLogicContext[In, Out] {
|
||||
override def isDemandAvailable(output: OutputHandle): Boolean = delegate.isDemandAvailable(output)
|
||||
override def emit(output: OutputHandle, elem: Out): Unit = delegate.emit(output, elem)
|
||||
override def complete(): Unit = delegate.complete()
|
||||
override def complete(output: OutputHandle): Unit = delegate.complete(output)
|
||||
override def error(cause: Throwable): Unit = delegate.error(cause)
|
||||
override def error(output: OutputHandle, cause: Throwable): Unit = delegate.error(output, cause)
|
||||
override def changeCompletionHandling(completion: FlexiRoute.CompletionHandling[In]): Unit =
|
||||
delegate.changeCompletionHandling(wrapCompletionHandling(completion))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
def convertDemandCondition(condition: DemandCondition): scaladsl.FlexiRoute.DemandCondition =
|
||||
condition match {
|
||||
case c: DemandFromAny ⇒ scaladsl.FlexiRoute.DemandFromAny(immutableIndexedSeq(c.outputs))
|
||||
case c: DemandFromAll ⇒ scaladsl.FlexiRoute.DemandFromAll(immutableIndexedSeq(c.outputs))
|
||||
case c: DemandFrom ⇒ scaladsl.FlexiRoute.DemandFrom(c.output)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for implementing custom route junctions.
|
||||
* Such a junction always has one [[#in]] port and one or more output ports.
|
||||
* The output ports are to be defined in the concrete subclass and are created with
|
||||
* [[#createOutputPort]].
|
||||
*
|
||||
* The concrete subclass must implement [[#createRouteLogic]] to define the [[FlexiRoute#RouteLogic]]
|
||||
* that will be used when reading input elements and emitting output elements.
|
||||
* The [[FlexiRoute#RouteLogic]] instance may be stateful, but the ``FlexiRoute`` instance
|
||||
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
|
||||
* instances.
|
||||
*
|
||||
* Note that a `FlexiRoute` with a specific name can only be used at one place (one vertex)
|
||||
* in the `FlowGraph`. If the `name` is not specified the `FlexiRoute` instance can only
|
||||
* be used at one place (one vertex) in the `FlowGraph`.
|
||||
*
|
||||
* @param name optional name of the junction in the [[FlowGraph]],
|
||||
*/
|
||||
abstract class FlexiRoute[In, Out](val name: Option[String]) {
|
||||
import FlexiRoute._
|
||||
import scaladsl.FlowGraphInternal
|
||||
import akka.stream.impl.Ast
|
||||
|
||||
def this() = this(None)
|
||||
def this(name: String) = this(Option(name))
|
||||
|
||||
private var outputCount = 0
|
||||
|
||||
// hide the internal vertex things from subclass, and make it possible to create new instance
|
||||
private class RouteVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
|
||||
override def minimumInputCount = 1
|
||||
override def maximumInputCount = 1
|
||||
override def minimumOutputCount = 2
|
||||
override def maximumOutputCount = outputCount
|
||||
|
||||
override private[akka] val astNode = {
|
||||
val factory = new RouteLogicFactory[Any] {
|
||||
override def name: Option[String] = vertexName
|
||||
override def createRouteLogic(): scaladsl.FlexiRoute.RouteLogic[Any] =
|
||||
new Internal.RouteLogicWrapper(FlexiRoute.this.createRouteLogic().asInstanceOf[RouteLogic[Any, Any]])
|
||||
}
|
||||
Ast.FlexiRouteNode(factory)
|
||||
}
|
||||
|
||||
override def name = vertexName
|
||||
|
||||
final override def newInstance() = new RouteVertex(None)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] val vertex: FlowGraphInternal.InternalVertex = new RouteVertex(name)
|
||||
|
||||
/**
|
||||
* Input port of the `FlexiRoute` junction. A [[Source]] can be connected to this output
|
||||
* with the [[FlowGraphBuilder]].
|
||||
*/
|
||||
val in: JunctionInPort[In] = new JunctionInPort[In] {
|
||||
override val asScala: scaladsl.JunctionInPort[In] = new scaladsl.JunctionInPort[In] {
|
||||
override def vertex = FlexiRoute.this.vertex
|
||||
type NextT = Nothing
|
||||
override def next = scaladsl.NoNext
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Concrete subclass is supposed to define one or more output ports and
|
||||
* they are created by calling this method. Each [[FlexiRoute.OutputPort]] can be
|
||||
* connected to a [[Sink]] with the [[FlowGraphBuilder]].
|
||||
* The `OutputPort` is also an [[FlexiRoute.OutputHandle]] which you use to define to which
|
||||
* downstream output to emit an element.
|
||||
*/
|
||||
protected final def createOutputPort[T](): OutputPort[In, T] = {
|
||||
val port = outputCount
|
||||
outputCount += 1
|
||||
new OutputPort(port, parent = this)
|
||||
}
|
||||
|
||||
/**
|
||||
* Create the stateful logic that will be used when reading input elements
|
||||
* and emitting output elements. Create a new instance every time.
|
||||
*/
|
||||
def createRouteLogic(): RouteLogic[In, Out]
|
||||
|
||||
override def toString = name match {
|
||||
case Some(n) ⇒ n
|
||||
case None ⇒ getClass.getSimpleName + "@" + Integer.toHexString(super.hashCode())
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -5,13 +5,14 @@ package akka.stream.scaladsl
|
|||
|
||||
import scala.collection.immutable
|
||||
import akka.stream.impl.Ast
|
||||
import akka.stream.impl.FlexiRouteImpl.RouteLogicFactory
|
||||
|
||||
object FlexiRoute {
|
||||
|
||||
/**
|
||||
* @see [[OutputPort]]
|
||||
*/
|
||||
sealed trait OutputHandle {
|
||||
trait OutputHandle {
|
||||
private[akka] def portIndex: Int
|
||||
}
|
||||
|
||||
|
|
@ -51,7 +52,7 @@ object FlexiRoute {
|
|||
* fulfilled when there are requests for elements from any of the given downstream
|
||||
* outputs.
|
||||
*
|
||||
* Cancelled and completed inputs are not used, i.e. it is allowed
|
||||
* Cancelled and completed outputs are not used, i.e. it is allowed
|
||||
* to specify them in the list of `outputs`.
|
||||
*/
|
||||
final case class DemandFromAny(outputs: OutputHandle*) extends DemandCondition
|
||||
|
|
@ -64,7 +65,7 @@ object FlexiRoute {
|
|||
* fulfilled when there are requests for elements from all of the given downstream
|
||||
* outputs.
|
||||
*
|
||||
* Cancelled and completed inputs are not used, i.e. it is allowed
|
||||
* Cancelled and completed outputs are not used, i.e. it is allowed
|
||||
* to specify them in the list of `outputs`.
|
||||
*/
|
||||
final case class DemandFromAll(outputs: OutputHandle*) extends DemandCondition
|
||||
|
|
@ -210,7 +211,7 @@ object FlexiRoute {
|
|||
*
|
||||
* @param name optional name of the junction in the [[FlowGraph]],
|
||||
*/
|
||||
abstract class FlexiRoute[In](val name: Option[String]) {
|
||||
abstract class FlexiRoute[In](val name: Option[String]) extends RouteLogicFactory[In] {
|
||||
import FlexiRoute._
|
||||
|
||||
def this(name: String) = this(Some(name))
|
||||
|
|
@ -225,7 +226,7 @@ abstract class FlexiRoute[In](val name: Option[String]) {
|
|||
override def minimumOutputCount = 2
|
||||
override def maximumOutputCount = outputCount
|
||||
|
||||
override private[akka] val astNode = Ast.RouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]])
|
||||
override private[akka] val astNode = Ast.FlexiRouteNode(FlexiRoute.this.asInstanceOf[FlexiRoute[Any]])
|
||||
override def name = vertexName
|
||||
|
||||
final override private[scaladsl] def newInstance() = new RouteVertex(None)
|
||||
|
|
@ -260,7 +261,7 @@ abstract class FlexiRoute[In](val name: Option[String]) {
|
|||
* Create the stateful logic that will be used when reading input elements
|
||||
* and emitting output elements. Create a new instance every time.
|
||||
*/
|
||||
def createRouteLogic(): RouteLogic[In]
|
||||
override def createRouteLogic(): RouteLogic[In]
|
||||
|
||||
override def toString = name match {
|
||||
case Some(n) ⇒ n
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue