Add docs and examples for RS fromPublisher #25468
This commit is contained in:
parent
45636e5af4
commit
dba69dba1d
8 changed files with 155 additions and 31 deletions
|
|
@ -1,22 +1,16 @@
|
||||||
# asSubscriber
|
# asSubscriber
|
||||||
|
|
||||||
Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.
|
Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).
|
||||||
|
|
||||||
@ref[Source operators](../index.md#source-operators)
|
@ref[Source operators](../index.md#source-operators)
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
@@@ div { .group-scala }
|
Scala
|
||||||
|
: @@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber }
|
||||||
|
|
||||||
@@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #asSubscriber }
|
Java
|
||||||
|
: @@snip[JavaFlowSupport.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/AsSubscriber.java) { #api }
|
||||||
@@@
|
|
||||||
|
|
||||||
@@@ div { .group-java }
|
|
||||||
|
|
||||||
@@snip[JavaFlowSupport.java](/akka-stream/src/main/java-jdk-9/akka/stream/javadsl/JavaFlowSupport.java) { #asSubscriber }
|
|
||||||
|
|
||||||
@@@
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
|
|
@ -28,10 +22,12 @@ This @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) can be attached
|
||||||
[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher)
|
[Reactive Streams](https://www.reactive-streams.org/) @javadoc[Publisher](java.util.concurrent.Flow.Publisher)
|
||||||
to populate it.
|
to populate it.
|
||||||
|
|
||||||
|
If the API you want to consume elements from provides a @javadoc[Publisher](java.util.concurrent.Flow.Publisher) instead of accepting a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber), see @ref[fromPublisher](fromPublisher.md).
|
||||||
|
|
||||||
@@@ note
|
@@@ note
|
||||||
|
|
||||||
For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9,
|
For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9,
|
||||||
if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with `Source.asSubscriber` and `Flow.asSubscriber`.
|
if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.asSubscriber](Source$) { scala="#asSubscriber[T]:akka.stream.scaladsl.Source[T,org.reactivestreams.Subscriber[T]]" java="#asSubscriber()" }.
|
||||||
|
|
||||||
@@@
|
@@@
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -1,17 +1,47 @@
|
||||||
# fromPublisher
|
# fromPublisher
|
||||||
|
|
||||||
Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.
|
Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).
|
||||||
|
|
||||||
@ref[Source operators](../index.md#source-operators)
|
@ref[Source operators](../index.md#source-operators)
|
||||||
|
|
||||||
@@@div { .group-scala }
|
|
||||||
|
|
||||||
## Signature
|
## Signature
|
||||||
|
|
||||||
@@signature [Source.scala](/akka-stream/src/main/scala/akka/stream/scaladsl/Source.scala) { #fromPublisher }
|
Scala
|
||||||
|
: @@snip[JavaFlowSupport.scala](/akka-stream/src/main/scala-jdk-9/akka/stream/scaladsl/JavaFlowSupport.scala) { #fromPublisher }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip[JavaFlowSupport.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java) { #api }
|
||||||
|
|
||||||
@@@
|
|
||||||
|
|
||||||
## Description
|
## Description
|
||||||
|
|
||||||
TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646
|
If you want to create a @apidoc[Source] that gets its elements from another library that supports
|
||||||
|
[Reactive Streams](https://www.reactive-streams.org/), you can use `JavaFlowSupport.Source.fromPublisher`.
|
||||||
|
This source will produce the elements from the @javadoc[Publisher](java.util.concurrent.Flow.Publisher),
|
||||||
|
and coordinate backpressure as needed.
|
||||||
|
|
||||||
|
If the API you want to consume elements from accepts a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber) instead of providing a @javadoc[Publisher](java.util.concurrent.Flow.Publisher), see @ref[asSubscriber](asSubscriber.md).
|
||||||
|
|
||||||
|
@@@ note
|
||||||
|
|
||||||
|
For JDK 8 users: since @javadoc[java.util.concurrent.Flow](java.util.concurrent.Flow) was introduced in JDK version 9,
|
||||||
|
if you are still on version 8 you may use the [org.reactivestreams](https://github.com/reactive-streams/reactive-streams-jvm#reactive-streams) library with @apidoc[Source.fromPublisher](Source$) { scala="#fromPublisher[T](publisher:org.reactivestreams.Publisher[T]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#fromPublisher(org.reactivestreams.Publisher)" }.
|
||||||
|
|
||||||
|
@@@
|
||||||
|
|
||||||
|
## Example
|
||||||
|
|
||||||
|
Suppose we use a database client that supports [Reactive Streams](https://www.reactive-streams.org/),
|
||||||
|
we could create a @apidoc[Source] that queries the database for its rows. That @apidoc[Source] can then
|
||||||
|
be used for further processing, for example creating a @apidoc[Source] that contains the names of the
|
||||||
|
rows.
|
||||||
|
|
||||||
|
Because both the database driver and Akka Streams support [Reactive Streams](https://www.reactive-streams.org/),
|
||||||
|
backpressure is applied throughout the stream, preventing us from running out of memory when the database
|
||||||
|
rows are consumed slower than they are produced by the database.
|
||||||
|
|
||||||
|
Scala
|
||||||
|
: @@snip [FromPublisher.scala](/akka-docs/src/test/scala-jdk9-only/docs/stream/operators/source/FromPublisher.scala) { #imports #example }
|
||||||
|
|
||||||
|
Java
|
||||||
|
: @@snip [FromPublisher.java](/akka-docs/src/test/java-jdk9-only/jdocs/stream/operators/source/FromPublisher.java) { #imports #example }
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|
||||||
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|
|Source|<a name="actorref"></a>@ref[actorRef](Source/actorRef.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream.|
|
||||||
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
|Source|<a name="actorrefwithbackpressure"></a>@ref[actorRefWithBackpressure](Source/actorRefWithBackpressure.md)|Materialize an `ActorRef`; sending messages to it will emit them on the stream. The source acknowledges reception after emitting a message, to provide back pressure from the source.|
|
||||||
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|
|Source|<a name="assourcewithcontext"></a>@ref[asSourceWithContext](Source/asSourceWithContext.md)|Turns a Source into a SourceWithContext which can propagate a context per element along a stream.|
|
||||||
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a `org.reactivestreams.Subscriber`.|
|
|Source|<a name="assubscriber"></a>@ref[asSubscriber](Source/asSubscriber.md)|Integration with Reactive Streams, materializes into a @javadoc[Subscriber](java.util.concurrent.Flow.Subscriber).|
|
||||||
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|
|Source|<a name="combine"></a>@ref[combine](Source/combine.md)|Combine several sources, using a given strategy such as merge or concat, into one source.|
|
||||||
|Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.|
|
|Source|<a name="completionstage"></a>@ref[completionStage](Source/completionStage.md)|Send the single value of the `CompletionStage` when it completes and there is demand.|
|
||||||
|Source|<a name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.|
|
|Source|<a name="completionstagesource"></a>@ref[completionStageSource](Source/completionStageSource.md)|Streams the elements of an asynchronous source once its given *completion* operator completes.|
|
||||||
|
|
@ -18,16 +18,16 @@ These built-in sources are available from @scala[`akka.stream.scaladsl.Source`]
|
||||||
|Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.|
|
|Source|<a name="empty"></a>@ref[empty](Source/empty.md)|Complete right away without ever emitting any elements.|
|
||||||
|Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with a user specified exception.|
|
|Source|<a name="failed"></a>@ref[failed](Source/failed.md)|Fail directly with a user specified exception.|
|
||||||
|Source|<a name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].|
|
|Source|<a name="from"></a>@ref[@scala[apply]@java[from]](Source/from.md)|Stream the values of an @scala[`immutable.Seq`]@java[`Iterable`].|
|
||||||
|Source|<a name="fromcompletionstage"></a>@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref:[`Source.completionStage`](Source/completionStage.md).|
|
|Source|<a name="fromcompletionstage"></a>@ref[fromCompletionStage](Source/fromCompletionStage.md)|Deprecated by @ref[`Source.completionStage`](Source/completionStage.md).|
|
||||||
|Source|<a name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).|
|
|Source|<a name="fromfuture"></a>@ref[fromFuture](Source/fromFuture.md)|Deprecated by @ref[`Source.future`](Source/future.md).|
|
||||||
|Source|<a name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref:[`Source.futureSource`](Source/futureSource.md).|
|
|Source|<a name="fromfuturesource"></a>@ref[fromFutureSource](Source/fromFutureSource.md)|Deprecated by @ref[`Source.futureSource`](Source/futureSource.md).|
|
||||||
|Source|<a name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.|
|
|Source|<a name="fromiterator"></a>@ref[fromIterator](Source/fromIterator.md)|Stream the values from an `Iterator`, requesting the next value when there is demand.|
|
||||||
|Source|<a name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a `org.reactivestreams.Publisher`.|
|
|Source|<a name="frompublisher"></a>@ref[fromPublisher](Source/fromPublisher.md)|Integration with Reactive Streams, subscribes to a @javadoc[Publisher](java.util.concurrent.Flow.Publisher).|
|
||||||
|Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref:[`Source.completionStageSource`](Source/completionStageSource.md).|
|
|Source|<a name="fromsourcecompletionstage"></a>@ref[fromSourceCompletionStage](Source/fromSourceCompletionStage.md)|Deprecated by @ref[`Source.completionStageSource`](Source/completionStageSource.md).|
|
||||||
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
|
|Source|<a name="future"></a>@ref[future](Source/future.md)|Send the single value of the `Future` when it completes and there is demand.|
|
||||||
|Source|<a name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.|
|
|Source|<a name="futuresource"></a>@ref[futureSource](Source/futureSource.md)|Streams the elements of the given future source once it successfully completes.|
|
||||||
|Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by @ref:[`Source.lazySource`](Source/lazySource.md).|
|
|Source|<a name="lazily"></a>@ref[lazily](Source/lazily.md)|Deprecated by @ref[`Source.lazySource`](Source/lazySource.md).|
|
||||||
|Source|<a name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref:[`Source.lazyFutureSource`](Source/lazyFutureSource.md).|
|
|Source|<a name="lazilyasync"></a>@ref[lazilyAsync](Source/lazilyAsync.md)|Deprecated by @ref[`Source.lazyFutureSource`](Source/lazyFutureSource.md).|
|
||||||
|Source|<a name="lazycompletionstage"></a>@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.|
|
|Source|<a name="lazycompletionstage"></a>@ref[lazyCompletionStage](Source/lazyCompletionStage.md)|Defers creation of a future of a single element source until there is demand.|
|
||||||
|Source|<a name="lazycompletionstagesource"></a>@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers creation of a future source until there is demand.|
|
|Source|<a name="lazycompletionstagesource"></a>@ref[lazyCompletionStageSource](Source/lazyCompletionStageSource.md)|Defers creation of a future source until there is demand.|
|
||||||
|Source|<a name="lazyfuture"></a>@ref[lazyFuture](Source/lazyFuture.md)|Defers creation of a future of a single element source until there is demand.|
|
|Source|<a name="lazyfuture"></a>@ref[lazyFuture](Source/lazyFuture.md)|Defers creation of a future of a single element source until there is demand.|
|
||||||
|
|
@ -75,7 +75,7 @@ These built-in sinks are available from @scala[`akka.stream.scaladsl.Sink`] @jav
|
||||||
|Sink|<a name="lastoption"></a>@ref[lastOption](Sink/lastOption.md)|Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes.|
|
|Sink|<a name="lastoption"></a>@ref[lastOption](Sink/lastOption.md)|Materialize a @scala[`Future[Option[T]]`] @java[`CompletionStage<Optional<T>>`] which completes with the last value emitted wrapped in an @scala[`Some`] @java[`Optional`] when the stream completes.|
|
||||||
|Sink|<a name="lazycompletionstagesink"></a>@ref[lazyCompletionStageSink](Sink/lazyCompletionStageSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
|Sink|<a name="lazycompletionstagesink"></a>@ref[lazyCompletionStageSink](Sink/lazyCompletionStageSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
||||||
|Sink|<a name="lazyfuturesink"></a>@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
|Sink|<a name="lazyfuturesink"></a>@ref[lazyFutureSink](Sink/lazyFutureSink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
||||||
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref:[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|
|Sink|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Sink/lazyInitAsync.md)|Deprecated by @ref[`Sink.lazyFutureSink`](Sink/lazyFutureSink.md).|
|
||||||
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
|Sink|<a name="lazysink"></a>@ref[lazySink](Sink/lazySink.md)|Defers creation and materialization of a `Sink` until there is a first element.|
|
||||||
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|
|Sink|<a name="oncomplete"></a>@ref[onComplete](Sink/onComplete.md)|Invoke a callback when the stream has completed or failed.|
|
||||||
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|
|Sink|<a name="prematerialize"></a>@ref[preMaterialize](Sink/preMaterialize.md)|Materializes this Sink, immediately returning (1) its materialized value, and (2) a new Sink that can be consume elements 'into' the pre-materialized one.|
|
||||||
|
|
@ -160,7 +160,7 @@ depending on being backpressured by downstream or not.
|
||||||
|Flow|<a name="lazycompletionstageflow"></a>@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
|Flow|<a name="lazycompletionstageflow"></a>@ref[lazyCompletionStageFlow](Flow/lazyCompletionStageFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
||||||
|Flow|<a name="lazyflow"></a>@ref[lazyFlow](Flow/lazyFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
|Flow|<a name="lazyflow"></a>@ref[lazyFlow](Flow/lazyFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
||||||
|Flow|<a name="lazyfutureflow"></a>@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
|Flow|<a name="lazyfutureflow"></a>@ref[lazyFutureFlow](Flow/lazyFutureFlow.md)|Defers creation and materialization of a `Flow` until there is a first element.|
|
||||||
|Flow|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref:[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref:[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).|
|
|Flow|<a name="lazyinitasync"></a>@ref[lazyInitAsync](Flow/lazyInitAsync.md)|Deprecated by @ref[`Flow.lazyFutureFlow`](Flow/lazyFutureFlow.md) in combination with @ref[`prefixAndTail`](Flow/../Source-or-Flow/prefixAndTail.md).|
|
||||||
|Source/Flow|<a name="limit"></a>@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.|
|
|Source/Flow|<a name="limit"></a>@ref[limit](Source-or-Flow/limit.md)|Limit number of element from upstream to given `max` number.|
|
||||||
|Source/Flow|<a name="limitweighted"></a>@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements|
|
|Source/Flow|<a name="limitweighted"></a>@ref[limitWeighted](Source-or-Flow/limitWeighted.md)|Limit the total weight of incoming elements|
|
||||||
|Source/Flow|<a name="log"></a>@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.|
|
|Source/Flow|<a name="log"></a>@ref[log](Source-or-Flow/log.md)|Log elements flowing through the stream as well as completion and erroring.|
|
||||||
|
|
|
||||||
|
|
@ -17,6 +17,8 @@ import akka.stream.javadsl.JavaFlowSupport;
|
||||||
import org.apache.commons.lang.NotImplementedException;
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
|
|
||||||
public interface AsSubscriber {
|
public interface AsSubscriber {
|
||||||
|
// We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in the API,
|
||||||
|
// because we're not publishing those (jdk9+) classes in our API docs yet.
|
||||||
static class JavaFlowSupport {
|
static class JavaFlowSupport {
|
||||||
public static final class Source {
|
public static final class Source {
|
||||||
public
|
public
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,58 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package jdocs.stream.operators.source;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
import java.util.concurrent.Flow.Publisher;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.stream.javadsl.Source;
|
||||||
|
import akka.stream.javadsl.JavaFlowSupport;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
import org.apache.commons.lang.NotImplementedException;
|
||||||
|
|
||||||
|
public interface FromPublisher {
|
||||||
|
// We are 'faking' the JavaFlowSupport API here so we can include the signature as a snippet in the API,
|
||||||
|
// because we're not publishing those (jdk9+) classes in our API docs yet.
|
||||||
|
static class JavaFlowSupport {
|
||||||
|
public static final class Source {
|
||||||
|
public
|
||||||
|
// #api
|
||||||
|
static <T> akka.stream.javadsl.Source<T, NotUsed> fromPublisher(Publisher<T> publisher)
|
||||||
|
// #api
|
||||||
|
{
|
||||||
|
return akka.stream.javadsl.JavaFlowSupport.Source.<T>fromPublisher(publisher);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class Row {
|
||||||
|
public String getField(String fieldName) {
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
static class DatabaseClient {
|
||||||
|
Publisher<Row> fetchRows() {
|
||||||
|
throw new NotImplementedException();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
DatabaseClient databaseClient = null;
|
||||||
|
|
||||||
|
// #example
|
||||||
|
class Example {
|
||||||
|
public Source<String, NotUsed> names() {
|
||||||
|
// A new subscriber will subscribe to the supplied publisher for each
|
||||||
|
// materialization, so depending on whether the database client supports
|
||||||
|
// this the Source can be materialized more than once.
|
||||||
|
return JavaFlowSupport.Source.<Row>fromPublisher(databaseClient.fetchRows())
|
||||||
|
.map(row -> row.getField("name"));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// #example
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
/*
|
||||||
|
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package docs.stream.operators.source;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
import java.util.concurrent.Flow.Subscriber;
|
||||||
|
import java.util.concurrent.Flow.Publisher;
|
||||||
|
|
||||||
|
import akka.NotUsed;
|
||||||
|
import akka.stream.scaladsl.Source;
|
||||||
|
import akka.stream.scaladsl.JavaFlowSupport;
|
||||||
|
|
||||||
|
//#imports
|
||||||
|
|
||||||
|
object FromPublisher {
|
||||||
|
case class Row(name: String)
|
||||||
|
|
||||||
|
class DatabaseClient {
|
||||||
|
def fetchRows(): Publisher[Row] = ???
|
||||||
|
}
|
||||||
|
|
||||||
|
val databaseClient: DatabaseClient = ???
|
||||||
|
|
||||||
|
// #example
|
||||||
|
val names: Source[String, NotUsed] =
|
||||||
|
// A new subscriber will subscribe to the supplied publisher for each
|
||||||
|
// materialization, so depending on whether the database client supports
|
||||||
|
// this the Source can be materialized more than once.
|
||||||
|
JavaFlowSupport.Source.fromPublisher(databaseClient.fetchRows())
|
||||||
|
.map(row => row.name);
|
||||||
|
//#example
|
||||||
|
}
|
||||||
|
|
@ -37,8 +37,11 @@ object JavaFlowSupport {
|
||||||
* @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead
|
* @see See also [[Source.fromPublisher]] if wanting to integrate with [[org.reactivestreams.Publisher]] instead
|
||||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||||
*/
|
*/
|
||||||
final def fromPublisher[T](publisher: juc.Flow.Publisher[T]): Source[T, NotUsed] =
|
final
|
||||||
scaladsl.Source.fromPublisher(publisher.asRs)
|
//#fromPublisher
|
||||||
|
def fromPublisher[T](publisher: java.util.concurrent.Flow.Publisher[T]): Source[T, NotUsed] =
|
||||||
|
//#fromPublisher
|
||||||
|
scaladsl.Source.fromPublisher(publisher.asRs)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]]
|
* Creates a `Source` that is materialized as a [[java.util.concurrent.Flow.Subscriber]]
|
||||||
|
|
@ -46,8 +49,9 @@ object JavaFlowSupport {
|
||||||
* @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead
|
* @see See also [[Source.asSubscriber]] if wanting to integrate with [[org.reactivestreams.Subscriber]] instead
|
||||||
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
* (which carries the same semantics, however existed before RS's inclusion in Java 9).
|
||||||
*/
|
*/
|
||||||
|
final
|
||||||
//#asSubscriber
|
//#asSubscriber
|
||||||
final def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] =
|
def asSubscriber[T]: Source[T, java.util.concurrent.Flow.Subscriber[T]] =
|
||||||
//#asSubscriber
|
//#asSubscriber
|
||||||
scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava)
|
scaladsl.Source.asSubscriber[T].mapMaterializedValue(_.asJava)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -251,7 +251,7 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
|
||||||
// This forces the short description to be on a single line. We could make this smarter,
|
// This forces the short description to be on a single line. We could make this smarter,
|
||||||
// but 'forcing' the short description to be really short seems nice as well.
|
// but 'forcing' the short description to be really short seems nice as well.
|
||||||
val description = lines(2)
|
val description = lines(2)
|
||||||
.replaceAll("]\\(", "](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1"))
|
.replaceAll("ref:?\\[(.*?)\\]\\(", "ref[$1](" + file.getAbsolutePath.replaceFirst(".*/([^/]+/).*", "$1"))
|
||||||
require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
|
require(!description.isEmpty, s"description in $file must be non-empty, single-line description at the 3rd line")
|
||||||
val categoryLink = lines(4)
|
val categoryLink = lines(4)
|
||||||
require(
|
require(
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue