diff --git a/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md b/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md index 4e6ac43f05..e198d69426 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md @@ -4,12 +4,51 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Subs @ref[Source operators](../index.md#source-operators) -@@@ div { .group-scala } ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #asSubscriber } +@@@ div { .group-scala } + +@@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } + +@@@ + +@@@ div { .group-java } + +@@snip[JavaFlowSupport.java](/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java) { #asSubscriber } + @@@ ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +If you want to create a @apidoc[Source] that gets its elements from another library that supports +[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.asSubscriber`. +Each time this @apidoc[Source] is materialized, it produces a materialized value of type +@javadoc[java.util.concurrent.Flow.Subscriber](java.util.concurrent.Flow.Subscriber). +This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached to a +[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher) +to populate it. + +@@@ note + +For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9, +if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with `Source.asSubscriber` and `Flow.asSubscriber`. + +@@@ + +## Example + +Suppose we use a database client that supports [Reactive Streams](https://www.reactive-streams.org/), +we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then +be used for further processing, for example creating a @apidoc[Source] that contains the names of the +rows. + +Note that since the database is queried for each materialization, the `rowSource` can be safely re-used. +Because both the database driver and Akka Streams support [Reactive Streams](https://www.reactive-streams.org/), +backpressure is applied throughout the stream, preventing us from running out of memory when the database +rows are consumed slower than they are produced by the database. + +Scala +: @@snip [AsSubscriber.scala](/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/AsSubscriber.scala) { #imports #example } + +Java +: @@snip [AsSubscriber.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java) { #imports #example } diff --git a/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java new file mode 100644 index 0000000000..f64796dd93 --- /dev/null +++ b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java @@ -0,0 +1,67 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +//#imports +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Publisher; + +import akka.NotUsed; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.JavaFlowSupport; + +//#imports + +import org.apache.commons.lang.NotImplementedException; + +public interface AsSubscriber { + static class JavaFlowSupport { + public static final class Source { + public + // #api + static akka.stream.javadsl.Source> asSubscriber() + // #api + { + return akka.stream.javadsl.JavaFlowSupport.Source.asSubscriber(); + } + } + } + + static class Row { + public String getField(String fieldName) { + throw new NotImplementedException(); + } + } + + static class DatabaseClient { + Publisher fetchRows() { + throw new NotImplementedException(); + } + } + + DatabaseClient databaseClient = null; + + // #example + class Example { + Source rowSource = + JavaFlowSupport.Source.asSubscriber() + .mapMaterializedValue( + subscriber -> { + // For each materialization, fetch the rows from the database: + Publisher rows = databaseClient.fetchRows(); + rows.subscribe(subscriber); + + return NotUsed.getInstance(); + }); + + public Source names() { + // rowSource can be re-used, since it will start a new + // query for each materialization, fully supporting backpressure + // for each materialized stream: + return rowSource.map(row -> row.getField("name")); + } + } + // #example +} diff --git a/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/AsSubscriber.scala b/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/AsSubscriber.scala new file mode 100644 index 0000000000..eba9be9c81 --- /dev/null +++ b/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/AsSubscriber.scala @@ -0,0 +1,43 @@ +/* + * Copyright (C) 2019 Lightbend Inc. + */ + +package docs.stream.operators.source; + +//#imports +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Publisher; + +import akka.NotUsed; +import akka.stream.scaladsl.Source; +import akka.stream.scaladsl.JavaFlowSupport; + +//#imports + +object AsSubscriber { + case class Row(name: String) + + class DatabaseClient { + def fetchRows(): Publisher[Row] = ??? + } + + val databaseClient: DatabaseClient = ???; + + // #example + val rowSource: Source[Row, NotUsed] = + JavaFlowSupport.Source.asSubscriber + .mapMaterializedValue( + (subscriber: Subscriber[Row]) => { + // For each materialization, fetch the rows from the database: + val rows: Publisher[Row] = databaseClient.fetchRows() + rows.subscribe(subscriber) + NotUsed + }); + + val names: Source[String, NotUsed] = + // rowSource can be re-used, since it will start a new + // query for each materialization, fully supporting backpressure + // for each materialized stream: + rowSource.map(row => row.name); + //#example +} diff --git a/akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala b/akka-stream-testkit/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala similarity index 100% rename from akka-stream-tests/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala rename to akka-stream-testkit/src/test/scala/akka/stream/impl/fusing/GraphInterpreterSpecKit.scala diff --git a/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java b/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java index 2d34e5d25a..18470105a6 100644 --- a/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java +++ b/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java @@ -52,7 +52,9 @@ public final class JavaFlowSupport { * See also {@code Source.asSubscriber} if wanting to integrate with {@link org.reactivestreams.Subscriber} instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + //#asSubscriber public static akka.stream.javadsl.Source> asSubscriber() { + //#asSubscriber return akka.stream.javadsl.Source.asSubscriber().mapMaterializedValue(JavaFlowAndRsConverters::asJava); } } diff --git a/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala b/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala index 6a139ee1c3..09976d1ccb 100644 --- a/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala +++ b/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala @@ -46,7 +46,9 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ - final def asSubscriber[T]: Source[T, juc.Flow.Subscriber[T]] = + //#asSubscriber + final def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = + //#asSubscriber scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava) } diff --git a/build.sbt b/build.sbt index b1c3ae3400..d7b7774e39 100644 --- a/build.sbt +++ b/build.sbt @@ -102,7 +102,8 @@ lazy val akkaScalaNightly = akkaModule("akka-scala-nightly") .disablePlugins(ValidatePullRequest, MimaPlugin, CopyrightHeaderInPr) lazy val benchJmh = akkaModule("akka-bench-jmh") - .dependsOn(Seq(actor, actorTyped, stream, streamTests, persistence, distributedData, jackson, testkit).map( + .enablePlugins(Jdk9) + .dependsOn(Seq(actor, actorTyped, stream, streamTestkit, persistence, distributedData, jackson, testkit).map( _ % "compile->compile;compile->test"): _*) .settings(Dependencies.benchJmh) .settings(javacOptions += "-parameters") // for Jackson @@ -170,6 +171,7 @@ lazy val distributedData = akkaModule("akka-distributed-data") .enablePlugins(MultiNodeScalaTest) lazy val docs = akkaModule("akka-docs") + .configs(akka.Jdk9.TestJdk9) .dependsOn( actor, cluster, @@ -180,6 +182,7 @@ lazy val docs = akkaModule("akka-docs") persistenceQuery, distributedData, stream, + stream % "TestJdk9->CompileJdk9", actorTyped, clusterTools % "compile->compile;test->test", clusterSharding % "compile->compile;test->test", @@ -201,7 +204,8 @@ lazy val docs = akkaModule("akka-docs") NoPublish, ParadoxBrowse, ScaladocNoVerificationOfDiagrams, - StreamOperatorsIndexGenerator) + StreamOperatorsIndexGenerator, + Jdk9) .disablePlugins(MimaPlugin, WhiteSourcePlugin) .disablePlugins(ScalafixPlugin) @@ -344,9 +348,14 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit") .disablePlugins(MimaPlugin) lazy val streamTests = akkaModule("akka-stream-tests") - .dependsOn(streamTestkit % "test->test", remote % "test->test", stream) + .configs(akka.Jdk9.TestJdk9) + .dependsOn( + streamTestkit % "test->test", + remote % "test->test", + stream % "TestJdk9->CompileJdk9" + ) .settings(Dependencies.streamTests) - .enablePlugins(NoPublish) + .enablePlugins(NoPublish, Jdk9) .disablePlugins(MimaPlugin, WhiteSourcePlugin) lazy val streamTestsTck = akkaModule("akka-stream-tests-tck") diff --git a/project/Jdk9.scala b/project/Jdk9.scala index 8ef5b49905..f1591cd37f 100644 --- a/project/Jdk9.scala +++ b/project/Jdk9.scala @@ -10,7 +10,9 @@ import sbt.Keys._ object Jdk9 extends AutoPlugin { import JdkOptions.notOnJdk8 - lazy val CompileJdk9 = config("CompileJdk9").extend(Compile) + val CompileJdk9 = config("CompileJdk9").extend(Compile) + + val TestJdk9 = config("TestJdk9").extend(Test) val SCALA_SOURCE_DIRECTORY = "scala-jdk-9" val SCALA_TEST_SOURCE_DIRECTORY = "scala-jdk9-only" @@ -20,12 +22,26 @@ object Jdk9 extends AutoPlugin { val compileJdk9Settings = Seq( // following the scala-2.12, scala-sbt-1.0, ... convention unmanagedSourceDirectories := notOnJdk8( - Seq( - (Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY, - (Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)), + Seq( + (Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY, + (Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)), + scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")), javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11"))) + val testJdk9Settings = Seq( + // following the scala-2.12, scala-sbt-1.0, ... convention + unmanagedSourceDirectories := notOnJdk8( + Seq( + (Test / sourceDirectory).value / SCALA_TEST_SOURCE_DIRECTORY, + (Test / sourceDirectory).value / JAVA_TEST_SOURCE_DIRECTORY)), + + scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")), + javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11")), + classpathConfiguration := TestJdk9, + externalDependencyClasspath := (externalDependencyClasspath in Test).value + ) + val compileSettings = Seq( // It might have been more 'neat' to add the jdk9 products to the jar via packageBin/mappings, but that doesn't work with the OSGi plugin, // so we add them to the fullClasspath instead. @@ -38,5 +54,7 @@ object Jdk9 extends AutoPlugin { override lazy val projectSettings = inConfig(CompileJdk9)(Defaults.compileSettings) ++ inConfig(CompileJdk9)(compileJdk9Settings) ++ - compileSettings + compileSettings ++ + inConfig(TestJdk9)(Defaults.testSettings) ++ + inConfig(TestJdk9)(testJdk9Settings) } diff --git a/project/Paradox.scala b/project/Paradox.scala index 6e76d5a79f..ae50040c1e 100644 --- a/project/Paradox.scala +++ b/project/Paradox.scala @@ -25,6 +25,8 @@ object Paradox { "extref.ecs.base_url" -> "https://example.lightbend.com/v1/download/%s", "scaladoc.akka.base_url" -> "https://doc.akka.io/api/akka/2.6", "scaladoc.akka.http.base_url" -> "https://doc.akka.io/api/akka-http/current", + "javadoc.java.base_url" -> "https://docs.oracle.com/en/java/javase/11/docs/api/java.base/", + "javadoc.java.link_style" -> "direct", "javadoc.akka.base_url" -> "https://doc.akka.io/japi/akka/2.6", "javadoc.akka.link_style" -> "direct", "javadoc.akka.http.base_url" -> "https://doc.akka.io/japi/akka-http/current",