From e5672727663dc256f4902a4ba84812b7f2eba33c Mon Sep 17 00:00:00 2001 From: tayvs Date: Tue, 25 Jun 2019 16:57:03 +0300 Subject: [PATCH] mapMaterializedValue implementation for withContext Flow/Source (#27201) --- .../stream/scaladsl/FlowWithContextSpec.scala | 15 +++++++++++++++ .../stream/scaladsl/SourceWithContextSpec.scala | 10 ++++++++++ .../akka/stream/javadsl/FlowWithContext.scala | 8 ++++++++ .../akka/stream/javadsl/SourceWithContext.scala | 8 ++++++++ .../akka/stream/scaladsl/FlowWithContext.scala | 8 ++++++++ .../akka/stream/scaladsl/FlowWithContextOps.scala | 2 +- .../akka/stream/scaladsl/SourceWithContext.scala | 8 ++++++++ 7 files changed, 58 insertions(+), 1 deletion(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala index 1a88530c98..72b02563f5 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowWithContextSpec.scala @@ -31,5 +31,20 @@ class FlowWithContextSpec extends StreamSpec { .expectNext(((Message("az", 1L), 1L))) .expectComplete() } + + "be able to map materialized value via FlowWithContext.mapMaterializedValue" in { + val materializedValue = "MatedValue" + val mapMaterializedValueFlow = FlowWithContext[Message, Long].mapMaterializedValue(_ => materializedValue) + + val msg = Message("a", 1L) + val (matValue, probe) = Source(Vector(msg)) + .mapMaterializedValue(_ => 42) + .asSourceWithContext(_.offset) + .viaMat(mapMaterializedValueFlow)(Keep.both) + .toMat(TestSink.probe[(Message, Long)])(Keep.both) + .run + matValue shouldBe (42 -> materializedValue) + probe.request(1).expectNext(((Message("a", 1L), 1L))).expectComplete() + } } } diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index f83a735746..702bb4dd9a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -107,5 +107,15 @@ class SourceWithContextSpec extends StreamSpec { .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) .expectComplete() } + + "be able to change meterialized value via mapMaterializedValue" in { + val materializedValue = "MatedValue" + Source + .empty[Message] + .asSourceWithContext(_.offset) + .mapMaterializedValue(_ => materializedValue) + .to(Sink.ignore) + .run() shouldBe materializedValue + } } } diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala index 66efef46ce..b6bcc004f8 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/FlowWithContext.scala @@ -73,6 +73,14 @@ final class FlowWithContext[In, CtxIn, Out, CtxOut, +Mat]( override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = viaScala(_.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Flow.mapMaterializedValue]]. + * + * @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]] + */ + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = + new FlowWithContext(delegate.mapMaterializedValue[Mat2](f)) + /** * Creates a regular flow of pairs (data, context). */ diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 45218aef86..ea19c4778e 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -58,6 +58,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = viaScala(_.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.javadsl.Source.mapMaterializedValue]]. + * + * @see [[akka.stream.javadsl.Flow.mapMaterializedValue]] + */ + def mapMaterializedValue[Mat2](f: function.Function[Mat, Mat2]): SourceWithContext[Out, Ctx, Mat2] = + viaScala(_.mapMaterializedValue(f.apply _)) + /** * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context). diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala index 0f79b2e75f..53766897ea 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContext.scala @@ -65,6 +65,14 @@ final class FlowWithContext[-In, -CtxIn, +Out, +CtxOut, +Mat](delegate: Flow[(In override def withAttributes(attr: Attributes): FlowWithContext[In, CtxIn, Out, CtxOut, Mat] = new FlowWithContext(delegate.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.scaladsl.Flow.mapMaterializedValue]]. + * + * @see [[akka.stream.scaladsl.Flow.mapMaterializedValue]] + */ + def mapMaterializedValue[Mat2](f: Mat => Mat2): FlowWithContext[In, CtxIn, Out, CtxOut, Mat2] = + new FlowWithContext(delegate.mapMaterializedValue(f)) + def asFlow: Flow[(In, CtxIn), (Out, CtxOut), Mat] = delegate def asJava[JIn <: In, JCtxIn <: CtxIn, JOut >: Out, JCtxOut >: CtxOut, JMat >: Mat] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala index 0dfe0a52d2..fd86b38972 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/FlowWithContextOps.scala @@ -49,7 +49,7 @@ trait FlowWithContextOps[+Out, +Ctx, +Mat] { * The `combine` function is used to compose the materialized values of this flow and that * flow into the materialized value of the resulting Flow. * - * @see [[akka.stream.scaladsl.FlowOps.viaMat]] + * @see [[akka.stream.scaladsl.FlowOpsMat.viaMat]] */ def viaMat[Out2, Ctx2, Mat2, Mat3](flow: Graph[FlowShape[(Out, Ctx), (Out2, Ctx2)], Mat2])( combine: (Mat, Mat2) => Mat3): ReprMat[Out2, Ctx2, Mat3] diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 88fd9a1633..c4a0d49cce 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -45,6 +45,14 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (delegate: Sourc override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(delegate.withAttributes(attr)) + /** + * Context-preserving variant of [[akka.stream.scaladsl.Source.mapMaterializedValue]]. + * + * @see [[akka.stream.scaladsl.Source.mapMaterializedValue]] + */ + def mapMaterializedValue[Mat2](f: Mat => Mat2): SourceWithContext[Out, Ctx, Mat2] = + new SourceWithContext(delegate.mapMaterializedValue(f)) + /** * Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]], * concatenating the processing steps of both.