diff --git a/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMap.md b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMap.md new file mode 100644 index 0000000000..6f4d1870ff --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/Source-or-Flow/statefulMap.md @@ -0,0 +1,78 @@ +# statefulMap + +Transform each stream element with the help of a state. + +@ref[Simple operators](../index.md#simple-operators) + +## Signature + +@apidoc[Flow.statefulMap](Flow) { scala="#statefulMap%5BS%2CT%5D%28create%3A%28%29%3D%3ES%29%28f%3A%28S%2COut%29%20%3D%3E%28S%2CT%29%2ConComplete%3AS%3D%3EOption%5BT%5D%29%3ARepr%5BT%5D" java="#statefulMap(akka.japi.function.Creator,akka.japi.function.Function2,akka.japi.function.Function)" } + +## Description + +Transform each stream element with the help of a state. + +The state creation function is invoked once when the stream is materialized and the returned state is passed to the mapping function for mapping the first element. + +The mapping function returns a mapped element to emit downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, be a new immutable state but it is also safe to use a mutable state. + +The on complete function is called, once, when the first of upstream completion, downstream cancellation or stream failure happens. If the cause is upstream completion and the downstream is still accepting elements, the returned value from the function is passed downstream before completing the operator itself, for the other cases the returned value is ignored. + +The `statefulMap` operator adheres to the +ActorAttributes.SupervisionStrategy attribute. + +For mapping stream elements without keeping a state see @ref:[map](map.md). + +## Examples + +In the first example we implement an `zipWithIndex` operator like @ref:[zipWithIndex](zipWithIndex.md), it always associates a unique index +with each element of the stream, the index starts from 0. + +Scala +: @@snip [StatefulMap.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala) { #zipWithIndex } + +Java +: @@snip [StatefulMap.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java) { #zipWithIndex } + + + +In the second example, the elements are buffered until the incoming element is different, and then emitted downstream. +When upstream completes, if there are buffered elements, they are emitted before completing. + +Scala +: @@snip [StatefulMap.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala) { #bufferUntilChanged } + +Java +: @@snip [StatefulMap.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java) { #bufferUntilChanged } + +In the forth example, repeated incoming elements are only emitted once and then dropped. + +Scala +: @@snip [StatefulMap.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala) { #distinctUntilChanged } + +Java +: @@snip [StatefulMap.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java) { #distinctUntilChanged } + +In the fifth example, we combine the @ref:[statefulMap](statefulMap.md) and @ref:[mapConcat](mapConcat.md) to implement +a @ref:[statefulMapConcat](statefulMapConcat.md) like behavior. + +Scala +: @@snip [StatefulMap.scala](/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala) { #statefulMapConcatLike } + +Java +: @@snip [StatefulMap.java](/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java) { #statefulMapConcatLike } + + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the mapping function returns an element and downstream is ready to consume it + +**backpressures** downstream backpressures + +**completes** upstream completes + +**cancels** downstream cancels + +@@@ diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index e2f108b2e1..5deca6aa36 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -175,6 +175,7 @@ depending on being backpressured by downstream or not. |Source/Flow|@ref[scanAsync](Source-or-Flow/scanAsync.md)|Just like @ref[`scan`](Source-or-Flow/./scan.md) but receives a function that results in a @scala[`Future`] @java[`CompletionStage`] to the next value.| |Source/Flow|@ref[setup](Source-or-Flow/setup.md)|Defer the creation of a `Source/Flow` until materialization and access `Materializer` and `Attributes`| |Source/Flow|@ref[sliding](Source-or-Flow/sliding.md)|Provide a sliding window over the incoming stream and pass the windows as groups of elements downstream.| +|Source/Flow|@ref[statefulMap](Source-or-Flow/statefulMap.md)|Transform each stream element with the help of a state.| |Source/Flow|@ref[statefulMapConcat](Source-or-Flow/statefulMapConcat.md)|Transform each element into zero or more elements that are individually passed downstream.| |Source/Flow|@ref[take](Source-or-Flow/take.md)|Pass `n` incoming elements downstream and then complete| |Source/Flow|@ref[takeWhile](Source-or-Flow/takeWhile.md)|Pass elements downstream as long as a predicate function returns true and then complete. | @@ -548,6 +549,7 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [source](PubSub/source.md) * [splitAfter](Source-or-Flow/splitAfter.md) * [splitWhen](Source-or-Flow/splitWhen.md) +* [statefulMap](Source-or-Flow/statefulMap.md) * [statefulMapConcat](Source-or-Flow/statefulMapConcat.md) * [take](Source-or-Flow/take.md) * [takeLast](Sink/takeLast.md) diff --git a/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java b/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java new file mode 100644 index 0000000000..83afc4d48a --- /dev/null +++ b/akka-docs/src/test/java/jdocs/stream/operators/flow/StatefulMap.java @@ -0,0 +1,114 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package jdocs.stream.operators.flow; + +import akka.actor.typed.ActorSystem; +import akka.japi.Pair; +import akka.stream.javadsl.Flow; +import akka.stream.javadsl.Source; + +import java.util.*; +import java.util.stream.IntStream; + +public class StatefulMap { + static final ActorSystem system = null; + + public void indexed() { + // #zipWithIndex + Source.from(Arrays.asList("A", "B", "C", "D")) + .statefulMap( + () -> 0L, + (index, element) -> Pair.create(index + 1, Pair.create(element, index)), + indexOnComplete -> Optional.empty()) + .runForeach(System.out::println, system); + // prints + // Pair(A,0) + // Pair(B,1) + // Pair(C,2) + // Pair(D,3) + // #zipWithIndex + } + + public void bufferUntilChanged() { + // #bufferUntilChanged + Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D")) + .statefulMap( + () -> { + final List buffer = new LinkedList<>(); + return buffer; + }, + (buffer, element) -> { + if (buffer.size() > 0 && (!buffer.get(0).equals(element))) { + return Pair.create( + new LinkedList<>(Collections.singletonList(element)), + Collections.unmodifiableList(buffer)); + } else { + buffer.add(element); + return Pair.create(buffer, Collections.emptyList()); + } + }, + listOnComplete -> Optional.ofNullable(listOnComplete)) + .runForeach(System.out::println, system); + // prints + // [A] + // [B, B] + // [C, C, C] + // [D] + // #bufferUntilChanged + } + + public void distinctUntilChanged() { + // #distinctUntilChanged + Source.from(Arrays.asList("A", "B", "B", "C", "C", "C", "D")) + .statefulMap( + Optional::empty, + (lastElement, element) -> { + if (lastElement.isPresent() && lastElement.get().equals(element)) { + return Pair.create(lastElement, Optional.empty()); + } else { + return Pair.create(Optional.of(element), Optional.of(element)); + } + }, + listOnComplete -> Optional.empty()) + .via(Flow.flattenOptional()) + .runForeach(System.out::println, system); + // prints + // A + // B + // C + // D + // #distinctUntilChanged + } + + public void statefulMapConcatLike() { + // #statefulMapConcatLike + Source.fromJavaStream(() -> IntStream.rangeClosed(1, 10)) + .statefulMap( + () -> new ArrayList(3), + (list, element) -> { + list.add(element); + if (list.size() == 3) { + return Pair.create(new ArrayList(3), Collections.unmodifiableList(list)); + } else { + return Pair.create(list, Collections.emptyList()); + } + }, + listOnComplete -> Optional.ofNullable(listOnComplete)) + .mapConcat(list -> list) + .runForeach(System.out::println, system); + // prints + // 1 + // 2 + // 3 + // 4 + // 5 + // 6 + // 7 + // 8 + // 9 + // 10 + // #statefulMapConcatLike + } +} diff --git a/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala b/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala new file mode 100644 index 0000000000..2ca3cb050f --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/flow/StatefulMap.scala @@ -0,0 +1,94 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package docs.stream.operators.flow +import akka.actor.ActorSystem +import akka.stream.scaladsl.Source + +object StatefulMap { + + implicit val actorSystem: ActorSystem = ??? + + def indexed(): Unit = { + //#zipWithIndex + Source(List("A", "B", "C", "D")) + .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None) + .runForeach(println) + //prints + //(A,0) + //(B,1) + //(C,2) + //(D,3) + //#zipWithIndex + } + + def bufferUntilChanged(): Unit = { + //#bufferUntilChanged + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .statefulMap(() => List.empty[String])( + (buffer, element) => + buffer match { + case head :: _ if head != element => (element :: Nil, buffer) + case _ => (element :: buffer, Nil) + }, + buffer => Some(buffer)) + .filter(_.nonEmpty) + .runForeach(println) + //prints + //List(A) + //List(B, B) + //List(C, C, C) + //List(D) + //#bufferUntilChanged + } + + def distinctUntilChanged(): Unit = { + //#distinctUntilChanged + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .statefulMap(() => Option.empty[String])( + (lastElement, elem) => + lastElement match { + case Some(head) if head == elem => (Some(elem), None) + case _ => (Some(elem), Some(elem)) + }, + _ => None) + .collect { case Some(elem) => elem } + .runForeach(println) + //prints + //A + //B + //C + //D + //#distinctUntilChanged + } + + def statefulMapConcatLike(): Unit = { + //#statefulMapConcatLike + Source(1 to 10) + .statefulMap(() => List.empty[Int])( + (state, elem) => { + //grouped 3 elements into a list + val newState = elem :: state + if (newState.size == 3) + (Nil, newState.reverse) + else + (newState, Nil) + }, + state => Some(state.reverse)) + .mapConcat(identity) + .runForeach(println) + //prints + //1 + //2 + //3 + //4 + //5 + //6 + //7 + //8 + //9 + //10 + //#statefulMapConcatLike + } +} diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index 7456824ffe..bcca0f6da9 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -151,6 +151,34 @@ public class FlowTest extends StreamTest { probe.expectMsgEquals("2334445555"); } + @Test + public void mustBeAbleToUseStatefulMap() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5); + final Source source = Source.from(input); + final Flow flow = + Flow.of(Integer.class) + .statefulMap( + () -> new ArrayList(2), + (buffer, elem) -> { + if (buffer.size() == 2) { + final ArrayList group = new ArrayList<>(buffer); + buffer.clear(); + buffer.add(elem); + return Pair.create(buffer, group); + } else { + buffer.add(elem); + return Pair.create(buffer, Collections.emptyList()); + } + }, + Optional::ofNullable) + .filterNot(List::isEmpty) + .map(String::valueOf); + + final CompletionStage grouped = + source.via(flow).runFold("", (acc, elem) -> acc + elem, system); + Assert.assertEquals("[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseIntersperse() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java index 6f16370275..f6526aed71 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/SourceTest.java @@ -14,28 +14,26 @@ import akka.japi.function.*; import akka.japi.pf.PFBuilder; // #imports import akka.stream.*; - // #imports import akka.stream.scaladsl.FlowSpec; -import akka.util.ConstantFun; -import akka.stream.stage.*; -import akka.testkit.AkkaSpec; +import akka.stream.stage.AbstractInHandler; +import akka.stream.stage.AbstractOutHandler; +import akka.stream.stage.GraphStage; +import akka.stream.stage.GraphStageLogic; import akka.stream.testkit.TestPublisher; +import akka.testkit.AkkaJUnitActorSystemResource; +import akka.testkit.AkkaSpec; import akka.testkit.javadsl.TestKit; +import akka.util.ConstantFun; import com.google.common.collect.Iterables; import org.junit.Assert; import org.junit.ClassRule; import org.junit.Test; import scala.util.Try; -import akka.testkit.AkkaJUnitActorSystemResource; import java.time.Duration; import java.util.*; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.concurrent.*; import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; @@ -43,9 +41,10 @@ import java.util.stream.Stream; import static akka.NotUsed.notUsed; import static akka.stream.testkit.StreamTestKit.PublisherProbeSubscription; import static akka.stream.testkit.TestPublisher.ManualProbe; -import static org.hamcrest.core.Is.is; -import static org.junit.Assert.*; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; @SuppressWarnings("serial") public class SourceTest extends StreamTest { @@ -684,6 +683,56 @@ public class SourceTest extends StreamTest { probe.expectMsgEquals("2334445555"); } + @Test + public void mustBeAbleToUseStatefulMap() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5); + final CompletionStage grouped = + Source.from(input) + .statefulMap( + () -> new ArrayList(2), + (buffer, elem) -> { + if (buffer.size() == 2) { + final ArrayList group = new ArrayList<>(buffer); + buffer.clear(); + buffer.add(elem); + return Pair.create(buffer, group); + } else { + buffer.add(elem); + return Pair.create(buffer, Collections.emptyList()); + } + }, + Optional::ofNullable) + .filterNot(List::isEmpty) + .map(String::valueOf) + .runFold("", (acc, elem) -> acc + elem, system); + Assert.assertEquals("[1, 2][3, 4][5]", grouped.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + + @Test + public void mustBeAbleToUseStatefulMapAsDistinctUntilChanged() throws Exception { + final java.lang.Iterable input = Arrays.asList(1, 1, 1, 2, 3, 3, 3, 4, 5, 5, 5); + final CompletionStage result = + Source.from(input) + .statefulMap( + Optional::empty, + (buffer, elem) -> { + if (buffer.isPresent()) { + if (buffer.get().equals(elem)) { + return Pair.create(buffer, Optional.empty()); + } else { + return Pair.create(Optional.of(elem), Optional.of(elem)); + } + } else { + return Pair.create(Optional.of(elem), Optional.of(elem)); + } + }, + last -> Optional.empty()) + .filter(Optional::isPresent) + .map(Optional::get) + .runFold("", (acc, elem) -> acc + elem, system); + Assert.assertEquals("12345", result.toCompletableFuture().get(3, TimeUnit.SECONDS)); + } + @Test public void mustBeAbleToUseIntersperse() throws Exception { final TestKit probe = new TestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapSpec.scala new file mode 100644 index 0000000000..c1ae274055 --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapSpec.scala @@ -0,0 +1,269 @@ +/* + * Copyright (C) 2022 Lightbend Inc. + */ + +package akka.stream.scaladsl + +import akka.Done +import akka.stream.AbruptStageTerminationException +import akka.stream.ActorAttributes +import akka.stream.ActorMaterializer +import akka.stream.Supervision +import akka.stream.testkit.StreamSpec +import akka.stream.testkit.TestSubscriber +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.testkit.scaladsl.TestSource + +import scala.annotation.nowarn +import scala.concurrent.Await +import scala.concurrent.Promise +import scala.concurrent.duration.DurationInt +import scala.util.Success +import scala.util.control.NoStackTrace + +class FlowStatefulMapSpec extends StreamSpec { + + val ex = new Exception("TEST") with NoStackTrace + + "A StatefulMap" must { + "work in the happy case" in { + val sinkProb = Source(List(1, 2, 3, 4, 5)) + .statefulMap(() => 0)((agg, elem) => { + (agg + elem, (agg, elem)) + }, _ => None) + .runWith(TestSink.probe[(Int, Int)]) + sinkProb.expectSubscription().request(6) + sinkProb + .expectNext((0, 1)) + .expectNext((1, 2)) + .expectNext((3, 3)) + .expectNext((6, 4)) + .expectNext((10, 5)) + .expectComplete() + } + + "can remember the state when complete" in { + val sinkProb = Source(1 to 10) + .statefulMap(() => List.empty[Int])( + (state, elem) => { + //grouped 3 elements into a list + val newState = elem :: state + if (newState.size == 3) + (Nil, newState.reverse) + else + (newState, Nil) + }, + state => Some(state.reverse)) + .mapConcat(identity) + .runWith(TestSink.probe[Int]) + sinkProb.expectSubscription().request(10) + sinkProb.expectNextN(List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)).expectComplete() + } + + "be able to resume" in { + val testSink = Source(List(1, 2, 3, 4, 5)) + .statefulMap(() => 0)((agg, elem) => { + if (elem % 2 == 0) + throw ex + else + (agg + elem, (agg, elem)) + }, _ => None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink.probe[(Int, Int)]) + + testSink.expectSubscription().request(5) + testSink.expectNext((0, 1)).expectNext((1, 3)).expectNext((4, 5)).expectComplete() + } + + "be able to restart" in { + val testSink = Source(List(1, 2, 3, 4, 5)) + .statefulMap(() => 0)((agg, elem) => { + if (elem % 3 == 0) + throw ex + else + (agg + elem, (agg, elem)) + }, _ => None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink.probe[(Int, Int)]) + + testSink.expectSubscription().request(5) + testSink.expectNext((0, 1)).expectNext((1, 2)).expectNext((0, 4)).expectNext((4, 5)).expectComplete() + } + + "be able to stop" in { + val testSink = Source(List(1, 2, 3, 4, 5)) + .statefulMap(() => 0)((agg, elem) => { + if (elem % 3 == 0) + throw ex + else + (agg + elem, (agg, elem)) + }, _ => None) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.stoppingDecider)) + .runWith(TestSink.probe[(Int, Int)]) + + testSink.expectSubscription().request(5) + testSink.expectNext((0, 1)).expectNext((1, 2)).expectError(ex) + } + + "fail on upstream failure" in { + val (testSource, testSink) = TestSource + .probe[Int] + .statefulMap(() => 0)((agg, elem) => { + (agg + elem, (agg, elem)) + }, _ => None) + .toMat(TestSink.probe[(Int, Int)])(Keep.both) + .run() + + testSink.expectSubscription().request(5) + testSource.sendNext(1) + testSink.expectNext((0, 1)) + testSource.sendNext(2) + testSink.expectNext((1, 2)) + testSource.sendNext(3) + testSink.expectNext((3, 3)) + testSource.sendNext(4) + testSink.expectNext((6, 4)) + testSource.sendError(ex) + testSink.expectError(ex) + } + + "defer upstream failure and remember state" in { + val (testSource, testSink) = TestSource + .probe[Int] + .statefulMap(() => 0)((agg, elem) => { (agg + elem, (agg, elem)) }, (state: Int) => Some((state, -1))) + .toMat(TestSink.probe[(Int, Int)])(Keep.both) + .run() + + testSink.expectSubscription().request(5) + testSource.sendNext(1) + testSink.expectNext((0, 1)) + testSource.sendNext(2) + testSink.expectNext((1, 2)) + testSource.sendNext(3) + testSink.expectNext((3, 3)) + testSource.sendNext(4) + testSink.expectNext((6, 4)) + testSource.sendError(ex) + testSink.expectNext((10, -1)) + testSink.expectError(ex) + } + + "cancel upstream when downstream cancel" in { + val promise = Promise[Done]() + val testSource = TestSource + .probe[Int] + .statefulMap(() => 100)((agg, elem) => { + (agg + elem, (agg, elem)) + }, (state: Int) => { + promise.complete(Success(Done)) + Some((state, -1)) + }) + .toMat(Sink.cancelled)(Keep.left) + .run() + testSource.expectSubscription().expectCancellation() + Await.result(promise.future, 3.seconds) shouldBe Done + } + + "cancel upstream when downstream fail" in { + val promise = Promise[Done]() + val testProb = TestSubscriber.probe[(Int, Int)]() + val testSource = TestSource + .probe[Int] + .statefulMap(() => 100)((agg, elem) => { + (agg + elem, (agg, elem)) + }, (state: Int) => { + promise.complete(Success(Done)) + Some((state, -1)) + }) + .toMat(Sink.fromSubscriber(testProb))(Keep.left) + .run() + testProb.cancel(ex) + testSource.expectCancellationWithCause(ex) + Await.result(promise.future, 3.seconds) shouldBe Done + } + + "call its onComplete callback on abrupt materializer termination" in { + @nowarn("msg=deprecated") + val mat = ActorMaterializer() + val promise = Promise[Done]() + + val matVal = Source + .single(1) + .statefulMap(() => -1)((_, elem) => (elem, elem), _ => { + promise.complete(Success(Done)) + None + }) + .runWith(Sink.never)(mat) + mat.shutdown() + matVal.failed.futureValue shouldBe a[AbruptStageTerminationException] + Await.result(promise.future, 3.seconds) shouldBe Done + } + + "call its onComplete callback when stop" in { + val promise = Promise[Done]() + Source + .single(1) + .statefulMap(() => -1)((_, elem) => { + throw ex + (elem, elem) + }, _ => { + promise.complete(Success(Done)) + None + }) + .runWith(Sink.ignore) + Await.result(promise.future, 3.seconds) shouldBe Done + } + + "be able to be used as zipWithIndex" in { + Source(List("A", "B", "C", "D")) + .statefulMap(() => 0L)((index, elem) => (index + 1, (elem, index)), _ => None) + .runWith(TestSink.probe[(String, Long)]) + .request(4) + .expectNext(("A", 0L)) + .expectNext(("B", 1L)) + .expectNext(("C", 2L)) + .expectNext(("D", 3L)) + .expectComplete() + } + + "be able to be used as bufferUntilChanged" in { + val sink = TestSink.probe[List[String]] + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .statefulMap(() => List.empty[String])( + (buffer, elem) => + buffer match { + case head :: _ if head != elem => (elem :: Nil, buffer) + case _ => (elem :: buffer, Nil) + }, + buffer => Some(buffer)) + .filter(_.nonEmpty) + .alsoTo(Sink.foreach(println)) + .runWith(sink) + .request(4) + .expectNext(List("A")) + .expectNext(List("B", "B")) + .expectNext(List("C", "C", "C")) + .expectNext(List("D")) + .expectComplete() + } + + "be able to be used as distinctUntilChanged" in { + Source("A" :: "B" :: "B" :: "C" :: "C" :: "C" :: "D" :: Nil) + .statefulMap(() => Option.empty[String])( + (lastElement, elem) => + lastElement match { + case Some(head) if head == elem => (Some(elem), None) + case _ => (Some(elem), Some(elem)) + }, + _ => None) + .collect({ case Some(elem) => elem }) + .runWith(TestSink.probe[String]) + .request(4) + .expectNext("A") + .expectNext("B") + .expectNext("C") + .expectNext("D") + .expectComplete() + } + } +} diff --git a/akka-stream/src/main/scala/akka/stream/Attributes.scala b/akka-stream/src/main/scala/akka/stream/Attributes.scala index b0c2093188..c1d4515a42 100644 --- a/akka-stream/src/main/scala/akka/stream/Attributes.scala +++ b/akka-stream/src/main/scala/akka/stream/Attributes.scala @@ -7,10 +7,13 @@ package akka.stream import java.net.URLEncoder import java.time.Duration import java.util.Optional + import scala.annotation.tailrec import scala.compat.java8.OptionConverters._ import scala.concurrent.duration.FiniteDuration import scala.reflect.{ classTag, ClassTag } +import scala.util.control.NonFatal + import akka.annotation.ApiMayChange import akka.annotation.DoNotInherit import akka.annotation.InternalApi @@ -21,8 +24,6 @@ import akka.util.{ ByteString, OptionVal } import akka.util.JavaDurationConverters._ import akka.util.LineNumbers -import scala.util.control.NonFatal - /** * Holds attributes which can be used to alter [[akka.stream.scaladsl.Flow]] / [[akka.stream.javadsl.Flow]] * or [[akka.stream.scaladsl.GraphDSL]] / [[akka.stream.javadsl.GraphDSL]] materialization. diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index b45db7e4ef..d5a396a4da 100755 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -53,6 +53,7 @@ import akka.stream.Attributes._ val batch = name("batch") val batchWeighted = name("batchWeighted") val expand = name("expand") + val statefulMap = name("statefulMap") val statefulMapConcat = name("statefulMapConcat") val detacher = name("detacher") val groupBy = name("groupBy") diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 3ff5da35cf..06b87f90fb 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -5,31 +5,33 @@ package akka.stream.impl.fusing import java.util.concurrent.TimeUnit.NANOSECONDS -import akka.actor.{ ActorRef, Terminated } -import akka.annotation.{ DoNotInherit, InternalApi } -import akka.event.Logging.LogLevel -import akka.event._ -import akka.stream.ActorAttributes.SupervisionStrategy -import akka.stream.Attributes.SourceLocation -import akka.stream.Attributes.{ InputBuffer, LogLevels } -import akka.stream.OverflowStrategies._ -import akka.stream.impl.Stages.DefaultAttributes -import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage -import akka.stream.impl.{ ReactiveStreamsCompliance, Buffer => BufferImpl, ContextPropagation } -import akka.stream.scaladsl.{ DelayStrategy, Source } -import akka.stream.stage._ -import akka.stream.{ Supervision, _ } -import akka.util.{ unused, OptionVal } -import scala.annotation.nowarn +import scala.annotation.nowarn import scala.annotation.tailrec import scala.collection.immutable import scala.collection.immutable.VectorBuilder import scala.concurrent.Future import scala.concurrent.duration.{ FiniteDuration, _ } -import scala.util.control.Exception.Catcher -import scala.util.control.{ NoStackTrace, NonFatal } import scala.util.{ Failure, Success, Try } +import scala.util.control.{ NoStackTrace, NonFatal } +import scala.util.control.Exception.Catcher + +import akka.actor.{ ActorRef, Terminated } +import akka.annotation.{ DoNotInherit, InternalApi } +import akka.event._ +import akka.event.Logging.LogLevel +import akka.stream.{ Supervision, _ } +import akka.stream.ActorAttributes.SupervisionStrategy +import akka.stream.Attributes.{ InputBuffer, LogLevels } +import akka.stream.Attributes.SourceLocation +import akka.stream.OverflowStrategies._ +import akka.stream.Supervision.Decider +import akka.stream.impl.{ ContextPropagation, ReactiveStreamsCompliance, Buffer => BufferImpl } +import akka.stream.impl.Stages.DefaultAttributes +import akka.stream.impl.fusing.GraphStages.SimpleLinearGraphStage +import akka.stream.scaladsl.{ DelayStrategy, Source } +import akka.stream.stage._ +import akka.util.{ unused, OptionVal } import akka.util.ccompat._ /** @@ -2196,6 +2198,97 @@ private[stream] object Collect { override def toString: String = "RecoverWith" } +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class StatefulMap[S, In, Out](create: () => S, f: (S, In) => (S, Out), onComplete: S => Option[Out]) + extends GraphStage[FlowShape[In, Out]] { + require(create != null, "create function should not be null") + require(f != null, "f function should not be null") + require(onComplete != null, "onComplete function should not be null") + + private val in = Inlet[In]("StatefulMap.in") + private val out = Outlet[Out]("StatefulMap.out") + override val shape: FlowShape[In, Out] = FlowShape(in, out) + + override protected def initialAttributes: Attributes = DefaultAttributes.statefulMap and SourceLocation.forLambda(f) + + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = + new GraphStageLogic(shape) with InHandler with OutHandler { + lazy val decider: Decider = inheritedAttributes.mandatoryAttribute[SupervisionStrategy].decider + + private var state: S = _ + private var needInvokeOnCompleteCallback: Boolean = false + + override def preStart(): Unit = { + state = create() + needInvokeOnCompleteCallback = true + } + + override def onPush(): Unit = + try { + val elem = grab(in) + val (newState, newElem) = f(state, elem) + state = newState + push(out, newElem) + } catch { + case NonFatal(ex) => + decider(ex) match { + case Supervision.Stop => closeStateAndFail(ex) + case Supervision.Resume => pull(in) + case Supervision.Restart => resetStateAndPull() + } + } + + override def onUpstreamFinish(): Unit = closeStateAndComplete() + + override def onUpstreamFailure(ex: Throwable): Unit = closeStateAndFail(ex) + + override def onDownstreamFinish(cause: Throwable): Unit = { + onComplete(state) + needInvokeOnCompleteCallback = false + super.onDownstreamFinish(cause) + } + + private def resetStateAndPull(): Unit = { + needInvokeOnCompleteCallback = false + onComplete(state) + state = create() + needInvokeOnCompleteCallback = true; + pull(in) + } + + private def closeStateAndComplete(): Unit = { + onComplete(state) match { + case Some(elem) => emit(out, elem, () => completeStage()) + case None => completeStage() + } + needInvokeOnCompleteCallback = false + } + + private def closeStateAndFail(ex: Throwable): Unit = { + onComplete(state) match { + case Some(elem) => emit(out, elem, () => failStage(ex)) + case None => failStage(ex) + } + needInvokeOnCompleteCallback = false + } + + override def onPull(): Unit = pull(in) + + override def postStop(): Unit = { + if (needInvokeOnCompleteCallback) { + onComplete(state) + } + } + + setHandlers(in, out, this) + } + + override def toString = "StatefulMap" +} + /** * INTERNAL API */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index 07e1d10f99..67c9c5f708 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -9,12 +9,16 @@ import java.util.Optional import java.util.concurrent.CompletionStage import java.util.function.BiFunction import java.util.function.Supplier + +import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.annotation.{ nowarn, varargs } + import org.reactivestreams.Processor + import akka.Done import akka.NotUsed import akka.actor.ActorRef @@ -705,6 +709,45 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr Util.immutableSeq(f(elem)) }) + /** + * Transform each stream element with the help of a state. + * + * The state creation function is invoked once when the stream is materialized and the returned state is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, + * be a new immutable state but it is also safe to use a mutable state. The returned `T` MUST NOT be `null` as it is + * illegal as stream element - according to the Reactive Streams specification. + * + * For stateless variant see [[map]]. + * + * The `onComplete` function is called only once when the upstream or downstream finished, You can do some clean-up here, + * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam S the type of the state + * @tparam T the type of the output elements + * @param create a function that creates the initial state + * @param f a function that transforms the upstream element and the state into a pair of next state and output element + * @param onComplete a function that transforms the ongoing state into an optional output element + */ + def statefulMap[S, T]( + create: function.Creator[S], + f: function.Function2[S, Out, Pair[S, T]], + onComplete: function.Function[S, Optional[T]]): javadsl.Flow[In, T, Mat] = + new Flow( + delegate.statefulMap(() => create.create())( + (s: S, out: Out) => f.apply(s, out).toScala, + (s: S) => onComplete.apply(s).asScala)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, @@ -1112,7 +1155,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.scan]] + * See also [[#scan]] */ def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.scanAsync(zero) { (out, in) => diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index baa28ce1a1..c5f6a25c0c 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -8,14 +8,18 @@ import java.util import java.util.Optional import java.util.concurrent.{ CompletableFuture, CompletionStage } import java.util.function.{ BiFunction, Supplier } + +import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.collection.immutable import scala.compat.java8.FutureConverters._ -import scala.compat.java8.OptionConverters._ +import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.{ Future, Promise } import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag + import org.reactivestreams.{ Publisher, Subscriber } + import akka.{ Done, NotUsed } import akka.actor.{ ActorRef, Cancellable, ClassicActorSystemProvider } import akka.annotation.ApiMayChange @@ -29,8 +33,6 @@ import akka.util.{ unused, _ } import akka.util.JavaDurationConverters._ import akka.util.ccompat.JavaConverters._ -import scala.annotation.{ nowarn, varargs } - /** Java API */ object Source { private[this] val _empty = new Source[Any, NotUsed](scaladsl.Source.empty) @@ -2303,6 +2305,45 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] = new Source(delegate.mapConcat(elem => Util.immutableSeq(f.apply(elem)))) + /** + * Transform each stream element with the help of a state. + * + * The state creation function is invoked once when the stream is materialized and the returned state is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, + * be a new immutable state but it is also safe to use a mutable state. The returned `T` MUST NOT be `null` as it is + * illegal as stream element - according to the Reactive Streams specification. + * + * For stateless variant see [[map]]. + * + * The `onComplete` function is called only once when the upstream or downstream finished, You can do some clean-up here, + * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam S the type of the state + * @tparam T the type of the output elements + * @param create a function that creates the initial state + * @param f a function that transforms the upstream element and the state into a pair of next state and output element + * @param onComplete a function that transforms the ongoing state into an optional output element + */ + def statefulMap[S, T]( + create: function.Creator[S], + f: function.Function2[S, Out, Pair[S, T]], + onComplete: function.Function[S, Optional[T]]): javadsl.Source[T, Mat] = + new Source( + delegate.statefulMap(() => create.create())( + (s: S, out: Out) => f.apply(s, out).toScala, + (s: S) => onComplete.apply(s).asScala)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, @@ -2707,7 +2748,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.scan]] + * See also [[FlowOps#scan]] */ def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): javadsl.Source[T, Mat] = new Source(delegate.scanAsync(zero) { (out, in) => diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 11a59da8c3..3fed0a9439 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -4,14 +4,17 @@ package akka.stream.javadsl -import java.util.Comparator +import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage import java.util.function.Supplier + +import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.annotation.{ nowarn, varargs } + import akka.NotUsed import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } @@ -192,6 +195,45 @@ class SubFlow[In, Out, Mat]( Util.immutableSeq(f(elem)) }) + /** + * Transform each stream element with the help of a state. + * + * The state creation function is invoked once when the stream is materialized and the returned state is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, + * be a new immutable state but it is also safe to use a mutable state. The returned `T` MUST NOT be `null` as it is + * illegal as stream element - according to the Reactive Streams specification. + * + * For stateless variant see [[map]]. + * + * The `onComplete` function is called only once when the upstream or downstream finished, You can do some clean-up here, + * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam S the type of the state + * @tparam T the type of the output elements + * @param create a function that creates the initial state + * @param f a function that transforms the upstream element and the state into a pair of next state and output element + * @param onComplete a function that transforms the ongoing state into an optional output element + */ + def statefulMap[S, T]( + create: function.Creator[S], + f: function.Function2[S, Out, Pair[S, T]], + onComplete: function.Function[S, Optional[T]]): javadsl.SubFlow[In, T, Mat] = + new SubFlow( + delegate.statefulMap(() => create.create())( + (s: S, out: Out) => f.apply(s, out).toScala, + (s: S) => onComplete.apply(s).asScala)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, @@ -520,7 +562,7 @@ class SubFlow[In, Out, Mat]( * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.scan]] + * See also [[#scan]] */ def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubFlow[In, T, Mat] = new SubFlow(delegate.scanAsync(zero) { (out, in) => diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index 9dd35d1370..d5387f6f55 100755 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -4,14 +4,17 @@ package akka.stream.javadsl -import java.util.Comparator +import java.util.{ Comparator, Optional } import java.util.concurrent.CompletionStage import java.util.function.Supplier + +import scala.annotation.{ nowarn, varargs } import scala.annotation.unchecked.uncheckedVariance import scala.compat.java8.FutureConverters._ +import scala.compat.java8.OptionConverters.RichOptionalGeneric import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag -import scala.annotation.{ nowarn, varargs } + import akka.NotUsed import akka.annotation.ApiMayChange import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } @@ -183,6 +186,45 @@ class SubSource[Out, Mat]( Util.immutableSeq(f(elem)) }) + /** + * Transform each stream element with the help of a state. + * + * The state creation function is invoked once when the stream is materialized and the returned state is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, + * be a new immutable state but it is also safe to use a mutable state. The returned `T` MUST NOT be `null` as it is + * illegal as stream element - according to the Reactive Streams specification. + * + * For stateless variant see [[map]]. + * + * The `onComplete` function is called only once when the upstream or downstream finished, You can do some clean-up here, + * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam S the type of the state + * @tparam T the type of the output elements + * @param create a function that creates the initial state + * @param f a function that transforms the upstream element and the state into a pair of next state and output element + * @param onComplete a function that transforms the ongoing state into an optional output element + */ + def statefulMap[S, T]( + create: function.Creator[S], + f: function.Function2[S, Out, Pair[S, T]], + onComplete: function.Function[S, Optional[T]]): javadsl.SubSource[T, Mat] = + new SubSource( + delegate.statefulMap(() => create.create())( + (s: S, out: Out) => f.apply(s, out).toScala, + (s: S) => onComplete.apply(s).asScala)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful, @@ -511,7 +553,7 @@ class SubSource[Out, Mat]( * * '''Cancels when''' downstream cancels * - * See also [[FlowOps.scan]] + * See also [[#scan]] */ def scanAsync[T](zero: T)(f: function.Function2[T, Out, CompletionStage[T]]): SubSource[T, Mat] = new SubSource(delegate.scanAsync(zero) { (out, in) => diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index 0a2fbec82c..e72cf027af 100755 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -10,15 +10,16 @@ import scala.collection.immutable import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration import scala.reflect.ClassTag + import org.reactivestreams.{ Processor, Publisher, Subscriber, Subscription } + import akka.Done import akka.NotUsed import akka.actor.ActorRef import akka.annotation.{ ApiMayChange, DoNotInherit } import akka.event.{ LogMarker, LoggingAdapter, MarkerLoggingAdapter } -import akka.stream.Attributes.SourceLocation import akka.stream._ -import akka.stream.impl.SingleConcat +import akka.stream.Attributes.SourceLocation import akka.stream.impl.{ fusing, LinearTraversalBuilder, @@ -29,11 +30,12 @@ import akka.stream.impl.{ Timers, TraversalBuilder } +import akka.stream.impl.SingleConcat import akka.stream.impl.fusing._ import akka.stream.impl.fusing.FlattenMerge import akka.stream.stage._ -import akka.util.OptionVal import akka.util.{ ConstantFun, Timeout } +import akka.util.OptionVal import akka.util.ccompat._ /** @@ -800,6 +802,7 @@ final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBui @ccompatUsedUntil213 trait FlowOps[+Out, +Mat] { import GraphDSL.Implicits._ + import akka.stream.impl.Stages._ type Repr[+O] <: FlowOps[O, Mat] { @@ -988,6 +991,39 @@ trait FlowOps[+Out, +Mat] { */ def mapConcat[T](f: Out => IterableOnce[T]): Repr[T] = statefulMapConcat(() => f) + /** + * Transform each stream element with the help of a state. + * + * The state creation function is invoked once when the stream is materialized and the returned state is passed to + * the mapping function for mapping the first element. The mapping function returns a mapped element to emit + * downstream and a state to pass to the next mapping function. The state can be the same for each mapping return, + * be a new immutable state but it is also safe to use a mutable state. The returned `T` MUST NOT be `null` as it is + * illegal as stream element - according to the Reactive Streams specification. + * + * For stateless variant see [[FlowOps.map]]. + * + * The `onComplete` function is called only once when the upstream or downstream finished, You can do some clean-up here, + * and if the returned value is not empty, it will be emitted to the downstream if available, otherwise the value will be dropped. + * + * Adheres to the [[ActorAttributes.SupervisionStrategy]] attribute. + * + * '''Emits when''' the mapping function returns an element and downstream is ready to consume it + * + * '''Backpressures when''' downstream backpressures + * + * '''Completes when''' upstream completes + * + * '''Cancels when''' downstream cancels + * + * @tparam S the type of the state + * @tparam T the type of the output elements + * @param create a function that creates the initial state + * @param f a function that transforms the upstream element and the state into a pair of next state and output element + * @param onComplete a function that transforms the ongoing state into an optional output element + */ + def statefulMap[S, T](create: () => S)(f: (S, Out) => (S, T), onComplete: S => Option[T]): Repr[T] = + via(new StatefulMap[S, Out, T](create, f, onComplete)) + /** * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. The transformation is meant to be stateful,