diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 1b0572a954..342aa97bc5 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -112,7 +112,7 @@ public class FlowTest { public void mustBeAbleToUseTransform() { final JavaTestKit probe = new JavaTestKit(system); final JavaTestKit probe2 = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); + final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7); // duplicate each element, stop after 4 elements, and emit sum to the end Source.from(input).transform("publish", new Creator>() { @Override @@ -165,7 +165,7 @@ public class FlowTest { @Test public void mustBeAbleToUseTransformRecover() { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); + final Iterable input = Arrays.asList(0, 1, 2, 3, 4, 5); Source.from(input).map(new Function() { public Integer apply(Integer elem) { if (elem == 4) { @@ -224,7 +224,7 @@ public class FlowTest { @Test public void mustBeAbleToUseGroupBy() { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); + final Iterable input = Arrays.asList("Aaa", "Abb", "Bcc", "Cdd", "Cee"); Source.from(input).groupBy(new Function() { public String apply(String elem) { return elem.substring(0, 1); @@ -258,7 +258,7 @@ public class FlowTest { @Test public void mustBeAbleToUseSplitWhen() { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F"); + final Iterable input = Arrays.asList("A", "B", "C", "\n", "D", "\n", "E", "F"); Source.from(input).splitWhen(new Predicate() { public boolean test(String elem) { return elem.equals("\n"); @@ -338,44 +338,100 @@ public class FlowTest { assertEquals(new HashSet(Arrays.asList("a", "b", "c", "d", "e", "f")), new HashSet(result)); } - // FIXME, implement the remaining junctions -// @Test -// public void mustBeAbleToUseZip() { -// final JavaTestKit probe = new JavaTestKit(system); -// final java.lang.Iterable input1 = Arrays.asList("A", "B", "C"); -// final java.lang.Iterable input2 = Arrays.asList(1, 2, 3); -// -// Source.from(input1).zip(Flow.of(input2).toPublisher(materializer)) -// .foreach(new Procedure>() { -// public void apply(Pair elem) { -// probe.getRef().tell(elem, ActorRef.noSender()); -// } -// }, materializer); -// -// List output = Arrays.asList(probe.receiveN(3)); -// @SuppressWarnings("unchecked") -// List> expected = Arrays.asList( -// new Pair("A", 1), -// new Pair("B", 2), -// new Pair("C", 3)); -// assertEquals(expected, output); -// } -// -// @Test -// public void mustBeAbleToUseConcat() { -// final JavaTestKit probe = new JavaTestKit(system); -// final java.lang.Iterable input1 = Arrays.asList("A", "B", "C"); -// final java.lang.Iterable input2 = Arrays.asList("D", "E", "F"); -// Flow.of(input1).concat(Flow.of(input2).toPublisher(materializer)).foreach(new Procedure() { -// public void apply(String elem) { -// probe.getRef().tell(elem, ActorRef.noSender()); -// } -// }, materializer); -// -// List output = Arrays.asList(probe.receiveN(6)); -// assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); -// } -// + @Test + public void mustBeAbleToUseZip() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList(1, 2, 3); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + final Zip zip = Zip.create(); + final KeyedSink, Future> out = Sink.foreach(new Procedure>() { + @Override + public void apply(Pair param) throws Exception { + probe.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph. + builder(). + addEdge(in1, zip.left()). + addEdge(in2, zip.right()). + addEdge(zip.out(), out). + run(materializer); + + List output = Arrays.asList(probe.receiveN(3)); + @SuppressWarnings("unchecked") + List> expected = Arrays.asList( + new Pair("A", 1), + new Pair("B", 2), + new Pair("C", 3)); + assertEquals(expected, output); + } + + @Test + public void mustBeAbleToUseUnzip() { + final JavaTestKit probe1 = new JavaTestKit(system); + final JavaTestKit probe2 = new JavaTestKit(system); + + @SuppressWarnings("unchecked") + final List> input = Arrays.asList( + new Pair("A", 1), + new Pair("B", 2), + new Pair("C", 3)); + + final Iterable expected1 = Arrays.asList("A", "B", "C"); + final Iterable expected2 = Arrays.asList(1, 2, 3); + + final Source> in = Source.from(input); + final Unzip unzip = Unzip.create(); + + final KeyedSink> out1 = Sink.foreach(new Procedure() { + @Override + public void apply(String param) throws Exception { + probe1.getRef().tell(param, ActorRef.noSender()); + } + }); + final KeyedSink> out2 = Sink.foreach(new Procedure() { + @Override + public void apply(Integer param) throws Exception { + probe2.getRef().tell(param, ActorRef.noSender()); + } + }); + + FlowGraph. + builder(). + addEdge(in, unzip.in()). + addEdge(unzip.left(), out1). + addEdge(unzip.right(), out2). + run(materializer); + + List output1 = Arrays.asList(probe1.receiveN(3)); + List output2 = Arrays.asList(probe2.receiveN(3)); + assertEquals(expected1, output1); + assertEquals(expected2, output2); + } + + @Test + public void mustBeAbleToUseConcat() { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList("A", "B", "C"); + final Iterable input2 = Arrays.asList("D", "E", "F"); + + final Source in1 = Source.from(input1); + final Source in2 = Source.from(input2); + + in1.concat(in2).foreach(new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, materializer); + + List output = Arrays.asList(probe.receiveN(6)); + assertEquals(Arrays.asList("A", "B", "C", "D", "E", "F"), output); + } + @Test public void mustBeAbleToUseCallableInput() { final JavaTestKit probe = new JavaTestKit(system); @@ -406,7 +462,7 @@ public class FlowTest { @Test public void mustBeAbleToUseOnCompleteSuccess() { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + final Iterable input = Arrays.asList("A", "B", "C"); Source.from(input) .runWith(Sink.onComplete(new Procedure() { @@ -422,7 +478,7 @@ public class FlowTest { @Test public void mustBeAbleToUseOnCompleteError() { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + final Iterable input = Arrays.asList("A", "B", "C"); Source.from(input).map(new Function() { public String apply(String arg0) throws Exception { @@ -445,7 +501,7 @@ public class FlowTest { @Test public void mustBeAbleToUseToFuture() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("A", "B", "C"); + final Iterable input = Arrays.asList("A", "B", "C"); Future future = Source.from(input).runWith(Sink.future(), materializer); String result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); assertEquals("A", result); @@ -454,7 +510,7 @@ public class FlowTest { @Test public void mustBeAbleToUsePrefixAndTail() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); + final Iterable input = Arrays.asList(1, 2, 3, 4, 5, 6); Future, Source>> future = Source .from(input) .prefixAndTail(3) @@ -470,8 +526,8 @@ public class FlowTest { @Test public void mustBeAbleToUseConcatAllWithSources() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input1 = Arrays.asList(1, 2, 3); - final java.lang.Iterable input2 = Arrays.asList(4, 5); + final Iterable input1 = Arrays.asList(1, 2, 3); + final Iterable input2 = Arrays.asList(4, 5); final List> mainInputs = Arrays.asList( Source.from(input1), @@ -573,7 +629,7 @@ public class FlowTest { @Test public void mustBeAbleToUseMapFuture() throws Exception { final JavaTestKit probe = new JavaTestKit(system); - final java.lang.Iterable input = Arrays.asList("a", "b", "c"); + final Iterable input = Arrays.asList("a", "b", "c"); Source.from(input).mapAsync(new Function>() { public Future apply(String elem) { return Futures.successful(elem.toUpperCase()); diff --git a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala index c6ebdde6d9..373d55161f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/ActorBasedFlowMaterializer.scala @@ -9,6 +9,7 @@ import akka.actor._ import akka.pattern.ask import akka.stream.actor.ActorSubscriber import akka.stream.impl.{ ActorProcessor, ActorPublisher, BufferImpl, ConflateImpl, ExpandImpl, ExposedPublisher, MapAsyncProcessorImpl, TimerTransformerProcessorsImpl, TransformProcessorImpl } +import akka.stream.impl2.Zip.ZipAs import akka.stream.scaladsl2._ import akka.stream.{ MaterializerSettings, OverflowStrategy, TimerTransformer, Transformer } import org.reactivestreams.{ Processor, Publisher, Subscriber } @@ -89,7 +90,7 @@ private[akka] object Ast { override def name = "balance" } - case object Zip extends FanInAstNode { + final case class Zip(as: ZipAs) extends FanInAstNode { override def name = "zip" } @@ -225,8 +226,8 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting actorOf(FairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName) case Ast.MergePreferred ⇒ actorOf(UnfairMerge.props(settings, inputCount).withDispatcher(settings.dispatcher), actorName) - case Ast.Zip ⇒ - actorOf(Zip.props(settings).withDispatcher(settings.dispatcher), actorName) + case zip: Ast.Zip ⇒ + actorOf(Zip.props(settings, zip.as).withDispatcher(settings.dispatcher), actorName) case Ast.Concat ⇒ actorOf(Concat.props(settings).withDispatcher(settings.dispatcher), actorName) case Ast.FlexiMergeNode(merger) ⇒ diff --git a/akka-stream/src/main/scala/akka/stream/impl2/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl2/FanIn.scala index a366bb1784..95ac455062 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/FanIn.scala @@ -7,9 +7,11 @@ import akka.actor.{ ActorRef, ActorLogging, Actor } import akka.stream.MaterializerSettings import akka.stream.actor.{ ActorSubscriberMessage, ActorSubscriber } import akka.stream.impl._ +import akka.stream.impl2.Zip.ZipAs import org.reactivestreams.{ Subscription, Subscriber } import akka.actor.Props import akka.stream.scaladsl2.FlexiMerge +import scala.language.higherKinds /** * INTERNAL API @@ -282,20 +284,41 @@ private[akka] class UnfairMerge(_settings: MaterializerSettings, _inputPorts: In * INTERNAL API */ private[akka] object Zip { - def props(settings: MaterializerSettings): Props = - Props(new Zip(settings)) + def props(settings: MaterializerSettings, zipAs: ZipAs): Props = + Props(new Zip(settings, zipAs)) + + /** + * INTERNAL API + * + * Allows to zip to different tuple implementations (e.g. Scala's `Tuple2` or Java's `Pair`), + * while sharing the same `Zip` implementation. + */ + private[akka] sealed trait ZipAs { + type Zipped[A, B] + def apply[A, B](first: A, second: B): Zipped[A, B] + } + /** INTERNAL API */ + private[akka] object AsJavaPair extends ZipAs { + override type Zipped[A, B] = akka.japi.Pair[A, B] + override def apply[A, B](first: A, second: B) = akka.japi.Pair(first, second) + } + /** INTERNAL API */ + private[akka] object AsScalaTuple2 extends ZipAs { + override type Zipped[A, B] = (A, B) + override def apply[A, B](first: A, second: B) = first → second + } } /** * INTERNAL API */ -private[akka] class Zip(_settings: MaterializerSettings) extends FanIn(_settings, inputPorts = 2) { +private[akka] class Zip(_settings: MaterializerSettings, zip: ZipAs) extends FanIn(_settings, inputPorts = 2) { inputBunch.markAllInputs() nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ val elem0 = inputBunch.dequeue(0) val elem1 = inputBunch.dequeue(1) - primaryOutputs.enqueueOutputElement((elem0, elem1)) + primaryOutputs.enqueueOutputElement(zip(elem0, elem1)) }) } diff --git a/akka-stream/src/main/scala/akka/stream/impl2/FanOut.scala b/akka-stream/src/main/scala/akka/stream/impl2/FanOut.scala index 1f0283d011..463c158a52 100644 --- a/akka-stream/src/main/scala/akka/stream/impl2/FanOut.scala +++ b/akka-stream/src/main/scala/akka/stream/impl2/FanOut.scala @@ -3,13 +3,20 @@ */ package akka.stream.impl2 -import java.util.concurrent.atomic.AtomicReference -import akka.actor.{ Actor, ActorLogging, ActorRef } -import akka.stream.MaterializerSettings -import akka.stream.impl.{ BatchingInputBuffer, Pump, SimpleOutputs, SubReceive, TransferState, _ } -import org.reactivestreams.{ Subscription, Subscriber, Publisher } -import scala.collection.immutable import akka.actor.Props +import akka.actor.Actor +import akka.actor.ActorLogging +import akka.actor.ActorRef +import akka.stream.MaterializerSettings +import akka.stream.impl.BatchingInputBuffer +import akka.stream.impl.Pump +import akka.stream.impl.SimpleOutputs +import akka.stream.impl.SubReceive +import akka.stream.impl.TransferState +import akka.stream.impl._ +import org.reactivestreams.Subscription + +import scala.collection.immutable /** * INTERNAL API @@ -263,9 +270,20 @@ private[akka] class Unzip(_settings: MaterializerSettings) extends FanOut(_setti (0 until outputPorts) foreach outputBunch.markOutput nextPhase(TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs) { () ⇒ - val (elem0, elem1) = primaryInputs.dequeueInputElement().asInstanceOf[(Any, Any)] - outputBunch.enqueue(0, elem0) - outputBunch.enqueue(1, elem1) + primaryInputs.dequeueInputElement() match { + case (a, b) ⇒ + outputBunch.enqueue(0, a) + outputBunch.enqueue(1, b) + + case t: akka.japi.Pair[_, _] ⇒ + outputBunch.enqueue(0, t.first) + outputBunch.enqueue(1, t.second) + + case t ⇒ + throw new IllegalArgumentException( + s"Unable to unzip elements of type {t.getClass.getName}, " + + s"can only handle Tuple2 and akka.japi.Pair!") + } }) } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index 5cd4013d89..9554a33731 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -3,12 +3,13 @@ */ package akka.stream.javadsl -import akka.stream.javadsl -import akka.stream.scaladsl2 - import akka.stream._ import akka.stream.scaladsl2 import akka.stream.scaladsl2 +import akka.stream.scaladsl2 +import impl2.Ast + +import akka.stream._ // elements // @@ -164,8 +165,7 @@ object Broadcast { * two downstream subscribers have been established. */ class Broadcast[T](delegate: scaladsl2.Broadcast[T]) extends javadsl.Junction[T] { - /** Convert this element to it's `scaladsl2` equivalent. */ - def asScala: scaladsl2.Broadcast[T] = delegate + override def asScala: scaladsl2.Broadcast[T] = delegate } object Balance { @@ -211,7 +211,169 @@ class Balance[T](delegate: scaladsl2.Balance[T]) extends javadsl.Junction[T] { override def asScala: scaladsl2.Balance[T] = delegate } -// TODO implement: Concat, Zip, Unzip and friends +object Zip { + + /** + * Create a new anonymous `Zip` vertex with the specified input types. + * Note that a `Zip` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`.* + */ + def create[A, B](): Zip[A, B] = create(name = null) + + /** + * Create a new anonymous `Zip` vertex with the specified input types. + * Note that a `Zip` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def create[A, B](left: Class[A], right: Class[B]): Zip[A, B] = create[A, B](name = null) + + /** + * Create a named `Zip` vertex with the specified input types. + * Note that a `Zip` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`.* + */ + def create[A, B](name: String): Zip[A, B] = + new Zip(new scaladsl2.Zip[A, B](Option(name)) { + override private[akka] def astNode: Ast.FanInAstNode = Ast.Zip(impl2.Zip.AsJavaPair) + }) + + /** + * Create a named `Zip` vertex with the specified input types. + * Note that a `Zip` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`.* + */ + def create[A, B](name: String, left: Class[A], right: Class[A]): Zip[A, B] = + create[A, B](name) + + class Left[A, B](private val zip: Zip[A, B]) extends JunctionInPort[A] { + override def asScala: scaladsl2.JunctionInPort[A] = zip.asScala.left + } + class Right[A, B](private val zip: Zip[A, B]) extends JunctionInPort[B] { + override def asScala: scaladsl2.JunctionInPort[B] = zip.asScala.right + } + class Out[A, B](private val zip: Zip[A, B]) extends JunctionOutPort[akka.japi.Pair[A, B]] { + // this cast is safe thanks to using `ZipAs` in the Ast element, Zip will emit the expected type (Pair) + override def asScala: scaladsl2.JunctionOutPort[akka.japi.Pair[A, B]] = + zip.asScala.out.asInstanceOf[scaladsl2.JunctionOutPort[akka.japi.Pair[A, B]]] + } +} + +/** + * Takes two streams and outputs an output stream formed from the two input streams + * by combining corresponding elements in pairs. If one of the two streams is + * longer than the other, its remaining elements are ignored. + */ +final class Zip[A, B] private (delegate: scaladsl2.Zip[A, B]) { + + /** Convert this element to it's `scaladsl2` equivalent. */ + def asScala = delegate + + val left = new Zip.Left(this) + val right = new Zip.Right(this) + val out = new Zip.Out(this) +} + +object Unzip { + def create[A, B](): Unzip[A, B] = + create(null) + + def create[A, B](name: String): Unzip[A, B] = + new Unzip[A, B](new scaladsl2.Unzip[A, B](Option(name))) + + def create[A, B](left: Class[A], right: Class[B]): Unzip[A, B] = + create[A, B]() + + def create[A, B](name: String, left: Class[A], right: Class[B]): Unzip[A, B] = + create[A, B](name) + + class In[A, B](private val unzip: Unzip[A, B]) extends JunctionInPort[akka.japi.Pair[A, B]] { + // this cast is safe thanks to using `ZipAs` in the Ast element, Zip will emit the expected type (Pair) + override def asScala: scaladsl2.JunctionInPort[akka.japi.Pair[A, B]] = + unzip.asScala.in.asInstanceOf[scaladsl2.JunctionInPort[akka.japi.Pair[A, B]]] + } + class Left[A, B](private val unzip: Unzip[A, B]) extends JunctionOutPort[A] { + override def asScala: scaladsl2.JunctionOutPort[A] = + unzip.asScala.left + } + class Right[A, B](private val unzip: Unzip[A, B]) extends JunctionOutPort[B] { + override def asScala: scaladsl2.JunctionOutPort[B] = + unzip.asScala.right + } +} + +final class Unzip[A, B] private (delegate: scaladsl2.Unzip[A, B]) { + + /** Convert this element to it's `scaladsl2` equivalent. */ + def asScala = delegate + + val in = new Unzip.In(this) + val left = new Unzip.Left(this) + val right = new Unzip.Right(this) +} + +object Concat { + /** + * Create a new anonymous `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def create[T](): Concat[T] = new Concat(scaladsl2.Concat[T]) + + /** + * Create a new anonymous `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def create[T](clazz: Class[T]): Concat[T] = create() + + /** + * Create a named `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`.* + */ + def create[T](name: String): Concat[T] = new Concat(scaladsl2.Concat[T](name)) + + /** + * Create a named `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`.* + */ + def create[T](name: String, clazz: Class[T]): Concat[T] = create(name) + + class First[T] private[akka] (delegate: scaladsl2.Concat.First[T]) extends JunctionInPort[T] { + override def asScala: scaladsl2.JunctionInPort[T] = delegate + } + class Second[T] private[akka] (delegate: scaladsl2.Concat.Second[T]) extends JunctionInPort[T] { + override def asScala: scaladsl2.JunctionInPort[T] = delegate + } + class Out[T] private[akka] (delegate: scaladsl2.Concat.Out[T]) extends JunctionOutPort[T] { + override def asScala: scaladsl2.JunctionOutPort[T] = delegate + } + +} + +/** + * Takes two streams and outputs an output stream formed from the two input streams + * by consuming one stream first emitting all of its elements, then consuming the + * second stream emitting all of its elements. + */ +class Concat[T] private (delegate: scaladsl2.Concat[T]) { + + /** Convert this element to it's `scaladsl2` equivalent. */ + def asScala = delegate + + val first = new Concat.First[T](delegate.first) + val second = new Concat.Second[T](delegate.second) + val out = new Concat.Out[T](delegate.out) +} // undefined elements // @@ -350,6 +512,11 @@ class FlowGraphBuilder(b: scaladsl2.FlowGraphBuilder) { this } + def addEdge[In](source: javadsl.Source[In], junctionIn: javadsl.JunctionInPort[In]): FlowGraphBuilder = { + b.addEdge(source.asScala, junctionIn.asScala) + this + } + def addEdge[In, Out](junctionOut: javadsl.JunctionOutPort[In], sink: Sink[In]): FlowGraphBuilder = { b.addEdge(junctionOut.asScala, sink.asScala) this @@ -474,6 +641,7 @@ class PartialFlowGraph(delegate: scaladsl2.PartialFlowGraph) { class FlowGraph(delegate: scaladsl2.FlowGraph) extends RunnableFlow { + /** Convert this element to it's `scaladsl2` equivalent. */ def asScala: scaladsl2.FlowGraph = delegate override def run(materializer: scaladsl2.FlowMaterializer): javadsl.MaterializedMap = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala index 008c47c727..f5ddef7782 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl2/FlowGraph.scala @@ -3,6 +3,10 @@ */ package akka.stream.scaladsl2 +import akka.stream.impl2 +import akka.stream.impl2.Ast.FanInAstNode +import impl2.Ast + import scala.language.existentials import scalax.collection.edge.{ LkBase, LkDiEdge } import scalax.collection.mutable.Graph @@ -240,7 +244,7 @@ object Zip { * by combining corresponding elements in pairs. If one of the two streams is * longer than the other, its remaining elements are ignored. */ -final class Zip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { +private[akka] class Zip[A, B](override val name: Option[String]) extends FlowGraphInternal.InternalVertex { val left = new Zip.Left(this) val right = new Zip.Right(this) val out = new Zip.Out(this) @@ -250,7 +254,7 @@ final class Zip[A, B](override val name: Option[String]) extends FlowGraphIntern override def minimumOutputCount: Int = 1 override def maximumOutputCount: Int = 1 - override private[akka] def astNode = Ast.Zip + override private[akka] def astNode: FanInAstNode = Ast.Zip(impl2.Zip.AsScalaTuple2) final override private[scaladsl2] def newInstance() = new Zip[A, B](name = None) }