From dd8b514e89b0ece91b981dddb05b8097d3c187c6 Mon Sep 17 00:00:00 2001 From: Muskan Gupta Date: Thu, 13 May 2021 00:46:41 +0530 Subject: [PATCH] Add Sink.asPublisher example and update doc (#30105) Co-authored-by: Renato Cavalcanti --- .../stream/operators/Sink/asPublisher.md | 29 ++++++++++++++++++- .../stream/operators/SinkDocExamples.java | 22 ++++++++++++++ .../stream/operators/sink/AsPublisher.scala | 27 +++++++++++++++++ 3 files changed, 77 insertions(+), 1 deletion(-) create mode 100644 akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala diff --git a/akka-docs/src/main/paradox/stream/operators/Sink/asPublisher.md b/akka-docs/src/main/paradox/stream/operators/Sink/asPublisher.md index 8217c2bdf5..4120006846 100644 --- a/akka-docs/src/main/paradox/stream/operators/Sink/asPublisher.md +++ b/akka-docs/src/main/paradox/stream/operators/Sink/asPublisher.md @@ -12,4 +12,31 @@ Integration with Reactive Streams, materializes into a `org.reactivestreams.Publ ## Description -TODO: We would welcome help on contributing descriptions and examples, see: https://github.com/akka/akka/issues/25646 +This method gives you the capability to publish the data from the `Sink` through a Reactive Streams [Publisher](http://www.reactive-streams.org/reactive-streams-1.0.3-javadoc/org/reactivestreams/Publisher.html). +Generally, in Akka Streams a `Sink` is considered a subscriber, which consumes the data from source. To integrate with other Reactive Stream implementations `Sink.asPublisher` provides a `Publisher` materialized value when run. +Now, the data from this publisher can be consumed by subscribing to it. We can control if we allow more than one downstream subscriber from the single running Akka stream through the `fanout` parameter. +In Java 9, the Reactive Stream API was included in the JDK, and `Publisher` is available through [Flow.Publisher](https://docs.oracle.com/javase/9/docs/api/java/util/concurrent/Flow.Publisher.html). +Since those APIs are identical but exist at different package namespaces and does not depend on the Reactive Streams package a separate publisher sink for those is available +through @scala[`akka.stream.scaladsl.JavaFlowSupport.Sink#asPublisher`]@java[`akka.stream.javadsl.JavaFlowSupport.Sink#asPublisher`]. + + +## Example + +In the example we are using a source and then creating a Publisher. After that, we see that when `fanout` is true multiple subscribers can subscribe to it, +but when it is false only the first subscriber will be able to subscribe and others will be rejected. + +Scala +: @@snip [AsPublisher.scala](/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala) { #asPublisher } + +Java +: @@snip [SinkDocExamples.java](/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java) { #asPublisher } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** the materialized publisher + +**completes** after the source is consumed and materialized publisher is created + +@@@ diff --git a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java index baddcdb7ec..02c5fd984d 100644 --- a/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java +++ b/akka-docs/src/test/java/jdocs/stream/operators/SinkDocExamples.java @@ -7,11 +7,13 @@ package jdocs.stream.operators; import akka.NotUsed; import akka.actor.ActorSystem; +import akka.stream.javadsl.AsPublisher; import akka.stream.javadsl.Flow; import akka.stream.javadsl.Sink; import akka.stream.javadsl.Source; // #takeLast-operator-example import akka.japi.Pair; +import org.reactivestreams.Publisher; // #takeLast-operator-example import java.util.*; import java.util.concurrent.CompletableFuture; @@ -139,6 +141,26 @@ public class SinkDocExamples { // #ignore } + static void asPublisherExample() { + // #asPublisher + Source source = Source.range(1, 5); + + Publisher publisherFalse = + source.runWith(Sink.asPublisher(AsPublisher.WITHOUT_FANOUT), system); + CompletionStage resultFromFirstSubscriberFalse = + Source.fromPublisher(publisherFalse) + .runWith(Sink.fold(0, (acc, element) -> acc + element), system); + CompletionStage resultFromSecondSubscriberFalse = + Source.fromPublisher(publisherFalse) + .runWith(Sink.fold(1, (acc, element) -> acc * element), system); + + resultFromFirstSubscriberFalse.thenAccept(System.out::println); // 15 + resultFromSecondSubscriberFalse.thenAccept( + System.out + ::println); // No output, because the source was not able to subscribe to the publisher. + // #asPublisher + } + private static Source readLinesFromFile() { return Source.empty(); } diff --git a/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala b/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala new file mode 100644 index 0000000000..0c3aef2c60 --- /dev/null +++ b/akka-docs/src/test/scala/docs/stream/operators/sink/AsPublisher.scala @@ -0,0 +1,27 @@ +/* + * Copyright (C) 2020-2021 Lightbend Inc. + */ + +package docs.stream.operators.sink + +import scala.concurrent.{ ExecutionContextExecutor, Future } +import akka.actor.ActorSystem +import akka.stream.scaladsl.{ Sink, Source } + +object AsPublisher { + implicit val system: ActorSystem = ??? + implicit val ec: ExecutionContextExecutor = system.dispatcher + def asPublisherExample() = { + def asPublisherExample() = { + //#asPublisher + val source = Source(1 to 5) + + val publisher = source.runWith(Sink.asPublisher(false)) + Source.fromPublisher(publisher).runWith(Sink.foreach(println)) // 1 2 3 4 5 + Source + .fromPublisher(publisher) + .runWith(Sink.foreach(println)) //No output, because the source was not able to subscribe to the publisher. + //#asPublisher + } + } +}