Distributed pubsub for typed (#28625)

* First stab at distributed pubsub for typed

* Also allow sending to a single subscriber across the topic

* Revert "Also allow sending to a single subscriber across the topic"

This reverts commit 4fd4f0b75c0dda01706dcde70645dcfa09da889b.

* Serializer and basic multi-jvm test

* docs

* Review feedback

* This reads better

* One brace too many

* sample formatting/headers/yadi

* Hide actual messages to ease bincomp evolution

* More tesssssts

* And even moar tessssssssts

* Review feedback addressed

* Same serialization as typed sharding
Mention turnaround in docs
This commit is contained in:
Johan Andrén 2020-03-10 15:01:19 +01:00 committed by GitHub
parent 27da0a23a9
commit 59ce257209
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 1312 additions and 96 deletions

View file

@ -0,0 +1,47 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package jdocs.akka.typed.pubsub;
import akka.actor.typed.ActorRef;
import akka.actor.typed.Behavior;
import akka.actor.typed.javadsl.Behaviors;
// #start-topic
import akka.actor.typed.pubsub.Topic;
// #start-topic
public class PubSubExample {
static class Message {
public final String text;
public Message(String text) {
this.text = text;
}
}
private Behavior<?> behavior =
// #start-topic
Behaviors.setup(
context -> {
ActorRef<Topic.Command<Message>> topic =
context.spawn(Topic.create(Message.class, "my-topic"), "MyTopic");
// #start-topic
ActorRef<Message> subscriberActor = null;
// #subscribe
topic.tell(Topic.subscribe(subscriberActor));
topic.tell(Topic.unsubscribe(subscriberActor));
// #subscribe
// #publish
topic.tell(Topic.publish(new Message("Hello Subscribers!")));
// #publish
return Behaviors.empty();
});
}

View file

@ -0,0 +1,130 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.pubsub
import akka.actor.testkit.typed.scaladsl.LoggingTestKit
import akka.actor.testkit.typed.scaladsl.LogCapturing
import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import org.scalatest.wordspec.AnyWordSpecLike
import scala.concurrent.duration._
class LocalPubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing {
"A pub-sub topic running locally" must {
"publish to all local subscriber actors of a topic" in {
val fruitTopic =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("fruit"))
}
try {
val probe1 = testKit.createTestProbe[String]()
val probe2 = testKit.createTestProbe[String]()
val probe3 = testKit.createTestProbe[String]()
LoggingTestKit.debug("Topic list updated").expect {
fruitTopic ! Topic.Subscribe(probe1.ref)
fruitTopic ! Topic.Subscribe(probe2.ref)
fruitTopic ! Topic.Subscribe(probe3.ref)
}
fruitTopic ! Topic.Publish("banana")
probe1.expectMessage("banana")
probe2.expectMessage("banana")
probe3.expectMessage("banana")
} finally {
testKit.stop(fruitTopic)
}
}
"publish to all subscriber actors across several instances of the same topic" in {
val fruitTopic1 =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("fruit"))
}
val fruitTopic2 =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("fruit"))
}
try {
val probe1 = testKit.createTestProbe[String]()
val probe2 = testKit.createTestProbe[String]()
val probe3 = testKit.createTestProbe[String]()
LoggingTestKit
.debug("Topic list updated")
// both topic instances should have seen the updated list
.withOccurrences(2)
.expect {
fruitTopic2 ! Topic.Subscribe(probe1.ref)
fruitTopic2 ! Topic.Subscribe(probe2.ref)
fruitTopic2 ! Topic.Subscribe(probe3.ref)
}
fruitTopic1 ! Topic.Publish("banana")
probe1.expectMessage("banana")
probe2.expectMessage("banana")
probe3.expectMessage("banana")
} finally {
testKit.stop(fruitTopic1)
testKit.stop(fruitTopic2)
}
}
"doesn't publish across topics unsubscribe" in {
val fruitTopic =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("fruit"))
}
val veggieTopic =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("veggies"))
}
try {
val probe1 = testKit.createTestProbe[String]()
LoggingTestKit.debug("Topic list updated").expect {
fruitTopic ! Topic.Subscribe(probe1.ref)
}
veggieTopic ! Topic.Publish("carrot")
probe1.expectNoMessage(200.millis)
} finally {
testKit.stop(fruitTopic)
}
}
"doesn't publish after unsubscribe" in {
val fruitTopic =
LoggingTestKit.debug("Topic list updated").expect {
testKit.spawn(Topic[String]("fruit"))
}
try {
val probe1 = testKit.createTestProbe[String]()
LoggingTestKit.debug("Topic list updated").expect {
fruitTopic ! Topic.Subscribe(probe1.ref)
}
LoggingTestKit.debug("Topic list updated").expect {
fruitTopic ! Topic.Unsubscribe(probe1.ref)
}
fruitTopic ! Topic.Publish("banana")
probe1.expectNoMessage(200.millis)
} finally {
testKit.stop(fruitTopic)
}
}
}
}

View file

@ -0,0 +1,38 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package docs.akka.typed.pubsub
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
object PubSubExample {
case class Message(text: String)
def example: Behavior[Any] = {
// #start-topic
import akka.actor.typed.pubsub.Topic
Behaviors.setup { context =>
val topic = context.spawn(Topic[Message]("my-topic"), "MyTopic")
// #start-topic
val subscriberActor: ActorRef[Message] = ???
// #subscribe
topic ! Topic.Subscribe(subscriberActor)
topic ! Topic.Unsubscribe(subscriberActor)
// #subscribe
// #publish
topic ! Topic.Publish(Message("Hello Subscribers!"))
// #publish
Behaviors.empty
}
}
}

View file

@ -0,0 +1,143 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.internal.pubsub
import akka.actor.Dropped
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.AbstractBehavior
import akka.actor.typed.scaladsl.ActorContext
import akka.actor.typed.scaladsl.LoggerOps
import akka.actor.typed.scaladsl.adapter._
import akka.annotation.InternalApi
import scala.reflect.ClassTag
/**
* INTERNAL API
*/
@InternalApi
private[akka] object TopicImpl {
trait Command[T]
// actual public messages but internal to ease bincomp evolution
final case class Publish[T](message: T) extends Topic.Command[T]
final case class Subscribe[T](subscriber: ActorRef[T]) extends Topic.Command[T]
final case class Unsubscribe[T](subscriber: ActorRef[T]) extends Topic.Command[T]
// internal messages, note that the protobuf serializer for those sent remotely is defined in akka-cluster-typed
final case class GetTopicStats[T](replyTo: ActorRef[TopicStats]) extends Topic.Command[T]
final case class TopicStats(localSubscriberCount: Int, topicInstanceCount: Int)
final case class TopicInstancesUpdated[T](topics: Set[ActorRef[TopicImpl.Command[T]]]) extends Command[T]
final case class MessagePublished[T](message: T) extends Command[T]
final case class SubscriberTerminated[T](subscriber: ActorRef[T]) extends Command[T]
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class TopicImpl[T](topicName: String, context: ActorContext[TopicImpl.Command[T]])(
implicit classTag: ClassTag[T])
extends AbstractBehavior[TopicImpl.Command[T]](context) {
/*
* The topic actor keeps a local set of subscribers, whenever that is non-empty it registers itself for
* a topic service key, and when it becomes empty again it deregisters from the service key. Published
* messages go to all currently known topics registered for the topic service key and the individual topic
* instances then broadcast the message to all local subscribers. This achieves deduplication for nodes
* with multiple subscribers and avoids sending to nodes without any subscribers.
*/
import TopicImpl._
private val topicServiceKey = ServiceKey[TopicImpl.Command[T]](topicName)
context.log.debugN(
"Starting up pub-sub topic [{}] for messages of type [{}]",
topicName,
classTag.runtimeClass.getName)
private var topicInstances = Set.empty[ActorRef[TopicImpl.Command[T]]]
private var localSubscribers = Set.empty[ActorRef[T]]
private val receptionist = context.system.receptionist
private val receptionistAdapter = context.messageAdapter[Receptionist.Listing] {
case topicServiceKey.Listing(topics) => TopicInstancesUpdated(topics)
}
receptionist ! Receptionist.Subscribe(topicServiceKey, receptionistAdapter)
override def onMessage(msg: TopicImpl.Command[T]): Behavior[TopicImpl.Command[T]] = msg match {
case Publish(message) =>
if (topicInstances.isEmpty) {
context.log.trace("Publishing message of type [{}] but no subscribers, dropping", msg.getClass)
context.system.deadLetters ! Dropped(message, "No topic subscribers known", context.self.toClassic)
} else {
context.log.trace("Publishing message of type [{}]", msg.getClass)
val pub = MessagePublished(message)
topicInstances.foreach(_ ! pub)
}
this
case MessagePublished(msg) =>
context.log.trace("Message of type [{}] published", msg.getClass)
localSubscribers.foreach(_ ! msg)
this
case Subscribe(subscriber) =>
if (!localSubscribers.contains(subscriber)) {
context.watchWith(subscriber, SubscriberTerminated(subscriber))
localSubscribers = localSubscribers + subscriber
if (localSubscribers.size == 1) {
context.log.debug(
"Local subscriber [{}] added, went from no subscribers to one, subscribing to receptionist",
subscriber)
// we went from no subscribers to one, register to the receptionist
receptionist ! Receptionist.Register(topicServiceKey, context.self)
} else {
context.log.debug("Local subscriber [{}] added", subscriber)
}
} else {
context.log.debug("Local subscriber [{}] already subscribed, ignoring Subscribe command")
}
this
case Unsubscribe(subscriber) =>
context.unwatch(subscriber)
localSubscribers = localSubscribers.filterNot(_ == subscriber)
if (localSubscribers.isEmpty) {
context.log.debug("Last local subscriber [{}] unsubscribed, deregistering from receptionist", subscriber)
// that was the lost subscriber, deregister from the receptionist
receptionist ! Receptionist.Deregister(topicServiceKey, context.self)
} else {
context.log.debug("Local subscriber [{}] unsubscribed", subscriber)
}
this
case SubscriberTerminated(subscriber) =>
localSubscribers -= subscriber
if (localSubscribers.isEmpty) {
context.log.debug("Last local subscriber [{}] terminated, deregistering from receptionist", subscriber)
// that was the last subscriber, deregister from the receptionist
receptionist ! Receptionist.Deregister(topicServiceKey, context.self)
} else {
context.log.debug("Local subscriber [{}] terminated, removing from subscriber list", subscriber)
}
this
case TopicInstancesUpdated(newTopics) =>
context.log.debug("Topic list updated [{}]", newTopics)
topicInstances = newTopics
this
case GetTopicStats(replyTo) =>
replyTo ! TopicStats(localSubscribers.size, topicInstances.size)
this
}
}

View file

@ -0,0 +1,86 @@
/*
* Copyright (C) 2009-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.actor.typed.pubsub
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.actor.typed.scaladsl.Behaviors
import akka.annotation.DoNotInherit
import scala.reflect.ClassTag
/**
* A pub sub topic is an actor that handles subscribing to a topic and publishing messages to all subscribed actors.
*
* It is mostly useful in a clustered setting, where it is intended to be started once on every node that want to
* house subscribers or publish messages to the topic, but it also works in a local setting without cluster.
*
* In a clustered context messages are deduplicated so that there is at most one message sent to each node for
* each publish and if there are no subscribers on a node, no message is sent to it. Note that the list of subscribers
* is eventually consistent and there are no delivery guarantees built in.
*
* Each topic results in a [[akka.actor.typed.receptionist.ServiceKey]] in the [[akka.actor.typed.receptionist.Receptionist]]
* so the same scaling recommendation holds for topics, see docs:
* https://doc.akka.io/docs/akka/current/typed/actor-discovery.html#receptionist-scalability
*/
object Topic {
/**
* Not for user extension
*/
@DoNotInherit
trait Command[T] extends TopicImpl.Command[T]
/**
* Scala API: Publish the message to all currently known subscribers.
*/
object Publish {
def apply[T](message: T): Command[T] =
TopicImpl.Publish(message)
}
/**
* Java API: Publish the message to all currently known subscribers.
*/
def publish[T](message: T): Command[T] = Publish(message)
/**
* Scala API: Subscribe to this topic. Should only be used for local subscribers.
*/
object Subscribe {
def apply[T](subscriber: ActorRef[T]): Command[T] = TopicImpl.Subscribe(subscriber)
}
/**
* Java API: Subscribe to this topic. Should only be used for local subscribers.
*/
def subscribe[T](subscriber: ActorRef[T]): Command[T] = Subscribe(subscriber)
/**
* Scala API: Unsubscribe a previously subscribed actor from this topic.
*/
object Unsubscribe {
def apply[T](subscriber: ActorRef[T]): Command[T] = TopicImpl.Unsubscribe(subscriber)
}
/**
* Java API: Unsubscribe a previously subscribed actor from this topic.
*/
def unsubscribe[T](subscriber: ActorRef[T]): Command[T] = Unsubscribe(subscriber)
/**
* Scala API: Create a topic actor behavior for the given topic name and message type.
*/
def apply[T](topicName: String)(implicit classTag: ClassTag[T]): Behavior[Command[T]] =
Behaviors.setup[TopicImpl.Command[T]](context => new TopicImpl[T](topicName, context)).narrow
/**
* Java API: Create a topic actor behavior for the given topic name and message class
*/
def create[T](messageClass: Class[T], topicName: String): Behavior[Command[T]] =
apply[T](topicName)(ClassTag(messageClass))
}

View file

@ -1,5 +1,5 @@
/*
* Copyright (C) 2019-2020 Lightbend Inc. <https://www.lightbend.com>
* Copyright (C) 2020 Lightbend Inc. <https://www.lightbend.com>
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
@ -731,11 +731,659 @@ public final class ClusterMessages {
}
public interface PubSubMessagePublishedOrBuilder extends
// @@protoc_insertion_point(interface_extends:akka.cluster.typed.PubSubMessagePublished)
akka.protobufv3.internal.MessageOrBuilder {
/**
* <code>required .Payload message = 1;</code>
* @return Whether the message field is set.
*/
boolean hasMessage();
/**
* <code>required .Payload message = 1;</code>
* @return The message.
*/
akka.remote.ContainerFormats.Payload getMessage();
/**
* <code>required .Payload message = 1;</code>
*/
akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder();
}
/**
* Protobuf type {@code akka.cluster.typed.PubSubMessagePublished}
*/
public static final class PubSubMessagePublished extends
akka.protobufv3.internal.GeneratedMessageV3 implements
// @@protoc_insertion_point(message_implements:akka.cluster.typed.PubSubMessagePublished)
PubSubMessagePublishedOrBuilder {
private static final long serialVersionUID = 0L;
// Use PubSubMessagePublished.newBuilder() to construct.
private PubSubMessagePublished(akka.protobufv3.internal.GeneratedMessageV3.Builder<?> builder) {
super(builder);
}
private PubSubMessagePublished() {
}
@java.lang.Override
@SuppressWarnings({"unused"})
protected java.lang.Object newInstance(
akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) {
return new PubSubMessagePublished();
}
@java.lang.Override
public final akka.protobufv3.internal.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private PubSubMessagePublished(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
this();
if (extensionRegistry == null) {
throw new java.lang.NullPointerException();
}
int mutable_bitField0_ = 0;
akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields =
akka.protobufv3.internal.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
case 10: {
akka.remote.ContainerFormats.Payload.Builder subBuilder = null;
if (((bitField0_ & 0x00000001) != 0)) {
subBuilder = message_.toBuilder();
}
message_ = input.readMessage(akka.remote.ContainerFormats.Payload.PARSER, extensionRegistry);
if (subBuilder != null) {
subBuilder.mergeFrom(message_);
message_ = subBuilder.buildPartial();
}
bitField0_ |= 0x00000001;
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
done = true;
}
break;
}
}
}
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobufv3.internal.InvalidProtocolBufferException(
e).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.class, akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.Builder.class);
}
private int bitField0_;
public static final int MESSAGE_FIELD_NUMBER = 1;
private akka.remote.ContainerFormats.Payload message_;
/**
* <code>required .Payload message = 1;</code>
* @return Whether the message field is set.
*/
public boolean hasMessage() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <code>required .Payload message = 1;</code>
* @return The message.
*/
public akka.remote.ContainerFormats.Payload getMessage() {
return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
}
/**
* <code>required .Payload message = 1;</code>
*/
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized == 1) return true;
if (isInitialized == 0) return false;
if (!hasMessage()) {
memoizedIsInitialized = 0;
return false;
}
if (!getMessage().isInitialized()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
@java.lang.Override
public void writeTo(akka.protobufv3.internal.CodedOutputStream output)
throws java.io.IOException {
if (((bitField0_ & 0x00000001) != 0)) {
output.writeMessage(1, getMessage());
}
unknownFields.writeTo(output);
}
@java.lang.Override
public int getSerializedSize() {
int size = memoizedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeMessageSize(1, getMessage());
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
}
@java.lang.Override
public boolean equals(final java.lang.Object obj) {
if (obj == this) {
return true;
}
if (!(obj instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished)) {
return super.equals(obj);
}
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished other = (akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) obj;
if (hasMessage() != other.hasMessage()) return false;
if (hasMessage()) {
if (!getMessage()
.equals(other.getMessage())) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@java.lang.Override
public int hashCode() {
if (memoizedHashCode != 0) {
return memoizedHashCode;
}
int hash = 41;
hash = (19 * hash) + getDescriptor().hashCode();
if (hasMessage()) {
hash = (37 * hash) + MESSAGE_FIELD_NUMBER;
hash = (53 * hash) + getMessage().hashCode();
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
java.nio.ByteBuffer data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
java.nio.ByteBuffer data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
akka.protobufv3.internal.ByteString data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
akka.protobufv3.internal.ByteString data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(byte[] data)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
byte[] data,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseDelimitedFrom(
java.io.InputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseDelimitedWithIOException(PARSER, input, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
akka.protobufv3.internal.CodedInputStream input)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return akka.protobufv3.internal.GeneratedMessageV3
.parseWithIOException(PARSER, input, extensionRegistry);
}
@java.lang.Override
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder() {
return DEFAULT_INSTANCE.toBuilder();
}
public static Builder newBuilder(akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished prototype) {
return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype);
}
@java.lang.Override
public Builder toBuilder() {
return this == DEFAULT_INSTANCE
? new Builder() : new Builder().mergeFrom(this);
}
@java.lang.Override
protected Builder newBuilderForType(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code akka.cluster.typed.PubSubMessagePublished}
*/
public static final class Builder extends
akka.protobufv3.internal.GeneratedMessageV3.Builder<Builder> implements
// @@protoc_insertion_point(builder_implements:akka.cluster.typed.PubSubMessagePublished)
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublishedOrBuilder {
public static final akka.protobufv3.internal.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor;
}
@java.lang.Override
protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.class, akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.Builder.class);
}
// Construct using akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobufv3.internal.GeneratedMessageV3
.alwaysUseFieldBuilders) {
getMessageFieldBuilder();
}
}
@java.lang.Override
public Builder clear() {
super.clear();
if (messageBuilder_ == null) {
message_ = null;
} else {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
@java.lang.Override
public akka.protobufv3.internal.Descriptors.Descriptor
getDescriptorForType() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor;
}
@java.lang.Override
public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstanceForType() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.getDefaultInstance();
}
@java.lang.Override
public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished build() {
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
@java.lang.Override
public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished buildPartial() {
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished result = new akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) != 0)) {
if (messageBuilder_ == null) {
result.message_ = message_;
} else {
result.message_ = messageBuilder_.build();
}
to_bitField0_ |= 0x00000001;
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
@java.lang.Override
public Builder clone() {
return super.clone();
}
@java.lang.Override
public Builder setField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.setField(field, value);
}
@java.lang.Override
public Builder clearField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field) {
return super.clearField(field);
}
@java.lang.Override
public Builder clearOneof(
akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) {
return super.clearOneof(oneof);
}
@java.lang.Override
public Builder setRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
int index, java.lang.Object value) {
return super.setRepeatedField(field, index, value);
}
@java.lang.Override
public Builder addRepeatedField(
akka.protobufv3.internal.Descriptors.FieldDescriptor field,
java.lang.Object value) {
return super.addRepeatedField(field, value);
}
@java.lang.Override
public Builder mergeFrom(akka.protobufv3.internal.Message other) {
if (other instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) {
return mergeFrom((akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished other) {
if (other == akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.getDefaultInstance()) return this;
if (other.hasMessage()) {
mergeMessage(other.getMessage());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
}
@java.lang.Override
public final boolean isInitialized() {
if (!hasMessage()) {
return false;
}
if (!getMessage().isInitialized()) {
return false;
}
return true;
}
@java.lang.Override
public Builder mergeFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobufv3.internal.InvalidProtocolBufferException e) {
parsedMessage = (akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) e.getUnfinishedMessage();
throw e.unwrapIOException();
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
private akka.remote.ContainerFormats.Payload message_;
private akka.protobufv3.internal.SingleFieldBuilderV3<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_;
/**
* <code>required .Payload message = 1;</code>
* @return Whether the message field is set.
*/
public boolean hasMessage() {
return ((bitField0_ & 0x00000001) != 0);
}
/**
* <code>required .Payload message = 1;</code>
* @return The message.
*/
public akka.remote.ContainerFormats.Payload getMessage() {
if (messageBuilder_ == null) {
return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
} else {
return messageBuilder_.getMessage();
}
}
/**
* <code>required .Payload message = 1;</code>
*/
public Builder setMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) {
if (value == null) {
throw new NullPointerException();
}
message_ = value;
onChanged();
} else {
messageBuilder_.setMessage(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .Payload message = 1;</code>
*/
public Builder setMessage(
akka.remote.ContainerFormats.Payload.Builder builderForValue) {
if (messageBuilder_ == null) {
message_ = builderForValue.build();
onChanged();
} else {
messageBuilder_.setMessage(builderForValue.build());
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .Payload message = 1;</code>
*/
public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) {
if (messageBuilder_ == null) {
if (((bitField0_ & 0x00000001) != 0) &&
message_ != null &&
message_ != akka.remote.ContainerFormats.Payload.getDefaultInstance()) {
message_ =
akka.remote.ContainerFormats.Payload.newBuilder(message_).mergeFrom(value).buildPartial();
} else {
message_ = value;
}
onChanged();
} else {
messageBuilder_.mergeFrom(value);
}
bitField0_ |= 0x00000001;
return this;
}
/**
* <code>required .Payload message = 1;</code>
*/
public Builder clearMessage() {
if (messageBuilder_ == null) {
message_ = null;
onChanged();
} else {
messageBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000001);
return this;
}
/**
* <code>required .Payload message = 1;</code>
*/
public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() {
bitField0_ |= 0x00000001;
onChanged();
return getMessageFieldBuilder().getBuilder();
}
/**
* <code>required .Payload message = 1;</code>
*/
public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() {
if (messageBuilder_ != null) {
return messageBuilder_.getMessageOrBuilder();
} else {
return message_ == null ?
akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_;
}
}
/**
* <code>required .Payload message = 1;</code>
*/
private akka.protobufv3.internal.SingleFieldBuilderV3<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>
getMessageFieldBuilder() {
if (messageBuilder_ == null) {
messageBuilder_ = new akka.protobufv3.internal.SingleFieldBuilderV3<
akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>(
getMessage(),
getParentForChildren(),
isClean());
message_ = null;
}
return messageBuilder_;
}
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.setUnknownFields(unknownFields);
}
@java.lang.Override
public final Builder mergeUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
return super.mergeUnknownFields(unknownFields);
}
// @@protoc_insertion_point(builder_scope:akka.cluster.typed.PubSubMessagePublished)
}
// @@protoc_insertion_point(class_scope:akka.cluster.typed.PubSubMessagePublished)
private static final akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished DEFAULT_INSTANCE;
static {
DEFAULT_INSTANCE = new akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished();
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstance() {
return DEFAULT_INSTANCE;
}
@java.lang.Deprecated public static final akka.protobufv3.internal.Parser<PubSubMessagePublished>
PARSER = new akka.protobufv3.internal.AbstractParser<PubSubMessagePublished>() {
@java.lang.Override
public PubSubMessagePublished parsePartialFrom(
akka.protobufv3.internal.CodedInputStream input,
akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry)
throws akka.protobufv3.internal.InvalidProtocolBufferException {
return new PubSubMessagePublished(input, extensionRegistry);
}
};
public static akka.protobufv3.internal.Parser<PubSubMessagePublished> parser() {
return PARSER;
}
@java.lang.Override
public akka.protobufv3.internal.Parser<PubSubMessagePublished> getParserForType() {
return PARSER;
}
@java.lang.Override
public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstanceForType() {
return DEFAULT_INSTANCE;
}
}
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor;
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable;
private static final akka.protobufv3.internal.Descriptors.Descriptor
internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor;
private static final
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable
internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable;
public static akka.protobufv3.internal.Descriptors.FileDescriptor
getDescriptor() {
@ -746,13 +1394,16 @@ public final class ClusterMessages {
static {
java.lang.String[] descriptorData = {
"\n\025ClusterMessages.proto\022\022akka.cluster.ty" +
"ped\"8\n\021ReceptionistEntry\022\020\n\010actorRef\030\001 \002" +
"(\t\022\021\n\tsystemUid\030\002 \002(\004B(\n$akka.cluster.ty" +
"ped.internal.protobufH\001"
"ped\032\026ContainerFormats.proto\"8\n\021Reception" +
"istEntry\022\020\n\010actorRef\030\001 \002(\t\022\021\n\tsystemUid\030" +
"\002 \002(\004\"3\n\026PubSubMessagePublished\022\031\n\007messa" +
"ge\030\001 \002(\0132\010.PayloadB(\n$akka.cluster.typed" +
".internal.protobufH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new akka.protobufv3.internal.Descriptors.FileDescriptor[] {
akka.remote.ContainerFormats.getDescriptor(),
});
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor =
getDescriptor().getMessageTypes().get(0);
@ -760,6 +1411,13 @@ public final class ClusterMessages {
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor,
new java.lang.String[] { "ActorRef", "SystemUid", });
internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor,
new java.lang.String[] { "Message", });
akka.remote.ContainerFormats.getDescriptor();
}
// @@protoc_insertion_point(outer_class_scope)

View file

@ -8,9 +8,14 @@ package akka.cluster.typed;
option java_package = "akka.cluster.typed.internal.protobuf";
option optimize_for = SPEED;
import "ContainerFormats.proto";
message ReceptionistEntry {
required string actorRef = 1;
required uint64 systemUid = 2;
}
message PubSubMessagePublished {
required Payload message = 1;
}

View file

@ -49,6 +49,7 @@ akka {
}
serialization-bindings {
"akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster
"akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster
}
}
cluster.configuration-compatibility-check.checkers {

View file

@ -8,11 +8,13 @@ import java.io.NotSerializableException
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorRefResolver
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.typed.internal.protobuf.ClusterMessages
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.Entry
import akka.remote.serialization.WrappedPayloadSupport
/**
* INTERNAL API
@ -24,27 +26,41 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
// Serializers are initialized early on. `toTyped` might then try to initialize the classic ActorSystemAdapter extension.
private lazy val resolver = ActorRefResolver(system.toTyped)
private val payloadSupport = new WrappedPayloadSupport(system)
private val ReceptionistEntryManifest = "a"
private val PubSubPublishManifest = "b"
override def manifest(o: AnyRef): String = o match {
case _: Entry => ReceptionistEntryManifest
case _: Entry => ReceptionistEntryManifest
case _: TopicImpl.MessagePublished[_] => PubSubPublishManifest
case _ =>
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case e: Entry => receptionistEntryToBinary(e)
case e: Entry => receptionistEntryToBinary(e)
case m: TopicImpl.MessagePublished[_] => pubSubPublishToBinary(m)
case _ =>
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case ReceptionistEntryManifest => receptionistEntryFromBinary(bytes)
case PubSubPublishManifest => pubSubMessageFromBinary(bytes)
case _ =>
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
private def pubSubPublishToBinary(m: TopicImpl.MessagePublished[_]): Array[Byte] = {
ClusterMessages.PubSubMessagePublished
.newBuilder()
.setMessage(payloadSupport.payloadBuilder(m.message))
.build()
.toByteArray
}
private def receptionistEntryToBinary(e: Entry): Array[Byte] =
ClusterMessages.ReceptionistEntry
.newBuilder()
@ -53,6 +69,12 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
.build()
.toByteArray
private def pubSubMessageFromBinary(bytes: Array[Byte]): TopicImpl.MessagePublished[_] = {
val parsed = ClusterMessages.PubSubMessagePublished.parseFrom(bytes)
val userMessage = payloadSupport.deserializePayload(parsed.getMessage)
TopicImpl.MessagePublished(userMessage)
}
private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = {
val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes)
Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)

View file

@ -6,16 +6,24 @@ package akka.cluster.typed
import java.util.concurrent.ConcurrentHashMap
import akka.actor.typed.ActorRef
import akka.actor.typed.ActorSystem
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.{ Address, Scheduler }
import akka.cluster.{ ClusterEvent, MemberStatus }
import akka.remote.testconductor.RoleName
import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec }
import akka.testkit.WatchedByCoroner
import akka.util.Timeout
import org.scalatest.Suite
import org.scalatest.matchers.should.Matchers
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.language.implicitConversions
@ -75,4 +83,12 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB
enterBarrier("all-joined")
}
private lazy val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
implicit val timeout: Timeout = testKitSettings.DefaultTimeout
val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))
Await.result(f, timeout.duration * 2)
}
}

View file

@ -0,0 +1,115 @@
/*
* Copyright (C) 2018-2020 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.internal.pubsub.TopicImpl
import akka.actor.typed.pubsub.Topic
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MultiNodeClusterSpec
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.serialization.jackson.CborSerializable
import com.typesafe.config.ConfigFactory
object PubSubSpecConfig extends MultiNodeConfig {
val first: RoleName = role("first")
val second = role("second")
val third = role("third")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
""").withFallback(MultiNodeClusterSpec.clusterConfig))
nodeConfig(first)(ConfigFactory.parseString("""
akka.cluster.multi-data-center.self-data-center = "dc1"
"""))
nodeConfig(second, third)(ConfigFactory.parseString("""
akka.cluster.multi-data-center.self-data-center = "dc2"
"""))
case class Message(msg: String) extends CborSerializable
}
class PubSubMultiJvmNode1 extends PubSubSpec
class PubSubMultiJvmNode2 extends PubSubSpec
class PubSubMultiJvmNode3 extends PubSubSpec
abstract class PubSubSpec extends MultiNodeSpec(PubSubSpecConfig) with MultiNodeTypedClusterSpec {
import PubSubSpecConfig._
var topic: ActorRef[Topic.Command[Message]] = null
val topicProbe = TestProbe[Message]()
var otherTopic: ActorRef[Topic.Command[Message]] = null
val otherTopicProbe = TestProbe[Message]()
"A cluster" must {
"be able to form" in {
formCluster(first, second, third)
}
"start a topic on each node" in {
topic = spawn(Topic[Message]("animals"), "AnimalsTopic")
topic ! Topic.Subscribe(topicProbe.ref)
runOn(second, third) {
otherTopic = system.actorOf(PropsAdapter(Topic[Message]("other"))).toTyped[Topic.Command[Message]]
otherTopic ! Topic.Subscribe(otherTopicProbe.ref)
}
enterBarrier("topics started")
}
"see nodes with subscribers registered" in {
val statsProbe = TestProbe[TopicImpl.TopicStats]()
statsProbe.awaitAssert({
topic ! TopicImpl.GetTopicStats[Message](statsProbe.ref)
statsProbe.receiveMessage().topicInstanceCount should ===(3)
})
enterBarrier("topic instances with subscribers seen")
}
"publish to all nodes" in {
runOn(first) {
topic ! Topic.Publish(Message("monkey"))
}
enterBarrier("first published")
topicProbe.expectMessage(Message("monkey"))
runOn(second, third) {
// check that messages are not leaking between topics
otherTopicProbe.expectNoMessage()
}
enterBarrier("publish seen")
}
"not publish to unsubscribed" in {
runOn(first) {
topic ! Topic.Unsubscribe(topicProbe.ref)
// unsubscribe does not need to be gossiped before it is effective
val statsProbe = TestProbe[TopicImpl.TopicStats]()
statsProbe.awaitAssert({
topic ! TopicImpl.GetTopicStats[Message](statsProbe.ref)
statsProbe.receiveMessage().topicInstanceCount should ===(2)
})
}
enterBarrier("unsubscribed")
Thread.sleep(200) // but it needs to reach the topic
runOn(third) {
topic ! Topic.Publish(Message("donkey"))
}
enterBarrier("second published")
runOn(second, third) {
topicProbe.expectMessage(Message("donkey"))
}
runOn(first) {
topicProbe.expectNoMessage()
}
}
}
}

View file

@ -5,15 +5,9 @@
package akka.cluster.typed.internal
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.typed.ActorRef
import akka.actor.typed.Behavior
import akka.actor.typed.Props
import akka.actor.typed.SpawnProtocol
import akka.actor.typed.receptionist.Receptionist
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.MultiNodeClusterSpec
import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.first
import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.second
@ -22,11 +16,8 @@ import akka.cluster.typed.MultiNodeTypedClusterSpec
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.util.Timeout
import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration._
object ClusterReceptionistUnreachabilitySpecConfig extends MultiNodeConfig {
@ -55,14 +46,6 @@ abstract class ClusterReceptionistUnreachabilitySpec
import ClusterReceptionistUnreachabilitySpec._
val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command]
def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = {
implicit val timeout: Timeout = 3.seconds
val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _))
Await.result(f, 3.seconds)
}
val probe = TestProbe[AnyRef]()
val receptionistProbe = TestProbe[AnyRef]()

View file

@ -28,9 +28,8 @@ Akka Cluster. The data is accessed with an actor providing a key-value store lik
<!--- #cluster-pubsub --->
### Distributed Publish Subscribe
Publish-subscribe messaging between actors in the cluster, and point-to-point messaging
using the logical path of the actors, i.e. the sender does not have to know on which
node the destination actor is running.
Publish-subscribe messaging between actors in the cluster based on a topic,
i.e. the sender does not have to know on which node the destination actor is running.
<!--- #cluster-pubsub --->

View file

@ -440,8 +440,6 @@ See @ref:[Cluster Sharding](cluster-sharding.md).
See @ref:[Distributed Data](distributed-data.md).
@@include[cluster.md](../includes/cluster.md) { #cluster-pubsub }
Classic Pub Sub can be used by leveraging the `.toClassic` adapters.
See @ref:[Distributed Publish Subscribe in Cluster](distributed-pub-sub.md). The API is @github[#26338](#26338).
@@include[cluster.md](../includes/cluster.md) { #cluster-multidc }
See @ref:[Cluster Multi-DC](cluster-dc.md).

View file

@ -1,91 +1,66 @@
# Distributed Publish Subscribe in Cluster
For the Akka Classic documentation of this feature see @ref:[Classic Distributed Publish Subscribe](../distributed-pub-sub.md).
Classic Pub Sub can be used by leveraging the `.toClassic` adapters until @github[#26338](#26338).
## Module info
Until the new Distributed Publish Subscribe API, see @github[#26338](#26338),
you can use Classic Distributed Publish Subscribe
@ref:[coexisting](coexisting.md) with new Cluster and actors. To do this, add following dependency in your project:
The distributed publish subscribe topic API is available and usable with the core `akka-actor-typed` module, however it will only be distributed
when used in a clustered application:
@@dependency[sbt,Maven,Gradle] {
group="com.typesafe.akka"
artifact="akka-cluster-tools_$scala.binary_version$"
artifact="akka-cluster-typed_$scala.binary_version$"
version="$akka.version$"
}
Add the new Cluster API if you don't already have it in an existing Cluster application:
## The Topic Actor
@@dependency[sbt,Maven,Gradle] {
group=com.typesafe.akka
artifact=akka-cluster-typed_$scala.binary_version$
version=$akka.version$
}
Distributed publish subscribe is achieved by representing each pub sub topic with an actor, `akka.actor.typed.pubsub.Topic`.
@@project-info{ projectId="akka-cluster-typed" }
The topic actor needs to run on each node where subscribers will live or that wants to publish messages to the topic.
## Sample project
Until @github[#26338](#26338), [this simple example]($github.base_url$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) shows how to use
@ref:[Classic Distributed Publish Subscribe](../distributed-pub-sub.md) with the new Cluster API.
### The DistributedPubSub extension
The mediator can either be started and accessed with the `akka.cluster.pubsub.DistributedPubSub` extension as shown below,
or started as an ordinary actor, see the full Akka Classic documentation @ref:[Classic Distributed PubSub Extension](../distributed-pub-sub.md#distributedpubsub-extension).
The identity of the topic is a tuple of the type of messages that can be published and a string topic name but it is recommended
to not define multiple topics with different types and the same topic name.
Scala
: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #mediator }
: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #start-topic }
Actors register to a topic for Pub-Sub mode, or register to a path for point-to-point mode.
Java
: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #start-topic }
## Publish
Pub-Sub mode. For the full Akka Classic documentation of this feature see @ref:[Classic Distributed PubSub Publish](../distributed-pub-sub.md#publish).
### Subscribers
Subscriber actors can be started on several nodes in the cluster, and all will receive
messages published to the "content" topic.
An actor that subscribes to a topic:
Local actors can then subscribe to the topic (and unsubscribe from it):
Scala
: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #subscriber }
: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #subscribe }
Java
: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #subscribe }
Actors may also be subscribed to a named topic with a `group` id.
For the full feature description see @ref:[topic groups](../distributed-pub-sub.md#topic-groups).
### Publishers
Publishers publish messages to the topic from anywhere in the cluster.
Messages are published by sending `DistributedPubSubMediator.Publish` message to the
local mediator.
An actor that publishes to a topic:
And publish messages to the topic:
Scala
: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #publisher }
: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #publish }
## Send
Java
: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #publish }
Messages can be sent in point-to-point or broadcast mode. For the full Akka Classic documentation of this feature see @ref:[Classic Distributed PubSub Send](../distributed-pub-sub.md#send).
## Pub Sub Scalability
First, an actor must register a destination to send to:
Each topic is represented by one @ref[Receptionist](actor-discovery.md) service key meaning that the number of topics
will scale to thousands or tens of thousands but for higher numbers of topics will require custom solutions. It also means
that a very high turnaround of unique topics will not work well and for such use cases a custom solution is advised.
Scala
: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #destination }
The topic actor acts as a proxy and delegates to the local subscribers handling deduplication so that a published message
is only sent once to a node regardless of how many subscribers there are to the topic on that node.
An actor that sends to a registered path:
Scala
: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #send }
Actors are automatically removed from the registry when they are terminated, or you
can explicitly remove entries with `DistributedPubSubMediator.Remove`.
When a topic actor has no subscribers for a topic it will deregister itself from the receptionist meaning published messages
for the topic will not be sent to it.
## Delivery Guarantee
For the full Akka Classic documentation of this see @ref:[Classic Distributed PubSub Delivery Guarantee](../distributed-pub-sub.md#delivery-guarantee).
As in @ref:[Message Delivery Reliability](../general/message-delivery-reliability.md) of Akka, message delivery guarantee in distributed pub sub modes is **at-most-once delivery**. In other words, messages can be lost over the wire. In addition to that the registry of nodes which have subscribers is eventually consistent
meaning that subscribing an actor on one node will have a short delay before it is known on other nodes and published to.
If you are looking for at-least-once delivery guarantee, we recommend [Alpakka Kafka](https://doc.akka.io/docs/alpakka-kafka/current/).

View file

@ -20,7 +20,7 @@ Unit testing of `EventSourcedBehavior` can be done with the @ref:[ActorTestKit](
in the same way as other behaviors.
@ref:[Synchronous behavior testing](testing-sync.md) for `EventSourcedBehavior` is not supported yet, but
tracked in @github[issue #26338](#23712).
tracked in @github[issue #23712](#23712).
You need to configure a journal, and the in-memory journal is sufficient for unit testing. To enable the
in-memory journal you need to pass the following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`].

View file

@ -290,7 +290,8 @@ lazy val protobufV3 = akkaModule("akka-protobuf-v3")
exportJars := true, // in dependent projects, use assembled and shaded jar
makePomConfiguration := makePomConfiguration.value
.withConfigurations(Vector(Compile)), // prevent original dependency to be added to pom as runtime dep
packageBin in Compile := ReproducibleBuildsPlugin.postProcessJar((assembly in Compile).value), // package by running assembly
packageBin in Compile := ReproducibleBuildsPlugin
.postProcessJar((assembly in Compile).value), // package by running assembly
// Prevent cyclic task dependencies, see https://github.com/sbt/sbt-assembly/issues/365
fullClasspath in assembly := (managedClasspath in Runtime).value, // otherwise, there's a cyclic dependency between packageBin and assembly
test in assembly := {}, // assembly runs tests for unknown reason which introduces another cyclic dependency to packageBin via exportedJars
@ -350,11 +351,7 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit")
lazy val streamTests = akkaModule("akka-stream-tests")
.configs(akka.Jdk9.TestJdk9)
.dependsOn(
streamTestkit % "test->test",
remote % "test->test",
stream % "TestJdk9->CompileJdk9"
)
.dependsOn(streamTestkit % "test->test", remote % "test->test", stream % "TestJdk9->CompileJdk9")
.settings(Dependencies.streamTests)
.enablePlugins(NoPublish, Jdk9)
.disablePlugins(MimaPlugin, WhiteSourcePlugin)
@ -419,6 +416,9 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
remoteTests % "test->test",
jackson % "test->test")
.settings(AutomaticModuleName.settings("akka.cluster.typed"))
.settings(Protobuf.settings)
// To be able to import ContainerFormats.proto
.settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf"))
.configs(MultiJvm)
.enablePlugins(MultiNodeScalaTest)