Merge pull request #26474 from jrudolph/jr/26330-remove-WithContext-statefulMapContext
!str remove {Source,Flow}WithContext.statefulMapConcat, fixes #26330
This commit is contained in:
commit
f1872b773c
5 changed files with 11 additions and 136 deletions
|
|
@ -107,23 +107,5 @@ class SourceWithContextSpec extends StreamSpec {
|
|||
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
|
||||
.expectComplete()
|
||||
}
|
||||
|
||||
"pass through context via statefulMapConcat" in {
|
||||
val statefulFunction: () ⇒ String ⇒ collection.immutable.Iterable[String] = () ⇒ {
|
||||
var counter = 0
|
||||
str ⇒ {
|
||||
counter = counter + 1
|
||||
(1 to counter).map(_ ⇒ str)
|
||||
}
|
||||
}
|
||||
Source(Vector(Message("a", 1L), Message("z", 2L)))
|
||||
.asSourceWithContext(_.offset)
|
||||
.map(_.data)
|
||||
.statefulMapConcat(statefulFunction)
|
||||
.runWith(TestSink.probe[(String, Long)])
|
||||
.request(3)
|
||||
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))
|
||||
.expectComplete()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -24,3 +24,10 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.GraphDelegate.wi
|
|||
|
||||
# rename `from` to `fromTuples` in WithContext Scala dsl #26370
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.from")
|
||||
|
||||
# Remove statefulMapConcat from @ApiMayChange {Source,Flow}WithContext #26330
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.FlowWithContext.statefulMapConcat")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.javadsl.SourceWithContext.statefulMapConcat")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContext.statefulMapConcat")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.SourceWithContext.statefulMapConcat")
|
||||
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.stream.scaladsl.FlowWithContextOps.statefulMapConcat")
|
||||
|
|
|
|||
|
|
@ -179,40 +179,6 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: scaladsl
|
|||
def sliding(n: Int, step: Int = 1): FlowWithContext[In, CtxIn, java.util.List[Out @uncheckedVariance], java.util.List[CtxOut @uncheckedVariance], Mat] =
|
||||
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[akka.stream.javadsl.Flow.statefulMapConcat]].
|
||||
*
|
||||
* The context of the input element will be associated with each of the output elements calculated from
|
||||
* this input element.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* ```
|
||||
* def dup(element: String) = Seq(element, element)
|
||||
*
|
||||
* Input:
|
||||
*
|
||||
* ("a", 1)
|
||||
* ("b", 2)
|
||||
*
|
||||
* inputElements.statefulMapConcat(() => dup)
|
||||
*
|
||||
* Output:
|
||||
*
|
||||
* ("a", 1)
|
||||
* ("a", 1)
|
||||
* ("b", 2)
|
||||
* ("b", 2)
|
||||
* ```
|
||||
*
|
||||
* @see [[akka.stream.javadsl.Flow.statefulMapConcat]]
|
||||
*/
|
||||
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): FlowWithContext[In, CtxIn, Out2, CtxOut, Mat] =
|
||||
viaScala(_.statefulMapConcat { () ⇒
|
||||
val fun = f.create()
|
||||
elem ⇒ Util.immutableSeq(fun(elem))
|
||||
})
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[akka.stream.javadsl.Flow.log]].
|
||||
*
|
||||
|
|
|
|||
|
|
@ -169,40 +169,6 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
|
|||
def sliding(n: Int, step: Int = 1): SourceWithContext[java.util.List[Out @uncheckedVariance], java.util.List[Ctx @uncheckedVariance], Mat] =
|
||||
viaScala(_.sliding(n, step).map(_.asJava).mapContext(_.asJava))
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[akka.stream.javadsl.Source.statefulMapConcat]].
|
||||
*
|
||||
* The context of the input element will be associated with each of the output elements calculated from
|
||||
* this input element.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* ```
|
||||
* def dup(element: String) = Seq(element, element)
|
||||
*
|
||||
* Input:
|
||||
*
|
||||
* ("a", 1)
|
||||
* ("b", 2)
|
||||
*
|
||||
* inputElements.statefulMapConcat(() => dup)
|
||||
*
|
||||
* Output:
|
||||
*
|
||||
* ("a", 1)
|
||||
* ("a", 1)
|
||||
* ("b", 2)
|
||||
* ("b", 2)
|
||||
* ```
|
||||
*
|
||||
* @see [[akka.stream.javadsl.Source.statefulMapConcat]]
|
||||
*/
|
||||
def statefulMapConcat[Out2](f: function.Creator[function.Function[Out, java.lang.Iterable[Out2]]]): SourceWithContext[Out2, Ctx, Mat] =
|
||||
viaScala(_.statefulMapConcat { () ⇒
|
||||
val fun = f.create()
|
||||
elem ⇒ Util.immutableSeq(fun(elem))
|
||||
})
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[akka.stream.javadsl.Source.log]].
|
||||
*
|
||||
|
|
|
|||
|
|
@ -155,56 +155,10 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
|
|||
*
|
||||
* @see [[akka.stream.scaladsl.FlowOps.mapConcat]]
|
||||
*/
|
||||
def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = statefulMapConcat(() ⇒ f)
|
||||
|
||||
/**
|
||||
* Context-preserving variant of [[akka.stream.scaladsl.FlowOps.statefulMapConcat]].
|
||||
*
|
||||
* The context of the input element will be associated with each of the output elements calculated from
|
||||
* this input element.
|
||||
*
|
||||
* Example:
|
||||
*
|
||||
* ```
|
||||
* val statefulRepeat: () ⇒ String ⇒ collection.immutable.Iterable[String] = () ⇒ {
|
||||
* var counter = 0
|
||||
* str ⇒ {
|
||||
* counter = counter + 1
|
||||
* (1 to counter).map(_ ⇒ str)
|
||||
* }
|
||||
* }
|
||||
* ```
|
||||
*
|
||||
* Input:
|
||||
*
|
||||
* ("a", 4)
|
||||
* ("b", 5)
|
||||
* ("c", 6)
|
||||
*
|
||||
* inputElements.statefulMapConcat(statefulRepeat)
|
||||
*
|
||||
* Output:
|
||||
*
|
||||
* ("a", 4)
|
||||
* ("b", 5)
|
||||
* ("b", 5)
|
||||
* ("c", 6)
|
||||
* ("c", 6)
|
||||
* ("c", 6)
|
||||
* ```
|
||||
*
|
||||
* @see [[akka.stream.scaladsl.FlowOps.statefulMapConcat]]
|
||||
*/
|
||||
def statefulMapConcat[Out2](f: () ⇒ Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] = {
|
||||
val fCtx: () ⇒ ((Out, Ctx)) ⇒ immutable.Iterable[(Out2, Ctx)] = () ⇒ {
|
||||
val plainFun = f()
|
||||
elWithContext ⇒ {
|
||||
val (el, ctx) = elWithContext
|
||||
plainFun(el).map(o ⇒ (o, ctx))
|
||||
}
|
||||
}
|
||||
via(flow.statefulMapConcat(fCtx))
|
||||
}
|
||||
def mapConcat[Out2](f: Out ⇒ immutable.Iterable[Out2]): Repr[Out2, Ctx] =
|
||||
via(flow.mapConcat {
|
||||
case (e, ctx) ⇒ f(e).map(_ -> ctx)
|
||||
})
|
||||
|
||||
/**
|
||||
* Apply the given function to each context element (leaving the data elements unchanged).
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue