From 76200143586c5b524b0a5576127babedf8e90f04 Mon Sep 17 00:00:00 2001 From: Alexander Golubev Date: Mon, 29 Jun 2015 23:47:31 -0400 Subject: [PATCH] +str #17399 add boilerplate remover for fan-in junctions --- akka-docs-dev/rst/java/stream-graphs.rst | 13 +++++ .../StreamPartialFlowGraphDocSpec.scala | 33 ++++++++++-- akka-docs-dev/rst/scala/stream-graphs.rst | 13 +++++ .../java/akka/stream/javadsl/SinkTest.java | 36 +++++++++++++ .../java/akka/stream/javadsl/SourceTest.java | 34 +++++++++++-- .../stream/DslFactoriesConsistencySpec.scala | 1 + .../scala/akka/stream/scaladsl/SinkSpec.scala | 40 ++++++++++++++- .../akka/stream/scaladsl/SourceSpec.scala | 45 ++++++++++++++++ .../main/scala/akka/stream/javadsl/Sink.scala | 10 ++++ .../scala/akka/stream/javadsl/Source.scala | 9 ++++ .../scala/akka/stream/scaladsl/Sink.scala | 21 ++++++++ .../scala/akka/stream/scaladsl/Source.scala | 51 ++++++++++++++++--- 12 files changed, 290 insertions(+), 16 deletions(-) diff --git a/akka-docs-dev/rst/java/stream-graphs.rst b/akka-docs-dev/rst/java/stream-graphs.rst index d15fbee789..9b238262f8 100644 --- a/akka-docs-dev/rst/java/stream-graphs.rst +++ b/akka-docs-dev/rst/java/stream-graphs.rst @@ -143,6 +143,19 @@ For defining a ``Flow`` we need to expose both an undefined source and sink: .. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#flow-from-partial-flow-graph +Combining Sources and Sinks with simplified API +----------------------------------------------- + +There is simplified API you can use to combine sources and sinks with junctions like: ``Broadcast``, ``Balance``, +``Merge`` and ``Concat`` without the need for using the Graph DSL. The combine method takes care of constructing +the necessary graph underneath. In following example we combine two sources into one (fan-in): + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#source-combine + +The same can be done for a ``Sink`` but in this case it will be fan-out: + +.. includecode:: ../../../akka-samples/akka-docs-java-lambda/src/test/java/docs/stream/StreamPartialFlowGraphDocTest.java#sink-combine + .. _bidi-flow-java: Bidirectional Flows diff --git a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala index d198879246..3826ba0a00 100644 --- a/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala +++ b/akka-docs-dev/rst/scala/code/docs/stream/StreamPartialFlowGraphDocSpec.scala @@ -3,13 +3,12 @@ */ package docs.stream -import akka.stream.scaladsl._ +import akka.actor.ActorRef import akka.stream._ +import akka.stream.scaladsl._ import akka.stream.testkit.AkkaSpec -import scala.collection.immutable -import scala.concurrent.Await -import scala.concurrent.Future +import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ class StreamPartialFlowGraphDocSpec extends AkkaSpec { @@ -100,4 +99,30 @@ class StreamPartialFlowGraphDocSpec extends AkkaSpec { Await.result(matSink, 300.millis) should equal(1 -> "1") } + + "combine sources with simplified API" in { + //#source-combine + val sourceOne = Source(List(1)) + val sourceTwo = Source(List(2)) + val merged = Source.combine(sourceOne, sourceTwo)(Merge(_)) + + val mergedResult: Future[Int] = merged.runWith(Sink.fold(0)(_ + _)) + //#source-combine + Await.result(mergedResult, 300.millis) should equal(3) + } + + "combine sinks with simplified API" in { + val actorRef: ActorRef = testActor + //#sink-combine + val sendRmotely = Sink.actorRef(actorRef, "Done") + val localProcessing = Sink.foreach[Int](_ => /* do something usefull */ ()) + + val sink = Sink.combine(sendRmotely, localProcessing)(Broadcast(_)) + + Source(List(0, 1, 2)).runWith(sink) + //#sink-combine + expectMsg(0) + expectMsg(1) + expectMsg(2) + } } diff --git a/akka-docs-dev/rst/scala/stream-graphs.rst b/akka-docs-dev/rst/scala/stream-graphs.rst index a763f20364..08489d1618 100644 --- a/akka-docs-dev/rst/scala/stream-graphs.rst +++ b/akka-docs-dev/rst/scala/stream-graphs.rst @@ -152,6 +152,19 @@ For defining a ``Flow[T]`` we need to expose both an inlet and an outlet: .. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#flow-from-partial-flow-graph +Combining Sources and Sinks with simplified API +----------------------------------------------- + +There is simplified API you can use to combine sources and sinks with junctions like: ``Broadcast[T]``, ``Balance[T]``, +``Merge[In]`` and ``Concat[A]`` without the need for using the Graph DSL. The combine method takes care of constructing +the necessary graph underneath. In following example we combine two sources into one (fan-in): + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#source-combine + +The same can be done for a ``Sink[T]`` but in this case it will be fan-out: + +.. includecode:: code/docs/stream/StreamPartialFlowGraphDocSpec.scala#sink-combine + Building reusable Graph components ---------------------------------- diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java index 104db76021..3272808283 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SinkTest.java @@ -8,6 +8,14 @@ import java.util.Arrays; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.concurrent.TimeUnit; + +import akka.actor.ActorRef; +import akka.japi.function.Function; +import akka.japi.function.Procedure; +import akka.stream.Graph; +import akka.stream.UniformFanInShape; +import akka.stream.UniformFanOutShape; import org.junit.ClassRule; import org.junit.Test; import org.reactivestreams.Publisher; @@ -18,6 +26,7 @@ import akka.stream.StreamTest; import akka.japi.function.Function2; import akka.stream.testkit.AkkaSpec; import akka.testkit.JavaTestKit; +import scala.runtime.BoxedUnit; public class SinkTest extends StreamTest { public SinkTest() { @@ -65,4 +74,31 @@ public class SinkTest extends StreamTest { probe.expectMsgEquals("done"); } + @Test + public void mustBeAbleToCombine() throws Exception { + final JavaTestKit probe1 = new JavaTestKit(system); + final JavaTestKit probe2 = new JavaTestKit(system); + + final Sink sink1 = Sink.actorRef(probe1.getRef(), "done1"); + final Sink sink2 = Sink.actorRef(probe2.getRef(), "done2"); + + final Sink sink = Sink.combine(sink1, sink2, new ArrayList(), + new Function, BoxedUnit>>() { + public Graph, BoxedUnit> apply(Integer elem) { + return Broadcast.create(elem); + } + } + ); + + Source.from(Arrays.asList(0, 1)).runWith(sink, materializer); + + probe1.expectMsgEquals(0); + probe2.expectMsgEquals(0); + probe1.expectMsgEquals(1); + probe2.expectMsgEquals(1); + + probe1.expectMsgEquals("done1"); + probe2.expectMsgEquals("done2"); + } + } diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 8146260114..426c154a38 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -10,14 +10,15 @@ import akka.dispatch.Futures; import akka.dispatch.OnSuccess; import akka.japi.JavaPartialFunction; import akka.japi.Pair; +import akka.japi.function.*; +import akka.stream.Graph; import akka.stream.OverflowStrategy; import akka.stream.StreamTest; +import akka.stream.UniformFanInShape; import akka.stream.stage.*; -import akka.japi.function.*; import akka.stream.testkit.AkkaSpec; import akka.stream.testkit.TestPublisher; import akka.testkit.JavaTestKit; - import org.junit.ClassRule; import org.junit.Test; import scala.concurrent.Await; @@ -26,11 +27,13 @@ import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; import scala.runtime.BoxedUnit; import scala.util.Try; + import java.util.*; import java.util.concurrent.TimeUnit; -import static org.junit.Assert.assertEquals; + import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.TestPublisher.ManualProbe; +import static org.junit.Assert.assertEquals; @SuppressWarnings("serial") public class SourceTest extends StreamTest { @@ -548,7 +551,6 @@ public class SourceTest extends StreamTest { probe.getRef().tell(elem, ActorRef.noSender()); } }), materializer); - final PublisherProbeSubscription s = publisherProbe.expectSubscription(); s.sendNext(0); probe.expectMsgEquals(0); @@ -558,4 +560,28 @@ public class SourceTest extends StreamTest { Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); } + @Test + public void mustBeAbleToCombine() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Source source1 = Source.from(Arrays.asList(0, 1)); + final Source source2 = Source.from(Arrays.asList(2, 3)); + + final Source source = Source.combine(source1, source2, new ArrayList(), + new Function, BoxedUnit>>() { + public Graph, BoxedUnit> apply(Integer elem) { + return Merge.create(elem); + } + }); + + final Future future = source.runWith(Sink.foreach(new Procedure() { // Scala Future + public void apply(Integer elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }), materializer); + + probe.expectMsgAllOf(0, 1, 2, 3); + + Await.ready(future, Duration.apply(200, TimeUnit.MILLISECONDS)); + } + } diff --git a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala index 4b02e69bf8..492093a703 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/DslFactoriesConsistencySpec.scala @@ -29,6 +29,7 @@ class DslFactoriesConsistencySpec extends WordSpec with Matchers { val `scala -> java types` = (classOf[scala.collection.immutable.Iterable[_]], classOf[java.lang.Iterable[_]]) :: (classOf[scala.collection.Iterator[_]], classOf[java.util.Iterator[_]]) :: + (classOf[scala.collection.Seq[_]], classOf[java.util.List[_]]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: (classOf[scala.Function0[_]], classOf[java.util.concurrent.Callable[_]]) :: (classOf[scala.Function0[_]], classOf[akka.japi.function.Creator[_]]) :: diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala index 1abd6e364a..a9d25df84f 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SinkSpec.scala @@ -3,10 +3,12 @@ */ package akka.stream.scaladsl -import akka.stream.testkit._ import akka.stream.ActorMaterializer +import akka.stream.testkit.TestPublisher.ManualProbe +import akka.stream.testkit._ class SinkSpec extends AkkaSpec { + import FlowGraph.Implicits._ implicit val mat = ActorMaterializer() @@ -89,6 +91,42 @@ class SinkSpec extends AkkaSpec { } } + "combine to many outputs with simplified API" in { + val probes = Seq.fill(3)(TestSubscriber.manualProbe[Int]()) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)), Sink(probes(2)))(Broadcast(_)) + + Source(List(0, 1, 2)).runWith(sink) + + val subscriptions = probes.map(_.expectSubscription()) + + subscriptions.foreach { s ⇒ s.request(1) } + probes.foreach { p ⇒ p.expectNext(0) } + + subscriptions.foreach { s ⇒ s.request(2) } + probes.foreach { p ⇒ + p.expectNextN(List(1, 2)) + p.expectComplete + } + } + + "combine to two sinks with simplified API" in { + val probes = Seq.fill(2)(TestSubscriber.manualProbe[Int]()) + val sink = Sink.combine(Sink(probes(0)), Sink(probes(1)))(Broadcast(_)) + + Source(List(0, 1, 2)).runWith(sink) + + val subscriptions = probes.map(_.expectSubscription()) + + subscriptions.foreach { s ⇒ s.request(1) } + probes.foreach { p ⇒ p.expectNext(0) } + + subscriptions.foreach { s ⇒ s.request(2) } + probes.foreach { p ⇒ + p.expectNextN(List(1, 2)) + p.expectComplete + } + } + } } \ No newline at end of file diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala index 277fb7df6d..fb5dc1ce9d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceSpec.scala @@ -153,6 +153,51 @@ class SourceSpec extends AkkaSpec { gotten.toSet should ===(Set(0, 1, 2, 3, 4)) out.expectComplete() } + + "combine from many inputs with simplified API" in { + val probes = Seq.fill(3)(TestPublisher.manualProbe[Int]()) + val source = for (i ← 0 to 2) yield Source(probes(i)) + val out = TestSubscriber.manualProbe[Int] + + Source.combine(source(0), source(1), source(2))(Merge(_)).to(Sink(out)).run() + + val sub = out.expectSubscription() + sub.request(3) + + for (i ← 0 to 2) { + val s = probes(i).expectSubscription() + s.expectRequest() + s.sendNext(i) + s.sendComplete() + } + + val gotten = for (_ ← 0 to 2) yield out.expectNext() + gotten.toSet should ===(Set(0, 1, 2)) + out.expectComplete() + } + + "combine from two inputs with simplified API" in { + val probes = Seq.fill(2)(TestPublisher.manualProbe[Int]()) + val source = Source(probes(0)) :: Source(probes(1)) :: Nil + val out = TestSubscriber.manualProbe[Int] + + Source.combine(source(0), source(1))(Merge(_)).to(Sink(out)).run() + + val sub = out.expectSubscription() + sub.request(3) + + for (i ← 0 to 1) { + val s = probes(i).expectSubscription() + s.expectRequest() + s.sendNext(i) + s.sendComplete() + } + + val gotten = for (_ ← 0 to 1) yield out.expectNext() + gotten.toSet should ===(Set(0, 1)) + out.expectComplete() + } + } "Repeat Source" must { diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala index e55b6177e2..b56cdb15fd 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Sink.scala @@ -137,6 +137,16 @@ object Sink { case s: Sink[T, M] ⇒ s case other ⇒ new Sink(scaladsl.Sink.wrap(other)) } + + /** + * Combine several sinks with fan-out strategy like `Broadcast` or `Balance` and returns `Sink`. + */ + def combine[T, U](output1: Sink[U, _], output2: Sink[U, _], rest: java.util.List[Sink[U, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanOutShape[T, U], Unit]]): Sink[T, Unit] = { + import scala.collection.JavaConverters._ + val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq() + new Sink(scaladsl.Sink.combine(output1.asScala, output2.asScala, seq: _*)(num ⇒ strategy.apply(num))) + } + } /** diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 40dfc9d177..cf84b4c9d5 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -226,6 +226,15 @@ object Source { case s: Source[T, M] ⇒ s case other ⇒ new Source(scaladsl.Source.wrap(other)) } + + /** + * Combines several sources with fan-in strategy like `Merge` or `Concat` and returns `Source`. + */ + def combine[T, U](first: Source[T, _], second: Source[T, _], rest: java.util.List[Source[T, _]], strategy: function.Function[java.lang.Integer, Graph[UniformFanInShape[T, U], Unit]]): Source[U, Unit] = { + import scala.collection.JavaConverters._ + val seq = if (rest != null) rest.asScala.map(_.asScala) else Seq() + new Source(scaladsl.Source.combine(first.asScala, second.asScala, seq: _*)(num ⇒ strategy.apply(num))) + } } /** diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala index d90c014dc2..76b76a8750 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Sink.scala @@ -13,6 +13,7 @@ import akka.stream.Attributes._ import akka.stream.stage.{ TerminationDirective, Directive, Context, PushStage, SyncDirective } import org.reactivestreams.{ Publisher, Subscriber } +import scala.annotation.tailrec import scala.concurrent.{ ExecutionContext, Future, Promise } import scala.util.{ Failure, Success, Try } @@ -106,6 +107,26 @@ object Sink extends SinkApply { def foreach[T](f: T ⇒ Unit): Sink[T, Future[Unit]] = Flow[T].map(f).toMat(Sink.ignore)(Keep.right).named("foreachSink") + /** + * Combine several sinks with fun-out strategy like `Broadcast` or `Balance` and returns `Sink`. + */ + def combine[T, U](first: Sink[U, _], second: Sink[U, _], rest: Sink[U, _]*)(strategy: Int ⇒ Graph[UniformFanOutShape[T, U], Unit]): Sink[T, Unit] = + + Sink.wrap(FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + val d = b.add(strategy(rest.size + 2)) + d.out(0) ~> first + d.out(1) ~> second + + @tailrec def combineRest(idx: Int, i: Iterator[Sink[U, _]]): SinkShape[T] = + if (i.hasNext) { + d.out(idx) ~> i.next() + combineRest(idx + 1, i) + } else new SinkShape(d.in) + + combineRest(2, rest.iterator) + }) + /** * A `Sink` that will invoke the given function to each of the elements * as they pass in. The sink is materialized into a [[scala.concurrent.Future]] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala index 7b5d2a4db5..95b9d759e2 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala @@ -3,19 +3,18 @@ */ package akka.stream.scaladsl -import scala.language.higherKinds - import akka.actor.{ ActorRef, Cancellable, Props } -import akka.stream._ - -import akka.stream.impl.Stages.{ MaterializingStageFactory, StageModule } -import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.Stages.{ DefaultAttributes, MaterializingStageFactory, StageModule } import akka.stream.impl.StreamLayout.Module -import akka.stream.impl._ +import akka.stream.impl.{ EmptyPublisher, ErrorPublisher, _ } +import akka.stream.{ Outlet, SourceShape, _ } import org.reactivestreams.{ Publisher, Subscriber } + +import scala.annotation.tailrec import scala.collection.immutable import scala.concurrent.duration.FiniteDuration import scala.concurrent.{ Future, Promise } +import scala.language.higherKinds /** * A `Source` is a set of stream processing steps that has one open output. It can comprise @@ -148,6 +147,25 @@ final class Source[+Out, +Mat](private[stream] override val module: Module) /** Converts this Scala DSL element to it's Java DSL counterpart. */ def asJava: javadsl.Source[Out, Mat] = new javadsl.Source(this) + + /** + * Combines several sources with fun-in strategy like `Merge` or `Concat` and returns `Source`. + */ + def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], Unit]): Source[U, Unit] = + Source.wrap(FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + val c = b.add(strategy(rest.size + 2)) + first ~> c.in(0) + second ~> c.in(1) + + @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] = + if (i.hasNext) { + i.next() ~> c.in(idx) + combineRest(idx + 1, i) + } else SourceShape(c.out) + + combineRest(2, rest.iterator) + }) } object Source extends SourceApply { @@ -364,4 +382,23 @@ object Source extends SourceApply { new Source(new ActorRefSource(bufferSize, overflowStrategy, DefaultAttributes.actorRefSource, shape("ActorRefSource"))) } + /** + * Combines several sources with fun-in strategy like `Merge` or `Concat` and returns `Source`. + */ + def combine[T, U](first: Source[T, _], second: Source[T, _], rest: Source[T, _]*)(strategy: Int ⇒ Graph[UniformFanInShape[T, U], Unit]): Source[U, Unit] = + Source.wrap(FlowGraph.partial() { implicit b ⇒ + import FlowGraph.Implicits._ + val c = b.add(strategy(rest.size + 2)) + first ~> c.in(0) + second ~> c.in(1) + + @tailrec def combineRest(idx: Int, i: Iterator[Source[T, _]]): SourceShape[U] = + if (i.hasNext) { + i.next() ~> c.in(idx) + combineRest(idx + 1, i) + } else SourceShape(c.out) + + combineRest(2, rest.iterator) + }) + }