+str #16464 generalised zipWith for up to 22 params

- removing deprecarted marker from japi, people will have to use it
  currently
This commit is contained in:
Konrad 'ktoso' Malawski 2014-12-06 14:51:40 +01:00
parent 3b1bc67090
commit bc5f3aaa90
11 changed files with 407 additions and 138 deletions

View file

@ -4,8 +4,12 @@
package akka.stream.javadsl;
import akka.actor.ActorRef;
import akka.japi.Pair;
import akka.japi.*;
import akka.stream.StreamTest;
import akka.stream.javadsl.japi.Creator;
import akka.stream.javadsl.japi.Function;
import akka.stream.javadsl.japi.Function2;
import akka.stream.javadsl.japi.Procedure;
import akka.stream.stage.*;
import akka.stream.javadsl.japi.*;
import akka.stream.testkit.AkkaSpec;
@ -16,6 +20,7 @@ import org.junit.Test;
import org.reactivestreams.Publisher;
import scala.concurrent.Await;
import scala.concurrent.Future;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;
import scala.runtime.BoxedUnit;
@ -99,7 +104,7 @@ public class FlowGraphTest extends StreamTest {
final Source<String> in1 = Source.from(input1);
final Source<Integer> in2 = Source.from(input2);
final ZipWith<String, Integer, Pair<String,Integer>> zip = Zip.create();
final Zip2With<String, Integer, Pair<String,Integer>> zip = Zip.create();
final KeyedSink<Pair<String, Integer>, Future<BoxedUnit>> out = Sink
.foreach(new Procedure<Pair<String, Integer>>() {
@Override
@ -124,7 +129,7 @@ public class FlowGraphTest extends StreamTest {
@SuppressWarnings("unchecked")
final List<Pair<String, Integer>> input = Arrays.asList(new Pair<String, Integer>("A", 1),
new Pair<String, Integer>("B", 2), new Pair<String, Integer>("C", 3));
new Pair<String, Integer>("B", 2), new Pair<String, Integer>("C", 3));
final Iterable<String> expected1 = Arrays.asList("A", "B", "C");
final Iterable<Integer> expected2 = Arrays.asList(1, 2, 3);
@ -154,4 +159,59 @@ public class FlowGraphTest extends StreamTest {
assertEquals(expected2, output2);
}
@Test
public void mustBeAbleToUseZipWith() throws Exception {
final Source<Integer> in1 = Source.single(1);
final Source<Integer> in2 = Source.single(10);
final Zip2With<Integer, Integer, Integer> sumZip = ZipWith.create(
new Function2<Integer, Integer, Integer>() {
@Override public Integer apply(Integer l, Integer r) throws Exception {
return l + r;
}
});
final KeyedSink<Integer, Future<Integer>> out = Sink.head();
MaterializedMap mat = FlowGraph.builder()
.addEdge(in1, sumZip.left())
.addEdge(in2, sumZip.right())
.addEdge(sumZip.out(), out)
.run(materializer);
final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS));
assertEquals(11, (int) result);
}
@Test
public void mustBeAbleToUseZip4With() throws Exception {
final Source<Integer> in1 = Source.single(1);
final Source<Integer> in2 = Source.single(10);
final Source<Integer> in3 = Source.single(100);
final Source<Integer> in4 = Source.single(1000);
Function<ZipWith.Zip4WithInputs<Integer, Integer, Integer, Integer>, Integer> sum4 = new Function<ZipWith.Zip4WithInputs<Integer, Integer, Integer, Integer>, Integer>() {
@Override
public Integer apply(ZipWith.Zip4WithInputs<Integer, Integer, Integer, Integer> inputs) throws Exception {
return inputs.t1() + inputs.t2() + inputs.t3() + inputs.t4();
}
};
Zip4With<Integer, Integer, Integer, Integer, Integer> sum4Zip = ZipWith.create(sum4);
final KeyedSink<Integer, Future<Integer>> out = Sink.head();
MaterializedMap mat = FlowGraph.builder()
.addEdge(in1, sum4Zip.input1())
.addEdge(in2, sum4Zip.input2())
.addEdge(in3, sum4Zip.input3())
.addEdge(in4, sum4Zip.input4())
.addEdge(sum4Zip.out(), out)
.run(materializer);
final Integer result = Await.result(mat.get(out), Duration.create(300, TimeUnit.MILLISECONDS));
assertEquals(1111, (int) result);
}
}

