!str remove {Source,Flow}WithContext.statefulMapConcat, fixes #26330

statefulMapConcat will by design confound processing of single elements.
However, in the WithContext APIs the user only has access to the elements
but not to the contexts so the association is likely to be wrong in the end.

So, for now, we follow our basic rule not to include potentially order-changing
operations in the WithContext APIs and remove statefulMapConcat. Users that
need something like that should drop to the regular tuple passing APIs and
manage context propagation manually.
This commit is contained in:
Johannes Rudolph 2019-03-06 11:21:41 +01:00
parent c186c1bde9
commit 45fa9dd552
No known key found for this signature in database
GPG key ID: 4D293A24CCD39E19
5 changed files with 11 additions and 136 deletions

View file

@ -107,23 +107,5 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
.expectComplete()
}
"pass through context via statefulMapConcat" in {
val statefulFunction: () String collection.immutable.Iterable[String] = () {
var counter = 0
str {
counter = counter + 1
(1 to counter).map(_ str)
}
}
Source(Vector(Message("a", 1L), Message("z", 2L)))
.asSourceWithContext(_.offset)
.map(_.data)
.statefulMapConcat(statefulFunction)
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))
.expectComplete()
}
}
}

View file

@ -24,3 +24,10 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.wi
# rename `from` to `fromTuples` in WithContext Scala dsl #26370
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.from")
# Remove statefulMapConcat from @ApiMayChange {Source,Flow}WithContext #26330
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.FlowWithContext.statefulMapConcat")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.statefulMapConcat")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.statefulMapConcat")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.statefulMapConcat")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.statefulMapConcat")

View file

@ -179,40 +179,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: scaladsl
def sliding(n: Int, step: Int = 1): FlowWithContext[In, CtxIn, java.util.List[Out @uncheckedVariance], java.util.List[CtxOut @uncheckedVariance], Mat] =
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.statefulMapConcat(() => dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Flow.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
viaScala(_.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
})
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
*

View file

@ -169,40 +169,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Out @uncheckedVariance], java.util.List[Ctx @uncheckedVariance], Mat] =
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* def dup(element: String) = Seq(element, element)
*
* Input:
*
* ("a", 1)
* ("b", 2)
*
* inputElements.statefulMapConcat(() => dup)
*
* Output:
*
* ("a", 1)
* ("a", 1)
* ("b", 2)
* ("b", 2)
* ```
*
* @see [[akka.stream.javadsl.Source.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Out2, Ctx, Mat] =
viaScala(_.statefulMapConcat { ()
val fun = f.create()
elem Util.immutableSeq(fun(elem))
})
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.log]].
*

View file

@ -155,56 +155,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
*
* @see [[akka.stream.scaladsl.FlowOps.mapConcat]]
*/
def mapConcat[Out2](f: Out immutable.Iterable[Out2]): Repr[Out2, Ctx] = statefulMapConcat(() f)
/**
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]].
*
* The context of the input element will be associated with each of the output elements calculated from
* this input element.
*
* Example:
*
* ```
* val statefulRepeat: () String collection.immutable.Iterable[String] = () {
* var counter = 0
* str {
* counter = counter + 1
* (1 to counter).map(_ str)
* }
* }
* ```
*
* Input:
*
* ("a", 4)
* ("b", 5)
* ("c", 6)
*
* inputElements.statefulMapConcat(statefulRepeat)
*
* Output:
*
* ("a", 4)
* ("b", 5)
* ("b", 5)
* ("c", 6)
* ("c", 6)
* ("c", 6)
* ```
*
* @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]
*/
def statefulMapConcat[Out2](f: () Out immutable.Iterable[Out2]): Repr[Out2, Ctx] = {
val fCtx: () ((Out, Ctx)) immutable.Iterable[(Out2, Ctx)] = () {
val plainFun = f()
elWithContext {
val (el, ctx) = elWithContext
plainFun(el).map(o (o, ctx))
}
}
via(flow.statefulMapConcat(fCtx))
}
def mapConcat[Out2](f: Out immutable.Iterable[Out2]): Repr[Out2, Ctx] =
via(flow.mapConcat {
case (e, ctx) f(e).map(_ -> ctx)
})
/**
* Apply the given function to each context element (leaving the data elements unchanged).