diff --git a/build.sbt b/build.sbt index 246854995f..b98261030f 100644 --- a/build.sbt +++ b/build.sbt @@ -103,6 +103,7 @@ lazy val aggregatedProjects: Seq[ProjectReference] = userProjects ++ List[Projec persistenceTypedTests, remoteTests, streamTests, + streamTypedTests, streamTestsTck) lazy val root = Project(id = "pekko", base = file(".")) @@ -567,6 +568,12 @@ lazy val streamTyped = pekkoModule("stream-typed") .settings(AutomaticModuleName.settings("pekko.stream.typed")) .enablePlugins(ScaladocNoVerificationOfDiagrams) +lazy val streamTypedTests = pekkoModule("stream-typed-tests") + .dependsOn(streamTestkit % "test->test", streamTyped) + .settings(Dependencies.streamTests) + .enablePlugins(NoPublish) + .disablePlugins(MimaPlugin) + lazy val actorTestkitTyped = pekkoModule("actor-testkit-typed") .dependsOn(actorTyped, slf4j, testkit % "compile->compile;test->test") .settings(AutomaticModuleName.settings("pekko.actor.testkit.typed")) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md index 10aef9444f..48b87c9f95 100644 --- a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsync.md @@ -1,6 +1,6 @@ # mapAsync -Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result. +Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result. @ref[Asynchronous operators](../index.md#asynchronous-operators) diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitioned.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitioned.md new file mode 100644 index 0000000000..69df57e400 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitioned.md @@ -0,0 +1,28 @@ +# mapAsyncPartitioned + +Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will have elements that retain the order of the original Source or Flow. + +@ref[Asynchronous operators](../index.md#asynchronous-operators) + +## Signature + +@apidoc[Source.mapAsyncPartitioned](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" } +@apidoc[Flow.mapAsyncPartitioned](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" } + +## Description + +Like `mapAsync` but an intermediate partitioning stage is used. +Up to `parallelism` elements can be processed concurrently, but regardless of their completion time the incoming +order will be kept when results complete. For use cases where order does not matter, `mapAsyncPartitionedUnordered` can be used. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** when the next in order @scala[`Future`] @java[`CompletionStage`] returned by the provided function completes successfully + +**backpressures** when downstream backgpressures and completed and incomplete @scala[`Future`] @java[`CompletionStage`] has reached the configured `parallelism` + +**completes** when upstream completes and all @scala[Futures] @java[CompletionStages] have completed and all results have been emitted + +@@@ diff --git a/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitionedUnordered.md b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitionedUnordered.md new file mode 100644 index 0000000000..8db4d67f28 --- /dev/null +++ b/docs/src/main/paradox/stream/operators/Source-or-Flow/mapAsyncPartitionedUnordered.md @@ -0,0 +1,29 @@ +# mapAsyncPartitionedUnordered + +Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will not have ordered elements. + +@ref[Asynchronous operators](../index.md#asynchronous-operators) + +## Signature + +@apidoc[Source.mapAsyncPartitionedUnordered](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" } +@apidoc[Flow.mapAsyncPartitionedUnordered](Source) { scala="#mapAsyncPartitioned[T,P](parallelism:Int)(partitioner:Out=%3EP)(f:(Out,P)=%3Escala.concurrent.Future[T]):FlowOps.this.Repr[T]" java="#mapAsyncPartitioned(int,org.apache.pekko.japi.function.Function,org.apache.pekko.japi.function.Function2" } + +## Description + +Like `mapAsyncUnordered` but an intermediate partitioning stage is used. +Up to `parallelism` elements can be processed concurrently for a partition and pushed down the stream regardless of the +order of the partitions that triggered them. In other words, the order of the output elements will be preserved only within a partition. +For use cases where order matters, `mapAsyncPartitioned` can be used. + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** any of the @scala[`Future` s] @java[`CompletionStage` s] returned by the provided function complete + +**backpressures** when the number of @scala[`Future` s] @java[`CompletionStage` s] reaches the configured parallelism and the downstream backpressures + +**completes** upstream completes and all @scala[`Future` s] @java[`CompletionStage` s] has been completed and all elements has been emitted + +@@@ diff --git a/docs/src/main/paradox/stream/operators/index.md b/docs/src/main/paradox/stream/operators/index.md index e6cb416cdc..28be441d18 100644 --- a/docs/src/main/paradox/stream/operators/index.md +++ b/docs/src/main/paradox/stream/operators/index.md @@ -198,7 +198,9 @@ operation at the same time (usually handling the completion of a @scala[`Future` | |Operator|Description| |--|--|--| -|Source/Flow|@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that return a @scala[`Future`] @java[`CompletionStage`] result.| +|Source/Flow|@ref[mapAsync](Source-or-Flow/mapAsync.md)|Pass incoming elements to a function that returns a @scala[`Future`] @java[`CompletionStage`] result.| +|Source/Flow|@ref[mapAsyncPartitioned](Source-or-Flow/mapAsyncPartitioned.md)|Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will have elements that retain the order of the original Source or Flow.| +|Source/Flow|@ref[mapAsyncPartitionedUnordered](Source-or-Flow/mapAsyncPartitionedUnordered.md)|Pass incoming elements to a partitioning function that returns a partition result for each element and then to a processing function that returns a @scala[`Future`] @java[`CompletionStage`] result. The resulting Source or Flow will not have ordered elements.| |Source/Flow|@ref[mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md)|Like `mapAsync` but @scala[`Future`] @java[`CompletionStage`] results are passed downstream as they arrive regardless of the order of the elements that triggered them.| ## Timer driven operators @@ -512,6 +514,8 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [logWithMarker](Source-or-Flow/logWithMarker.md) * [map](Source-or-Flow/map.md) * [mapAsync](Source-or-Flow/mapAsync.md) +* [mapAsyncPartitioned](Source-or-Flow/mapAsyncPartitioned.md) +* [mapAsyncPartitionedUnordered](Source-or-Flow/mapAsyncPartitionedUnordered.md) * [mapAsyncUnordered](Source-or-Flow/mapAsyncUnordered.md) * [mapConcat](Source-or-Flow/mapConcat.md) * [mapError](Source-or-Flow/mapError.md) diff --git a/project/CopyrightHeader.scala b/project/CopyrightHeader.scala index d4dba69293..1b9b17b9c4 100644 --- a/project/CopyrightHeader.scala +++ b/project/CopyrightHeader.scala @@ -124,9 +124,8 @@ trait CopyrightHeader extends AutoPlugin { private def isLightbendCopyrighted(text: String): Boolean = StringUtils.containsIgnoreCase(text, "lightbend inc.") - private def isValidCopyrightAnnotated(text: String): Boolean = { + private def isValidCopyrightAnnotated(text: String): Boolean = isApacheCopyrighted(text) - } private def isOnlyLightbendOrEpflCopyrightAnnotated(text: String): Boolean = { (isLightbendCopyrighted(text) || isLAMPCopyrighted(text)) && !isApacheCopyrighted(text) diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index 5df754d409..c920a8307c 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -886,6 +886,54 @@ public class FlowTest extends StreamTest { assertEquals(0, result.size()); } + @Test + public void mustBeAbleToUseMapAsyncPartitioned() throws Exception { + final TestKit probe = new TestKit(system); + final Iterable input = Arrays.asList("2c", "1a", "1b"); + final Flow flow = + Flow.of(String.class) + .mapAsyncPartitioned( + 4, + elem -> elem.substring(0, 1), + (elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase())); + Source.from(input) + .via(flow) + .runForeach( + new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, + system); + probe.expectMsgEquals("2C"); + probe.expectMsgEquals("1A"); + probe.expectMsgEquals("1B"); + } + + @Test + public void mustBeAbleToUseMapAsyncPartitionedUnordered() throws Exception { + final TestKit probe = new TestKit(system); + final Iterable input = Arrays.asList("1a", "1b", "2c"); + final Flow flow = + Flow.of(String.class) + .mapAsyncPartitionedUnordered( + 4, + elem -> elem.substring(0, 1), + (elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase())); + Source.from(input) + .via(flow) + .runForeach( + new Procedure() { + public void apply(String elem) { + probe.getRef().tell(elem, ActorRef.noSender()); + } + }, + system); + probe.expectMsgEquals("1A"); + probe.expectMsgEquals("1B"); + probe.expectMsgEquals("2C"); + } + @Test public void mustBeAbleToUseCollectType() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 4be43bbecb..b316b96a9d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -568,6 +568,36 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("C"); } + @Test + public void mustBeAbleToUseMapAsyncPartitioned() throws Exception { + final TestKit probe = new TestKit(system); + final Iterable input = Arrays.asList("2c", "1a", "1b"); + Source.from(input) + .mapAsyncPartitioned( + 4, + elem -> elem.substring(0, 1), + (elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase())) + .runForeach(elem -> probe.getRef().tell(elem, ActorRef.noSender()), system); + probe.expectMsgEquals("2C"); + probe.expectMsgEquals("1A"); + probe.expectMsgEquals("1B"); + } + + @Test + public void mustBeAbleToUseMapAsyncPartitionedUnordered() throws Exception { + final TestKit probe = new TestKit(system); + final Iterable input = Arrays.asList("1a", "1b", "2c"); + Source.from(input) + .mapAsyncPartitionedUnordered( + 4, + elem -> elem.substring(0, 1), + (elem, p) -> CompletableFuture.completedFuture(elem.toUpperCase())) + .runForeach(elem -> probe.getRef().tell(elem, ActorRef.noSender()), system); + probe.expectMsgEquals("1A"); + probe.expectMsgEquals("1B"); + probe.expectMsgEquals("2C"); + } + @Test public void mustBeAbleToUseCollectType() throws Exception { final TestKit probe = new TestKit(system); diff --git a/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala new file mode 100644 index 0000000000..d70e7fabce --- /dev/null +++ b/stream-typed-tests/src/test/scala/org/apache/pekko/stream/MapAsyncPartitionedSpec.scala @@ -0,0 +1,449 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import org.apache.pekko.actor.typed.ActorSystem +import org.apache.pekko.actor.typed.scaladsl.Behaviors +import org.apache.pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, Sink, Source, SourceWithContext } +import org.scalacheck.{ Arbitrary, Gen } +import org.scalatest.BeforeAndAfterAll +import org.scalatest.concurrent.ScalaFutures +import org.scalatest.flatspec.AnyFlatSpec +import org.scalatest.matchers.should.Matchers +import org.scalatestplus.scalacheck.ScalaCheckDrivenPropertyChecks + +import java.time.Instant +import java.util.concurrent.Executors +import scala.annotation.nowarn +import scala.concurrent.duration.{ DurationInt, FiniteDuration } +import scala.concurrent.{ blocking, ExecutionContext, Future } +import scala.language.postfixOps +import scala.util.Random + +private object MapAsyncPartitionedSpec { + + object TestData { + + case class Parallelism(value: Int) extends AnyVal + + case class TestKeyValue(key: Int, delay: FiniteDuration, value: String) + + implicit val parallelismArb: Arbitrary[Parallelism] = Arbitrary { + Gen.choose(2, 8).map(Parallelism.apply) + } + implicit val elementsArb: Arbitrary[Seq[TestKeyValue]] = Arbitrary { + for { + totalElements <- Gen.choose(1, 100) + totalPartitions <- Gen.choose(1, 8) + } yield { + generateElements(totalPartitions, totalElements) + } + } + + def generateElements(totalPartitions: Int, totalElements: Int): Seq[TestKeyValue] = + for (i <- 1 to totalElements) yield { + TestKeyValue( + key = Random.nextInt(totalPartitions), + delay = DurationInt(Random.nextInt(20) + 10).millis, + value = i.toString) + } + + def extractPartition(e: TestKeyValue): Int = + e.key + + type Operation = TestKeyValue => Future[(Int, String)] + + def asyncOperation(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, String)] = + Future { + p -> e.value + } + + def blockingOperation(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, String)] = + Future { + blocking { + Thread.sleep(e.delay.toMillis) + p -> e.value + } + } + + } +} + +class MapAsyncPartitionedSpec + extends AnyFlatSpec + with Matchers + with ScalaFutures + with BeforeAndAfterAll + with ScalaCheckDrivenPropertyChecks { + + import MapAsyncPartitionedSpec.TestData._ + + override implicit def patienceConfig: PatienceConfig = PatienceConfig( + timeout = 5 seconds, + interval = 100 millis) + + private implicit val system: ActorSystem[_] = ActorSystem(Behaviors.empty, "test-system") + private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newCachedThreadPool()) + + override protected def afterAll(): Unit = { + system.terminate() + system.whenTerminated.futureValue + super.afterAll() + } + + @nowarn("msg=deprecated") // use Stream to support Scala 2.12 + private def infiniteStream(): Stream[Int] = Stream.from(1) + + @nowarn("msg=never used") + private def f(i: Int, p: Int): Future[Int] = + Future(i % 2) + + behavior.of("MapAsyncPartitionedUnordered") + + it should "process elements in parallel by partition" in { + val elements = List( + TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"), + TestKeyValue(key = 2, delay = 700 millis, value = "2.a"), + TestKeyValue(key = 1, delay = 500 millis, value = "1.b"), + TestKeyValue(key = 2, delay = 900 millis, value = "2.b"), + TestKeyValue(key = 1, delay = 500 millis, value = "1.c")) + + val result = + Source(elements) + .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(blockingOperation) + .runWith(Sink.seq) + .futureValue + .map(_._2) + + result shouldBe Vector("2.a", "1.a", "1.b", "2.b", "1.c") + } + + it should "process elements in parallel preserving order in partition" in { + forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => + val result = + Source(elements.toIndexedSeq) + .mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(asyncOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap + val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap + + actual shouldBe expected + } + } + + it should "process elements in sequence preserving order in partition" in { + forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) => + val result = + Source + .fromIterator(() => elements.iterator) + .mapAsyncPartitionedUnordered(parallelism = 1)(extractPartition)(asyncOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap + val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap + + actual shouldBe expected + } + } + + it should "process elements in parallel preserving order in partition with blocking operation" in { + forAll(minSuccessful(10)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => + val result = + Source + .fromIterator(() => elements.iterator) + .mapAsyncPartitionedUnordered(parallelism.value)(extractPartition)(blockingOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.groupBy(_._1).mapValues2(_.map(_._2)).toMap + val expected = elements.toSeq.groupBy(_.key).mapValues2(_.map(_.value)).toMap + + actual shouldBe expected + } + } + + it should "stop the stream via a KillSwitch" in { + val (killSwitch, future) = + Source(infiniteStream()) + .mapAsyncPartitionedUnordered(parallelism = 6)(i => i % 6) { (i, _) => + Future { + blocking { + Thread.sleep(40) + (i % 6).toString -> i.toString + } + } + } + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + Thread.sleep(500) + + killSwitch.shutdown() + + val result = future.futureValue.groupBy(_._1) + result should have size 6 + result.values.foreach { + _.size should be >= 10 + } + } + + it should "stop the stream if any operation fails" in { + val future = + Source(infiniteStream()) + .mapAsyncPartitionedUnordered(parallelism = 4)(i => i % 8) { (i, _) => + Future { + if (i == 23) throw new RuntimeException("Ignore it") + else i.toString + } + } + .toMat(Sink.ignore)(Keep.right) + .run() + + future.failed.futureValue shouldBe a[RuntimeException] + } + + it should "handle nulls" in { + val elements = List( + TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"), + TestKeyValue(key = 2, delay = 700 millis, value = "2.a"), + TestKeyValue(key = 1, delay = 500 millis, value = null)) + + @nowarn("msg=never used") + def fun(v: TestKeyValue, p: Int): Future[String] = Future.successful(v.value) + + val result = + Source(elements) + .mapAsyncPartitionedUnordered(parallelism = 2)(extractPartition)(fun) + .runWith(Sink.seq) + .futureValue + + result.toSet shouldBe Set("1.a", "2.a") + } + + it should "fail to create an operator if parallelism is less than 1" in { + forAll(Gen.negNum[Int]) { zeroOrNegativeParallelism: Int => + an[IllegalArgumentException] shouldBe thrownBy { + Source(infiniteStream()) + .mapAsyncPartitionedUnordered( + parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit) + .runWith(Sink.ignore) + .futureValue + } + } + } + + behavior.of("MapAsyncPartitionedOrdered") + + it should "process elements in parallel by partition" in { + val elements = List( + TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"), + TestKeyValue(key = 2, delay = 700 millis, value = "2.a"), + TestKeyValue(key = 1, delay = 500 millis, value = "1.b"), + TestKeyValue(key = 1, delay = 500 millis, value = "1.c"), + TestKeyValue(key = 2, delay = 900 millis, value = "2.b")) + + def processElement(e: TestKeyValue, p: Int)(implicit ec: ExecutionContext): Future[(Int, (String, Instant))] = + Future { + blocking { + val startedAt = Instant.now() + Thread.sleep(e.delay.toMillis) + p -> (e.value -> startedAt) + } + } + + val result = + Source(elements) + .mapAsyncPartitioned(parallelism = 2)(extractPartition)(processElement) + .runWith(Sink.seq) + .futureValue + .map(_._2) + + result.map(_._1) shouldBe Vector("1.a", "2.a", "1.b", "1.c", "2.b") + val elementStartTime = result.toMap + + elementStartTime("1.a") should be < elementStartTime("1.b") + elementStartTime("1.b") should be < elementStartTime("1.c") + elementStartTime("2.a") should be < elementStartTime("2.b") + } + + it should "process elements in parallel preserving order in partition" in { + forAll(minSuccessful(1000)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => + val result = + Source(elements.toIndexedSeq) + .mapAsyncPartitioned(parallelism.value)(extractPartition)(asyncOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.map(_._2) + val expected = elements.map(_.value) + + actual shouldBe expected + } + } + + it should "process elements in sequence preserving order in partition" in { + forAll(minSuccessful(1000)) { (elements: Seq[TestKeyValue]) => + val result = + Source + .fromIterator(() => elements.iterator) + .mapAsyncPartitioned(parallelism = 1)(extractPartition)(asyncOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.map(_._2) + val expected = elements.map(_.value) + + actual shouldBe expected + } + } + + it should "process elements in parallel preserving order in partition with blocking operation" in { + forAll(minSuccessful(10)) { (parallelism: Parallelism, elements: Seq[TestKeyValue]) => + val result = + Source + .fromIterator(() => elements.iterator) + .mapAsyncPartitioned(parallelism.value)(extractPartition)(blockingOperation) + .runWith(Sink.seq) + .futureValue + + val actual = result.map(_._2) + val expected = elements.map(_.value) + + actual shouldBe expected + } + } + + it should "stop the stream via a KillSwitch" in { + val (killSwitch, future) = + Source(infiniteStream()) + .mapAsyncPartitioned(parallelism = 6)(i => i % 6) { (i, _) => + Future { + blocking { + Thread.sleep(40) + (i % 6).toString -> i.toString + } + } + } + .viaMat(KillSwitches.single)(Keep.right) + .toMat(Sink.seq)(Keep.both) + .run() + + Thread.sleep(500) + + killSwitch.shutdown() + + val result = future.futureValue.groupBy(_._1) + result should have size 6 + result.values.foreach { + _.size should be >= 10 + } + } + + it should "stop the stream if any operation fails" in { + val future = + Source(infiniteStream()) + .mapAsyncPartitioned(parallelism = 4)(i => i % 8) { (i, _) => + Future { + if (i == 23) throw new RuntimeException("Ignore it") + else i.toString + } + } + .toMat(Sink.ignore)(Keep.right) + .run() + + future.failed.futureValue shouldBe a[RuntimeException] + } + + it should "handle nulls" in { + val elements = List( + TestKeyValue(key = 1, delay = 1000 millis, value = "1.a"), + TestKeyValue(key = 2, delay = 700 millis, value = "2.a"), + TestKeyValue(key = 1, delay = 500 millis, value = null)) + + @nowarn("msg=never used") + def fun(v: TestKeyValue, p: Int): Future[String] = Future.successful(v.value) + + val result = + Source(elements) + .mapAsyncPartitioned(parallelism = 2)(extractPartition)(fun) + .runWith(Sink.seq) + .futureValue + + result shouldBe Seq("1.a", "2.a") + } + + it should "fail to create an operator if parallelism is less than 1" in { + forAll(Gen.negNum[Int]) { zeroOrNegativeParallelism: Int => + an[IllegalArgumentException] shouldBe thrownBy { + Source(infiniteStream()) + .mapAsyncPartitioned( + parallelism = zeroOrNegativeParallelism)(extractPartition = identity)(f = (_, _) => Future.unit) + .runWith(Sink.ignore) + .futureValue + } + } + } + + behavior.of("operator applicability") + + it should "be applicable to a source" in { + Source + .single(3) + .mapAsyncPartitioned(parallelism = 1)(identity)(f) + .runWith(Sink.seq) + .futureValue shouldBe Seq(1) + } + + it should "be applicable to a source with context" in { + SourceWithContext + .fromTuples(Source.single(3 -> "A")) + .mapAsyncPartitioned(parallelism = 1)(identity)(f) + .runWith(Sink.seq) + .futureValue shouldBe Seq(1 -> "A") + } + + it should "be applicable to a flow" in { + Flow[Int] + .mapAsyncPartitioned(parallelism = 1)(identity)(f) + .runWith(Source.single(3), Sink.seq) + ._2 + .futureValue shouldBe Seq(1) + } + + it should "be applicable to a flow with context" in { + val flow = + FlowWithContext[Int, String] + .mapAsyncPartitioned(parallelism = 1)(identity)(f) + + SourceWithContext + .fromTuples(Source.single(3 -> "A")) + .via(flow) + .runWith(Sink.seq) + .futureValue shouldBe Seq(1 -> "A") + } + + private implicit class MapWrapper[K, V](map: Map[K, V]) { + @nowarn("msg=deprecated") + def mapValues2[W](f: V => W) = map.mapValues(f) + } + +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala new file mode 100644 index 0000000000..f328d66fe0 --- /dev/null +++ b/stream/src/main/scala/org/apache/pekko/stream/MapAsyncPartitioned.scala @@ -0,0 +1,346 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.pekko.stream + +import scala.collection.mutable +import scala.concurrent.Future +import scala.util.control.{ NoStackTrace, NonFatal } +import scala.util.{ Failure, Success, Try } + +import org.apache.pekko +import pekko.dispatch.ExecutionContexts +import pekko.stream.ActorAttributes.SupervisionStrategy +import pekko.stream.Attributes.{ Name, SourceLocation } +import pekko.stream.MapAsyncPartitioned._ +import pekko.stream.scaladsl.{ Flow, FlowWithContext, Source, SourceWithContext } +import pekko.stream.stage._ + +private[stream] object MapAsyncPartitioned { + + private def extractPartitionWithCtx[In, Ctx, Partition](extract: In => Partition)(tuple: (In, Ctx)): Partition = + extract(tuple._1) + + private def fWithCtx[In, Out, Ctx, Partition](f: (In, Partition) => Future[Out])(tuple: (In, Ctx), + partition: Partition): Future[(Out, Ctx)] = + f(tuple._1, partition).map(_ -> tuple._2)(ExecutionContexts.parasitic) + + def mapSourceOrdered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)( + extractPartition: In => Partition)( + f: (In, Partition) => Future[Out]): Source[Out, Mat] = + source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = true, parallelism, extractPartition, f)) + + def mapSourceUnordered[In, Out, Partition, Mat](source: Source[In, Mat], parallelism: Int)( + extractPartition: In => Partition)( + f: (In, Partition) => Future[Out]): Source[Out, Mat] = + source.via(new MapAsyncPartitioned[In, Out, Partition](orderedOutput = false, parallelism, extractPartition, f)) + + def mapSourceWithContextOrdered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], parallelism: Int)( + extractPartition: In => Partition)( + f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] = + flow.via( + new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition]( + orderedOutput = true, + parallelism, + extractPartitionWithCtx(extractPartition), + fWithCtx[In, T, Ctx, Partition](f))) + + def mapSourceWithContextUnordered[In, Ctx, T, Partition, Mat](flow: SourceWithContext[In, Ctx, Mat], + parallelism: Int)(extractPartition: In => Partition)( + f: (In, Partition) => Future[T]): SourceWithContext[T, Ctx, Mat] = + flow.via( + new MapAsyncPartitioned[(In, Ctx), (T, Ctx), Partition]( + orderedOutput = false, + parallelism, + extractPartitionWithCtx(extractPartition), + fWithCtx[In, T, Ctx, Partition](f))) + + def mapFlowOrdered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)( + extractPartition: Out => Partition)( + f: (Out, Partition) => Future[T]): Flow[In, T, Mat] = + flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = true, parallelism, extractPartition, + f)) + + def mapFlowUnordered[In, Out, T, Partition, Mat](flow: Flow[In, Out, Mat], parallelism: Int)( + extractPartition: Out => Partition)( + f: (Out, Partition) => Future[T]): Flow[In, T, Mat] = + flow.via(new MapAsyncPartitioned[Out, T, Partition](orderedOutput = false, parallelism, + extractPartition, f)) + + def mapFlowWithContextOrdered[In, Out, CtxIn, CtxOut, T, Partition, Mat]( + flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)( + extractPartition: Out => Partition)( + f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = + flow.via( + new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition]( + orderedOutput = true, + parallelism, + extractPartitionWithCtx(extractPartition), + fWithCtx[Out, T, CtxOut, Partition](f))) + + def mapFlowWithContextUnordered[In, Out, CtxIn, CtxOut, T, Partition, Mat]( + flow: FlowWithContext[In, CtxIn, Out, CtxOut, Mat], parallelism: Int)(extractPartition: Out => Partition)( + f: (Out, Partition) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = + flow.via( + new MapAsyncPartitioned[(Out, CtxOut), (T, CtxOut), Partition]( + orderedOutput = false, + parallelism, + extractPartitionWithCtx(extractPartition), + fWithCtx[Out, T, CtxOut, Partition](f))) + + private[stream] val NotYetThere: Failure[Nothing] = Failure(new Exception with NoStackTrace) + + private[stream] final class Holder[In, Out]( + val in: In, + var out: Try[Out], + callback: AsyncCallback[Holder[In, Out]]) extends (Try[Out] => Unit) { + + // To support both fail-fast when the supervision directive is Stop + // and not calling the decider multiple times (#23888) we need to cache the decider result and re-use that + private var cachedSupervisionDirective: Option[Supervision.Directive] = None + + def supervisionDirectiveFor(decider: Supervision.Decider, ex: Throwable): Supervision.Directive = { + cachedSupervisionDirective match { + case Some(d) => d + case _ => + val d = decider(ex) + cachedSupervisionDirective = Some(d) + d + } + } + + def setOut(t: Try[Out]): Unit = + out = t + + override def apply(t: Try[Out]): Unit = { + setOut(t) + callback.invoke(this) + } + } +} + +private[stream] class MapAsyncPartitioned[In, Out, Partition]( + orderedOutput: Boolean, + parallelism: Int, + extractPartition: In => Partition, + f: (In, Partition) => Future[Out]) extends GraphStage[FlowShape[In, Out]] { + + if (parallelism < 1) throw new IllegalArgumentException("parallelism must be at least 1") + + private val in = Inlet[In]("MapAsyncPartitionOrdered.in") + private val out = Outlet[Out]("MapAsyncPartitionOrdered.out") + + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override def initialAttributes: Attributes = + Attributes(Name("MapAsyncPartitionOrdered")) and SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + private val contextPropagation = pekko.stream.impl.ContextPropagation() + + private final class Contextual[T](context: AnyRef, val element: T) { + private var suspended = false + + def suspend(): Unit = + if (!suspended) { + suspended = true + contextPropagation.suspendContext() + } + + def resume(): Unit = + if (suspended) { + suspended = false + contextPropagation.resumeContext(context) + } + + } + + private lazy val decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + + private var partitionsInProgress: mutable.Set[Partition] = _ + private var buffer: mutable.Queue[(Partition, Contextual[Holder[In, Out]])] = _ + + private val futureCB = getAsyncCallback[Holder[In, Out]](holder => + holder.out match { + case Success(_) => pushNextIfPossible() + case Failure(ex) => + holder.supervisionDirectiveFor(decider, ex) match { + // fail fast as if supervision says so + case Supervision.Stop => failStage(ex) + case _ => pushNextIfPossible() + } + }) + + override def preStart(): Unit = { + partitionsInProgress = mutable.Set() + buffer = mutable.Queue() + } + + override def onPull(): Unit = + pushNextIfPossible() + + override def onPush(): Unit = { + try { + val element = grab(in) + val partition = extractPartition(element) + + val wrappedInput = new Contextual( + contextPropagation.currentContext(), + new Holder[In, Out](element, NotYetThere, futureCB)) + + buffer.enqueue(partition -> wrappedInput) + + if (canStartNextElement(partition)) { + processElement(partition, wrappedInput) + } else { + wrappedInput.suspend() + } + } catch { + case NonFatal(ex) => if (decider(ex) == Supervision.Stop) failStage(ex) + } + + pullIfNeeded() + } + + override def onUpstreamFinish(): Unit = + if (idle()) completeStage() + + private def processElement(partition: Partition, wrappedInput: Contextual[Holder[In, Out]]): Unit = { + import wrappedInput.{ element => holder } + val future = f(holder.in, partition) + + partitionsInProgress += partition + + future.value match { + case None => future.onComplete(holder)(ExecutionContexts.parasitic) + case Some(v) => + // #20217 the future is already here, optimization: avoid scheduling it on the dispatcher and + // run the logic directly on this thread + holder.setOut(v) + v match { + // this optimization also requires us to stop the stage to fail fast if the decider says so: + case Failure(ex) if holder.supervisionDirectiveFor(decider, ex) == Supervision.Stop => failStage(ex) + case _ => pushNextIfPossible() + } + } + } + + private val pushNextIfPossible: () => Unit = + if (orderedOutput) pushNextIfPossibleOrdered _ + else pushNextIfPossibleUnordered _ + + private def pushNextIfPossibleOrdered(): Unit = + if (partitionsInProgress.isEmpty) { + drainQueue() + pullIfNeeded() + } else { + while (buffer.nonEmpty && !(buffer.front._2.element.out eq NotYetThere) && isAvailable(out)) { + val (partition, wrappedInput) = buffer.dequeue() + import wrappedInput.{ element => holder } + partitionsInProgress -= partition + + holder.out match { + case Success(elem) => + if (elem != null) { + push(out, elem) + pullIfNeeded() + } else { + // elem is null + pullIfNeeded() + } + + case Failure(NonFatal(ex)) => + holder.supervisionDirectiveFor(decider, ex) match { + // this could happen if we are looping in pushNextIfPossible and end up on a failed future before the + // onComplete callback has run + case Supervision.Stop => + failStage(ex) + case _ => + // try next element + } + case Failure(ex) => + // fatal exception in buffer, not sure that it can actually happen, but for good measure + throw ex + } + } + drainQueue() + } + + private def pushNextIfPossibleUnordered(): Unit = + if (partitionsInProgress.isEmpty) { + drainQueue() + pullIfNeeded() + } else { + buffer = buffer.filter { case (partition, wrappedInput) => + import wrappedInput.{ element => holder } + + if ((holder.out eq MapAsyncPartitioned.NotYetThere) || !isAvailable(out)) { + true + } else { + partitionsInProgress -= partition + + holder.out match { + case Success(elem) => + if (elem != null) { + push(out, elem) + } + + case Failure(NonFatal(ex)) => + holder.supervisionDirectiveFor(decider, ex) match { + // this could happen if we are looping in pushNextIfPossible and end up on a failed future before the + // onComplete callback has run + case Supervision.Stop => + failStage(ex) + case _ => + // try next element + } + case Failure(ex) => + // fatal exception in buffer, not sure that it can actually happen, but for good measure + throw ex + } + false + } + } + pullIfNeeded() + drainQueue() + } + + private def drainQueue(): Unit = { + buffer.foreach { + case (partition, wrappedInput) => + if (canStartNextElement(partition)) { + wrappedInput.resume() + processElement(partition, wrappedInput) + } + } + } + + private def pullIfNeeded(): Unit = + if (isClosed(in) && idle()) completeStage() + else if (buffer.size < parallelism && !hasBeenPulled(in)) tryPull(in) + // else already pulled and waiting for next element + + private def idle(): Boolean = + buffer.isEmpty + + private def canStartNextElement(partition: Partition): Boolean = + !partitionsInProgress(partition) && partitionsInProgress.size < parallelism + + setHandlers(in, out, this) + } +} diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index f979d1bffc..1d1c8f7854 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -839,6 +839,34 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapAsync(parallelism)(x => f(x).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] = + MapAsyncPartitioned.mapFlowOrdered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): Flow[In, T, Mat] = + MapAsyncPartitioned.mapFlowUnordered(delegate, parallelism)(extractPartition(_))(f(_, _).asScala).asJava + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. The function returns a `CompletionStage` and the diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala index fdcabc8980..84736777db 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/FlowWithContext.scala @@ -178,6 +178,36 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( f: function.Function[Out, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[Out2, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = { + viaScala(_.mapAsyncPartitioned(parallelism)(extractPartition(_))(f(_, _).asScala)) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[Out2]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = { + viaScala(_.mapAsyncPartitionedUnordered(parallelism)(extractPartition(_))(f(_, _).asScala)) + } + /** * Context-preserving variant of [[pekko.stream.javadsl.Flow.mapConcat]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index e2dee057f0..416652e5b7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -2493,6 +2493,36 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mapAsync[T](parallelism: Int, f: function.Function[Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapAsync(parallelism)(x => f(x).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] = + MapAsyncPartitioned.mapSourceOrdered(delegate, parallelism)(extractPartition(_))(f(_, + _).asScala).asJava + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[T]]): javadsl.Source[T, Mat] = + MapAsyncPartitioned.mapSourceUnordered(delegate, parallelism)(extractPartition(_))(f(_, + _).asScala).asJava + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. The function returns a `CompletionStage` and the diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala index 90c6103443..687c01fc05 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SourceWithContext.scala @@ -174,6 +174,40 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon f: function.Function[Out, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = viaScala(_.mapAsync[Out2](parallelism)(o => f.apply(o).asScala)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[Out2, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = { + MapAsyncPartitioned.mapSourceWithContextOrdered(delegate, parallelism)(extractPartition(_))(f(_, + _).asScala) + .asJava + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[Out2, P](parallelism: Int, + extractPartition: function.Function[Out, P], + f: function.Function2[Out, P, CompletionStage[Out2]]): SourceWithContext[Out2, Ctx, Mat] = { + MapAsyncPartitioned.mapSourceWithContextUnordered(delegate, parallelism)(extractPartition(_))(f(_, + _).asScala) + .asJava + } + /** * Context-preserving variant of [[pekko.stream.javadsl.Source.mapConcat]]. * diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala index 1642248639..42b29030d7 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Flow.scala @@ -163,6 +163,36 @@ final class Flow[-In, +Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = new Flow(traversalBuilder.transformMat(f), shape) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int)( + extractPartition: Out => P)( + f: (Out, P) => Future[T]): Flow[In, T, Mat] = { + MapAsyncPartitioned.mapFlowOrdered(this, parallelism)(extractPartition)(f) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( + extractPartition: Out => P)( + f: (Out, P) => Future[T]): Flow[In, T, Mat] = { + MapAsyncPartitioned.mapFlowUnordered(this, parallelism)(extractPartition)(f) + } + /** * Materializes this [[Flow]], immediately returning (1) its materialized value, and (2) a newly materialized [[Flow]]. * The returned flow is partial materialized and do not support multiple times materialization. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala index b2e9bced04..1def589f37 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/FlowWithContext.scala @@ -14,7 +14,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance - +import scala.concurrent.Future import org.apache.pekko import pekko.NotUsed import pekko.japi.Pair @@ -90,6 +90,36 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = new FlowWithContext(delegate.mapMaterializedValue(f)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int)( + extractPartition: Out => P)( + f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = { + MapAsyncPartitioned.mapFlowWithContextOrdered(this, parallelism)(extractPartition)(f) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( + extractPartition: Out => P)( + f: (Out, P) => Future[T]): FlowWithContext[In, CtxIn, T, CtxOut, Mat] = { + MapAsyncPartitioned.mapFlowWithContextUnordered(this, parallelism)(extractPartition)(f) + } + def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat] diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala index cd33cfb799..0c55691475 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/Source.scala @@ -99,6 +99,34 @@ final class Source[+Out, +Mat]( override def mapMaterializedValue[Mat2](f: Mat => Mat2): ReprMat[Out, Mat2] = new Source[Out, Mat2](traversalBuilder.transformMat(f.asInstanceOf[Any => Any]), shape) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int)( + extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = { + MapAsyncPartitioned.mapSourceOrdered(this, parallelism)(extractPartition)(f) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( + extractPartition: Out => P)(f: (Out, P) => Future[T]): Source[T, Mat] = { + MapAsyncPartitioned.mapSourceUnordered(this, parallelism)(extractPartition)(f) + } + /** * Materializes this Source, immediately returning (1) its materialized value, and (2) a new Source * that can be used to consume elements from the newly materialized Source. diff --git a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala index 55a6ce316c..47d2c14bed 100644 --- a/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/scaladsl/SourceWithContext.scala @@ -14,7 +14,7 @@ package org.apache.pekko.stream.scaladsl import scala.annotation.unchecked.uncheckedVariance - +import scala.concurrent.Future import org.apache.pekko import pekko.stream._ @@ -78,6 +78,34 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] = new SourceWithContext(delegate.mapMaterializedValue(f)) + /** + * Transforms this stream. Works very similarly to [[#mapAsync]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsync]] + * @see [[#mapAsyncPartitionedUnordered]] + */ + def mapAsyncPartitioned[T, P](parallelism: Int)( + extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = { + MapAsyncPartitioned.mapSourceWithContextOrdered(this, parallelism)(extractPartition)(f) + } + + /** + * Transforms this stream. Works very similarly to [[#mapAsyncUnordered]] but with an additional + * partition step before the transform step. The transform function receives the an individual + * stream entry and the calculated partition value for that entry. + * + * @since 1.1.0 + * @see [[#mapAsyncUnordered]] + * @see [[#mapAsyncPartitioned]] + */ + def mapAsyncPartitionedUnordered[T, P](parallelism: Int)( + extractPartition: Out => P)(f: (Out, P) => Future[T]): SourceWithContext[T, Ctx, Mat] = { + MapAsyncPartitioned.mapSourceWithContextUnordered(this, parallelism)(extractPartition)(f) + } + /** * Connect this [[pekko.stream.scaladsl.SourceWithContext]] to a [[pekko.stream.scaladsl.Sink]], * concatenating the processing steps of both.