feat: Add flatMapConcat with parallelism support. (#1702)
This commit is contained in:
parent
eb5dc14fb0
commit
8ff1d82717
9 changed files with 730 additions and 1 deletions
|
|
@ -16,7 +16,7 @@ package org.apache.pekko.stream
|
|||
import java.util.concurrent.CountDownLatch
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import scala.concurrent.Await
|
||||
import scala.concurrent.{ Await, Future }
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
|
@ -76,6 +76,16 @@ class FlatMapConcatBenchmark {
|
|||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def sourceDotSingleP1(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource.flatMapConcat(1, Source.single).runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def internalSingleSource(): Unit = {
|
||||
|
|
@ -88,6 +98,18 @@ class FlatMapConcatBenchmark {
|
|||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def internalSingleSourceP1(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource
|
||||
.flatMapConcat(1, elem => new GraphStages.SingleSource(elem))
|
||||
.runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def oneElementList(): Unit = {
|
||||
|
|
@ -98,6 +120,64 @@ class FlatMapConcatBenchmark {
|
|||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def oneElementListP1(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource.flatMapConcat(1, n => Source(n :: Nil)).runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def completedFuture(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource
|
||||
.flatMapConcat(n => Source.future(Future.successful(n)))
|
||||
.runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def completedFutureP1(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource
|
||||
.flatMapConcat(1, n => Source.future(Future.successful(n)))
|
||||
.runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def normalFuture(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource
|
||||
.flatMapConcat(n => Source.future(Future(n)(system.dispatcher)))
|
||||
.runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def normalFutureP1(): Unit = {
|
||||
val latch = new CountDownLatch(1)
|
||||
|
||||
testSource
|
||||
.flatMapConcat(1, n => Source.future(Future(n)(system.dispatcher)))
|
||||
.runWith(new LatchSink(OperationsPerInvocation, latch))
|
||||
|
||||
awaitLatch(latch)
|
||||
}
|
||||
|
||||
@Benchmark
|
||||
@OperationsPerInvocation(OperationsPerInvocation)
|
||||
def mapBaseline(): Unit = {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,205 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.stream.scaladsl
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.pattern.FutureTimeoutSupport
|
||||
import pekko.NotUsed
|
||||
import pekko.stream._
|
||||
import pekko.stream.testkit.{ ScriptedTest, StreamSpec }
|
||||
import pekko.stream.testkit.scaladsl.TestSink
|
||||
|
||||
import java.util.concurrent.ThreadLocalRandom
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.util.Collections
|
||||
|
||||
import scala.annotation.switch
|
||||
import scala.concurrent.duration.DurationInt
|
||||
import scala.concurrent.Future
|
||||
import scala.util.control.NoStackTrace
|
||||
|
||||
class FlowFlatMapConcatParallelismSpec extends StreamSpec("""
|
||||
pekko.stream.materializer.initial-input-buffer-size = 2
|
||||
""") with ScriptedTest with FutureTimeoutSupport {
|
||||
val toSeq = Flow[Int].grouped(1000).toMat(Sink.head)(Keep.right)
|
||||
|
||||
class BoomException extends RuntimeException("BOOM~~") with NoStackTrace
|
||||
"A flatMapConcat" must {
|
||||
|
||||
for (i <- 1 until 129) {
|
||||
s"work with value presented sources with parallelism: $i" in {
|
||||
Source(
|
||||
List(
|
||||
Source.empty[Int],
|
||||
Source.single(1),
|
||||
Source.empty[Int],
|
||||
Source(List(2, 3, 4)),
|
||||
Source.future(Future.successful(5)),
|
||||
Source.lazyFuture(() => Future.successful(6)),
|
||||
Source.future(after(1.millis)(Future.successful(7)))))
|
||||
.flatMapConcat(i, identity)
|
||||
.runWith(toSeq)
|
||||
.futureValue should ===(1 to 7)
|
||||
}
|
||||
}
|
||||
|
||||
def generateRandomValuePresentedSources(nums: Int): (Int, List[Source[Int, NotUsed]]) = {
|
||||
val seq = List.tabulate(nums) { _ =>
|
||||
val random = ThreadLocalRandom.current().nextInt(1, 10)
|
||||
(random: @switch) match {
|
||||
case 1 => Source.single(1)
|
||||
case 2 => Source(List(1))
|
||||
case 3 => Source.fromJavaStream(() => Collections.singleton(1).stream())
|
||||
case 4 => Source.future(Future.successful(1))
|
||||
case 5 => Source.future(after(1.millis)(Future.successful(1)))
|
||||
case _ => Source.empty[Int]
|
||||
}
|
||||
}
|
||||
val sum = seq.filterNot(_.eq(Source.empty[Int])).size
|
||||
(sum, seq)
|
||||
}
|
||||
|
||||
def generateSequencedValuePresentedSources(nums: Int): (Int, List[Source[Int, NotUsed]]) = {
|
||||
val seq = List.tabulate(nums) { index =>
|
||||
val random = ThreadLocalRandom.current().nextInt(1, 6)
|
||||
(random: @switch) match {
|
||||
case 1 => Source.single(index)
|
||||
case 2 => Source(List(index))
|
||||
case 3 => Source.fromJavaStream(() => Collections.singleton(index).stream())
|
||||
case 4 => Source.future(Future.successful(index))
|
||||
case 5 => Source.future(after(1.millis)(Future.successful(index)))
|
||||
case _ => throw new IllegalStateException("unexpected")
|
||||
}
|
||||
}
|
||||
val sum = (0 until nums).sum
|
||||
(sum, seq)
|
||||
}
|
||||
|
||||
for (i <- 1 until 129) {
|
||||
s"work with generated value presented sources with parallelism: $i " in {
|
||||
val (sum, sources @ _) = generateRandomValuePresentedSources(100000)
|
||||
Source(sources)
|
||||
.flatMapConcat(i, identity(_)) // scala 2.12 can't infer the type of identity
|
||||
.runWith(Sink.seq)
|
||||
.map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic)
|
||||
.futureValue shouldBe sum
|
||||
}
|
||||
}
|
||||
|
||||
for (i <- 1 until 129) {
|
||||
s"work with generated value sequenced sources with parallelism: $i " in {
|
||||
val (sum, sources @ _) = generateSequencedValuePresentedSources(100000)
|
||||
Source(sources)
|
||||
.flatMapConcat(i, identity(_)) // scala 2.12 can't infer the type of identity
|
||||
// check the order
|
||||
.statefulMap(() => -1)((pre, current) => {
|
||||
if (pre + 1 != current) {
|
||||
throw new IllegalStateException(s"expected $pre + 1 == $current")
|
||||
}
|
||||
(current, current)
|
||||
}, _ => None)
|
||||
.runWith(Sink.seq)
|
||||
.map(_.sum)(pekko.dispatch.ExecutionContexts.parasitic)
|
||||
.futureValue shouldBe sum
|
||||
}
|
||||
}
|
||||
|
||||
"work with value presented failed sources" in {
|
||||
val ex = new BoomException
|
||||
Source(
|
||||
List(
|
||||
Source.empty[Int],
|
||||
Source.single(1),
|
||||
Source.empty[Int],
|
||||
Source(List(2, 3, 4)),
|
||||
Source.future(Future.failed(ex)),
|
||||
Source.lazyFuture(() => Future.successful(5))))
|
||||
.flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity)
|
||||
.onErrorComplete[BoomException]()
|
||||
.runWith(toSeq)
|
||||
.futureValue should ===(1 to 4)
|
||||
}
|
||||
|
||||
"work with value presented sources when demands slow" in {
|
||||
val prob = Source(
|
||||
List(Source.empty[Int], Source.single(1), Source(List(2, 3, 4)), Source.lazyFuture(() => Future.successful(5))))
|
||||
.flatMapConcat(ThreadLocalRandom.current().nextInt(1, 129), identity)
|
||||
.runWith(TestSink())
|
||||
|
||||
prob.request(1)
|
||||
prob.expectNext(1)
|
||||
prob.expectNoMessage(1.seconds)
|
||||
prob.request(2)
|
||||
prob.expectNext(2, 3)
|
||||
prob.expectNoMessage(1.seconds)
|
||||
prob.request(2)
|
||||
prob.expectNext(4, 5)
|
||||
prob.expectComplete()
|
||||
}
|
||||
|
||||
val parallelism = ThreadLocalRandom.current().nextInt(4, 65)
|
||||
s"can do pre materialization when parallelism > 1, parallelism is $parallelism" in {
|
||||
val materializationCounter = new AtomicInteger(0)
|
||||
val prob = Source(1 to (parallelism * 3))
|
||||
.flatMapConcat(
|
||||
parallelism,
|
||||
value => {
|
||||
Source
|
||||
.lazySingle(() => {
|
||||
materializationCounter.incrementAndGet()
|
||||
value
|
||||
})
|
||||
.buffer(1, overflowStrategy = OverflowStrategy.backpressure)
|
||||
})
|
||||
.runWith(TestSink())
|
||||
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe 0
|
||||
|
||||
prob.request(1)
|
||||
prob.expectNext(1.seconds, 1)
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe (parallelism + 1)
|
||||
materializationCounter.set(0)
|
||||
|
||||
prob.request(2)
|
||||
prob.expectNextN(List(2, 3))
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe 2
|
||||
materializationCounter.set(0)
|
||||
|
||||
prob.request(parallelism - 3)
|
||||
prob.expectNextN(4 to parallelism)
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe (parallelism - 3)
|
||||
materializationCounter.set(0)
|
||||
|
||||
prob.request(parallelism)
|
||||
prob.expectNextN(parallelism + 1 to parallelism * 2)
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe parallelism
|
||||
materializationCounter.set(0)
|
||||
|
||||
prob.request(parallelism)
|
||||
prob.expectNextN(parallelism * 2 + 1 to parallelism * 3)
|
||||
expectNoMessage(1.seconds)
|
||||
materializationCounter.get() shouldBe 0
|
||||
prob.expectComplete()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -94,6 +94,7 @@ import pekko.stream.Attributes._
|
|||
val mergePreferred = name("mergePreferred")
|
||||
val mergePrioritized = name("mergePrioritized")
|
||||
val flattenMerge = name("flattenMerge")
|
||||
val flattenConcat = name("flattenConcat")
|
||||
val recoverWith = name("recoverWith")
|
||||
val onErrorComplete = name("onErrorComplete")
|
||||
val broadcast = name("broadcast")
|
||||
|
|
|
|||
|
|
@ -0,0 +1,347 @@
|
|||
/*
|
||||
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||
* contributor license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright ownership.
|
||||
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||
* (the "License"); you may not use this file except in compliance with
|
||||
* the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.pekko.stream.impl.fusing
|
||||
|
||||
import org.apache.pekko
|
||||
import pekko.annotation.InternalApi
|
||||
import pekko.stream.scaladsl.Source
|
||||
import pekko.stream.{ Attributes, FlowShape, Graph, Inlet, Outlet, SourceShape, SubscriptionWithCancelException }
|
||||
import pekko.stream.impl.Stages.DefaultAttributes
|
||||
import pekko.stream.impl.{ Buffer => BufferImpl, FailedSource, JavaStreamSource, TraversalBuilder }
|
||||
import pekko.stream.impl.fusing.GraphStages.{ FutureSource, SingleSource }
|
||||
import pekko.stream.stage.{ GraphStage, GraphStageLogic, InHandler, OutHandler }
|
||||
import pekko.util.OptionVal
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.util.{ Failure, Try }
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[pekko] object FlattenConcat {
|
||||
private sealed abstract class InflightSource[T] {
|
||||
def hasNext: Boolean
|
||||
def next(): T
|
||||
def tryPull(): Unit
|
||||
def cancel(cause: Throwable): Unit
|
||||
def isClosed: Boolean
|
||||
def hasFailed: Boolean = failure.isDefined
|
||||
def failure: Option[Throwable] = None
|
||||
def materialize(): Unit = ()
|
||||
}
|
||||
|
||||
private final class InflightIteratorSource[T](iterator: Iterator[T]) extends InflightSource[T] {
|
||||
override def hasNext: Boolean = iterator.hasNext
|
||||
override def next(): T = iterator.next()
|
||||
override def tryPull(): Unit = ()
|
||||
override def cancel(cause: Throwable): Unit = ()
|
||||
override def isClosed: Boolean = !hasNext
|
||||
}
|
||||
|
||||
private final class InflightCompletedFutureSource[T](result: Try[T]) extends InflightSource[T] {
|
||||
private var _hasNext = result.isSuccess
|
||||
override def hasNext: Boolean = _hasNext
|
||||
override def next(): T = {
|
||||
if (_hasNext) {
|
||||
_hasNext = false
|
||||
result.get
|
||||
} else throw new NoSuchElementException("next called after completion")
|
||||
}
|
||||
override def hasFailed: Boolean = result.isFailure
|
||||
override def failure: Option[Throwable] = result.failed.toOption
|
||||
override def tryPull(): Unit = ()
|
||||
override def cancel(cause: Throwable): Unit = ()
|
||||
override def isClosed: Boolean = true
|
||||
}
|
||||
|
||||
private final class InflightPendingFutureSource[T](cb: InflightSource[T] => Unit)
|
||||
extends InflightSource[T]
|
||||
with (Try[T] => Unit) {
|
||||
private var result: Try[T] = MapAsync.NotYetThere
|
||||
private var consumed = false
|
||||
override def apply(result: Try[T]): Unit = {
|
||||
this.result = result
|
||||
cb(this)
|
||||
}
|
||||
override def hasNext: Boolean = (result ne MapAsync.NotYetThere) && !consumed && result.isSuccess
|
||||
override def next(): T = {
|
||||
if (!consumed) {
|
||||
consumed = true
|
||||
result.get
|
||||
} else throw new NoSuchElementException("next called after completion")
|
||||
}
|
||||
override def hasFailed: Boolean = (result ne MapAsync.NotYetThere) && result.isFailure
|
||||
override def failure: Option[Throwable] = if (result eq MapAsync.NotYetThere) None else result.failed.toOption
|
||||
override def tryPull(): Unit = ()
|
||||
override def cancel(cause: Throwable): Unit = ()
|
||||
override def isClosed: Boolean = consumed || hasFailed
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[pekko] final class FlattenConcat[T, M](parallelism: Int)
|
||||
extends GraphStage[FlowShape[Graph[SourceShape[T], M], T]] {
|
||||
require(parallelism >= 1, "parallelism should >= 1")
|
||||
private val in = Inlet[Graph[SourceShape[T], M]]("flattenConcat.in")
|
||||
private val out = Outlet[T]("flattenConcat.out")
|
||||
|
||||
override def initialAttributes: Attributes = DefaultAttributes.flattenConcat
|
||||
override val shape: FlowShape[Graph[SourceShape[T], M], T] = FlowShape(in, out)
|
||||
override def createLogic(enclosingAttributes: Attributes) = {
|
||||
object FlattenConcatLogic extends GraphStageLogic(shape) with InHandler with OutHandler {
|
||||
import FlattenConcat._
|
||||
// InflightSource[T] or SingleSource[T]
|
||||
// AnyRef here to avoid lift the SingleSource[T] to InflightSource[T]
|
||||
private var queue: BufferImpl[AnyRef] = _
|
||||
private val invokeCb: InflightSource[T] => Unit =
|
||||
getAsyncCallback[InflightSource[T]](futureSourceCompleted).invoke
|
||||
|
||||
override def preStart(): Unit = queue = BufferImpl(parallelism, enclosingAttributes)
|
||||
|
||||
private def futureSourceCompleted(futureSource: InflightSource[T]): Unit = {
|
||||
if (queue.peek() eq futureSource) {
|
||||
if (isAvailable(out) && futureSource.hasNext) {
|
||||
push(out, futureSource.next()) // TODO should filter out the `null` here?
|
||||
if (futureSource.isClosed) {
|
||||
handleCurrentSourceClosed(futureSource)
|
||||
}
|
||||
} else if (futureSource.isClosed) {
|
||||
handleCurrentSourceClosed(futureSource)
|
||||
}
|
||||
} // else just ignore, it will be picked up by onPull
|
||||
}
|
||||
|
||||
override def onPush(): Unit = {
|
||||
addSource(grab(in))
|
||||
// must try pull after addSource to avoid queue overflow
|
||||
if (!queue.isFull) { // try to keep the maximum parallelism
|
||||
tryPull(in)
|
||||
}
|
||||
}
|
||||
|
||||
override def onUpstreamFinish(): Unit = if (queue.isEmpty) completeStage()
|
||||
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
super.onUpstreamFailure(ex)
|
||||
cancelInflightSources(SubscriptionWithCancelException.NoMoreElementsNeeded)
|
||||
}
|
||||
|
||||
override def onPull(): Unit = {
|
||||
// purge if possible
|
||||
queue.peek() match {
|
||||
case src: SingleSource[T] @unchecked =>
|
||||
push(out, src.elem)
|
||||
removeSource()
|
||||
case src: InflightSource[T] @unchecked => pushOut(src)
|
||||
case null => // queue is empty
|
||||
if (!hasBeenPulled(in)) {
|
||||
tryPull(in)
|
||||
} else if (isClosed(in)) {
|
||||
completeStage()
|
||||
}
|
||||
case _ => throw new IllegalStateException("Should not reach here.")
|
||||
}
|
||||
}
|
||||
|
||||
private def pushOut(src: InflightSource[T]): Unit = {
|
||||
if (src.hasNext) {
|
||||
push(out, src.next())
|
||||
if (src.isClosed) {
|
||||
handleCurrentSourceClosed(src)
|
||||
}
|
||||
} else if (src.isClosed) {
|
||||
handleCurrentSourceClosed(src)
|
||||
} else {
|
||||
src.tryPull()
|
||||
}
|
||||
}
|
||||
|
||||
private def handleCurrentSourceClosed(source: InflightSource[T]): Unit = {
|
||||
source.failure match {
|
||||
case Some(cause) => onUpstreamFailure(cause)
|
||||
case None => removeSource(source)
|
||||
}
|
||||
}
|
||||
|
||||
override def onDownstreamFinish(cause: Throwable): Unit = {
|
||||
super.onDownstreamFinish(cause)
|
||||
cancelInflightSources(cause)
|
||||
}
|
||||
|
||||
private def cancelInflightSources(cause: Throwable): Unit = {
|
||||
if (queue.nonEmpty) {
|
||||
var source = queue.dequeue()
|
||||
while ((source ne null) && (source.isInstanceOf[InflightSource[T] @unchecked])) {
|
||||
source.asInstanceOf[InflightSource[T]].cancel(cause)
|
||||
source = queue.dequeue()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def addSource(singleSource: SingleSource[T]): Unit = {
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
push(out, singleSource.elem)
|
||||
} else {
|
||||
queue.enqueue(singleSource)
|
||||
}
|
||||
}
|
||||
|
||||
private def addSourceElements(iterator: Iterator[T]): Unit = {
|
||||
val inflightSource = new InflightIteratorSource[T](iterator)
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
if (inflightSource.hasNext) {
|
||||
push(out, inflightSource.next())
|
||||
if (inflightSource.hasNext) {
|
||||
queue.enqueue(inflightSource)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
queue.enqueue(inflightSource)
|
||||
}
|
||||
}
|
||||
|
||||
private def addCompletedFutureElem(elem: Try[T]): Unit = {
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
elem match {
|
||||
case scala.util.Success(value) => push(out, value)
|
||||
case scala.util.Failure(ex) => onUpstreamFailure(ex)
|
||||
}
|
||||
} else {
|
||||
queue.enqueue(new InflightCompletedFutureSource(elem))
|
||||
}
|
||||
}
|
||||
|
||||
private def addPendingFutureElem(future: Future[T]): Unit = {
|
||||
val inflightSource = new InflightPendingFutureSource[T](invokeCb)
|
||||
future.onComplete(inflightSource)(pekko.dispatch.ExecutionContexts.parasitic)
|
||||
queue.enqueue(inflightSource)
|
||||
}
|
||||
|
||||
private def attachAndMaterializeSource(source: Graph[SourceShape[T], M]): Unit = {
|
||||
object inflightSource extends InflightSource[T] { self =>
|
||||
private val sinkIn = new SubSinkInlet[T]("FlattenConcatSink")
|
||||
private var upstreamFailure = Option.empty[Throwable]
|
||||
sinkIn.setHandler(new InHandler {
|
||||
override def onPush(): Unit = {
|
||||
if (isAvailable(out) && (queue.peek() eq self)) {
|
||||
push(out, sinkIn.grab())
|
||||
}
|
||||
}
|
||||
override def onUpstreamFinish(): Unit = if (!sinkIn.isAvailable) removeSource(self)
|
||||
override def onUpstreamFailure(ex: Throwable): Unit = {
|
||||
upstreamFailure = Some(ex)
|
||||
// if it's the current emitting source, fail the stage
|
||||
if (queue.peek() eq self) {
|
||||
super.onUpstreamFailure(ex)
|
||||
} // else just mark the source as failed
|
||||
}
|
||||
})
|
||||
|
||||
final override def materialize(): Unit = {
|
||||
val graph = Source.fromGraph(source).to(sinkIn.sink)
|
||||
interpreter.subFusingMaterializer.materialize(graph, defaultAttributes = enclosingAttributes)
|
||||
}
|
||||
final override def cancel(cause: Throwable): Unit = sinkIn.cancel(cause)
|
||||
final override def hasNext: Boolean = sinkIn.isAvailable
|
||||
final override def isClosed: Boolean = sinkIn.isClosed
|
||||
final override def failure: Option[Throwable] = upstreamFailure
|
||||
final override def next(): T = sinkIn.grab()
|
||||
final override def tryPull(): Unit = if (!sinkIn.isClosed && !sinkIn.hasBeenPulled) sinkIn.pull()
|
||||
}
|
||||
if (isAvailable(out) && queue.isEmpty) {
|
||||
// this is the first one, pull
|
||||
inflightSource.tryPull()
|
||||
}
|
||||
queue.enqueue(inflightSource)
|
||||
inflightSource.materialize()
|
||||
}
|
||||
|
||||
private def addSource(source: Graph[SourceShape[T], M]): Unit = {
|
||||
TraversalBuilder.getValuePresentedSource(source) match {
|
||||
case OptionVal.Some(graph) =>
|
||||
graph match {
|
||||
case single: SingleSource[T] @unchecked => addSource(single)
|
||||
case futureSource: FutureSource[T] @unchecked =>
|
||||
val future = futureSource.future
|
||||
future.value match {
|
||||
case Some(elem) => addCompletedFutureElem(elem)
|
||||
case None => addPendingFutureElem(future)
|
||||
}
|
||||
case iterable: IterableSource[T] @unchecked => addSourceElements(iterable.elements.iterator)
|
||||
case javaStream: JavaStreamSource[T, _] @unchecked =>
|
||||
import pekko.util.ccompat.JavaConverters._
|
||||
addSourceElements(javaStream.open().iterator.asScala)
|
||||
case failed: FailedSource[T] @unchecked => addCompletedFutureElem(Failure(failed.failure))
|
||||
case maybeEmpty if TraversalBuilder.isEmptySource(maybeEmpty) => // Empty source is discarded
|
||||
case _ => attachAndMaterializeSource(source)
|
||||
}
|
||||
case _ => attachAndMaterializeSource(source)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private def removeSource(): Unit = {
|
||||
queue.dequeue()
|
||||
pullIfNeeded()
|
||||
}
|
||||
|
||||
private def removeSource(source: InflightSource[T]): Unit = {
|
||||
if (source eq queue.peek()) {
|
||||
// only dequeue if it's the current emitting source
|
||||
queue.dequeue()
|
||||
pullIfNeeded()
|
||||
} // not the head source, just ignore
|
||||
}
|
||||
|
||||
private def pullIfNeeded(): Unit = {
|
||||
if (isClosed(in)) {
|
||||
if (queue.isEmpty) {
|
||||
completeStage()
|
||||
} else {
|
||||
tryPullNextSourceInQueue()
|
||||
}
|
||||
} else {
|
||||
if (queue.nonEmpty) {
|
||||
tryPullNextSourceInQueue()
|
||||
}
|
||||
if (!hasBeenPulled(in)) {
|
||||
tryPull(in)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private def tryPullNextSourceInQueue(): Unit = {
|
||||
// pull the new emitting source
|
||||
val nextSource = queue.peek()
|
||||
if (nextSource.isInstanceOf[InflightSource[T] @unchecked]) {
|
||||
nextSource.asInstanceOf[InflightSource[T]].tryPull()
|
||||
}
|
||||
}
|
||||
|
||||
setHandlers(in, out, this)
|
||||
}
|
||||
|
||||
FlattenConcatLogic
|
||||
}
|
||||
|
||||
override def toString: String = s"FlattenConcat(parallelism=$parallelism)"
|
||||
}
|
||||
|
|
@ -2743,6 +2743,26 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
|||
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
|
||||
new Flow(delegate.flatMapConcat[T, M](x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by concatenation,
|
||||
* fully consuming one Source after the other.
|
||||
* `parallelism` can be used to config the max inflight sources, which will be materialized at the same time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def flatMapConcat[T, M](
|
||||
parallelism: Int,
|
||||
f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Flow[In, T, Mat] =
|
||||
new Flow(delegate.flatMapConcat[T, M](parallelism, x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
|
|
|
|||
|
|
@ -4247,6 +4247,24 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
|||
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
|
||||
new Source(delegate.flatMapConcat[T, M](x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by concatenation,
|
||||
* fully consuming one Source after the other.
|
||||
* `parallelism` can be used to config the max inflight sources, which will be materialized at the same time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def flatMapConcat[T, M](parallelism: Int, f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): Source[T, Mat] =
|
||||
new Source(delegate.flatMapConcat[T, M](parallelism, x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
|
|
|
|||
|
|
@ -1783,6 +1783,26 @@ class SubFlow[In, Out, Mat](
|
|||
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubFlow[In, T, Mat] =
|
||||
new SubFlow(delegate.flatMapConcat(x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by concatenation,
|
||||
* fully consuming one Source after the other.
|
||||
* `parallelism` can be used to config the max inflight sources, which will be materialized at the same time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def flatMapConcat[T, M](
|
||||
parallelism: Int,
|
||||
f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubFlow[In, T, Mat] =
|
||||
new SubFlow(delegate.flatMapConcat(parallelism, x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
|
|
|
|||
|
|
@ -1757,6 +1757,26 @@ class SubSource[Out, Mat](
|
|||
def flatMapConcat[T, M](f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
|
||||
new SubSource(delegate.flatMapConcat(x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by concatenation,
|
||||
* fully consuming one Source after the other.
|
||||
* `parallelism` can be used to config the max inflight sources, which will be materialized at the same time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def flatMapConcat[T, M](
|
||||
parallelism: Int,
|
||||
f: function.Function[Out, _ <: Graph[SourceShape[T], M]]): SubSource[T, Mat] =
|
||||
new SubSource(delegate.flatMapConcat(parallelism, x => f(x)))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by merging, where at most `breadth`
|
||||
|
|
|
|||
|
|
@ -2744,6 +2744,24 @@ trait FlowOps[+Out, +Mat] {
|
|||
*/
|
||||
def flatMapConcat[T, M](f: Out => Graph[SourceShape[T], M]): Repr[T] = map(f).via(new FlattenMerge[T, M](1))
|
||||
|
||||
/**
|
||||
* Transform each input element into a `Source` of output elements that is
|
||||
* then flattened into the output stream by concatenation,
|
||||
* fully consuming one Source after the other.
|
||||
* `parallelism` can be used to config the max inflight sources, which will be materialized at the same time.
|
||||
*
|
||||
* '''Emits when''' a currently consumed substream has an element available
|
||||
*
|
||||
* '''Backpressures when''' downstream backpressures
|
||||
*
|
||||
* '''Completes when''' upstream completes and all consumed substreams complete
|
||||
*
|
||||
* '''Cancels when''' downstream cancels
|
||||
* @since 1.2.0
|
||||
*/
|
||||
def flatMapConcat[T, M](parallelism: Int, f: Out => Graph[SourceShape[T], M]): Repr[T] =
|
||||
map(f).via(new FlattenConcat[T, M](parallelism))
|
||||
|
||||
/**
|
||||
* Alias for [[flatMapConcat]], added to enable for comprehensions.
|
||||
*
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue