From a0daf3e5eb3ad192a14825227c349a5f07dca9fe Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 5 Mar 2019 09:17:52 +0100 Subject: [PATCH] add runWith to SourceWithContext --- .../akka/stream/scaladsl/SourceWithContextSpec.scala | 9 +++------ .../scala/akka/stream/javadsl/SourceWithContext.scala | 7 +++++++ .../scala/akka/stream/scaladsl/SourceWithContext.scala | 7 +++++++ 3 files changed, 17 insertions(+), 6 deletions(-) diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index 296546578d..83edfb2247 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -64,8 +64,7 @@ class SourceWithContextSpec extends StreamSpec { .asSourceWithContext(_.offset) .map(_.data) .via(flowWithContext.map(s ⇒ s + "b")) - .toMat(TestSink.probe[(String, Long)])(Keep.right) - .run + .runWith(TestSink.probe[(String, Long)]) .request(1) .expectNext(("ab", 1L)) .expectComplete() @@ -78,8 +77,7 @@ class SourceWithContextSpec extends StreamSpec { .mapConcat { str ⇒ List(1, 2, 3).map(i ⇒ s"$str-$i") } - .toMat(TestSink.probe[(String, Long)])(Keep.right) - .run + .runWith(TestSink.probe[(String, Long)]) .request(3) .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L)) .expectComplete() @@ -112,8 +110,7 @@ class SourceWithContextSpec extends StreamSpec { .asSourceWithContext(_.offset) .map(_.data) .statefulMapConcat(statefulFunction) - .toMat(TestSink.probe[(String, Long)])(Keep.right) - .run + .runWith(TestSink.probe[(String, Long)]) .request(3) .expectNext(("a", 1L), ("z", 2L), ("z", 2L)) .expectComplete() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index f76915672d..eb1b39d96c 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -235,6 +235,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): javadsl.RunnableGraph[Mat3] = RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) ⇒ Pair(o, e) }.toMat(sink)(combinerToScala(combine))) + /** + * Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]] and run it. + * The returned value is the materialized value of the `Sink`. + */ + def runWith[M](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], M], materializer: Materializer): M = + toMat(sink, Keep.right[Mat, M]).run(materializer) + def asScala: scaladsl.SourceWithContext[Out, Ctx, Mat] = delegate private[this] def viaScala[Out2, Ctx2, Mat2](f: scaladsl.SourceWithContext[Out, Ctx, Mat] ⇒ scaladsl.SourceWithContext[Out2, Ctx2, Mat2]): SourceWithContext[Out2, Ctx2, Mat2] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 6202997d1e..f7296ff2bb 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -53,6 +53,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] ( def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = delegate.toMat(sink)(combine) + /** + * Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]] and run it. + * The returned value is the materialized value of the `Sink`. + */ + def runWith[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(implicit materializer: Materializer): Mat2 = + delegate.runWith(sink) + /** * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context).