add runWith to SourceWithContext

This commit is contained in:
Patrik Nordwall 2019-03-05 09:17:52 +01:00
parent a98daa0d08
commit a0daf3e5eb
3 changed files with 17 additions and 6 deletions

View file

@ -64,8 +64,7 @@ class SourceWithContextSpec extends StreamSpec {
.asSourceWithContext(_.offset)
.map(_.data)
.via(flowWithContext.map(s s + "b"))
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.runWith(TestSink.probe[(String, Long)])
.request(1)
.expectNext(("ab", 1L))
.expectComplete()
@ -78,8 +77,7 @@ class SourceWithContextSpec extends StreamSpec {
.mapConcat { str
List(1, 2, 3).map(i s"$str-$i")
}
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L))
.expectComplete()
@ -112,8 +110,7 @@ class SourceWithContextSpec extends StreamSpec {
.asSourceWithContext(_.offset)
.map(_.data)
.statefulMapConcat(statefulFunction)
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.runWith(TestSink.probe[(String, Long)])
.request(3)
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))
.expectComplete()

View file

@ -235,6 +235,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): javadsl.RunnableGraph[Mat3] =
RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) Pair(o, e) }.toMat(sink)(combinerToScala(combine)))
/**
* Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.
*/
def runWith[M](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], M], materializer: Materializer): M =
toMat(sink, Keep.right[Mat, M]).run(materializer)
def asScala: scaladsl.SourceWithContext[Out, Ctx, Mat] = delegate
private[this] def viaScala[Out2, Ctx2, Mat2](f: scaladsl.SourceWithContext[Out, Ctx, Mat] scaladsl.SourceWithContext[Out2, Ctx2, Mat2]): SourceWithContext[Out2, Ctx2, Mat2] =

View file

@ -53,6 +53,13 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] =
delegate.toMat(sink)(combine)
/**
* Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]] and run it.
* The returned value is the materialized value of the `Sink`.
*/
def runWith[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(implicit materializer: Materializer): Mat2 =
delegate.runWith(sink)
/**
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).