mapAsyncPartitioned / mapAsyncPartitionedUnordered (#561)
* Create MapAsyncPartition.scala add license add test Update LICENSE try to fix test * Update MapAsyncPartitionSpec.scala wip Update MapAsyncPartition.scala wip * changes to get code to compile with scala 2.12 * more tests scalafmt * Update MapAsyncPartition.scala * make code more closely match the Akka API java Update Flow.scala more java api * Add ordered version of the operator * Fix formatting * update docs * test null function result * java api * add back code to get scala 2.12 compile working again * Unify mapAsyncPartitioned implementations * remove special license Update CopyrightHeader.scala * java tests javafmt * update docs update tests update javadoc --------- Co-authored-by: Jacek Sokol <jacek@scalabs.pl>
This commit is contained in:
parent
e94e7b971b
commit
7bee80e058
18 changed files with 1184 additions and 6 deletions
|
|
@ -103,6 +103,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = userProjects ++ List[Projec
|
||||||
persistenceTypedTests,
|
persistenceTypedTests,
|
||||||
remoteTests,
|
remoteTests,
|
||||||
streamTests,
|
streamTests,
|
||||||
|
streamTypedTests,
|
||||||
streamTestsTck)
|
streamTestsTck)
|
||||||
|
|
||||||
lazy val root = Project(id = "pekko", base = file("."))
|
lazy val root = Project(id = "pekko", base = file("."))
|
||||||
|
|
@ -567,6 +568,12 @@ lazy val streamTyped = pekkoModule("stream-typed")
|
||||||
.settings(AutomaticModuleName.settings("pekko.stream.typed"))
|
.settings(AutomaticModuleName.settings("pekko.stream.typed"))
|
||||||
.enablePlugins(ScaladocNoVerificationOfDiagrams)
|
.enablePlugins(ScaladocNoVerificationOfDiagrams)
|
||||||
|
|
||||||
|
lazy val streamTypedTests = pekkoModule("stream-typed-tests")
|
||||||
|
.dependsOn(streamTestkit % "test->test", streamTyped)
|
||||||
|
.settings(Dependencies.streamTests)
|
||||||
|
.enablePlugins(NoPublish)
|
||||||
|
.disablePlugins(MimaPlugin)
|
||||||
|
|
||||||
lazy val actorTestkitTyped = pekkoModule("actor-testkit-typed")
|
lazy val actorTestkitTyped = pekkoModule("actor-testkit-typed")
|
||||||
.dependsOn(actorTyped, slf4j, testkit % "compile->compile;test->test")
|
.dependsOn(actorTyped, slf4j, testkit % "compile->compile;test->test")
|
||||||
.settings(AutomaticModuleName.settings("pekko.actor.testkit.typed"))
|
.settings(AutomaticModuleName.settings("pekko.actor.testkit.typed"))
|
||||||
|
|
|
||||||
|
|
@ -1,6 +1,6 @@
|
||||||
# mapAsync
|
# mapAsync
|
||||||
|
|
||||||
Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result.
|
Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result.
|
||||||
|
|
||||||
@ref[Asynchronous operators](../index.md#asynchronous-operators)
|
@ref[Asynchronous operators](../index.md#asynchronous-operators)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,28 @@
|
||||||
|
# mapAsyncPartitioned
|
||||||
|
|
||||||
|
Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will have elements that retain the order of the original Source or Flow.
|
||||||
|
|
||||||
|
@ref[Asynchronous operators](../index.md#asynchronous-operators)
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@apidoc[Source.mapAsyncPartitioned](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" }
|
||||||
|
@apidoc[Flow.mapAsyncPartitioned](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" }
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
Like `mapAsync` but an intermediate partitioning stage is used.
|
||||||
|
Up to `parallelism` elements can be processed concurrently, but regardless of their completion time the incoming
|
||||||
|
order will be kept when results complete. For use cases where order does not matter, `mapAsyncPartitionedUnordered` can be used.
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** when the next in order @scala[`Future`] @java[`CompletionStage`] returned by the provided function completes successfully
|
||||||
|
|
||||||
|
**backpressures** when downstream backgpressures and completed and incomplete @scala[`Future`] @java[`CompletionStage`] has reached the configured `parallelism`
|
||||||
|
|
||||||
|
**completes** when upstream completes and all @scala[Futures] @java[CompletionStages] have completed and all results have been emitted
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -0,0 +1,29 @@
|
||||||
|
# mapAsyncPartitionedUnordered
|
||||||
|
|
||||||
|
Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will not have ordered elements.
|
||||||
|
|
||||||
|
@ref[Asynchronous operators](../index.md#asynchronous-operators)
|
||||||
|
|
||||||
|
## Signature
|
||||||
|
|
||||||
|
@apidoc[Source.mapAsyncPartitionedUnordered](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" }
|
||||||
|
@apidoc[Flow.mapAsyncPartitionedUnordered](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" }
|
||||||
|
|
||||||
|
## Description
|
||||||
|
|
||||||
|
Like `mapAsyncUnordered` but an intermediate partitioning stage is used.
|
||||||
|
Up to `parallelism` elements can be processed concurrently for a partition and pushed down the stream regardless of the
|
||||||
|
order of the partitions that triggered them. In other words, the order of the output elements will be preserved only within a partition.
|
||||||
|
For use cases where order matters, `mapAsyncPartitioned` can be used.
|
||||||
|
|
||||||
|
## Reactive Streams semantics
|
||||||
|
|
||||||
|
@@@div { .callout }
|
||||||
|
|
||||||
|
**emits** any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function complete
|
||||||
|
|
||||||
|
**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures
|
||||||
|
|
||||||
|
**completes** upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
@ -198,7 +198,9 @@ operation at the same time (usually handling the completion of a @scala[`Future`
|
||||||
|
|
||||||
| |Operator|Description|
|
| |Operator|Description|
|
||||||
|--|--|--|
|
|--|--|--|
|
||||||
|Source/Flow|<a name="mapasync"></a>@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result.|
|
|Source/Flow|<a name="mapasync"></a>@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result.|
|
||||||
|
|Source/Flow|<a name="mapasyncpartitioned"></a>@ref[mapAsyncPartitioned](Source-or-Flow/mapAsyncPartitioned.md)|Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will have elements that retain the order of the original Source or Flow.|
|
||||||
|
|Source/Flow|<a name="mapasyncpartitionedunordered"></a>@ref[mapAsyncPartitionedUnordered](Source-or-Flow/mapAsyncPartitionedUnordered.md)|Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will not have ordered elements.|
|
||||||
|Source/Flow|<a name="mapasyncunordered"></a>@ref[mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)|Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.|
|
|Source/Flow|<a name="mapasyncunordered"></a>@ref[mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)|Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.|
|
||||||
|
|
||||||
## Timer driven operators
|
## Timer driven operators
|
||||||
|
|
@ -512,6 +514,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
|
||||||
* [logWithMarker](Source-or-Flow/logWithMarker.md)
|
* [logWithMarker](Source-or-Flow/logWithMarker.md)
|
||||||
* [map](Source-or-Flow/map.md)
|
* [map](Source-or-Flow/map.md)
|
||||||
* [mapAsync](Source-or-Flow/mapAsync.md)
|
* [mapAsync](Source-or-Flow/mapAsync.md)
|
||||||
|
* [mapAsyncPartitioned](Source-or-Flow/mapAsyncPartitioned.md)
|
||||||
|
* [mapAsyncPartitionedUnordered](Source-or-Flow/mapAsyncPartitionedUnordered.md)
|
||||||
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
|
* [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)
|
||||||
* [mapConcat](Source-or-Flow/mapConcat.md)
|
* [mapConcat](Source-or-Flow/mapConcat.md)
|
||||||
* [mapError](Source-or-Flow/mapError.md)
|
* [mapError](Source-or-Flow/mapError.md)
|
||||||
|
|
|
||||||
|
|
@ -124,9 +124,8 @@ trait CopyrightHeader extends AutoPlugin {
|
||||||
private def isLightbendCopyrighted(text: String): Boolean =
|
private def isLightbendCopyrighted(text: String): Boolean =
|
||||||
StringUtils.containsIgnoreCase(text, "lightbend inc.")
|
StringUtils.containsIgnoreCase(text, "lightbend inc.")
|
||||||
|
|
||||||
private def isValidCopyrightAnnotated(text: String): Boolean = {
|
private def isValidCopyrightAnnotated(text: String): Boolean =
|
||||||
isApacheCopyrighted(text)
|
isApacheCopyrighted(text)
|
||||||
}
|
|
||||||
|
|
||||||
private def isOnlyLightbendOrEpflCopyrightAnnotated(text: String): Boolean = {
|
private def isOnlyLightbendOrEpflCopyrightAnnotated(text: String): Boolean = {
|
||||||
(isLightbendCopyrighted(text) || isLAMPCopyrighted(text)) && !isApacheCopyrighted(text)
|
(isLightbendCopyrighted(text) || isLAMPCopyrighted(text)) && !isApacheCopyrighted(text)
|
||||||
|
|
|
||||||
|
|
@ -886,6 +886,54 @@ public class FlowTest extends StreamTest {
|
||||||
assertEquals(0, result.size());
|
assertEquals(0, result.size());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToUseMapAsyncPartitioned() throws Exception {
|
||||||
|
final TestKit probe = new TestKit(system);
|
||||||
|
final Iterable<String> input = Arrays.asList("2c", "1a", "1b");
|
||||||
|
final Flow<String, String, NotUsed> flow =
|
||||||
|
Flow.of(String.class)
|
||||||
|
.mapAsyncPartitioned(
|
||||||
|
4,
|
||||||
|
elem -> elem.substring(0, 1),
|
||||||
|
(elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase()));
|
||||||
|
Source.from(input)
|
||||||
|
.via(flow)
|
||||||
|
.runForeach(
|
||||||
|
new Procedure<String>() {
|
||||||
|
public void apply(String elem) {
|
||||||
|
probe.getRef().tell(elem, ActorRef.noSender());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
system);
|
||||||
|
probe.expectMsgEquals("2C");
|
||||||
|
probe.expectMsgEquals("1A");
|
||||||
|
probe.expectMsgEquals("1B");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToUseMapAsyncPartitionedUnordered() throws Exception {
|
||||||
|
final TestKit probe = new TestKit(system);
|
||||||
|
final Iterable<String> input = Arrays.asList("1a", "1b", "2c");
|
||||||
|
final Flow<String, String, NotUsed> flow =
|
||||||
|
Flow.of(String.class)
|
||||||
|
.mapAsyncPartitionedUnordered(
|
||||||
|
4,
|
||||||
|
elem -> elem.substring(0, 1),
|
||||||
|
(elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase()));
|
||||||
|
Source.from(input)
|
||||||
|
.via(flow)
|
||||||
|
.runForeach(
|
||||||
|
new Procedure<String>() {
|
||||||
|
public void apply(String elem) {
|
||||||
|
probe.getRef().tell(elem, ActorRef.noSender());
|
||||||
|
}
|
||||||
|
},
|
||||||
|
system);
|
||||||
|
probe.expectMsgEquals("1A");
|
||||||
|
probe.expectMsgEquals("1B");
|
||||||
|
probe.expectMsgEquals("2C");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mustBeAbleToUseCollectType() throws Exception {
|
public void mustBeAbleToUseCollectType() throws Exception {
|
||||||
final TestKit probe = new TestKit(system);
|
final TestKit probe = new TestKit(system);
|
||||||
|
|
|
||||||
|
|
@ -568,6 +568,36 @@ public class SourceTest extends StreamTest {
|
||||||
probe.expectMsgEquals("C");
|
probe.expectMsgEquals("C");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToUseMapAsyncPartitioned() throws Exception {
|
||||||
|
final TestKit probe = new TestKit(system);
|
||||||
|
final Iterable<String> input = Arrays.asList("2c", "1a", "1b");
|
||||||
|
Source.from(input)
|
||||||
|
.mapAsyncPartitioned(
|
||||||
|
4,
|
||||||
|
elem -> elem.substring(0, 1),
|
||||||
|
(elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase()))
|
||||||
|
.runForeach(elem -> probe.getRef().tell(elem, ActorRef.noSender()), system);
|
||||||
|
probe.expectMsgEquals("2C");
|
||||||
|
probe.expectMsgEquals("1A");
|
||||||
|
probe.expectMsgEquals("1B");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void mustBeAbleToUseMapAsyncPartitionedUnordered() throws Exception {
|
||||||
|
final TestKit probe = new TestKit(system);
|
||||||
|
final Iterable<String> input = Arrays.asList("1a", "1b", "2c");
|
||||||
|
Source.from(input)
|
||||||
|
.mapAsyncPartitionedUnordered(
|
||||||
|
4,
|
||||||
|
elem -> elem.substring(0, 1),
|
||||||
|
(elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase()))
|
||||||
|
.runForeach(elem -> probe.getRef().tell(elem, ActorRef.noSender()), system);
|
||||||
|
probe.expectMsgEquals("1A");
|
||||||
|
probe.expectMsgEquals("1B");
|
||||||
|
probe.expectMsgEquals("2C");
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void mustBeAbleToUseCollectType() throws Exception {
|
public void mustBeAbleToUseCollectType() throws Exception {
|
||||||
final TestKit probe = new TestKit(system);
|
final TestKit probe = new TestKit(system);
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,449 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.stream
|
||||||
|
|
||||||
|
import org.apache.pekko.actor.typed.ActorSystem
|
||||||
|
import org.apache.pekko.actor.typed.scaladsl.Behaviors
|
||||||
|
import org.apache.pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink, Source, SourceWithContext }
|
||||||
|
import org.scalacheck.{ Arbitrary, Gen }
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import org.scalatest.concurrent.ScalaFutures
|
||||||
|
import org.scalatest.flatspec.AnyFlatSpec
|
||||||
|
import org.scalatest.matchers.should.Matchers
|
||||||
|
import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks
|
||||||
|
|
||||||
|
import java.time.Instant
|
||||||
|
import java.util.concurrent.Executors
|
||||||
|
import scala.annotation.nowarn
|
||||||
|
import scala.concurrent.duration.{ DurationInt, FiniteDuration }
|
||||||
|
import scala.concurrent.{ blocking, ExecutionContext, Future }
|
||||||
|
import scala.language.postfixOps
|
||||||
|
import scala.util.Random
|
||||||
|
|
||||||
|
private object MapAsyncPartitionedSpec {
|
||||||
|
|
||||||
|
object TestData {
|
||||||
|
|
||||||
|
case class Parallelism(value: Int) extends AnyVal
|
||||||
|
|
||||||
|
case class TestKeyValue(key: Int, delay: FiniteDuration, value: String)
|
||||||
|
|
||||||
|
implicit val parallelismArb: Arbitrary[Parallelism] = Arbitrary {
|
||||||
|
Gen.choose(2, 8).map(Parallelism.apply)
|
||||||
|
}
|
||||||
|
implicit val elementsArb: Arbitrary[Seq[TestKeyValue]] = Arbitrary {
|
||||||
|
for {
|
||||||
|
totalElements <- Gen.choose(1, 100)
|
||||||
|
totalPartitions <- Gen.choose(1, 8)
|
||||||
|
} yield {
|
||||||
|
generateElements(totalPartitions, totalElements)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def generateElements(totalPartitions: Int, totalElements: Int): Seq[TestKeyValue] =
|
||||||
|
for (i <- 1 to totalElements) yield {
|
||||||
|
TestKeyValue(
|
||||||
|
key = Random.nextInt(totalPartitions),
|
||||||
|
delay = DurationInt(Random.nextInt(20) + 10).millis,
|
||||||
|
value = i.toString)
|
||||||
|
}
|
||||||
|
|
||||||
|
def extractPartition(e: TestKeyValue): Int =
|
||||||
|
e.key
|
||||||
|
|
||||||
|
type Operation = TestKeyValue => Future[(Int, String)]
|
||||||
|
|
||||||
|
def asyncOperation(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, String)] =
|
||||||
|
Future {
|
||||||
|
p -> e.value
|
||||||
|
}
|
||||||
|
|
||||||
|
def blockingOperation(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, String)] =
|
||||||
|
Future {
|
||||||
|
blocking {
|
||||||
|
Thread.sleep(e.delay.toMillis)
|
||||||
|
p -> e.value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class MapAsyncPartitionedSpec
|
||||||
|
extends AnyFlatSpec
|
||||||
|
with Matchers
|
||||||
|
with ScalaFutures
|
||||||
|
with BeforeAndAfterAll
|
||||||
|
with ScalaCheckDrivenPropertyChecks {
|
||||||
|
|
||||||
|
import MapAsyncPartitionedSpec.TestData._
|
||||||
|
|
||||||
|
override implicit def patienceConfig: PatienceConfig = PatienceConfig(
|
||||||
|
timeout = 5 seconds,
|
||||||
|
interval = 100 millis)
|
||||||
|
|
||||||
|
private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "test-system")
|
||||||
|
private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool())
|
||||||
|
|
||||||
|
override protected def afterAll(): Unit = {
|
||||||
|
system.terminate()
|
||||||
|
system.whenTerminated.futureValue
|
||||||
|
super.afterAll()
|
||||||
|
}
|
||||||
|
|
||||||
|
@nowarn("msg=deprecated") // use Stream to support Scala 2.12
|
||||||
|
private def infiniteStream(): Stream[Int] = Stream.from(1)
|
||||||
|
|
||||||
|
@nowarn("msg=never used")
|
||||||
|
private def f(i: Int, p: Int): Future[Int] =
|
||||||
|
Future(i % 2)
|
||||||
|
|
||||||
|
behavior.of("MapAsyncPartitionedUnordered")
|
||||||
|
|
||||||
|
it should "process elements in parallel by partition" in {
|
||||||
|
val elements = List(
|
||||||
|
TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"),
|
||||||
|
TestKeyValue(key = 2, delay = 700 millis, value = "2.a"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = "1.b"),
|
||||||
|
TestKeyValue(key = 2, delay = 900 millis, value = "2.b"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = "1.c"))
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(elements)
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(blockingOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
.map(_._2)
|
||||||
|
|
||||||
|
result shouldBe Vector("2.a", "1.a", "1.b", "2.b", "1.c")
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in parallel preserving order in partition" in {
|
||||||
|
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source(elements.toIndexedSeq)
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap
|
||||||
|
val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in sequence preserving order in partition" in {
|
||||||
|
forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source
|
||||||
|
.fromIterator(() => elements.iterator)
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism = 1)(extractPartition)(asyncOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap
|
||||||
|
val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in parallel preserving order in partition with blocking operation" in {
|
||||||
|
forAll(minSuccessful(10)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source
|
||||||
|
.fromIterator(() => elements.iterator)
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap
|
||||||
|
val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "stop the stream via a KillSwitch" in {
|
||||||
|
val (killSwitch, future) =
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism = 6)(i => i % 6) { (i, _) =>
|
||||||
|
Future {
|
||||||
|
blocking {
|
||||||
|
Thread.sleep(40)
|
||||||
|
(i % 6).toString -> i.toString
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.viaMat(KillSwitches.single)(Keep.right)
|
||||||
|
.toMat(Sink.seq)(Keep.both)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
Thread.sleep(500)
|
||||||
|
|
||||||
|
killSwitch.shutdown()
|
||||||
|
|
||||||
|
val result = future.futureValue.groupBy(_._1)
|
||||||
|
result should have size 6
|
||||||
|
result.values.foreach {
|
||||||
|
_.size should be >= 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "stop the stream if any operation fails" in {
|
||||||
|
val future =
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism = 4)(i => i % 8) { (i, _) =>
|
||||||
|
Future {
|
||||||
|
if (i == 23) throw new RuntimeException("Ignore it")
|
||||||
|
else i.toString
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.toMat(Sink.ignore)(Keep.right)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
future.failed.futureValue shouldBe a[RuntimeException]
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "handle nulls" in {
|
||||||
|
val elements = List(
|
||||||
|
TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"),
|
||||||
|
TestKeyValue(key = 2, delay = 700 millis, value = "2.a"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = null))
|
||||||
|
|
||||||
|
@nowarn("msg=never used")
|
||||||
|
def fun(v: TestKeyValue, p: Int): Future[String] = Future.successful(v.value)
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(elements)
|
||||||
|
.mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
result.toSet shouldBe Set("1.a", "2.a")
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "fail to create an operator if parallelism is less than 1" in {
|
||||||
|
forAll(Gen.negNum[Int]) { zeroOrNegativeParallelism: Int =>
|
||||||
|
an[IllegalArgumentException] shouldBe thrownBy {
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitionedUnordered(
|
||||||
|
parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit)
|
||||||
|
.runWith(Sink.ignore)
|
||||||
|
.futureValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
behavior.of("MapAsyncPartitionedOrdered")
|
||||||
|
|
||||||
|
it should "process elements in parallel by partition" in {
|
||||||
|
val elements = List(
|
||||||
|
TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"),
|
||||||
|
TestKeyValue(key = 2, delay = 700 millis, value = "2.a"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = "1.b"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = "1.c"),
|
||||||
|
TestKeyValue(key = 2, delay = 900 millis, value = "2.b"))
|
||||||
|
|
||||||
|
def processElement(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, (String, Instant))] =
|
||||||
|
Future {
|
||||||
|
blocking {
|
||||||
|
val startedAt = Instant.now()
|
||||||
|
Thread.sleep(e.delay.toMillis)
|
||||||
|
p -> (e.value -> startedAt)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(elements)
|
||||||
|
.mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
.map(_._2)
|
||||||
|
|
||||||
|
result.map(_._1) shouldBe Vector("1.a", "2.a", "1.b", "1.c", "2.b")
|
||||||
|
val elementStartTime = result.toMap
|
||||||
|
|
||||||
|
elementStartTime("1.a") should be < elementStartTime("1.b")
|
||||||
|
elementStartTime("1.b") should be < elementStartTime("1.c")
|
||||||
|
elementStartTime("2.a") should be < elementStartTime("2.b")
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in parallel preserving order in partition" in {
|
||||||
|
forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source(elements.toIndexedSeq)
|
||||||
|
.mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.map(_._2)
|
||||||
|
val expected = elements.map(_.value)
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in sequence preserving order in partition" in {
|
||||||
|
forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source
|
||||||
|
.fromIterator(() => elements.iterator)
|
||||||
|
.mapAsyncPartitioned(parallelism = 1)(extractPartition)(asyncOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.map(_._2)
|
||||||
|
val expected = elements.map(_.value)
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "process elements in parallel preserving order in partition with blocking operation" in {
|
||||||
|
forAll(minSuccessful(10)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) =>
|
||||||
|
val result =
|
||||||
|
Source
|
||||||
|
.fromIterator(() => elements.iterator)
|
||||||
|
.mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val actual = result.map(_._2)
|
||||||
|
val expected = elements.map(_.value)
|
||||||
|
|
||||||
|
actual shouldBe expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "stop the stream via a KillSwitch" in {
|
||||||
|
val (killSwitch, future) =
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitioned(parallelism = 6)(i => i % 6) { (i, _) =>
|
||||||
|
Future {
|
||||||
|
blocking {
|
||||||
|
Thread.sleep(40)
|
||||||
|
(i % 6).toString -> i.toString
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.viaMat(KillSwitches.single)(Keep.right)
|
||||||
|
.toMat(Sink.seq)(Keep.both)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
Thread.sleep(500)
|
||||||
|
|
||||||
|
killSwitch.shutdown()
|
||||||
|
|
||||||
|
val result = future.futureValue.groupBy(_._1)
|
||||||
|
result should have size 6
|
||||||
|
result.values.foreach {
|
||||||
|
_.size should be >= 10
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "stop the stream if any operation fails" in {
|
||||||
|
val future =
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitioned(parallelism = 4)(i => i % 8) { (i, _) =>
|
||||||
|
Future {
|
||||||
|
if (i == 23) throw new RuntimeException("Ignore it")
|
||||||
|
else i.toString
|
||||||
|
}
|
||||||
|
}
|
||||||
|
.toMat(Sink.ignore)(Keep.right)
|
||||||
|
.run()
|
||||||
|
|
||||||
|
future.failed.futureValue shouldBe a[RuntimeException]
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "handle nulls" in {
|
||||||
|
val elements = List(
|
||||||
|
TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"),
|
||||||
|
TestKeyValue(key = 2, delay = 700 millis, value = "2.a"),
|
||||||
|
TestKeyValue(key = 1, delay = 500 millis, value = null))
|
||||||
|
|
||||||
|
@nowarn("msg=never used")
|
||||||
|
def fun(v: TestKeyValue, p: Int): Future[String] = Future.successful(v.value)
|
||||||
|
|
||||||
|
val result =
|
||||||
|
Source(elements)
|
||||||
|
.mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
result shouldBe Seq("1.a", "2.a")
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "fail to create an operator if parallelism is less than 1" in {
|
||||||
|
forAll(Gen.negNum[Int]) { zeroOrNegativeParallelism: Int =>
|
||||||
|
an[IllegalArgumentException] shouldBe thrownBy {
|
||||||
|
Source(infiniteStream())
|
||||||
|
.mapAsyncPartitioned(
|
||||||
|
parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit)
|
||||||
|
.runWith(Sink.ignore)
|
||||||
|
.futureValue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
behavior.of("operator applicability")
|
||||||
|
|
||||||
|
it should "be applicable to a source" in {
|
||||||
|
Source
|
||||||
|
.single(3)
|
||||||
|
.mapAsyncPartitioned(parallelism = 1)(identity)(f)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue shouldBe Seq(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "be applicable to a source with context" in {
|
||||||
|
SourceWithContext
|
||||||
|
.fromTuples(Source.single(3 -> "A"))
|
||||||
|
.mapAsyncPartitioned(parallelism = 1)(identity)(f)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue shouldBe Seq(1 -> "A")
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "be applicable to a flow" in {
|
||||||
|
Flow[Int]
|
||||||
|
.mapAsyncPartitioned(parallelism = 1)(identity)(f)
|
||||||
|
.runWith(Source.single(3), Sink.seq)
|
||||||
|
._2
|
||||||
|
.futureValue shouldBe Seq(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
it should "be applicable to a flow with context" in {
|
||||||
|
val flow =
|
||||||
|
FlowWithContext[Int, String]
|
||||||
|
.mapAsyncPartitioned(parallelism = 1)(identity)(f)
|
||||||
|
|
||||||
|
SourceWithContext
|
||||||
|
.fromTuples(Source.single(3 -> "A"))
|
||||||
|
.via(flow)
|
||||||
|
.runWith(Sink.seq)
|
||||||
|
.futureValue shouldBe Seq(1 -> "A")
|
||||||
|
}
|
||||||
|
|
||||||
|
private implicit class MapWrapper[K, V](map: Map[K, V]) {
|
||||||
|
@nowarn("msg=deprecated")
|
||||||
|
def mapValues2[W](f: V => W) = map.mapValues(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,346 @@
|
||||||
|
/*
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one or more
|
||||||
|
* contributor license agreements. See the NOTICE file distributed with
|
||||||
|
* this work for additional information regarding copyright ownership.
|
||||||
|
* The ASF licenses this file to You under the Apache License, Version 2.0
|
||||||
|
* (the "License"); you may not use this file except in compliance with
|
||||||
|
* the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.pekko.stream
|
||||||
|
|
||||||
|
import scala.collection.mutable
|
||||||
|
import scala.concurrent.Future
|
||||||
|
import scala.util.control.{ NoStackTrace, NonFatal }
|
||||||
|
import scala.util.{ Failure, Success, Try }
|
||||||
|
|
||||||
|
import org.apache.pekko
|
||||||
|
import pekko.dispatch.ExecutionContexts
|
||||||
|
import pekko.stream.ActorAttributes.SupervisionStrategy
|
||||||
|
import pekko.stream.Attributes.{ Name, SourceLocation }
|
||||||
|
import pekko.stream.MapAsyncPartitioned._
|
||||||
|
import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, SourceWithContext }
|
||||||
|
import pekko.stream.stage._
|
||||||
|
|
||||||
|
private[stream] object MapAsyncPartitioned {
|
||||||
|
|
||||||
|
private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => Partition)(tuple: (In, Ctx)): Partition =
|
||||||
|
extract(tuple._1)
|
||||||
|
|
||||||
|
private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => Future[Out])(tuple: (In, Ctx),
|
||||||
|
partition: Partition): Future[(Out, Ctx)] =
|
||||||
|
f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic)
|
||||||
|
|
||||||
|
def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)(
|
||||||
|
extractPartition: In => Partition)(
|
||||||
|
f: (In, Partition) => Future[Out]): Source[Out, Mat] =
|
||||||
|
source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = true, parallelism, extractPartition, f))
|
||||||
|
|
||||||
|
def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)(
|
||||||
|
extractPartition: In => Partition)(
|
||||||
|
f: (In, Partition) => Future[Out]): Source[Out, Mat] =
|
||||||
|
source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = false, parallelism, extractPartition, f))
|
||||||
|
|
||||||
|
def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], parallelism: Int)(
|
||||||
|
extractPartition: In => Partition)(
|
||||||
|
f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
|
||||||
|
flow.via(
|
||||||
|
new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
|
||||||
|
orderedOutput = true,
|
||||||
|
parallelism,
|
||||||
|
extractPartitionWithCtx(extractPartition),
|
||||||
|
fWithCtx[In, T, Ctx, Partition](f)))
|
||||||
|
|
||||||
|
def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat],
|
||||||
|
parallelism: Int)(extractPartition: In => Partition)(
|
||||||
|
f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] =
|
||||||
|
flow.via(
|
||||||
|
new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition](
|
||||||
|
orderedOutput = false,
|
||||||
|
parallelism,
|
||||||
|
extractPartitionWithCtx(extractPartition),
|
||||||
|
fWithCtx[In, T, Ctx, Partition](f)))
|
||||||
|
|
||||||
|
def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)(
|
||||||
|
extractPartition: Out => Partition)(
|
||||||
|
f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
|
||||||
|
flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, parallelism, extractPartition,
|
||||||
|
f))
|
||||||
|
|
||||||
|
def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)(
|
||||||
|
extractPartition: Out => Partition)(
|
||||||
|
f: (Out, Partition) => Future[T]): Flow[In, T, Mat] =
|
||||||
|
flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, parallelism,
|
||||||
|
extractPartition, f))
|
||||||
|
|
||||||
|
def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
|
||||||
|
flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(
|
||||||
|
extractPartition: Out => Partition)(
|
||||||
|
f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
|
||||||
|
flow.via(
|
||||||
|
new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
|
||||||
|
orderedOutput = true,
|
||||||
|
parallelism,
|
||||||
|
extractPartitionWithCtx(extractPartition),
|
||||||
|
fWithCtx[Out, T, CtxOut, Partition](f)))
|
||||||
|
|
||||||
|
def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat](
|
||||||
|
flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(extractPartition: Out => Partition)(
|
||||||
|
f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] =
|
||||||
|
flow.via(
|
||||||
|
new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition](
|
||||||
|
orderedOutput = false,
|
||||||
|
parallelism,
|
||||||
|
extractPartitionWithCtx(extractPartition),
|
||||||
|
fWithCtx[Out, T, CtxOut, Partition](f)))
|
||||||
|
|
||||||
|
private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception with NoStackTrace)
|
||||||
|
|
||||||
|
private[stream] final class Holder[In, Out](
|
||||||
|
val in: In,
|
||||||
|
var out: Try[Out],
|
||||||
|
callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) {
|
||||||
|
|
||||||
|
// To support both fail-fast when the supervision directive is Stop
|
||||||
|
// and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that
|
||||||
|
private var cachedSupervisionDirective: Option[Supervision.Directive] = None
|
||||||
|
|
||||||
|
def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = {
|
||||||
|
cachedSupervisionDirective match {
|
||||||
|
case Some(d) => d
|
||||||
|
case _ =>
|
||||||
|
val d = decider(ex)
|
||||||
|
cachedSupervisionDirective = Some(d)
|
||||||
|
d
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
def setOut(t: Try[Out]): Unit =
|
||||||
|
out = t
|
||||||
|
|
||||||
|
override def apply(t: Try[Out]): Unit = {
|
||||||
|
setOut(t)
|
||||||
|
callback.invoke(this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private[stream] class MapAsyncPartitioned[In, Out, Partition](
|
||||||
|
orderedOutput: Boolean,
|
||||||
|
parallelism: Int,
|
||||||
|
extractPartition: In => Partition,
|
||||||
|
f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] {
|
||||||
|
|
||||||
|
if (parallelism < 1) throw new IllegalArgumentException("parallelism must be at least 1")
|
||||||
|
|
||||||
|
private val in = Inlet[In]("MapAsyncPartitionOrdered.in")
|
||||||
|
private val out = Outlet[Out]("MapAsyncPartitionOrdered.out")
|
||||||
|
|
||||||
|
override val shape: FlowShape[In, Out] = FlowShape(in, out)
|
||||||
|
|
||||||
|
override def initialAttributes: Attributes =
|
||||||
|
Attributes(Name("MapAsyncPartitionOrdered")) and SourceLocation.forLambda(f)
|
||||||
|
|
||||||
|
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
|
||||||
|
new GraphStageLogic(shape) with InHandler with OutHandler {
|
||||||
|
private val contextPropagation = pekko.stream.impl.ContextPropagation()
|
||||||
|
|
||||||
|
private final class Contextual[T](context: AnyRef, val element: T) {
|
||||||
|
private var suspended = false
|
||||||
|
|
||||||
|
def suspend(): Unit =
|
||||||
|
if (!suspended) {
|
||||||
|
suspended = true
|
||||||
|
contextPropagation.suspendContext()
|
||||||
|
}
|
||||||
|
|
||||||
|
def resume(): Unit =
|
||||||
|
if (suspended) {
|
||||||
|
suspended = false
|
||||||
|
contextPropagation.resumeContext(context)
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider
|
||||||
|
|
||||||
|
private var partitionsInProgress: mutable.Set[Partition] = _
|
||||||
|
private var buffer: mutable.Queue[(Partition, Contextual[Holder[In, Out]])] = _
|
||||||
|
|
||||||
|
private val futureCB = getAsyncCallback[Holder[In, Out]](holder =>
|
||||||
|
holder.out match {
|
||||||
|
case Success(_) => pushNextIfPossible()
|
||||||
|
case Failure(ex) =>
|
||||||
|
holder.supervisionDirectiveFor(decider, ex) match {
|
||||||
|
// fail fast as if supervision says so
|
||||||
|
case Supervision.Stop => failStage(ex)
|
||||||
|
case _ => pushNextIfPossible()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
override def preStart(): Unit = {
|
||||||
|
partitionsInProgress = mutable.Set()
|
||||||
|
buffer = mutable.Queue()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onPull(): Unit =
|
||||||
|
pushNextIfPossible()
|
||||||
|
|
||||||
|
override def onPush(): Unit = {
|
||||||
|
try {
|
||||||
|
val element = grab(in)
|
||||||
|
val partition = extractPartition(element)
|
||||||
|
|
||||||
|
val wrappedInput = new Contextual(
|
||||||
|
contextPropagation.currentContext(),
|
||||||
|
new Holder[In, Out](element, NotYetThere, futureCB))
|
||||||
|
|
||||||
|
buffer.enqueue(partition -> wrappedInput)
|
||||||
|
|
||||||
|
if (canStartNextElement(partition)) {
|
||||||
|
processElement(partition, wrappedInput)
|
||||||
|
} else {
|
||||||
|
wrappedInput.suspend()
|
||||||
|
}
|
||||||
|
} catch {
|
||||||
|
case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex)
|
||||||
|
}
|
||||||
|
|
||||||
|
pullIfNeeded()
|
||||||
|
}
|
||||||
|
|
||||||
|
override def onUpstreamFinish(): Unit =
|
||||||
|
if (idle()) completeStage()
|
||||||
|
|
||||||
|
private def processElement(partition: Partition, wrappedInput: Contextual[Holder[In, Out]]): Unit = {
|
||||||
|
import wrappedInput.{ element => holder }
|
||||||
|
val future = f(holder.in, partition)
|
||||||
|
|
||||||
|
partitionsInProgress += partition
|
||||||
|
|
||||||
|
future.value match {
|
||||||
|
case None => future.onComplete(holder)(ExecutionContexts.parasitic)
|
||||||
|
case Some(v) =>
|
||||||
|
// #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and
|
||||||
|
// run the logic directly on this thread
|
||||||
|
holder.setOut(v)
|
||||||
|
v match {
|
||||||
|
// this optimization also requires us to stop the stage to fail fast if the decider says so:
|
||||||
|
case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex)
|
||||||
|
case _ => pushNextIfPossible()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private val pushNextIfPossible: () => Unit =
|
||||||
|
if (orderedOutput) pushNextIfPossibleOrdered _
|
||||||
|
else pushNextIfPossibleUnordered _
|
||||||
|
|
||||||
|
private def pushNextIfPossibleOrdered(): Unit =
|
||||||
|
if (partitionsInProgress.isEmpty) {
|
||||||
|
drainQueue()
|
||||||
|
pullIfNeeded()
|
||||||
|
} else {
|
||||||
|
while (buffer.nonEmpty && !(buffer.front._2.element.out eq NotYetThere) && isAvailable(out)) {
|
||||||
|
val (partition, wrappedInput) = buffer.dequeue()
|
||||||
|
import wrappedInput.{ element => holder }
|
||||||
|
partitionsInProgress -= partition
|
||||||
|
|
||||||
|
holder.out match {
|
||||||
|
case Success(elem) =>
|
||||||
|
if (elem != null) {
|
||||||
|
push(out, elem)
|
||||||
|
pullIfNeeded()
|
||||||
|
} else {
|
||||||
|
// elem is null
|
||||||
|
pullIfNeeded()
|
||||||
|
}
|
||||||
|
|
||||||
|
case Failure(NonFatal(ex)) =>
|
||||||
|
holder.supervisionDirectiveFor(decider, ex) match {
|
||||||
|
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
|
||||||
|
// onComplete callback has run
|
||||||
|
case Supervision.Stop =>
|
||||||
|
failStage(ex)
|
||||||
|
case _ =>
|
||||||
|
// try next element
|
||||||
|
}
|
||||||
|
case Failure(ex) =>
|
||||||
|
// fatal exception in buffer, not sure that it can actually happen, but for good measure
|
||||||
|
throw ex
|
||||||
|
}
|
||||||
|
}
|
||||||
|
drainQueue()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def pushNextIfPossibleUnordered(): Unit =
|
||||||
|
if (partitionsInProgress.isEmpty) {
|
||||||
|
drainQueue()
|
||||||
|
pullIfNeeded()
|
||||||
|
} else {
|
||||||
|
buffer = buffer.filter { case (partition, wrappedInput) =>
|
||||||
|
import wrappedInput.{ element => holder }
|
||||||
|
|
||||||
|
if ((holder.out eq MapAsyncPartitioned.NotYetThere) || !isAvailable(out)) {
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
partitionsInProgress -= partition
|
||||||
|
|
||||||
|
holder.out match {
|
||||||
|
case Success(elem) =>
|
||||||
|
if (elem != null) {
|
||||||
|
push(out, elem)
|
||||||
|
}
|
||||||
|
|
||||||
|
case Failure(NonFatal(ex)) =>
|
||||||
|
holder.supervisionDirectiveFor(decider, ex) match {
|
||||||
|
// this could happen if we are looping in pushNextIfPossible and end up on a failed future before the
|
||||||
|
// onComplete callback has run
|
||||||
|
case Supervision.Stop =>
|
||||||
|
failStage(ex)
|
||||||
|
case _ =>
|
||||||
|
// try next element
|
||||||
|
}
|
||||||
|
case Failure(ex) =>
|
||||||
|
// fatal exception in buffer, not sure that it can actually happen, but for good measure
|
||||||
|
throw ex
|
||||||
|
}
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
pullIfNeeded()
|
||||||
|
drainQueue()
|
||||||
|
}
|
||||||
|
|
||||||
|
private def drainQueue(): Unit = {
|
||||||
|
buffer.foreach {
|
||||||
|
case (partition, wrappedInput) =>
|
||||||
|
if (canStartNextElement(partition)) {
|
||||||
|
wrappedInput.resume()
|
||||||
|
processElement(partition, wrappedInput)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def pullIfNeeded(): Unit =
|
||||||
|
if (isClosed(in) && idle()) completeStage()
|
||||||
|
else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in)
|
||||||
|
// else already pulled and waiting for next element
|
||||||
|
|
||||||
|
private def idle(): Boolean =
|
||||||
|
buffer.isEmpty
|
||||||
|
|
||||||
|
private def canStartNextElement(partition: Partition): Boolean =
|
||||||
|
!partitionsInProgress(partition) && partitionsInProgress.size < parallelism
|
||||||
|
|
||||||
|
setHandlers(in, out, this)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -839,6 +839,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr
|
||||||
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
|
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] =
|
||||||
new Flow(delegate.mapAsync(parallelism)(x => f(x).asScala))
|
new Flow(delegate.mapAsync(parallelism)(x => f(x).asScala))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
|
||||||
|
MapAsyncPartitioned.mapFlowOrdered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] =
|
||||||
|
MapAsyncPartitioned.mapFlowUnordered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform this stream by applying the given function to each of the elements
|
* Transform this stream by applying the given function to each of the elements
|
||||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||||
|
|
|
||||||
|
|
@ -178,6 +178,36 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
|
||||||
f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
|
f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
|
||||||
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
|
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[Out2, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
|
||||||
|
viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_, _).asScala))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = {
|
||||||
|
viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_, _).asScala))
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Context-preserving variant of [[pekko.stream.javadsl.Flow.mapConcat]].
|
* Context-preserving variant of [[pekko.stream.javadsl.Flow.mapConcat]].
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -2493,6 +2493,36 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[
|
||||||
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
|
def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] =
|
||||||
new Source(delegate.mapAsync(parallelism)(x => f(x).asScala))
|
new Source(delegate.mapAsync(parallelism)(x => f(x).asScala))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] =
|
||||||
|
MapAsyncPartitioned.mapSourceOrdered(delegate, parallelism)(extractPartition(_))(f(_,
|
||||||
|
_).asScala).asJava
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] =
|
||||||
|
MapAsyncPartitioned.mapSourceUnordered(delegate, parallelism)(extractPartition(_))(f(_,
|
||||||
|
_).asScala).asJava
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Transform this stream by applying the given function to each of the elements
|
* Transform this stream by applying the given function to each of the elements
|
||||||
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
* as they pass through this processing step. The function returns a `CompletionStage` and the
|
||||||
|
|
|
||||||
|
|
@ -174,6 +174,40 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
|
||||||
f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] =
|
f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] =
|
||||||
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
|
viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[Out2, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceWithContextOrdered(delegate, parallelism)(extractPartition(_))(f(_,
|
||||||
|
_).asScala)
|
||||||
|
.asJava
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int,
|
||||||
|
extractPartition: function.Function[Out, P],
|
||||||
|
f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceWithContextUnordered(delegate, parallelism)(extractPartition(_))(f(_,
|
||||||
|
_).asScala)
|
||||||
|
.asJava
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]].
|
* Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]].
|
||||||
*
|
*
|
||||||
|
|
|
||||||
|
|
@ -163,6 +163,36 @@ final class Flow[-In, +Out, +Mat](
|
||||||
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
|
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
|
||||||
new Flow(traversalBuilder.transformMat(f), shape)
|
new Flow(traversalBuilder.transformMat(f), shape)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(
|
||||||
|
f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(
|
||||||
|
f: (Out, P) => Future[T]): Flow[In, T, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapFlowUnordered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
|
* Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]].
|
||||||
* The returned flow is partial materialized and do not support multiple times materialization.
|
* The returned flow is partial materialized and do not support multiple times materialization.
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@
|
||||||
package org.apache.pekko.stream.scaladsl
|
package org.apache.pekko.stream.scaladsl
|
||||||
|
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
|
import scala.concurrent.Future
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.NotUsed
|
import pekko.NotUsed
|
||||||
import pekko.japi.Pair
|
import pekko.japi.Pair
|
||||||
|
|
@ -90,6 +90,36 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
|
||||||
def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] =
|
def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] =
|
||||||
new FlowWithContext(delegate.mapMaterializedValue(f))
|
new FlowWithContext(delegate.mapMaterializedValue(f))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(
|
||||||
|
f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapFlowWithContextOrdered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(
|
||||||
|
f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapFlowWithContextUnordered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
|
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
|
||||||
|
|
||||||
def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]
|
def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]
|
||||||
|
|
|
||||||
|
|
@ -99,6 +99,34 @@ final class Source[+Out, +Mat](
|
||||||
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
|
override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] =
|
||||||
new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any => Any]), shape)
|
new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any => Any]), shape)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceOrdered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceUnordered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source
|
* Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source
|
||||||
* that can be used to consume elements from the newly materialized Source.
|
* that can be used to consume elements from the newly materialized Source.
|
||||||
|
|
|
||||||
|
|
@ -14,7 +14,7 @@
|
||||||
package org.apache.pekko.stream.scaladsl
|
package org.apache.pekko.stream.scaladsl
|
||||||
|
|
||||||
import scala.annotation.unchecked.uncheckedVariance
|
import scala.annotation.unchecked.uncheckedVariance
|
||||||
|
import scala.concurrent.Future
|
||||||
import org.apache.pekko
|
import org.apache.pekko
|
||||||
import pekko.stream._
|
import pekko.stream._
|
||||||
|
|
||||||
|
|
@ -78,6 +78,34 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
|
||||||
def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] =
|
def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] =
|
||||||
new SourceWithContext(delegate.mapMaterializedValue(f))
|
new SourceWithContext(delegate.mapMaterializedValue(f))
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsync]]
|
||||||
|
* @see [[#mapAsyncPartitionedUnordered]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitioned[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceWithContextOrdered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional
|
||||||
|
* partition step before the transform step. The transform function receives the an individual
|
||||||
|
* stream entry and the calculated partition value for that entry.
|
||||||
|
*
|
||||||
|
* @since 1.1.0
|
||||||
|
* @see [[#mapAsyncUnordered]]
|
||||||
|
* @see [[#mapAsyncPartitioned]]
|
||||||
|
*/
|
||||||
|
def mapAsyncPartitionedUnordered[T, P](parallelism: Int)(
|
||||||
|
extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = {
|
||||||
|
MapAsyncPartitioned.mapSourceWithContextUnordered(this, parallelism)(extractPartition)(f)
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]],
|
* Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]],
|
||||||
* concatenating the processing steps of both.
|
* concatenating the processing steps of both.
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue