+str #16965 add generalized UnzipWith
This commit is contained in:
parent
f34a684564
commit
50de39e886
15 changed files with 566 additions and 31 deletions
|
|
@ -29,6 +29,7 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
|
|||
|
||||
- ``Broadcast[T]`` – *(1 input, N outputs)* given an input element emits to each output
|
||||
- ``Balance[T]`` – *(1 input, N outputs)* given an input element emits to one of its output ports
|
||||
- ``UnzipWith[In,A,B,...]`` – *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
|
||||
- ``UnZip[A,B]`` – *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``
|
||||
- ``FlexiRoute[In]`` – *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@ Akka Streams currently provide these junctions (for a detailed list see :ref:`st
|
|||
|
||||
- ``Broadcast[T]`` – *(1 input, N outputs)* given an input element emits to each output
|
||||
- ``Balance[T]`` – *(1 input, N outputs)* given an input element emits to one of its output ports
|
||||
- ``UnzipWith[In,A,B,...]`` – *(1 input, N outputs)* takes a function of 1 input that given a value for each input emits N output elements (where N <= 20)
|
||||
- ``UnZip[A,B]`` – *(1 input, 2 outputs)* splits a stream of ``(A,B)`` tuples into two streams, one of type ``A`` and one of type ``B``
|
||||
- ``FlexiRoute[In]`` – *(1 input, N outputs)* enables writing custom fan out elements using a simple DSL
|
||||
|
||||
|
|
|
|||
|
|
@ -138,6 +138,7 @@ route the elements between different outputs, or emit elements on multiple outpu
|
|||
Stage Emits when Backpressures when Completes when
|
||||
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
|
||||
unzip all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
|
||||
unzipWith all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
|
||||
broadcast all of the outputs stops backpressuring and there is an input element available any of the outputs backpressures upstream completes
|
||||
balance any of the outputs stops backpressuring; emits the element to the first available output all of the outputs backpressure upstream completes
|
||||
===================== ========================================================================================================================= ============================================================================================================================== =====================================================================================
|
||||
|
|
|
|||
|
|
@ -4,8 +4,10 @@
|
|||
package akka.stream.javadsl;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.dispatch.japi;
|
||||
import akka.japi.Pair;
|
||||
import akka.pattern.Patterns;
|
||||
import akka.japi.Tuple4;
|
||||
import akka.stream.*;
|
||||
import akka.stream.javadsl.FlowGraph.Builder;
|
||||
import akka.stream.stage.*;
|
||||
|
|
@ -97,13 +99,7 @@ public class FlowGraphTest extends StreamTest {
|
|||
final Source<String, BoxedUnit> in1 = Source.from(input1);
|
||||
final Source<Integer, BoxedUnit> in2 = Source.from(input2);
|
||||
final FanInShape2<String, Integer, Pair<String,Integer>> zip = b.graph(Zip.<String, Integer>create());
|
||||
final Sink<Pair<String, Integer>, Future<BoxedUnit>> out = Sink
|
||||
.foreach(new Procedure<Pair<String, Integer>>() {
|
||||
@Override
|
||||
public void apply(Pair<String, Integer> param) throws Exception {
|
||||
probe.getRef().tell(param, ActorRef.noSender());
|
||||
}
|
||||
});
|
||||
final Sink<Pair<String, Integer>, BoxedUnit> out = createSink(probe);
|
||||
|
||||
b.edge(b.source(in1), zip.in0());
|
||||
b.edge(b.source(in2), zip.in1());
|
||||
|
|
@ -133,18 +129,8 @@ public class FlowGraphTest extends StreamTest {
|
|||
final Outlet<Pair<String, Integer>> in = b.source(Source.from(input));
|
||||
final FanOutShape2<Pair<String, Integer>, String, Integer> unzip = b.graph(Unzip.<String, Integer>create());
|
||||
|
||||
final Sink<String, Future<BoxedUnit>> out1 = Sink.foreach(new Procedure<String>() {
|
||||
@Override
|
||||
public void apply(String param) throws Exception {
|
||||
probe1.getRef().tell(param, ActorRef.noSender());
|
||||
}
|
||||
});
|
||||
final Sink<Integer, Future<BoxedUnit>> out2 = Sink.foreach(new Procedure<Integer>() {
|
||||
@Override
|
||||
public void apply(Integer param) throws Exception {
|
||||
probe2.getRef().tell(param, ActorRef.noSender());
|
||||
}
|
||||
});
|
||||
final Sink<String, BoxedUnit> out1 = createSink(probe1);
|
||||
final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
|
||||
|
||||
b.edge(in, unzip.in());
|
||||
b.edge(unzip.out0(), b.sink(out1));
|
||||
|
|
@ -157,6 +143,87 @@ public class FlowGraphTest extends StreamTest {
|
|||
assertEquals(expected2, output2);
|
||||
}
|
||||
|
||||
private static <T> Sink<T, BoxedUnit> createSink(final JavaTestKit probe){
|
||||
return Sink.actorRef(probe.getRef(), "onComplete");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseUnzipWith() throws Exception {
|
||||
final JavaTestKit probe1 = new JavaTestKit(system);
|
||||
final JavaTestKit probe2 = new JavaTestKit(system);
|
||||
|
||||
final Builder<BoxedUnit> b = FlowGraph.builder();
|
||||
final Source<Integer, BoxedUnit> in = Source.single(1);
|
||||
|
||||
final FanOutShape2<Integer, String, Integer> unzip = b.graph(UnzipWith.create(
|
||||
new Function<Integer, Pair<String, Integer>>() {
|
||||
@Override public Pair<String, Integer> apply(Integer l) throws Exception {
|
||||
return new Pair<String, Integer>(l + "!", l);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
final Sink<String, BoxedUnit> out1 = createSink(probe1);
|
||||
final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
|
||||
|
||||
b.edge(b.source(in), unzip.in());
|
||||
b.edge(unzip.out0(), b.sink(out1));
|
||||
b.edge(unzip.out1(), b.sink(out2));
|
||||
b.run(materializer);
|
||||
|
||||
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
|
||||
|
||||
Object output1 = probe1.receiveOne(d);
|
||||
Object output2 = probe2.receiveOne(d);
|
||||
|
||||
assertEquals("1!", output1);
|
||||
assertEquals(1, output2);
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseUnzip4With() throws Exception {
|
||||
final JavaTestKit probe1 = new JavaTestKit(system);
|
||||
final JavaTestKit probe2 = new JavaTestKit(system);
|
||||
final JavaTestKit probe3 = new JavaTestKit(system);
|
||||
final JavaTestKit probe4 = new JavaTestKit(system);
|
||||
|
||||
final Builder<BoxedUnit> b = FlowGraph.builder();
|
||||
final Source<Integer, BoxedUnit> in = Source.single(1);
|
||||
|
||||
final FanOutShape4<Integer, String, Integer, String, Integer> unzip = b.graph(UnzipWith.create4(
|
||||
new Function<Integer, Tuple4<String, Integer, String, Integer>>() {
|
||||
@Override public Tuple4<String, Integer, String, Integer> apply(Integer l) throws Exception {
|
||||
return new Tuple4<String, Integer, String, Integer>(l.toString(), l, l + "+" + l, l + l);
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
final Sink<String, BoxedUnit> out1 = createSink(probe1);
|
||||
final Sink<Integer, BoxedUnit> out2 = createSink(probe2);
|
||||
final Sink<String, BoxedUnit> out3 = createSink(probe3);
|
||||
final Sink<Integer, BoxedUnit> out4 = createSink(probe4);
|
||||
|
||||
b.edge(b.source(in), unzip.in());
|
||||
b.edge(unzip.out0(), b.sink(out1));
|
||||
b.edge(unzip.out1(), b.sink(out2));
|
||||
b.edge(unzip.out2(), b.sink(out3));
|
||||
b.edge(unzip.out3(), b.sink(out4));
|
||||
b.run(materializer);
|
||||
|
||||
Duration d = Duration.create(300, TimeUnit.MILLISECONDS);
|
||||
|
||||
Object output1 = probe1.receiveOne(d);
|
||||
Object output2 = probe2.receiveOne(d);
|
||||
Object output3 = probe3.receiveOne(d);
|
||||
Object output4 = probe4.receiveOne(d);
|
||||
|
||||
assertEquals("1", output1);
|
||||
assertEquals(1, output2);
|
||||
assertEquals("1+1", output3);
|
||||
assertEquals(2, output4);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseZipWith() throws Exception {
|
||||
final Source<Integer, BoxedUnit> in1 = Source.single(1);
|
||||
|
|
|
|||
|
|
@ -1,3 +1,6 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
|
|
|||
|
|
@ -0,0 +1,284 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.testkit.TestSubscriber.Probe
|
||||
import akka.stream.testkit.Utils._
|
||||
import akka.stream.testkit._
|
||||
import org.reactivestreams.Publisher
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
class GraphUnzipWithSpec extends AkkaSpec {
|
||||
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
val settings = ActorMaterializerSettings(system)
|
||||
.withInputBuffer(initialSize = 2, maxSize = 16)
|
||||
|
||||
implicit val materializer = ActorMaterializer(settings)
|
||||
|
||||
val TestException = new RuntimeException("test") with NoStackTrace
|
||||
|
||||
type LeftOutput = Int
|
||||
type RightOutput = String
|
||||
|
||||
abstract class Fixture(b: FlowGraph.Builder[_]) {
|
||||
def in: Inlet[Int]
|
||||
def left: Outlet[LeftOutput]
|
||||
def right: Outlet[RightOutput]
|
||||
}
|
||||
|
||||
val f: (Int ⇒ (Int, String)) = b ⇒ (b + b, b + "+" + b)
|
||||
|
||||
def fixture(b: FlowGraph.Builder[_]): Fixture = new Fixture(b) {
|
||||
val unzip = b.add(UnzipWith[Int, Int, String](f))
|
||||
|
||||
override def in: Inlet[Int] = unzip.in
|
||||
|
||||
override def left: Outlet[Int] = unzip.out0
|
||||
|
||||
override def right: Outlet[String] = unzip.out1
|
||||
}
|
||||
|
||||
def setup(p: Publisher[Int]) = {
|
||||
val leftSubscriber = TestSubscriber.probe[LeftOutput]()
|
||||
val rightSubscriber = TestSubscriber.probe[RightOutput]()
|
||||
|
||||
FlowGraph.closed() { implicit b ⇒
|
||||
val f = fixture(b)
|
||||
|
||||
Source(p) ~> f.in
|
||||
f.left ~> Sink(leftSubscriber)
|
||||
f.right ~> Sink(rightSubscriber)
|
||||
|
||||
}.run()
|
||||
|
||||
(leftSubscriber, rightSubscriber)
|
||||
}
|
||||
|
||||
def validateSubscriptionAndComplete(subscribers: (Probe[LeftOutput], Probe[RightOutput])): Unit = {
|
||||
subscribers._1.expectSubscriptionAndComplete()
|
||||
subscribers._2.expectSubscriptionAndComplete()
|
||||
}
|
||||
|
||||
def validateSubscriptionAndError(subscribers: (Probe[LeftOutput], Probe[RightOutput])): Unit = {
|
||||
subscribers._1.expectSubscriptionAndError(TestException)
|
||||
subscribers._2.expectSubscriptionAndError(TestException)
|
||||
}
|
||||
|
||||
"UnzipWith" must {
|
||||
|
||||
"work with immediately completed publisher" in assertAllStagesStopped {
|
||||
val subscribers = setup(TestPublisher.empty[Int])
|
||||
validateSubscriptionAndComplete(subscribers)
|
||||
}
|
||||
|
||||
"work with delayed completed publisher" in assertAllStagesStopped {
|
||||
val subscribers = setup(TestPublisher.lazyEmpty)
|
||||
validateSubscriptionAndComplete(subscribers)
|
||||
}
|
||||
|
||||
"work with two immediately failed publishers" in assertAllStagesStopped {
|
||||
val subscribers = setup(TestPublisher.error(TestException))
|
||||
validateSubscriptionAndError(subscribers)
|
||||
}
|
||||
|
||||
"work with two delayed failed publishers" in assertAllStagesStopped {
|
||||
val subscribers = setup(TestPublisher.lazyError(TestException))
|
||||
validateSubscriptionAndError(subscribers)
|
||||
}
|
||||
|
||||
"work in the happy case" in {
|
||||
val leftProbe = TestSubscriber.manualProbe[LeftOutput]()
|
||||
val rightProbe = TestSubscriber.manualProbe[RightOutput]()
|
||||
|
||||
FlowGraph.closed() { implicit b ⇒
|
||||
val unzip = b.add(UnzipWith(f))
|
||||
Source(1 to 4) ~> unzip.in
|
||||
|
||||
unzip.out0 ~> Flow[LeftOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(leftProbe)
|
||||
unzip.out1 ~> Flow[RightOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(rightProbe)
|
||||
}.run()
|
||||
|
||||
val leftSubscription = leftProbe.expectSubscription()
|
||||
val rightSubscription = rightProbe.expectSubscription()
|
||||
|
||||
leftSubscription.request(2)
|
||||
rightSubscription.request(1)
|
||||
|
||||
leftProbe.expectNext(2)
|
||||
leftProbe.expectNext(4)
|
||||
leftProbe.expectNoMsg(100.millis)
|
||||
|
||||
rightProbe.expectNext("1+1")
|
||||
rightProbe.expectNoMsg(100.millis)
|
||||
|
||||
leftSubscription.request(1)
|
||||
rightSubscription.request(2)
|
||||
|
||||
leftProbe.expectNext(6)
|
||||
leftProbe.expectNoMsg(100.millis)
|
||||
|
||||
rightProbe.expectNext("2+2")
|
||||
rightProbe.expectNext("3+3")
|
||||
rightProbe.expectNoMsg(100.millis)
|
||||
|
||||
leftSubscription.request(1)
|
||||
rightSubscription.request(1)
|
||||
|
||||
leftProbe.expectNext(8)
|
||||
rightProbe.expectNext("4+4")
|
||||
|
||||
leftProbe.expectComplete()
|
||||
rightProbe.expectComplete()
|
||||
}
|
||||
|
||||
"work in the sad case" in {
|
||||
val leftProbe = TestSubscriber.manualProbe[LeftOutput]()
|
||||
val rightProbe = TestSubscriber.manualProbe[RightOutput]()
|
||||
|
||||
FlowGraph.closed() { implicit b ⇒
|
||||
val unzip = b.add(UnzipWith[Int, Int, String]((b: Int) ⇒ (1 / b, 1 + "/" + b)))
|
||||
|
||||
Source(-2 to 2) ~> unzip.in
|
||||
|
||||
unzip.out0 ~> Flow[LeftOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(leftProbe)
|
||||
unzip.out1 ~> Flow[RightOutput].buffer(4, OverflowStrategy.backpressure) ~> Sink(rightProbe)
|
||||
}.run()
|
||||
|
||||
val leftSubscription = leftProbe.expectSubscription()
|
||||
val rightSubscription = rightProbe.expectSubscription()
|
||||
|
||||
leftSubscription.request(2)
|
||||
leftProbe.expectNext(1 / -2)
|
||||
leftProbe.expectNext(1 / -1)
|
||||
|
||||
rightSubscription.request(1)
|
||||
rightProbe.expectNext("1/-2")
|
||||
|
||||
leftSubscription.request(2)
|
||||
leftProbe.expectError() match {
|
||||
case a: java.lang.ArithmeticException ⇒ a.getMessage should be("/ by zero")
|
||||
}
|
||||
rightProbe.expectError()
|
||||
|
||||
leftProbe.expectNoMsg(100.millis)
|
||||
rightProbe.expectNoMsg(100.millis)
|
||||
}
|
||||
|
||||
"unzipWith expanded Person.unapply (3 ouputs)" in {
|
||||
val probe0 = TestSubscriber.manualProbe[String]()
|
||||
val probe1 = TestSubscriber.manualProbe[String]()
|
||||
val probe2 = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
case class Person(name: String, surname: String, int: Int)
|
||||
|
||||
FlowGraph.closed() { implicit b ⇒
|
||||
val unzip = b.add(UnzipWith((a: Person) ⇒ Person.unapply(a).get))
|
||||
|
||||
Source.single(Person("Caplin", "Capybara", 3)) ~> unzip.in
|
||||
|
||||
unzip.out0 ~> Sink(probe0)
|
||||
unzip.out1 ~> Sink(probe1)
|
||||
unzip.out2 ~> Sink(probe2)
|
||||
}.run()
|
||||
|
||||
val subscription0 = probe0.expectSubscription()
|
||||
val subscription1 = probe1.expectSubscription()
|
||||
val subscription2 = probe2.expectSubscription()
|
||||
|
||||
subscription0.request(1)
|
||||
subscription1.request(1)
|
||||
subscription2.request(1)
|
||||
|
||||
probe0.expectNext("Caplin")
|
||||
probe1.expectNext("Capybara")
|
||||
probe2.expectNext(3)
|
||||
|
||||
probe0.expectComplete()
|
||||
probe1.expectComplete()
|
||||
probe2.expectComplete()
|
||||
}
|
||||
|
||||
"work with up to 20 outputs" in {
|
||||
val probe0 = TestSubscriber.manualProbe[Int]()
|
||||
val probe5 = TestSubscriber.manualProbe[String]()
|
||||
val probe10 = TestSubscriber.manualProbe[Int]()
|
||||
val probe15 = TestSubscriber.manualProbe[String]()
|
||||
val probe19 = TestSubscriber.manualProbe[String]()
|
||||
|
||||
FlowGraph.closed() { implicit b ⇒
|
||||
|
||||
val split20 = (a: (List[Int])) ⇒
|
||||
(a(0), a(0).toString,
|
||||
a(1), a(1).toString,
|
||||
a(2), a(2).toString,
|
||||
a(3), a(3).toString,
|
||||
a(4), a(4).toString,
|
||||
a(5), a(5).toString,
|
||||
a(6), a(6).toString,
|
||||
a(7), a(7).toString,
|
||||
a(8), a(8).toString,
|
||||
a(9), a(9).toString)
|
||||
|
||||
// odd input ports will be Int, even input ports will be String
|
||||
val unzip = b.add(UnzipWith(split20))
|
||||
|
||||
Source.single((0 to 19).toList) ~> unzip.in
|
||||
|
||||
def createSink[T](o: Outlet[T]) =
|
||||
o ~> Flow[T].buffer(1, OverflowStrategy.backpressure) ~> Sink(TestSubscriber.manualProbe[T]())
|
||||
|
||||
unzip.out0 ~> Sink(probe0)
|
||||
createSink(unzip.out1)
|
||||
createSink(unzip.out2)
|
||||
createSink(unzip.out3)
|
||||
createSink(unzip.out4)
|
||||
|
||||
unzip.out5 ~> Sink(probe5)
|
||||
createSink(unzip.out6)
|
||||
createSink(unzip.out7)
|
||||
createSink(unzip.out8)
|
||||
createSink(unzip.out9)
|
||||
|
||||
unzip.out10 ~> Sink(probe10)
|
||||
createSink(unzip.out11)
|
||||
createSink(unzip.out12)
|
||||
createSink(unzip.out13)
|
||||
createSink(unzip.out14)
|
||||
|
||||
unzip.out15 ~> Sink(probe15)
|
||||
createSink(unzip.out16)
|
||||
createSink(unzip.out17)
|
||||
createSink(unzip.out18)
|
||||
|
||||
unzip.out19 ~> Sink(probe19)
|
||||
|
||||
}.run()
|
||||
|
||||
probe0.expectSubscription().request(1)
|
||||
probe5.expectSubscription().request(1)
|
||||
probe10.expectSubscription().request(1)
|
||||
probe15.expectSubscription().request(1)
|
||||
probe19.expectSubscription().request(1)
|
||||
|
||||
probe0.expectNext(0)
|
||||
probe5.expectNext("2")
|
||||
probe10.expectNext(5)
|
||||
probe15.expectNext("7")
|
||||
probe19.expectNext("9")
|
||||
|
||||
probe0.expectComplete()
|
||||
probe5.expectComplete()
|
||||
probe10.expectComplete()
|
||||
probe15.expectComplete()
|
||||
probe19.expectComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -1,5 +1,5 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
|
|
|
|||
|
|
@ -0,0 +1,21 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.japi
|
||||
|
||||
[3..22#/**
|
||||
* Used to create tuples with 1 elements in Java.
|
||||
*/
|
||||
object Tuple1 {
|
||||
def create[[#T1#]]([#t1: T1#]) = new Tuple1([#t1#])
|
||||
}
|
||||
|
||||
/**
|
||||
* Java API Tuple container.
|
||||
*/
|
||||
@SerialVersionUID(##1L)
|
||||
final case class Tuple1[[#T1#]]([#t1: T1#]) extends scala.Product with scala.Serializable {
|
||||
val toScala = ([#t1#])
|
||||
}#
|
||||
|
||||
]
|
||||
|
|
@ -6,7 +6,7 @@ package akka.stream.impl
|
|||
import akka.actor.Props
|
||||
import akka.actor.Deploy
|
||||
import akka.stream._
|
||||
import akka.stream.impl.Junctions.FanInModule
|
||||
import akka.stream.impl.Junctions.{ FanInModule, FanOutModule }
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.Attributes
|
||||
import akka.stream.Attributes._
|
||||
|
|
@ -34,4 +34,24 @@ private[akka] object GenJunctions {
|
|||
}#
|
||||
]
|
||||
|
||||
sealed trait UnzipWithModule {
|
||||
/** Allows hiding the boilerplate Props creation from the materializer */
|
||||
def props(settings: ActorMaterializerSettings): Props
|
||||
}
|
||||
|
||||
[2..20#
|
||||
final case class UnzipWith1Module[B, [#A1#]](
|
||||
shape: FanOutShape1[B, [#A1#]],
|
||||
f: B ⇒ ([#A1#]),
|
||||
override val attributes: Attributes = name("unzipWith1")) extends FanOutModule with UnzipWithModule {
|
||||
|
||||
override def withAttributes(attr: Attributes): Module = copy(attributes = attr)
|
||||
|
||||
override def carbonCopy: Module = UnzipWith1Module(shape.deepCopy(), f, attributes)
|
||||
|
||||
override def props(settings: ActorMaterializerSettings): Props =
|
||||
Props(new Unzip1With(settings, f.asInstanceOf[Function##1[Any, ([#Any#])]])).withDeploy(Deploy.local)
|
||||
}#
|
||||
]
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,22 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
|
||||
[2..20#/** INTERNAL API */
|
||||
private[akka] final class Unzip1With(_settings: ActorMaterializerSettings, f: Function##1[Any, ([#Any#])])
|
||||
extends FanOut(_settings, outputCount = 1) {
|
||||
|
||||
outputBunch.markAllOutputs()
|
||||
|
||||
initialPhase(##1, TransferPhase(primaryInputs.NeedsInput && outputBunch.AllOfMarkedOutputs){ () ⇒
|
||||
val elem = primaryInputs.dequeueInputElement()
|
||||
val tuple = f(elem)
|
||||
|
||||
[1..#outputBunch.enqueue(0, tuple._1)#
|
||||
]
|
||||
})
|
||||
}#
|
||||
]
|
||||
|
|
@ -0,0 +1,42 @@
|
|||
/**
|
||||
* Copyright (C) 2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.javadsl
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl
|
||||
import akka.japi.function
|
||||
import akka.japi
|
||||
|
||||
/**
|
||||
* Split one stream into several streams using a splitting function.
|
||||
*
|
||||
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
|
||||
*
|
||||
* '''Backpressures when''' any of the outputs backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' any downstream cancels
|
||||
*/
|
||||
object UnzipWith {
|
||||
|
||||
/**
|
||||
* Create a new `UnzipWith` vertex with the specified input type and unzipping-function `f`.
|
||||
*
|
||||
* @param f unzipping-function from the input value to the pair of output values
|
||||
*/
|
||||
def create[In, A, B](f: function.Function[In, japi.Pair[A, B]]): Graph[FanOutShape2[In, A, B], Unit] =
|
||||
scaladsl.UnzipWith[In, A, B]((in: In) => f.apply(in) match { case japi.Pair(a, b) => (a, b) })
|
||||
|
||||
|
||||
[3..20#/** Create a new `UnzipWith` specialized for 1 outputs.
|
||||
*
|
||||
* @param f unzipping-function from the input value to the output values
|
||||
*/
|
||||
def create1[In, [#T1#]](f: function.Function[In, japi.Tuple1[[#T1#]]]): Graph[FanOutShape1[In, [#T1#]], Unit] =
|
||||
scaladsl.UnzipWith[In, [#T1#]]((in: In) => f.apply(in).toScala)#
|
||||
|
||||
]
|
||||
|
||||
}
|
||||
|
|
@ -0,0 +1,57 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.impl.GenJunctions._
|
||||
import akka.stream.impl.StreamLayout
|
||||
|
||||
object UnzipWithApply {
|
||||
import scala.language.implicitConversions
|
||||
|
||||
abstract trait UnzipWithCreator[In, Out, T] {
|
||||
def create(unzipper: Function1[In, Out]): T
|
||||
}
|
||||
|
||||
[2..20#trait UnzipWithCreator1[In, [#A1#]] extends UnzipWithCreator[In, Tuple1[[#A1#]], UnzipWith1[In, [#A1#]]] {
|
||||
override def create(unzipper: In ⇒ ([#A1#])): UnzipWith1[In, [#A1#]] = {
|
||||
val shape = new FanOutShape1[In, [#A1#]]("UnzipWith1")
|
||||
new UnzipWith1(shape, new UnzipWith1Module(shape, unzipper, Attributes.name("UnzipWith1")))
|
||||
}
|
||||
}
|
||||
|
||||
implicit object UnzipWithCreatorObject1 extends UnzipWithCreator1[Any, [#Any#]]#
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
|
||||
trait UnzipWithApply {
|
||||
import UnzipWithApply._
|
||||
|
||||
|
||||
[2..20#/**
|
||||
* Create a new `UnzipWith` specialized for 1 outputs.
|
||||
*
|
||||
* @param unzipper unzipping-function from the input value to 1 output values
|
||||
*/
|
||||
def apply[In, [#A1#]](unzipper: In ⇒ Tuple1[[#A1#]])(implicit creator: UnzipWithCreator1[Any, [#Any#]]): UnzipWith1[In, [#A1#]] = {
|
||||
creator.asInstanceOf[UnzipWithCreator1[In, [#A1#]]].create(unzipper)
|
||||
}#
|
||||
|
||||
]
|
||||
}
|
||||
|
||||
[2..20#/** `UnzipWith` specialized for 1 outputs */
|
||||
class UnzipWith1[In, [#A1#]] private[stream] (override val shape: FanOutShape1[In, [#A1#]],
|
||||
private[stream] override val module: StreamLayout.Module)
|
||||
extends Graph[FanOutShape1[In, [#A1#]], Unit] {
|
||||
|
||||
override def withAttributes(attr: Attributes): UnzipWith1[In, [#A1#]] =
|
||||
new UnzipWith1(shape, module.withAttributes(attr).nest())
|
||||
|
||||
override def named(name: String): UnzipWith1[In, [#A1#]] = withAttributes(Attributes.name(name))
|
||||
}
|
||||
#
|
||||
]
|
||||
|
|
@ -10,6 +10,7 @@ import akka.dispatch.Dispatchers
|
|||
import akka.pattern.ask
|
||||
import akka.stream.actor.ActorSubscriber
|
||||
import akka.stream.impl.GenJunctions.ZipWithModule
|
||||
import akka.stream.impl.GenJunctions.UnzipWithModule
|
||||
import akka.stream.impl.Junctions._
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.impl.fusing.ActorInterpreter
|
||||
|
|
@ -178,8 +179,8 @@ private[akka] case class ActorMaterializerImpl(
|
|||
case BalanceModule(shape, waitForDownstreams, _) ⇒
|
||||
(Balance.props(effectiveSettings, shape.outArray.size, waitForDownstreams), shape.in, shape.outArray.toSeq)
|
||||
|
||||
case UnzipModule(shape, _) ⇒
|
||||
(Unzip.props(effectiveSettings), shape.in, shape.outlets)
|
||||
case unzip: UnzipWithModule ⇒
|
||||
(unzip.props(effectiveSettings), unzip.inPorts.head, unzip.shape.outlets)
|
||||
}
|
||||
val impl = actorOf(props, stageName(effectiveAttributes), effectiveSettings.dispatcher)
|
||||
val size = outs.size
|
||||
|
|
|
|||
|
|
@ -209,17 +209,13 @@ object Zip {
|
|||
* '''Cancels when''' any downstream cancels
|
||||
*/
|
||||
object Unzip {
|
||||
import akka.japi.function.Function
|
||||
|
||||
/**
|
||||
* Creates a new `Unzip` vertex with the specified output types.
|
||||
*/
|
||||
def create[A, B](): Graph[FanOutShape2[A Pair B, A, B], Unit] =
|
||||
scaladsl.FlowGraph.partial() { implicit b ⇒
|
||||
val unzip = b.add(scaladsl.Unzip[A, B]())
|
||||
val tuple = b.add(scaladsl.Flow[A Pair B].map(p ⇒ (p.first, p.second)))
|
||||
b.addEdge(tuple.outlet, unzip.in)
|
||||
new FanOutShape2(FanOutShape.Ports(tuple.inlet, unzip.out0 :: unzip.out1 :: Nil))
|
||||
}
|
||||
UnzipWith.create(JavaIdentityFunction.asInstanceOf[Function[Pair[A, B], Pair[A, B]]])
|
||||
|
||||
/**
|
||||
* Creates a new `Unzip` vertex with the specified output types.
|
||||
|
|
|
|||
|
|
@ -240,12 +240,18 @@ object ZipWith extends ZipWithApply
|
|||
* '''Cancels when''' any downstream cancels
|
||||
*/
|
||||
object Unzip {
|
||||
|
||||
private final val _identity: Any ⇒ Any = a ⇒ a
|
||||
|
||||
/**
|
||||
* Create a new `Unzip`.
|
||||
*/
|
||||
def apply[A, B](): Unzip[A, B] = {
|
||||
val shape = new FanOutShape2[(A, B), A, B]("Unzip")
|
||||
new Unzip(shape, new UnzipModule(shape, Attributes.name("Unzip")))
|
||||
new Unzip(shape, new UnzipWith2Module[(A, B), A, B](
|
||||
shape,
|
||||
_identity.asInstanceOf[((A, B)) ⇒ (A, B)],
|
||||
Attributes.name("Unzip")))
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -262,6 +268,19 @@ class Unzip[A, B] private (override val shape: FanOutShape2[(A, B), A, B],
|
|||
override def named(name: String): Unzip[A, B] = withAttributes(Attributes.name(name))
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms each element of input stream into multiple streams using a splitter function.
|
||||
*
|
||||
* '''Emits when''' all of the outputs stops backpressuring and there is an input element available
|
||||
*
|
||||
* '''Backpressures when''' any of the outputs backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes
|
||||
*
|
||||
* '''Cancels when''' any downstream cancels
|
||||
*/
|
||||
object UnzipWith extends UnzipWithApply
|
||||
|
||||
object Concat {
|
||||
/**
|
||||
* Create a new `Concat`.
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue