diff --git a/akka-docs/src/main/paradox/persistence-query.md b/akka-docs/src/main/paradox/persistence-query.md index 1ee0573a8c..3ec590c236 100644 --- a/akka-docs/src/main/paradox/persistence-query.md +++ b/akka-docs/src/main/paradox/persistence-query.md @@ -286,7 +286,7 @@ their exposed semantics as well as handled query scenarios. A read journal plugin must implement `akka.persistence.query.ReadJournalProvider` which creates instances of `akka.persistence.query.scaladsl.ReadJournal` and -`akka.persistence.query.javaadsl.ReadJournal`. The plugin must implement both the `scaladsl` +`akka.persistence.query.javadsl.ReadJournal`. The plugin must implement both the `scaladsl` and the `javadsl` @scala[traits]@java[interfaces] because the `akka.stream.scaladsl.Source` and `akka.stream.javadsl.Source` are different types and even though those types can be converted to each other it is most convenient for the end user to get access to the Java or Scala `Source` directly. diff --git a/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala b/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala index 4941edc279..41fc3a035e 100644 --- a/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala +++ b/akka-persistence-query/src/main/scala/akka/persistence/query/ReadJournalProvider.scala @@ -9,7 +9,7 @@ package akka.persistence.query * It provides the concrete implementations for the Java and Scala APIs. * * A read journal plugin must provide implementations for both - * `akka.persistence.query.scaladsl.ReadJournal` and `akka.persistence.query.javaadsl.ReadJournal`. + * `akka.persistence.query.scaladsl.ReadJournal` and `akka.persistence.query.javadsl.ReadJournal`. * The plugin must implement both the `scaladsl` and the `javadsl` traits because the * `akka.stream.scaladsl.Source` and `akka.stream.javadsl.Source` are different types * and even though those types can easily be converted to each other it is most convenient diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala index fb845d7342..296546578d 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/SourceWithContextSpec.scala @@ -21,8 +21,8 @@ class SourceWithContextSpec extends StreamSpec { val msg = Message("a", 1L) Source(Vector(msg)) .asSourceWithContext(_.offset) - .asSource - .runWith(TestSink.probe[(Message, Long)]) + .toMat(TestSink.probe[(Message, Long)])(Keep.right) + .run .request(1) .expectNext((msg, 1L)) .expectComplete() @@ -48,8 +48,8 @@ class SourceWithContextSpec extends StreamSpec { .map(_.data.toLowerCase) .filter(_ != "b") .filterNot(_ == "d") - .asSource - .runWith(TestSink.probe[(String, Long)]) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run .request(2) .expectNext(("a", 1L)) .expectNext(("c", 4L)) @@ -64,8 +64,8 @@ class SourceWithContextSpec extends StreamSpec { .asSourceWithContext(_.offset) .map(_.data) .via(flowWithContext.map(s ⇒ s + "b")) - .asSource - .runWith(TestSink.probe[(String, Long)]) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run .request(1) .expectNext(("ab", 1L)) .expectComplete() @@ -78,8 +78,8 @@ class SourceWithContextSpec extends StreamSpec { .mapConcat { str ⇒ List(1, 2, 3).map(i ⇒ s"$str-$i") } - .asSource - .runWith(TestSink.probe[(String, Long)]) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run .request(3) .expectNext(("a-1", 1L), ("a-2", 1L), ("a-3", 1L)) .expectComplete() @@ -93,8 +93,8 @@ class SourceWithContextSpec extends StreamSpec { List(1, 2, 3, 4).map(i ⇒ s"$str-$i") } .grouped(2) - .asSource - .runWith(TestSink.probe[(Seq[String], Seq[Long])]) + .toMat(TestSink.probe[(Seq[String], Seq[Long])])(Keep.right) + .run .request(2) .expectNext((Seq("a-1", "a-2"), Seq(1L, 1L)), (Seq("a-3", "a-4"), Seq(1L, 1L))) .expectComplete() @@ -112,8 +112,8 @@ class SourceWithContextSpec extends StreamSpec { .asSourceWithContext(_.offset) .map(_.data) .statefulMapConcat(statefulFunction) - .asSource - .runWith(TestSink.probe[(String, Long)]) + .toMat(TestSink.probe[(String, Long)])(Keep.right) + .run .request(3) .expectNext(("a", 1L), ("z", 2L), ("z", 2L)) .expectComplete() diff --git a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala index 7e2ed55850..f76915672d 100644 --- a/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/javadsl/SourceWithContext.scala @@ -19,7 +19,7 @@ import scala.compat.java8.FutureConverters._ /** * A source that provides operations which automatically propagate the context of an element. * Only a subset of common operations from [[Source]] is supported. As an escape hatch you can - * use [[SourceWithContext.via]] to manually provide the context propagation for otherwise unsupported + * use [[SourceWithContext#via]] to manually provide the context propagation for otherwise unsupported * operations. * * Can be created by calling [[Source.asSourceWithContext()]] @@ -221,6 +221,20 @@ final class SourceWithContext[+Out, +Ctx, +Mat](delegate: scaladsl.SourceWithCon def log(name: String): SourceWithContext[Out, Ctx, Mat] = this.log(name, ConstantFun.javaIdentityFunction[Out], null) + /** + * Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]], + * concatenating the processing steps of both. + */ + def to[Mat2](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2]): javadsl.RunnableGraph[Mat] = + RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) ⇒ Pair(o, e) }.to(sink)) + + /** + * Connect this [[akka.stream.javadsl.SourceWithContext]] to a [[akka.stream.javadsl.Sink]], + * concatenating the processing steps of both. + */ + def toMat[Mat2, Mat3](sink: Graph[SinkShape[Pair[Out @uncheckedVariance, Ctx @uncheckedVariance]], Mat2], combine: function.Function2[Mat, Mat2, Mat3]): javadsl.RunnableGraph[Mat3] = + RunnableGraph.fromGraph(asScala.asSource.map { case (o, e) ⇒ Pair(o, e) }.toMat(sink)(combinerToScala(combine))) + def asScala: scaladsl.SourceWithContext[Out, Ctx, Mat] = delegate private[this] def viaScala[Out2, Ctx2, Mat2](f: scaladsl.SourceWithContext[Out, Ctx, Mat] ⇒ scaladsl.SourceWithContext[Out2, Ctx2, Mat2]): SourceWithContext[Out2, Ctx2, Mat2] = diff --git a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala index 3ae5d7aad3..6202997d1e 100644 --- a/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala +++ b/akka-stream/src/main/scala/akka/stream/scaladsl/SourceWithContext.scala @@ -39,6 +39,20 @@ final class SourceWithContext[+Out, +Ctx, +Mat] private[stream] ( override def withAttributes(attr: Attributes): SourceWithContext[Out, Ctx, Mat] = new SourceWithContext(delegate.withAttributes(attr)) + /** + * Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]], + * concatenating the processing steps of both. + */ + def to[Mat2](sink: Graph[SinkShape[(Out, Ctx)], Mat2]): RunnableGraph[Mat] = + delegate.toMat(sink)(Keep.left) + + /** + * Connect this [[akka.stream.scaladsl.SourceWithContext]] to a [[akka.stream.scaladsl.Sink]], + * concatenating the processing steps of both. + */ + def toMat[Mat2, Mat3](sink: Graph[SinkShape[(Out, Ctx)], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = + delegate.toMat(sink)(combine) + /** * Stops automatic context propagation from here and converts this to a regular * stream of a pair of (data, context).