From 45fa9dd55235be3bcad94f0a2dccaba9998971ac Mon Sep 17 00:00:00 2001 From: Johannes Rudolph Date: Wed, 6 Mar 2019 11:21:41 +0100 Subject: [PATCH] !str remove {Source,Flow}WithContext.statefulMapConcat, fixes #26330 statefulMapConcat will by design confound processing of single elements. However, in the WithContext APIs the user only has access to the elements but not to the contexts so the association is likely to be wrong in the end. So, for now, we follow our basic rule not to include potentially order-changing operations in the WithContext APIs and remove statefulMapConcat. Users that need something like that should drop to the regular tuple passing APIs and manage context propagation manually. --- .../scaladsl/SourceWithContextSpec.scala | 18 ------- .../mima-filters/2.5.21.backwards.excludes | 7 +++ .../akka/stream/javadsl/FlowWithContext.scala | 34 ------------ .../stream/javadsl/SourceWithContext.scala | 34 ------------ .../stream/scaladsl/FlowWithContextOps.scala | 54 ++----------------- 5 files changed, 11 insertions(+), 136 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index b37aa31174..c283e1d900 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -107,23 +107,5 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) .expectComplete() } - - "pass through context via statefulMapConcat" in { - val statefulFunction: () ⇒ String ⇒ collection.immutable.Iterable[String] = () ⇒ { - var counter = 0 - str ⇒ { - counter = counter + 1 - (1 to counter).map(_ ⇒ str) - } - } - Source(Vector(Message("a", 1L), Message("z", 2L))) - .asSourceWithContext(_.offset) - .map(_.data) - .statefulMapConcat(statefulFunction) - .runWith(TestSink.probe[(String, Long)]) - .request(3) - .expectNext(("a", 1L), ("z", 2L), ("z", 2L)) - .expectComplete() - } } } diff --git a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes index 6573730526..53d7014390 100644 --- a/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes +++ b/akka-stream/src/main/mima-filters/2.5.21.backwards.excludes @@ -24,3 +24,10 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.wi # rename `from` to `fromTuples` in WithContext Scala dsl #26370 ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.from") + +# Remove statefulMapConcat from @ApiMayChange {Source,Flow}WithContext #26330 +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.FlowWithContext.statefulMapConcat") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.statefulMapConcat") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.statefulMapConcat") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.statefulMapConcat") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.statefulMapConcat") diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index b8f5c30b3a..116098923f 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -179,40 +179,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: scaladsl def sliding(n: Int, step: Int = 1): FlowWithContext[In, CtxIn, java.util.List[Out @uncheckedVariance], java.util.List[CtxOut @uncheckedVariance], Mat] = viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) - /** - * Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]]. - * - * The context of the input element will be associated with each of the output elements calculated from - * this input element. - * - * Example: - * - * ``` - * def dup(element: String) = Seq(element, element) - * - * Input: - * - * ("a", 1) - * ("b", 2) - * - * inputElements.statefulMapConcat(() => dup) - * - * Output: - * - * ("a", 1) - * ("a", 1) - * ("b", 2) - * ("b", 2) - * ``` - * - * @see [[akka.stream.javadsl.Flow.statefulMapConcat]] - */ - def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] = - viaScala(_.statefulMapConcat { () ⇒ - val fun = f.create() - elem ⇒ Util.immutableSeq(fun(elem)) - }) - /** * Context-preserving variant of [[akka.stream.javadsl.Flow.log]]. * diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index b12fc5dc63..69f0278431 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -169,40 +169,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Out @uncheckedVariance], java.util.List[Ctx @uncheckedVariance], Mat] = viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava)) - /** - * Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]]. - * - * The context of the input element will be associated with each of the output elements calculated from - * this input element. - * - * Example: - * - * ``` - * def dup(element: String) = Seq(element, element) - * - * Input: - * - * ("a", 1) - * ("b", 2) - * - * inputElements.statefulMapConcat(() => dup) - * - * Output: - * - * ("a", 1) - * ("a", 1) - * ("b", 2) - * ("b", 2) - * ``` - * - * @see [[akka.stream.javadsl.Source.statefulMapConcat]] - */ - def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Out2, Ctx, Mat] = - viaScala(_.statefulMapConcat { () ⇒ - val fun = f.create() - elem ⇒ Util.immutableSeq(fun(elem)) - }) - /** * Context-preserving variant of [[akka.stream.javadsl.Source.log]]. * diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 92bf314a92..820163033f 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -155,56 +155,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * * @see [[akka.stream.scaladsl.FlowOps.mapConcat]] */ - def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = statefulMapConcat(() ⇒ f) - - /** - * Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]. - * - * The context of the input element will be associated with each of the output elements calculated from - * this input element. - * - * Example: - * - * ``` - * val statefulRepeat: () ⇒ String ⇒ collection.immutable.Iterable[String] = () ⇒ { - * var counter = 0 - * str ⇒ { - * counter = counter + 1 - * (1 to counter).map(_ ⇒ str) - * } - * } - * ``` - * - * Input: - * - * ("a", 4) - * ("b", 5) - * ("c", 6) - * - * inputElements.statefulMapConcat(statefulRepeat) - * - * Output: - * - * ("a", 4) - * ("b", 5) - * ("b", 5) - * ("c", 6) - * ("c", 6) - * ("c", 6) - * ``` - * - * @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]] - */ - def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = { - val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = () ⇒ { - val plainFun = f() - elWithContext ⇒ { - val (el, ctx) = elWithContext - plainFun(el).map(o ⇒ (o, ctx)) - } - } - via(flow.statefulMapConcat(fCtx)) - } + def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = + via(flow.mapConcat { + case (e, ctx) ⇒ f(e).map(_ -> ctx) + }) /** * Apply the given function to each context element (leaving the data elements unchanged).