!str #15089 add flatMapMerge
This commit is contained in:
parent
3f3f5c8575
commit
073e7058dc
13 changed files with 485 additions and 74 deletions
|
|
@ -147,8 +147,8 @@ public class MigrationsJava {
|
|||
|
||||
//#flatMapConcat
|
||||
Flow.<Source<Integer, BoxedUnit>>create().
|
||||
<Integer>flatMapConcat(new Function<Source<Integer, BoxedUnit>, Source<Integer, ?>>(){
|
||||
@Override public Source<Integer, ?> apply(Source<Integer, BoxedUnit> param) throws Exception {
|
||||
<Integer, BoxedUnit>flatMapConcat(new Function<Source<Integer, BoxedUnit>, Source<Integer, BoxedUnit>>(){
|
||||
@Override public Source<Integer, BoxedUnit> apply(Source<Integer, BoxedUnit> param) throws Exception {
|
||||
return param;
|
||||
}
|
||||
});
|
||||
|
|
|
|||
|
|
@ -105,7 +105,8 @@ prefixAndTail the configured number of prefix elements are available. E
|
|||
groupBy an element for which the grouping function returns a group that has not yet been created. Emits the new group there is an element pending for a group whose substream backpressures upstream completes [3]_
|
||||
splitWhen an element for which the provided predicate is true, opening and emitting a new substream for subsequent elements there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
|
||||
splitAfter an element passes through. When the provided predicate is true it emitts the element * and opens a new substream for subsequent element there is an element pending for the next substream, but the previous is not fully consumed yet, or the substream backpressures upstream completes [3]_
|
||||
flatten (Concat) the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete
|
||||
flatMapConcat the current consumed substream has an element available downstream backpressures upstream completes and all consumed substreams complete
|
||||
flatMapMerge one of the currently consumed substreams has an element available downstream backpressures upstream completes and all consumed substreams complete
|
||||
===================== ========================================================================================================================================= ============================================================================================================================== =====================================================================================
|
||||
|
||||
Fan-in stages
|
||||
|
|
|
|||
|
|
@ -482,12 +482,12 @@ public class FlowTest extends StreamTest {
|
|||
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
|
||||
final Iterable<Integer> input2 = Arrays.asList(4, 5);
|
||||
|
||||
final List<Source<Integer, ?>> mainInputs = new ArrayList<Source<Integer,?>>();
|
||||
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>();
|
||||
mainInputs.add(Source.from(input1));
|
||||
mainInputs.add(Source.from(input2));
|
||||
|
||||
final Flow<Source<Integer, ?>, List<Integer>, ?> flow = Flow.<Source<Integer, ?>>create().
|
||||
flatMapConcat(ConstantFun.<Source<Integer, ?>>javaIdentityFunction()).grouped(6);
|
||||
final Flow<Source<Integer, BoxedUnit>, List<Integer>, ?> flow = Flow.<Source<Integer, BoxedUnit>>create().
|
||||
flatMapConcat(ConstantFun.<Source<Integer, BoxedUnit>>javaIdentityFunction()).grouped(6);
|
||||
Future<List<Integer>> future = Source.from(mainInputs).via(flow)
|
||||
.runWith(Sink.<List<Integer>>head(), materializer);
|
||||
|
||||
|
|
@ -496,6 +496,38 @@ public class FlowTest extends StreamTest {
|
|||
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseFlatMapMerge() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final Iterable<Integer> input1 = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||
final Iterable<Integer> input2 = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
|
||||
final Iterable<Integer> input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
|
||||
final Iterable<Integer> input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39);
|
||||
|
||||
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>();
|
||||
mainInputs.add(Source.from(input1));
|
||||
mainInputs.add(Source.from(input2));
|
||||
mainInputs.add(Source.from(input3));
|
||||
mainInputs.add(Source.from(input4));
|
||||
|
||||
final Flow<Source<Integer, BoxedUnit>, List<Integer>, ?> flow = Flow.<Source<Integer, BoxedUnit>>create().
|
||||
flatMapMerge(3, ConstantFun.<Source<Integer, BoxedUnit>>javaIdentityFunction()).grouped(60);
|
||||
Future<List<Integer>> future = Source.from(mainInputs).via(flow)
|
||||
.runWith(Sink.<List<Integer>>head(), materializer);
|
||||
|
||||
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
|
||||
final Set<Integer> set = new HashSet<Integer>();
|
||||
for (Integer i: result) {
|
||||
set.add(i);
|
||||
}
|
||||
final Set<Integer> expected = new HashSet<Integer>();
|
||||
for (int i = 0; i < 40; ++i) {
|
||||
expected.add(i);
|
||||
}
|
||||
|
||||
assertEquals(expected, set);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseBuffer() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
|
|
|
|||
|
|
@ -349,12 +349,12 @@ public class SourceTest extends StreamTest {
|
|||
final Iterable<Integer> input1 = Arrays.asList(1, 2, 3);
|
||||
final Iterable<Integer> input2 = Arrays.asList(4, 5);
|
||||
|
||||
final List<Source<Integer, ?>> mainInputs = new ArrayList<Source<Integer,?>>();
|
||||
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>();
|
||||
mainInputs.add(Source.from(input1));
|
||||
mainInputs.add(Source.from(input2));
|
||||
|
||||
Future<List<Integer>> future = Source.from(mainInputs)
|
||||
.<Integer>flatMapConcat(ConstantFun.<Source<Integer,?>>javaIdentityFunction())
|
||||
.<Integer, BoxedUnit>flatMapConcat(ConstantFun.<Source<Integer,BoxedUnit>>javaIdentityFunction())
|
||||
.grouped(6)
|
||||
.runWith(Sink.<List<Integer>>head(), materializer);
|
||||
|
||||
|
|
@ -363,6 +363,37 @@ public class SourceTest extends StreamTest {
|
|||
assertEquals(Arrays.asList(1, 2, 3, 4, 5), result);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseFlatMapMerge() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
final Iterable<Integer> input1 = Arrays.asList(0, 1, 2, 3, 4, 5, 6, 7, 8, 9);
|
||||
final Iterable<Integer> input2 = Arrays.asList(10, 11, 12, 13, 14, 15, 16, 17, 18, 19);
|
||||
final Iterable<Integer> input3 = Arrays.asList(20, 21, 22, 23, 24, 25, 26, 27, 28, 29);
|
||||
final Iterable<Integer> input4 = Arrays.asList(30, 31, 32, 33, 34, 35, 36, 37, 38, 39);
|
||||
|
||||
final List<Source<Integer, BoxedUnit>> mainInputs = new ArrayList<Source<Integer,BoxedUnit>>();
|
||||
mainInputs.add(Source.from(input1));
|
||||
mainInputs.add(Source.from(input2));
|
||||
mainInputs.add(Source.from(input3));
|
||||
mainInputs.add(Source.from(input4));
|
||||
|
||||
Future<List<Integer>> future = Source.from(mainInputs)
|
||||
.flatMapMerge(3, ConstantFun.<Source<Integer, BoxedUnit>>javaIdentityFunction()).grouped(60)
|
||||
.runWith(Sink.<List<Integer>>head(), materializer);
|
||||
|
||||
List<Integer> result = Await.result(future, probe.dilated(FiniteDuration.create(3, TimeUnit.SECONDS)));
|
||||
final Set<Integer> set = new HashSet<Integer>();
|
||||
for (Integer i: result) {
|
||||
set.add(i);
|
||||
}
|
||||
final Set<Integer> expected = new HashSet<Integer>();
|
||||
for (int i = 0; i < 40; ++i) {
|
||||
expected.add(i);
|
||||
}
|
||||
|
||||
assertEquals(expected, set);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void mustBeAbleToUseBuffer() throws Exception {
|
||||
final JavaTestKit probe = new JavaTestKit(system);
|
||||
|
|
|
|||
|
|
@ -54,7 +54,7 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"on onError on master stream cancel the current open substream and signal error" in assertAllStagesStopped {
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
|
||||
|
||||
|
|
@ -74,7 +74,7 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"on onError on master stream cancel the currently opening substream and signal error" in assertAllStagesStopped {
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
|
||||
|
||||
|
|
@ -114,7 +114,7 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"on onError on open substream, cancel the master stream and signal error " in assertAllStagesStopped {
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
|
||||
|
||||
|
|
@ -134,7 +134,7 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"on cancellation cancel the current open substream and the master stream" in assertAllStagesStopped {
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
|
||||
|
||||
|
|
@ -155,7 +155,7 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"on cancellation cancel the currently opening substream and the master stream" in assertAllStagesStopped {
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val publisher = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val subscriber = TestSubscriber.manualProbe[Int]()
|
||||
Source(publisher).flatMapConcat(ConstantFun.scalaIdentityFunction).to(Sink(subscriber)).run()
|
||||
|
||||
|
|
@ -178,11 +178,11 @@ class FlowConcatAllSpec extends AkkaSpec {
|
|||
}
|
||||
|
||||
"pass along early cancellation" in assertAllStagesStopped {
|
||||
val up = TestPublisher.manualProbe[Source[Int, _]]()
|
||||
val up = TestPublisher.manualProbe[Source[Int, Unit]]()
|
||||
val down = TestSubscriber.manualProbe[Int]()
|
||||
|
||||
val flowSubscriber = Source
|
||||
.subscriber[Source[Int, _]]
|
||||
.subscriber[Source[Int, Unit]]
|
||||
.flatMapConcat(ConstantFun.scalaIdentityFunction)
|
||||
.to(Sink(down))
|
||||
.run()
|
||||
|
|
|
|||
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.scaladsl
|
||||
|
||||
import akka.stream.testkit.AkkaSpec
|
||||
import akka.stream.ActorMaterializer
|
||||
import scala.concurrent._
|
||||
import scala.concurrent.duration._
|
||||
import org.scalatest.concurrent.ScalaFutures
|
||||
import org.scalactic.ConversionCheckedTripleEquals
|
||||
import akka.stream.testkit.TestPublisher
|
||||
import org.scalatest.exceptions.TestFailedException
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.testkit.TestLatch
|
||||
|
||||
class FlowFlattenMergeSpec extends AkkaSpec with ScalaFutures with ConversionCheckedTripleEquals {
|
||||
implicit val mat = ActorMaterializer()
|
||||
import system.dispatcher
|
||||
|
||||
def src10(i: Int) = Source(i until (i + 10))
|
||||
def blocked = Source(Promise[Int].future)
|
||||
|
||||
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
|
||||
val toSet = toSeq.mapMaterializedValue(_.map(_.toSet))
|
||||
|
||||
implicit val patience = PatienceConfig(1.second)
|
||||
|
||||
"A FattenMerge" must {
|
||||
|
||||
"work in the nominal case" in {
|
||||
Source(List(src10(0), src10(10), src10(20), src10(30)))
|
||||
.flatMapMerge(4, identity)
|
||||
.runWith(toSet)
|
||||
.futureValue should ===((0 until 40).toSet)
|
||||
}
|
||||
|
||||
"not be held back by one slow stream" in {
|
||||
Source(List(src10(0), src10(10), blocked, src10(20), src10(30)))
|
||||
.flatMapMerge(3, identity)
|
||||
.take(40)
|
||||
.runWith(toSet)
|
||||
.futureValue should ===((0 until 40).toSet)
|
||||
}
|
||||
|
||||
"respect breadth" in {
|
||||
val seq = Source(List(src10(0), src10(10), src10(20), blocked, blocked, src10(30)))
|
||||
.flatMapMerge(3, identity)
|
||||
.take(40)
|
||||
.runWith(toSeq)
|
||||
.futureValue
|
||||
|
||||
seq.take(30).toSet should ===((0 until 30).toSet)
|
||||
seq.drop(30).toSet should ===((30 until 40).toSet)
|
||||
}
|
||||
|
||||
"propagate early failure from main stream" in {
|
||||
val ex = new Exception("buh")
|
||||
intercept[TestFailedException] {
|
||||
Source.failed(ex)
|
||||
.flatMapMerge(1, identity)
|
||||
.runWith(Sink.head)
|
||||
.futureValue
|
||||
}.cause.get should ===(ex)
|
||||
}
|
||||
|
||||
"propagate late failure from main stream" in {
|
||||
val ex = new Exception("buh")
|
||||
intercept[TestFailedException] {
|
||||
(Source(List(blocked, blocked)) ++ Source.failed(ex))
|
||||
.flatMapMerge(10, identity)
|
||||
.runWith(Sink.head)
|
||||
.futureValue
|
||||
}.cause.get should ===(ex)
|
||||
}
|
||||
|
||||
"propagate failure from map function" in {
|
||||
val ex = new Exception("buh")
|
||||
intercept[TestFailedException] {
|
||||
Source(1 to 3)
|
||||
.flatMapMerge(10, i ⇒ if (i == 3) throw ex else blocked)
|
||||
.runWith(Sink.head)
|
||||
.futureValue
|
||||
}.cause.get should ===(ex)
|
||||
}
|
||||
|
||||
"bubble up substream exceptions" in {
|
||||
val ex = new Exception("buh")
|
||||
intercept[TestFailedException] {
|
||||
Source(List(blocked, blocked, Source.failed(ex)))
|
||||
.flatMapMerge(10, identity)
|
||||
.runWith(Sink.head)
|
||||
.futureValue
|
||||
}.cause.get should ===(ex)
|
||||
}
|
||||
|
||||
"cancel substreams when failing from main stream" in {
|
||||
val p1, p2 = TestPublisher.probe[Int]()
|
||||
val ex = new Exception("buh")
|
||||
val p = Promise[Source[Int, Unit]]
|
||||
(Source(List(Source(p1), Source(p2))) ++ Source(p.future))
|
||||
.flatMapMerge(5, identity)
|
||||
.runWith(Sink.head)
|
||||
p1.expectRequest()
|
||||
p2.expectRequest()
|
||||
p.failure(ex)
|
||||
p1.expectCancellation()
|
||||
p2.expectCancellation()
|
||||
}
|
||||
|
||||
"cancel substreams when failing from substream" in {
|
||||
val p1, p2 = TestPublisher.probe[Int]()
|
||||
val ex = new Exception("buh")
|
||||
val p = Promise[Int]
|
||||
Source(List(Source(p1), Source(p2), Source(p.future)))
|
||||
.flatMapMerge(5, identity)
|
||||
.runWith(Sink.head)
|
||||
p1.expectRequest()
|
||||
p2.expectRequest()
|
||||
p.failure(ex)
|
||||
p1.expectCancellation()
|
||||
p2.expectCancellation()
|
||||
}
|
||||
|
||||
"cancel substreams when failing map function" in {
|
||||
val p1, p2 = TestPublisher.probe[Int]()
|
||||
val ex = new Exception("buh")
|
||||
val latch = TestLatch()
|
||||
Source(1 to 3)
|
||||
.flatMapMerge(10, {
|
||||
case 1 ⇒ Source(p1)
|
||||
case 2 ⇒ Source(p2)
|
||||
case 3 ⇒
|
||||
Await.ready(latch, 3.seconds)
|
||||
throw ex
|
||||
})
|
||||
.runWith(Sink.head)
|
||||
p1.expectRequest()
|
||||
p2.expectRequest()
|
||||
latch.countDown()
|
||||
p1.expectCancellation()
|
||||
p2.expectCancellation()
|
||||
}
|
||||
|
||||
"cancel substreams when being cancelled" in {
|
||||
val p1, p2 = TestPublisher.probe[Int]()
|
||||
val ex = new Exception("buh")
|
||||
val sink = Source(List(Source(p1), Source(p2)))
|
||||
.flatMapMerge(5, identity)
|
||||
.runWith(TestSink.probe)
|
||||
sink.request(1)
|
||||
p1.expectRequest()
|
||||
p2.expectRequest()
|
||||
sink.cancel()
|
||||
p1.expectCancellation()
|
||||
p2.expectCancellation()
|
||||
}
|
||||
|
||||
"work with many concurrently queued events" in {
|
||||
val p = Source((0 until 100).map(i ⇒ src10(10 * i)))
|
||||
.flatMapMerge(Int.MaxValue, identity)
|
||||
.runWith(TestSink.probe)
|
||||
p.within(1.second) {
|
||||
p.ensureSubscription()
|
||||
p.expectNoMsg()
|
||||
}
|
||||
val elems = p.within(1.second)((1 to 1000).map(i ⇒ p.requestNext()).toSet)
|
||||
p.expectComplete()
|
||||
elems should ===((0 until 1000).toSet)
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
@ -262,7 +262,6 @@ private[akka] object ActorProcessorFactory {
|
|||
case GroupBy(f, _) ⇒ (GroupByProcessorImpl.props(settings, f), ())
|
||||
case PrefixAndTail(n, _) ⇒ (PrefixAndTailImpl.props(settings, n), ())
|
||||
case Split(d, _) ⇒ (SplitWhereProcessorImpl.props(settings, d), ())
|
||||
case ConcatAll(f, _) ⇒ (ConcatAllImpl.props(f, materializer), ())
|
||||
case DirectProcessor(p, m) ⇒ throw new AssertionError("DirectProcessor cannot end up in ActorProcessorFactory")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1,43 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl
|
||||
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.scaladsl.{ Source, Sink }
|
||||
import akka.actor.{ Deploy, Props }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] object ConcatAllImpl {
|
||||
def props(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer): Props =
|
||||
Props(new ConcatAllImpl(f, materializer)).withDeploy(Deploy.local)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] class ConcatAllImpl(f: Any ⇒ Source[Any, _], materializer: ActorMaterializer)
|
||||
extends MultiStreamInputProcessor(materializer.settings) {
|
||||
|
||||
import akka.stream.impl.MultiStreamInputProcessor._
|
||||
|
||||
val takeNextSubstream = TransferPhase(primaryInputs.NeedsInput && primaryOutputs.NeedsDemand) { () ⇒
|
||||
val publisher = f(primaryInputs.dequeueInputElement()).runWith(Sink.publisher(false))(materializer)
|
||||
// FIXME we can pass the flow to createSubstreamInput (but avoiding copy impl now)
|
||||
val inputs = createAndSubscribeSubstreamInput(publisher)
|
||||
nextPhase(streamSubstream(inputs))
|
||||
}
|
||||
|
||||
def streamSubstream(substream: SubstreamInput): TransferPhase =
|
||||
TransferPhase(substream.NeedsInputOrComplete && primaryOutputs.NeedsDemand) { () ⇒
|
||||
if (substream.inputsDepleted) nextPhase(takeNextSubstream)
|
||||
else primaryOutputs.enqueueOutputElement(substream.dequeueInputElement())
|
||||
}
|
||||
|
||||
initialPhase(1, takeNextSubstream)
|
||||
|
||||
override def invalidateSubstreamInput(substream: SubstreamKey, e: Throwable): Unit = fail(e)
|
||||
|
||||
}
|
||||
|
|
@ -233,10 +233,6 @@ private[stream] object Stages {
|
|||
def after(f: Any ⇒ Boolean) = Split(el ⇒ if (f(el)) SplitAfter else Continue, name("splitAfter"))
|
||||
}
|
||||
|
||||
final case class ConcatAll(f: Any ⇒ Source[Any, _], attributes: Attributes = concatAll) extends StageModule {
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
||||
final case class DirectProcessor(p: () ⇒ (Processor[Any, Any], Any), attributes: Attributes = processor) extends StageModule {
|
||||
override def withAttributes(attributes: Attributes) = copy(attributes = attributes)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,173 @@
|
|||
/**
|
||||
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
|
||||
*/
|
||||
package akka.stream.impl.fusing
|
||||
|
||||
import akka.stream._
|
||||
import akka.stream.stage._
|
||||
import akka.stream.scaladsl._
|
||||
import akka.stream.actor.ActorSubscriberMessage
|
||||
import akka.stream.actor.ActorSubscriberMessage._
|
||||
import akka.stream.actor.ActorPublisherMessage
|
||||
import akka.stream.actor.ActorPublisherMessage._
|
||||
import scala.concurrent.forkjoin.ThreadLocalRandom
|
||||
|
||||
import java.{ util ⇒ ju }
|
||||
|
||||
final class FlattenMerge[T, M](breadth: Int) extends GraphStage[FlowShape[Source[T, M], T]] {
|
||||
private val in = Inlet[Source[T, M]]("flatten.in")
|
||||
private val out = Outlet[T]("flatten.out")
|
||||
override val shape = FlowShape(in, out)
|
||||
|
||||
override def createLogic(attr: Attributes) = new GraphStageLogic(shape) {
|
||||
|
||||
var sources = Set.empty[LocalSource[T]]
|
||||
def activeSources = sources.size
|
||||
|
||||
private trait Queue {
|
||||
def hasData: Boolean
|
||||
def enqueue(src: LocalSource[T]): Unit
|
||||
def dequeue(): LocalSource[T]
|
||||
}
|
||||
|
||||
private class FixedQueue extends Queue {
|
||||
final val Size = 16
|
||||
final val Mask = 15
|
||||
|
||||
private val queue = new Array[LocalSource[T]](Size)
|
||||
private var head = 0
|
||||
private var tail = 0
|
||||
|
||||
def hasData = tail != head
|
||||
def enqueue(src: LocalSource[T]): Unit =
|
||||
if (tail - head == Size) {
|
||||
val queue = new DynamicQueue
|
||||
while (hasData) {
|
||||
queue.add(dequeue())
|
||||
}
|
||||
queue.add(src)
|
||||
q = queue
|
||||
} else {
|
||||
queue(tail & Mask) = src
|
||||
tail += 1
|
||||
}
|
||||
def dequeue(): LocalSource[T] = {
|
||||
val ret = queue(head & Mask)
|
||||
head += 1
|
||||
ret
|
||||
}
|
||||
}
|
||||
|
||||
private class DynamicQueue extends ju.LinkedList[LocalSource[T]] with Queue {
|
||||
def hasData = !isEmpty()
|
||||
def enqueue(src: LocalSource[T]): Unit = add(src)
|
||||
def dequeue(): LocalSource[T] = remove()
|
||||
}
|
||||
|
||||
private var q: Queue = new FixedQueue
|
||||
|
||||
def pushOut(): Unit = {
|
||||
val src = q.dequeue()
|
||||
push(out, src.elem)
|
||||
src.elem = null.asInstanceOf[T]
|
||||
if (src.sub != null) src.sub.pull()
|
||||
else removeSource(src)
|
||||
}
|
||||
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
val source = grab(in)
|
||||
addSource(source)
|
||||
if (activeSources < breadth) tryPull(in)
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (activeSources == 0) completeStage()
|
||||
})
|
||||
|
||||
setHandler(out, new OutHandler {
|
||||
override def onPull(): Unit = {
|
||||
pull(in)
|
||||
setHandler(out, outHandler)
|
||||
}
|
||||
})
|
||||
|
||||
val outHandler = new OutHandler {
|
||||
// could be unavailable due to async input having been executed before this notification
|
||||
override def onPull(): Unit = if (q.hasData && isAvailable(out)) pushOut()
|
||||
}
|
||||
|
||||
def addSource(source: Source[T, M]): Unit = {
|
||||
val localSource = new LocalSource[T]()
|
||||
sources += localSource
|
||||
val sub = source.runWith(new LocalSink(getAsyncCallback[ActorSubscriberMessage] {
|
||||
case OnNext(elem) ⇒
|
||||
val elemT = elem.asInstanceOf[T]
|
||||
if (isAvailable(out)) {
|
||||
push(out, elemT)
|
||||
localSource.sub.pull()
|
||||
} else {
|
||||
localSource.elem = elemT
|
||||
q.enqueue(localSource)
|
||||
}
|
||||
case OnComplete ⇒
|
||||
localSource.sub = null
|
||||
if (localSource.elem == null) removeSource(localSource)
|
||||
case OnError(ex) ⇒
|
||||
failStage(ex)
|
||||
}.invoke))(interpreter.materializer)
|
||||
localSource.sub = sub
|
||||
}
|
||||
|
||||
def removeSource(src: LocalSource[T]): Unit = {
|
||||
val pullSuppressed = activeSources == breadth
|
||||
sources -= src
|
||||
if (pullSuppressed) tryPull(in)
|
||||
if (activeSources == 0 && isClosed(in)) completeStage()
|
||||
}
|
||||
|
||||
override def postStop(): Unit = {
|
||||
sources.foreach { src ⇒
|
||||
if (src.sub != null) src.sub.cancel()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// TODO possibly place the Local* classes in a companion object depending on where they are reused
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[fusing] final class LocalSinkSubscription[T](sub: ActorPublisherMessage ⇒ Unit) {
|
||||
def pull(): Unit = sub(Request(1))
|
||||
def cancel(): Unit = sub(Cancel)
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[fusing] final class LocalSource[T](var sub: LocalSinkSubscription[T] = null, var elem: T = null.asInstanceOf[T])
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[fusing] final class LocalSink[T](notifier: ActorSubscriberMessage ⇒ Unit) extends GraphStageWithMaterializedValue[SinkShape[T], LocalSinkSubscription[T]] {
|
||||
private val in = Inlet[T]("LocalSink.in")
|
||||
override val shape = SinkShape(in)
|
||||
override def createLogicAndMaterializedValue(attr: Attributes) = {
|
||||
class Logic extends GraphStageLogic(shape) {
|
||||
setHandler(in, new InHandler {
|
||||
override def onPush(): Unit = notifier(OnNext(grab(in)))
|
||||
override def onUpstreamFinish(): Unit = notifier(OnComplete)
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = notifier(OnError(ex))
|
||||
})
|
||||
val sub = new LocalSinkSubscription[T](getAsyncCallback[ActorPublisherMessage] {
|
||||
case Request(1) ⇒ tryPull(in)
|
||||
case Cancel ⇒ completeStage()
|
||||
case _ ⇒
|
||||
}.invoke)
|
||||
override def preStart(): Unit = pull(in)
|
||||
}
|
||||
val logic = new Logic
|
||||
logic -> logic.sub
|
||||
}
|
||||
}
|
||||
|
|
@ -852,10 +852,25 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends
|
|||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
*/
|
||||
def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Flow[In, T, Mat] =
|
||||
new Flow(delegate.flatMapConcat[T](x ⇒ f(x).asScala))
|
||||
def flatMapConcat[T, M](f: function.Function[Out, Source[T, M]]): Flow[In, T, Mat] =
|
||||
new Flow(delegate.flatMapConcat[T, M](x ⇒ f(x).asScala))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
* substreams are being consumed at any given time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, Source[T, M]]): Flow[In, T, Mat] =
|
||||
new Flow(delegate.flatMapMerge(breadth, o ⇒ f(o).asScala))
|
||||
|
||||
/**
|
||||
* Concatenate the given [[Source]] to this [[Flow]], meaning that once this
|
||||
|
|
|
|||
|
|
@ -931,10 +931,25 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap
|
|||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
*/
|
||||
def flatMapConcat[T](f: function.Function[Out, Source[T, _]]): Source[T, Mat] =
|
||||
new Source(delegate.flatMapConcat[T](x ⇒ f(x).asScala))
|
||||
def flatMapConcat[T, M](f: function.Function[Out, Source[T, M]]): Source[T, Mat] =
|
||||
new Source(delegate.flatMapConcat[T, M](x ⇒ f(x).asScala))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
* substreams are being consumed at any given time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def flatMapMerge[T, M](breadth: Int, f: function.Function[Out, Source[T, M]]): Source[T, Mat] =
|
||||
new Source(delegate.flatMapMerge(breadth, o ⇒ f(o).asScala))
|
||||
|
||||
/**
|
||||
* If the first element has not passed through this stage before the provided timeout, the stream is failed
|
||||
|
|
|
|||
|
|
@ -13,12 +13,12 @@ import akka.stream.impl.fusing.{ DropWithin, GroupedWithin, MapAsync, MapAsyncUn
|
|||
import akka.stream.stage.AbstractStage.{ PushPullGraphStage, PushPullGraphStageWithMaterializedValue }
|
||||
import akka.stream.stage._
|
||||
import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription }
|
||||
|
||||
import scala.annotation.unchecked.uncheckedVariance
|
||||
import scala.collection.immutable
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.{ Duration, FiniteDuration }
|
||||
import scala.language.higherKinds
|
||||
import akka.stream.impl.fusing.FlattenMerge
|
||||
|
||||
/**
|
||||
* A `Flow` is a set of stream processing steps that has one open input and one open output.
|
||||
|
|
@ -330,6 +330,13 @@ trait FlowOps[+Out, +Mat] {
|
|||
import akka.stream.impl.Stages._
|
||||
type Repr[+O, +M] <: FlowOps[O, M]
|
||||
|
||||
/*
|
||||
* Repr is actually self-bounded, but that would be a cyclic type declaration that is illegal in Scala.
|
||||
* Therefore we need to help the compiler by specifying that Repr
|
||||
*/
|
||||
import language.implicitConversions
|
||||
private implicit def reprFlatten[O1, M1, O2, M2](r: Repr[O1, M1]#Repr[O2, M2]): Repr[O2, M2] = r.asInstanceOf[Repr[O2, M2]]
|
||||
|
||||
/**
|
||||
* Transform this [[Flow]] by appending the given processing steps.
|
||||
* {{{
|
||||
|
|
@ -1009,10 +1016,23 @@ trait FlowOps[+Out, +Mat] {
|
|||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*
|
||||
*/
|
||||
def flatMapConcat[T](f: Out ⇒ Source[T, _]): Repr[T, Mat] =
|
||||
deprecatedAndThen(ConcatAll(f.asInstanceOf[Any ⇒ Source[Any, _]]))
|
||||
def flatMapConcat[T, M](f: Out ⇒ Source[T, M]): Repr[T, Mat] = map(f).via(new FlattenMerge[T, M](1))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
* substreams are being consumed at any given time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
*/
|
||||
def flatMapMerge[T, M](breadth: Int, f: Out ⇒ Source[T, M]): Repr[T, Mat] = map(f).via(new FlattenMerge[T, M](breadth))
|
||||
|
||||
/**
|
||||
* If the first element has not passed through this stage before the provided timeout, the stream is failed
|
||||
|
|
@ -1348,4 +1368,3 @@ trait FlowOps[+Out, +Mat] {
|
|||
|
||||
private[scaladsl] def deprecatedAndThen[U](op: StageModule): Repr[U, Mat]
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue