Convenience stream operators for publishing and subscribing to typed pub sub #31037

This commit is contained in:
Johan Andrén 2022-01-12 16:12:15 +01:00 committed by GitHub
parent ccd5219de8
commit b9a1a66058
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 283 additions and 1 deletions

View file

@ -224,3 +224,20 @@ Sends the elements of the stream to the given @java[`ActorRef<T>`]@scala[`ActorR
@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/ActorSink/actorRefWithBackpressure.md)
@@@
### Topic.source
A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic.
@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/source.md)
@@@
### Topic.sink
A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].
@@@ note
See also: @ref[ActorSink.actorRefWithBackpressure operator reference docs](operators/PubSub/sink.md)
@@@

View file

@ -0,0 +1,37 @@
# PubSub.sink
A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].
@ref[Actor interop operators](../index.md#actor-interop-operators)
Note that there is no backpressure from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
by subscribers.
If the topic does not have any subscribers when a message is published, or the topic actor is stopped, the message is sent to dead letters.
## Dependency
This operator is included in:
@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}
## Signature
@apidoc[PubSub.sink](akka.stream.typed.*.PubSub$) { scala="#sink[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Sink[T,akka.NotUsed]" java="#sink(akka.actor.typed.Topic)" }
## Reactive Streams semantics
@@@div { .callout }
**cancels** never
**backpressures** never
@@@

View file

@ -0,0 +1,40 @@
# PubSub.source
A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic.
@ref[Actor interop operators](../index.md#actor-interop-operators)
The source can be materialized multiple times, each materialized stream will stream messages published to the topic after the stream has started.
Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
strategy.
## Dependency
This operator is included in:
@@dependency[sbt,Maven,Gradle] {
bomGroup=com.typesafe.akka bomArtifact=akka-bom_$scala.binary.version$ bomVersionSymbols=AkkaVersion
symbol1=AkkaVersion
value1="$akka.version$"
group="com.typesafe.akka"
artifact="akka-stream-typed_$scala.binary.version$"
version=AkkaVersion
}
## Signature
@apidoc[PubSub.source](akka.stream.typed.*.PubSub$) { scala="#source[T](topic:akka.actor.typed.Toppic[T]):akka.stream.scaladsl.Source[T,akka.NotUsed]" java="#source(akka.actor.typed.Topic)" }
## Reactive Streams semantics
@@@div { .callout }
**emits** a message published to the topic is emitted as soon as there is demand from downstream
**completes** when the topic actor terminates
@@@

View file

@ -326,6 +326,8 @@ Operators meant for inter-operating between Akka Streams and Actors:
|Source/Flow|<a name="ask"></a>@ref[ask](Source-or-Flow/ask.md)|Use the "Ask Pattern" to send a request-reply message to the target `ref` actor (of the classic actors API).|
|ActorFlow|<a name="ask"></a>@ref[ask](ActorFlow/ask.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply that will be emitted downstream.|
|ActorFlow|<a name="askwithstatus"></a>@ref[askWithStatus](ActorFlow/askWithStatus.md)|Use the "Ask Pattern" to send each stream element as an `ask` to the target actor (of the new actors API), and expect a reply of Type @scala[`StatusReply[T]`]@java[`StatusReply<T>`] where the T will be unwrapped and emitted downstream.|
|PubSub|<a name="sink"></a>@ref[sink](PubSub/sink.md)|A sink that will publish emitted messages to a @apidoc[akka.actor.typed.pubsub.Topic$].|
|PubSub|<a name="source"></a>@ref[source](PubSub/source.md)|A source that will subscribe to a @apidoc[akka.actor.typed.pubsub.Topic$] and stream messages published to the topic. |
|Source/Flow|<a name="watch"></a>@ref[watch](Source-or-Flow/watch.md)|Watch a specific `ActorRef` and signal a failure downstream once the actor terminates.|
## Compression operators
@ -525,7 +527,9 @@ For more background see the @ref[Error Handling in Streams](../stream-error.md)
* [setup](Source-or-Flow/setup.md)
* [setup](Sink/setup.md)
* [single](Source/single.md)
* [sink](PubSub/sink.md)
* [sliding](Source-or-Flow/sliding.md)
* [source](PubSub/source.md)
* [splitAfter](Source-or-Flow/splitAfter.md)
* [splitWhen](Source-or-Flow/splitWhen.md)
* [statefulMapConcat](Source-or-Flow/statefulMapConcat.md)

View file

@ -0,0 +1,55 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.javadsl
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.javadsl.Sink
import akka.stream.javadsl.Source
/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {
/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](
topicActor: ActorRef[Topic.Command[T]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.source(topicActor, bufferSize, overflowStrategy).asJava
/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published the message is
* sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] =
akka.stream.typed.scaladsl.PubSub.sink[T](topicActor).asJava
}

View file

@ -0,0 +1,64 @@
/*
* Copyright (C) 2009-2021 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.scaladsl
import akka.NotUsed
import akka.actor.typed.ActorRef
import akka.actor.typed.pubsub.Topic
import akka.annotation.ApiMayChange
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
/**
* Sources and sinks to integrate [[akka.actor.typed.pubsub.Topic]] with streams allowing for local or distributed
* publishing and subscribing of elements through a stream.
*/
object PubSub {
/**
* Create a source that will subscribe to a topic and stream messages published to the topic. Can be materialized
* multiple times, each materialized stream will contain messages published after it was started.
*
* Note that it is not possible to propagate the backpressure from the running stream to the pub sub topic,
* if the stream is backpressuring published messages are buffered up to a limit and if the limit is hit
* the configurable `OverflowStrategy` decides what happens. It is not possible to use the `Backpressure`
* strategy.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @param bufferSize The maximum number of messages to buffer if the stream applies backpressure
* @param overflowStrategy Strategy to use once the buffer is full.
* @tparam T The type of the published messages
*/
@ApiMayChange
def source[T](
topicActor: ActorRef[Topic.Command[T]],
bufferSize: Int,
overflowStrategy: OverflowStrategy): Source[T, NotUsed] =
ActorSource
.actorRef[T](PartialFunction.empty, PartialFunction.empty, bufferSize, overflowStrategy)
.mapMaterializedValue { ref =>
topicActor ! Topic.Subscribe(ref)
NotUsed
}
/**
* Create a sink that will publish each message to the given topic. Note that there is no backpressure
* from the topic, so care must be taken to not publish messages at a higher rate than that can be handled
* by subscribers. If the topic does not have any subscribers when a message is published or the topic actor is stopped,
* the message is sent to dead letters.
*
* @param topicActor The actor ref for an `akka.actor.typed.pubsub.Topic` actor representing a specific topic.
* @tparam T the type of the messages that can be published
*/
@ApiMayChange
def sink[T](topicActor: ActorRef[Topic.Command[T]]): Sink[T, NotUsed] = {
Sink
.foreach[T] { message =>
topicActor ! Topic.Publish(message)
}
.mapMaterializedValue(_ => NotUsed)
}
}

View file

@ -0,0 +1,63 @@
/*
* Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
*/
/*
* Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.stream.typed.scaladsl
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.actor.typed.pubsub.Topic
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source
import akka.stream.testkit.scaladsl.TestSink
import org.scalatest.wordspec.AnyWordSpecLike
class PubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike {
"PubSub.source" should {
"emit messages from the topic" in {
val topic = testKit.spawn(Topic[String]("my-topic-1"))
val source = PubSub.source(topic, 100, OverflowStrategy.fail)
val sourceProbe = source.runWith(TestSink())
sourceProbe.ensureSubscription()
// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1)
}
topic ! Topic.Publish("published")
sourceProbe.requestNext("published")
sourceProbe.cancel()
}
}
"PubSub.sink" should {
"publish messages" in {
val topic = testKit.spawn(Topic[String]("my-topic-2"))
val subscriberProbe = testKit.createTestProbe[String]()
topic ! Topic.Subscribe(subscriberProbe.ref)
// wait until subscription has been seen
val probe = testKit.createTestProbe[TopicImpl.TopicStats]()
probe.awaitAssert {
topic ! TopicImpl.GetTopicStats(probe.ref)
probe.expectMessageType[TopicImpl.TopicStats].localSubscriberCount should ===(1)
}
Source.single("published").runWith(PubSub.sink(topic))
subscriberProbe.expectMessage("published")
}
}
}

View file

@ -157,7 +157,9 @@ object StreamOperatorsIndexGenerator extends AutoPlugin {
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorFlow.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/ActorSink.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala").flatMap { f =>
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/ActorSink.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/scaladsl/PubSub.scala",
"akka-stream-typed/src/main/scala/akka/stream/typed/javadsl/PubSub.scala").flatMap { f =>
val slashesNr = f.count(_ == '/')
val element = f.split("/")(slashesNr).split("\\.")(0)
IO.read(new File(f))