View file

@ -278,7 +278,7 @@ public class FlowTest extends StreamTest {
final Source<String> in1 = Source.from(input1);
final Source<Integer> in2 = Source.from(input2);
final ZipWith<String, Integer, Pair<String, Integer>> zip = Zip.create();
final Zip2With<String, Integer, Pair<String, Integer>> zip = Zip.create();
final KeyedSink<Pair<String, Integer>, Future<BoxedUnit>> out = Sink
.foreach(new Procedure<Pair<String, Integer>>() {
@Override

View file

@ -99,6 +99,81 @@ class GraphZipWithSpec extends TwoStreamsSetup {
val subscription2 = subscriber2.expectErrorOrSubscriptionFollowedByError(TestException)
}
"zipWith a ETA expanded Person.apply (3 inputs)" in {
val probe = StreamTestKit.SubscriberProbe[Person]()
case class Person(name: String, surname: String, int: Int)
FlowGraph { implicit b
val zip = ZipWith(Person.apply _)
Source.single("Caplin") ~> zip.input1
Source.single("Capybara") ~> zip.input2
Source.single(3) ~> zip.input3
zip.out ~> Sink(probe)
}.run()
val subscription = probe.expectSubscription()
subscription.request(5)
probe.expectNext(Person("Caplin", "Capybara", 3))
probe.expectComplete()
}
"work with up to 22 inputs" in {
val probe = StreamTestKit.SubscriberProbe[String]()
FlowGraph { implicit b
val sum22 = (v1: Int, v2: String, v3: Int, v4: String, v5: Int, v6: String, v7: Int, v8: String, v9: Int, v10: String,
v11: Int, v12: String, v13: Int, v14: String, v15: Int, v16: String, v17: Int, v18: String, v19: Int,
v20: String, v21: Int, v22: String)
v1 + v2 + v3 + v4 + v5 + v6 + v7 + v8 + v9 + v10 +
v11 + v12 + v13 + v14 + v15 + v16 + v17 + v18 + v19 + v20 +
v21 + v22
// odd input ports will be Int, even input ports will be String
val zip = ZipWith(sum22)
val one = Source.single(1)
one ~> zip.input1
one.map(_.toString) ~> zip.input2
one ~> zip.input3
one.map(_.toString) ~> zip.input4
one ~> zip.input5
one.map(_.toString) ~> zip.input6
one ~> zip.input7
one.map(_.toString) ~> zip.input8
one ~> zip.input9
one.map(_.toString) ~> zip.input10
one ~> zip.input11
one.map(_.toString) ~> zip.input12
one ~> zip.input13
one.map(_.toString) ~> zip.input14
one ~> zip.input15
one.map(_.toString) ~> zip.input16
one ~> zip.input17
one.map(_.toString) ~> zip.input18
one ~> zip.input19
one.map(_.toString) ~> zip.input20
one ~> zip.input21
one.map(_.toString) ~> zip.input22
zip.out ~> Sink(probe)
}.run()
val subscription = probe.expectSubscription()
subscription.request(1)
probe.expectNext("1" * 22)
probe.expectComplete()
}
}
}

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.actor.Props
import akka.stream.MaterializerSettings
/**
* INTERNAL API
*/
private[akka] object ZipWith {
/** @param f MUST be a FunctionN type. */
def props(settings: MaterializerSettings, f: Any): Props = f match {
[2..#case f1: Function1[[#Any#], Any] => Props(new Zip1With(settings, f1))#
]
}
[2..#def props(settings: MaterializerSettings, f: Function1[[#Any#], Any]): Props =
Props(new Zip1With(settings, f))#
]
}
[2..#/** INTERNAL API */
private[akka] final class Zip1With(_settings: MaterializerSettings, f: Function1[[#Any#], Any]) extends FanIn(_settings, inputPorts = 1) {
inputBunch.markAllInputs()
nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { () ⇒
val elem##0 = inputBunch.dequeue(##0)
[2..#val elem0 = inputBunch.dequeue(0)#
]
primaryOutputs.enqueueOutputElement(f([#elem0#]))
})
}#
]

View file

@ -0,0 +1,73 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import akka.stream.scaladsl
import akka.stream.javadsl.japi
object ZipWith {
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function `f`.
*
* @param f zipping-function from the input values to the output value
* @param attributes optional attributes for this vertex
*/
def create[A, B, Out](f: japi.Function2[A, B, Out], attributes: OperationAttributes): Zip2With[A, B, Out] =
new Zip2With(new scaladsl.Zip2With[A, B, Out](f.apply _, attributes.asScala))
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function `f`.
*
* @param f zipping-function from the input values to the output value
* Creates a new named `ZipWith` vertex with the specified input types and zipping-function `f`.
* Note that a `ZipWith` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def create[A, B, Out](f: japi.Function2[A, B, Out]): Zip2With[A, B, Out] =
create(f, OperationAttributes.none)
[3..#/** Create a new `ZipWith` specialized for 1 input streams. */
def create[[#T1#], Out](f: japi.Function[Zip1WithInputs[[#T1#]], Out]) =
new Zip1With(new scaladsl.Zip1With[[#T1#], Out](([#t1#]) ⇒ f.apply(new Zip1WithInputs[[#T1#]]([#t1#])), scaladsl.OperationAttributes.none))#
]
// CLASS BOILERPLATE
sealed trait ZipWithInputs
[2..#final class Zip1WithInputs[[#T1#]]([#val t1: T1#]) extends ZipWithInputs#
]
[#final class Input1[T1, Out] private[akka] (val asScala: scaladsl.ZipWith.Input1[T1, Out]) extends JunctionInPort[T1]#
]
final class Left[A, B, Out](override val asScala: scaladsl.ZipWith.Left[A, B, Out]) extends JunctionInPort[A]
final class Right[A, B, Out](override val asScala: scaladsl.ZipWith.Right[A, B, Out]) extends JunctionInPort[B]
final class Out[Out](override val asScala: scaladsl.ZipWith.Out[Out]) extends JunctionOutPort[Out]
}
/**
* Takes two streams and outputs an output stream formed from the two input streams
* by combining corresponding elements in pairs. If one of the two streams is
* longer than the other, its remaining elements are ignored.
*/
final class Zip2With[A, B, Out] private[akka] (val asScala: scaladsl.Zip2With[A, B, Out]) {
val left = new ZipWith.Left[A, B, Out](asScala.left)
val right = new ZipWith.Right[A, B, Out](asScala.right)
val out = new ZipWith.Out[Out](asScala.out)
}
[3..#/**
* Takes multiple streams and outputs an output stream formed from the two input streams
* by combining corresponding elements in pairs. If one of the two streams is
* longer than the other, its remaining elements are ignored.
*/
final class Zip1With[[#T1#], Out] private[akka] (val asScala: scaladsl.Zip1With[[#T1#], Out]) {
val out = new ZipWith.Out[Out](asScala.out)
[#val input1 = new ZipWith.Input1[T1, Out](asScala.input1)#
]
}#
]

View file

@ -0,0 +1,79 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream.impl.Ast.FanInAstNode
import akka.stream.impl.Ast
import akka.stream.impl.Ast.Defaults._
object ZipWith {
[2..#/**
* Create a new anonymous `ZipWith` vertex with 1 specified input types.
* Note that a `ZipWith` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def apply[[#T1#], C](f: Function1[[#T1#], C]): Zip1With[[#T1#], C] =
new Zip1With[[#T1#], C](f, OperationAttributes.none)#
]
final class Left[A, B, C] private[akka] (private[akka] val vertex: Zip2With[A, B, C]) extends JunctionInPort[A] {
type NextT = C
override private[akka] def port = 0
override private[akka] def next = vertex.out
}
final class Right[A, B, C] private[akka] (private[akka] val vertex: Zip2With[A, B, C]) extends JunctionInPort[B] {
type NextT = C
override private[akka] def port = 1
override private[akka] def next = vertex.out
}
[#final class Input1[T1, C] private[akka] (private[akka] val vertex: ZipWithBase[C]) extends JunctionInPort[T1] {
type NextT = C
override private[akka] def port = 1
override private[akka] def next = vertex.out
}#
]
final class Out[C] private[akka] (private[akka] val vertex: ZipWithBase[C]) extends JunctionOutPort[C]
}
/**
* Takes two streams and outputs an output stream formed from the two input streams
* by combining corresponding elements using the supplied function.
* If one of the two streams is longer than the other, its remaining elements are ignored.
*/
private[akka] final class Zip2With[A, B, C](override val f: (A, B) ⇒ C, override val attributes: OperationAttributes) extends ZipWithBase[C] {
val left = new ZipWith.Left(this)
val right = new ZipWith.Right(this)
override def minimumInputCount: Int = 2
override def maximumInputCount: Int = 2
// FIXME cache
private[akka] override def astNode: FanInAstNode = Ast.Zip2With(f.asInstanceOf[(Any, Any) ⇒ Any], zip and attributes)
private[scaladsl] final override def newInstance() = new Zip2With[A, B, C](f = f, attributes.withoutName)
}
[3..#/**
* Takes 1 streams and outputs an output stream formed from the two input streams
* by combining corresponding elements using the supplied function.
* If one of the two streams is longer than the other, its remaining elements are ignored.
*/
private[akka] final class Zip1With[[#T1#], C](override val f: Function1[[#T1#], C], override val attributes: OperationAttributes) extends ZipWithBase[C] {
[#val input1 = new ZipWith.Input1[T1, C](this)#
]
override def minimumInputCount: Int = 1
override def maximumInputCount: Int = 1
// FIXME cache
private[akka] override def astNode: FanInAstNode = Ast.Zip1With(f.asInstanceOf[Function1[[#T1#], Any]], zip and attributes)
private[scaladsl] final override def newInstance() = new Zip1With[[#T1#], C](f, attributes.withoutName)
}#
]

View file

@ -175,7 +175,35 @@ private[akka] object Ast {
sealed trait FanInAstNode extends JunctionAstNode
sealed trait FanOutAstNode extends JunctionAstNode
final case class ZipWith(f: (Any, Any) Any, attributes: OperationAttributes) extends FanInAstNode
/**
* INTERNAL API
* `f` MUST be implemented as value of type `scala.FunctionN`
*/
sealed trait ZipWith extends FanInAstNode {
/** MUST be implemented as type of FunctionN */
def f: Any
}
final case class Zip2With[T1, T2](f: Function2[T1, T2, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip3With[T1, T2, T3](f: Function3[T1, T2, T3, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip4With[T1, T2, T3, T4](f: Function4[T1, T2, T3, T4, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip5With[T1, T2, T3, T4, T5](f: Function5[T1, T2, T3, T4, T5, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip6With[T1, T2, T3, T4, T5, T6](f: Function6[T1, T2, T3, T4, T5, T6, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip7With[T1, T2, T3, T4, T5, T6, T7](f: Function7[T1, T2, T3, T4, T5, T6, T7, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip8With[T1, T2, T3, T4, T5, T6, T7, T8](f: Function8[T1, T2, T3, T4, T5, T6, T7, T8, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip9With[T1, T2, T3, T4, T5, T6, T7, T8, T9](f: Function9[T1, T2, T3, T4, T5, T6, T7, T8, T9, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip10With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10](f: Function10[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip11With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11](f: Function11[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip12With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12](f: Function12[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip13With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13](f: Function13[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip14With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14](f: Function14[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip15With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15](f: Function15[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip16With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16](f: Function16[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip17With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17](f: Function17[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip18With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18](f: Function18[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip19With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19](f: Function19[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip20With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20](f: Function20[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip21With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21](f: Function21[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, Any], attributes: OperationAttributes) extends ZipWith
final case class Zip22With[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22](f: Function22[T1, T2, T3, T4, T5, T6, T7, T8, T9, T10, T11, T12, T13, T14, T15, T16, T17, T18, T19, T20, T21, T22, Any], attributes: OperationAttributes) extends ZipWith
// FIXME Why do we need this?
case class IdentityAstNode(attributes: OperationAttributes) extends JunctionAstNode
@ -457,7 +485,7 @@ case class ActorBasedFlowMaterializer(override val settings: MaterializerSetting
val props = fanin match {
case Ast.Merge(_) FairMerge.props(transformedSettings, inputCount)
case Ast.MergePreferred(_) UnfairMerge.props(transformedSettings, inputCount)
case Ast.ZipWith(f, _) ZipWith.props(transformedSettings, f)
case z: Ast.ZipWith ZipWith.props(transformedSettings, z.f)
case Ast.Concat(_) Concat.props(transformedSettings)
case Ast.FlexiMergeNode(merger, _) FlexiMergeImpl.props(transformedSettings, inputCount, merger.createMergeLogic())
}

View file

@ -278,26 +278,6 @@ private[akka] final class UnfairMerge(_settings: MaterializerSettings,
})
}
/**
* INTERNAL API
*/
private[akka] object ZipWith {
def props(settings: MaterializerSettings, f: (Any, Any) Any): Props = Props(new ZipWith(settings, f))
}
/**
* INTERNAL API
*/
private[akka] final class ZipWith(_settings: MaterializerSettings, f: (Any, Any) Any) extends FanIn(_settings, inputPorts = 2) {
inputBunch.markAllInputs()
nextPhase(TransferPhase(inputBunch.AllOfMarkedInputs && primaryOutputs.NeedsDemand) { ()
val elem0 = inputBunch.dequeue(0)
val elem1 = inputBunch.dequeue(1)
primaryOutputs.enqueueOutputElement(f(elem0, elem1))
})
}
/**
* INTERNAL API
*/

View file

@ -5,9 +5,6 @@ package akka.stream.javadsl
import akka.stream._
import akka.stream.scaladsl
import akka.stream.impl.Ast
import akka.stream._
trait JunctionInPort[-T] {
/** Convert this element to it's `scaladsl` equivalent. */
@ -219,73 +216,41 @@ class Balance[T](delegate: scaladsl.Balance[T]) extends javadsl.Junction[T] {
}
object Zip {
import akka.japi.{ Pair, Function2 }
import akka.stream.javadsl.japi.Function2
import akka.japi.Pair
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function
* Create a new anonymous `Zip2With` vertex with the specified input types and zipping-function
* which creates `akka.japi.Pair`s.
*
* Note that a `ZipWith` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
* @param attributes optional attributes for this vertex
*/
def create[A, B](attributes: OperationAttributes): ZipWith[A, B, A Pair B] =
def create[A, B](attributes: OperationAttributes): Zip2With[A, B, A Pair B] =
ZipWith.create(_toPair.asInstanceOf[Function2[A, B, A Pair B]], attributes)
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function
* which creates `akka.japi.Pair`s.
*/
def create[A, B]: ZipWith[A, B, A Pair B] = create(OperationAttributes.none)
def create[A, B]: Zip2With[A, B, A Pair B] = create(OperationAttributes.none)
private[this] final val _toPair: Function2[Any, Any, Any Pair Any] =
new Function2[Any, Any, Any Pair Any] { override def apply(a: Any, b: Any): Any Pair Any = new Pair(a, b) }
}
object ZipWith {
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function `f`.
*
* @param f zipping-function from the input values to the output value
* @param attributes optional attributes for this vertex
*/
def create[A, B, C](f: akka.japi.Function2[A, B, C], attributes: OperationAttributes): ZipWith[A, B, C] =
new ZipWith(new scaladsl.ZipWith[A, B, C](f.apply _, attributes.asScala))
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function `f`.
*
* @param f zipping-function from the input values to the output value
*/
def create[A, B, C](f: akka.japi.Function2[A, B, C]): ZipWith[A, B, C] =
create(f = f, OperationAttributes.none)
final class Left[A, B, C](override val asScala: scaladsl.ZipWith.Left[A, B, C]) extends JunctionInPort[A]
final class Right[A, B, C](override val asScala: scaladsl.ZipWith.Right[A, B, C]) extends JunctionInPort[B]
final class Out[A, B, C](override val asScala: scaladsl.ZipWith.Out[A, B, C]) extends JunctionOutPort[C]
}
/**
* Takes two streams and outputs an output stream formed from the two input streams
* by combining corresponding elements in pairs. If one of the two streams is
* longer than the other, its remaining elements are ignored.
*/
final class ZipWith[A, B, C] private (val asScala: scaladsl.ZipWith[A, B, C]) {
val left = new ZipWith.Left[A, B, C](asScala.left)
val right = new ZipWith.Right[A, B, C](asScala.right)
val out = new ZipWith.Out[A, B, C](asScala.out)
}
object Unzip {
/**
* Creates a new `Unzip` vertex with the specified output types and attributes.
*
* @param attributes optional attributes for this vertex
* @param attributes attributes for this vertex
*/
def create[A, B](attributes: OperationAttributes): Unzip[A, B] =
new Unzip[A, B](new scaladsl.Unzip[A, B](attributes.asScala))
/**
* Creates a new `Unzip` vertex with the specified output types.
* Creates a new `Unzip` vertex with the specified output types and attributes.
*/
def create[A, B](): Unzip[A, B] = create(OperationAttributes.none)
@ -335,21 +300,30 @@ final class Unzip[A, B] private (delegate: scaladsl.Unzip[A, B]) {
object Concat {
/**
* Create a new `Concat` vertex with the specified input types and attributes.
*
* @param attributes optional attributes for this vertex
* Create a new anonymous `Concat` vertex with the specified input types.
* Note that a `Concat` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def create[T](attributes: OperationAttributes): Concat[T] = new Concat(scaladsl.Concat[T])
def create[T](): Concat[T] =
create(OperationAttributes.none)
/**
* Create a new `Concat` vertex with the specified input types.
* Create a new anonymous `Concat` vertex with the specified input types.
* Note that a `Concat` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def create[T](): Concat[T] = create(OperationAttributes.none)
def create[T](attributes: OperationAttributes): Concat[T] =
new Concat(scaladsl.Concat[T](attributes.asScala))
/**
* Create a new `Concat` vertex with the specified input types.
* Create a new anonymous `Concat` vertex with the specified input types.
* Note that a `Concat` instance can only be used at one place (one vertex)
* in the `FlowGraph`. This method creates a new instance every time it
* is called and those instances are not `equal`.
*/
def create[T](clazz: Class[T]): Concat[T] = create()
def create[T](clazz: Class[T], attributes: OperationAttributes): Concat[T] = create(attributes)
class First[T] private[akka] (delegate: scaladsl.Concat.First[T]) extends JunctionInPort[T] {
override def asScala: scaladsl.JunctionInPort[T] = delegate
@ -449,7 +423,6 @@ object FlowGraph {
* Builder of [[FlowGraph]] and [[PartialFlowGraph]].
*/
class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) {
import akka.stream.scaladsl.JavaConverters._
def this() {
this(new scaladsl.FlowGraphBuilder())
@ -571,9 +544,10 @@ class FlowGraphBuilder(b: scaladsl.FlowGraphBuilder) {
object PartialFlowGraphBuilder extends FlowGraphBuilder
class PartialFlowGraph(delegate: scaladsl.PartialFlowGraph) {
import collection.JavaConverters._
import akka.stream.scaladsl.JavaConverters._
import collection.JavaConverters._
def asScala: scaladsl.PartialFlowGraph = delegate
def undefinedSources(): java.util.Set[UndefinedSource[Any]] =

View file

@ -9,7 +9,7 @@ package akka.stream.javadsl.japi
/**
* A Function interface. Used to create first-class-functions is Java.
*/
@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince")
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
trait Function[-T, +R] {
@throws(classOf[Exception])
def apply(param: T): R
@ -18,7 +18,7 @@ trait Function[-T, +R] {
/**
* A Function interface. Used to create 2-arg first-class-functions is Java.
*/
@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince")
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
trait Function2[-T1, -T2, +R] {
@throws(classOf[Exception])
def apply(arg1: T1, arg2: T2): R
@ -27,8 +27,7 @@ trait Function2[-T1, -T2, +R] {
/**
* A constructor/factory, takes no parameters but creates a new value of type T every call.
*/
@SerialVersionUID(1L)
@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince")
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
trait Creator[+T] extends Serializable {
/**
* This method must return a different instance upon every call.
@ -40,7 +39,7 @@ trait Creator[+T] extends Serializable {
/**
* A Procedure is like a Function, but it doesn't produce a return value.
*/
@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince")
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
trait Procedure[-T] {
@throws(classOf[Exception])
def apply(param: T): Unit
@ -49,7 +48,7 @@ trait Procedure[-T] {
/**
* Java API: Defines a criteria and determines whether the parameter meets this criteria.
*/
@deprecated("add variance to akka.japi and remove this akka.stream.japi!", since = "eversince")
@SerialVersionUID(1L) // FIXME: add variance to akka.japi and remove this akka.stream.japi!
trait Predicate[-T] {
def test(param: T): Boolean
}

View file

@ -243,75 +243,39 @@ final class Balance[T](val waitForAllDownstreams: Boolean, override val attribut
}
object Zip {
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function
* which creates `Tuple2`s.
*
* @param attributes optional attributes for this vertex
*/
def apply[A, B](attributes: OperationAttributes): ZipWith[A, B, (A, B)] =
new ZipWith(_toTuple.asInstanceOf[(A, B) (A, B)], attributes)
def apply[A, B](attributes: OperationAttributes): Zip2With[A, B, (A, B)] =
new Zip2With(_toTuple.asInstanceOf[(A, B) (A, B)], attributes)
/**
* Create a new `ZipWith` vertex with the specified input types and zipping-function
* which creates `Tuple2`s.
*/
def apply[A, B]: ZipWith[A, B, (A, B)] = apply(OperationAttributes.none)
def apply[A, B]: Zip2With[A, B, (A, B)] = apply(OperationAttributes.none)
private[this] final val _toTuple: (Any, Any) (Any, Any) = (a, b) (a, b)
}
object ZipWith {
/**
* Create a new `ZipWith` vertex with the specified input types.
*
* @param f zipping-function from the input values to the output value
* @param attributes optional attributes for this vertex
*/
def apply[A, B, C](f: (A, B) C, attributes: OperationAttributes = OperationAttributes.none): ZipWith[A, B, C] =
new ZipWith[A, B, C](f, attributes)
/** INTERNAL API - shared base between 2 inputs ZipWith as well as boilerplate plugin generated ZipWith classes */
private[akka] abstract class ZipWithBase[C] extends FlowGraphInternal.InternalVertex {
final class Left[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[A] {
type NextT = C
override private[akka] def port = 0
override private[akka] def next = vertex.out
}
def attributes: OperationAttributes
final class Right[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionInPort[B] {
type NextT = C
override private[akka] def port = 1
override private[akka] def next = vertex.out
}
/** MUST be implemented as an FunctionN value */
def f: Any
require(f.getClass.getName.contains("Function") || f.getClass.getName.contains("anonfun"),
"ZipWiths `f` field MUST be implemented using a FunctionN value, was: " + f.getClass) // TODO remove this check?
final class Out[A, B, C] private[akka] (private[akka] val vertex: ZipWith[A, B, C]) extends JunctionOutPort[C]
}
val out = new ZipWith.Out[C](this)
/**
* Takes two streams and outputs an output stream formed from the two input streams
* by combining corresponding elements using the supplied function.
* If one of the two streams is longer than the other, its remaining elements are ignored.
*
* Note that a junction instance describes exactly one place (vertex) in the `FlowGraph`
* that multiple flows can be attached to; if you want to have multiple independent
* junctions within the same `FlowGraph` then you will have to create multiple such
* instances.
*/
private[akka] final class ZipWith[A, B, C](f: (A, B) C, override val attributes: OperationAttributes)
extends FlowGraphInternal.InternalVertex {
val left = new ZipWith.Left(this)
val right = new ZipWith.Right(this)
val out = new ZipWith.Out(this)
override def minimumInputCount: Int = 2
override def maximumInputCount: Int = 2
override def minimumOutputCount: Int = 1
override def maximumOutputCount: Int = 1
// FIXME cache
private[akka] override def astNode: FanInAstNode = Ast.ZipWith(f.asInstanceOf[(Any, Any) Any], zip and attributes)
private[scaladsl] final override def newInstance() = new ZipWith[A, B, C](f = f, attributes.withoutName)
final override def minimumOutputCount: Int = 1
final override def maximumOutputCount: Int = 1
}
object Unzip {
@ -1345,7 +1309,7 @@ object PartialFlowGraph {
* be `run` until those are attached.
*
* Build a `PartialFlowGraph` by starting with one of the `apply` methods in
* in [[FlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]].
* in [[PartialFlowGraph$ companion object]]. Syntactic sugar is provided by [[FlowGraphImplicits]].
*/
class PartialFlowGraph private[akka] (private[akka] val graph: DirectedGraphBuilder[FlowGraphInternal.EdgeLabel, FlowGraphInternal.Vertex],
private[scaladsl] override val cyclesAllowed: Boolean,