mapMaterializedValue implementation for withContext Flow/Source (#27201)

This commit is contained in:
tayvs 2019-06-25 16:57:03 +03:00 committed by Arnout Engelen
parent c932582238
commit e567272766
7 changed files with 58 additions and 1 deletions

View file

@ -31,5 +31,20 @@ class FlowWithContextSpec extends StreamSpec {
.expectNext(((Message("az", 1L), 1L)))
.expectComplete()
}
"be able to map materialized value via FlowWithContext.mapMaterializedValue" in {
val materializedValue = "MatedValue"
val mapMaterializedValueFlow = FlowWithContext[Message, Long].mapMaterializedValue(_ => materializedValue)
val msg = Message("a", 1L)
val (matValue, probe) = Source(Vector(msg))
.mapMaterializedValue(_ => 42)
.asSourceWithContext(_.offset)
.viaMat(mapMaterializedValueFlow)(Keep.both)
.toMat(TestSink.probe[(Message, Long)])(Keep.both)
.run
matValue shouldBe (42 -> materializedValue)
probe.request(1).expectNext(((Message("a", 1L), 1L))).expectComplete()
}
}
}

View file

@ -107,5 +107,15 @@ class SourceWithContextSpec extends StreamSpec {
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
.expectComplete()
}
"be able to change meterialized value via mapMaterializedValue" in {
val materializedValue = "MatedValue"
Source
.empty[Message]
.asSourceWithContext(_.offset)
.mapMaterializedValue(_ => materializedValue)
.to(Sink.ignore)
.run() shouldBe materializedValue
}
}
}

View file

@ -73,6 +73,14 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat](
override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
viaScala(_.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.javadsl.Flow.mapMaterializedValue]].
*
* @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]]
*/
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] =
new FlowWithContext(delegate.mapMaterializedValue[Mat2](f))
/**
* Creates a regular flow of pairs (data, context).
*/

View file

@ -58,6 +58,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] =
viaScala(_.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.javadsl.Source.mapMaterializedValue]].
*
* @see [[akka.stream.javadsl.Flow.mapMaterializedValue]]
*/
def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): SourceWithContext[Out, Ctx, Mat2] =
viaScala(_.mapMaterializedValue(f.apply _))
/**
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).

View file

@ -65,6 +65,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In
override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] =
new FlowWithContext(delegate.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.scaladsl.Flow.mapMaterializedValue]].
*
* @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]]
*/
def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] =
new FlowWithContext(delegate.mapMaterializedValue(f))
def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate
def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat]

View file

@ -49,7 +49,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] {
* The `combine` function is used to compose the materialized values of this flow and that
* flow into the materialized value of the resulting Flow.
*
* @see [[akka.stream.scaladsl.FlowOps.viaMat]]
* @see [[akka.stream.scaladsl.FlowOpsMat.viaMat]]
*/
def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])(
combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3]

View file

@ -45,6 +45,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc
override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] =
new SourceWithContext(delegate.withAttributes(attr))
/**
* Context-preserving variant of [[akka.stream.scaladsl.Source.mapMaterializedValue]].
*
* @see [[akka.stream.scaladsl.Source.mapMaterializedValue]]
*/
def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] =
new SourceWithContext(delegate.mapMaterializedValue(f))
/**
* Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both.