=str Tweak the stream mapAsyncPartitioned operator

This commit is contained in:
He-Pin 2023-09-23 14:51:31 +08:00 committed by kerr
parent 34815bc196
commit 1b1f57224b
14 changed files with 421 additions and 299 deletions

View file

@ -63,8 +63,7 @@ private object MapAsyncPartitionedSpec {
value = i.toString)
}
def extractPartition(e: TestKeyValue): Int =
e.key
val partitioner: TestKeyValue => Int = kv => kv.key
type Operation = TestKeyValue => Future[(Int, String)]
@ -125,7 +124,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
.mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(blockingOperation)
.mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
.map(_._2)
@ -137,7 +136,7 @@ class MapAsyncPartitionedSpec
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation)
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@ -153,7 +152,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
.mapAsyncPartitionedUnordered(parallelism = 1)(extractPartition)(asyncOperation)
.mapAsyncPartitionedUnordered(parallelism = 1)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@ -169,7 +168,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation)
.mapAsyncPartitionedUnordered(parallelism.value)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
@ -232,7 +231,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
.mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun)
.mapAsyncPartitionedUnordered(parallelism = 2)(partitioner)(fun)
.runWith(Sink.seq)
.futureValue
@ -244,7 +243,7 @@ class MapAsyncPartitionedSpec
an[IllegalArgumentException] shouldBe thrownBy {
Source(infiniteStream())
.mapAsyncPartitionedUnordered(
parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit)
parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f = (_, _) => Future.unit)
.runWith(Sink.ignore)
.futureValue
}
@ -272,7 +271,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
.mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement)
.mapAsyncPartitioned(parallelism = 2)(partitioner)(processElement)
.runWith(Sink.seq)
.futureValue
.map(_._2)
@ -289,7 +288,7 @@ class MapAsyncPartitionedSpec
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
val result =
Source(elements.toIndexedSeq)
.mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation)
.mapAsyncPartitioned(parallelism.value)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@ -305,7 +304,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
.mapAsyncPartitioned(parallelism = 1)(extractPartition)(asyncOperation)
.mapAsyncPartitioned(parallelism = 1)(partitioner)(asyncOperation)
.runWith(Sink.seq)
.futureValue
@ -321,7 +320,7 @@ class MapAsyncPartitionedSpec
val result =
Source
.fromIterator(() => elements.iterator)
.mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation)
.mapAsyncPartitioned(parallelism.value)(partitioner)(blockingOperation)
.runWith(Sink.seq)
.futureValue
@ -384,7 +383,7 @@ class MapAsyncPartitionedSpec
val result =
Source(elements)
.mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun)
.mapAsyncPartitioned(parallelism = 2)(partitioner)(fun)
.runWith(Sink.seq)
.futureValue
@ -396,7 +395,7 @@ class MapAsyncPartitionedSpec
an[IllegalArgumentException] shouldBe thrownBy {
Source(infiniteStream())
.mapAsyncPartitioned(
parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit)
parallelism = zeroOrNegativeParallelism)(partitioner = identity)(f = (_, _) => Future.unit)
.runWith(Sink.ignore)
.futureValue
}

View file

