diff --git a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java index fbfc4379ab..83bd3acbb0 100644 --- a/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java +++ b/akka-stream-tests/src/test/java/akka/stream/javadsl/FlowTest.java @@ -99,6 +99,33 @@ public class FlowTest extends StreamTest { future.toCompletableFuture().get(200, TimeUnit.MILLISECONDS); } + @Test + public void mustBeAbleToUseStatefullMaponcat() throws Exception { + final JavaTestKit probe = new JavaTestKit(system); + final java.lang.Iterable input = Arrays.asList(1, 2, 3, 4, 5); + final Source ints = Source.from(input); + final Flow flow = Flow.of(Integer.class).statefulMapConcat( + new Creator>>() { + public Function> create() { + int[] state = new int[1]; + state[0] = 0; + return new Function>() { + public List apply(Integer elem) { + List list = new ArrayList<>(Collections.nCopies(state[0], elem)); + state[0] = elem; + return list; + } + }; + } + }); + + ints.via(flow) + .runFold("", (acc, elem) -> acc + elem, materializer) + .thenAccept(elem -> probe.getRef().tell(elem, ActorRef.noSender())); + + probe.expectMsgEquals("2334445555"); + } + @Test public void mustBeAbleToUseIntersperse() throws Exception { final JavaTestKit probe = new JavaTestKit(system); diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala index 90f08118e4..0fdcdec358 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/InterpreterSupervisionSpec.scala @@ -303,26 +303,6 @@ class InterpreterSupervisionSpec extends AkkaSpec with GraphInterpreterSpecKit { lastEvents() should be(Set(OnNext(3))) } - "resume when MapConcat throws" in new OneBoundedSetup[Int](Seq( - MapConcat((x: Int) ⇒ if (x == 0) throw TE else List(x, -x), resumingDecider))) { - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(1) - lastEvents() should be(Set(OnNext(1))) - downstream.requestOne() - lastEvents() should be(Set(OnNext(-1))) - - downstream.requestOne() - lastEvents() should be(Set(RequestOne)) - upstream.onNext(0) // boom - lastEvents() should be(Set(RequestOne)) - - upstream.onNext(2) - lastEvents() should be(Set(OnNext(2))) - downstream.requestOne() - lastEvents() should be(Set(OnNext(-2))) - } - "restart when Collect throws" in { // TODO can't get type inference to work with `pf` inlined val pf: PartialFunction[Int, Int] = diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala index 7156dfaba6..93bf0d93f1 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowMapConcatSpec.scala @@ -3,16 +3,18 @@ */ package akka.stream.scaladsl -import scala.concurrent.duration._ -import akka.stream.ActorMaterializerSettings -import akka.stream.testkit._ +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ Supervision, ActorAttributes, ActorMaterializer, ActorMaterializerSettings } import akka.stream.testkit.Utils._ -import akka.stream.ActorMaterializer +import akka.stream.testkit._ + +import scala.util.control.NoStackTrace class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { val settings = ActorMaterializerSettings(system) .withInputBuffer(initialSize = 2, maxSize = 16) + implicit val materializer = ActorMaterializer(settings) "A MapConcat" must { @@ -27,19 +29,24 @@ class FlowMapConcatSpec extends AkkaSpec with ScriptedTest { TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.mapConcat(x ⇒ (1 to x) map (_ ⇒ x)))) } - "map and concat grouping with slow downstream" in { - val settings = ActorMaterializerSettings(system) - .withInputBuffer(initialSize = 2, maxSize = 2) - implicit val materializer = ActorMaterializer(settings) - assertAllStagesStopped { - val s = TestSubscriber.manualProbe[Int] - val input = (1 to 20).grouped(5).toList - Source(input).mapConcat(identity).map(x ⇒ { Thread.sleep(10); x }).runWith(Sink.fromSubscriber(s)) - val sub = s.expectSubscription() - sub.request(100) - for (i ← 1 to 20) s.expectNext(i) - s.expectComplete() - } + "map and concat grouping with slow downstream" in assertAllStagesStopped { + val s = TestSubscriber.manualProbe[Int] + val input = (1 to 20).grouped(5).toList + Source(input).mapConcat(identity).map(x ⇒ { Thread.sleep(10); x }).runWith(Sink.fromSubscriber(s)) + val sub = s.expectSubscription() + sub.request(100) + for (i ← 1 to 20) s.expectNext(i) + s.expectComplete() + } + + "be able to resume" in assertAllStagesStopped { + val ex = new Exception("TEST") with NoStackTrace + + Source(1 to 5).mapConcat(x ⇒ if (x == 3) throw ex else List(x)) + .withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(4).expectNext(1, 2, 4, 5) + .expectComplete() } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapConcatSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapConcatSpec.scala new file mode 100644 index 0000000000..d0fa85817a --- /dev/null +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowStatefulMapConcatSpec.scala @@ -0,0 +1,85 @@ +/** + * Copyright (C) 2016 Typesafe Inc. + */ +package akka.stream.scaladsl + +import akka.stream.testkit.scaladsl.TestSink +import akka.stream.{ ActorMaterializer, ActorAttributes, Supervision, ActorMaterializerSettings } +import akka.stream.testkit._ + +import scala.util.control.NoStackTrace + +class FlowStatefulMapConcatSpec extends AkkaSpec with ScriptedTest { + + val settings = ActorMaterializerSettings(system) + .withInputBuffer(initialSize = 2, maxSize = 16) + implicit val materializer = ActorMaterializer(settings) + val ex = new Exception("TEST") with NoStackTrace + + "A StatefulMapConcat" must { + + "work in happy case" in { + val script = Script( + Seq(2) -> Seq(), + Seq(1) -> Seq(1, 1), + Seq(3) -> Seq(3), + Seq(6) -> Seq(6, 6, 6)) + TestConfig.RandomTestRange foreach (_ ⇒ runScript(script, settings)(_.statefulMapConcat(() ⇒ { + var prev: Option[Int] = None + x ⇒ prev match { + case Some(e) ⇒ + prev = Some(x) + (1 to e) map (_ ⇒ x) + case None ⇒ + prev = Some(x) + List.empty[Int] + } + }))) + } + + "be able to restart" in { + Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ⇒ { + var prev: Option[Int] = None + x ⇒ { + if (x % 3 == 0) throw ex + prev match { + case Some(e) ⇒ + prev = Some(x) + (1 to e) map (_ ⇒ x) + case None ⇒ + prev = Some(x) + List.empty[Int] + } + } + }).withAttributes(ActorAttributes.supervisionStrategy(Supervision.restartingDecider)) + .runWith(TestSink.probe[Int]) + .request(2).expectNext(1, 1) + .request(4).expectNext(1, 1, 1, 1) + .expectComplete() + } + + "be able to resume" in { + Source(List(2, 1, 3, 4, 1)).statefulMapConcat(() ⇒ { + var prev: Option[Int] = None + x ⇒ { + if (x % 3 == 0) throw ex + prev match { + case Some(e) ⇒ + prev = Some(x) + (1 to e) map (_ ⇒ x) + case None ⇒ + prev = Some(x) + List.empty[Int] + } + } + }).withAttributes(ActorAttributes.supervisionStrategy(Supervision.resumingDecider)) + .runWith(TestSink.probe[Int]) + .request(2).expectNext(1, 1) + .requestNext(4) + .request(4).expectNext(1, 1, 1, 1) + .expectComplete() + } + + } + +} diff --git a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala index 9eaa14261a..8726d633e2 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/Stages.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/Stages.scala @@ -52,7 +52,7 @@ private[stream] object Stages { val batch = name("batch") val batchWeighted = name("batchWeighted") val expand = name("expand") - val mapConcat = name("mapConcat") + val statefulMapConcat = name("statefulMapConcat") val detacher = name("detacher") val groupBy = name("groupBy") val prefixAndTail = name("prefixAndTail") @@ -206,10 +206,6 @@ private[stream] object Stages { override def create(attr: Attributes): Stage[T, T] = fusing.Buffer(size, overflowStrategy) } - final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], attributes: Attributes = mapConcat) extends SymbolicStage[In, Out] { - override def create(attr: Attributes): Stage[In, Out] = fusing.MapConcat(f, supervision(attr)) - } - // FIXME: These are not yet proper stages, therefore they use the deprecated StageModule infrastructure final case class GroupBy(maxSubstreams: Int, f: Any ⇒ Any, attributes: Attributes = groupBy) extends StageModule { diff --git a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala index 2a5d17d0d1..3e69e6309c 100644 --- a/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala +++ b/akka-stream/src/main/scala/akka/stream/impl/fusing/Ops.scala @@ -21,6 +21,7 @@ import scala.util.{ Failure, Success, Try } import akka.stream.ActorAttributes.SupervisionStrategy import scala.concurrent.duration.{ FiniteDuration, _ } import akka.stream.impl.Stages.DefaultAttributes +import akka.NotUsed /** * INTERNAL API @@ -121,39 +122,6 @@ private[akka] final case class Recover[T](pf: PartialFunction[Throwable, T]) ext } -/** - * INTERNAL API - */ -private[akka] final case class MapConcat[In, Out](f: In ⇒ immutable.Iterable[Out], decider: Supervision.Decider) extends PushPullStage[In, Out] { - private var currentIterator: Iterator[Out] = Iterator.empty - - override def onPush(elem: In, ctx: Context[Out]): SyncDirective = { - currentIterator = f(elem).iterator - if (!currentIterator.hasNext) ctx.pull() - else ctx.push(currentIterator.next()) - } - - override def onPull(ctx: Context[Out]): SyncDirective = - if (ctx.isFinishing) { - if (currentIterator.hasNext) { - val elem = currentIterator.next() - if (currentIterator.hasNext) ctx.push(elem) - else ctx.pushAndFinish(elem) - } else ctx.finish() - } else { - if (currentIterator.hasNext) ctx.push(currentIterator.next()) - else ctx.pull() - } - - override def onUpstreamFinish(ctx: Context[Out]): TerminationDirective = - if (currentIterator.hasNext) ctx.absorbTermination() - else ctx.finish() - - override def decide(t: Throwable): Supervision.Directive = decider(t) - - override def restart(): MapConcat[In, Out] = copy() -} - /** * INTERNAL API */ @@ -1168,4 +1136,56 @@ private[stream] final class RecoverWith[T, M](pf: PartialFunction[Throwable, Gra } override def toString: String = "RecoverWith" +} + +/** + * INTERNAL API + */ +private[stream] final class StatefulMapConcat[In, Out](f: () ⇒ In ⇒ immutable.Iterable[Out]) extends GraphStage[FlowShape[In, Out]] { + val in = Inlet[In]("StatefulMapConcat.in") + val out = Outlet[Out]("StatefulMapConcat.out") + override val shape = FlowShape(in, out) + override def initialAttributes: Attributes = DefaultAttributes.statefulMapConcat + + def createLogic(inheritedAttributes: Attributes) = new GraphStageLogic(shape) with InHandler with OutHandler { + val decider = inheritedAttributes.get[SupervisionStrategy].map(_.decider).getOrElse(Supervision.stoppingDecider) + var currentIterator: Iterator[Out] = _ + var plainFun = f() + def hasNext = if (currentIterator != null) currentIterator.hasNext else false + setHandlers(in, out, this) + + def pushPull(): Unit = + if (hasNext) { + push(out, currentIterator.next()) + if (!hasNext && isClosed(in)) completeStage() + } else if (!isClosed(in)) + pull(in) + else completeStage() + + def onFinish(): Unit = if (!hasNext) completeStage() + + override def onPush(): Unit = + try { + currentIterator = plainFun(grab(in)).iterator + pushPull() + } catch { + case NonFatal(ex) ⇒ decider(ex) match { + case Supervision.Stop ⇒ failStage(ex) + case Supervision.Resume ⇒ if (!hasBeenPulled(in)) pull(in) + case Supervision.Restart ⇒ + restartState() + if (!hasBeenPulled(in)) pull(in) + } + } + + override def onUpstreamFinish(): Unit = onFinish() + override def onPull(): Unit = pushPull() + + private def restartState(): Unit = { + plainFun = f() + currentIterator = null + } + } + override def toString = "StatefulMapConcat" + } \ No newline at end of file diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala index bc7223de86..593325a499 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Flow.scala @@ -329,6 +329,36 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): javadsl.Flow[In, T, Mat] = new Flow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) }) + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[#mapConcat]]. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as an output sequence. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + */ + def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):javadsl.Flow[In, T, Mat] = + new Flow(delegate.statefulMapConcat{ () ⇒ { + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }}) + /** * Transform this stream by applying the given function to each of the elements * as they pass through this processing step. The function returns a `CompletionStage` and the @@ -772,6 +802,7 @@ final class Flow[-In, +Out, +Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * Source may be materialized. + * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala index 974d1ea494..27ce254689 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/Source.scala @@ -779,6 +779,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * Source may be materialized. + * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * @@ -796,7 +797,7 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap new Source(delegate.recoverWith(pf)) /** - * Transform each input element into an `Iterable of output elements that is + * Transform each input element into an `Iterable` of output elements that is * then flattened into the output stream. * * Make sure that the `Iterable` is immutable or at least not modified after @@ -817,7 +818,38 @@ final class Source[+Out, +Mat](delegate: scaladsl.Source[Out, Mat]) extends Grap * '''Cancels when''' downstream cancels */ def mapConcat[T](f: function.Function[Out, _ <: java.lang.Iterable[T]]): javadsl.Source[T, Mat] = - new Source(delegate.mapConcat(elem ⇒ Util.immutableSeq(f.apply(elem)))) + new Source(delegate.statefulMapConcat(() ⇒ elem ⇒ Util.immutableSeq(f.apply(elem)))) + + + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[#mapConcat]]. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as an output sequence. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + */ + def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): javadsl.Source[T, Mat] = + new Source(delegate.statefulMapConcat{ () ⇒ { + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }}) /** * Transform this stream by applying the given function to each of the elements diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala index 5176084a10..74a9d81bc4 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubFlow.scala @@ -163,7 +163,38 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * '''Cancels when''' downstream cancels */ def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubFlow[In, T, Mat] = - new SubFlow(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) }) + new SubFlow(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) }) + + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[#mapConcat]]. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as an output sequence. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + */ + def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]): SubFlow[In, T, Mat] = + new SubFlow(delegate.statefulMapConcat{ () ⇒ { + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }}) + /** * Transform this stream by applying the given function to each of the elements @@ -610,6 +641,7 @@ class SubFlow[-In, +Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Flo * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * Source may be materialized. + * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala index e804341e6f..f1bb731c90 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SubSource.scala @@ -159,7 +159,37 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * '''Cancels when''' downstream cancels */ def mapConcat[T](f: function.Function[Out, java.lang.Iterable[T]]): SubSource[T, Mat] = - new SubSource(delegate.mapConcat { elem ⇒ Util.immutableSeq(f(elem)) }) + new SubSource(delegate.statefulMapConcat { () ⇒ elem ⇒ Util.immutableSeq(f(elem)) }) + + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[#mapConcat]]. + * + * Make sure that the `Iterable` is immutable or at least not modified after + * being used as an output sequence. Otherwise the stream may fail with + * `ConcurrentModificationException` or other more subtle errors may occur. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + */ + def statefulMapConcat[T](f: function.Creator[function.Function[Out, java.lang.Iterable[T]]]):SubSource[T, Mat] = + new SubSource(delegate.statefulMapConcat{ () ⇒ { + val fun = f.create() + elem ⇒ Util.immutableSeq(fun(elem)) + }}) /** * Transform this stream by applying the given function to each of the elements @@ -606,6 +636,7 @@ class SubSource[+Out, +Mat](delegate: scaladsl.SubFlow[Out, Mat, scaladsl.Source * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * Source may be materialized. + * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala index d921814db1..bfc32f6b65 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/Flow.scala @@ -430,6 +430,7 @@ trait FlowOps[+Out, +Mat] { * RecoverWith allows to switch to alternative Source on flow failure. It will stay in effect after * a failure has been recovered so that each time there is a failure it is fed into the `pf` and a new * Source may be materialized. + * * Since the underlying failure signal onError arrives out-of-band, it might jump over existing elements. * This stage can recover the failure signal, but not the skipped elements, which will be dropped. * @@ -479,7 +480,32 @@ trait FlowOps[+Out, +Mat] { * '''Cancels when''' downstream cancels * */ - def mapConcat[T](f: Out ⇒ immutable.Iterable[T]): Repr[T] = andThen(MapConcat(f)) + def mapConcat[T](f: Out ⇒ immutable.Iterable[T]): Repr[T] = statefulMapConcat(() => f) + + /** + * Transform each input element into an `Iterable` of output elements that is + * then flattened into the output stream. The transformation is meant to be stateful, + * which is enabled by creating the transformation function anew for every materialization — + * the returned function will typically close over mutable objects to store state between + * invocations. For the stateless variant see [[FlowOps.mapConcat]]. + * + * The returned `Iterable` MUST NOT contain `null` values, + * as they are illegal as stream elements - according to the Reactive Streams specification. + * + * '''Emits when''' the mapping function returns an element or there are still remaining elements + * from the previously calculated collection + * + * '''Backpressures when''' downstream backpressures or there are still remaining elements from the + * previously calculated collection + * + * '''Completes when''' upstream completes and all remaining elements has been emitted + * + * '''Cancels when''' downstream cancels + * + * See also [[FlowOps.mapConcat]] + */ + def statefulMapConcat[T](f: () ⇒ Out ⇒ immutable.Iterable[T]): Repr[T] = + via(new StatefulMapConcat(f)) /** * Transform this stream by applying the given function to each of the elements