diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java new file mode 100644 index 0000000000..cbe9eeac8c --- /dev/null +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlexiMergeTest.java @@ -0,0 +1,276 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl; + +import java.util.Arrays; +import java.util.List; +import java.util.HashSet; +import org.junit.ClassRule; +import org.junit.Test; + +import static org.junit.Assert.assertEquals; + +import java.util.concurrent.TimeUnit; +import org.reactivestreams.Publisher; +import akka.actor.ActorSystem; +import akka.stream.FlowMaterializer; +import akka.stream.testkit.AkkaSpec; +import akka.stream.javadsl.FlexiMerge; +import scala.concurrent.Await; +import scala.concurrent.Future; +import scala.concurrent.duration.Duration; +import akka.japi.Pair; + +public class FlexiMergeTest { + + @ClassRule + public static AkkaJUnitActorSystemResource actorSystemResource = new AkkaJUnitActorSystemResource("FlexiMergeTest", + AkkaSpec.testConf()); + + final ActorSystem system = actorSystemResource.getSystem(); + + final FlowMaterializer materializer = FlowMaterializer.create(system); + + final Source in1 = Source.from(Arrays.asList("a", "b", "c", "d")); + final Source in2 = Source.from(Arrays.asList("e", "f")); + + final KeyedSink> out1 = Sink.publisher(); + + @Test + public void mustBuildSimpleFairMerge() throws Exception { + Fair merge = new Fair(); + + MaterializedMap m = FlowGraph.builder().addEdge(in1, merge.input1()).addEdge(in2, merge.input2()) + .addEdge(merge.out(), out1).build().run(materializer); + + final Publisher pub = m.get(out1); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>future(), materializer); + final List result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); + assertEquals( + new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), + new HashSet(result)); + } + + @Test + public void mustBuildSimpleRoundRobinMerge() throws Exception { + StrictRoundRobin merge = new StrictRoundRobin(); + + MaterializedMap m = FlowGraph.builder().addEdge(in1, merge.input1()).addEdge(in2, merge.input2()) + .addEdge(merge.out(), out1).build().run(materializer); + + final Publisher pub = m.get(out1); + final Future> all = Source.from(pub).grouped(100).runWith(Sink.>future(), materializer); + final List result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); + assertEquals(Arrays.asList("a", "e", "b", "f", "c", "d"), result); + } + + @Test + public void mustBuildSimpleZip() throws Exception { + Zip zip = new Zip(); + + Source inA = Source.from(Arrays.asList(1, 2, 3, 4)); + Source inB = Source.from(Arrays.asList("a", "b", "c")); + KeyedSink, Publisher>> out = Sink.publisher(); + + MaterializedMap m = FlowGraph.builder().addEdge(inA, zip.inputA).addEdge(inB, zip.inputB) + .addEdge(zip.out(), out).build().run(materializer); + + final Publisher> pub = m.get(out); + final Future>> all = Source.from(pub).grouped(100). + runWith(Sink.>>future(), materializer); + final List> result = Await.result(all, Duration.apply(3, TimeUnit.SECONDS)); + assertEquals( + Arrays.asList(new Pair(1, "a"), new Pair(2, "b"), new Pair(3, "c")), + result); + } + + /** + * This is fair in that sense that after dequeueing from an input it yields to + * other inputs if they are available. Or in other words, if all inputs have + * elements available at the same time then in finite steps all those elements + * are dequeued from them. + */ + static public class Fair extends FlexiMerge { + + private final InputPort input1 = createInputPort(); + private final InputPort input2 = createInputPort(); + + public Fair() { + super("fairMerge"); + } + + public InputPort input1() { + return input1; + } + + public InputPort input2() { + return input2; + } + + @Override + public MergeLogic createMergeLogic() { + return new MergeLogic() { + @Override + public List inputHandles(int inputCount) { + return Arrays.asList(input1.handle(), input2.handle()); + } + + @Override + public State initialState() { + return new State(readAny(input1, input2)) { + @Override + public State onInput(MergeLogicContext ctx, InputHandle inputHandle, T element) { + ctx.emit(element); + return sameState(); + } + }; + } + }; + } + } + + /** + * It never skips an input while cycling but waits on it instead (closed + * inputs are skipped though). The fair merge above is a non-strict + * round-robin (skips currently unavailable inputs). + */ + static public class StrictRoundRobin extends FlexiMerge { + + private final InputPort input1 = createInputPort(); + private final InputPort input2 = createInputPort(); + + public StrictRoundRobin() { + super("roundRobinMerge"); + } + + public InputPort input1() { + return input1; + } + + public InputPort input2() { + return input2; + } + + @Override + public MergeLogic createMergeLogic() { + return new MergeLogic() { + @Override + public List inputHandles(int inputCount) { + return Arrays.asList(input1.handle(), input2.handle()); + } + + private final CompletionHandling emitOtherOnClose = new CompletionHandling() { + @Override + public State onComplete(MergeLogicContext ctx, InputHandle input) { + ctx.changeCompletionHandling(defaultCompletionHandling()); + return readRemaining(other(input)); + } + + @Override + public State onError(MergeLogicContext ctx, InputHandle inputHandle, Throwable cause) { + ctx.error(cause); + return sameState(); + } + }; + + private InputHandle other(InputHandle input) { + if (input == input1) + return input2; + else + return input1; + } + + private final State read1 = new State(read(input1)) { + @Override + public State onInput(MergeLogicContext ctx, InputHandle inputHandle, T element) { + ctx.emit(element); + return read2; + } + }; + + private final State read2 = new State(read(input2)) { + @Override + public State onInput(MergeLogicContext ctx, InputHandle inputHandle, T element) { + ctx.emit(element); + return read1; + } + }; + + private State readRemaining(InputHandle input) { + return new State(read(input)) { + @Override + public State onInput(MergeLogicContext ctx, InputHandle inputHandle, T element) { + ctx.emit(element); + return sameState(); + } + }; + } + + @Override + public State initialState() { + return read1; + } + + @Override + public CompletionHandling initialCompletionHandling() { + return emitOtherOnClose; + } + + }; + } + } + + static public class Zip extends FlexiMerge> { + + public final InputPort> inputA = createInputPort(); + public final InputPort> inputB = createInputPort(); + + public Zip() { + super("zip"); + } + + @Override + public MergeLogic> createMergeLogic() { + return new MergeLogic>() { + + private A lastInA = null; + + @Override + public List inputHandles(int inputCount) { + if(inputCount != 2) + throw new IllegalArgumentException("Zip must have two connected inputs, was " + inputCount); + return Arrays.asList(inputA.handle(), inputB.handle()); + } + + private final State> readA = new State>(read(inputA)) { + @Override + public State> onInput(MergeLogicContext> ctx, InputHandle inputHandle, A element) { + lastInA = element; + return readB; + } + }; + + private final State> readB = new State>(read(inputB)) { + @Override + public State> onInput(MergeLogicContext> ctx, InputHandle inputHandle, B element) { + ctx.emit(new Pair(lastInA, element)); + return readA; + } + }; + + @Override + public State> initialState() { + return readA; + } + + @Override + public CompletionHandling> initialCompletionHandling() { + return eagerClose(); + } + + }; + } + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index a40a074c9a..55cec396e7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -109,8 +109,9 @@ private[akka] object Ast { override def name = "concat" } - case class FlexiMergeNode(merger: FlexiMerge[Any]) extends FanInAstNode { - override def name = merger.name.getOrElse("flexMerge") + case class FlexiMergeNode(merger: FlexiMergeImpl.MergeLogicFactory[Any]) extends FanInAstNode { + override def name = merger.name.getOrElse("flexiMerge") + } case class RouteNode(route: FlexiRoute[Any]) extends FanOutAstNode { diff --git a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala index e8bef7c87f..ab9972a259 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FlexiMergeImpl.scala @@ -14,6 +14,11 @@ import akka.actor.Props private[akka] object FlexiMergeImpl { def props(settings: MaterializerSettings, inputCount: Int, mergeLogic: FlexiMerge.MergeLogic[Any]): Props = Props(new FlexiMergeImpl(settings, inputCount, mergeLogic)) + + trait MergeLogicFactory[Out] { + def name: Option[String] + def createMergeLogic(): FlexiMerge.MergeLogic[Out] + } } /** @@ -128,4 +133,4 @@ private[akka] class FlexiMergeImpl(_settings: MaterializerSettings, if (inputBunch.isDepleted(inputHandle.portIndex)) changeBehavior(completion.onComplete(ctx, inputHandle)) -} \ No newline at end of file +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala new file mode 100644 index 0000000000..e51a231544 --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlexiMerge.scala @@ -0,0 +1,346 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import scala.annotation.varargs +import akka.stream.scaladsl +import scala.collection.immutable +import java.util.{ List ⇒ JList } +import akka.japi.Util.immutableIndexedSeq +import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory + +object FlexiMerge { + + /** + * @see [[InputPort]] + */ + sealed trait InputHandle extends scaladsl.FlexiMerge.InputHandle + + /** + * An `InputPort` can be connected to a [[Source]] with the [[FlowGraphBuilder]]. + * The `InputPort` is also an [[InputHandle]], which is passed as parameter + * to [[State]] `onInput` when an input element has been read so that you + * can know exactly from which input the element was read. + */ + class InputPort[In, Out] private[akka] (val port: Int, parent: FlexiMerge[_, Out]) + extends JunctionInPort[In] with InputHandle { + + def handle: InputHandle = this + + override val asScala: scaladsl.JunctionInPort[In] = new scaladsl.JunctionInPort[In] { + override def port: Int = InputPort.this.port + override def vertex = parent.vertex + type NextT = Nothing + override def next = scaladsl.NoNext + } + + /** + * INTERNAL API + */ + override private[akka] def portIndex: Int = port + + override def toString: String = s"InputPort($port)" + } + + sealed trait ReadCondition + + /** + * Read condition for the [[State]] that will be + * fulfilled when there are elements for one specific upstream + * input. + * + * It is not allowed to use a handle that has been cancelled or + * has been completed. `IllegalArgumentException` is thrown if + * that is not obeyed. + */ + class Read(val input: InputHandle) extends ReadCondition + + /** + * Read condition for the [[State]] that will be + * fulfilled when there are elements for any of the given upstream + * inputs. + * + * Cancelled and completed inputs are not used, i.e. it is allowed + * to specify them in the list of `inputs`. + */ + class ReadAny(val inputs: JList[InputHandle]) extends ReadCondition + + /** + * Context that is passed to the methods of [[State]] and [[CompletionHandling]]. + * The context provides means for performing side effects, such as emitting elements + * downstream. + */ + trait MergeLogicContext[Out] { + /** + * @return `true` if at least one element has been requested by downstream (output). + */ + def isDemandAvailable: Boolean + + /** + * Emit one element downstream. It is only allowed to `emit` when + * [[#isDemandAvailable]] is `true`, otherwise `IllegalArgumentException` + * is thrown. + */ + def emit(elem: Out): Unit + + /** + * Complete this stream succesfully. Upstream subscriptions will be cancelled. + */ + def complete(): Unit + + /** + * Complete this stream with failure. Upstream subscriptions will be cancelled. + */ + def error(cause: Throwable): Unit + + /** + * Cancel a specific upstream input stream. + */ + def cancel(input: InputHandle): Unit + + /** + * Replace current [[CompletionHandling]]. + */ + def changeCompletionHandling(completion: CompletionHandling[Out]): Unit + } + + /** + * How to handle completion or error from upstream input. + * + * The `onComplete` method is called when an upstream input was completed sucessfully. + * It returns next behavior or [[#SameState]] to keep current behavior. + * A completion can be propagated downstream with [[MergeLogicContext#complete]], + * or it can be swallowed to continue with remaining inputs. + * + * The `onError` method is called when an upstream input was completed sucessfully. + * It returns next behavior or [[#SameState]] to keep current behavior. + * An error can be propagated downstream with [[MergeLogicContext#error]], + * or it can be swallowed to continue with remaining inputs. + */ + abstract class CompletionHandling[Out] { + def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[_, Out] + def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[_, Out] + } + + /** + * Definition of which inputs to read from and how to act on the read elements. + * When an element has been read [[#onInput]] is called and then it is ensured + * that downstream has requested at least one element, i.e. it is allowed to + * emit at least one element downstream with [[MergeLogicContext#emit]]. + * + * The `onInput` method is called when an `element` was read from the `input`. + * The method returns next behavior or [[#SameState]] to keep current behavior. + */ + abstract class State[In, Out](val condition: ReadCondition) { + def onInput(ctx: MergeLogicContext[Out], input: InputHandle, element: In): State[_, Out] + } + + /** + * The possibly stateful logic that reads from input via the defined [[State]] and + * handles completion and error via the defined [[CompletionHandling]]. + * + * Concrete instance is supposed to be created by implementing [[FlexiMerge#createMergeLogic]]. + */ + abstract class MergeLogic[In, Out] { + def inputHandles(inputCount: Int): JList[InputHandle] + def initialState: State[In, Out] + def initialCompletionHandling: CompletionHandling[Out] = defaultCompletionHandling + /** + * Return this from [[State]] `onInput` to use same state for next element. + */ + def sameState[A]: State[A, Out] = FlexiMerge.sameStateInstance.asInstanceOf[State[A, Out]] + + /** + * Convenience to create a [[ReadAny]] condition. + */ + @varargs def readAny(inputs: InputHandle*): ReadAny = { + import scala.collection.JavaConverters._ + new ReadAny(inputs.asJava) + } + + /** + * Convenience to create a [[Read]] condition. + */ + def read(input: InputHandle): Read = new Read(input) + + /** + * Will continue to operate until a read becomes unsatisfiable, then it completes. + * Errors are immediately propagated. + */ + def defaultCompletionHandling[A]: CompletionHandling[Out] = + new CompletionHandling[Out] { + override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = + sameState + override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + ctx.error(cause) + sameState + } + } + + /** + * Completes as soon as any input completes. + * Errors are immediately propagated. + */ + def eagerClose[A]: CompletionHandling[Out] = + new CompletionHandling[Out] { + override def onComplete(ctx: MergeLogicContext[Out], input: InputHandle): State[A, Out] = { + ctx.complete() + sameState + } + override def onError(ctx: MergeLogicContext[Out], input: InputHandle, cause: Throwable): State[A, Out] = { + ctx.error(cause) + sameState + } + } + } + + private val sameStateInstance = new State[Any, Any](new ReadAny(java.util.Collections.emptyList[InputHandle])) { + override def onInput(ctx: MergeLogicContext[Any], input: InputHandle, element: Any): State[Any, Any] = + throw new UnsupportedOperationException("SameState.onInput should not be called") + + override def toString: String = "SameState" + } + + /** + * INTERNAL API + */ + private[akka] object Internal { + class MergeLogicWrapper[Out](delegate: MergeLogic[_, Out]) extends scaladsl.FlexiMerge.MergeLogic[Out] { + override def inputHandles(inputCount: Int): immutable.IndexedSeq[scaladsl.FlexiMerge.InputHandle] = + immutableIndexedSeq(delegate.inputHandles(inputCount)) + + override def initialState: this.State[_] = wrapState(delegate.initialState) + + override def initialCompletionHandling: this.CompletionHandling = + wrapCompletionHandling(delegate.initialCompletionHandling) + + private def wrapState[In](delegateState: FlexiMerge.State[In, Out]): State[In] = + if (sameStateInstance == delegateState) + SameState + else + State(convertReadCondition(delegateState.condition)) { (ctx, inputHandle, elem) ⇒ + val newDelegateState = + delegateState.onInput(new MergeLogicContextWrapper(ctx), asJava(inputHandle), elem) + wrapState(newDelegateState) + } + + private def wrapCompletionHandling( + delegateCompletionHandling: FlexiMerge.CompletionHandling[Out]): CompletionHandling = + CompletionHandling( + onComplete = (ctx, inputHandle) ⇒ { + val newDelegateState = delegateCompletionHandling.onComplete( + new MergeLogicContextWrapper(ctx), asJava(inputHandle)) + wrapState(newDelegateState) + }, + onError = (ctx, inputHandle, cause) ⇒ { + val newDelegateState = delegateCompletionHandling.onError( + new MergeLogicContextWrapper(ctx), asJava(inputHandle), cause) + wrapState(newDelegateState) + }) + + private def asJava(inputHandle: scaladsl.FlexiMerge.InputHandle): InputHandle = + inputHandle.asInstanceOf[InputHandle] + + class MergeLogicContextWrapper[In](delegate: MergeLogicContext) extends FlexiMerge.MergeLogicContext[Out] { + override def isDemandAvailable: Boolean = delegate.isDemandAvailable + override def emit(elem: Out): Unit = delegate.emit(elem) + override def complete(): Unit = delegate.complete() + override def error(cause: Throwable): Unit = delegate.error(cause) + override def cancel(input: InputHandle): Unit = delegate.cancel(input) + override def changeCompletionHandling(completion: FlexiMerge.CompletionHandling[Out]): Unit = + delegate.changeCompletionHandling(wrapCompletionHandling(completion)) + } + + } + + def convertReadCondition(condition: ReadCondition): scaladsl.FlexiMerge.ReadCondition = + condition match { + case r: ReadAny ⇒ scaladsl.FlexiMerge.ReadAny(immutableIndexedSeq(r.inputs)) + case r: Read ⇒ scaladsl.FlexiMerge.Read(r.input) + } + + } +} + +/** + * Base class for implementing custom merge junctions. + * Such a junction always has one [[#out]] port and one or more input ports. + * The input ports are to be defined in the concrete subclass and are created with + * [[#createInputPort]]. + * + * The concrete subclass must implement [[#createMergeLogic]] to define the [[FlexiMerge#MergeLogic]] + * that will be used when reading input elements and emitting output elements. + * The [[FlexiMerge#MergeLogic]] instance may be stateful, but the ``FlexiMerge`` instance + * must not hold mutable state, since it may be shared across several materialized ``FlowGraph`` + * instances. + * + * Note that a `FlexiMerge` with a specific name can only be used at one place (one vertex) + * in the `FlowGraph`. If the `name` is not specified the `FlexiMerge` instance can only + * be used at one place (one vertex) in the `FlowGraph`. + * + * @param name optional name of the junction in the [[FlowGraph]], + */ +abstract class FlexiMerge[In, Out](val name: Option[String]) { + import FlexiMerge._ + import scaladsl.FlowGraphInternal + import akka.stream.impl.Ast + + def this() = this(None) + def this(name: String) = this(Option(name)) + + private var inputCount = 0 + + def createMergeLogic(): MergeLogic[In, Out] + + // hide the internal vertex things from subclass, and make it possible to create new instance + private class FlexiMergeVertex(vertexName: Option[String]) extends FlowGraphInternal.InternalVertex { + override def minimumInputCount = 2 + override def maximumInputCount = inputCount + override def minimumOutputCount = 1 + override def maximumOutputCount = 1 + + override private[akka] val astNode = { + val factory = new MergeLogicFactory[Any] { + override def name: Option[String] = vertexName + override def createMergeLogic(): scaladsl.FlexiMerge.MergeLogic[Any] = + new Internal.MergeLogicWrapper(FlexiMerge.this.createMergeLogic().asInstanceOf[MergeLogic[Any, Any]]) + } + Ast.FlexiMergeNode(factory) + } + + override def name = vertexName + + final override def newInstance() = new FlexiMergeVertex(None) + } + + /** + * INTERNAL API + */ + private[akka] val vertex: FlowGraphInternal.InternalVertex = new FlexiMergeVertex(name) + + /** + * Output port of the `FlexiMerge` junction. A [[Sink]] can be connected to this output + * with the [[FlowGraphBuilder]]. + */ + val out: JunctionOutPort[Out] = new JunctionOutPort[Out] { + override val asScala: scaladsl.JunctionOutPort[Out] = new scaladsl.JunctionOutPort[Out] { + override def vertex: FlowGraphInternal.Vertex = FlexiMerge.this.vertex + } + } + + /** + * Concrete subclass is supposed to define one or more input ports and + * they are created by calling this method. Each [[FlexiMerge.InputPort]] can be + * connected to a [[Source]] with the [[FlowGraphBuilder]]. + * The `InputPort` is also an [[FlexiMerge.InputHandle]], which is passed as parameter + * to [[FlexiMerge#State]] `onInput` when an input element has been read so that you + * can know exactly from which input the element was read. + */ + protected final def createInputPort[T](): InputPort[T, Out] = { + val port = inputCount + inputCount += 1 + new InputPort(port, parent = this) + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala index ed895ccf2e..fb7261af60 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlexiMerge.scala @@ -5,13 +5,14 @@ package akka.stream.scaladsl import scala.collection.immutable import akka.stream.impl.Ast +import akka.stream.impl.FlexiMergeImpl.MergeLogicFactory object FlexiMerge { /** * @see [[InputPort]] */ - sealed trait InputHandle { + trait InputHandle { private[akka] def portIndex: Int } @@ -187,7 +188,7 @@ object FlexiMerge { * * @param name optional name of the junction in the [[FlowGraph]], */ -abstract class FlexiMerge[Out](val name: Option[String]) { +abstract class FlexiMerge[Out](val name: Option[String]) extends MergeLogicFactory[Out] { import FlexiMerge._ def this(name: String) = this(Some(name)) @@ -236,7 +237,7 @@ abstract class FlexiMerge[Out](val name: Option[String]) { * Create the stateful logic that will be used when reading input elements * and emitting output elements. Create a new instance every time. */ - def createMergeLogic(): MergeLogic[Out] + override def createMergeLogic(): MergeLogic[Out] override def toString = name match { case Some(n) ⇒ n