+str #16105 Java API for FlexiMerge

This commit is contained in:
Patrik Nordwall 2014-11-04 14:02:35 +01:00
parent 3d134af562
commit 30ee5539b8
5 changed files with 635 additions and 6 deletions

View file

@ -0,0 +1,276 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl;
import java.util.Arrays;
import java.util.List;
import java.util.HashSet;
import org.junit.ClassRule;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import java.util.concurrent.TimeUnit;
import org.reactivestreams.Publisher;
import akka.actor.ActorSystem;
import akka.stream.FlowMaterializer;
import akka.stream.testkit.AkkaSpec;
import akka.stream.javadsl.FlexiMerge;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import akka.japi.Pair;
public class FlexiMergeTest {
@ClassRule
public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlexiMergeTest",
AkkaSpec.testConf());
final ActorSystem system = actorSystemResource.getSystem();
final FlowMaterializer materializer = FlowMaterializer.create(system);
final Source<String> in1 = Source.from(Arrays.asList("a", "b", "c", "d"));
final Source<String> in2 = Source.from(Arrays.asList("e", "f"));
final KeyedSink<String, Publisher<String>> out1 = Sink.publisher();
@Test
public void mustBuildSimpleFairMerge() throws Exception {
Fair<String> merge = new Fair<String>();
MaterializedMap m = FlowGraph.builder().addEdge(in1, merge.input1()).addEdge(in2, merge.input2())
.addEdge(merge.out(), out1).build().run(materializer);
final Publisher<String> pub = m.get(out1);
final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>future(), materializer);
final List<String> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
assertEquals(
new HashSet<String>(Arrays.asList("a", "b", "c", "d", "e", "f")),
new HashSet<String>(result));
}
@Test
public void mustBuildSimpleRoundRobinMerge() throws Exception {
StrictRoundRobin<String> merge = new StrictRoundRobin<String>();
MaterializedMap m = FlowGraph.builder().addEdge(in1, merge.input1()).addEdge(in2, merge.input2())
.addEdge(merge.out(), out1).build().run(materializer);
final Publisher<String> pub = m.get(out1);
final Future<List<String>> all = Source.from(pub).grouped(100).runWith(Sink.<List<String>>future(), materializer);
final List<String> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
assertEquals(Arrays.asList("a", "e", "b", "f", "c", "d"), result);
}
@Test
public void mustBuildSimpleZip() throws Exception {
Zip<Integer, String> zip = new Zip<Integer, String>();
Source<Integer> inA = Source.from(Arrays.asList(1, 2, 3, 4));
Source<String> inB = Source.from(Arrays.asList("a", "b", "c"));
KeyedSink<Pair<Integer, String>, Publisher<Pair<Integer, String>>> out = Sink.publisher();
MaterializedMap m = FlowGraph.builder().addEdge(inA, zip.inputA).addEdge(inB, zip.inputB)
.addEdge(zip.out(), out).build().run(materializer);
final Publisher<Pair<Integer, String>> pub = m.get(out);
final Future<List<Pair<Integer, String>>> all = Source.from(pub).grouped(100).
runWith(Sink.<List<Pair<Integer, String>>>future(), materializer);
final List<Pair<Integer, String>> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS));
assertEquals(
Arrays.asList(new Pair(1, "a"), new Pair(2, "b"), new Pair(3, "c")),
result);
}
/**
* This is fair in that sense that after dequeueing from an input it yields to
* other inputs if they are available. Or in other words, if all inputs have
* elements available at the same time then in finite steps all those elements
* are dequeued from them.
*/
static public class Fair<T> extends FlexiMerge<T, T> {
private final InputPort<T, T> input1 = createInputPort();
private final InputPort<T, T> input2 = createInputPort();
public Fair() {
super("fairMerge");
}
public InputPort<T, T> input1() {
return input1;
}
public InputPort<T, T> input2() {
return input2;
}
@Override
public MergeLogic<T, T> createMergeLogic() {
return new MergeLogic<T, T>() {
@Override
public List<InputHandle> inputHandles(int inputCount) {
return Arrays.asList(input1.handle(), input2.handle());
}
@Override
public State<T, T> initialState() {
return new State<T, T>(readAny(input1, input2)) {
@Override
public State<T, T> onInput(MergeLogicContext<T> ctx, InputHandle inputHandle, T element) {
ctx.emit(element);
return sameState();
}
};
}
};
}
}
/**
* It never skips an input while cycling but waits on it instead (closed
* inputs are skipped though). The fair merge above is a non-strict
* round-robin (skips currently unavailable inputs).
*/
static public class StrictRoundRobin<T> extends FlexiMerge<T, T> {
private final InputPort<T, T> input1 = createInputPort();
private final InputPort<T, T> input2 = createInputPort();
public StrictRoundRobin() {
super("roundRobinMerge");
}
public InputPort<T, T> input1() {
return input1;
}
public InputPort<T, T> input2() {
return input2;
}
@Override
public MergeLogic<T, T> createMergeLogic() {
return new MergeLogic<T, T>() {
@Override
public List<InputHandle> inputHandles(int inputCount) {
return Arrays.asList(input1.handle(), input2.handle());
}
private final CompletionHandling<T> emitOtherOnClose = new CompletionHandling<T>() {
@Override
public State<T, T> onComplete(MergeLogicContext<T> ctx, InputHandle input) {
ctx.changeCompletionHandling(defaultCompletionHandling());
return readRemaining(other(input));
}
@Override
public State<T, T> onError(MergeLogicContext<T> ctx, InputHandle inputHandle, Throwable cause) {
ctx.error(cause);
return sameState();
}
};
private InputHandle other(InputHandle input) {
if (input == input1)
return input2;
else
return input1;
}
private final State<T, T> read1 = new State<T, T>(read(input1)) {
@Override
public State<T, T> onInput(MergeLogicContext<T> ctx, InputHandle inputHandle, T element) {
ctx.emit(element);
return read2;
}
};
private final State<T, T> read2 = new State<T, T>(read(input2)) {
@Override
public State<T, T> onInput(MergeLogicContext<T> ctx, InputHandle inputHandle, T element) {
ctx.emit(element);
return read1;
}
};
private State<T, T> readRemaining(InputHandle input) {
return new State<T, T>(read(input)) {
@Override
public State<T, T> onInput(MergeLogicContext<T> ctx, InputHandle inputHandle, T element) {
ctx.emit(element);
return sameState();
}
};
}
@Override
public State<T, T> initialState() {
return read1;
}
@Override
public CompletionHandling<T> initialCompletionHandling() {
return emitOtherOnClose;
}
};
}
}
static public class Zip<A, B> extends FlexiMerge<A, Pair<A, B>> {
public final InputPort<A, Pair<A, B>> inputA = createInputPort();
public final InputPort<B, Pair<A, B>> inputB = createInputPort();
public Zip() {
super("zip");
}
@Override
public MergeLogic<A, Pair<A, B>> createMergeLogic() {
return new MergeLogic<A, Pair<A, B>>() {
private A lastInA = null;
@Override
public List<InputHandle> inputHandles(int inputCount) {
if(inputCount != 2)
throw new IllegalArgumentException("Zip must have two connected inputs, was " + inputCount);
return Arrays.asList(inputA.handle(), inputB.handle());
}
private final State<A, Pair<A, B>> readA = new State<A, Pair<A, B>>(read(inputA)) {
@Override
public State<B, Pair<A, B>> onInput(MergeLogicContext<Pair<A, B>> ctx, InputHandle inputHandle, A element) {
lastInA = element;
return readB;
}
};
private final State<B, Pair<A, B>> readB = new State<B, Pair<A, B>>(read(inputB)) {
@Override
public State<A, Pair<A, B>> onInput(MergeLogicContext<Pair<A, B>> ctx, InputHandle inputHandle, B element) {
ctx.emit(new Pair<A, B>(lastInA, element));
return readA;
}
};
@Override
public State<A, Pair<A, B>> initialState() {
return readA;
}
@Override
public CompletionHandling<Pair<A, B>> initialCompletionHandling() {
return eagerClose();
}
};
}
}
}

View file

@ -109,8 +109,9 @@ private[akka] object Ast {
override def name = "concat"
}
case class FlexiMergeNode(merger: FlexiMerge[Any]) extends FanInAstNode {
override def name = merger.name.getOrElse("flexMerge")
case class FlexiMergeNode(merger: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode {
override def name = merger.name.getOrElse("flexiMerge")
}
case class RouteNode(route: FlexiRoute[Any]) extends FanOutAstNode {

View file

@ -14,6 +14,11 @@ import akka.actor.Props
private[akka] object FlexiMergeImpl {
def props(settings: MaterializerSettings, inputCount: Int, mergeLogic: FlexiMerge.MergeLogic[Any]): Props =
Props(new FlexiMergeImpl(settings, inputCount, mergeLogic))
trait MergeLogicFactory[Out] {
def name: Option[String]
def createMergeLogic(): FlexiMerge.MergeLogic[Out]
}
}
/**
@ -128,4 +133,4 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings,
if (inputBunch.isDepleted(inputHandle.portIndex))
changeBehavior(completion.onComplete(ctx, inputHandle))
}
}

View file

@ -0,0 +1,346 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import scala.annotation.varargs
import akka.stream.scaladsl
import scala.collection.immutable
import java.util.{ List JList }
import akka.japi.Util.immutableIndexedSeq
import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory
object FlexiMerge {
/**
* @see [[InputPort]]
*/
sealed trait InputHandle extends scaladsl.FlexiMerge.InputHandle
/**
* An `InputPort` can be connected to a [[Source]] with the [[FlowGraphBuilder]].
* The `InputPort` is also an [[InputHandle]], which is passed as parameter
* to [[State]] `onInput` when an input element has been read so that you
* can know exactly from which input the element was read.
*/
class InputPort[In, Out] private[akka] (val port: Int, parent: FlexiMerge[_, Out])
extends JunctionInPort[In] with InputHandle {
def handle: InputHandle = this
override val asScala: scaladsl.JunctionInPort[In] = new scaladsl.JunctionInPort[In] {
override def port: Int = InputPort.this.port
override def vertex = parent.vertex
type NextT = Nothing
override def next = scaladsl.NoNext
}
/**
* INTERNAL API
*/
override private[akka] def portIndex: Int = port
override def toString: String = s"InputPort($port)"
}
sealed trait ReadCondition
/**
* Read condition for the [[State]] that will be
* fulfilled when there are elements for one specific upstream
* input.
*
* It is not allowed to use a handle that has been cancelled or
* has been completed. `IllegalArgumentException` is thrown if
* that is not obeyed.
*/
class Read(val input: InputHandle) extends ReadCondition
/**
* Read condition for the [[State]] that will be
* fulfilled when there are elements for any of the given upstream
* inputs.
*
* Cancelled and completed inputs are not used, i.e. it is allowed
* to specify them in the list of `inputs`.
*/
class ReadAny(val inputs: JList[InputHandle]) extends ReadCondition
/**
* Context that is passed to the methods of [[State]] and [[CompletionHandling]].
* The context provides means for performing side effects, such as emitting elements
* downstream.
*/
trait MergeLogicContext[Out] {
/**
* @return `true` if at least one element has been requested by downstream (output).
*/
def isDemandAvailable: Boolean
/**
* Emit one element downstream. It is only allowed to `emit` when
* [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException`
* is thrown.
*/
def emit(elem: Out): Unit
/**
* Complete this stream succesfully. Upstream subscriptions will be cancelled.
*/
def complete(): Unit
/**
* Complete this stream with failure. Upstream subscriptions will be cancelled.
*/
def error(cause: Throwable): Unit
/**
* Cancel a specific upstream input stream.
*/
def cancel(input: InputHandle): Unit
/**
* Replace current [[CompletionHandling]].
*/
def changeCompletionHandling(completion: CompletionHandling[Out]): Unit
}
/**
* How to handle completion or error from upstream input.
*
* The `onComplete` method is called when an upstream input was completed sucessfully.
* It returns next behavior or [[#SameState]] to keep current behavior.
* A completion can be propagated downstream with [[MergeLogicContext#complete]],
* or it can be swallowed to continue with remaining inputs.
*
* The `onError` method is called when an upstream input was completed sucessfully.
* It returns next behavior or [[#SameState]] to keep current behavior.
* An error can be propagated downstream with [[MergeLogicContext#error]],
* or it can be swallowed to continue with remaining inputs.
*/
abstract class CompletionHandling[Out] {
def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out]
def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out]
}
/**
* Definition of which inputs to read from and how to act on the read elements.
* When an element has been read [[#onInput]] is called and then it is ensured
* that downstream has requested at least one element, i.e. it is allowed to
* emit at least one element downstream with [[MergeLogicContext#emit]].
*
* The `onInput` method is called when an `element` was read from the `input`.
* The method returns next behavior or [[#SameState]] to keep current behavior.
*/
abstract class State[In, Out](val condition: ReadCondition) {
def onInput(ctx: MergeLogicContext[Out], input: InputHandle, element: In): State[_, Out]
}
/**
* The possibly stateful logic that reads from input via the defined [[State]] and
* handles completion and error via the defined [[CompletionHandling]].
*
* Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]].
*/
abstract class MergeLogic[In, Out] {
def inputHandles(inputCount: Int): JList[InputHandle]
def initialState: State[In, Out]
def initialCompletionHandling: CompletionHandling[Out] = defaultCompletionHandling
/**
* Return this from [[State]] `onInput` to use same state for next element.
*/
def sameState[A]: State[A, Out] = FlexiMerge.sameStateInstance.asInstanceOf[State[A, Out]]
/**
* Convenience to create a [[ReadAny]] condition.
*/
@varargs def readAny(inputs: InputHandle*): ReadAny = {
import scala.collection.JavaConverters._
new ReadAny(inputs.asJava)
}
/**
* Convenience to create a [[Read]] condition.
*/
def read(input: InputHandle): Read = new Read(input)
/**
* Will continue to operate until a read becomes unsatisfiable, then it completes.
* Errors are immediately propagated.
*/
def defaultCompletionHandling[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {
override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] =
sameState
override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.error(cause)
sameState
}
}
/**
* Completes as soon as any input completes.
* Errors are immediately propagated.
*/
def eagerClose[A]: CompletionHandling[Out] =
new CompletionHandling[Out] {
override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = {
ctx.complete()
sameState
}
override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = {
ctx.error(cause)
sameState
}
}
}
private val sameStateInstance = new State[Any, Any](new ReadAny(java.util.Collections.emptyList[InputHandle])) {
override def onInput(ctx: MergeLogicContext[Any], input: InputHandle, element: Any): State[Any, Any] =
throw new UnsupportedOperationException("SameState.onInput should not be called")
override def toString: String = "SameState"
}
/**
* INTERNAL API
*/
private[akka] object Internal {
class MergeLogicWrapper[Out](delegate: MergeLogic[_, Out]) extends scaladsl.FlexiMerge.MergeLogic[Out] {
override def inputHandles(inputCount: Int): immutable.IndexedSeq[scaladsl.FlexiMerge.InputHandle] =
immutableIndexedSeq(delegate.inputHandles(inputCount))
override def initialState: this.State[_] = wrapState(delegate.initialState)
override def initialCompletionHandling: this.CompletionHandling =
wrapCompletionHandling(delegate.initialCompletionHandling)
private def wrapState[In](delegateState: FlexiMerge.State[In, Out]): State[In] =
if (sameStateInstance == delegateState)
SameState
else
State(convertReadCondition(delegateState.condition)) { (ctx, inputHandle, elem)
val newDelegateState =
delegateState.onInput(new MergeLogicContextWrapper(ctx), asJava(inputHandle), elem)
wrapState(newDelegateState)
}
private def wrapCompletionHandling(
delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling =
CompletionHandling(
onComplete = (ctx, inputHandle) {
val newDelegateState = delegateCompletionHandling.onComplete(
new MergeLogicContextWrapper(ctx), asJava(inputHandle))
wrapState(newDelegateState)
},
onError = (ctx, inputHandle, cause) {
val newDelegateState = delegateCompletionHandling.onError(
new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause)
wrapState(newDelegateState)
})
private def asJava(inputHandle: scaladsl.FlexiMerge.InputHandle): InputHandle =
inputHandle.asInstanceOf[InputHandle]
class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] {
override def isDemandAvailable: Boolean = delegate.isDemandAvailable
override def emit(elem: Out): Unit = delegate.emit(elem)
override def complete(): Unit = delegate.complete()
override def error(cause: Throwable): Unit = delegate.error(cause)
override def cancel(input: InputHandle): Unit = delegate.cancel(input)
override def changeCompletionHandling(completion: FlexiMerge.CompletionHandling[Out]): Unit =
delegate.changeCompletionHandling(wrapCompletionHandling(completion))
}
}
def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition =
condition match {
case r: ReadAny scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs))
case r: Read scaladsl.FlexiMerge.Read(r.input)
}
}
}
/**
* Base class for implementing custom merge junctions.
* Such a junction always has one [[#out]] port and one or more input ports.
* The input ports are to be defined in the concrete subclass and are created with
* [[#createInputPort]].
*
* The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]]
* that will be used when reading input elements and emitting output elements.
* The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance
* must not hold mutable state, since it may be shared across several materialized ``FlowGraph``
* instances.
*
* Note that a `FlexiMerge` with a specific name can only be used at one place (one vertex)
* in the `FlowGraph`. If the `name` is not specified the `FlexiMerge` instance can only
* be used at one place (one vertex) in the `FlowGraph`.
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiMerge[In, Out](val name: Option[String]) {
import FlexiMerge._
import scaladsl.FlowGraphInternal
import akka.stream.impl.Ast
def this() = this(None)
def this(name: String) = this(Option(name))
private var inputCount = 0
def createMergeLogic(): MergeLogic[In, Out]
// hide the internal vertex things from subclass, and make it possible to create new instance
private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex {
override def minimumInputCount = 2
override def maximumInputCount = inputCount
override def minimumOutputCount = 1
override def maximumOutputCount = 1
override private[akka] val astNode = {
val factory = new MergeLogicFactory[Any] {
override def name: Option[String] = vertexName
override def createMergeLogic(): scaladsl.FlexiMerge.MergeLogic[Any] =
new Internal.MergeLogicWrapper(FlexiMerge.this.createMergeLogic().asInstanceOf[MergeLogic[Any, Any]])
}
Ast.FlexiMergeNode(factory)
}
override def name = vertexName
final override def newInstance() = new FlexiMergeVertex(None)
}
/**
* INTERNAL API
*/
private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name)
/**
* Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output
* with the [[FlowGraphBuilder]].
*/
val out: JunctionOutPort[Out] = new JunctionOutPort[Out] {
override val asScala: scaladsl.JunctionOutPort[Out] = new scaladsl.JunctionOutPort[Out] {
override def vertex: FlowGraphInternal.Vertex = FlexiMerge.this.vertex
}
}
/**
* Concrete subclass is supposed to define one or more input ports and
* they are created by calling this method. Each [[FlexiMerge.InputPort]] can be
* connected to a [[Source]] with the [[FlowGraphBuilder]].
* The `InputPort` is also an [[FlexiMerge.InputHandle]], which is passed as parameter
* to [[FlexiMerge#State]] `onInput` when an input element has been read so that you
* can know exactly from which input the element was read.
*/
protected final def createInputPort[T](): InputPort[T, Out] = {
val port = inputCount
inputCount += 1
new InputPort(port, parent = this)
}
}

View file

@ -5,13 +5,14 @@ package akka.stream.scaladsl
import scala.collection.immutable
import akka.stream.impl.Ast
import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory
object FlexiMerge {
/**
* @see [[InputPort]]
*/
sealed trait InputHandle {
trait InputHandle {
private[akka] def portIndex: Int
}
@ -187,7 +188,7 @@ object FlexiMerge {
*
* @param name optional name of the junction in the [[FlowGraph]],
*/
abstract class FlexiMerge[Out](val name: Option[String]) {
abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFactory[Out] {
import FlexiMerge._
def this(name: String) = this(Some(name))
@ -236,7 +237,7 @@ abstract class FlexiMerge[Out](val name: Option[String]) {
* Create the stateful logic that will be used when reading input elements
* and emitting output elements. Create a new instance every time.
*/
def createMergeLogic(): MergeLogic[Out]
override def createMergeLogic(): MergeLogic[Out]
override def toString = name match {
case Some(n) n