@ -23,102 +23,29 @@ import scala.util.control.{ NoStackTrace, NonFatal }
import scala.util.{ Failure, Success, Try }
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.stream.ActorAttributes.SupervisionStrategy
import pekko.stream.Attributes.{ Name, SourceLocation }
import pekko.stream.MapAsyncPartitioned._
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, SourceWithContext }
import pekko.stream.stage._
import pekko.util.OptionVal
/**
* Internal API
*/
@InternalApi
private[stream] object MapAsyncPartitioned {
private val NotYetThere = Failure(new Exception with NoStackTrace)
private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => Partition)(tuple: (In, Ctx)): Partition =
extract(tuple._1)
private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => Future[Out])(tuple: (In, Ctx),
partition: Partition): Future[(Out, Ctx)] =
f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)(
extractPartition: In => Partition)(
f: (In, Partition) => Future[Out]): Source[Out, Mat] =
source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = true, parallelism, extractPartition, f))
def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)(
extractPartition: In => Partition)(
f: (In, Partition) => Future[Out]): Source[Out, Mat] =
source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = false, parallelism, extractPartition, f))
def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], parallelism: Int)(
extractPartition: In => Partition)(
f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
flow.via(
new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
orderedOutput = true,
parallelism,
extractPartitionWithCtx(extractPartition),
fWithCtx[In, T, Ctx, Partition](f)))
def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat],
parallelism: Int)(extractPartition: In => Partition)(
f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
flow.via(
new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
orderedOutput = false,
parallelism,
extractPartitionWithCtx(extractPartition),
fWithCtx[In, T, Ctx, Partition](f)))
def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)(
extractPartition: Out => Partition)(
f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, parallelism, extractPartition,
f))
def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)(
extractPartition: Out => Partition)(
f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, parallelism,
extractPartition, f))
def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
extractPartition: Out => Partition)(
f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
flow.via(
new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
orderedOutput = true,
parallelism,
extractPartitionWithCtx(extractPartition),
fWithCtx[Out, T, CtxOut, Partition](f)))
def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(extractPartition: Out => Partition)(
f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
flow.via(
new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
orderedOutput = false,
parallelism,
extractPartitionWithCtx(extractPartition),
fWithCtx[Out, T, CtxOut, Partition](f)))
private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception with NoStackTrace)
private[stream] final class Holder[In, Out](
val in: In,
var out: Try[Out],
callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
// To support both fail-fast when the supervision directive is Stop
// and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that
private var cachedSupervisionDirective: Option[Supervision.Directive] = None
private final class Holder[In, Out](val in: In, var out: Try[Out], val cb: AsyncCallback[Holder[In, Out]]) extends (
Try[Out] => Unit) {
private var cachedSupervisionDirective: OptionVal[Supervision.Directive] = OptionVal.None
def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = {
cachedSupervisionDirective match {
case Some(d) => d
case OptionVal.Some(d) => d
case _ =>
val d = decider(ex)
cachedSupervisionDirective = Some(d)
cachedSupervisionDirective = OptionVal.Some(d)
d
}
}
@ -128,27 +55,32 @@ private[stream] object MapAsyncPartitioned {
override def apply(t: Try[Out]): Unit = {
setOut(t)
callback.invoke(this)
cb.invoke(this)
}
override def toString = s"Holder($in, $out)"
}
}
private[stream] class MapAsyncPartitioned[In, Out, Partition](
orderedOutput: Boolean,
/**
* Internal API
*/
@InternalApi
private[stream] final class MapAsyncPartitioned[In, Out, Partition](
parallelism: Int,
extractPartition: In => Partition,
orderedOutput: Boolean,
partitioner: In => Partition,
f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
require(parallelism >= 1, "parallelism must be at least 1")
require(partitioner != null, "partitioner function should not be null")
require(f != null, "f function should not be null.")
import MapAsyncPartitioned._
if (parallelism < 1) throw new IllegalArgumentException("parallelism must be at least 1")
private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
private val in = Inlet[In]("MapAsyncPartitioned.in")
private val out = Outlet[Out]("MapAsyncPartitioned.out")
override val shape: FlowShape[In, Out] = FlowShape(in, out)
override def initialAttributes: Attributes =
Attributes(Name("MapAsyncPartitionOrdered")) and SourceLocation.forLambda(f)
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
new GraphStageLogic(shape) with InHandler with OutHandler {
private val contextPropagation = pekko.stream.impl.ContextPropagation()
@ -191,13 +123,12 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition](
buffer = mutable.Queue()
}
override def onPull(): Unit =
pushNextIfPossible()
override def onPull(): Unit = pushNextIfPossible()
override def onPush(): Unit = {
try {
val element = grab(in)
val partition = extractPartition(element)
val partition = partitioner(element)
val wrappedInput = new Contextual(
contextPropagation.currentContext(),
@ -217,8 +148,7 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition](
pullIfNeeded()
}
override def onUpstreamFinish(): Unit =
if (idle()) completeStage()
override def onUpstreamFinish(): Unit = if (idle()) completeStage()
private def processElement(partition: Partition, wrappedInput: Contextual[Holder[In, Out]]): Unit = {
import wrappedInput.{ element => holder }
@ -289,7 +219,7 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition](
buffer = buffer.filter { case (partition, wrappedInput) =>
import wrappedInput.{ element => holder }
if ((holder.out eq MapAsyncPartitioned.NotYetThere) || !isAvailable(out)) {
if ((holder.out eq NotYetThere) || !isAvailable(out)) {
true
} else {
partitionsInProgress -= partition
@ -321,12 +251,14 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition](
}
private def drainQueue(): Unit = {
buffer.foreach {
case (partition, wrappedInput) =>
if (canStartNextElement(partition)) {
wrappedInput.resume()
processElement(partition, wrappedInput)
}
if (buffer.nonEmpty) {
buffer.foreach {
case (partition, wrappedInput) =>
if (canStartNextElement(partition)) {
wrappedInput.resume()
processElement(partition, wrappedInput)
}
}
}
}
@ -335,11 +267,10 @@ private[stream] class MapAsyncPartitioned[In, Out, Partition](
else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in)
// else already pulled and waiting for next element
private def idle(): Boolean =
buffer.isEmpty
private def idle(): Boolean = buffer.isEmpty
private def canStartNextElement(partition: Partition): Boolean =
!partitionsInProgress(partition) && partitionsInProgress.size < parallelism
!partitionsInProgress.contains(partition) && partitionsInProgress.size < parallelism
setHandlers(in, out, this)
}

View file

@ -40,6 +40,8 @@ import pekko.stream.Attributes._
val mapError = name("mapError")
val mapAsync = name("mapAsync")
val mapAsyncUnordered = name("mapAsyncUnordered")
val mapAsyncPartition = name("mapAsyncPartition")
val mapAsyncPartitionUnordered = name("mapAsyncPartitionUnordered")
val ask = name("ask")
val grouped = name("grouped")
val groupedWithin = name("groupedWithin")

View file

@ -842,30 +842,68 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitioned[T, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
MapAsyncPartitioned.mapFlowOrdered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava
new Flow(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes and downstream is available.
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitionedUnordered[T, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
MapAsyncPartitioned.mapFlowUnordered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava
new Flow(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala))
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -173,39 +173,40 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
def map[Out2](f: function.Function[Out, Out2]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
viaScala(_.map(f.apply))
/**
* Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsync]].
*
* @see [[pekko.stream.javadsl.Flow.mapAsync]]
*/
def mapAsync[Out2](
parallelism: Int,
f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives the an individual
* stream entry and the calculated partition value for that entry.
* Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]].
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
* @see [[pekko.stream.javadsl.Flow.mapAsyncPartitioned]]
*/
def mapAsyncPartitioned[Out2, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitioned[Out2, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_, _).asScala))
viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives the an individual
* stream entry and the calculated partition value for that entry.
* Context-preserving variant of [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]].
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
* @see [[pekko.stream.javadsl.Flow.mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitionedUnordered[Out2, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_, _).asScala))
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala))
}
/**

View file

@ -2496,32 +2496,68 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitioned[T, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] =
MapAsyncPartitioned.mapSourceOrdered(delegate, parallelism)(extractPartition(_))(f(_,
_).asScala).asJava
new Source(delegate.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes and downstream is available.
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitionedUnordered[T, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] =
MapAsyncPartitioned.mapSourceUnordered(delegate, parallelism)(extractPartition(_))(f(_,
_).asScala).asJava
new Source(delegate.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala))
/**
* Transform this stream by applying the given function to each of the elements

View file

@ -169,44 +169,40 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def map[Out2](f: function.Function[Out, Out2]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.map(f.apply))
/**
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsync]].
*
* @see [[pekko.stream.javadsl.Source.mapAsync]]
*/
def mapAsync[Out2](
parallelism: Int,
f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives the an individual
* stream entry and the calculated partition value for that entry.
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsyncPartitioned]].
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
* @see [[pekko.stream.javadsl.Source.mapAsyncPartitioned]]
*/
def mapAsyncPartitioned[Out2, P](parallelism: Int,
extractPartition: function.Function[Out, P],
def mapAsyncPartitioned[Out2, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = {
MapAsyncPartitioned.mapSourceWithContextOrdered(delegate, parallelism)(extractPartition(_))(f(_,
_).asScala)
.asJava
viaScala(_.mapAsyncPartitioned(parallelism)(partitioner(_))(f(_, _).asScala))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives the an individual
* stream entry and the calculated partition value for that entry.
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]].
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
* @see [[pekko.stream.javadsl.Source.mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
extractPartition: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = {
MapAsyncPartitioned.mapSourceWithContextUnordered(delegate, parallelism)(extractPartition(_))(f(_,
_).asScala)
.asJava
}
def mapAsyncPartitionedUnordered[Out2, P](
parallelism: Int,
partitioner: function.Function[Out, P],
f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(partitioner(_))(f(_, _).asScala))
/**
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]].

View file

@ -348,6 +348,72 @@ class SubFlow[In, Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubFlow[In, T, Mat] =
new SubFlow(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](
    parallelism: Int,
    partitioner: function.Function[Out, P],
    f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] = {
  // Adapt the Java functional interfaces to Scala lambdas and convert the returned
  // CompletionStage via `asScala`, then hand off to the wrapped delegate's operator.
  new SubFlow(
    delegate.mapAsyncPartitioned(parallelism)(elem => partitioner(elem))((elem, partition) =>
      f(elem, partition).asScala))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes and downstream is available.
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](
    parallelism: Int,
    partitioner: function.Function[Out, P],
    f: function.Function2[Out, P, CompletionStage[T]]): SubFlow[In, T, Mat] = {
  // Adapt the Java functional interfaces to Scala lambdas and convert the returned
  // CompletionStage via `asScala`, then hand off to the wrapped delegate's operator.
  new SubFlow(
    delegate.mapAsyncPartitionedUnordered(parallelism)(elem => partitioner(elem))((elem, partition) =>
      f(elem, partition).asScala))
}
/**
* Only pass on those elements that satisfy the given predicate.
*

View file

@ -339,6 +339,72 @@ class SubSource[Out, Mat](
def mapAsyncUnordered[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): SubSource[T, Mat] =
new SubSource(delegate.mapAsyncUnordered(parallelism)(x => f(x).asScala))
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on the elements of the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](
    parallelism: Int,
    partitioner: function.Function[Out, P],
    f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
  // Wrap the Java functional interfaces in Scala lambdas and convert the
  // CompletionStage returned by `f` with `asScala` before handing off to the delegate.
  new SubSource(
    delegate.mapAsyncPartitioned(parallelism)(elem => partitioner(elem)) { (elem, partition) =>
      f(elem, partition).asScala
    })
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on elements that are in the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[CompletionStage]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes and downstream is available.
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](
    parallelism: Int,
    partitioner: function.Function[Out, P],
    f: function.Function2[Out, P, CompletionStage[T]]): SubSource[T, Mat] =
  // Wrap the Java functional interfaces in Scala lambdas and convert the
  // CompletionStage returned by `f` with `asScala` before handing off to the delegate.
  new SubSource(
    delegate.mapAsyncPartitionedUnordered(parallelism)(elem => partitioner(elem)) { (elem, partition) =>
      f(elem, partition).asScala
    })
/**
* Only pass on those elements that satisfy the given predicate.
*

View file

@ -163,36 +163,6 @@ final class Flow[-In, +Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = {
  // Builds a new Flow from the transformed traversal builder; `shape` is passed through as-is.
  new Flow(traversalBuilder.transformMat(f), shape)
}
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): Flow[In, T, Mat] =
  // Forwards to `MapAsyncPartitioned.mapFlowOrdered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f)
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): Flow[In, T, Mat] =
  // Forwards to `MapAsyncPartitioned.mapFlowUnordered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapFlowUnordered(this, parallelism)(extractPartition)(f)
/**
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
* The returned flow is partially materialized and does not support being materialized multiple times.
@ -1173,6 +1143,81 @@ trait FlowOps[+Out, +Mat] {
*/
def mapAsyncUnordered[T](parallelism: Int)(f: Out => Future[T]): Repr[T] = {
  // Attach a MapAsyncUnordered stage configured with the given parallelism and function.
  via(MapAsyncUnordered(parallelism, f))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on elements that are in the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes for the next element in sequence
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int)(
    partitioner: Out => P)(
    f: (Out, P) => Future[T]): Repr[T] = {
  val transformed = parallelism match {
    // NOTE(review): presumably a shortcut - with parallelism 1 at most one future is in
    // flight, so the dedicated partitioned stage is not needed. Confirm against MapAsyncPartitioned.
    case 1 => via(MapAsyncUnordered(1, in => f(in, partitioner(in))))
    case _ => via(new MapAsyncPartitioned(parallelism, orderedOutput = true, partitioner, f))
  }
  // Both variants get the same default attributes plus the source location of `f`.
  transformed.withAttributes(DefaultAttributes.mapAsyncPartition and SourceLocation.forLambda(f))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry. The maximum parallelism per partition is 1.
*
* The function `partitioner` is always invoked on the elements in the order they arrive.
* The function `f` is always invoked on elements that are in the same partition in the order they arrive.
*
* If the function `partitioner` or `f` throws an exception or if the [[Future]] is completed
* with failure and the supervision decision is [[pekko.stream.Supervision.Stop]]
* the stream will be completed with failure, otherwise the stream continues and the current element is dropped.
*
* Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute.
*
* '''Emits when''' the Future returned by the provided function finishes and downstream is available.
*
* '''Backpressures when''' the number of futures reaches the configured parallelism and the downstream
* backpressures
*
* '''Completes when''' upstream completes and all futures have been completed and all elements have been emitted
*
* '''Cancels when''' downstream cancels
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
    partitioner: Out => P)(
    f: (Out, P) => Future[T]): Repr[T] = {
  val transformed = parallelism match {
    // NOTE(review): presumably a shortcut - with parallelism 1 at most one future is in
    // flight, so the dedicated partitioned stage is not needed. Confirm against MapAsyncPartitioned.
    case 1 => via(MapAsyncUnordered(1, in => f(in, partitioner(in))))
    case _ => via(new MapAsyncPartitioned(parallelism, orderedOutput = false, partitioner, f))
  }
  // Both variants get the same default attributes plus the source location of `f`.
  transformed.withAttributes(DefaultAttributes.mapAsyncPartitionUnordered and SourceLocation.forLambda(f))
}
/**
* Use the `ask` pattern to send a request-reply message to the target `ref` actor.
* If any of the asks times out it will fail the stream with a [[pekko.pattern.AskTimeoutException]].

View file

@ -14,7 +14,6 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Future
import org.apache.pekko
import pekko.NotUsed
import pekko.japi.Pair
@ -90,36 +89,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = {
  // Apply `f` on the wrapped flow and re-wrap the result.
  new FlowWithContext(delegate.mapMaterializedValue(f))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
  // Forwards to `MapAsyncPartitioned.mapFlowWithContextOrdered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapFlowWithContextOrdered(this, parallelism)(extractPartition)(f)
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
  // Forwards to `MapAsyncPartitioned.mapFlowWithContextUnordered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapFlowWithContextUnordered(this, parallelism)(extractPartition)(f)
// Returns the wrapped `delegate` flow, whose input and output elements are (value, context) tuples.
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]

View file

@ -113,6 +113,36 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
case (e, ctx) => f(e).map(o => (o, ctx))(ExecutionContexts.parasitic)
})
/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]].
*
* @since 1.1.0
* @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitioned]]
*/
def mapAsyncPartitioned[Out2, P](parallelism: Int)(
    partitioner: Out => P)(
    f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
  // The partition is computed from the element only; the context plays no part in it.
  val partitionOfElement: ((Out, Ctx)) => P =
    elemWithCtx => partitioner(elemWithCtx._1)
  // Run `f` on the element and pair its result with the original context.
  val transformKeepingCtx: ((Out, Ctx), P) => Future[(Out2, Ctx)] =
    (elemWithCtx, partition) =>
      f(elemWithCtx._1, partition).map(result => (result, elemWithCtx._2))(ExecutionContexts.parasitic)
  via(flow[Out, Ctx].mapAsyncPartitioned(parallelism)(partitionOfElement)(transformKeepingCtx))
}
/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]].
*
* @since 1.1.0
* @see [[pekko.stream.scaladsl.FlowOps.mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int)(
    partitioner: Out => P)(
    f: (Out, P) => Future[Out2]): Repr[Out2, Ctx] = {
  // The partition is computed from the element only; the context plays no part in it.
  val partitionOfElement: ((Out, Ctx)) => P =
    elemWithCtx => partitioner(elemWithCtx._1)
  // Run `f` on the element and pair its result with the original context.
  val transformKeepingCtx: ((Out, Ctx), P) => Future[(Out2, Ctx)] =
    (elemWithCtx, partition) =>
      f(elemWithCtx._1, partition).map(result => (result, elemWithCtx._2))(ExecutionContexts.parasitic)
  via(flow[Out, Ctx].mapAsyncPartitionedUnordered(parallelism)(partitionOfElement)(transformKeepingCtx))
}
/**
* Context-preserving variant of [[pekko.stream.scaladsl.FlowOps.collect]].
*

View file

@ -99,34 +99,6 @@ final class Source[+Out, +Mat](
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = {
  // The function is handed to `transformMat` cast to `Any => Any`.
  val untypedF = f.asInstanceOf[Any => Any]
  new Source[Out, Mat2](traversalBuilder.transformMat(untypedF), shape)
}
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): Source[T, Mat] =
  // Forwards to `MapAsyncPartitioned.mapSourceOrdered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapSourceOrdered(this, parallelism)(extractPartition)(f)
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): Source[T, Mat] =
  // Forwards to `MapAsyncPartitioned.mapSourceUnordered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapSourceUnordered(this, parallelism)(extractPartition)(f)
/**
* Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source
* that can be used to consume elements from the newly materialized Source.

View file

@ -14,7 +14,6 @@
package org.apache.pekko.stream.scaladsl
import scala.annotation.unchecked.uncheckedVariance
import scala.concurrent.Future
import org.apache.pekko
import pekko.stream._
@ -78,34 +77,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] = {
  // Apply `f` on the wrapped source and re-wrap the result.
  new SourceWithContext(delegate.mapMaterializedValue(f))
}
/**
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsync]]
* @see [[#mapAsyncPartitionedUnordered]]
*/
def mapAsyncPartitioned[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] =
  // Forwards to `MapAsyncPartitioned.mapSourceWithContextOrdered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapSourceWithContextOrdered(this, parallelism)(extractPartition)(f)
/**
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
* partition step before the transform step. The transform function receives an individual
* stream entry and the calculated partition value for that entry.
*
* @since 1.1.0
* @see [[#mapAsyncUnordered]]
* @see [[#mapAsyncPartitioned]]
*/
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
    extractPartition: Out => P)(
    f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] =
  // Forwards to `MapAsyncPartitioned.mapSourceWithContextUnordered`, passing `this` and the arguments unchanged.
  MapAsyncPartitioned.mapSourceWithContextUnordered(this, parallelism)(extractPartition)(f)
/**
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]],
* concatenating the processing steps of both.