+str #17399 add boilerplate remover for fan-in junctions
This commit is contained in:
parent
343d64050d
commit
7620014358
12 changed files with 290 additions and 16 deletions
|
|
@ -143,6 +143,19 @@ For defining a ``Flow<T>`` we need to expose both an undefined source and sink:
|
|||
|
||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#flow-from-partial-flow-graph
|
||||
|
||||
Combining Sources and Sinks with simplified API
|
||||
-----------------------------------------------
|
||||
|
||||
There is simplified API you can use to combine sources and sinks with junctions like: ``Broadcast<T>``, ``Balance<T>``,
|
||||
``Merge<In>`` and ``Concat<A>`` without the need for using the Graph DSL. The combine method takes care of constructing
|
||||
the necessary graph underneath. In following example we combine two sources into one (fan-in):
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#source-combine
|
||||
|
||||
The same can be done for a ``Sink`` but in this case it will be fan-out:
|
||||
|
||||
.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#sink-combine
|
||||
|
||||
.. _bidi-flow-java:
|
||||
|
||||
Bidirectional Flows
|
||||
|
|
|
|||
|
|
@ -3,13 +3,12 @@
|
|||
*/
|
||||
package docs.stream
|
||||
|
||||
import akka.stream.scaladsl._
|
||||
import akka.actor.ActorRef
|
||||
import akka.stream._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
||||
|
|
@ -100,4 +99,30 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec {
|
|||
|
||||
Await.result(matSink, 300.millis) should equal(1 -> "1")
|
||||
}
|
||||
|
||||
"combine sources with simplified API" in {
|
||||
//#source-combine
|
||||
val sourceOne = Source(List(1))
|
||||
val sourceTwo = Source(List(2))
|
||||
val merged = Source.combine(sourceOne, sourceTwo)(Merge(_))
|
||||
|
||||
val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _))
|
||||
//#source-combine
|
||||
Await.result(mergedResult, 300.millis) should equal(3)
|
||||
}
|
||||
|
||||
"combine sinks with simplified API" in {
|
||||
val actorRef: ActorRef = testActor
|
||||
//#sink-combine
|
||||
val sendRmotely = Sink.actorRef(actorRef, "Done")
|
||||
val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ())
|
||||
|
||||
val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast(_))
|
||||
|
||||
Source(List(0, 1, 2)).runWith(sink)
|
||||
//#sink-combine
|
||||
expectMsg(0)
|
||||
expectMsg(1)
|
||||
expectMsg(2)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -152,6 +152,19 @@ For defining a ``Flow[T]`` we need to expose both an inlet and an outlet:
|
|||
|
||||
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph
|
||||
|
||||
Combining Sources and Sinks with simplified API
|
||||
-----------------------------------------------
|
||||
|
||||
There is simplified API you can use to combine sources and sinks with junctions like: ``Broadcast[T]``, ``Balance[T]``,
|
||||
``Merge[In]`` and ``Concat[A]`` without the need for using the Graph DSL. The combine method takes care of constructing
|
||||
the necessary graph underneath. In following example we combine two sources into one (fan-in):
|
||||
|
||||
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-combine
|
||||
|
||||
The same can be done for a ``Sink[T]`` but in this case it will be fan-out:
|
||||
|
||||
.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#sink-combine
|
||||
|
||||
Building reusable Graph components
|
||||
----------------------------------
|
||||
|
||||
|
|
|
|||
|
|
@ -8,6 +8,14 @@ import java.util.Arrays;
|
|||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import akka.actor.ActorRef;
|
||||
import akka.japi.function.Function;
|
||||
import akka.japi.function.Procedure;
|
||||
import akka.stream.Graph;
|
||||
import akka.stream.UniformFanInShape;
|
||||
import akka.stream.UniformFanOutShape;
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import org.reactivestreams.Publisher;
|
||||
|
|
@ -18,6 +26,7 @@ import akka.stream.StreamTest;
|
|||
import akka.japi.function.Function2;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.testkit.JavaTestKit;
|
||||
import scala.runtime.BoxedUnit;
|
||||
|
||||
public class SinkTest extends StreamTest {
|
||||
public SinkTest() {
|
||||
|
|
@ -65,4 +74,31 @@ public class SinkTest extends StreamTest {
|
|||
probe.expectMsgEquals("done");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToCombine() throws Exception {
|
||||
final JavaTestKit probe1 = new JavaTestKit(system);
|
||||
final JavaTestKit probe2 = new JavaTestKit(system);
|
||||
|
||||
final Sink<Integer, ?> sink1 = Sink.actorRef(probe1.getRef(), "done1");
|
||||
final Sink<Integer, ?> sink2 = Sink.actorRef(probe2.getRef(), "done2");
|
||||
|
||||
final Sink<Integer, ?> sink = Sink.combine(sink1, sink2, new ArrayList(),
|
||||
new Function<Integer, Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit>>() {
|
||||
public Graph<UniformFanOutShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
||||
return Broadcast.create(elem);
|
||||
}
|
||||
}
|
||||
);
|
||||
|
||||
Source.from(Arrays.asList(0, 1)).runWith(sink, materializer);
|
||||
|
||||
probe1.expectMsgEquals(0);
|
||||
probe2.expectMsgEquals(0);
|
||||
probe1.expectMsgEquals(1);
|
||||
probe2.expectMsgEquals(1);
|
||||
|
||||
probe1.expectMsgEquals("done1");
|
||||
probe2.expectMsgEquals("done2");
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,14 +10,15 @@ import akka.dispatch.Futures;
|
|||
import akka.dispatch.OnSuccess;
|
||||
import akka.japi.JavaPartialFunction;
|
||||
import akka.japi.Pair;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.Graph;
|
||||
import akka.stream.OverflowStrategy;
|
||||
import akka.stream.StreamTest;
|
||||
import akka.stream.UniformFanInShape;
|
||||
import akka.stream.stage.*;
|
||||
import akka.japi.function.*;
|
||||
import akka.stream.testkit.AkkaSpec;
|
||||
import akka.stream.testkit.TestPublisher;
|
||||
import akka.testkit.JavaTestKit;
|
||||
|
||||
import org.junit.ClassRule;
|
||||
import org.junit.Test;
|
||||
import scala.concurrent.Await;
|
||||
|
|
@ -26,11 +27,13 @@ import scala.concurrent.duration.Duration;
|
|||
import scala.concurrent.duration.FiniteDuration;
|
||||
import scala.runtime.BoxedUnit;
|
||||
import scala.util.Try;
|
||||
|
||||
import java.util.*;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription;
|
||||
import static akka.stream.testkit.TestPublisher.ManualProbe;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
@SuppressWarnings("serial")
|
||||
public class SourceTest extends StreamTest {
|
||||
|
|
@ -548,7 +551,6 @@ public class SourceTest extends StreamTest {
|
|||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}), materializer);
|
||||
|
||||
final PublisherProbeSubscription<Integer> s = publisherProbe.expectSubscription();
|
||||
s.sendNext(0);
|
||||
probe.expectMsgEquals(0);
|
||||
|
|
@ -558,4 +560,28 @@ public class SourceTest extends StreamTest {
|
|||
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToCombine() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final Source<Integer, ?> source1 = Source.from(Arrays.asList(0, 1));
|
||||
final Source<Integer, ?> source2 = Source.from(Arrays.asList(2, 3));
|
||||
|
||||
final Source<Integer, ?> source = Source.combine(source1, source2, new ArrayList(),
|
||||
new Function<Integer, Graph<UniformFanInShape<Integer, Integer>, BoxedUnit>>() {
|
||||
public Graph<UniformFanInShape<Integer, Integer>, BoxedUnit> apply(Integer elem) {
|
||||
return Merge.create(elem);
|
||||
}
|
||||
});
|
||||
|
||||
final Future<BoxedUnit> future = source.runWith(Sink.foreach(new Procedure<Integer>() { // Scala Future
|
||||
public void apply(Integer elem) {
|
||||
probe.getRef().tell(elem, ActorRef.noSender());
|
||||
}
|
||||
}), materializer);
|
||||
|
||||
probe.expectMsgAllOf(0, 1, 2, 3);
|
||||
|
||||
Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS));
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,6 +29,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers {
|
|||
val `scala -> java types` =
|
||||
(classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) ::
|
||||
(classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) ::
|
||||
(classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) ::
|
||||
(classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) ::
|
||||
|
|
|
|||
|
|
@ -3,10 +3,12 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.testkit._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.testkit.TestPublisher.ManualProbe
|
||||
import akka.stream.testkit._
|
||||
|
||||
class SinkSpec extends AkkaSpec {
|
||||
|
||||
import FlowGraph.Implicits._
|
||||
|
||||
implicit val mat = ActorMaterializer()
|
||||
|
|
@ -89,6 +91,42 @@ class SinkSpec extends AkkaSpec {
|
|||
}
|
||||
}
|
||||
|
||||
"combine to many outputs with simplified API" in {
|
||||
val probes = Seq.fill(3)(TestSubscriber.manualProbe[Int]())
|
||||
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast(_))
|
||||
|
||||
Source(List(0, 1, 2)).runWith(sink)
|
||||
|
||||
val subscriptions = probes.map(_.expectSubscription())
|
||||
|
||||
subscriptions.foreach { s ⇒ s.request(1) }
|
||||
probes.foreach { p ⇒ p.expectNext(0) }
|
||||
|
||||
subscriptions.foreach { s ⇒ s.request(2) }
|
||||
probes.foreach { p ⇒
|
||||
p.expectNextN(List(1, 2))
|
||||
p.expectComplete
|
||||
}
|
||||
}
|
||||
|
||||
"combine to two sinks with simplified API" in {
|
||||
val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]())
|
||||
val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast(_))
|
||||
|
||||
Source(List(0, 1, 2)).runWith(sink)
|
||||
|
||||
val subscriptions = probes.map(_.expectSubscription())
|
||||
|
||||
subscriptions.foreach { s ⇒ s.request(1) }
|
||||
probes.foreach { p ⇒ p.expectNext(0) }
|
||||
|
||||
subscriptions.foreach { s ⇒ s.request(2) }
|
||||
probes.foreach { p ⇒
|
||||
p.expectNextN(List(1, 2))
|
||||
p.expectComplete
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
|
@ -153,6 +153,51 @@ class SourceSpec extends AkkaSpec {
|
|||
gotten.toSet should ===(Set(0, 1, 2, 3, 4))
|
||||
out.expectComplete()
|
||||
}
|
||||
|
||||
"combine from many inputs with simplified API" in {
|
||||
val probes = Seq.fill(3)(TestPublisher.manualProbe[Int]())
|
||||
val source = for (i ← 0 to 2) yield Source(probes(i))
|
||||
val out = TestSubscriber.manualProbe[Int]
|
||||
|
||||
Source.combine(source(0), source(1), source(2))(Merge(_)).to(Sink(out)).run()
|
||||
|
||||
val sub = out.expectSubscription()
|
||||
sub.request(3)
|
||||
|
||||
for (i ← 0 to 2) {
|
||||
val s = probes(i).expectSubscription()
|
||||
s.expectRequest()
|
||||
s.sendNext(i)
|
||||
s.sendComplete()
|
||||
}
|
||||
|
||||
val gotten = for (_ ← 0 to 2) yield out.expectNext()
|
||||
gotten.toSet should ===(Set(0, 1, 2))
|
||||
out.expectComplete()
|
||||
}
|
||||
|
||||
"combine from two inputs with simplified API" in {
|
||||
val probes = Seq.fill(2)(TestPublisher.manualProbe[Int]())
|
||||
val source = Source(probes(0)) :: Source(probes(1)) :: Nil
|
||||
val out = TestSubscriber.manualProbe[Int]
|
||||
|
||||
Source.combine(source(0), source(1))(Merge(_)).to(Sink(out)).run()
|
||||
|
||||
val sub = out.expectSubscription()
|
||||
sub.request(3)
|
||||
|
||||
for (i ← 0 to 1) {
|
||||
val s = probes(i).expectSubscription()
|
||||
s.expectRequest()
|
||||
s.sendNext(i)
|
||||
s.sendComplete()
|
||||
}
|
||||
|
||||
val gotten = for (_ ← 0 to 1) yield out.expectNext()
|
||||
gotten.toSet should ===(Set(0, 1))
|
||||
out.expectComplete()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
"Repeat Source" must {
|
||||
|
|
|
|||
|
|
@ -137,6 +137,16 @@ object Sink {
|
|||
case s: Sink[T, M] ⇒ s
|
||||
case other ⇒ new Sink(scaladsl.Sink.wrap(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`.
|
||||
*/
|
||||
def combine[T, U](output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], Unit]]): Sink[T, Unit] = {
|
||||
import scala.collection.JavaConverters._
|
||||
val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq()
|
||||
new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num ⇒ strategy.apply(num)))
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -226,6 +226,15 @@ object Source {
|
|||
case s: Source[T, M] ⇒ s
|
||||
case other ⇒ new Source(scaladsl.Source.wrap(other))
|
||||
}
|
||||
|
||||
/**
|
||||
* Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||
*/
|
||||
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: java.util.List[Source[T, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], Unit]]): Source[U, Unit] = {
|
||||
import scala.collection.JavaConverters._
|
||||
val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq()
|
||||
new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num)))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ import akka.stream.Attributes._
|
|||
import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage, SyncDirective }
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.concurrent.{ ExecutionContext, Future, Promise }
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
|
||||
|
|
@ -106,6 +107,26 @@ object Sink extends SinkApply {
|
|||
def foreach[T](f: T ⇒ Unit): Sink[T, Future[Unit]] =
|
||||
Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink")
|
||||
|
||||
/**
|
||||
* Combine several sinks with fun-out strategy like `Broadcast` or `Balance` and returns `Sink`.
|
||||
*/
|
||||
def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int ⇒ Graph[UniformFanOutShape[T, U], Unit]): Sink[T, Unit] =
|
||||
|
||||
Sink.wrap(FlowGraph.partial() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val d = b.add(strategy(rest.size + 2))
|
||||
d.out(0) ~> first
|
||||
d.out(1) ~> second
|
||||
|
||||
@tailrec def combineRest(idx: Int, i: Iterator[Sink[U, _]]): SinkShape[T] =
|
||||
if (i.hasNext) {
|
||||
d.out(idx) ~> i.next()
|
||||
combineRest(idx + 1, i)
|
||||
} else new SinkShape(d.in)
|
||||
|
||||
combineRest(2, rest.iterator)
|
||||
})
|
||||
|
||||
/**
|
||||
* A `Sink` that will invoke the given function to each of the elements
|
||||
* as they pass in. The sink is materialized into a [[scala.concurrent.Future]]
|
||||
|
|
|
|||
|
|
@ -3,19 +3,18 @@
|
|||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import scala.language.higherKinds
|
||||
|
||||
import akka.actor.{ ActorRef, Cancellable, Props }
|
||||
import akka.stream._
|
||||
|
||||
import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule }
|
||||
import akka.stream.impl.Stages.DefaultAttributes
|
||||
import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule }
|
||||
import akka.stream.impl.StreamLayout.Module
|
||||
import akka.stream.impl._
|
||||
import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ }
|
||||
import akka.stream.{ Outlet, SourceShape, _ }
|
||||
import org.reactivestreams.{ Publisher, Subscriber }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.{ Future, Promise }
|
||||
import scala.language.higherKinds
|
||||
|
||||
/**
|
||||
* A `Source` is a set of stream processing steps that has one open output. It can comprise
|
||||
|
|
@ -148,6 +147,25 @@ final class Source[+Out, +Mat](private[stream] override val module: Module)
|
|||
|
||||
/** Converts this Scala DSL element to it's Java DSL counterpart. */
|
||||
def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this)
|
||||
|
||||
/**
|
||||
* Combines several sources with fun-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||
*/
|
||||
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], Unit]): Source[U, Unit] =
|
||||
Source.wrap(FlowGraph.partial() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val c = b.add(strategy(rest.size + 2))
|
||||
first ~> c.in(0)
|
||||
second ~> c.in(1)
|
||||
|
||||
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
|
||||
if (i.hasNext) {
|
||||
i.next() ~> c.in(idx)
|
||||
combineRest(idx + 1, i)
|
||||
} else SourceShape(c.out)
|
||||
|
||||
combineRest(2, rest.iterator)
|
||||
})
|
||||
}
|
||||
|
||||
object Source extends SourceApply {
|
||||
|
|
@ -364,4 +382,23 @@ object Source extends SourceApply {
|
|||
new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource")))
|
||||
}
|
||||
|
||||
/**
|
||||
* Combines several sources with fun-in strategy like `Merge` or `Concat` and returns `Source`.
|
||||
*/
|
||||
def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], Unit]): Source[U, Unit] =
|
||||
Source.wrap(FlowGraph.partial() { implicit b ⇒
|
||||
import FlowGraph.Implicits._
|
||||
val c = b.add(strategy(rest.size + 2))
|
||||
first ~> c.in(0)
|
||||
second ~> c.in(1)
|
||||
|
||||
@tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] =
|
||||
if (i.hasNext) {
|
||||
i.next() ~> c.in(idx)
|
||||
combineRest(idx + 1, i)
|
||||
} else SourceShape(c.out)
|
||||
|
||||
combineRest(2, rest.iterator)
|
||||
})
|
||||
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue