=str #16965 use akka-actor provided tuples for gen UnzipWith

This commit is contained in:
Konrad Malawski 2015-07-15 10:55:47 +02:00
parent 50de39e886
commit 911e02bd40
4 changed files with 18 additions and 38 deletions

View file

@ -27,20 +27,20 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
* **Fan-out**
- ``Broadcast[T]`` *(1 input, N outputs)* given an input element emits to each output
- ``Balance[T]`` *(1 input, N outputs)* given an input element emits to one of its output ports
- ``UnzipWith[In,A,B,...]`` *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
- ``UnZip[A,B]`` *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``
- ``FlexiRoute[In]`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
- ``Broadcast<T>`` *(1 input, N outputs)* given an input element emits to each output
- ``Balance<T>`` *(1 input, N outputs)* given an input element emits to one of its output ports
- ``UnzipWith<In,A,B,...>`` *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
- ``UnZip<A,B>`` *(1 input, 2 outputs)* splits a stream of ``Pair<A,B>`` tuples into two streams, one of type ``A`` and one of type ``B``
- ``FlexiRoute<In>`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
* **Fan-in**
- ``Merge[In]`` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output
- ``MergePreferred[In]`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``
- ``ZipWith[A,B,...,Out]`` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
- ``Zip[A,B]`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream
- ``Concat[A]`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)
- ``FlexiMerge[Out]`` *(N inputs, 1 output)* enables writing custom fan-in elements using a simple DSL
- ``Merge<In>`` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output
- ``MergePreferred<In>`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``
- ``ZipWith<A,B,...,Out>`` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
- ``Zip<A,B>`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair(A,B)`` tuple stream
- ``Concat<A>`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)
- ``FlexiMerge<Out>`` *(N inputs, 1 output)* enables writing custom fan-in elements using a simple DSL
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -7,7 +7,7 @@ import akka.actor.ActorRef;
import akka.dispatch.japi;
import akka.japi.Pair;
import akka.pattern.Patterns;
import akka.japi.Tuple4;
import akka.japi.tuple.Tuple4;
import akka.stream.*;
import akka.stream.javadsl.FlowGraph.Builder;
import akka.stream.stage.*;

View file

@ -1,21 +0,0 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.japi
[3..22#/**
* Used to create tuples with 1 elements in Java.
*/
object Tuple1 {
def create[[#T1#]]([#t1: T1#]) = new Tuple1([#t1#])
}
/**
* Java API Tuple container.
*/
@SerialVersionUID(##1L)
final case class Tuple1[[#T1#]]([#t1: T1#]) extends scala.Product with scala.Serializable {
val toScala = ([#t1#])
}#
]

View file

@ -1,12 +1,13 @@
/**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import akka.stream._
import akka.stream.scaladsl
import akka.japi.function
import akka.japi
import akka.japi.Pair
import akka.japi.tuple._
/**
* Split one stream into several streams using a splitting function.
@ -26,15 +27,15 @@ object UnzipWith {
*
* @param f unzipping-function from the input value to the pair of output values
*/
def create[In, A, B](f: function.Function[In, japi.Pair[A, B]]): Graph[FanOutShape2[In, A, B], Unit] =
scaladsl.UnzipWith[In, A, B]((in: In) => f.apply(in) match { case japi.Pair(a, b) => (a, b) })
def create[In, A, B](f: function.Function[In, Pair[A, B]]): Graph[FanOutShape2[In, A, B], Unit] =
scaladsl.UnzipWith[In, A, B]((in: In) => f.apply(in) match { case Pair(a, b) => (a, b) })
[3..20#/** Create a new `UnzipWith` specialized for 1 outputs.
*
* @param f unzipping-function from the input value to the output values
*/
def create1[In, [#T1#]](f: function.Function[In, japi.Tuple1[[#T1#]]]): Graph[FanOutShape1[In, [#T1#]], Unit] =
def create1[In, [#T1#]](f: function.Function[In, Tuple1[[#T1#]]]): Graph[FanOutShape1[In, [#T1#]], Unit] =
scaladsl.UnzipWith[In, [#T1#]]((in: In) => f.apply(in).toScala)#
]