Add Sink.asPublisher example and update doc (#30105)

Co-authored-by: Renato Cavalcanti <renato@cavalcanti.be>
This commit is contained in:
Muskan Gupta 2021-05-13 00:46:41 +05:30 committed by GitHub
parent 240378f062
commit dd8b514e89
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 77 additions and 1 deletions

View file

@ -12,4 +12,31 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ
## Description
TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646
This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](http://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html).
Generally, in Akka Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run.
Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Akka stream through the `fanout` parameter.
In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html).
Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available
through @scala[`akka.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`akka.stream.javadsl.JavaFlowSupport.Sink#asPublisher`].
## Example
In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it,
but when it is false only the first subscriber will be able to subscribe and others will be rejected.
Scala
: @@snip [AsPublisher.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher }
Java
: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asPublisher }
## Reactive Streams semantics
@@@div { .callout }
**emits** the materialized publisher
**completes** after the source is consumed and materialized publisher is created
@@@

View file

@ -7,11 +7,13 @@ package jdocs.stream.operators;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.AsPublisher;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
// #takeLast-operator-example
import akka.japi.Pair;
import org.reactivestreams.Publisher;
// #takeLast-operator-example
import java.util.*;
import java.util.concurrent.CompletableFuture;
@ -139,6 +141,26 @@ public class SinkDocExamples {
// #ignore
}
static void asPublisherExample() {
// #asPublisher
Source<Integer, NotUsed> source = Source.range(1, 5);
Publisher<Integer> publisherFalse =
source.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system);
CompletionStage<Integer> resultFromFirstSubscriberFalse =
Source.fromPublisher(publisherFalse)
.runWith(Sink.fold(0, (acc, element) -> acc + element), system);
CompletionStage<Integer> resultFromSecondSubscriberFalse =
Source.fromPublisher(publisherFalse)
.runWith(Sink.fold(1, (acc, element) -> acc * element), system);
resultFromFirstSubscriberFalse.thenAccept(System.out::println); // 15
resultFromSecondSubscriberFalse.thenAccept(
System.out
::println); // No output, because the source was not able to subscribe to the publisher.
// #asPublisher
}
private static Source<String, NotUsed> readLinesFromFile() {
return Source.empty();
}

View file

@ -0,0 +1,27 @@
/*
* Copyright (C) 2020-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.stream.operators.sink
import scala.concurrent.{ ExecutionContextExecutor, Future }
import akka.actor.ActorSystem
import akka.stream.scaladsl.{ Sink, Source }
object AsPublisher {
implicit val system: ActorSystem = ???
implicit val ec: ExecutionContextExecutor = system.dispatcher
def asPublisherExample() = {
def asPublisherExample() = {
//#asPublisher
val source = Source(1 to 5)
val publisher = source.runWith(Sink.asPublisher(false))
Source.fromPublisher(publisher).runWith(Sink.foreach(println)) // 1 2 3 4 5
Source
.fromPublisher(publisher)
.runWith(Sink.foreach(println)) //No output, because the source was not able to subscribe to the publisher.
//#asPublisher
}
}
}