Merge pull request #17998 from ktoso/agolubev-agolubev-#16965-UnzipWith-2.3-dev-based

Agolubev #16965 unzip with 2.3 dev based
This commit is contained in:
Konrad Malawski 2015-07-15 11:41:21 +02:00
commit 2c8919ce3f
14 changed files with 556 additions and 41 deletions

View file

@ -27,19 +27,20 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
* **Fan-out** * **Fan-out**
- ``Broadcast[T]`` *(1 input, N outputs)* given an input element emits to each output - ``Broadcast<T>`` *(1 input, N outputs)* given an input element emits to each output
- ``Balance[T]`` *(1 input, N outputs)* given an input element emits to one of its output ports - ``Balance<T>`` *(1 input, N outputs)* given an input element emits to one of its output ports
- ``UnZip[A,B]`` *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B`` - ``UnzipWith<In,A,B,...>`` *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
- ``FlexiRoute[In]`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL - ``UnZip<A,B>`` *(1 input, 2 outputs)* splits a stream of ``Pair<A,B>`` tuples into two streams, one of type ``A`` and one of type ``B``
- ``FlexiRoute<In>`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
* **Fan-in** * **Fan-in**
- ``Merge[In]`` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output - ``Merge<In>`` *(N inputs , 1 output)* picks randomly from inputs pushing them one by one to its output
- ``MergePreferred[In]`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others`` - ``MergePreferred<In>`` like :class:`Merge` but if elements are available on ``preferred`` port, it picks from it, otherwise randomly from ``others``
- ``ZipWith[A,B,...,Out]`` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element - ``ZipWith<A,B,...,Out>`` *(N inputs, 1 output)* which takes a function of N inputs that given a value for each input emits 1 output element
- ``Zip[A,B]`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into an ``(A,B)`` tuple stream - ``Zip<A,B>`` *(2 inputs, 1 output)* is a :class:`ZipWith` specialised to zipping input streams of ``A`` and ``B`` into a ``Pair(A,B)`` tuple stream
- ``Concat[A]`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one) - ``Concat<A>`` *(2 inputs, 1 output)* concatenates two streams (first consume one, then the second one)
- ``FlexiMerge[Out]`` *(N inputs, 1 output)* enables writing custom fan-in elements using a simple DSL - ``FlexiMerge<Out>`` *(N inputs, 1 output)* enables writing custom fan-in elements using a simple DSL
One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is One of the goals of the FlowGraph DSL is to look similar to how one would draw a graph on a whiteboard, so that it is
simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating simple to translate a design from whiteboard to code and be able to relate those two. Let's illustrate this by translating

View file

@ -30,6 +30,7 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
- ``Broadcast[T]`` *(1 input, N outputs)* given an input element emits to each output - ``Broadcast[T]`` *(1 input, N outputs)* given an input element emits to each output
- ``Balance[T]`` *(1 input, N outputs)* given an input element emits to one of its output ports - ``Balance[T]`` *(1 input, N outputs)* given an input element emits to one of its output ports
- ``UnzipWith[In,A,B,...]`` *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
- ``UnZip[A,B]`` *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B`` - ``UnZip[A,B]`` *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``
- ``FlexiRoute[In]`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL - ``FlexiRoute[In]`` *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL

View file

@ -139,6 +139,7 @@ route the elements between different outputs, or emit elements on multiple outpu
Stage Emits when Backpressures when Completes when Stage Emits when Backpressures when Completes when
===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== ===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
unzip all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes unzip all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
unzipWith all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
broadcast all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes broadcast all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes
===================== ========================================================================================================================= ============================================================================================================================== ===================================================================================== ===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================

View file

