diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java index d7bafe8edb..264c1c2314 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowGraphTest.java @@ -4,8 +4,12 @@ package akka.stream.javadsl; import akka.actor.ActorRef; -import akka.japi.Pair; +import akka.japi.*; import akka.stream.StreamTest; +import akka.stream.javadsl.japi.Creator; +import akka.stream.javadsl.japi.Function; +import akka.stream.javadsl.japi.Function2; +import akka.stream.javadsl.japi.Procedure; import akka.stream.stage.*; import akka.stream.javadsl.japi.*; import akka.stream.testkit.AkkaSpec; @@ -16,6 +20,7 @@ import org.junit.Test; import org.reactivestreams.Publisher; import scala.concurrent.Await; import scala.concurrent.Future; +import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; @@ -99,7 +104,7 @@ public class FlowGraphTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - final ZipWith> zip = Zip.create(); + final Zip2With> zip = Zip.create(); final KeyedSink, Future> out = Sink .foreach(new Procedure>() { @Override @@ -124,7 +129,7 @@ public class FlowGraphTest extends StreamTest { @SuppressWarnings("unchecked") final List> input = Arrays.asList(new Pair("A", 1), - new Pair("B", 2), new Pair("C", 3)); + new Pair("B", 2), new Pair("C", 3)); final Iterable expected1 = Arrays.asList("A", "B", "C"); final Iterable expected2 = Arrays.asList(1, 2, 3); @@ -154,4 +159,59 @@ public class FlowGraphTest extends StreamTest { assertEquals(expected2, output2); } + @Test + public void mustBeAbleToUseZipWith() throws Exception { + final Source in1 = Source.single(1); + final Source in2 = Source.single(10); + + final Zip2With sumZip = ZipWith.create( + + new Function2() { + @Override public Integer apply(Integer l, Integer r) throws Exception { + return l + r; + } + }); + + final KeyedSink> out = Sink.head(); + + MaterializedMap mat = FlowGraph.builder() + .addEdge(in1, sumZip.left()) + .addEdge(in2, sumZip.right()) + .addEdge(sumZip.out(), out) + .run(materializer); + + final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS)); + assertEquals(11, (int) result); + } + + @Test + public void mustBeAbleToUseZip4With() throws Exception { + final Source in1 = Source.single(1); + final Source in2 = Source.single(10); + final Source in3 = Source.single(100); + final Source in4 = Source.single(1000); + + Function, Integer> sum4 = new Function, Integer>() { + @Override + public Integer apply(ZipWith.Zip4WithInputs inputs) throws Exception { + return inputs.t1() + inputs.t2() + inputs.t3() + inputs.t4(); + } + }; + + Zip4With sum4Zip = ZipWith.create(sum4); + + final KeyedSink> out = Sink.head(); + + MaterializedMap mat = FlowGraph.builder() + .addEdge(in1, sum4Zip.input1()) + .addEdge(in2, sum4Zip.input2()) + .addEdge(in3, sum4Zip.input3()) + .addEdge(in4, sum4Zip.input4()) + .addEdge(sum4Zip.out(), out) + .run(materializer); + + final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS)); + assertEquals(1111, (int) result); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 1b1e61d5a3..3659f82762 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -278,7 +278,7 @@ public class FlowTest extends StreamTest { final Source in1 = Source.from(input1); final Source in2 = Source.from(input2); - final ZipWith> zip = Zip.create(); + final Zip2With> zip = Zip.create(); final KeyedSink, Future> out = Sink .foreach(new Procedure>() { @Override diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala index 370de31234..c9b1e2bb4f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/GraphZipWithSpec.scala @@ -99,6 +99,81 @@ class GraphZipWithSpec extends TwoStreamsSetup { val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException) } + "zipWith a ETA expanded Person.apply (3 inputs)" in { + val probe = StreamTestKit.SubscriberProbe[Person]() + + case class Person(name: String, surname: String, int: Int) + + FlowGraph { implicit b ⇒ + val zip = ZipWith(Person.apply _) + + Source.single("Caplin") ~> zip.input1 + Source.single("Capybara") ~> zip.input2 + Source.single(3) ~> zip.input3 + + zip.out ~> Sink(probe) + }.run() + + val subscription = probe.expectSubscription() + + subscription.request(5) + probe.expectNext(Person("Caplin", "Capybara", 3)) + + probe.expectComplete() + } + + "work with up to 22 inputs" in { + val probe = StreamTestKit.SubscriberProbe[String]() + + FlowGraph { implicit b ⇒ + + val sum22 = (v1: Int, v2: String, v3: Int, v4: String, v5: Int, v6: String, v7: Int, v8: String, v9: Int, v10: String, + v11: Int, v12: String, v13: Int, v14: String, v15: Int, v16: String, v17: Int, v18: String, v19: Int, + v20: String, v21: Int, v22: String) ⇒ + v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 + + v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v20 + + v21 + v22 + + // odd input ports will be Int, even input ports will be String + val zip = ZipWith(sum22) + + val one = Source.single(1) + + one ~> zip.input1 + one.map(_.toString) ~> zip.input2 + one ~> zip.input3 + one.map(_.toString) ~> zip.input4 + one ~> zip.input5 + one.map(_.toString) ~> zip.input6 + one ~> zip.input7 + one.map(_.toString) ~> zip.input8 + one ~> zip.input9 + one.map(_.toString) ~> zip.input10 + one ~> zip.input11 + one.map(_.toString) ~> zip.input12 + one ~> zip.input13 + one.map(_.toString) ~> zip.input14 + one ~> zip.input15 + one.map(_.toString) ~> zip.input16 + one ~> zip.input17 + one.map(_.toString) ~> zip.input18 + one ~> zip.input19 + one.map(_.toString) ~> zip.input20 + one ~> zip.input21 + one.map(_.toString) ~> zip.input22 + + zip.out ~> Sink(probe) + }.run() + + val subscription = probe.expectSubscription() + + subscription.request(1) + probe.expectNext("1" * 22) + + probe.expectComplete() + + } + } } diff --git a/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template new file mode 100644 index 0000000000..584a656a74 --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/impl/ZipWith.scala.template @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.impl + +import akka.actor.Props +import akka.stream.MaterializerSettings + +/** + * INTERNAL API + */ +private[akka] object ZipWith { + + /** @param f MUST be a FunctionN type. */ + def props(settings: MaterializerSettings, f: Any): Props = f match { + [2..#case f1: Function1[[#Any#], Any] => Props(new Zip1With(settings, f1))# + ] + } + + [2..#def props(settings: MaterializerSettings, f: Function1[[#Any#], Any]): Props = + Props(new Zip1With(settings, f))# + ] +} + +[2..#/** INTERNAL API */ +private[akka] final class Zip1With(_settings: MaterializerSettings, f: Function1[[#Any#], Any]) extends FanIn(_settings, inputPorts = 1) { + inputBunch.markAllInputs() + + nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ + val elem##0 = inputBunch.dequeue(##0) + [2..#val elem0 = inputBunch.dequeue(0)# + ] + + primaryOutputs.enqueueOutputElement(f([#elem0#])) + }) +}# +] \ No newline at end of file diff --git a/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template new file mode 100644 index 0000000000..b38b662983 --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/javadsl/ZipWith.scala.template @@ -0,0 +1,73 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.javadsl + +import akka.stream.scaladsl +import akka.stream.javadsl.japi + +object ZipWith { + + /** + * Create a new `ZipWith` vertex with the specified input types and zipping-function `f`. + * + * @param f zipping-function from the input values to the output value + * @param attributes optional attributes for this vertex + */ + def create[A, B, Out](f: japi.Function2[A, B, Out], attributes: OperationAttributes): Zip2With[A, B, Out] = + new Zip2With(new scaladsl.Zip2With[A, B, Out](f.apply _, attributes.asScala)) + + /** + * Create a new `ZipWith` vertex with the specified input types and zipping-function `f`. + * + * @param f zipping-function from the input values to the output value + * Creates a new named `ZipWith` vertex with the specified input types and zipping-function `f`. + * Note that a `ZipWith` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def create[A, B, Out](f: japi.Function2[A, B, Out]): Zip2With[A, B, Out] = + create(f, OperationAttributes.none) + + + [3..#/** Create a new `ZipWith` specialized for 1 input streams. */ + def create[[#T1#], Out](f: japi.Function[Zip1WithInputs[[#T1#]], Out]) = + new Zip1With(new scaladsl.Zip1With[[#T1#], Out](([#t1#]) ⇒ f.apply(new Zip1WithInputs[[#T1#]]([#t1#])), scaladsl.OperationAttributes.none))# + ] + + // CLASS BOILERPLATE + + sealed trait ZipWithInputs + [2..#final class Zip1WithInputs[[#T1#]]([#val t1: T1#]) extends ZipWithInputs# + ] + + [#final class Input1[T1, Out] private[akka] (val asScala: scaladsl.ZipWith.Input1[T1, Out]) extends JunctionInPort[T1]# + ] + + final class Left[A, B, Out](override val asScala: scaladsl.ZipWith.Left[A, B, Out]) extends JunctionInPort[A] + final class Right[A, B, Out](override val asScala: scaladsl.ZipWith.Right[A, B, Out]) extends JunctionInPort[B] + final class Out[Out](override val asScala: scaladsl.ZipWith.Out[Out]) extends JunctionOutPort[Out] +} + +/** + * Takes two streams and outputs an output stream formed from the two input streams + * by combining corresponding elements in pairs. If one of the two streams is + * longer than the other, its remaining elements are ignored. + */ +final class Zip2With[A, B, Out] private[akka] (val asScala: scaladsl.Zip2With[A, B, Out]) { + val left = new ZipWith.Left[A, B, Out](asScala.left) + val right = new ZipWith.Right[A, B, Out](asScala.right) + val out = new ZipWith.Out[Out](asScala.out) +} + +[3..#/** + * Takes multiple streams and outputs an output stream formed from the two input streams + * by combining corresponding elements in pairs. If one of the two streams is + * longer than the other, its remaining elements are ignored. + */ +final class Zip1With[[#T1#], Out] private[akka] (val asScala: scaladsl.Zip1With[[#T1#], Out]) { + val out = new ZipWith.Out[Out](asScala.out) + [#val input1 = new ZipWith.Input1[T1, Out](asScala.input1)# + ] +}# +] \ No newline at end of file diff --git a/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWith.scala.template b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWith.scala.template new file mode 100644 index 0000000000..b1b9fe67f8 --- /dev/null +++ b/akka-stream/src/main/boilerplate/akka/stream/scaladsl/ZipWith.scala.template @@ -0,0 +1,79 @@ +/** + * Copyright (C) 2014 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.impl.Ast.FanInAstNode +import akka.stream.impl.Ast +import akka.stream.impl.Ast.Defaults._ + +object ZipWith { + + [2..#/** + * Create a new anonymous `ZipWith` vertex with 1 specified input types. + * Note that a `ZipWith` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. + */ + def apply[[#T1#], C](f: Function1[[#T1#], C]): Zip1With[[#T1#], C] = + new Zip1With[[#T1#], C](f, OperationAttributes.none)# + ] + + final class Left[A, B, C] private[akka] (private[akka] val vertex: Zip2With[A, B, C]) extends JunctionInPort[A] { + type NextT = C + override private[akka] def port = 0 + override private[akka] def next = vertex.out + } + + final class Right[A, B, C] private[akka] (private[akka] val vertex: Zip2With[A, B, C]) extends JunctionInPort[B] { + type NextT = C + override private[akka] def port = 1 + override private[akka] def next = vertex.out + } + + [#final class Input1[T1, C] private[akka] (private[akka] val vertex: ZipWithBase[C]) extends JunctionInPort[T1] { + type NextT = C + override private[akka] def port = 1 + override private[akka] def next = vertex.out + }# + ] + + final class Out[C] private[akka] (private[akka] val vertex: ZipWithBase[C]) extends JunctionOutPort[C] +} + +/** + * Takes two streams and outputs an output stream formed from the two input streams + * by combining corresponding elements using the supplied function. + * If one of the two streams is longer than the other, its remaining elements are ignored. + */ +private[akka] final class Zip2With[A, B, C](override val f: (A, B) ⇒ C, override val attributes: OperationAttributes) extends ZipWithBase[C] { + val left = new ZipWith.Left(this) + val right = new ZipWith.Right(this) + + override def minimumInputCount: Int = 2 + override def maximumInputCount: Int = 2 + + // FIXME cache + private[akka] override def astNode: FanInAstNode = Ast.Zip2With(f.asInstanceOf[(Any, Any) ⇒ Any], zip and attributes) + + private[scaladsl] final override def newInstance() = new Zip2With[A, B, C](f = f, attributes.withoutName) +} + +[3..#/** + * Takes 1 streams and outputs an output stream formed from the two input streams + * by combining corresponding elements using the supplied function. + * If one of the two streams is longer than the other, its remaining elements are ignored. + */ +private[akka] final class Zip1With[[#T1#], C](override val f: Function1[[#T1#], C], override val attributes: OperationAttributes) extends ZipWithBase[C] { + [#val input1 = new ZipWith.Input1[T1, C](this)# + ] + + override def minimumInputCount: Int = 1 + override def maximumInputCount: Int = 1 + + // FIXME cache + private[akka] override def astNode: FanInAstNode = Ast.Zip1With(f.asInstanceOf[Function1[[#T1#], Any]], zip and attributes) + + private[scaladsl] final override def newInstance() = new Zip1With[[#T1#], C](f, attributes.withoutName) +}# +] diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala index 020a716803..b1dd6acae2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorBasedFlowMaterializer.scala @@ -175,7 +175,35 @@ private[akka] object Ast { sealed trait FanInAstNode extends JunctionAstNode sealed trait FanOutAstNode extends JunctionAstNode - final case class ZipWith(f: (Any, Any) ⇒ Any, attributes: OperationAttributes) extends FanInAstNode + /** + * INTERNAL API + * `f` MUST be implemented as value of type `scala.FunctionN` + */ + sealed trait ZipWith extends FanInAstNode { + /** MUST be implemented as type of FunctionN */ + def f: Any + } + final case class Zip2With[T1, T2](f: Function2[T1, T2, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip3With[T1, T2, T3](f: Function3[T1, T2, T3, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip4With[T1, T2, T3, T4](f: Function4[T1, T2, T3, T4, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip5With[T1, T2, T3, T4, T5](f: Function5[T1, T2, T3, T4, T5, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip6With[T1, T2, T3, T4, T5, T6](f: Function6[T1, T2, T3, T4, T5, T6, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip7With[T1, T2, T3, T4, T5, T6, T7](f: Function7[T1, T2, T3, T4, T5, T6, T7, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip8With[T1, T2, T3, T4, T5, T6, T7, T8](f: Function8[T1, T2, T3, T4, T5, T6, T7, T8, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip9With[T1, T2, T3, T4, T5, T6, T7, T8, T9](f: Function9[T1, T2, T3, T4, T5, T6, T7, T8, T9, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip10With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10](f: Function10[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip11With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11](f: Function11[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip12With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12](f: Function12[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip13With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13](f: Function13[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip14With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14](f: Function14[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip15With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15](f: Function15[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip16With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16](f: Function16[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip17With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17](f: Function17[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip18With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18](f: Function18[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip19With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19](f: Function19[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip20With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20](f: Function20[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip21With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21](f: Function21[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, Any], attributes: OperationAttributes) extends ZipWith + final case class Zip22With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22](f: Function22[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, Any], attributes: OperationAttributes) extends ZipWith // FIXME Why do we need this? case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode @@ -457,7 +485,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting val props = fanin match { case Ast.Merge(_) ⇒ FairMerge.props(transformedSettings, inputCount) case Ast.MergePreferred(_) ⇒ UnfairMerge.props(transformedSettings, inputCount) - case Ast.ZipWith(f, _) ⇒ ZipWith.props(transformedSettings, f) + case z: Ast.ZipWith ⇒ ZipWith.props(transformedSettings, z.f) case Ast.Concat(_) ⇒ Concat.props(transformedSettings) case Ast.FlexiMergeNode(merger, _) ⇒ FlexiMergeImpl.props(transformedSettings, inputCount, merger.createMergeLogic()) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala index cfc1bb3e2f..33dad6bfc7 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/FanIn.scala @@ -278,26 +278,6 @@ private[akka] final class UnfairMerge(_settings: MaterializerSettings, }) } -/** - * INTERNAL API - */ -private[akka] object ZipWith { - def props(settings: MaterializerSettings, f: (Any, Any) ⇒ Any): Props = Props(new ZipWith(settings, f)) -} - -/** - * INTERNAL API - */ -private[akka] final class ZipWith(_settings: MaterializerSettings, f: (Any, Any) ⇒ Any) extends FanIn(_settings, inputPorts = 2) { - inputBunch.markAllInputs() - - nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒ - val elem0 = inputBunch.dequeue(0) - val elem1 = inputBunch.dequeue(1) - primaryOutputs.enqueueOutputElement(f(elem0, elem1)) - }) -} - /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala index e4685899dd..c3b2c0a260 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowGraph.scala @@ -5,9 +5,6 @@ package akka.stream.javadsl import akka.stream._ import akka.stream.scaladsl -import akka.stream.impl.Ast - -import akka.stream._ trait JunctionInPort[-T] { /** Convert this element to it's `scaladsl` equivalent. */ @@ -219,73 +216,41 @@ class Balance[T](delegate: scaladsl.Balance[T]) extends javadsl.Junction[T] { } object Zip { - import akka.japi.{ Pair, Function2 } + import akka.stream.javadsl.japi.Function2 + import akka.japi.Pair /** - * Create a new `ZipWith` vertex with the specified input types and zipping-function + * Create a new anonymous `Zip2With` vertex with the specified input types and zipping-function * which creates `akka.japi.Pair`s. - * + * Note that a `ZipWith` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. * @param attributes optional attributes for this vertex */ - def create[A, B](attributes: OperationAttributes): ZipWith[A, B, A Pair B] = + def create[A, B](attributes: OperationAttributes): Zip2With[A, B, A Pair B] = ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]], attributes) /** * Create a new `ZipWith` vertex with the specified input types and zipping-function * which creates `akka.japi.Pair`s. */ - def create[A, B]: ZipWith[A, B, A Pair B] = create(OperationAttributes.none) + def create[A, B]: Zip2With[A, B, A Pair B] = create(OperationAttributes.none) private[this] final val _toPair: Function2[Any, Any, Any Pair Any] = new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) } } -object ZipWith { - - /** - * Create a new `ZipWith` vertex with the specified input types and zipping-function `f`. - * - * @param f zipping-function from the input values to the output value - * @param attributes optional attributes for this vertex - */ - def create[A, B, C](f: akka.japi.Function2[A, B, C], attributes: OperationAttributes): ZipWith[A, B, C] = - new ZipWith(new scaladsl.ZipWith[A, B, C](f.apply _, attributes.asScala)) - - /** - * Create a new `ZipWith` vertex with the specified input types and zipping-function `f`. - * - * @param f zipping-function from the input values to the output value - */ - def create[A, B, C](f: akka.japi.Function2[A, B, C]): ZipWith[A, B, C] = - create(f = f, OperationAttributes.none) - - final class Left[A, B, C](override val asScala: scaladsl.ZipWith.Left[A, B, C]) extends JunctionInPort[A] - final class Right[A, B, C](override val asScala: scaladsl.ZipWith.Right[A, B, C]) extends JunctionInPort[B] - final class Out[A, B, C](override val asScala: scaladsl.ZipWith.Out[A, B, C]) extends JunctionOutPort[C] -} - -/** - * Takes two streams and outputs an output stream formed from the two input streams - * by combining corresponding elements in pairs. If one of the two streams is - * longer than the other, its remaining elements are ignored. - */ -final class ZipWith[A, B, C] private (val asScala: scaladsl.ZipWith[A, B, C]) { - val left = new ZipWith.Left[A, B, C](asScala.left) - val right = new ZipWith.Right[A, B, C](asScala.right) - val out = new ZipWith.Out[A, B, C](asScala.out) -} - object Unzip { /** * Creates a new `Unzip` vertex with the specified output types and attributes. * - * @param attributes optional attributes for this vertex + * @param attributes attributes for this vertex */ def create[A, B](attributes: OperationAttributes): Unzip[A, B] = new Unzip[A, B](new scaladsl.Unzip[A, B](attributes.asScala)) /** - * Creates a new `Unzip` vertex with the specified output types. + * Creates a new `Unzip` vertex with the specified output types and attributes. */ def create[A, B](): Unzip[A, B] = create(OperationAttributes.none) @@ -335,21 +300,30 @@ final class Unzip[A, B] private (delegate: scaladsl.Unzip[A, B]) { object Concat { /** - * Create a new `Concat` vertex with the specified input types and attributes. - * - * @param attributes optional attributes for this vertex + * Create a new anonymous `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. */ - def create[T](attributes: OperationAttributes): Concat[T] = new Concat(scaladsl.Concat[T]) + def create[T](): Concat[T] = + create(OperationAttributes.none) /** - * Create a new `Concat` vertex with the specified input types. + * Create a new anonymous `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. */ - def create[T](): Concat[T] = create(OperationAttributes.none) + def create[T](attributes: OperationAttributes): Concat[T] = + new Concat(scaladsl.Concat[T](attributes.asScala)) /** - * Create a new `Concat` vertex with the specified input types. + * Create a new anonymous `Concat` vertex with the specified input types. + * Note that a `Concat` instance can only be used at one place (one vertex) + * in the `FlowGraph`. This method creates a new instance every time it + * is called and those instances are not `equal`. */ - def create[T](clazz: Class[T]): Concat[T] = create() + def create[T](clazz: Class[T], attributes: OperationAttributes): Concat[T] = create(attributes) class First[T] private[akka] (delegate: scaladsl.Concat.First[T]) extends JunctionInPort[T] { override def asScala: scaladsl.JunctionInPort[T] = delegate @@ -449,7 +423,6 @@ object FlowGraph { * Builder of [[FlowGraph]] and [[PartialFlowGraph]]. */ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { - import akka.stream.scaladsl.JavaConverters._ def this() { this(new scaladsl.FlowGraphBuilder()) @@ -571,9 +544,10 @@ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) { object PartialFlowGraphBuilder extends FlowGraphBuilder class PartialFlowGraph(delegate: scaladsl.PartialFlowGraph) { - import collection.JavaConverters._ import akka.stream.scaladsl.JavaConverters._ + import collection.JavaConverters._ + def asScala: scaladsl.PartialFlowGraph = delegate def undefinedSources(): java.util.Set[UndefinedSource[Any]] = diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala index 1b1807c030..522ddd2bb4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/japi/WithVariance.scala @@ -9,7 +9,7 @@ package akka.stream.javadsl.japi /** * A Function interface. Used to create first-class-functions is Java. */ -@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince") +@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! trait Function[-T, +R] { @throws(classOf[Exception]) def apply(param: T): R @@ -18,7 +18,7 @@ trait Function[-T, +R] { /** * A Function interface. Used to create 2-arg first-class-functions is Java. */ -@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince") +@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! trait Function2[-T1, -T2, +R] { @throws(classOf[Exception]) def apply(arg1: T1, arg2: T2): R @@ -27,8 +27,7 @@ trait Function2[-T1, -T2, +R] { /** * A constructor/factory, takes no parameters but creates a new value of type T every call. */ -@SerialVersionUID(1L) -@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince") +@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! trait Creator[+T] extends Serializable { /** * This method must return a different instance upon every call. @@ -40,7 +39,7 @@ trait Creator[+T] extends Serializable { /** * A Procedure is like a Function, but it doesn't produce a return value. */ -@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince") +@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! trait Procedure[-T] { @throws(classOf[Exception]) def apply(param: T): Unit @@ -49,7 +48,7 @@ trait Procedure[-T] { /** * Java API: Defines a criteria and determines whether the parameter meets this criteria. */ -@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince") +@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi! trait Predicate[-T] { def test(param: T): Boolean } diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala index 41e06f1077..5b2654e035 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowGraph.scala @@ -243,75 +243,39 @@ final class Balance[T](val waitForAllDownstreams: Boolean, override val attribut } object Zip { + /** * Create a new `ZipWith` vertex with the specified input types and zipping-function * which creates `Tuple2`s. * * @param attributes optional attributes for this vertex */ - def apply[A, B](attributes: OperationAttributes): ZipWith[A, B, (A, B)] = - new ZipWith(_toTuple.asInstanceOf[(A, B) ⇒ (A, B)], attributes) + def apply[A, B](attributes: OperationAttributes): Zip2With[A, B, (A, B)] = + new Zip2With(_toTuple.asInstanceOf[(A, B) ⇒ (A, B)], attributes) /** * Create a new `ZipWith` vertex with the specified input types and zipping-function * which creates `Tuple2`s. */ - def apply[A, B]: ZipWith[A, B, (A, B)] = apply(OperationAttributes.none) + def apply[A, B]: Zip2With[A, B, (A, B)] = apply(OperationAttributes.none) private[this] final val _toTuple: (Any, Any) ⇒ (Any, Any) = (a, b) ⇒ (a, b) } -object ZipWith { - /** - * Create a new `ZipWith` vertex with the specified input types. - * - * @param f zipping-function from the input values to the output value - * @param attributes optional attributes for this vertex - */ - def apply[A, B, C](f: (A, B) ⇒ C, attributes: OperationAttributes = OperationAttributes.none): ZipWith[A, B, C] = - new ZipWith[A, B, C](f, attributes) +/** INTERNAL API - shared base between 2 inputs ZipWith as well as boilerplate plugin generated ZipWith classes */ +private[akka] abstract class ZipWithBase[C] extends FlowGraphInternal.InternalVertex { - final class Left[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[A] { - type NextT = C - override private[akka] def port = 0 - override private[akka] def next = vertex.out - } + def attributes: OperationAttributes - final class Right[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[B] { - type NextT = C - override private[akka] def port = 1 - override private[akka] def next = vertex.out - } + /** MUST be implemented as an FunctionN value */ + def f: Any + require(f.getClass.getName.contains("Function") || f.getClass.getName.contains("anonfun"), + "ZipWiths `f` field MUST be implemented using a FunctionN value, was: " + f.getClass) // TODO remove this check? - final class Out[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionOutPort[C] -} + val out = new ZipWith.Out[C](this) -/** - * Takes two streams and outputs an output stream formed from the two input streams - * by combining corresponding elements using the supplied function. - * If one of the two streams is longer than the other, its remaining elements are ignored. - * - * Note that a junction instance describes exactly one place (vertex) in the `FlowGraph` - * that multiple flows can be attached to; if you want to have multiple independent - * junctions within the same `FlowGraph` then you will have to create multiple such - * instances. - */ -private[akka] final class ZipWith[A, B, C](f: (A, B) ⇒ C, override val attributes: OperationAttributes) - extends FlowGraphInternal.InternalVertex { - - val left = new ZipWith.Left(this) - val right = new ZipWith.Right(this) - val out = new ZipWith.Out(this) - - override def minimumInputCount: Int = 2 - override def maximumInputCount: Int = 2 - override def minimumOutputCount: Int = 1 - override def maximumOutputCount: Int = 1 - - // FIXME cache - private[akka] override def astNode: FanInAstNode = Ast.ZipWith(f.asInstanceOf[(Any, Any) ⇒ Any], zip and attributes) - - private[scaladsl] final override def newInstance() = new ZipWith[A, B, C](f = f, attributes.withoutName) + final override def minimumOutputCount: Int = 1 + final override def maximumOutputCount: Int = 1 } object Unzip { @@ -1345,7 +1309,7 @@ object PartialFlowGraph { * be `run` until those are attached. * * Build a `PartialFlowGraph` by starting with one of the `apply` methods in - * in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]]. + * in [[PartialFlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]]. */ class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex], private[scaladsl] override val cyclesAllowed: Boolean,