diff --git a/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md b/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md index e198d69426..bdacf6cc6b 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/asSubscriber.md @@ -1,22 +1,16 @@ # asSubscriber -Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`. +Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber). @ref[Source operators](../index.md#source-operators) ## Signature -@@@ div { .group-scala } +Scala +: @@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } -@@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber } - -@@@ - -@@@ div { .group-java } - -@@snip[JavaFlowSupport.java](/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java) { #asSubscriber } - -@@@ +Java +: @@snip[JavaFlowSupport.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java) { #api } ## Description @@ -28,10 +22,12 @@ This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached [Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher) to populate it. +If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md). + @@@ note For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9, -if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with `Source.asSubscriber` and `Flow.asSubscriber`. +if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:akka.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }. @@@ diff --git a/akka-docs/src/main/paradox/stream/operators/Source/fromPublisher.md b/akka-docs/src/main/paradox/stream/operators/Source/fromPublisher.md index 511834d145..f59510c55f 100644 --- a/akka-docs/src/main/paradox/stream/operators/Source/fromPublisher.md +++ b/akka-docs/src/main/paradox/stream/operators/Source/fromPublisher.md @@ -1,17 +1,47 @@ # fromPublisher -Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`. +Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher). @ref[Source operators](../index.md#source-operators) -@@@div { .group-scala } - ## Signature -@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #fromPublisher } +Scala +: @@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #fromPublisher } + +Java +: @@snip[JavaFlowSupport.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java) { #api } -@@@ ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +If you want to create a @apidoc[Source] that gets its elements from another library that supports +[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.fromPublisher`. +This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher), +and coordinate backpressure as needed. + +If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asSubscriber](asSubscriber.md). + +@@@ note + +For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9, +if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.fromPublisher](Source$) { scala="#fromPublisher[T](publisher:org.reactivestreams.Publisher[T]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromPublisher(org.reactivestreams.Publisher)" }. + +@@@ + +## Example + +Suppose we use a database client that supports [Reactive Streams](https://www.reactive-streams.org/), +we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then +be used for further processing, for example creating a @apidoc[Source] that contains the names of the +rows. + +Because both the database driver and Akka Streams support [Reactive Streams](https://www.reactive-streams.org/), +backpressure is applied throughout the stream, preventing us from running out of memory when the database +rows are consumed slower than they are produced by the database. + +Scala +: @@snip [FromPublisher.scala](/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala) { #imports #example } + +Java +: @@snip [FromPublisher.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java) { #imports #example } diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index e1455e551b..c1dacfa03c 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -10,7 +10,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.| |Source|@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.| |Source|@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.| -|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.| +|Source|@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).| |Source|@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.| |Source|@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.| |Source|@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.| @@ -18,16 +18,16 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`] |Source|@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.| |Source|@ref[failed](Source/failed.md)|Fail directly with a user specified exception.| |Source|@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].| -|Source|@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref:[`Source.completionStage`](Source/completionStage.md).| +|Source|@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref[`Source.completionStage`](Source/completionStage.md).| |Source|@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).| -|Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref:[`Source.futureSource`](Source/futureSource.md).| +|Source|@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).| |Source|@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.| -|Source|@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.| -|Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref:[`Source.completionStageSource`](Source/completionStageSource.md).| +|Source|@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).| +|Source|@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).| |Source|@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.| |Source|@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.| -|Source|@ref[lazily](Source/lazily.md)|Deprecated by @ref:[`Source.lazySource`](Source/lazySource.md).| -|Source|@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref:[`Source.lazyFutureSource`](Source/lazyFutureSource.md).| +|Source|@ref[lazily](Source/lazily.md)|Deprecated by @ref[`Source.lazySource`](Source/lazySource.md).| +|Source|@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).| |Source|@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.| |Source|@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers creation of a future source until there is demand.| |Source|@ref[lazyFuture](Source/lazyFuture.md)|Defers creation of a future of a single element source until there is demand.| @@ -75,7 +75,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav |Sink|@ref[lastOption](Sink/lastOption.md)|Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage>`] which completes with the last value emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes.| |Sink|@ref[lazyCompletionStageSink](Sink/lazyCompletionStageSink.md)|Defers creation and materialization of a `Sink` until there is a first element.| |Sink|@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.| -|Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref:[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).| +|Sink|@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).| |Sink|@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.| |Sink|@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.| |Sink|@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.| @@ -160,7 +160,7 @@ depending on being backpressured by downstream or not. |Flow|@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| |Flow|@ref[lazyFlow](Flow/lazyFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| |Flow|@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.| -|Flow|@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref:[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref:[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).| +|Flow|@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).| |Source/Flow|@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.| |Source/Flow|@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements| |Source/Flow|@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.| diff --git a/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java index 31fe8c12a2..529d2cb939 100644 --- a/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java +++ b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java @@ -17,6 +17,8 @@ import akka.stream.javadsl.JavaFlowSupport; import org.apache.commons.lang.NotImplementedException; public interface AsSubscriber { + // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in the API, + // because we're not publishing those (jdk9+) classes in our API docs yet. static class JavaFlowSupport { public static final class Source { public diff --git a/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java new file mode 100644 index 0000000000..318608e9bb --- /dev/null +++ b/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java @@ -0,0 +1,58 @@ +/* + * Copyright (C) 2019-2020 Lightbend Inc. + */ + +package jdocs.stream.operators.source; + +//#imports +import java.util.concurrent.Flow.Publisher; + +import akka.NotUsed; +import akka.stream.javadsl.Source; +import akka.stream.javadsl.JavaFlowSupport; + +//#imports + +import org.apache.commons.lang.NotImplementedException; + +public interface FromPublisher { + // We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in the API, + // because we're not publishing those (jdk9+) classes in our API docs yet. + static class JavaFlowSupport { + public static final class Source { + public + // #api + static akka.stream.javadsl.Source fromPublisher(Publisher publisher) + // #api + { + return akka.stream.javadsl.JavaFlowSupport.Source.fromPublisher(publisher); + } + } + } + + static class Row { + public String getField(String fieldName) { + throw new NotImplementedException(); + } + } + + static class DatabaseClient { + Publisher fetchRows() { + throw new NotImplementedException(); + } + } + + DatabaseClient databaseClient = null; + + // #example + class Example { + public Source names() { + // A new subscriber will subscribe to the supplied publisher for each + // materialization, so depending on whether the database client supports + // this the Source can be materialized more than once. + return JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) + .map(row -> row.getField("name")); + } + } + // #example +} diff --git a/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala b/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala new file mode 100644 index 0000000000..c4c4488d27 --- /dev/null +++ b/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2020 Lightbend Inc. + */ + +package docs.stream.operators.source; + +//#imports +import java.util.concurrent.Flow.Subscriber; +import java.util.concurrent.Flow.Publisher; + +import akka.NotUsed; +import akka.stream.scaladsl.Source; +import akka.stream.scaladsl.JavaFlowSupport; + +//#imports + +object FromPublisher { + case class Row(name: String) + + class DatabaseClient { + def fetchRows(): Publisher[Row] = ??? + } + + val databaseClient: DatabaseClient = ??? + + // #example + val names: Source[String, NotUsed] = + // A new subscriber will subscribe to the supplied publisher for each + // materialization, so depending on whether the database client supports + // this the Source can be materialized more than once. + JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows()) + .map(row => row.name); + //#example +} diff --git a/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala b/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala index 6951f1c0fa..01f0dad606 100644 --- a/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala +++ b/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala @@ -37,8 +37,11 @@ object JavaFlowSupport { * @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ - final def fromPublisher[T](publisher: juc.Flow.Publisher[T]): Source[T, NotUsed] = - scaladsl.Source.fromPublisher(publisher.asRs) + final + //#fromPublisher + def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] = + //#fromPublisher + scaladsl.Source.fromPublisher(publisher.asRs) /** * Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]] @@ -46,8 +49,9 @@ object JavaFlowSupport { * @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead * (which carries the same semantics, however existed before RS's inclusion in Java 9). */ + final //#asSubscriber - final def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = + def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] = //#asSubscriber scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava) } diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 243e471c40..760fc4ac6e 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -251,7 +251,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { // This forces the short description to be on a single line. We could make this smarter, // but 'forcing' the short description to be really short seems nice as well. val description = lines(2) - .replaceAll("]\\(", "](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1")) + .replaceAll("ref:?\\[(.*?)\\]\\(", "ref[$1](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1")) require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line") val categoryLink = lines(4) require(