diff --git a/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java new file mode 100644 index 0000000000..c5335d8733 --- /dev/null +++ b/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package jdocs.akka.typed.pubsub; + +import akka.actor.typed.ActorRef; +import akka.actor.typed.Behavior; +import akka.actor.typed.javadsl.Behaviors; + +// #start-topic +import akka.actor.typed.pubsub.Topic; + +// #start-topic + +public class PubSubExample { + + static class Message { + public final String text; + + public Message(String text) { + this.text = text; + } + } + + private Behavior behavior = + // #start-topic + Behaviors.setup( + context -> { + ActorRef> topic = + context.spawn(Topic.create(Message.class, "my-topic"), "MyTopic"); + // #start-topic + + ActorRef subscriberActor = null; + // #subscribe + topic.tell(Topic.subscribe(subscriberActor)); + + topic.tell(Topic.unsubscribe(subscriberActor)); + // #subscribe + + // #publish + topic.tell(Topic.publish(new Message("Hello Subscribers!"))); + // #publish + + return Behaviors.empty(); + }); +} diff --git a/akka-actor-typed-tests/src/test/scala/akka/actor/typed/pubsub/LocalPubSubSpec.scala b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/pubsub/LocalPubSubSpec.scala new file mode 100644 index 0000000000..0e562728f0 --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/akka/actor/typed/pubsub/LocalPubSubSpec.scala @@ -0,0 +1,130 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.pubsub + +import akka.actor.testkit.typed.scaladsl.LoggingTestKit +import akka.actor.testkit.typed.scaladsl.LogCapturing +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import org.scalatest.wordspec.AnyWordSpecLike +import scala.concurrent.duration._ + +class LocalPubSubSpec extends ScalaTestWithActorTestKit with AnyWordSpecLike with LogCapturing { + + "A pub-sub topic running locally" must { + + "publish to all local subscriber actors of a topic" in { + val fruitTopic = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("fruit")) + } + + try { + val probe1 = testKit.createTestProbe[String]() + val probe2 = testKit.createTestProbe[String]() + val probe3 = testKit.createTestProbe[String]() + + LoggingTestKit.debug("Topic list updated").expect { + fruitTopic ! Topic.Subscribe(probe1.ref) + fruitTopic ! Topic.Subscribe(probe2.ref) + fruitTopic ! Topic.Subscribe(probe3.ref) + } + + fruitTopic ! Topic.Publish("banana") + probe1.expectMessage("banana") + probe2.expectMessage("banana") + probe3.expectMessage("banana") + + } finally { + testKit.stop(fruitTopic) + } + } + + "publish to all subscriber actors across several instances of the same topic" in { + val fruitTopic1 = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("fruit")) + } + val fruitTopic2 = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("fruit")) + } + + try { + val probe1 = testKit.createTestProbe[String]() + val probe2 = testKit.createTestProbe[String]() + val probe3 = testKit.createTestProbe[String]() + + LoggingTestKit + .debug("Topic list updated") + // both topic instances should have seen the updated list + .withOccurrences(2) + .expect { + fruitTopic2 ! Topic.Subscribe(probe1.ref) + fruitTopic2 ! Topic.Subscribe(probe2.ref) + fruitTopic2 ! Topic.Subscribe(probe3.ref) + } + + fruitTopic1 ! Topic.Publish("banana") + probe1.expectMessage("banana") + probe2.expectMessage("banana") + probe3.expectMessage("banana") + + } finally { + testKit.stop(fruitTopic1) + testKit.stop(fruitTopic2) + } + } + + "doesn't publish across topics unsubscribe" in { + val fruitTopic = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("fruit")) + } + val veggieTopic = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("veggies")) + } + + try { + val probe1 = testKit.createTestProbe[String]() + + LoggingTestKit.debug("Topic list updated").expect { + fruitTopic ! Topic.Subscribe(probe1.ref) + } + + veggieTopic ! Topic.Publish("carrot") + probe1.expectNoMessage(200.millis) + + } finally { + testKit.stop(fruitTopic) + } + } + + "doesn't publish after unsubscribe" in { + val fruitTopic = + LoggingTestKit.debug("Topic list updated").expect { + testKit.spawn(Topic[String]("fruit")) + } + + try { + val probe1 = testKit.createTestProbe[String]() + + LoggingTestKit.debug("Topic list updated").expect { + fruitTopic ! Topic.Subscribe(probe1.ref) + } + LoggingTestKit.debug("Topic list updated").expect { + fruitTopic ! Topic.Unsubscribe(probe1.ref) + } + + fruitTopic ! Topic.Publish("banana") + probe1.expectNoMessage(200.millis) + + } finally { + testKit.stop(fruitTopic) + } + } + + } +} diff --git a/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala new file mode 100644 index 0000000000..86682ac22a --- /dev/null +++ b/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala @@ -0,0 +1,38 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package docs.akka.typed.pubsub + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.scaladsl.Behaviors + +object PubSubExample { + + case class Message(text: String) + + def example: Behavior[Any] = { + // #start-topic + import akka.actor.typed.pubsub.Topic + + Behaviors.setup { context => + val topic = context.spawn(Topic[Message]("my-topic"), "MyTopic") + // #start-topic + + val subscriberActor: ActorRef[Message] = ??? + // #subscribe + topic ! Topic.Subscribe(subscriberActor) + + topic ! Topic.Unsubscribe(subscriberActor) + // #subscribe + + // #publish + topic ! Topic.Publish(Message("Hello Subscribers!")) + // #publish + + Behaviors.empty + } + } + +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/internal/pubsub/TopicImpl.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/pubsub/TopicImpl.scala new file mode 100644 index 0000000000..d3e276287e --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/internal/pubsub/TopicImpl.scala @@ -0,0 +1,143 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.internal.pubsub + +import akka.actor.Dropped +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.pubsub.Topic +import akka.actor.typed.receptionist.Receptionist +import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.AbstractBehavior +import akka.actor.typed.scaladsl.ActorContext +import akka.actor.typed.scaladsl.LoggerOps +import akka.actor.typed.scaladsl.adapter._ +import akka.annotation.InternalApi + +import scala.reflect.ClassTag + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object TopicImpl { + + trait Command[T] + + // actual public messages but internal to ease bincomp evolution + final case class Publish[T](message: T) extends Topic.Command[T] + final case class Subscribe[T](subscriber: ActorRef[T]) extends Topic.Command[T] + final case class Unsubscribe[T](subscriber: ActorRef[T]) extends Topic.Command[T] + + // internal messages, note that the protobuf serializer for those sent remotely is defined in akka-cluster-typed + final case class GetTopicStats[T](replyTo: ActorRef[TopicStats]) extends Topic.Command[T] + final case class TopicStats(localSubscriberCount: Int, topicInstanceCount: Int) + final case class TopicInstancesUpdated[T](topics: Set[ActorRef[TopicImpl.Command[T]]]) extends Command[T] + final case class MessagePublished[T](message: T) extends Command[T] + final case class SubscriberTerminated[T](subscriber: ActorRef[T]) extends Command[T] +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class TopicImpl[T](topicName: String, context: ActorContext[TopicImpl.Command[T]])( + implicit classTag: ClassTag[T]) + extends AbstractBehavior[TopicImpl.Command[T]](context) { + + /* + * The topic actor keeps a local set of subscribers, whenever that is non-empty it registers itself for + * a topic service key, and when it becomes empty again it deregisters from the service key. Published + * messages go to all currently known topics registered for the topic service key and the individual topic + * instances then broadcast the message to all local subscribers. This achieves deduplication for nodes + * with multiple subscribers and avoids sending to nodes without any subscribers. + */ + + import TopicImpl._ + + private val topicServiceKey = ServiceKey[TopicImpl.Command[T]](topicName) + context.log.debugN( + "Starting up pub-sub topic [{}] for messages of type [{}]", + topicName, + classTag.runtimeClass.getName) + + private var topicInstances = Set.empty[ActorRef[TopicImpl.Command[T]]] + private var localSubscribers = Set.empty[ActorRef[T]] + + private val receptionist = context.system.receptionist + private val receptionistAdapter = context.messageAdapter[Receptionist.Listing] { + case topicServiceKey.Listing(topics) => TopicInstancesUpdated(topics) + } + receptionist ! Receptionist.Subscribe(topicServiceKey, receptionistAdapter) + + override def onMessage(msg: TopicImpl.Command[T]): Behavior[TopicImpl.Command[T]] = msg match { + + case Publish(message) => + if (topicInstances.isEmpty) { + context.log.trace("Publishing message of type [{}] but no subscribers, dropping", msg.getClass) + context.system.deadLetters ! Dropped(message, "No topic subscribers known", context.self.toClassic) + } else { + context.log.trace("Publishing message of type [{}]", msg.getClass) + val pub = MessagePublished(message) + topicInstances.foreach(_ ! pub) + } + this + + case MessagePublished(msg) => + context.log.trace("Message of type [{}] published", msg.getClass) + localSubscribers.foreach(_ ! msg) + this + + case Subscribe(subscriber) => + if (!localSubscribers.contains(subscriber)) { + context.watchWith(subscriber, SubscriberTerminated(subscriber)) + localSubscribers = localSubscribers + subscriber + if (localSubscribers.size == 1) { + context.log.debug( + "Local subscriber [{}] added, went from no subscribers to one, subscribing to receptionist", + subscriber) + // we went from no subscribers to one, register to the receptionist + receptionist ! Receptionist.Register(topicServiceKey, context.self) + } else { + context.log.debug("Local subscriber [{}] added", subscriber) + } + } else { + context.log.debug("Local subscriber [{}] already subscribed, ignoring Subscribe command") + } + this + + case Unsubscribe(subscriber) => + context.unwatch(subscriber) + localSubscribers = localSubscribers.filterNot(_ == subscriber) + if (localSubscribers.isEmpty) { + context.log.debug("Last local subscriber [{}] unsubscribed, deregistering from receptionist", subscriber) + // that was the lost subscriber, deregister from the receptionist + receptionist ! Receptionist.Deregister(topicServiceKey, context.self) + } else { + context.log.debug("Local subscriber [{}] unsubscribed", subscriber) + } + this + + case SubscriberTerminated(subscriber) => + localSubscribers -= subscriber + if (localSubscribers.isEmpty) { + context.log.debug("Last local subscriber [{}] terminated, deregistering from receptionist", subscriber) + // that was the last subscriber, deregister from the receptionist + receptionist ! Receptionist.Deregister(topicServiceKey, context.self) + } else { + context.log.debug("Local subscriber [{}] terminated, removing from subscriber list", subscriber) + } + this + + case TopicInstancesUpdated(newTopics) => + context.log.debug("Topic list updated [{}]", newTopics) + topicInstances = newTopics + this + + case GetTopicStats(replyTo) => + replyTo ! TopicStats(localSubscribers.size, topicInstances.size) + this + } +} diff --git a/akka-actor-typed/src/main/scala/akka/actor/typed/pubsub/Topic.scala b/akka-actor-typed/src/main/scala/akka/actor/typed/pubsub/Topic.scala new file mode 100644 index 0000000000..e88dc70cff --- /dev/null +++ b/akka-actor-typed/src/main/scala/akka/actor/typed/pubsub/Topic.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2009-2020 Lightbend Inc. + */ + +package akka.actor.typed.pubsub + +import akka.actor.typed.ActorRef +import akka.actor.typed.Behavior +import akka.actor.typed.internal.pubsub.TopicImpl +import akka.actor.typed.scaladsl.Behaviors +import akka.annotation.DoNotInherit + +import scala.reflect.ClassTag + +/** + * A pub sub topic is an actor that handles subscribing to a topic and publishing messages to all subscribed actors. + * + * It is mostly useful in a clustered setting, where it is intended to be started once on every node that want to + * house subscribers or publish messages to the topic, but it also works in a local setting without cluster. + * + * In a clustered context messages are deduplicated so that there is at most one message sent to each node for + * each publish and if there are no subscribers on a node, no message is sent to it. Note that the list of subscribers + * is eventually consistent and there are no delivery guarantees built in. + * + * Each topic results in a [[akka.actor.typed.receptionist.ServiceKey]] in the [[akka.actor.typed.receptionist.Receptionist]] + * so the same scaling recommendation holds for topics, see docs: + * https://doc.akka.io/docs/akka/current/typed/actor-discovery.html#receptionist-scalability + */ +object Topic { + + /** + * Not for user extension + */ + @DoNotInherit + trait Command[T] extends TopicImpl.Command[T] + + /** + * Scala API: Publish the message to all currently known subscribers. + */ + object Publish { + def apply[T](message: T): Command[T] = + TopicImpl.Publish(message) + } + + /** + * Java API: Publish the message to all currently known subscribers. + */ + def publish[T](message: T): Command[T] = Publish(message) + + /** + * Scala API: Subscribe to this topic. Should only be used for local subscribers. + */ + object Subscribe { + def apply[T](subscriber: ActorRef[T]): Command[T] = TopicImpl.Subscribe(subscriber) + } + + /** + * Java API: Subscribe to this topic. Should only be used for local subscribers. + */ + def subscribe[T](subscriber: ActorRef[T]): Command[T] = Subscribe(subscriber) + + /** + * Scala API: Unsubscribe a previously subscribed actor from this topic. + */ + object Unsubscribe { + def apply[T](subscriber: ActorRef[T]): Command[T] = TopicImpl.Unsubscribe(subscriber) + } + + /** + * Java API: Unsubscribe a previously subscribed actor from this topic. + */ + def unsubscribe[T](subscriber: ActorRef[T]): Command[T] = Unsubscribe(subscriber) + + /** + * Scala API: Create a topic actor behavior for the given topic name and message type. + */ + def apply[T](topicName: String)(implicit classTag: ClassTag[T]): Behavior[Command[T]] = + Behaviors.setup[TopicImpl.Command[T]](context => new TopicImpl[T](topicName, context)).narrow + + /** + * Java API: Create a topic actor behavior for the given topic name and message class + */ + def create[T](messageClass: Class[T], topicName: String): Behavior[Command[T]] = + apply[T](topicName)(ClassTag(messageClass)) + +} diff --git a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java index 946553cb19..609da28988 100644 --- a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java +++ b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java @@ -1,5 +1,5 @@ /* - * Copyright (C) 2019-2020 Lightbend Inc. + * Copyright (C) 2020 Lightbend Inc. */ // Generated by the protocol buffer compiler. DO NOT EDIT! @@ -731,11 +731,659 @@ public final class ClusterMessages { } + public interface PubSubMessagePublishedOrBuilder extends + // @@protoc_insertion_point(interface_extends:akka.cluster.typed.PubSubMessagePublished) + akka.protobufv3.internal.MessageOrBuilder { + + /** + * required .Payload message = 1; + * @return Whether the message field is set. + */ + boolean hasMessage(); + /** + * required .Payload message = 1; + * @return The message. + */ + akka.remote.ContainerFormats.Payload getMessage(); + /** + * required .Payload message = 1; + */ + akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder(); + } + /** + * Protobuf type {@code akka.cluster.typed.PubSubMessagePublished} + */ + public static final class PubSubMessagePublished extends + akka.protobufv3.internal.GeneratedMessageV3 implements + // @@protoc_insertion_point(message_implements:akka.cluster.typed.PubSubMessagePublished) + PubSubMessagePublishedOrBuilder { + private static final long serialVersionUID = 0L; + // Use PubSubMessagePublished.newBuilder() to construct. + private PubSubMessagePublished(akka.protobufv3.internal.GeneratedMessageV3.Builder builder) { + super(builder); + } + private PubSubMessagePublished() { + } + + @java.lang.Override + @SuppressWarnings({"unused"}) + protected java.lang.Object newInstance( + akka.protobufv3.internal.GeneratedMessageV3.UnusedPrivateParameter unused) { + return new PubSubMessagePublished(); + } + + @java.lang.Override + public final akka.protobufv3.internal.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private PubSubMessagePublished( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + this(); + if (extensionRegistry == null) { + throw new java.lang.NullPointerException(); + } + int mutable_bitField0_ = 0; + akka.protobufv3.internal.UnknownFieldSet.Builder unknownFields = + akka.protobufv3.internal.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + case 10: { + akka.remote.ContainerFormats.Payload.Builder subBuilder = null; + if (((bitField0_ & 0x00000001) != 0)) { + subBuilder = message_.toBuilder(); + } + message_ = input.readMessage(akka.remote.ContainerFormats.Payload.PARSER, extensionRegistry); + if (subBuilder != null) { + subBuilder.mergeFrom(message_); + message_ = subBuilder.buildPartial(); + } + bitField0_ |= 0x00000001; + break; + } + default: { + if (!parseUnknownField( + input, unknownFields, extensionRegistry, tag)) { + done = true; + } + break; + } + } + } + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobufv3.internal.InvalidProtocolBufferException( + e).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.class, akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.Builder.class); + } + + private int bitField0_; + public static final int MESSAGE_FIELD_NUMBER = 1; + private akka.remote.ContainerFormats.Payload message_; + /** + * required .Payload message = 1; + * @return Whether the message field is set. + */ + public boolean hasMessage() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required .Payload message = 1; + * @return The message. + */ + public akka.remote.ContainerFormats.Payload getMessage() { + return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_; + } + /** + * required .Payload message = 1; + */ + public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { + return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_; + } + + private byte memoizedIsInitialized = -1; + @java.lang.Override + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized == 1) return true; + if (isInitialized == 0) return false; + + if (!hasMessage()) { + memoizedIsInitialized = 0; + return false; + } + if (!getMessage().isInitialized()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + @java.lang.Override + public void writeTo(akka.protobufv3.internal.CodedOutputStream output) + throws java.io.IOException { + if (((bitField0_ & 0x00000001) != 0)) { + output.writeMessage(1, getMessage()); + } + unknownFields.writeTo(output); + } + + @java.lang.Override + public int getSerializedSize() { + int size = memoizedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) != 0)) { + size += akka.protobufv3.internal.CodedOutputStream + .computeMessageSize(1, getMessage()); + } + size += unknownFields.getSerializedSize(); + memoizedSize = size; + return size; + } + + @java.lang.Override + public boolean equals(final java.lang.Object obj) { + if (obj == this) { + return true; + } + if (!(obj instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished)) { + return super.equals(obj); + } + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished other = (akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) obj; + + if (hasMessage() != other.hasMessage()) return false; + if (hasMessage()) { + if (!getMessage() + .equals(other.getMessage())) return false; + } + if (!unknownFields.equals(other.unknownFields)) return false; + return true; + } + + @java.lang.Override + public int hashCode() { + if (memoizedHashCode != 0) { + return memoizedHashCode; + } + int hash = 41; + hash = (19 * hash) + getDescriptor().hashCode(); + if (hasMessage()) { + hash = (37 * hash) + MESSAGE_FIELD_NUMBER; + hash = (53 * hash) + getMessage().hashCode(); + } + hash = (29 * hash) + unknownFields.hashCode(); + memoizedHashCode = hash; + return hash; + } + + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + java.nio.ByteBuffer data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + java.nio.ByteBuffer data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + akka.protobufv3.internal.ByteString data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + akka.protobufv3.internal.ByteString data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(byte[] data) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + byte[] data, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseDelimitedFrom( + java.io.InputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseDelimitedWithIOException(PARSER, input, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + akka.protobufv3.internal.CodedInputStream input) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parseFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return akka.protobufv3.internal.GeneratedMessageV3 + .parseWithIOException(PARSER, input, extensionRegistry); + } + + @java.lang.Override + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder() { + return DEFAULT_INSTANCE.toBuilder(); + } + public static Builder newBuilder(akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished prototype) { + return DEFAULT_INSTANCE.toBuilder().mergeFrom(prototype); + } + @java.lang.Override + public Builder toBuilder() { + return this == DEFAULT_INSTANCE + ? new Builder() : new Builder().mergeFrom(this); + } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code akka.cluster.typed.PubSubMessagePublished} + */ + public static final class Builder extends + akka.protobufv3.internal.GeneratedMessageV3.Builder implements + // @@protoc_insertion_point(builder_implements:akka.cluster.typed.PubSubMessagePublished) + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublishedOrBuilder { + public static final akka.protobufv3.internal.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor; + } + + @java.lang.Override + protected akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.class, akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.Builder.class); + } + + // Construct using akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobufv3.internal.GeneratedMessageV3.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobufv3.internal.GeneratedMessageV3 + .alwaysUseFieldBuilders) { + getMessageFieldBuilder(); + } + } + @java.lang.Override + public Builder clear() { + super.clear(); + if (messageBuilder_ == null) { + message_ = null; + } else { + messageBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + + @java.lang.Override + public akka.protobufv3.internal.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor; + } + + @java.lang.Override + public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstanceForType() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.getDefaultInstance(); + } + + @java.lang.Override + public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished build() { + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + @java.lang.Override + public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished buildPartial() { + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished result = new akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) != 0)) { + if (messageBuilder_ == null) { + result.message_ = message_; + } else { + result.message_ = messageBuilder_.build(); + } + to_bitField0_ |= 0x00000001; + } + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + @java.lang.Override + public Builder clone() { + return super.clone(); + } + @java.lang.Override + public Builder setField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.setField(field, value); + } + @java.lang.Override + public Builder clearField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field) { + return super.clearField(field); + } + @java.lang.Override + public Builder clearOneof( + akka.protobufv3.internal.Descriptors.OneofDescriptor oneof) { + return super.clearOneof(oneof); + } + @java.lang.Override + public Builder setRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + int index, java.lang.Object value) { + return super.setRepeatedField(field, index, value); + } + @java.lang.Override + public Builder addRepeatedField( + akka.protobufv3.internal.Descriptors.FieldDescriptor field, + java.lang.Object value) { + return super.addRepeatedField(field, value); + } + @java.lang.Override + public Builder mergeFrom(akka.protobufv3.internal.Message other) { + if (other instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) { + return mergeFrom((akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished other) { + if (other == akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished.getDefaultInstance()) return this; + if (other.hasMessage()) { + mergeMessage(other.getMessage()); + } + this.mergeUnknownFields(other.unknownFields); + onChanged(); + return this; + } + + @java.lang.Override + public final boolean isInitialized() { + if (!hasMessage()) { + return false; + } + if (!getMessage().isInitialized()) { + return false; + } + return true; + } + + @java.lang.Override + public Builder mergeFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobufv3.internal.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished) e.getUnfinishedMessage(); + throw e.unwrapIOException(); + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + private akka.remote.ContainerFormats.Payload message_; + private akka.protobufv3.internal.SingleFieldBuilderV3< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> messageBuilder_; + /** + * required .Payload message = 1; + * @return Whether the message field is set. + */ + public boolean hasMessage() { + return ((bitField0_ & 0x00000001) != 0); + } + /** + * required .Payload message = 1; + * @return The message. + */ + public akka.remote.ContainerFormats.Payload getMessage() { + if (messageBuilder_ == null) { + return message_ == null ? akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_; + } else { + return messageBuilder_.getMessage(); + } + } + /** + * required .Payload message = 1; + */ + public Builder setMessage(akka.remote.ContainerFormats.Payload value) { + if (messageBuilder_ == null) { + if (value == null) { + throw new NullPointerException(); + } + message_ = value; + onChanged(); + } else { + messageBuilder_.setMessage(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .Payload message = 1; + */ + public Builder setMessage( + akka.remote.ContainerFormats.Payload.Builder builderForValue) { + if (messageBuilder_ == null) { + message_ = builderForValue.build(); + onChanged(); + } else { + messageBuilder_.setMessage(builderForValue.build()); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .Payload message = 1; + */ + public Builder mergeMessage(akka.remote.ContainerFormats.Payload value) { + if (messageBuilder_ == null) { + if (((bitField0_ & 0x00000001) != 0) && + message_ != null && + message_ != akka.remote.ContainerFormats.Payload.getDefaultInstance()) { + message_ = + akka.remote.ContainerFormats.Payload.newBuilder(message_).mergeFrom(value).buildPartial(); + } else { + message_ = value; + } + onChanged(); + } else { + messageBuilder_.mergeFrom(value); + } + bitField0_ |= 0x00000001; + return this; + } + /** + * required .Payload message = 1; + */ + public Builder clearMessage() { + if (messageBuilder_ == null) { + message_ = null; + onChanged(); + } else { + messageBuilder_.clear(); + } + bitField0_ = (bitField0_ & ~0x00000001); + return this; + } + /** + * required .Payload message = 1; + */ + public akka.remote.ContainerFormats.Payload.Builder getMessageBuilder() { + bitField0_ |= 0x00000001; + onChanged(); + return getMessageFieldBuilder().getBuilder(); + } + /** + * required .Payload message = 1; + */ + public akka.remote.ContainerFormats.PayloadOrBuilder getMessageOrBuilder() { + if (messageBuilder_ != null) { + return messageBuilder_.getMessageOrBuilder(); + } else { + return message_ == null ? + akka.remote.ContainerFormats.Payload.getDefaultInstance() : message_; + } + } + /** + * required .Payload message = 1; + */ + private akka.protobufv3.internal.SingleFieldBuilderV3< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder> + getMessageFieldBuilder() { + if (messageBuilder_ == null) { + messageBuilder_ = new akka.protobufv3.internal.SingleFieldBuilderV3< + akka.remote.ContainerFormats.Payload, akka.remote.ContainerFormats.Payload.Builder, akka.remote.ContainerFormats.PayloadOrBuilder>( + getMessage(), + getParentForChildren(), + isClean()); + message_ = null; + } + return messageBuilder_; + } + @java.lang.Override + public final Builder setUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.setUnknownFields(unknownFields); + } + + @java.lang.Override + public final Builder mergeUnknownFields( + final akka.protobufv3.internal.UnknownFieldSet unknownFields) { + return super.mergeUnknownFields(unknownFields); + } + + + // @@protoc_insertion_point(builder_scope:akka.cluster.typed.PubSubMessagePublished) + } + + // @@protoc_insertion_point(class_scope:akka.cluster.typed.PubSubMessagePublished) + private static final akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished DEFAULT_INSTANCE; + static { + DEFAULT_INSTANCE = new akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished(); + } + + public static akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstance() { + return DEFAULT_INSTANCE; + } + + @java.lang.Deprecated public static final akka.protobufv3.internal.Parser + PARSER = new akka.protobufv3.internal.AbstractParser() { + @java.lang.Override + public PubSubMessagePublished parsePartialFrom( + akka.protobufv3.internal.CodedInputStream input, + akka.protobufv3.internal.ExtensionRegistryLite extensionRegistry) + throws akka.protobufv3.internal.InvalidProtocolBufferException { + return new PubSubMessagePublished(input, extensionRegistry); + } + }; + + public static akka.protobufv3.internal.Parser parser() { + return PARSER; + } + + @java.lang.Override + public akka.protobufv3.internal.Parser getParserForType() { + return PARSER; + } + + @java.lang.Override + public akka.cluster.typed.internal.protobuf.ClusterMessages.PubSubMessagePublished getDefaultInstanceForType() { + return DEFAULT_INSTANCE; + } + + } + private static final akka.protobufv3.internal.Descriptors.Descriptor internal_static_akka_cluster_typed_ReceptionistEntry_descriptor; private static final akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable; + private static final akka.protobufv3.internal.Descriptors.Descriptor + internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor; + private static final + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable + internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable; public static akka.protobufv3.internal.Descriptors.FileDescriptor getDescriptor() { @@ -746,13 +1394,16 @@ public final class ClusterMessages { static { java.lang.String[] descriptorData = { "\n\025ClusterMessages.proto\022\022akka.cluster.ty" + - "ped\"8\n\021ReceptionistEntry\022\020\n\010actorRef\030\001 \002" + - "(\t\022\021\n\tsystemUid\030\002 \002(\004B(\n$akka.cluster.ty" + - "ped.internal.protobufH\001" + "ped\032\026ContainerFormats.proto\"8\n\021Reception" + + "istEntry\022\020\n\010actorRef\030\001 \002(\t\022\021\n\tsystemUid\030" + + "\002 \002(\004\"3\n\026PubSubMessagePublished\022\031\n\007messa" + + "ge\030\001 \002(\0132\010.PayloadB(\n$akka.cluster.typed" + + ".internal.protobufH\001" }; descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor .internalBuildGeneratedFileFrom(descriptorData, new akka.protobufv3.internal.Descriptors.FileDescriptor[] { + akka.remote.ContainerFormats.getDescriptor(), }); internal_static_akka_cluster_typed_ReceptionistEntry_descriptor = getDescriptor().getMessageTypes().get(0); @@ -760,6 +1411,13 @@ public final class ClusterMessages { akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( internal_static_akka_cluster_typed_ReceptionistEntry_descriptor, new java.lang.String[] { "ActorRef", "SystemUid", }); + internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor = + getDescriptor().getMessageTypes().get(1); + internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable = new + akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable( + internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor, + new java.lang.String[] { "Message", }); + akka.remote.ContainerFormats.getDescriptor(); } // @@protoc_insertion_point(outer_class_scope) diff --git a/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto index 39151aeb88..7b77eafc47 100644 --- a/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto +++ b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto @@ -8,9 +8,14 @@ package akka.cluster.typed; option java_package = "akka.cluster.typed.internal.protobuf"; option optimize_for = SPEED; +import "ContainerFormats.proto"; message ReceptionistEntry { required string actorRef = 1; required uint64 systemUid = 2; } + +message PubSubMessagePublished { + required Payload message = 1; +} \ No newline at end of file diff --git a/akka-cluster-typed/src/main/resources/reference.conf b/akka-cluster-typed/src/main/resources/reference.conf index 67f1507bff..6f6ba574af 100644 --- a/akka-cluster-typed/src/main/resources/reference.conf +++ b/akka-cluster-typed/src/main/resources/reference.conf @@ -49,6 +49,7 @@ akka { } serialization-bindings { "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + "akka.actor.typed.internal.pubsub.TopicImpl$MessagePublished" = typed-cluster } } cluster.configuration-compatibility-check.checkers { diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala index eade4e0127..38d19b9661 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala @@ -8,11 +8,13 @@ import java.io.NotSerializableException import akka.actor.ExtendedActorSystem import akka.actor.typed.ActorRefResolver +import akka.actor.typed.internal.pubsub.TopicImpl import akka.annotation.InternalApi import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } import akka.actor.typed.scaladsl.adapter._ import akka.cluster.typed.internal.protobuf.ClusterMessages import akka.cluster.typed.internal.receptionist.ClusterReceptionist.Entry +import akka.remote.serialization.WrappedPayloadSupport /** * INTERNAL API @@ -24,27 +26,41 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend // Serializers are initialized early on. `toTyped` might then try to initialize the classic ActorSystemAdapter extension. private lazy val resolver = ActorRefResolver(system.toTyped) + private val payloadSupport = new WrappedPayloadSupport(system) + private val ReceptionistEntryManifest = "a" + private val PubSubPublishManifest = "b" override def manifest(o: AnyRef): String = o match { - case _: Entry => ReceptionistEntryManifest + case _: Entry => ReceptionistEntryManifest + case _: TopicImpl.MessagePublished[_] => PubSubPublishManifest case _ => throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") } override def toBinary(o: AnyRef): Array[Byte] = o match { - case e: Entry => receptionistEntryToBinary(e) + case e: Entry => receptionistEntryToBinary(e) + case m: TopicImpl.MessagePublished[_] => pubSubPublishToBinary(m) case _ => throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") } override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { case ReceptionistEntryManifest => receptionistEntryFromBinary(bytes) + case PubSubPublishManifest => pubSubMessageFromBinary(bytes) case _ => throw new NotSerializableException( s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") } + private def pubSubPublishToBinary(m: TopicImpl.MessagePublished[_]): Array[Byte] = { + ClusterMessages.PubSubMessagePublished + .newBuilder() + .setMessage(payloadSupport.payloadBuilder(m.message)) + .build() + .toByteArray + } + private def receptionistEntryToBinary(e: Entry): Array[Byte] = ClusterMessages.ReceptionistEntry .newBuilder() @@ -53,6 +69,12 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend .build() .toByteArray + private def pubSubMessageFromBinary(bytes: Array[Byte]): TopicImpl.MessagePublished[_] = { + val parsed = ClusterMessages.PubSubMessagePublished.parseFrom(bytes) + val userMessage = payloadSupport.deserializePayload(parsed.getMessage) + TopicImpl.MessagePublished(userMessage) + } + private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = { val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes) Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid) diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala index cbc25b4dc4..9eb37b8e0a 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala @@ -6,16 +6,24 @@ package akka.cluster.typed import java.util.concurrent.ConcurrentHashMap +import akka.actor.typed.ActorRef import akka.actor.typed.ActorSystem +import akka.actor.typed.Behavior +import akka.actor.typed.Props +import akka.actor.typed.SpawnProtocol import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.{ Address, Scheduler } import akka.cluster.{ ClusterEvent, MemberStatus } import akka.remote.testconductor.RoleName import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec } import akka.testkit.WatchedByCoroner +import akka.util.Timeout import org.scalatest.Suite import org.scalatest.matchers.should.Matchers +import scala.concurrent.Await +import scala.concurrent.Future import scala.concurrent.duration._ import scala.language.implicitConversions @@ -75,4 +83,12 @@ trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedB enterBarrier("all-joined") } + private lazy val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command] + def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = { + implicit val timeout: Timeout = testKitSettings.DefaultTimeout + val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _)) + + Await.result(f, timeout.duration * 2) + } + } diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/PubSubSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/PubSubSpec.scala new file mode 100644 index 0000000000..a625a53794 --- /dev/null +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/PubSubSpec.scala @@ -0,0 +1,115 @@ +/* + * Copyright (C) 2018-2020 Lightbend Inc. + */ + +package akka.cluster.typed + +import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.typed.ActorRef +import akka.actor.typed.internal.pubsub.TopicImpl +import akka.actor.typed.pubsub.Topic +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.MultiNodeClusterSpec +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.serialization.jackson.CborSerializable +import com.typesafe.config.ConfigFactory + +object PubSubSpecConfig extends MultiNodeConfig { + val first: RoleName = role("first") + val second = role("second") + val third = role("third") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + """).withFallback(MultiNodeClusterSpec.clusterConfig)) + + nodeConfig(first)(ConfigFactory.parseString(""" + akka.cluster.multi-data-center.self-data-center = "dc1" + """)) + + nodeConfig(second, third)(ConfigFactory.parseString(""" + akka.cluster.multi-data-center.self-data-center = "dc2" + """)) + + case class Message(msg: String) extends CborSerializable +} + +class PubSubMultiJvmNode1 extends PubSubSpec +class PubSubMultiJvmNode2 extends PubSubSpec +class PubSubMultiJvmNode3 extends PubSubSpec + +abstract class PubSubSpec extends MultiNodeSpec(PubSubSpecConfig) with MultiNodeTypedClusterSpec { + + import PubSubSpecConfig._ + + var topic: ActorRef[Topic.Command[Message]] = null + val topicProbe = TestProbe[Message]() + var otherTopic: ActorRef[Topic.Command[Message]] = null + val otherTopicProbe = TestProbe[Message]() + + "A cluster" must { + "be able to form" in { + formCluster(first, second, third) + } + + "start a topic on each node" in { + topic = spawn(Topic[Message]("animals"), "AnimalsTopic") + topic ! Topic.Subscribe(topicProbe.ref) + runOn(second, third) { + otherTopic = system.actorOf(PropsAdapter(Topic[Message]("other"))).toTyped[Topic.Command[Message]] + otherTopic ! Topic.Subscribe(otherTopicProbe.ref) + } + enterBarrier("topics started") + } + + "see nodes with subscribers registered" in { + val statsProbe = TestProbe[TopicImpl.TopicStats]() + statsProbe.awaitAssert({ + topic ! TopicImpl.GetTopicStats[Message](statsProbe.ref) + statsProbe.receiveMessage().topicInstanceCount should ===(3) + }) + enterBarrier("topic instances with subscribers seen") + } + + "publish to all nodes" in { + runOn(first) { + topic ! Topic.Publish(Message("monkey")) + } + enterBarrier("first published") + topicProbe.expectMessage(Message("monkey")) + runOn(second, third) { + // check that messages are not leaking between topics + otherTopicProbe.expectNoMessage() + } + enterBarrier("publish seen") + } + + "not publish to unsubscribed" in { + runOn(first) { + topic ! Topic.Unsubscribe(topicProbe.ref) + // unsubscribe does not need to be gossiped before it is effective + val statsProbe = TestProbe[TopicImpl.TopicStats]() + statsProbe.awaitAssert({ + topic ! TopicImpl.GetTopicStats[Message](statsProbe.ref) + statsProbe.receiveMessage().topicInstanceCount should ===(2) + }) + } + enterBarrier("unsubscribed") + Thread.sleep(200) // but it needs to reach the topic + + runOn(third) { + topic ! Topic.Publish(Message("donkey")) + } + enterBarrier("second published") + runOn(second, third) { + topicProbe.expectMessage(Message("donkey")) + } + runOn(first) { + topicProbe.expectNoMessage() + } + } + + } +} diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala index f9182d8929..97561d1478 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/internal/ClusterReceptionistUnreachabilitySpec.scala @@ -5,15 +5,9 @@ package akka.cluster.typed.internal import akka.actor.testkit.typed.scaladsl.TestProbe -import akka.actor.typed.ActorRef -import akka.actor.typed.Behavior -import akka.actor.typed.Props -import akka.actor.typed.SpawnProtocol import akka.actor.typed.receptionist.Receptionist import akka.actor.typed.receptionist.ServiceKey -import akka.actor.typed.scaladsl.AskPattern._ import akka.actor.typed.scaladsl.Behaviors -import akka.actor.typed.scaladsl.adapter._ import akka.cluster.MultiNodeClusterSpec import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.first import akka.cluster.typed.MultiDcClusterSingletonSpecConfig.second @@ -22,11 +16,8 @@ import akka.cluster.typed.MultiNodeTypedClusterSpec import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction -import akka.util.Timeout import com.typesafe.config.ConfigFactory -import scala.concurrent.Await -import scala.concurrent.Future import scala.concurrent.duration._ object ClusterReceptionistUnreachabilitySpecConfig extends MultiNodeConfig { @@ -55,14 +46,6 @@ abstract class ClusterReceptionistUnreachabilitySpec import ClusterReceptionistUnreachabilitySpec._ - val spawnActor = system.actorOf(PropsAdapter(SpawnProtocol())).toTyped[SpawnProtocol.Command] - def spawn[T](behavior: Behavior[T], name: String): ActorRef[T] = { - implicit val timeout: Timeout = 3.seconds - val f: Future[ActorRef[T]] = spawnActor.ask(SpawnProtocol.Spawn(behavior, name, Props.empty, _)) - - Await.result(f, 3.seconds) - } - val probe = TestProbe[AnyRef]() val receptionistProbe = TestProbe[AnyRef]() diff --git a/akka-docs/src/main/paradox/includes/cluster.md b/akka-docs/src/main/paradox/includes/cluster.md index dc4536f298..4bf9c71ef5 100644 --- a/akka-docs/src/main/paradox/includes/cluster.md +++ b/akka-docs/src/main/paradox/includes/cluster.md @@ -28,9 +28,8 @@ Akka Cluster. The data is accessed with an actor providing a key-value store lik ### Distributed Publish Subscribe -Publish-subscribe messaging between actors in the cluster, and point-to-point messaging -using the logical path of the actors, i.e. the sender does not have to know on which -node the destination actor is running. +Publish-subscribe messaging between actors in the cluster based on a topic, +i.e. the sender does not have to know on which node the destination actor is running. diff --git a/akka-docs/src/main/paradox/typed/cluster.md b/akka-docs/src/main/paradox/typed/cluster.md index 15c9e47c79..76a223a963 100644 --- a/akka-docs/src/main/paradox/typed/cluster.md +++ b/akka-docs/src/main/paradox/typed/cluster.md @@ -440,8 +440,6 @@ See @ref:[Cluster Sharding](cluster-sharding.md). See @ref:[Distributed Data](distributed-data.md). @@include[cluster.md](../includes/cluster.md) { #cluster-pubsub } -Classic Pub Sub can be used by leveraging the `.toClassic` adapters. -See @ref:[Distributed Publish Subscribe in Cluster](distributed-pub-sub.md). The API is @github[#26338](#26338). @@include[cluster.md](../includes/cluster.md) { #cluster-multidc } See @ref:[Cluster Multi-DC](cluster-dc.md). diff --git a/akka-docs/src/main/paradox/typed/distributed-pub-sub.md b/akka-docs/src/main/paradox/typed/distributed-pub-sub.md index 75bada2011..813dfe8593 100644 --- a/akka-docs/src/main/paradox/typed/distributed-pub-sub.md +++ b/akka-docs/src/main/paradox/typed/distributed-pub-sub.md @@ -1,91 +1,66 @@ # Distributed Publish Subscribe in Cluster For the Akka Classic documentation of this feature see @ref:[Classic Distributed Publish Subscribe](../distributed-pub-sub.md). -Classic Pub Sub can be used by leveraging the `.toClassic` adapters until @github[#26338](#26338). ## Module info -Until the new Distributed Publish Subscribe API, see @github[#26338](#26338), -you can use Classic Distributed Publish Subscribe -@ref:[coexisting](coexisting.md) with new Cluster and actors. To do this, add following dependency in your project: +The distributed publish subscribe topic API is available and usable with the core `akka-actor-typed` module, however it will only be distributed +when used in a clustered application: @@dependency[sbt,Maven,Gradle] { group="com.typesafe.akka" - artifact="akka-cluster-tools_$scala.binary_version$" + artifact="akka-cluster-typed_$scala.binary_version$" version="$akka.version$" } -Add the new Cluster API if you don't already have it in an existing Cluster application: +## The Topic Actor -@@dependency[sbt,Maven,Gradle] { - group=com.typesafe.akka - artifact=akka-cluster-typed_$scala.binary_version$ - version=$akka.version$ -} +Distributed publish subscribe is achieved by representing each pub sub topic with an actor, `akka.actor.typed.pubsub.Topic`. -@@project-info{ projectId="akka-cluster-typed" } - -## Sample project - -Until @github[#26338](#26338), [this simple example]($github.base_url$/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) shows how to use -@ref:[Classic Distributed Publish Subscribe](../distributed-pub-sub.md) with the new Cluster API. - -### The DistributedPubSub extension - -The mediator can either be started and accessed with the `akka.cluster.pubsub.DistributedPubSub` extension as shown below, -or started as an ordinary actor, see the full Akka Classic documentation @ref:[Classic Distributed PubSub Extension](../distributed-pub-sub.md#distributedpubsub-extension). - -Scala -: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #mediator } +The topic actor needs to run on each node where subscribers will live or that wants to publish messages to the topic. -Actors register to a topic for Pub-Sub mode, or register to a path for point-to-point mode. - -## Publish - -Pub-Sub mode. For the full Akka Classic documentation of this feature see @ref:[Classic Distributed PubSub Publish](../distributed-pub-sub.md#publish). - -### Subscribers - -Subscriber actors can be started on several nodes in the cluster, and all will receive -messages published to the "content" topic. - -An actor that subscribes to a topic: +The identity of the topic is a tuple of the type of messages that can be published and a string topic name but it is recommended +to not define multiple topics with different types and the same topic name. Scala -: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #subscriber } +: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #start-topic } +Java +: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #start-topic } -Actors may also be subscribed to a named topic with a `group` id. -For the full feature description see @ref:[topic groups](../distributed-pub-sub.md#topic-groups). - -### Publishers - -Publishers publish messages to the topic from anywhere in the cluster. -Messages are published by sending `DistributedPubSubMediator.Publish` message to the -local mediator. - -An actor that publishes to a topic: +Local actors can then subscribe to the topic (and unsubscribe from it): Scala -: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #publisher } +: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #subscribe } -## Send +Java +: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #subscribe } -Messages can be sent in point-to-point or broadcast mode. For the full Akka Classic documentation of this feature see @ref:[Classic Distributed PubSub Send](../distributed-pub-sub.md#send). - -First, an actor must register a destination to send to: +And publish messages to the topic: Scala -: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #destination } +: @@snip [PubSubExample.scala](/akka-actor-typed-tests/src/test/scala/docs/akka/typed/pubsub/PubSubExample.scala) { #publish } -An actor that sends to a registered path: +Java +: @@snip [PubSubExample.java](/akka-actor-typed-tests/src/test/java/jdocs/akka/typed/pubsub/PubSubExample.java) { #publish } -Scala -: @@snip [DistributedPubSubExample.scala](/akka-cluster-typed/src/test/scala/docs/akka/cluster/typed/DistributedPubSubExample.scala) { #send } +## Pub Sub Scalability + +Each topic is represented by one @ref[Receptionist](actor-discovery.md) service key meaning that the number of topics +will scale to thousands or tens of thousands but for higher numbers of topics will require custom solutions. It also means +that a very high turnaround of unique topics will not work well and for such use cases a custom solution is advised. + +The topic actor acts as a proxy and delegates to the local subscribers handling deduplication so that a published message +is only sent once to a node regardless of how many subscribers there are to the topic on that node. + +When a topic actor has no subscribers for a topic it will deregister itself from the receptionist meaning published messages +for the topic will not be sent to it. -Actors are automatically removed from the registry when they are terminated, or you -can explicitly remove entries with `DistributedPubSubMediator.Remove`. - ## Delivery Guarantee -For the full Akka Classic documentation of this see @ref:[Classic Distributed PubSub Delivery Guarantee](../distributed-pub-sub.md#delivery-guarantee). +As in @ref:[Message Delivery Reliability](../general/message-delivery-reliability.md) of Akka, message delivery guarantee in distributed pub sub modes is **at-most-once delivery**. In other words, messages can be lost over the wire. In addition to that the registry of nodes which have subscribers is eventually consistent +meaning that subscribing an actor on one node will have a short delay before it is known on other nodes and published to. + +If you are looking for at-least-once delivery guarantee, we recommend [Alpakka Kafka](https://doc.akka.io/docs/alpakka-kafka/current/). + + diff --git a/akka-docs/src/main/paradox/typed/persistence-testing.md b/akka-docs/src/main/paradox/typed/persistence-testing.md index 14292efddc..31c9a29657 100644 --- a/akka-docs/src/main/paradox/typed/persistence-testing.md +++ b/akka-docs/src/main/paradox/typed/persistence-testing.md @@ -20,7 +20,7 @@ Unit testing of `EventSourcedBehavior` can be done with the @ref:[ActorTestKit]( in the same way as other behaviors. @ref:[Synchronous behavior testing](testing-sync.md) for `EventSourcedBehavior` is not supported yet, but -tracked in @github[issue #26338](#23712). +tracked in @github[issue #23712](#23712). You need to configure a journal, and the in-memory journal is sufficient for unit testing. To enable the in-memory journal you need to pass the following configuration to the @scala[`ScalaTestWithActorTestKit`]@java[`TestKitJunitResource`]. diff --git a/build.sbt b/build.sbt index 338f3803c0..c02f92e853 100644 --- a/build.sbt +++ b/build.sbt @@ -290,7 +290,8 @@ lazy val protobufV3 = akkaModule("akka-protobuf-v3") exportJars := true, // in dependent projects, use assembled and shaded jar makePomConfiguration := makePomConfiguration.value .withConfigurations(Vector(Compile)), // prevent original dependency to be added to pom as runtime dep - packageBin in Compile := ReproducibleBuildsPlugin.postProcessJar((assembly in Compile).value), // package by running assembly + packageBin in Compile := ReproducibleBuildsPlugin + .postProcessJar((assembly in Compile).value), // package by running assembly // Prevent cyclic task dependencies, see https://github.com/sbt/sbt-assembly/issues/365 fullClasspath in assembly := (managedClasspath in Runtime).value, // otherwise, there's a cyclic dependency between packageBin and assembly test in assembly := {}, // assembly runs tests for unknown reason which introduces another cyclic dependency to packageBin via exportedJars @@ -350,11 +351,7 @@ lazy val streamTestkit = akkaModule("akka-stream-testkit") lazy val streamTests = akkaModule("akka-stream-tests") .configs(akka.Jdk9.TestJdk9) - .dependsOn( - streamTestkit % "test->test", - remote % "test->test", - stream % "TestJdk9->CompileJdk9" - ) + .dependsOn(streamTestkit % "test->test", remote % "test->test", stream % "TestJdk9->CompileJdk9") .settings(Dependencies.streamTests) .enablePlugins(NoPublish, Jdk9) .disablePlugins(MimaPlugin, WhiteSourcePlugin) @@ -419,6 +416,9 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") remoteTests % "test->test", jackson % "test->test") .settings(AutomaticModuleName.settings("akka.cluster.typed")) + .settings(Protobuf.settings) + // To be able to import ContainerFormats.proto + .settings(Protobuf.importPath := Some(baseDirectory.value / ".." / "akka-remote" / "src" / "main" / "protobuf")) .configs(MultiJvm) .enablePlugins(MultiNodeScalaTest)