From 093f0ef14b38eebaf1a9639167b08ca1bcce9523 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 12 Apr 2018 19:00:58 +0200 Subject: [PATCH] Cluster receptionist and new reincarnation of node, #23683 * Drop anonymous functions/classes when creating testkit system name. * Reproducer * Added custom serializer --- .../internal/protobuf/ClusterMessages.java | 623 ++++++++++++++++++ .../src/main/protobuf/ClusterMessages.proto | 13 + .../src/main/resources/reference.conf | 12 + .../internal/AkkaClusterTypedSerializer.scala | 61 ++ .../receptionist/ClusterReceptionist.scala | 138 ++-- .../AkkaClusterTypedSerializerSpec.scala | 41 ++ .../ClusterReceptionistSpec.scala | 281 ++++++-- .../testkit/typed/internal/TestKitUtils.scala | 5 +- build.sbt | 1 + 9 files changed, 1056 insertions(+), 119 deletions(-) create mode 100644 akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java create mode 100644 akka-cluster-typed/src/main/protobuf/ClusterMessages.proto create mode 100644 akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala create mode 100644 akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala diff --git a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java new file mode 100644 index 0000000000..6f60a81d11 --- /dev/null +++ b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java @@ -0,0 +1,623 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +// Generated by the protocol buffer compiler. DO NOT EDIT! +// source: ClusterMessages.proto + +package akka.cluster.typed.internal.protobuf; + +public final class ClusterMessages { + private ClusterMessages() {} + public static void registerAllExtensions( + akka.protobuf.ExtensionRegistry registry) { + } + public interface ReceptionistEntryOrBuilder + extends akka.protobuf.MessageOrBuilder { + + // required string actorRef = 1; + /** + * required string actorRef = 1; + */ + boolean hasActorRef(); + /** + * required string actorRef = 1; + */ + java.lang.String getActorRef(); + /** + * required string actorRef = 1; + */ + akka.protobuf.ByteString + getActorRefBytes(); + + // required uint64 systemUid = 2; + /** + * required uint64 systemUid = 2; + */ + boolean hasSystemUid(); + /** + * required uint64 systemUid = 2; + */ + long getSystemUid(); + } + /** + * Protobuf type {@code akka.cluster.typed.ReceptionistEntry} + */ + public static final class ReceptionistEntry extends + akka.protobuf.GeneratedMessage + implements ReceptionistEntryOrBuilder { + // Use ReceptionistEntry.newBuilder() to construct. + private ReceptionistEntry(akka.protobuf.GeneratedMessage.Builder builder) { + super(builder); + this.unknownFields = builder.getUnknownFields(); + } + private ReceptionistEntry(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); } + + private static final ReceptionistEntry defaultInstance; + public static ReceptionistEntry getDefaultInstance() { + return defaultInstance; + } + + public ReceptionistEntry getDefaultInstanceForType() { + return defaultInstance; + } + + private final akka.protobuf.UnknownFieldSet unknownFields; + @java.lang.Override + public final akka.protobuf.UnknownFieldSet + getUnknownFields() { + return this.unknownFields; + } + private ReceptionistEntry( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + initFields(); + int mutable_bitField0_ = 0; + akka.protobuf.UnknownFieldSet.Builder unknownFields = + akka.protobuf.UnknownFieldSet.newBuilder(); + try { + boolean done = false; + while (!done) { + int tag = input.readTag(); + switch (tag) { + case 0: + done = true; + break; + default: { + if (!parseUnknownField(input, unknownFields, + extensionRegistry, tag)) { + done = true; + } + break; + } + case 10: { + bitField0_ |= 0x00000001; + actorRef_ = input.readBytes(); + break; + } + case 16: { + bitField0_ |= 0x00000002; + systemUid_ = input.readUInt64(); + break; + } + } + } + } catch (akka.protobuf.InvalidProtocolBufferException e) { + throw e.setUnfinishedMessage(this); + } catch (java.io.IOException e) { + throw new akka.protobuf.InvalidProtocolBufferException( + e.getMessage()).setUnfinishedMessage(this); + } finally { + this.unknownFields = unknownFields.build(); + makeExtensionsImmutable(); + } + } + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.class, akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.Builder.class); + } + + public static akka.protobuf.Parser PARSER = + new akka.protobuf.AbstractParser() { + public ReceptionistEntry parsePartialFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return new ReceptionistEntry(input, extensionRegistry); + } + }; + + @java.lang.Override + public akka.protobuf.Parser getParserForType() { + return PARSER; + } + + private int bitField0_; + // required string actorRef = 1; + public static final int ACTORREF_FIELD_NUMBER = 1; + private java.lang.Object actorRef_; + /** + * required string actorRef = 1; + */ + public boolean hasActorRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string actorRef = 1; + */ + public java.lang.String getActorRef() { + java.lang.Object ref = actorRef_; + if (ref instanceof java.lang.String) { + return (java.lang.String) ref; + } else { + akka.protobuf.ByteString bs = + (akka.protobuf.ByteString) ref; + java.lang.String s = bs.toStringUtf8(); + if (bs.isValidUtf8()) { + actorRef_ = s; + } + return s; + } + } + /** + * required string actorRef = 1; + */ + public akka.protobuf.ByteString + getActorRefBytes() { + java.lang.Object ref = actorRef_; + if (ref instanceof java.lang.String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + actorRef_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + + // required uint64 systemUid = 2; + public static final int SYSTEMUID_FIELD_NUMBER = 2; + private long systemUid_; + /** + * required uint64 systemUid = 2; + */ + public boolean hasSystemUid() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint64 systemUid = 2; + */ + public long getSystemUid() { + return systemUid_; + } + + private void initFields() { + actorRef_ = ""; + systemUid_ = 0L; + } + private byte memoizedIsInitialized = -1; + public final boolean isInitialized() { + byte isInitialized = memoizedIsInitialized; + if (isInitialized != -1) return isInitialized == 1; + + if (!hasActorRef()) { + memoizedIsInitialized = 0; + return false; + } + if (!hasSystemUid()) { + memoizedIsInitialized = 0; + return false; + } + memoizedIsInitialized = 1; + return true; + } + + public void writeTo(akka.protobuf.CodedOutputStream output) + throws java.io.IOException { + getSerializedSize(); + if (((bitField0_ & 0x00000001) == 0x00000001)) { + output.writeBytes(1, getActorRefBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + output.writeUInt64(2, systemUid_); + } + getUnknownFields().writeTo(output); + } + + private int memoizedSerializedSize = -1; + public int getSerializedSize() { + int size = memoizedSerializedSize; + if (size != -1) return size; + + size = 0; + if (((bitField0_ & 0x00000001) == 0x00000001)) { + size += akka.protobuf.CodedOutputStream + .computeBytesSize(1, getActorRefBytes()); + } + if (((bitField0_ & 0x00000002) == 0x00000002)) { + size += akka.protobuf.CodedOutputStream + .computeUInt64Size(2, systemUid_); + } + size += getUnknownFields().getSerializedSize(); + memoizedSerializedSize = size; + return size; + } + + private static final long serialVersionUID = 0L; + @java.lang.Override + protected java.lang.Object writeReplace() + throws java.io.ObjectStreamException { + return super.writeReplace(); + } + + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + akka.protobuf.ByteString data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + akka.protobuf.ByteString data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(byte[] data) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + byte[] data, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws akka.protobuf.InvalidProtocolBufferException { + return PARSER.parseFrom(data, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseDelimitedFrom(java.io.InputStream input) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseDelimitedFrom( + java.io.InputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseDelimitedFrom(input, extensionRegistry); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + akka.protobuf.CodedInputStream input) + throws java.io.IOException { + return PARSER.parseFrom(input); + } + public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + return PARSER.parseFrom(input, extensionRegistry); + } + + public static Builder newBuilder() { return Builder.create(); } + public Builder newBuilderForType() { return newBuilder(); } + public static Builder newBuilder(akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry prototype) { + return newBuilder().mergeFrom(prototype); + } + public Builder toBuilder() { return newBuilder(this); } + + @java.lang.Override + protected Builder newBuilderForType( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + Builder builder = new Builder(parent); + return builder; + } + /** + * Protobuf type {@code akka.cluster.typed.ReceptionistEntry} + */ + public static final class Builder extends + akka.protobuf.GeneratedMessage.Builder + implements akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntryOrBuilder { + public static final akka.protobuf.Descriptors.Descriptor + getDescriptor() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor; + } + + protected akka.protobuf.GeneratedMessage.FieldAccessorTable + internalGetFieldAccessorTable() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable + .ensureFieldAccessorsInitialized( + akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.class, akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.Builder.class); + } + + // Construct using akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.newBuilder() + private Builder() { + maybeForceBuilderInitialization(); + } + + private Builder( + akka.protobuf.GeneratedMessage.BuilderParent parent) { + super(parent); + maybeForceBuilderInitialization(); + } + private void maybeForceBuilderInitialization() { + if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) { + } + } + private static Builder create() { + return new Builder(); + } + + public Builder clear() { + super.clear(); + actorRef_ = ""; + bitField0_ = (bitField0_ & ~0x00000001); + systemUid_ = 0L; + bitField0_ = (bitField0_ & ~0x00000002); + return this; + } + + public Builder clone() { + return create().mergeFrom(buildPartial()); + } + + public akka.protobuf.Descriptors.Descriptor + getDescriptorForType() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor; + } + + public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry getDefaultInstanceForType() { + return akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.getDefaultInstance(); + } + + public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry build() { + akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry result = buildPartial(); + if (!result.isInitialized()) { + throw newUninitializedMessageException(result); + } + return result; + } + + public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry buildPartial() { + akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry result = new akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry(this); + int from_bitField0_ = bitField0_; + int to_bitField0_ = 0; + if (((from_bitField0_ & 0x00000001) == 0x00000001)) { + to_bitField0_ |= 0x00000001; + } + result.actorRef_ = actorRef_; + if (((from_bitField0_ & 0x00000002) == 0x00000002)) { + to_bitField0_ |= 0x00000002; + } + result.systemUid_ = systemUid_; + result.bitField0_ = to_bitField0_; + onBuilt(); + return result; + } + + public Builder mergeFrom(akka.protobuf.Message other) { + if (other instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry) { + return mergeFrom((akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry)other); + } else { + super.mergeFrom(other); + return this; + } + } + + public Builder mergeFrom(akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry other) { + if (other == akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.getDefaultInstance()) return this; + if (other.hasActorRef()) { + bitField0_ |= 0x00000001; + actorRef_ = other.actorRef_; + onChanged(); + } + if (other.hasSystemUid()) { + setSystemUid(other.getSystemUid()); + } + this.mergeUnknownFields(other.getUnknownFields()); + return this; + } + + public final boolean isInitialized() { + if (!hasActorRef()) { + + return false; + } + if (!hasSystemUid()) { + + return false; + } + return true; + } + + public Builder mergeFrom( + akka.protobuf.CodedInputStream input, + akka.protobuf.ExtensionRegistryLite extensionRegistry) + throws java.io.IOException { + akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parsedMessage = null; + try { + parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry); + } catch (akka.protobuf.InvalidProtocolBufferException e) { + parsedMessage = (akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry) e.getUnfinishedMessage(); + throw e; + } finally { + if (parsedMessage != null) { + mergeFrom(parsedMessage); + } + } + return this; + } + private int bitField0_; + + // required string actorRef = 1; + private java.lang.Object actorRef_ = ""; + /** + * required string actorRef = 1; + */ + public boolean hasActorRef() { + return ((bitField0_ & 0x00000001) == 0x00000001); + } + /** + * required string actorRef = 1; + */ + public java.lang.String getActorRef() { + java.lang.Object ref = actorRef_; + if (!(ref instanceof java.lang.String)) { + java.lang.String s = ((akka.protobuf.ByteString) ref) + .toStringUtf8(); + actorRef_ = s; + return s; + } else { + return (java.lang.String) ref; + } + } + /** + * required string actorRef = 1; + */ + public akka.protobuf.ByteString + getActorRefBytes() { + java.lang.Object ref = actorRef_; + if (ref instanceof String) { + akka.protobuf.ByteString b = + akka.protobuf.ByteString.copyFromUtf8( + (java.lang.String) ref); + actorRef_ = b; + return b; + } else { + return (akka.protobuf.ByteString) ref; + } + } + /** + * required string actorRef = 1; + */ + public Builder setActorRef( + java.lang.String value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + actorRef_ = value; + onChanged(); + return this; + } + /** + * required string actorRef = 1; + */ + public Builder clearActorRef() { + bitField0_ = (bitField0_ & ~0x00000001); + actorRef_ = getDefaultInstance().getActorRef(); + onChanged(); + return this; + } + /** + * required string actorRef = 1; + */ + public Builder setActorRefBytes( + akka.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + bitField0_ |= 0x00000001; + actorRef_ = value; + onChanged(); + return this; + } + + // required uint64 systemUid = 2; + private long systemUid_ ; + /** + * required uint64 systemUid = 2; + */ + public boolean hasSystemUid() { + return ((bitField0_ & 0x00000002) == 0x00000002); + } + /** + * required uint64 systemUid = 2; + */ + public long getSystemUid() { + return systemUid_; + } + /** + * required uint64 systemUid = 2; + */ + public Builder setSystemUid(long value) { + bitField0_ |= 0x00000002; + systemUid_ = value; + onChanged(); + return this; + } + /** + * required uint64 systemUid = 2; + */ + public Builder clearSystemUid() { + bitField0_ = (bitField0_ & ~0x00000002); + systemUid_ = 0L; + onChanged(); + return this; + } + + // @@protoc_insertion_point(builder_scope:akka.cluster.typed.ReceptionistEntry) + } + + static { + defaultInstance = new ReceptionistEntry(true); + defaultInstance.initFields(); + } + + // @@protoc_insertion_point(class_scope:akka.cluster.typed.ReceptionistEntry) + } + + private static akka.protobuf.Descriptors.Descriptor + internal_static_akka_cluster_typed_ReceptionistEntry_descriptor; + private static + akka.protobuf.GeneratedMessage.FieldAccessorTable + internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable; + + public static akka.protobuf.Descriptors.FileDescriptor + getDescriptor() { + return descriptor; + } + private static akka.protobuf.Descriptors.FileDescriptor + descriptor; + static { + java.lang.String[] descriptorData = { + "\n\025ClusterMessages.proto\022\022akka.cluster.ty" + + "ped\"8\n\021ReceptionistEntry\022\020\n\010actorRef\030\001 \002" + + "(\t\022\021\n\tsystemUid\030\002 \002(\004B(\n$akka.cluster.ty" + + "ped.internal.protobufH\001" + }; + akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = + new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { + public akka.protobuf.ExtensionRegistry assignDescriptors( + akka.protobuf.Descriptors.FileDescriptor root) { + descriptor = root; + internal_static_akka_cluster_typed_ReceptionistEntry_descriptor = + getDescriptor().getMessageTypes().get(0); + internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable = new + akka.protobuf.GeneratedMessage.FieldAccessorTable( + internal_static_akka_cluster_typed_ReceptionistEntry_descriptor, + new java.lang.String[] { "ActorRef", "SystemUid", }); + return null; + } + }; + akka.protobuf.Descriptors.FileDescriptor + .internalBuildGeneratedFileFrom(descriptorData, + new akka.protobuf.Descriptors.FileDescriptor[] { + }, assigner); + } + + // @@protoc_insertion_point(outer_class_scope) +} diff --git a/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto new file mode 100644 index 0000000000..14b979ac80 --- /dev/null +++ b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto @@ -0,0 +1,13 @@ +/** + * Copyright (C) 2014-2018 Lightbend Inc. + */ +package akka.cluster.typed; + +option java_package = "akka.cluster.typed.internal.protobuf"; +option optimize_for = SPEED; + + +message ReceptionistEntry { + required string actorRef = 1; + required uint64 systemUid = 2; +} diff --git a/akka-cluster-typed/src/main/resources/reference.conf b/akka-cluster-typed/src/main/resources/reference.conf index 6bd3a35b8f..6fa18b7439 100644 --- a/akka-cluster-typed/src/main/resources/reference.conf +++ b/akka-cluster-typed/src/main/resources/reference.conf @@ -14,3 +14,15 @@ akka.cluster.typed.receptionist { # in case of abrupt termination. pruning-interval = 3 s } + +akka.actor { + serialization-identifiers { + "akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28 + } + serializers { + typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer" + } + serialization-bindings { + "akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster + } +} diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala new file mode 100644 index 0000000000..ef369a3478 --- /dev/null +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala @@ -0,0 +1,61 @@ +/** + * Copyright (C) 2009-${YEAR} Lightbend Inc. + */ + +package akka.cluster.typed.internal + +import java.io.NotSerializableException + +import akka.actor.ExtendedActorSystem +import akka.actor.typed.ActorRefResolver +import akka.annotation.InternalApi +import akka.serialization.{ BaseSerializer, SerializerWithStringManifest } +import akka.actor.typed.scaladsl.adapter._ +import akka.cluster.typed.internal.protobuf.ClusterMessages +import akka.cluster.typed.internal.receptionist.ClusterReceptionist.Entry + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class AkkaClusterTypedSerializer(override val system: ExtendedActorSystem) + extends SerializerWithStringManifest with BaseSerializer { + + // Serializers are initialized early on. `toTyped` might then try to initialize the untyped ActorSystemAdapter extension. + private lazy val resolver = ActorRefResolver(system.toTyped) + private val ReceptionistEntryManifest = "a" + + override def manifest(o: AnyRef): String = o match { + case _: Entry ⇒ ReceptionistEntryManifest + case _ ⇒ + throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]") + } + + override def toBinary(o: AnyRef): Array[Byte] = o match { + case e: Entry ⇒ receptionistEntryToBinary(e) + case _ ⇒ + throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]") + } + + override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match { + case ReceptionistEntryManifest ⇒ receptionistEntryFromBinary(bytes) + case _ ⇒ + throw new NotSerializableException( + s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]") + } + + private def receptionistEntryToBinary(e: Entry): Array[Byte] = + ClusterMessages.ReceptionistEntry.newBuilder() + .setActorRef(resolver.toSerializationFormat(e.ref)) + .setSystemUid(e.systemUid) + .build() + .toByteArray + + private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = { + val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes) + Entry( + resolver.resolveActorRef(re.getActorRef), + re.getSystemUid + ) + } +} diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index 34dd61a138..f7d4adef85 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -7,17 +7,17 @@ package akka.cluster.typed.internal.receptionist import akka.actor.typed.internal.receptionist.{ AbstractServiceKey, ReceptionistBehaviorProvider, ReceptionistMessages } import akka.actor.typed.receptionist.Receptionist.Command import akka.actor.typed.receptionist.ServiceKey +import akka.actor.typed.scaladsl.adapter._ import akka.actor.typed.scaladsl.{ ActorContext, Behaviors } import akka.actor.typed.{ ActorRef, Behavior, Terminated } -import akka.actor.{ Address, ExtendedActorSystem } import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator } -import akka.cluster.{ Cluster, ClusterEvent } +import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } +import akka.remote.AddressUidExtension import akka.util.TypedMultiMap import scala.language.existentials -import akka.actor.typed.scaladsl.adapter._ /** INTERNAL API */ @InternalApi @@ -26,43 +26,54 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]] type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV] - private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], ActorRef[_]]("ReceptionistKey") - private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], ActorRef[_]] + private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], Entry]("ReceptionistKey") + private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry] - case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends AnyVal { + // values contain system uid to make it possible to discern actors at the same + // path in different incarnations of a cluster node + final case class Entry(ref: ActorRef[_], systemUid: Long) { + def uniqueAddress(selfUniqueAddress: UniqueAddress): UniqueAddress = + if (ref.path.address.hasLocalScope) selfUniqueAddress + else UniqueAddress(ref.path.address, systemUid) + override def toString = ref.path.toString + "#" + ref.path.uid + } + + final case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal { // let's hide all the ugly casts we can in here - def getOrElse[T](key: AbstractServiceKey, default: ⇒ Set[ActorRef[_]]): Set[ActorRef[key.Protocol]] = - map.getOrElse(key.asServiceKey, default.asInstanceOf[Set[ActorRef[_]]]).asInstanceOf[Set[ActorRef[key.Protocol]]] + def getActorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = + getEntriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]]) - def getOrEmpty[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = getOrElse(key, Set.empty) + def getEntriesFor(key: AbstractServiceKey): Set[Entry] = + map.getOrElse(key.asServiceKey, Set.empty[Entry]) - def addBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry = + def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = ServiceRegistry(map.addBinding(key, value)) - def removeBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry = + def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = ServiceRegistry(map.removeBinding(key, value)) - def removeAll(removals: Map[AbstractServiceKey, Set[ActorRef[_]]])(implicit cluster: Cluster): ServiceRegistry = { + def removeAll(removals: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = { removals.foldLeft(this) { - case (acc, (key, actors)) ⇒ - actors.foldLeft(acc) { - case (innerAcc, actor) ⇒ - innerAcc.removeBinding[key.Protocol](key.asServiceKey, actor.asInstanceOf[ActorRef[key.Protocol]]) + case (acc, (key, entries)) ⇒ + entries.foldLeft(acc) { + case (innerAcc, entry) ⇒ + innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry) } } } - def toORMultiMap: ORMultiMap[ServiceKey[_], ActorRef[_]] = map + def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = map + } object ServiceRegistry { - val empty = ServiceRegistry(EmptyORMultiMap) + final val Empty = ServiceRegistry(EmptyORMultiMap) def collectChangedKeys(previousState: ServiceRegistry, newState: ServiceRegistry): Set[AbstractServiceKey] = { val allKeys = previousState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key) ⇒ - val oldValues = previousState.getOrEmpty(key) - val newValues = newState.getOrEmpty(key) + val oldValues = previousState.getEntriesFor(key) + val newValues = newState.getEntriesFor(key) if (oldValues != newValues) acc + key else acc } @@ -72,16 +83,18 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { sealed trait InternalCommand final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand - final case class NodeRemoved(addresses: Address) extends InternalCommand - final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends InternalCommand + final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand + final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand case object RemoveTick extends InternalCommand // captures setup/dependencies so we can avoid doing it over and over again - private class Setup(ctx: ActorContext[Any]) { + class Setup(ctx: ActorContext[Any]) { val untypedSystem = ctx.system.toUntyped val settings = ClusterReceptionistSettings(ctx.system) val replicator = DistributedData(untypedSystem).replicator + val selfSystemUid = AddressUidExtension(untypedSystem).longAddressUid implicit val cluster = Cluster(untypedSystem) + def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress } override def behavior: Behavior[Command] = Behaviors.setup[Any] { ctx ⇒ @@ -97,7 +110,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { // remove entries when members are removed val clusterEventMessageAdapter: ActorRef[MemberRemoved] = - ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.address) } + ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.uniqueAddress) } setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, @@ -107,7 +120,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { behavior( setup, - ServiceRegistry.empty, + ServiceRegistry.Empty, TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] ) }.narrow[Command] @@ -145,36 +158,31 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { }) def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = { - val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) subscriptions.get(key).foreach(_ ! msg) } - def nodesRemoved(addresses: Set[Address]): Behavior[Any] = { + def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Any] = { // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) { - import akka.actor.typed.scaladsl.adapter._ - val localAddress = ctx.system.toUntyped.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress - - def isOnRemovedNode(ref: ActorRef[_]): Boolean = { - if (ref.path.address.hasLocalScope) addresses(localAddress) - else addresses(ref.path.address) - } - + def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress)) val removals = { - state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]]) { - case (acc, (key, values)) ⇒ - val removedActors = values.filter(isOnRemovedNode) - if (removedActors.isEmpty) acc // no change - else acc + (key -> removedActors) + state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) { + case (acc, (key, entries)) ⇒ + val removedEntries = entries.filter(isOnRemovedNode) + if (removedEntries.isEmpty) acc // no change + else acc + (key -> removedEntries) } } if (removals.nonEmpty) { if (ctx.log.isDebugEnabled) ctx.log.debug( - "Node(s) [{}] removed, updating registry [{}]", + "Node(s) [{}] removed, updating registry removing: [{}]", addresses.mkString(","), - removals.map { case (key, actors) ⇒ key.asServiceKey.id -> actors.mkString("[", ", ", "]") }.mkString(",")) + removals.map { + case (key, entries) ⇒ key.asServiceKey.id -> entries.mkString("[", ", ", "]") + }.mkString(",")) replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ ServiceRegistry(registry).removeAll(removals).toORMultiMap @@ -187,26 +195,27 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { def onCommand(cmd: Command): Behavior[Any] = cmd match { case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) ⇒ - ctx.log.debug("Actor was registered: [{}] [{}]", key, serviceInstance.path) + val entry = Entry(serviceInstance, setup.selfSystemUid) + ctx.log.debug("Actor was registered: [{}] [{}]", key, entry) watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance)) maybeReplyTo match { case Some(replyTo) ⇒ replyTo ! ReceptionistMessages.Registered(key, serviceInstance) case None ⇒ } replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ - ServiceRegistry(registry).addBinding(key, serviceInstance).toORMultiMap + ServiceRegistry(registry).addBinding(key, entry).toORMultiMap } Behaviors.same case ReceptionistMessages.Find(key, replyTo) ⇒ - replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) Behaviors.same case ReceptionistMessages.Subscribe(key, subscriber) ⇒ watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber)) // immediately reply with initial listings to the new subscriber - subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key)) + subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key)) next(newSubscriptions = subscriptions.inserted(key)(subscriber)) } @@ -217,9 +226,10 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { next(newSubscriptions = subscriptions.removed(key)(subscriber)) case RegisteredActorTerminated(key, serviceInstance) ⇒ - ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, serviceInstance.path) + val entry = Entry(serviceInstance, setup.selfSystemUid) + ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry) replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ - ServiceRegistry(registry).removeBinding(key, serviceInstance).toORMultiMap + ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap } Behaviors.same @@ -230,9 +240,10 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { if (changedKeys.nonEmpty) { if (ctx.log.isDebugEnabled) { ctx.log.debug( - "Registration changed: [{}]", + "Change from replicator: [{}], changes: [{}]", + newState.map.entries, changedKeys.map(key ⇒ - key.asServiceKey.id -> newState.getOrEmpty(key).map(_.path).mkString("[", ", ", "]") + key.asServiceKey.id -> newState.getEntriesFor(key).mkString("[", ", ", "]") ).mkString(", ")) } changedKeys.foreach(notifySubscribersFor(_, newState)) @@ -241,24 +252,33 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { Behaviors.same } - case NodeRemoved(address) ⇒ + case NodeRemoved(uniqueAddress) ⇒ // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress)) { - nodesRemoved(Set(address)) + ctx.log.debug(s"Leader node observed removed address [{}]", uniqueAddress) + nodesRemoved(Set(uniqueAddress)) } else Behaviors.same case RemoveTick ⇒ // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress)) { - val allAddressesInState: Set[Address] = state.map.entries.flatMap { - case (_, values) ⇒ + val allAddressesInState: Set[UniqueAddress] = state.map.entries.flatMap { + case (_, entries) ⇒ // don't care about local (empty host:port addresses) - values.collect { case ref if ref.path.address.hasGlobalScope ⇒ ref.path.address } + entries.collect { + case entry if entry.ref.path.address.hasGlobalScope ⇒ + entry.uniqueAddress(setup.selfUniqueAddress) + } }(collection.breakOut) - val clusterAddresses = cluster.state.members.map(_.address) - val diff = allAddressesInState diff clusterAddresses - if (diff.isEmpty) Behavior.same - else nodesRemoved(diff) + val clusterAddresses = cluster.state.members.map(_.uniqueAddress) + val notInCluster = allAddressesInState diff clusterAddresses + + if (notInCluster.isEmpty) Behavior.same + else { + if (ctx.log.isDebugEnabled) + ctx.log.debug("Leader node cleanup tick, removed nodes: [{}]", notInCluster.mkString(",")) + nodesRemoved(notInCluster) + } } else Behavior.same } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala new file mode 100644 index 0000000000..74c0b0b5de --- /dev/null +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala @@ -0,0 +1,41 @@ +/** + * Copyright (C) 2009-${YEAR} Lightbend Inc. + */ + +package akka.cluster.typed.internal + +import akka.actor.ExtendedActorSystem +import akka.actor.typed.scaladsl.adapter._ +import akka.actor.typed.{ Behavior, TypedAkkaSpecWithShutdown } +import akka.cluster.typed.internal.receptionist.ClusterReceptionist +import akka.serialization.SerializationExtension +import akka.testkit.typed.scaladsl.ActorTestKit + +class AkkaClusterTypedSerializerSpec extends ActorTestKit with TypedAkkaSpecWithShutdown { + + val ref = spawn(Behavior.empty[String]) + val untypedSystem = system.toUntyped + val serializer = new AkkaClusterTypedSerializer(untypedSystem.asInstanceOf[ExtendedActorSystem]) + + "AkkaClusterTypedSerializer" must { + + Seq( + "ReceptionistEntry" → ClusterReceptionist.Entry(ref, 666L) + ).foreach { + case (scenario, item) ⇒ + s"resolve serializer for $scenario" in { + val serializer = SerializationExtension(untypedSystem) + serializer.serializerFor(item.getClass).getClass should be(classOf[AkkaClusterTypedSerializer]) + } + + s"serialize and de-serialize $scenario" in { + verifySerialization(item) + } + } + } + + def verifySerialization(msg: AnyRef): Unit = { + serializer.fromBinary(serializer.toBinary(msg), serializer.manifest(msg)) should be(msg) + } + +} diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 812466a30f..781af52fe5 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -6,19 +6,21 @@ package akka.cluster.typed.internal.receptionist import java.nio.charset.StandardCharsets -import akka.actor.ExtendedActorSystem -import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown } +import akka.actor.{ ExtendedActorSystem, RootActorPath } import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.cluster.typed.Cluster +import akka.actor.typed.{ ActorRef, ActorRefResolver } +import akka.cluster.MemberStatus +import akka.cluster.typed.{ Cluster, Join } import akka.serialization.SerializerWithStringManifest -import akka.testkit.typed.TestKitSettings +import akka.testkit.typed.FishingOutcome import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe } -import com.typesafe.config.ConfigFactory -import scala.concurrent.duration._ +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.{ Matchers, WordSpec } -import akka.cluster.typed.Join +import scala.concurrent.Await +import scala.concurrent.duration._ object ClusterReceptionistSpec { val config = ConfigFactory.parseString( @@ -40,7 +42,6 @@ object ClusterReceptionistSpec { } } akka.remote.artery.enabled = true - akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster { @@ -89,76 +90,238 @@ object ClusterReceptionistSpec { val PingKey = ServiceKey[PingProtocol]("pingy") } -class ClusterReceptionistSpec extends ActorTestKit - with TypedAkkaSpecWithShutdown { - - override def config = ClusterReceptionistSpec.config +class ClusterReceptionistSpec extends WordSpec with Matchers { import ClusterReceptionistSpec._ - - implicit val testSettings = TestKitSettings(system) - val clusterNode1 = Cluster(system) - - val testKit2 = new ActorTestKit { - override def name = ClusterReceptionistSpec.this.system.name - override def config = ClusterReceptionistSpec.this.system.settings.config - } - val system2 = testKit2.system - val clusterNode2 = Cluster(system2) - - clusterNode1.manager ! Join(clusterNode1.selfMember.address) - clusterNode2.manager ! Join(clusterNode1.selfMember.address) - import Receptionist._ "The cluster receptionist" must { - "must eventually replicate registrations to the other side" in { - val regProbe = TestProbe[Any]()(system) - val regProbe2 = TestProbe[Any]()(system2) + "eventually replicate registrations to the other side" in { + val testKit1 = new ActorTestKit { + override def name = super.name + "-test-1" + override def config = ClusterReceptionistSpec.config + } + val system1 = testKit1.system + val testKit2 = new ActorTestKit { + override def name = system1.name + override def config = testKit1.system.settings.config + } + val system2 = testKit2.system + try { + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) - system2.receptionist ! Subscribe(PingKey, regProbe2.ref) - regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) - val service = spawn(pingPongBehavior) - system.receptionist ! Register(PingKey, service, regProbe.ref) - regProbe.expectMessage(Registered(PingKey, service)) + system2.receptionist ! Subscribe(PingKey, regProbe2.ref) + regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - val PingKey.Listing(remoteServiceRefs) = regProbe2.expectMessageType[Listing] - val theRef = remoteServiceRefs.head - theRef ! Ping(regProbe2.ref) - regProbe2.expectMessage(Pong) + val service = testKit1.spawn(pingPongBehavior) + testKit1.system.receptionist ! Register(PingKey, service, regProbe1.ref) + regProbe1.expectMessage(Registered(PingKey, service)) - service ! Perish - regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + val PingKey.Listing(remoteServiceRefs) = regProbe2.expectMessageType[Listing] + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe2.ref) + regProbe2.expectMessage(Pong) + + service ! Perish + regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + } finally { + testKit1.shutdownTestKit() + testKit2.shutdownTestKit() + } } - "must remove registrations when node dies" in { + "remove registrations when node dies" in { + val testKit1 = new ActorTestKit { + override def name = super.name + "-test-2" + override def config = ClusterReceptionistSpec.config + } + val system1 = testKit1.system + val testKit2 = new ActorTestKit { + override def name = system1.name + override def config = testKit1.system.settings.config + } + val system2 = testKit2.system + try { - val regProbe = TestProbe[Any]()(system) - val regProbe2 = TestProbe[Any]()(system2) + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) - system.receptionist ! Subscribe(PingKey, regProbe.ref) - regProbe.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) - val service2 = testKit2.spawn(pingPongBehavior) - system2.receptionist ! Register(PingKey, service2, regProbe2.ref) - regProbe2.expectMessage(Registered(PingKey, service2)) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) - val remoteServiceRefs = regProbe.expectMessageType[Listing].serviceInstances(PingKey) - val theRef = remoteServiceRefs.head - theRef ! Ping(regProbe.ref) - regProbe.expectMessage(Pong) + system1.receptionist ! Subscribe(PingKey, regProbe1.ref) + regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) - // abrupt termination - system2.terminate() - regProbe.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + val service2 = testKit2.spawn(pingPongBehavior) + system2.receptionist ! Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe1.ref) + regProbe1.expectMessage(Pong) + + // abrupt termination + system2.terminate() + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + } finally { + testKit1.shutdownTestKit() + if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + } } - } + "work with services registered before node joins cluster" in { + val testKit1 = new ActorTestKit { + override def name = super.name + "-test-2" + override def config = ClusterReceptionistSpec.config + } + val system1 = testKit1.system + val testKit2 = new ActorTestKit { + override def name = system1.name + override def config = testKit1.system.settings.config + } + val system2 = testKit2.system + try { + + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + + system1.receptionist ! Subscribe(PingKey, regProbe1.ref) + regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service2 = testKit2.spawn(pingPongBehavior) + system2.receptionist ! Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + // then we join the cluster + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up)) + + // and the subscriber on node1 should see the service + val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe1.ref) + regProbe1.expectMessage(Pong) + + // abrupt termination + system2.terminate() + regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + } finally { + testKit1.shutdownTestKit() + if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + } + } + + "handle a new incarnation of the same node well" in { + val testKit1 = new ActorTestKit { + override def name = super.name + "-test-3" + override def config = ClusterReceptionistSpec.config + } + val system1 = testKit1.system + val testKit2 = new ActorTestKit { + override def name = system1.name + override def config = testKit1.system.settings.config + } + val system2 = testKit2.system + try { + + val clusterNode1 = Cluster(system1) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + val clusterNode2 = Cluster(system2) + clusterNode2.manager ! Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[Any]()(system1) + val regProbe2 = TestProbe[Any]()(system2) + + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2) + + system1.receptionist ! Subscribe(PingKey, regProbe1.ref) + regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) + + val service2 = testKit2.spawn(pingPongBehavior, "instance") + system2.receptionist ! Register(PingKey, service2, regProbe2.ref) + regProbe2.expectMessage(Registered(PingKey, service2)) + + // make sure we saw the first incarnation on node1 + val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey) + val theRef = remoteServiceRefs.head + theRef ! Ping(regProbe1.ref) + regProbe1.expectMessage(Pong) + + // FIXME do we need to blackhole the connection to system2 before terminating + // right now it doesn't work anyways though ;D + + // abrupt termination but then a node with the same host:port comes online quickly + system1.log.debug("Terminating system2, uid: [{}]", clusterNode2.selfMember.uniqueAddress.longUid) + Await.ready(system2.terminate(), 10.seconds) + + val testKit3 = new ActorTestKit { + override protected def name: String = system1.name + override def config: Config = testKit1.config + } + try { + val system3 = testKit3.system + system1.log.debug("Starting system3 at same hostname port as system2, uid: [{}]", Cluster(system3).selfMember.uniqueAddress.longUid) + val clusterNode3 = Cluster(system3) + clusterNode3.manager ! Join(clusterNode1.selfMember.address) + val regProbe3 = TestProbe[Any]()(system3) + + // and registers the same service key + val service3 = testKit3.spawn(pingPongBehavior, "instance") + system3.log.debug("Spawning/registering ping service in new incarnation {}#{}", service3.path, service3.path.uid) + system3.receptionist ! Register(PingKey, service3, regProbe3.ref) + regProbe3.expectMessage(Registered(PingKey, service3)) + system3.log.debug("Registered actor [{}#{}] for system3", service3.path, service3.path.uid) + + // make sure it joined fine and node1 has upped it + regProbe1.awaitAssert { + clusterNode1.state.members.exists(m ⇒ + m.uniqueAddress == clusterNode3.selfMember.uniqueAddress && + m.status == MemberStatus.Up && + !clusterNode1.state.unreachable(m) + ) + } + + // we should get either empty message and then updated with the new incarnation actor + // or just updated with the new service directly + val msg = regProbe1.fishForMessage(20.seconds) { + case PingKey.Listing(entries) if entries.size == 1 ⇒ FishingOutcome.Complete + case _: Listing ⇒ FishingOutcome.ContinueAndIgnore + } + val PingKey.Listing(entries) = msg.last + entries should have size 1 + val ref = entries.head + val service3RemotePath = RootActorPath(clusterNode3.selfMember.address) / "user" / "instance" + ref.path should ===(service3RemotePath) + ref ! Ping(regProbe1.ref) + regProbe1.expectMessage(Pong) + + } finally { + testKit3.shutdownTestKit() + } + } finally { + testKit1.shutdownTestKit() + if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + } + } - override def afterAll(): Unit = { - super.afterAll() - ActorTestKit.shutdown(system2, 10.seconds) } } diff --git a/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestKitUtils.scala b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestKitUtils.scala index 5189d16e90..906af6b3ca 100644 --- a/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestKitUtils.scala +++ b/akka-testkit-typed/src/main/scala/akka/testkit/typed/internal/TestKitUtils.scala @@ -54,7 +54,10 @@ private[akka] object TestKitUtils { } // sanitize for actor system name - filteredStack.next().replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_") + filteredStack.next() + .replaceFirst("""^.*\.""", "") // drop package name + .replaceAll("""\$\$?\w+""", "") // drop scala anonymous functions/classes + .replaceAll("[^a-zA-Z_0-9]", "_") } def shutdown( diff --git a/build.sbt b/build.sbt index d571a3656f..ef2baf4909 100644 --- a/build.sbt +++ b/build.sbt @@ -408,6 +408,7 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed") distributedData, persistence % "test->test", persistenceTyped % "test->test", + protobuf, typedTestkit % "test->test", actorTypedTests % "test->test" )