From 073e7058dcb189cbbeb1995867efb1e1ff357b67 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Tue, 1 Dec 2015 18:03:30 +0100 Subject: [PATCH] !str #15089 add flatMapMerge --- .../rst/java/code/docs/MigrationsJava.java | 4 +- akka-docs-dev/rst/stages-overview.rst | 3 +- .../java/akka/stream/javadsl/FlowTest.java | 38 +++- .../java/akka/stream/javadsl/SourceTest.java | 35 +++- .../stream/scaladsl/FlowConcatAllSpec.scala | 14 +- .../scaladsl/FlowFlattenMergeSpec.scala | 173 ++++++++++++++++++ .../stream/impl/ActorMaterializerImpl.scala | 1 - .../akka/stream/impl/ConcatAllImpl.scala | 43 ----- .../main/scala/akka/stream/impl/Stages.scala | 4 - .../stream/impl/fusing/StreamOfStreams.scala | 173 ++++++++++++++++++ .../main/scala/akka/stream/javadsl/Flow.scala | 21 ++- .../scala/akka/stream/javadsl/Source.scala | 21 ++- .../scala/akka/stream/scaladsl/Flow.scala | 29 ++- 13 files changed, 485 insertions(+), 74 deletions(-) create mode 100644 akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala delete mode 100644 akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala create mode 100644 akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala diff --git a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java index 8d35218ddc..ca3ffe0559 100644 --- a/akka-docs-dev/rst/java/code/docs/MigrationsJava.java +++ b/akka-docs-dev/rst/java/code/docs/MigrationsJava.java @@ -147,8 +147,8 @@ public class MigrationsJava { //#flatMapConcat Flow.>create(). - flatMapConcat(new Function, Source>(){ - @Override public Source apply(Source param) throws Exception { + flatMapConcat(new Function, Source>(){ + @Override public Source apply(Source param) throws Exception { return param; } }); diff --git a/akka-docs-dev/rst/stages-overview.rst b/akka-docs-dev/rst/stages-overview.rst index 69b53476b3..541226a477 100644 --- a/akka-docs-dev/rst/stages-overview.rst +++ b/akka-docs-dev/rst/stages-overview.rst @@ -105,7 +105,8 @@ prefixAndTail the configured number of prefix elements are available. E groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_ splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_ splitAfter an element passes through. 
When the provided predicate is true it emits the element * and opens a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_ -flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete +flatMapConcat the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete +flatMapMerge one of the currently consumed substreams has an element available downstream backpressures upstream completes and all consumed substreams complete ===================== ========================================================================================================================================= ============================================================================================================================== ===================================================================================== Fan-in stages diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index f3a18821f4..e4a767dccf 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -482,12 +482,12 @@ public class FlowTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); - final Flow, List, ?> flow = Flow.>create(). - flatMapConcat(ConstantFun.>javaIdentityFunction()).grouped(6); + final Flow, List, ?> flow = Flow.>create(). + flatMapConcat(ConstantFun.>javaIdentityFunction()).grouped(6); Future> future = Source.from(mainInputs).via(flow) .runWith(Sink.>head(), materializer); @@ -495,6 +495,38 @@ public class FlowTest extends StreamTest { assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); } + + @Test + public void mustBeAbleToUseFlatMapMerge() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + final Iterable input2 = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19); + final Iterable input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + final Iterable input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39); + + final List> mainInputs = new ArrayList>(); + mainInputs.add(Source.from(input1)); + mainInputs.add(Source.from(input2)); + mainInputs.add(Source.from(input3)); + mainInputs.add(Source.from(input4)); + + final Flow, List, ?> flow = Flow.>create().
+ flatMapMerge(3, ConstantFun.>javaIdentityFunction()).grouped(60); + Future> future = Source.from(mainInputs).via(flow) + .runWith(Sink.>head(), materializer); + + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + final Set set = new HashSet(); + for (Integer i: result) { + set.add(i); + } + final Set expected = new HashSet(); + for (int i = 0; i < 40; ++i) { + expected.add(i); + } + + assertEquals(expected, set); + } @Test public void mustBeAbleToUseBuffer() throws Exception { diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index ad200ba1f1..430b60a991 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -349,12 +349,12 @@ public class SourceTest extends StreamTest { final Iterable input1 = Arrays.asList(1, 2, 3); final Iterable input2 = Arrays.asList(4, 5); - final List> mainInputs = new ArrayList>(); + final List> mainInputs = new ArrayList>(); mainInputs.add(Source.from(input1)); mainInputs.add(Source.from(input2)); Future> future = Source.from(mainInputs) - .flatMapConcat(ConstantFun.>javaIdentityFunction()) + .flatMapConcat(ConstantFun.>javaIdentityFunction()) .grouped(6) .runWith(Sink.>head(), materializer); @@ -363,6 +363,37 @@ public class SourceTest extends StreamTest { assertEquals(Arrays.asList(1, 2, 3, 4, 5), result); } + @Test + public void mustBeAbleToUseFlatMapMerge() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final Iterable input1 = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9); + final Iterable input2 = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19); + final Iterable input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29); + final Iterable input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39); + + final List> mainInputs = new ArrayList>(); + mainInputs.add(Source.from(input1)); + mainInputs.add(Source.from(input2)); + mainInputs.add(Source.from(input3)); + mainInputs.add(Source.from(input4)); + + Future> future = Source.from(mainInputs) + .flatMapMerge(3, ConstantFun.>javaIdentityFunction()).grouped(60) + .runWith(Sink.>head(), materializer); + + List result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS))); + final Set set = new HashSet(); + for (Integer i: result) { + set.add(i); + } + final Set expected = new HashSet(); + for (int i = 0; i < 40; ++i) { + expected.add(i); + } + + assertEquals(expected, set); + } + @Test public void mustBeAbleToUseBuffer() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala index 4fcdd95652..ad5f32286a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowConcatAllSpec.scala @@ -54,7 +54,7 @@ class FlowConcatAllSpec extends AkkaSpec { } "on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val publisher = TestPublisher.manualProbe[Source[Int, Unit]]() val subscriber = TestSubscriber.manualProbe[Int]() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() @@ -74,7 
+74,7 @@ class FlowConcatAllSpec extends AkkaSpec { } "on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val publisher = TestPublisher.manualProbe[Source[Int, Unit]]() val subscriber = TestSubscriber.manualProbe[Int]() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() @@ -114,7 +114,7 @@ class FlowConcatAllSpec extends AkkaSpec { } "on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val publisher = TestPublisher.manualProbe[Source[Int, Unit]]() val subscriber = TestSubscriber.manualProbe[Int]() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() @@ -134,7 +134,7 @@ class FlowConcatAllSpec extends AkkaSpec { } "on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val publisher = TestPublisher.manualProbe[Source[Int, Unit]]() val subscriber = TestSubscriber.manualProbe[Int]() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() @@ -155,7 +155,7 @@ class FlowConcatAllSpec extends AkkaSpec { } "on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped { - val publisher = TestPublisher.manualProbe[Source[Int, _]]() + val publisher = TestPublisher.manualProbe[Source[Int, Unit]]() val subscriber = TestSubscriber.manualProbe[Int]() Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run() @@ -178,11 +178,11 @@ class FlowConcatAllSpec extends AkkaSpec { } "pass along early cancellation" in assertAllStagesStopped { - val up = TestPublisher.manualProbe[Source[Int, _]]() + val up = TestPublisher.manualProbe[Source[Int, Unit]]() val down = TestSubscriber.manualProbe[Int]() val flowSubscriber = Source - .subscriber[Source[Int, _]] + .subscriber[Source[Int, Unit]] .flatMapConcat(ConstantFun.scalaIdentityFunction) .to(Sink(down)) .run() diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala new file mode 100644 index 0000000000..af594d82d2 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowFlattenMergeSpec.scala @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2015 Typesafe Inc. 
+ */ +package akka.stream.scaladsl + +import akka.stream.testkit.AkkaSpec +import akka.stream.ActorMaterializer +import scala.concurrent._ +import scala.concurrent.duration._ +import org.scalatest.concurrent.ScalaFutures +import org.scalactic.ConversionCheckedTripleEquals +import akka.stream.testkit.TestPublisher +import org.scalatest.exceptions.TestFailedException +import akka.stream.testkit.scaladsl.TestSink +import akka.testkit.TestLatch + +class FlowFlattenMergeSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals { + implicit val mat = ActorMaterializer() + import system.dispatcher + + def src10(i: Int) = Source(i until (i + 10)) + def blocked = Source(Promise[Int].future) + + val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right) + val toSet = toSeq.mapMaterializedValue(_.map(_.toSet)) + + implicit val patience = PatienceConfig(1.second) + + "A FlattenMerge" must { + + "work in the nominal case" in { + Source(List(src10(0), src10(10), src10(20), src10(30))) + .flatMapMerge(4, identity) + .runWith(toSet) + .futureValue should ===((0 until 40).toSet) + } + + "not be held back by one slow stream" in { + Source(List(src10(0), src10(10), blocked, src10(20), src10(30))) + .flatMapMerge(3, identity) + .take(40) + .runWith(toSet) + .futureValue should ===((0 until 40).toSet) + } + + "respect breadth" in { + val seq = Source(List(src10(0), src10(10), src10(20), blocked, blocked, src10(30))) + .flatMapMerge(3, identity) + .take(40) + .runWith(toSeq) + .futureValue + + seq.take(30).toSet should ===((0 until 30).toSet) + seq.drop(30).toSet should ===((30 until 40).toSet) + } + + "propagate early failure from main stream" in { + val ex = new Exception("buh") + intercept[TestFailedException] { + Source.failed(ex) + .flatMapMerge(1, identity) + .runWith(Sink.head) + .futureValue + }.cause.get should ===(ex) + } + + "propagate late failure from main stream" in { + val ex = new Exception("buh") + intercept[TestFailedException] { + (Source(List(blocked, blocked)) ++ Source.failed(ex)) + .flatMapMerge(10, identity) + .runWith(Sink.head) + .futureValue + }.cause.get should ===(ex) + } + + "propagate failure from map function" in { + val ex = new Exception("buh") + intercept[TestFailedException] { + Source(1 to 3) + .flatMapMerge(10, i ⇒ if (i == 3) throw ex else blocked) + .runWith(Sink.head) + .futureValue + }.cause.get should ===(ex) + } + + "bubble up substream exceptions" in { + val ex = new Exception("buh") + intercept[TestFailedException] { + Source(List(blocked, blocked, Source.failed(ex))) + .flatMapMerge(10, identity) + .runWith(Sink.head) + .futureValue + }.cause.get should ===(ex) + } + + "cancel substreams when failing from main stream" in { + val p1, p2 = TestPublisher.probe[Int]() + val ex = new Exception("buh") + val p = Promise[Source[Int, Unit]] + (Source(List(Source(p1), Source(p2))) ++ Source(p.future)) + .flatMapMerge(5, identity) + .runWith(Sink.head) + p1.expectRequest() + p2.expectRequest() + p.failure(ex) + p1.expectCancellation() + p2.expectCancellation() + } + + "cancel substreams when failing from substream" in { + val p1, p2 = TestPublisher.probe[Int]() + val ex = new Exception("buh") + val p = Promise[Int] + Source(List(Source(p1), Source(p2), Source(p.future))) + .flatMapMerge(5, identity) + .runWith(Sink.head) + p1.expectRequest() + p2.expectRequest() + p.failure(ex) + p1.expectCancellation() + p2.expectCancellation() + } + + "cancel substreams when failing map function" in { + val p1, p2 = TestPublisher.probe[Int]() + val ex = new
Exception("buh") + val latch = TestLatch() + Source(1 to 3) + .flatMapMerge(10, { + case 1 ⇒ Source(p1) + case 2 ⇒ Source(p2) + case 3 ⇒ + Await.ready(latch, 3.seconds) + throw ex + }) + .runWith(Sink.head) + p1.expectRequest() + p2.expectRequest() + latch.countDown() + p1.expectCancellation() + p2.expectCancellation() + } + + "cancel substreams when being cancelled" in { + val p1, p2 = TestPublisher.probe[Int]() + val ex = new Exception("buh") + val sink = Source(List(Source(p1), Source(p2))) + .flatMapMerge(5, identity) + .runWith(TestSink.probe) + sink.request(1) + p1.expectRequest() + p2.expectRequest() + sink.cancel() + p1.expectCancellation() + p2.expectCancellation() + } + + "work with many concurrently queued events" in { + val p = Source((0 until 100).map(i ⇒ src10(10 * i))) + .flatMapMerge(Int.MaxValue, identity) + .runWith(TestSink.probe) + p.within(1.second) { + p.ensureSubscription() + p.expectNoMsg() + } + val elems = p.within(1.second)((1 to 1000).map(i ⇒ p.requestNext()).toSet) + p.expectComplete() + elems should ===((0 until 1000).toSet) + } + + } +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala index e64e7f4de6..07339a4124 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/ActorMaterializerImpl.scala @@ -262,7 +262,6 @@ private[akka] object ActorProcessorFactory { case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ()) case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ()) case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ()) - case ConcatAll(f, _) ⇒ (ConcatAllImpl.props(f, materializer), ()) case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory") } } diff --git a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala b/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala deleted file mode 100644 index 176324b744..0000000000 --- a/akka-stream/src/main/scala/akka/stream/impl/ConcatAllImpl.scala +++ /dev/null @@ -1,43 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. 
- */ -package akka.stream.impl - -import akka.stream.ActorMaterializer -import akka.stream.scaladsl.{ Source, Sink } -import akka.actor.{ Deploy, Props } - -/** - * INTERNAL API - */ -private[akka] object ConcatAllImpl { - def props(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer): Props = - Props(new ConcatAllImpl(f, materializer)).withDeploy(Deploy.local) -} - -/** - * INTERNAL API - */ -private[akka] class ConcatAllImpl(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer) - extends MultiStreamInputProcessor(materializer.settings) { - - import akka.stream.impl.MultiStreamInputProcessor._ - - val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒ - val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher(false))(materializer) - // FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now) - val inputs = createAndSubscribeSubstreamInput(publisher) - nextPhase(streamSubstream(inputs)) - } - - def streamSubstream(substream: SubstreamInput): TransferPhase = - TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒ - if (substream.inputsDepleted) nextPhase(takeNextSubstream) - else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement()) - } - - initialPhase(1, takeNextSubstream) - - override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e) - -} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 5e3aa20e1a..e68ddcfe9f 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -233,10 +233,6 @@ private[stream] object Stages { def after(f: Any ⇒ Boolean) = Split(el ⇒ if (f(el)) SplitAfter else Continue, name("splitAfter")) } - final case class ConcatAll(f: Any ⇒ Source[Any, _], attributes: Attributes = concatAll) extends StageModule { - override def withAttributes(attributes: Attributes) = copy(attributes = attributes) - } - final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule { override def withAttributes(attributes: Attributes) = copy(attributes = attributes) } diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala new file mode 100644 index 0000000000..be1df51daf --- /dev/null +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/StreamOfStreams.scala @@ -0,0 +1,173 @@ +/** + * Copyright (C) 2015 Typesafe Inc. 
+ */ +package akka.stream.impl.fusing + +import akka.stream._ +import akka.stream.stage._ +import akka.stream.scaladsl._ +import akka.stream.actor.ActorSubscriberMessage +import akka.stream.actor.ActorSubscriberMessage._ +import akka.stream.actor.ActorPublisherMessage +import akka.stream.actor.ActorPublisherMessage._ +import scala.concurrent.forkjoin.ThreadLocalRandom + +import java.{ util ⇒ ju } + +final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source[T, M], T]] { + private val in = Inlet[Source[T, M]]("flatten.in") + private val out = Outlet[T]("flatten.out") + override val shape = FlowShape(in, out) + + override def createLogic(attr: Attributes) = new GraphStageLogic(shape) { + + var sources = Set.empty[LocalSource[T]] + def activeSources = sources.size + + private trait Queue { + def hasData: Boolean + def enqueue(src: LocalSource[T]): Unit + def dequeue(): LocalSource[T] + } + + private class FixedQueue extends Queue { + final val Size = 16 + final val Mask = 15 + + private val queue = new Array[LocalSource[T]](Size) + private var head = 0 + private var tail = 0 + + def hasData = tail != head + def enqueue(src: LocalSource[T]): Unit = + if (tail - head == Size) { + val queue = new DynamicQueue + while (hasData) { + queue.add(dequeue()) + } + queue.add(src) + q = queue + } else { + queue(tail & Mask) = src + tail += 1 + } + def dequeue(): LocalSource[T] = { + val ret = queue(head & Mask) + head += 1 + ret + } + } + + private class DynamicQueue extends ju.LinkedList[LocalSource[T]] with Queue { + def hasData = !isEmpty() + def enqueue(src: LocalSource[T]): Unit = add(src) + def dequeue(): LocalSource[T] = remove() + } + + private var q: Queue = new FixedQueue + + def pushOut(): Unit = { + val src = q.dequeue() + push(out, src.elem) + src.elem = null.asInstanceOf[T] + if (src.sub != null) src.sub.pull() + else removeSource(src) + } + + setHandler(in, new InHandler { + override def onPush(): Unit = { + val source = grab(in) + addSource(source) + if (activeSources < breadth) tryPull(in) + } + override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage() + }) + + setHandler(out, new OutHandler { + override def onPull(): Unit = { + pull(in) + setHandler(out, outHandler) + } + }) + + val outHandler = new OutHandler { + // could be unavailable due to async input having been executed before this notification + override def onPull(): Unit = if (q.hasData && isAvailable(out)) pushOut() + } + + def addSource(source: Source[T, M]): Unit = { + val localSource = new LocalSource[T]() + sources += localSource + val sub = source.runWith(new LocalSink(getAsyncCallback[ActorSubscriberMessage] { + case OnNext(elem) ⇒ + val elemT = elem.asInstanceOf[T] + if (isAvailable(out)) { + push(out, elemT) + localSource.sub.pull() + } else { + localSource.elem = elemT + q.enqueue(localSource) + } + case OnComplete ⇒ + localSource.sub = null + if (localSource.elem == null) removeSource(localSource) + case OnError(ex) ⇒ + failStage(ex) + }.invoke))(interpreter.materializer) + localSource.sub = sub + } + + def removeSource(src: LocalSource[T]): Unit = { + val pullSuppressed = activeSources == breadth + sources -= src + if (pullSuppressed) tryPull(in) + if (activeSources == 0 && isClosed(in)) completeStage() + } + + override def postStop(): Unit = { + sources.foreach { src ⇒ + if (src.sub != null) src.sub.cancel() + } + } + } +} + +// TODO possibly place the Local* classes in a companion object depending on where they are reused + +/** + * INTERNAL API + */ +private[fusing] 
final class LocalSinkSubscription[T](sub: ActorPublisherMessage ⇒ Unit) { + def pull(): Unit = sub(Request(1)) + def cancel(): Unit = sub(Cancel) +} + +/** + * INTERNAL API + */ +private[fusing] final class LocalSource[T](var sub: LocalSinkSubscription[T] = null, var elem: T = null.asInstanceOf[T]) + +/** + * INTERNAL API + */ +private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage ⇒ Unit) extends GraphStageWithMaterializedValue[SinkShape[T], LocalSinkSubscription[T]] { + private val in = Inlet[T]("LocalSink.in") + override val shape = SinkShape(in) + override def createLogicAndMaterializedValue(attr: Attributes) = { + class Logic extends GraphStageLogic(shape) { + setHandler(in, new InHandler { + override def onPush(): Unit = notifier(OnNext(grab(in))) + override def onUpstreamFinish(): Unit = notifier(OnComplete) + override def onUpstreamFailure(ex: Throwable): Unit = notifier(OnError(ex)) + }) + val sub = new LocalSinkSubscription[T](getAsyncCallback[ActorPublisherMessage] { + case Request(1) ⇒ tryPull(in) + case Cancel ⇒ completeStage() + case _ ⇒ + }.invoke) + override def preStart(): Unit = pull(in) + } + val logic = new Logic + logic -> logic.sub + } +} diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 5cf58dcb1c..3d73c0ced0 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -852,10 +852,25 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * '''Completes when''' upstream completes and all consumed substreams complete * * '''Cancels when''' downstream cancels - * */ - def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Flow[In, T, Mat] = - new Flow(delegate.flatMapConcat[T](x ⇒ f(x).asScala)) + def flatMapConcat[T, M](f: function.Function[Out, Source[T, M]]): Flow[In, T, Mat] = + new Flow(delegate.flatMapConcat[T, M](x ⇒ f(x).asScala)) + + /** + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by merging, where at most `breadth` + * substreams are being consumed at any given time. 
+ * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + */ + def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, Source[T, M]]): Flow[In, T, Mat] = + new Flow(delegate.flatMapMerge(breadth, o ⇒ f(o).asScala)) /** * Concatenate the given [[Source]] to this [[Flow]], meaning that once this diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 99afbd8014..8ce0777f12 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -931,10 +931,25 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Completes when''' upstream completes and all consumed substreams complete * * '''Cancels when''' downstream cancels - * */ - def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Source[T, Mat] = - new Source(delegate.flatMapConcat[T](x ⇒ f(x).asScala)) + def flatMapConcat[T, M](f: function.Function[Out, Source[T, M]]): Source[T, Mat] = + new Source(delegate.flatMapConcat[T, M](x ⇒ f(x).asScala)) + + /** + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by merging, where at most `breadth` + * substreams are being consumed at any given time. + * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + */ + def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, Source[T, M]]): Source[T, Mat] = + new Source(delegate.flatMapMerge(breadth, o ⇒ f(o).asScala)) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 7f06d909d5..dfdadb380d 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -13,12 +13,12 @@ import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUn import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue } import akka.stream.stage._ import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } - import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.{ Duration, FiniteDuration } import scala.language.higherKinds +import akka.stream.impl.fusing.FlattenMerge /** * A `Flow` is a set of stream processing steps that has one open input and one open output. @@ -330,6 +330,13 @@ trait FlowOps[+Out, +Mat] { import akka.stream.impl.Stages._ type Repr[+O, +M] <: FlowOps[O, M] + /* + * Repr is actually self-bounded, but that would be a cyclic type declaration that is illegal in Scala. 
Therefore we need to help the compiler by specifying that Repr[O1, M1]#Repr[O2, M2] is the same as Repr[O2, M2]; the implicit conversion below encodes this with a cast. + */ + import language.implicitConversions + private implicit def reprFlatten[O1, M1, O2, M2](r: Repr[O1, M1]#Repr[O2, M2]): Repr[O2, M2] = r.asInstanceOf[Repr[O2, M2]] + /** * Transform this [[Flow]] by appending the given processing steps. * {{{ @@ -1009,10 +1016,23 @@ trait FlowOps[+Out, +Mat] { * '''Completes when''' upstream completes and all consumed substreams complete * * '''Cancels when''' downstream cancels - * */ - def flatMapConcat[T](f: Out ⇒ Source[T, _]): Repr[T, Mat] = - deprecatedAndThen(ConcatAll(f.asInstanceOf[Any ⇒ Source[Any, _]])) + def flatMapConcat[T, M](f: Out ⇒ Source[T, M]): Repr[T, Mat] = map(f).via(new FlattenMerge[T, M](1)) + + /** + * Transform each input element into a `Source` of output elements that is + * then flattened into the output stream by merging, where at most `breadth` + * substreams are being consumed at any given time. + * + * '''Emits when''' a currently consumed substream has an element available + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes and all consumed substreams complete + * + * '''Cancels when''' downstream cancels + */ + def flatMapMerge[T, M](breadth: Int, f: Out ⇒ Source[T, M]): Repr[T, Mat] = map(f).via(new FlattenMerge[T, M](breadth)) /** * If the first element has not passed through this stage before the provided timeout, the stream is failed @@ -1348,4 +1368,3 @@ trait FlowOps[+Out, +Mat] { private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U, Mat] } -
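
Usage note: the sketch below contrasts the new flatMapMerge with flatMapConcat. It is not part of the patch; it assumes only the operators added above plus the usual ActorSystem/ActorMaterializer setup that the test specs also use.

  import akka.actor.ActorSystem
  import akka.stream.ActorMaterializer
  import akka.stream.scaladsl.{ Sink, Source }

  object FlattenMergeExample extends App {
    implicit val system = ActorSystem("demo")
    implicit val mat = ActorMaterializer()

    def src10(i: Int) = Source(i until (i + 10))

    // flatMapConcat drains each substream fully before opening the next:
    // prints 0..9, then 10..19, then 20..29, strictly in order.
    Source(List(src10(0), src10(10), src10(20)))
      .flatMapConcat(identity)
      .runWith(Sink.foreach(println))

    // flatMapMerge(2, _) keeps at most two substreams open at a time:
    // elements of the first two sources may interleave, and the third
    // source is only subscribed once one of them completes.
    Source(List(src10(0), src10(10), src10(20)))
      .flatMapMerge(2, identity)
      .runWith(Sink.foreach(println))
  }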
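
Implementation note: FixedQueue in FlattenMerge keeps `head` and `tail` as ever-growing Ints and masks them on array access, which is sound only because the capacity is a power of two; a full buffer spills once into a ju.LinkedList (DynamicQueue) and stays dynamic from then on. A standalone illustration of the indexing trick (all names here are mine, not from the patch):

  // Power-of-two ring buffer indexing as used by FixedQueue:
  // head and tail only grow; `& Mask` folds them into the array,
  // and `tail - head` is the current fill level at any time.
  object RingIndexDemo extends App {
    val Size = 16
    val Mask = Size - 1 // 15 == 0b1111, works only for powers of two

    val buf = new Array[Int](Size)
    var head, tail = 0

    def enqueue(x: Int): Unit = { require(tail - head < Size, "full"); buf(tail & Mask) = x; tail += 1 }
    def dequeue(): Int = { require(tail != head, "empty"); val x = buf(head & Mask); head += 1; x }

    // indices wrap past 15 and 31 transparently while head/tail keep growing
    (1 to 40).foreach { i ⇒ enqueue(i); assert(dequeue() == i) }
    println("wrap-around works")
  }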
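
Implementation note: each substream is materialized separately (via LocalSink) and talks back to the outer FlattenMerge stage exclusively through getAsyncCallback, which hands external events to the stage one at a time, serialized with its regular handlers. The sketch below shows that mechanism in isolation; EmitWhenDone and every identifier in it are mine, it assumes the plain GraphStage API used by this patch is available to user code, and failure of the Future is deliberately not handled.

  import akka.stream._
  import akka.stream.stage._
  import scala.concurrent.{ ExecutionContext, Future }

  // Emits the result of `work` once it completes, then completes the stage.
  final class EmitWhenDone[T](work: Future[T])(implicit ec: ExecutionContext)
    extends GraphStage[SourceShape[T]] {

    val out = Outlet[T]("emitWhenDone.out")
    override val shape = SourceShape(out)

    override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
      private var result: Option[T] = None
      private var pulled = false

      override def preStart(): Unit = {
        // The Future completes on an arbitrary thread; the callback body,
        // however, runs inside the stage, so touching its state is safe.
        val cb = getAsyncCallback[T] { value ⇒
          result = Some(value)
          if (pulled) { push(out, value); completeStage() }
        }
        work.foreach(cb.invoke)
      }

      setHandler(out, new OutHandler {
        override def onPull(): Unit = result match {
          case Some(v) ⇒ push(out, v); completeStage()
          case None    ⇒ pulled = true
        }
      })
    }
  }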