diff --git a/akka-docs/src/main/paradox/stream/actor-interop.md b/akka-docs/src/main/paradox/stream/actor-interop.md index d75c7d16d1..bb32de79e9 100644 --- a/akka-docs/src/main/paradox/stream/actor-interop.md +++ b/akka-docs/src/main/paradox/stream/actor-interop.md @@ -224,3 +224,20 @@ Sends the elements of the stream to the given @java[`ActorRef`]@scala[`ActorR @@@ note See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/ActorSink/actorRefWithBackpressure.md) @@@ + + +### Topic.source + +A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. + +@@@ note +See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/source.md) +@@@ + +### Topic.sink + +A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$]. + +@@@ note +See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/sink.md) +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/PubSub/sink.md b/akka-docs/src/main/paradox/stream/operators/PubSub/sink.md new file mode 100644 index 0000000000..e397906944 --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/PubSub/sink.md @@ -0,0 +1,37 @@ +# PubSub.sink + +A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$]. + +@ref[Actor interop operators](../index.md#actor-interop-operators) + +Note that there is no backpressure from the topic, so care must be taken to not publish messages at a higher rate than that can be handled +by subscribers. + +If the topic does not have any subscribers when a message is published, or the topic actor is stopped, the message is sent to dead letters. + +## Dependency + +This operator is included in: + +@@dependency[sbt,Maven,Gradle] { +bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion +symbol1=AkkaVersion +value1="$akka.version$" +group="com.typesafe.akka" +artifact="akka-stream-typed_$scala.binary.version$" +version=AkkaVersion +} + +## Signature + +@apidoc[PubSub.sink](akka.stream.typed.*.PubSub$) { scala="#sink[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#sink(akka.actor.typed.Topic)" } + +## Reactive Streams semantics + +@@@div { .callout } + +**cancels** never + +**backpressures** never + +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/PubSub/source.md b/akka-docs/src/main/paradox/stream/operators/PubSub/source.md new file mode 100644 index 0000000000..ef3535a7bd --- /dev/null +++ b/akka-docs/src/main/paradox/stream/operators/PubSub/source.md @@ -0,0 +1,40 @@ +# PubSub.source + +A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. + +@ref[Actor interop operators](../index.md#actor-interop-operators) + +The source can be materialized multiple times, each materialized stream will stream messages published to the topic after the stream has started. + +Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic, +if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit +the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure` +strategy. + + +## Dependency + +This operator is included in: + +@@dependency[sbt,Maven,Gradle] { +bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion +symbol1=AkkaVersion +value1="$akka.version$" +group="com.typesafe.akka" +artifact="akka-stream-typed_$scala.binary.version$" +version=AkkaVersion +} + +## Signature + +@apidoc[PubSub.source](akka.stream.typed.*.PubSub$) { scala="#source[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#source(akka.actor.typed.Topic)" } + +## Reactive Streams semantics + +@@@div { .callout } + +**emits** a message published to the topic is emitted as soon as there is demand from downstream + +**completes** when the topic actor terminates + +@@@ \ No newline at end of file diff --git a/akka-docs/src/main/paradox/stream/operators/index.md b/akka-docs/src/main/paradox/stream/operators/index.md index ffc086b25e..c6f9a37d7f 100644 --- a/akka-docs/src/main/paradox/stream/operators/index.md +++ b/akka-docs/src/main/paradox/stream/operators/index.md @@ -326,6 +326,8 @@ Operators meant for inter-operating between Akka Streams and Actors: |Source/Flow|@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).| |ActorFlow|@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.| |ActorFlow|@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply`] where the T will be unwrapped and emitted downstream.| +|PubSub|@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].| +|PubSub|@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. | |Source/Flow|@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.| ## Compression operators @@ -525,7 +527,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md) * [setup](Source-or-Flow/setup.md) * [setup](Sink/setup.md) * [single](Source/single.md) +* [sink](PubSub/sink.md) * [sliding](Source-or-Flow/sliding.md) +* [source](PubSub/source.md) * [splitAfter](Source-or-Flow/splitAfter.md) * [splitWhen](Source-or-Flow/splitWhen.md) * [statefulMapConcat](Source-or-Flow/statefulMapConcat.md) diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala new file mode 100644 index 0000000000..f28f3f5e34 --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala @@ -0,0 +1,55 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.typed.javadsl + +import akka.NotUsed +import akka.actor.typed.ActorRef +import akka.actor.typed.pubsub.Topic +import akka.annotation.ApiMayChange +import akka.stream.OverflowStrategy +import akka.stream.javadsl.Sink +import akka.stream.javadsl.Source + +/** + * Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed + * publishing and subscribing of elements through a stream. + */ +object PubSub { + + /** + * Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized + * multiple times, each materialized stream will contain messages published after it was started. + * + * Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic, + * if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit + * the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure` + * strategy. + * + * @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic. + * @param bufferSize The maximum number of messages to buffer if the stream applies backpressure + * @param overflowStrategy Strategy to use once the buffer is full. + * @tparam T The type of the published messages + */ + @ApiMayChange + def source[T]( + topicActor: ActorRef[Topic.Command[T]], + bufferSize: Int, + overflowStrategy: OverflowStrategy): Source[T, NotUsed] = + akka.stream.typed.scaladsl.PubSub.source(topicActor, bufferSize, overflowStrategy).asJava + + /** + * Create a sink that will publish each message to the given topic. Note that there is no backpressure + * from the topic, so care must be taken to not publish messages at a higher rate than that can be handled + * by subscribers. If the topic does not have any subscribers when a message is published the message is + * sent to dead letters. + * + * @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic. + * @tparam T the type of the messages that can be published + */ + @ApiMayChange + def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] = + akka.stream.typed.scaladsl.PubSub.sink[T](topicActor).asJava + +} diff --git a/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala new file mode 100644 index 0000000000..dea920de2d --- /dev/null +++ b/akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2009-2021 Lightbend Inc. + */ + +package akka.stream.typed.scaladsl + +import akka.NotUsed +import akka.actor.typed.ActorRef +import akka.actor.typed.pubsub.Topic +import akka.annotation.ApiMayChange +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Sink +import akka.stream.scaladsl.Source + +/** + * Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed + * publishing and subscribing of elements through a stream. + */ +object PubSub { + + /** + * Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized + * multiple times, each materialized stream will contain messages published after it was started. + * + * Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic, + * if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit + * the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure` + * strategy. + * + * @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic. + * @param bufferSize The maximum number of messages to buffer if the stream applies backpressure + * @param overflowStrategy Strategy to use once the buffer is full. + * @tparam T The type of the published messages + */ + @ApiMayChange + def source[T]( + topicActor: ActorRef[Topic.Command[T]], + bufferSize: Int, + overflowStrategy: OverflowStrategy): Source[T, NotUsed] = + ActorSource + .actorRef[T](PartialFunction.empty, PartialFunction.empty, bufferSize, overflowStrategy) + .mapMaterializedValue { ref => + topicActor ! Topic.Subscribe(ref) + NotUsed + } + + /** + * Create a sink that will publish each message to the given topic. Note that there is no backpressure + * from the topic, so care must be taken to not publish messages at a higher rate than that can be handled + * by subscribers. If the topic does not have any subscribers when a message is published or the topic actor is stopped, + * the message is sent to dead letters. + * + * @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic. + * @tparam T the type of the messages that can be published + */ + @ApiMayChange + def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] = { + Sink + .foreach[T] { message => + topicActor ! Topic.Publish(message) + } + .mapMaterializedValue(_ => NotUsed) + } +} diff --git a/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/PubSubSpec.scala b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/PubSubSpec.scala new file mode 100644 index 0000000000..b1030e5482 --- /dev/null +++ b/akka-stream-typed/src/test/scala/akka/stream/typed/scaladsl/PubSubSpec.scala @@ -0,0 +1,63 @@ +/* + * Copyright (C) 2021 Lightbend Inc. + */ + +/* + * Copyright (C) 2009-2022 Lightbend Inc. + */ +package akka.stream.typed.scaladsl + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.internal.pubsub.TopicImpl +import akka.actor.typed.pubsub.Topic +import akka.stream.OverflowStrategy +import akka.stream.scaladsl.Source +import akka.stream.testkit.scaladsl.TestSink +import org.scalatest.wordspec.AnyWordSpecLike + +class PubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike { + + "PubSub.source" should { + + "emit messages from the topic" in { + val topic = testKit.spawn(Topic[String]("my-topic-1")) + + val source = PubSub.source(topic, 100, OverflowStrategy.fail) + val sourceProbe = source.runWith(TestSink()) + sourceProbe.ensureSubscription() + + // wait until subscription has been seen + val probe = testKit.createTestProbe[TopicImpl.TopicStats]() + probe.awaitAssert { + topic ! TopicImpl.GetTopicStats(probe.ref) + probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1) + } + + topic ! Topic.Publish("published") + sourceProbe.requestNext("published") + sourceProbe.cancel() + } + + } + + "PubSub.sink" should { + "publish messages" in { + val topic = testKit.spawn(Topic[String]("my-topic-2")) + + val subscriberProbe = testKit.createTestProbe[String]() + topic ! Topic.Subscribe(subscriberProbe.ref) + + // wait until subscription has been seen + val probe = testKit.createTestProbe[TopicImpl.TopicStats]() + probe.awaitAssert { + topic ! TopicImpl.GetTopicStats(probe.ref) + probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1) + } + + Source.single("published").runWith(PubSub.sink(topic)) + + subscriberProbe.expectMessage("published") + } + } + +} diff --git a/project/StreamOperatorsIndexGenerator.scala b/project/StreamOperatorsIndexGenerator.scala index 97f644645b..f606a8f5f1 100644 --- a/project/StreamOperatorsIndexGenerator.scala +++ b/project/StreamOperatorsIndexGenerator.scala @@ -157,7 +157,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin { "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala", "akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala", "akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala", - "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala").flatMap { f => + "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala", + "akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala", + "akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala").flatMap { f => val slashesNr = f.count(_ == '/') val element = f.split("/")(slashesNr).split("\\.")(0) IO.read(new File(f))