@ -4,8 +4,10 @@
package akka.stream.javadsl; package akka.stream.javadsl;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.dispatch.japi;
import akka.japi.Pair; import akka.japi.Pair;
import akka.pattern.Patterns; import akka.pattern.Patterns;
import akka.japi.tuple.Tuple4;
import akka.stream.*; import akka.stream.*;
import akka.stream.javadsl.FlowGraph.Builder; import akka.stream.javadsl.FlowGraph.Builder;
import akka.stream.stage.*; import akka.stream.stage.*;
@ -97,13 +99,7 @@ public class FlowGraphTest extends StreamTest {
final Source<String, BoxedUnit> in1 = Source.from(input1); final Source<String, BoxedUnit> in1 = Source.from(input1);
final Source<Integer, BoxedUnit> in2 = Source.from(input2); final Source<Integer, BoxedUnit> in2 = Source.from(input2);
final FanInShape2<String, Integer, Pair<String,Integer>> zip = b.graph(Zip.<String, Integer>create()); final FanInShape2<String, Integer, Pair<String,Integer>> zip = b.graph(Zip.<String, Integer>create());
final Sink<Pair<String, Integer>, Future<BoxedUnit>> out = Sink final Sink<Pair<String, Integer>, BoxedUnit> out = createSink(probe);
.foreach(new Procedure<Pair<String, Integer>>() {
@Override
public void apply(Pair<String, Integer> param) throws Exception {
probe.getRef().tell(param, ActorRef.noSender());
}
});
b.edge(b.source(in1), zip.in0()); b.edge(b.source(in1), zip.in0());
b.edge(b.source(in2), zip.in1()); b.edge(b.source(in2), zip.in1());
@ -133,19 +129,9 @@ public class FlowGraphTest extends StreamTest {
final Outlet<Pair<String, Integer>> in = b.source(Source.from(input)); final Outlet<Pair<String, Integer>> in = b.source(Source.from(input));
final FanOutShape2<Pair<String, Integer>, String, Integer> unzip = b.graph(Unzip.<String, Integer>create()); final FanOutShape2<Pair<String, Integer>, String, Integer> unzip = b.graph(Unzip.<String, Integer>create());
final Sink<String, Future<BoxedUnit>> out1 = Sink.foreach(new Procedure<String>() { final Sink<String, BoxedUnit> out1 = createSink(probe1);
@Override final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
public void apply(String param) throws Exception {
probe1.getRef().tell(param, ActorRef.noSender());
}
});
final Sink<Integer, Future<BoxedUnit>> out2 = Sink.foreach(new Procedure<Integer>() {
@Override
public void apply(Integer param) throws Exception {
probe2.getRef().tell(param, ActorRef.noSender());
}
});
b.edge(in, unzip.in()); b.edge(in, unzip.in());
b.edge(unzip.out0(), b.sink(out1)); b.edge(unzip.out0(), b.sink(out1));
b.edge(unzip.out1(), b.sink(out2)); b.edge(unzip.out1(), b.sink(out2));
@ -157,6 +143,87 @@ public class FlowGraphTest extends StreamTest {
assertEquals(expected2, output2); assertEquals(expected2, output2);
} }
private static <T> Sink<T, BoxedUnit> createSink(final JavaTestKit probe){
return Sink.actorRef(probe.getRef(), "onComplete");
}
@Test
public void mustBeAbleToUseUnzipWith() throws Exception {
final JavaTestKit probe1 = new JavaTestKit(system);
final JavaTestKit probe2 = new JavaTestKit(system);
final Builder<BoxedUnit> b = FlowGraph.builder();
final Source<Integer, BoxedUnit> in = Source.single(1);
final FanOutShape2<Integer, String, Integer> unzip = b.graph(UnzipWith.create(
new Function<Integer, Pair<String, Integer>>() {
@Override public Pair<String, Integer> apply(Integer l) throws Exception {
return new Pair<String, Integer>(l + "!", l);
}
})
);
final Sink<String, BoxedUnit> out1 = createSink(probe1);
final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
b.edge(b.source(in), unzip.in());
b.edge(unzip.out0(), b.sink(out1));
b.edge(unzip.out1(), b.sink(out2));
b.run(materializer);
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
Object output1 = probe1.receiveOne(d);
Object output2 = probe2.receiveOne(d);
assertEquals("1!", output1);
assertEquals(1, output2);
}
@Test
public void mustBeAbleToUseUnzip4With() throws Exception {
final JavaTestKit probe1 = new JavaTestKit(system);
final JavaTestKit probe2 = new JavaTestKit(system);
final JavaTestKit probe3 = new JavaTestKit(system);
final JavaTestKit probe4 = new JavaTestKit(system);
final Builder<BoxedUnit> b = FlowGraph.builder();
final Source<Integer, BoxedUnit> in = Source.single(1);
final FanOutShape4<Integer, String, Integer, String, Integer> unzip = b.graph(UnzipWith.create4(
new Function<Integer, Tuple4<String, Integer, String, Integer>>() {
@Override public Tuple4<String, Integer, String, Integer> apply(Integer l) throws Exception {
return new Tuple4<String, Integer, String, Integer>(l.toString(), l, l + "+" + l, l + l);
}
})
);
final Sink<String, BoxedUnit> out1 = createSink(probe1);
final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
final Sink<String, BoxedUnit> out3 = createSink(probe3);
final Sink<Integer, BoxedUnit> out4 = createSink(probe4);
b.edge(b.source(in), unzip.in());
b.edge(unzip.out0(), b.sink(out1));
b.edge(unzip.out1(), b.sink(out2));
b.edge(unzip.out2(), b.sink(out3));
b.edge(unzip.out3(), b.sink(out4));
b.run(materializer);
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
Object output1 = probe1.receiveOne(d);
Object output2 = probe2.receiveOne(d);
Object output3 = probe3.receiveOne(d);
Object output4 = probe4.receiveOne(d);
assertEquals("1", output1);
assertEquals(1, output2);
assertEquals("1+1", output3);
assertEquals(2, output4);
}
@Test @Test
public void mustBeAbleToUseZipWith() throws Exception { public void mustBeAbleToUseZipWith() throws Exception {
final Source<Integer, BoxedUnit> in1 = Source.single(1); final Source<Integer, BoxedUnit> in1 = Source.single(1);

View file

@ -1,3 +1,6 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl package akka.stream.scaladsl
import scala.concurrent.duration._ import scala.concurrent.duration._

View file

@ -0,0 +1,284 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream._
import akka.stream.testkit.TestSubscriber.Probe
import akka.stream.testkit.Utils._
import akka.stream.testkit._
import org.reactivestreams.Publisher
import scala.concurrent.duration._
import scala.util.control.NoStackTrace
class GraphUnzipWithSpec extends AkkaSpec {
import FlowGraph.Implicits._
val settings = ActorMaterializerSettings(system)
.withInputBuffer(initialSize = 2, maxSize = 16)
implicit val materializer = ActorMaterializer(settings)
val TestException = new RuntimeException("test") with NoStackTrace
type LeftOutput = Int
type RightOutput = String
abstract class Fixture(b: FlowGraph.Builder[_]) {
def in: Inlet[Int]
def left: Outlet[LeftOutput]
def right: Outlet[RightOutput]
}
val f: (Int (Int, String)) = b (b + b, b + "+" + b)
def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
val unzip = b.add(UnzipWith[Int, Int, String](f))
override def in: Inlet[Int] = unzip.in
override def left: Outlet[Int] = unzip.out0
override def right: Outlet[String] = unzip.out1
}
def setup(p: Publisher[Int]) = {
val leftSubscriber = TestSubscriber.probe[LeftOutput]()
val rightSubscriber = TestSubscriber.probe[RightOutput]()
FlowGraph.closed() { implicit b
val f = fixture(b)
Source(p) ~> f.in
f.left ~> Sink(leftSubscriber)
f.right ~> Sink(rightSubscriber)
}.run()
(leftSubscriber, rightSubscriber)
}
def validateSubscriptionAndComplete(subscribers: (Probe[LeftOutput], Probe[RightOutput])): Unit = {
subscribers._1.expectSubscriptionAndComplete()
subscribers._2.expectSubscriptionAndComplete()
}
def validateSubscriptionAndError(subscribers: (Probe[LeftOutput], Probe[RightOutput])): Unit = {
subscribers._1.expectSubscriptionAndError(TestException)
subscribers._2.expectSubscriptionAndError(TestException)
}
"UnzipWith" must {
"work with immediately completed publisher" in assertAllStagesStopped {
val subscribers = setup(TestPublisher.empty[Int])
validateSubscriptionAndComplete(subscribers)
}
"work with delayed completed publisher" in assertAllStagesStopped {
val subscribers = setup(TestPublisher.lazyEmpty)
validateSubscriptionAndComplete(subscribers)
}
"work with two immediately failed publishers" in assertAllStagesStopped {
val subscribers = setup(TestPublisher.error(TestException))
validateSubscriptionAndError(subscribers)
}
"work with two delayed failed publishers" in assertAllStagesStopped {
val subscribers = setup(TestPublisher.lazyError(TestException))
validateSubscriptionAndError(subscribers)
}
"work in the happy case" in {
val leftProbe = TestSubscriber.manualProbe[LeftOutput]()
val rightProbe = TestSubscriber.manualProbe[RightOutput]()
FlowGraph.closed() { implicit b
val unzip = b.add(UnzipWith(f))
Source(1 to 4) ~> unzip.in
unzip.out0 ~> Flow[LeftOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(leftProbe)
unzip.out1 ~> Flow[RightOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(rightProbe)
}.run()
val leftSubscription = leftProbe.expectSubscription()
val rightSubscription = rightProbe.expectSubscription()
leftSubscription.request(2)
rightSubscription.request(1)
leftProbe.expectNext(2)
leftProbe.expectNext(4)
leftProbe.expectNoMsg(100.millis)
rightProbe.expectNext("1+1")
rightProbe.expectNoMsg(100.millis)
leftSubscription.request(1)
rightSubscription.request(2)
leftProbe.expectNext(6)
leftProbe.expectNoMsg(100.millis)
rightProbe.expectNext("2+2")
rightProbe.expectNext("3+3")
rightProbe.expectNoMsg(100.millis)
leftSubscription.request(1)
rightSubscription.request(1)
leftProbe.expectNext(8)
rightProbe.expectNext("4+4")
leftProbe.expectComplete()
rightProbe.expectComplete()
}
"work in the sad case" in {
val leftProbe = TestSubscriber.manualProbe[LeftOutput]()
val rightProbe = TestSubscriber.manualProbe[RightOutput]()
FlowGraph.closed() { implicit b
val unzip = b.add(UnzipWith[Int, Int, String]((b: Int) (1 / b, 1 + "/" + b)))
Source(-2 to 2) ~> unzip.in
unzip.out0 ~> Flow[LeftOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(leftProbe)
unzip.out1 ~> Flow[RightOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(rightProbe)
}.run()
val leftSubscription = leftProbe.expectSubscription()
val rightSubscription = rightProbe.expectSubscription()
leftSubscription.request(2)
leftProbe.expectNext(1 / -2)
leftProbe.expectNext(1 / -1)
rightSubscription.request(1)
rightProbe.expectNext("1/-2")
leftSubscription.request(2)
leftProbe.expectError() match {
case a: java.lang.ArithmeticException a.getMessage should be("/ by zero")
}
rightProbe.expectError()
leftProbe.expectNoMsg(100.millis)
rightProbe.expectNoMsg(100.millis)
}
"unzipWith expanded Person.unapply (3 ouputs)" in {
val probe0 = TestSubscriber.manualProbe[String]()
val probe1 = TestSubscriber.manualProbe[String]()
val probe2 = TestSubscriber.manualProbe[Int]()
case class Person(name: String, surname: String, int: Int)
FlowGraph.closed() { implicit b
val unzip = b.add(UnzipWith((a: Person) Person.unapply(a).get))
Source.single(Person("Caplin", "Capybara", 3)) ~> unzip.in
unzip.out0 ~> Sink(probe0)
unzip.out1 ~> Sink(probe1)
unzip.out2 ~> Sink(probe2)
}.run()
val subscription0 = probe0.expectSubscription()
val subscription1 = probe1.expectSubscription()
val subscription2 = probe2.expectSubscription()
subscription0.request(1)
subscription1.request(1)
subscription2.request(1)
probe0.expectNext("Caplin")
probe1.expectNext("Capybara")
probe2.expectNext(3)
probe0.expectComplete()
probe1.expectComplete()
probe2.expectComplete()
}
"work with up to 20 outputs" in {
val probe0 = TestSubscriber.manualProbe[Int]()
val probe5 = TestSubscriber.manualProbe[String]()
val probe10 = TestSubscriber.manualProbe[Int]()
val probe15 = TestSubscriber.manualProbe[String]()
val probe19 = TestSubscriber.manualProbe[String]()
FlowGraph.closed() { implicit b
val split20 = (a: (List[Int]))
(a(0), a(0).toString,
a(1), a(1).toString,
a(2), a(2).toString,
a(3), a(3).toString,
a(4), a(4).toString,
a(5), a(5).toString,
a(6), a(6).toString,
a(7), a(7).toString,
a(8), a(8).toString,
a(9), a(9).toString)
// odd input ports will be Int, even input ports will be String
val unzip = b.add(UnzipWith(split20))
Source.single((0 to 19).toList) ~> unzip.in
def createSink[T](o: Outlet[T]) =
o ~> Flow[T].buffer(1, OverflowStrategy.backpressure) ~> Sink(TestSubscriber.manualProbe[T]())
unzip.out0 ~> Sink(probe0)
createSink(unzip.out1)
createSink(unzip.out2)
createSink(unzip.out3)
createSink(unzip.out4)
unzip.out5 ~> Sink(probe5)
createSink(unzip.out6)
createSink(unzip.out7)
createSink(unzip.out8)
createSink(unzip.out9)
unzip.out10 ~> Sink(probe10)
createSink(unzip.out11)
createSink(unzip.out12)
createSink(unzip.out13)
createSink(unzip.out14)
unzip.out15 ~> Sink(probe15)
createSink(unzip.out16)
createSink(unzip.out17)
createSink(unzip.out18)
unzip.out19 ~> Sink(probe19)
}.run()
probe0.expectSubscription().request(1)
probe5.expectSubscription().request(1)
probe10.expectSubscription().request(1)
probe15.expectSubscription().request(1)
probe19.expectSubscription().request(1)
probe0.expectNext(0)
probe5.expectNext("2")
probe10.expectNext(5)
probe15.expectNext("7")
probe19.expectNext("9")
probe0.expectComplete()
probe5.expectComplete()
probe10.expectComplete()
probe15.expectComplete()
probe19.expectComplete()
}
}
}

View file

@ -1,5 +1,5 @@
/** /**
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com> * Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/ */
package akka.stream.scaladsl package akka.stream.scaladsl

View file

@ -6,7 +6,7 @@ package akka.stream.impl
import akka.actor.Props import akka.actor.Props
import akka.actor.Deploy import akka.actor.Deploy
import akka.stream._ import akka.stream._
import akka.stream.impl.Junctions.FanInModule import akka.stream.impl.Junctions.{ FanInModule, FanOutModule }
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.Attributes import akka.stream.Attributes
import akka.stream.Attributes._ import akka.stream.Attributes._
@ -34,4 +34,24 @@ private[akka] object GenJunctions {
}# }#
] ]
sealed trait UnzipWithModule {
/** Allows hiding the boilerplate Props creation from the materializer */
def props(settings: ActorMaterializerSettings): Props
}
[2..20#
final case class UnzipWith1Module[B, [#A1#]](
shape: FanOutShape1[B, [#A1#]],
f: B ⇒ ([#A1#]),
override val attributes: Attributes = name("unzipWith1")) extends FanOutModule with UnzipWithModule {
override def withAttributes(attr: Attributes): Module = copy(attributes = attr)
override def carbonCopy: Module = UnzipWith1Module(shape.deepCopy(), f, attributes)
override def props(settings: ActorMaterializerSettings): Props =
Props(new Unzip1With(settings, f.asInstanceOf[Function##1[Any, ([#Any#])]])).withDeploy(Deploy.local)
}#
]
} }

View file

@ -0,0 +1,22 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.impl
import akka.stream.ActorMaterializerSettings
[2..20#/** INTERNAL API */
private[akka] final class Unzip1With(_settings: ActorMaterializerSettings, f: Function##1[Any, ([#Any#])])
extends FanOut(_settings, outputCount = 1) {
outputBunch.markAllOutputs()
initialPhase(##1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs){ () ⇒
val elem = primaryInputs.dequeueInputElement()
val tuple = f(elem)
[1..#outputBunch.enqueue(0, tuple._1)#
]
})
}#
]

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.javadsl
import akka.stream._
import akka.stream.scaladsl
import akka.japi.function
import akka.japi.Pair
import akka.japi.tuple._
/**
* Split one stream into several streams using a splitting function.
*
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' any downstream cancels
*/
object UnzipWith {
/**
* Create a new `UnzipWith` vertex with the specified input type and unzipping-function `f`.
*
* @param f unzipping-function from the input value to the pair of output values
*/
def create[In, A, B](f: function.Function[In, Pair[A, B]]): Graph[FanOutShape2[In, A, B], Unit] =
scaladsl.UnzipWith[In, A, B]((in: In) => f.apply(in) match { case Pair(a, b) => (a, b) })
[3..20#/** Create a new `UnzipWith` specialized for 1 outputs.
*
* @param f unzipping-function from the input value to the output values
*/
def create1[In, [#T1#]](f: function.Function[In, Tuple1[[#T1#]]]): Graph[FanOutShape1[In, [#T1#]], Unit] =
scaladsl.UnzipWith[In, [#T1#]]((in: In) => f.apply(in).toScala)#
]
}

View file

@ -0,0 +1,57 @@
/**
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.stream.scaladsl
import akka.stream._
import akka.stream.impl.GenJunctions._
import akka.stream.impl.StreamLayout
object UnzipWithApply {
import scala.language.implicitConversions
abstract trait UnzipWithCreator[In, Out, T] {
def create(unzipper: Function1[In, Out]): T
}
[2..20#trait UnzipWithCreator1[In, [#A1#]] extends UnzipWithCreator[In, Tuple1[[#A1#]], UnzipWith1[In, [#A1#]]] {
override def create(unzipper: In ⇒ ([#A1#])): UnzipWith1[In, [#A1#]] = {
val shape = new FanOutShape1[In, [#A1#]]("UnzipWith1")
new UnzipWith1(shape, new UnzipWith1Module(shape, unzipper, Attributes.name("UnzipWith1")))
}
}
implicit object UnzipWithCreatorObject1 extends UnzipWithCreator1[Any, [#Any#]]#
]
}
trait UnzipWithApply {
import UnzipWithApply._
[2..20#/**
* Create a new `UnzipWith` specialized for 1 outputs.
*
* @param unzipper unzipping-function from the input value to 1 output values
*/
def apply[In, [#A1#]](unzipper: In ⇒ Tuple1[[#A1#]])(implicit creator: UnzipWithCreator1[Any, [#Any#]]): UnzipWith1[In, [#A1#]] = {
creator.asInstanceOf[UnzipWithCreator1[In, [#A1#]]].create(unzipper)
}#
]
}
[2..20#/** `UnzipWith` specialized for 1 outputs */
class UnzipWith1[In, [#A1#]] private[stream] (override val shape: FanOutShape1[In, [#A1#]],
private[stream] override val module: StreamLayout.Module)
extends Graph[FanOutShape1[In, [#A1#]], Unit] {
override def withAttributes(attr: Attributes): UnzipWith1[In, [#A1#]] =
new UnzipWith1(shape, module.withAttributes(attr).nest())
override def named(name: String): UnzipWith1[In, [#A1#]] = withAttributes(Attributes.name(name))
}
#
]

View file

@ -10,6 +10,7 @@ import akka.dispatch.Dispatchers
import akka.pattern.ask import akka.pattern.ask
import akka.stream.actor.ActorSubscriber import akka.stream.actor.ActorSubscriber
import akka.stream.impl.GenJunctions.ZipWithModule import akka.stream.impl.GenJunctions.ZipWithModule
import akka.stream.impl.GenJunctions.UnzipWithModule
import akka.stream.impl.Junctions._ import akka.stream.impl.Junctions._
import akka.stream.impl.StreamLayout.Module import akka.stream.impl.StreamLayout.Module
import akka.stream.impl.fusing.ActorInterpreter import akka.stream.impl.fusing.ActorInterpreter
@ -178,8 +179,8 @@ private[akka] case class ActorMaterializerImpl(
case BalanceModule(shape, waitForDownstreams, _) case BalanceModule(shape, waitForDownstreams, _)
(Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq) (Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
case UnzipModule(shape, _) case unzip: UnzipWithModule
(Unzip.props(effectiveSettings), shape.in, shape.outlets) (unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
} }
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher) val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
val size = outs.size val size = outs.size

View file

@ -209,17 +209,13 @@ object Zip {
* '''Cancels when''' any downstream cancels * '''Cancels when''' any downstream cancels
*/ */
object Unzip { object Unzip {
import akka.japi.function.Function
/** /**
* Creates a new `Unzip` vertex with the specified output types. * Creates a new `Unzip` vertex with the specified output types.
*/ */
def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] = def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] =
scaladsl.FlowGraph.partial() { implicit b UnzipWith.create(JavaIdentityFunction.asInstanceOf[Function[Pair[A, B], Pair[A, B]]])
val unzip = b.add(scaladsl.Unzip[A, B]())
val tuple = b.add(scaladsl.Flow[A Pair B].map(p (p.first, p.second)))
b.addEdge(tuple.outlet, unzip.in)
new FanOutShape2(FanOutShape.Ports(tuple.inlet, unzip.out0 :: unzip.out1 :: Nil))
}
/** /**
* Creates a new `Unzip` vertex with the specified output types. * Creates a new `Unzip` vertex with the specified output types.

View file

@ -240,12 +240,18 @@ object ZipWith extends ZipWithApply
* '''Cancels when''' any downstream cancels * '''Cancels when''' any downstream cancels
*/ */
object Unzip { object Unzip {
private final val _identity: Any Any = a a
/** /**
* Create a new `Unzip`. * Create a new `Unzip`.
*/ */
def apply[A, B](): Unzip[A, B] = { def apply[A, B](): Unzip[A, B] = {
val shape = new FanOutShape2[(A, B), A, B]("Unzip") val shape = new FanOutShape2[(A, B), A, B]("Unzip")
new Unzip(shape, new UnzipModule(shape, Attributes.name("Unzip"))) new Unzip(shape, new UnzipWith2Module[(A, B), A, B](
shape,
_identity.asInstanceOf[((A, B)) (A, B)],
Attributes.name("Unzip")))
} }
} }
@ -262,6 +268,19 @@ class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B],
override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name)) override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name))
} }
/**
* Transforms each element of input stream into multiple streams using a splitter function.
*
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
*
* '''Backpressures when''' any of the outputs backpressures
*
* '''Completes when''' upstream completes
*
* '''Cancels when''' any downstream cancels
*/
object UnzipWith extends UnzipWithApply
object Concat { object Concat {
/** /**
* Create a new `Concat`. * Create a new `Concat`.