Adds to and toMap to SourceWithContext

This commit is contained in:
Luc Bourlier 2019-02-18 17:47:20 +01:00 committed by Patrik Nordwall
parent c58d348b2a
commit a98daa0d08
5 changed files with 43 additions and 15 deletions

View file

@ -286,7 +286,7 @@ their exposed semantics as well as handled query scenarios.
A read journal plugin must implement `akka.persistence.query.ReadJournalProvider` which
creates instances of `akka.persistence.query.scaladsl.ReadJournal` and
`akka.persistence.query.javaadsl.ReadJournal`. The plugin must implement both the `scaladsl`
`akka.persistence.query.javadsl.ReadJournal`. The plugin must implement both the `scaladsl`
and the `javadsl` @scala[traits]@java[interfaces] because the `akka.stream.scaladsl.Source` and
`akka.stream.javadsl.Source` are different types and even though those types can be converted
to each other it is most convenient for the end user to get access to the Java or Scala `Source` directly.

View file

@ -9,7 +9,7 @@ package akka.persistence.query
* It provides the concrete implementations for the Java and Scala APIs.
*
* A read journal plugin must provide implementations for both
* `akka.persistence.query.scaladsl.ReadJournal` and `akka.persistence.query.javaadsl.ReadJournal`.
* `akka.persistence.query.scaladsl.ReadJournal` and `akka.persistence.query.javadsl.ReadJournal`.
* The plugin must implement both the `scaladsl` and the `javadsl` traits because the
* `akka.stream.scaladsl.Source` and `akka.stream.javadsl.Source` are different types
* and even though those types can easily be converted to each other it is most convenient

View file

@ -21,8 +21,8 @@ class SourceWithContextSpec extends StreamSpec {
val msg = Message("a", 1L)
Source(Vector(msg))
.asSourceWithContext(_.offset)
.asSource
.runWith(TestSink.probe[(Message, Long)])
.toMat(TestSink.probe[(Message, Long)])(Keep.right)
.run
.request(1)
.expectNext((msg, 1L))
.expectComplete()
@ -48,8 +48,8 @@ class SourceWithContextSpec extends StreamSpec {
.map(_.data.toLowerCase)
.filter(_ != "b")
.filterNot(_ == "d")
.asSource
.runWith(TestSink.probe[(String, Long)])
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.request(2)
.expectNext(("a", 1L))
.expectNext(("c", 4L))
@ -64,8 +64,8 @@ class SourceWithContextSpec extends StreamSpec {
.asSourceWithContext(_.offset)
.map(_.data)
.via(flowWithContext.map(s s + "b"))
.asSource
.runWith(TestSink.probe[(String, Long)])
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.request(1)
.expectNext(("ab", 1L))
.expectComplete()
@ -78,8 +78,8 @@ class SourceWithContextSpec extends StreamSpec {
.mapConcat { str
List(1, 2, 3).map(i s"$str-$i")
}
.asSource
.runWith(TestSink.probe[(String, Long)])
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.request(3)
.expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L))
.expectComplete()
@ -93,8 +93,8 @@ class SourceWithContextSpec extends StreamSpec {
List(1, 2, 3, 4).map(i s"$str-$i")
}
.grouped(2)
.asSource
.runWith(TestSink.probe[(Seq[String], Seq[Long])])
.toMat(TestSink.probe[(Seq[String], Seq[Long])])(Keep.right)
.run
.request(2)
.expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L)))
.expectComplete()
@ -112,8 +112,8 @@ class SourceWithContextSpec extends StreamSpec {
.asSourceWithContext(_.offset)
.map(_.data)
.statefulMapConcat(statefulFunction)
.asSource
.runWith(TestSink.probe[(String, Long)])
.toMat(TestSink.probe[(String, Long)])(Keep.right)
.run
.request(3)
.expectNext(("a", 1L), ("z", 2L), ("z", 2L))
.expectComplete()

View file

@ -19,7 +19,7 @@ import scala.compat.java8.FutureConverters._
/**
* A source that provides operations which automatically propagate the context of an element.
* Only a subset of common operations from [[Source]] is supported. As an escape hatch you can
* use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported
* use [[SourceWithContext#via]] to manually provide the context propagation for otherwise unsupported
* operations.
*
* Can be created by calling [[Source.asSourceWithContext()]]
@ -221,6 +221,20 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon
def log(name: String): SourceWithContext[Out, Ctx, Mat] =
this.log(name, ConstantFun.javaIdentityFunction[Out], null)
/**
* Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]],
* concatenating the processing steps of both.
*/
def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2]): javadsl.RunnableGraph[Mat] =
RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) Pair(o, e) }.to(sink))
/**
* Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]],
* concatenating the processing steps of both.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): javadsl.RunnableGraph[Mat3] =
RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) Pair(o, e) }.toMat(sink)(combinerToScala(combine)))
def asScala: scaladsl.SourceWithContext[Out, Ctx, Mat] = delegate
private[this] def viaScala[Out2, Ctx2, Mat2](f: scaladsl.SourceWithContext[Out, Ctx, Mat] scaladsl.SourceWithContext[Out2, Ctx2, Mat2]): SourceWithContext[Out2, Ctx2, Mat2] =

View file

@ -39,6 +39,20 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] (
override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] =
new SourceWithContext(delegate.withAttributes(attr))
/**
* Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both.
*/
def to[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2]): RunnableGraph[Mat] =
delegate.toMat(sink)(Keep.left)
/**
* Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]],
* concatenating the processing steps of both.
*/
def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) Mat3): RunnableGraph[Mat3] =
delegate.toMat(sink)(combine)
/**
* Stops automatic context propagation from here and converts this to a regular
* stream of a pair of (data, context).