Cluster receptionist and new reincarnation of node, #23683

* Drop anonymous functions/classes when creating testkit system name.
* Reproducer
* Added custom serializer
This commit is contained in:
Johan Andrén 2018-04-12 19:00:58 +02:00 committed by Patrik Nordwall
parent 3ebb9fa9c1
commit 093f0ef14b
9 changed files with 1056 additions and 119 deletions

View file

@ -0,0 +1,623 @@
/*
* Copyright (C) 2018 Lightbend Inc. <https://www.lightbend.com>
*/
// Generated by the protocol buffer compiler. DO NOT EDIT!
// source: ClusterMessages.proto
package akka.cluster.typed.internal.protobuf;
public final class ClusterMessages {
private ClusterMessages() {}
public static void registerAllExtensions(
akka.protobuf.ExtensionRegistry registry) {
}
public interface ReceptionistEntryOrBuilder
extends akka.protobuf.MessageOrBuilder {
// required string actorRef = 1;
/**
* <code>required string actorRef = 1;</code>
*/
boolean hasActorRef();
/**
* <code>required string actorRef = 1;</code>
*/
java.lang.String getActorRef();
/**
* <code>required string actorRef = 1;</code>
*/
akka.protobuf.ByteString
getActorRefBytes();
// required uint64 systemUid = 2;
/**
* <code>required uint64 systemUid = 2;</code>
*/
boolean hasSystemUid();
/**
* <code>required uint64 systemUid = 2;</code>
*/
long getSystemUid();
}
/**
* Protobuf type {@code akka.cluster.typed.ReceptionistEntry}
*/
public static final class ReceptionistEntry extends
akka.protobuf.GeneratedMessage
implements ReceptionistEntryOrBuilder {
// Use ReceptionistEntry.newBuilder() to construct.
private ReceptionistEntry(akka.protobuf.GeneratedMessage.Builder<?> builder) {
super(builder);
this.unknownFields = builder.getUnknownFields();
}
private ReceptionistEntry(boolean noInit) { this.unknownFields = akka.protobuf.UnknownFieldSet.getDefaultInstance(); }
private static final ReceptionistEntry defaultInstance;
public static ReceptionistEntry getDefaultInstance() {
return defaultInstance;
}
public ReceptionistEntry getDefaultInstanceForType() {
return defaultInstance;
}
private final akka.protobuf.UnknownFieldSet unknownFields;
@java.lang.Override
public final akka.protobuf.UnknownFieldSet
getUnknownFields() {
return this.unknownFields;
}
private ReceptionistEntry(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
initFields();
int mutable_bitField0_ = 0;
akka.protobuf.UnknownFieldSet.Builder unknownFields =
akka.protobuf.UnknownFieldSet.newBuilder();
try {
boolean done = false;
while (!done) {
int tag = input.readTag();
switch (tag) {
case 0:
done = true;
break;
default: {
if (!parseUnknownField(input, unknownFields,
extensionRegistry, tag)) {
done = true;
}
break;
}
case 10: {
bitField0_ |= 0x00000001;
actorRef_ = input.readBytes();
break;
}
case 16: {
bitField0_ |= 0x00000002;
systemUid_ = input.readUInt64();
break;
}
}
}
} catch (akka.protobuf.InvalidProtocolBufferException e) {
throw e.setUnfinishedMessage(this);
} catch (java.io.IOException e) {
throw new akka.protobuf.InvalidProtocolBufferException(
e.getMessage()).setUnfinishedMessage(this);
} finally {
this.unknownFields = unknownFields.build();
makeExtensionsImmutable();
}
}
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.class, akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.Builder.class);
}
public static akka.protobuf.Parser<ReceptionistEntry> PARSER =
new akka.protobuf.AbstractParser<ReceptionistEntry>() {
public ReceptionistEntry parsePartialFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return new ReceptionistEntry(input, extensionRegistry);
}
};
@java.lang.Override
public akka.protobuf.Parser<ReceptionistEntry> getParserForType() {
return PARSER;
}
private int bitField0_;
// required string actorRef = 1;
public static final int ACTORREF_FIELD_NUMBER = 1;
private java.lang.Object actorRef_;
/**
* <code>required string actorRef = 1;</code>
*/
public boolean hasActorRef() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string actorRef = 1;</code>
*/
public java.lang.String getActorRef() {
java.lang.Object ref = actorRef_;
if (ref instanceof java.lang.String) {
return (java.lang.String) ref;
} else {
akka.protobuf.ByteString bs =
(akka.protobuf.ByteString) ref;
java.lang.String s = bs.toStringUtf8();
if (bs.isValidUtf8()) {
actorRef_ = s;
}
return s;
}
}
/**
* <code>required string actorRef = 1;</code>
*/
public akka.protobuf.ByteString
getActorRefBytes() {
java.lang.Object ref = actorRef_;
if (ref instanceof java.lang.String) {
akka.protobuf.ByteString b =
akka.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
actorRef_ = b;
return b;
} else {
return (akka.protobuf.ByteString) ref;
}
}
// required uint64 systemUid = 2;
public static final int SYSTEMUID_FIELD_NUMBER = 2;
private long systemUid_;
/**
* <code>required uint64 systemUid = 2;</code>
*/
public boolean hasSystemUid() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required uint64 systemUid = 2;</code>
*/
public long getSystemUid() {
return systemUid_;
}
private void initFields() {
actorRef_ = "";
systemUid_ = 0L;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
byte isInitialized = memoizedIsInitialized;
if (isInitialized != -1) return isInitialized == 1;
if (!hasActorRef()) {
memoizedIsInitialized = 0;
return false;
}
if (!hasSystemUid()) {
memoizedIsInitialized = 0;
return false;
}
memoizedIsInitialized = 1;
return true;
}
public void writeTo(akka.protobuf.CodedOutputStream output)
throws java.io.IOException {
getSerializedSize();
if (((bitField0_ & 0x00000001) == 0x00000001)) {
output.writeBytes(1, getActorRefBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
output.writeUInt64(2, systemUid_);
}
getUnknownFields().writeTo(output);
}
private int memoizedSerializedSize = -1;
public int getSerializedSize() {
int size = memoizedSerializedSize;
if (size != -1) return size;
size = 0;
if (((bitField0_ & 0x00000001) == 0x00000001)) {
size += akka.protobuf.CodedOutputStream
.computeBytesSize(1, getActorRefBytes());
}
if (((bitField0_ & 0x00000002) == 0x00000002)) {
size += akka.protobuf.CodedOutputStream
.computeUInt64Size(2, systemUid_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
}
private static final long serialVersionUID = 0L;
@java.lang.Override
protected java.lang.Object writeReplace()
throws java.io.ObjectStreamException {
return super.writeReplace();
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
akka.protobuf.ByteString data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
akka.protobuf.ByteString data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(byte[] data)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
byte[] data,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws akka.protobuf.InvalidProtocolBufferException {
return PARSER.parseFrom(data, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseDelimitedFrom(java.io.InputStream input)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseDelimitedFrom(
java.io.InputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseDelimitedFrom(input, extensionRegistry);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
akka.protobuf.CodedInputStream input)
throws java.io.IOException {
return PARSER.parseFrom(input);
}
public static akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parseFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
return PARSER.parseFrom(input, extensionRegistry);
}
public static Builder newBuilder() { return Builder.create(); }
public Builder newBuilderForType() { return newBuilder(); }
public static Builder newBuilder(akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry prototype) {
return newBuilder().mergeFrom(prototype);
}
public Builder toBuilder() { return newBuilder(this); }
@java.lang.Override
protected Builder newBuilderForType(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
Builder builder = new Builder(parent);
return builder;
}
/**
* Protobuf type {@code akka.cluster.typed.ReceptionistEntry}
*/
public static final class Builder extends
akka.protobuf.GeneratedMessage.Builder<Builder>
implements akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntryOrBuilder {
public static final akka.protobuf.Descriptors.Descriptor
getDescriptor() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor;
}
protected akka.protobuf.GeneratedMessage.FieldAccessorTable
internalGetFieldAccessorTable() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable
.ensureFieldAccessorsInitialized(
akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.class, akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.Builder.class);
}
// Construct using akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.newBuilder()
private Builder() {
maybeForceBuilderInitialization();
}
private Builder(
akka.protobuf.GeneratedMessage.BuilderParent parent) {
super(parent);
maybeForceBuilderInitialization();
}
private void maybeForceBuilderInitialization() {
if (akka.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
}
}
private static Builder create() {
return new Builder();
}
public Builder clear() {
super.clear();
actorRef_ = "";
bitField0_ = (bitField0_ & ~0x00000001);
systemUid_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
return this;
}
public Builder clone() {
return create().mergeFrom(buildPartial());
}
public akka.protobuf.Descriptors.Descriptor
getDescriptorForType() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.internal_static_akka_cluster_typed_ReceptionistEntry_descriptor;
}
public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry getDefaultInstanceForType() {
return akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.getDefaultInstance();
}
public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry build() {
akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry result = buildPartial();
if (!result.isInitialized()) {
throw newUninitializedMessageException(result);
}
return result;
}
public akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry buildPartial() {
akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry result = new akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry(this);
int from_bitField0_ = bitField0_;
int to_bitField0_ = 0;
if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
to_bitField0_ |= 0x00000001;
}
result.actorRef_ = actorRef_;
if (((from_bitField0_ & 0x00000002) == 0x00000002)) {
to_bitField0_ |= 0x00000002;
}
result.systemUid_ = systemUid_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
}
public Builder mergeFrom(akka.protobuf.Message other) {
if (other instanceof akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry) {
return mergeFrom((akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry)other);
} else {
super.mergeFrom(other);
return this;
}
}
public Builder mergeFrom(akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry other) {
if (other == akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry.getDefaultInstance()) return this;
if (other.hasActorRef()) {
bitField0_ |= 0x00000001;
actorRef_ = other.actorRef_;
onChanged();
}
if (other.hasSystemUid()) {
setSystemUid(other.getSystemUid());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
public final boolean isInitialized() {
if (!hasActorRef()) {
return false;
}
if (!hasSystemUid()) {
return false;
}
return true;
}
public Builder mergeFrom(
akka.protobuf.CodedInputStream input,
akka.protobuf.ExtensionRegistryLite extensionRegistry)
throws java.io.IOException {
akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry parsedMessage = null;
try {
parsedMessage = PARSER.parsePartialFrom(input, extensionRegistry);
} catch (akka.protobuf.InvalidProtocolBufferException e) {
parsedMessage = (akka.cluster.typed.internal.protobuf.ClusterMessages.ReceptionistEntry) e.getUnfinishedMessage();
throw e;
} finally {
if (parsedMessage != null) {
mergeFrom(parsedMessage);
}
}
return this;
}
private int bitField0_;
// required string actorRef = 1;
private java.lang.Object actorRef_ = "";
/**
* <code>required string actorRef = 1;</code>
*/
public boolean hasActorRef() {
return ((bitField0_ & 0x00000001) == 0x00000001);
}
/**
* <code>required string actorRef = 1;</code>
*/
public java.lang.String getActorRef() {
java.lang.Object ref = actorRef_;
if (!(ref instanceof java.lang.String)) {
java.lang.String s = ((akka.protobuf.ByteString) ref)
.toStringUtf8();
actorRef_ = s;
return s;
} else {
return (java.lang.String) ref;
}
}
/**
* <code>required string actorRef = 1;</code>
*/
public akka.protobuf.ByteString
getActorRefBytes() {
java.lang.Object ref = actorRef_;
if (ref instanceof String) {
akka.protobuf.ByteString b =
akka.protobuf.ByteString.copyFromUtf8(
(java.lang.String) ref);
actorRef_ = b;
return b;
} else {
return (akka.protobuf.ByteString) ref;
}
}
/**
* <code>required string actorRef = 1;</code>
*/
public Builder setActorRef(
java.lang.String value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
actorRef_ = value;
onChanged();
return this;
}
/**
* <code>required string actorRef = 1;</code>
*/
public Builder clearActorRef() {
bitField0_ = (bitField0_ & ~0x00000001);
actorRef_ = getDefaultInstance().getActorRef();
onChanged();
return this;
}
/**
* <code>required string actorRef = 1;</code>
*/
public Builder setActorRefBytes(
akka.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
bitField0_ |= 0x00000001;
actorRef_ = value;
onChanged();
return this;
}
// required uint64 systemUid = 2;
private long systemUid_ ;
/**
* <code>required uint64 systemUid = 2;</code>
*/
public boolean hasSystemUid() {
return ((bitField0_ & 0x00000002) == 0x00000002);
}
/**
* <code>required uint64 systemUid = 2;</code>
*/
public long getSystemUid() {
return systemUid_;
}
/**
* <code>required uint64 systemUid = 2;</code>
*/
public Builder setSystemUid(long value) {
bitField0_ |= 0x00000002;
systemUid_ = value;
onChanged();
return this;
}
/**
* <code>required uint64 systemUid = 2;</code>
*/
public Builder clearSystemUid() {
bitField0_ = (bitField0_ & ~0x00000002);
systemUid_ = 0L;
onChanged();
return this;
}
// @@protoc_insertion_point(builder_scope:akka.cluster.typed.ReceptionistEntry)
}
static {
defaultInstance = new ReceptionistEntry(true);
defaultInstance.initFields();
}
// @@protoc_insertion_point(class_scope:akka.cluster.typed.ReceptionistEntry)
}
private static akka.protobuf.Descriptors.Descriptor
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor;
private static
akka.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable;
public static akka.protobuf.Descriptors.FileDescriptor
getDescriptor() {
return descriptor;
}
private static akka.protobuf.Descriptors.FileDescriptor
descriptor;
static {
java.lang.String[] descriptorData = {
"\n\025ClusterMessages.proto\022\022akka.cluster.ty" +
"ped\"8\n\021ReceptionistEntry\022\020\n\010actorRef\030\001 \002" +
"(\t\022\021\n\tsystemUid\030\002 \002(\004B(\n$akka.cluster.ty" +
"ped.internal.protobufH\001"
};
akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new akka.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
public akka.protobuf.ExtensionRegistry assignDescriptors(
akka.protobuf.Descriptors.FileDescriptor root) {
descriptor = root;
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor =
getDescriptor().getMessageTypes().get(0);
internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable = new
akka.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor,
new java.lang.String[] { "ActorRef", "SystemUid", });
return null;
}
};
akka.protobuf.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
new akka.protobuf.Descriptors.FileDescriptor[] {
}, assigner);
}
// @@protoc_insertion_point(outer_class_scope)
}

View file

@ -0,0 +1,13 @@
/**
* Copyright (C) 2014-2018 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed;
option java_package = "akka.cluster.typed.internal.protobuf";
option optimize_for = SPEED;
message ReceptionistEntry {
required string actorRef = 1;
required uint64 systemUid = 2;
}

View file

@ -14,3 +14,15 @@ akka.cluster.typed.receptionist {
# in case of abrupt termination.
pruning-interval = 3 s
}
akka.actor {
serialization-identifiers {
"akka.cluster.typed.internal.AkkaClusterTypedSerializer" = 28
}
serializers {
typed-cluster = "akka.cluster.typed.internal.AkkaClusterTypedSerializer"
}
serialization-bindings {
"akka.cluster.typed.internal.receptionist.ClusterReceptionist$Entry" = typed-cluster
}
}

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-${YEAR} Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed.internal
import java.io.NotSerializableException
import akka.actor.ExtendedActorSystem
import akka.actor.typed.ActorRefResolver
import akka.annotation.InternalApi
import akka.serialization.{ BaseSerializer, SerializerWithStringManifest }
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.typed.internal.protobuf.ClusterMessages
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.Entry
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class AkkaClusterTypedSerializer(override val system: ExtendedActorSystem)
extends SerializerWithStringManifest with BaseSerializer {
// Serializers are initialized early on. `toTyped` might then try to initialize the untyped ActorSystemAdapter extension.
private lazy val resolver = ActorRefResolver(system.toTyped)
private val ReceptionistEntryManifest = "a"
override def manifest(o: AnyRef): String = o match {
case _: Entry ReceptionistEntryManifest
case _
throw new IllegalArgumentException(s"Can't serialize object of type ${o.getClass} in [${getClass.getName}]")
}
override def toBinary(o: AnyRef): Array[Byte] = o match {
case e: Entry receptionistEntryToBinary(e)
case _
throw new IllegalArgumentException(s"Cannot serialize object of type [${o.getClass.getName}]")
}
override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
case ReceptionistEntryManifest receptionistEntryFromBinary(bytes)
case _
throw new NotSerializableException(
s"Unimplemented deserialization of message with manifest [$manifest] in [${getClass.getName}]")
}
private def receptionistEntryToBinary(e: Entry): Array[Byte] =
ClusterMessages.ReceptionistEntry.newBuilder()
.setActorRef(resolver.toSerializationFormat(e.ref))
.setSystemUid(e.systemUid)
.build()
.toByteArray
private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = {
val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes)
Entry(
resolver.resolveActorRef(re.getActorRef),
re.getSystemUid
)
}
}

View file

@ -7,17 +7,17 @@ package akka.cluster.typed.internal.receptionist
import akka.actor.typed.internal.receptionist.{ AbstractServiceKey, ReceptionistBehaviorProvider, ReceptionistMessages }
import akka.actor.typed.receptionist.Receptionist.Command
import akka.actor.typed.receptionist.ServiceKey
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.scaladsl.{ ActorContext, Behaviors }
import akka.actor.typed.{ ActorRef, Behavior, Terminated }
import akka.actor.{ Address, ExtendedActorSystem }
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator }
import akka.cluster.{ Cluster, ClusterEvent }
import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress }
import akka.remote.AddressUidExtension
import akka.util.TypedMultiMap
import scala.language.existentials
import akka.actor.typed.scaladsl.adapter._
/** INTERNAL API */
@InternalApi
@ -26,43 +26,54 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
type SubscriptionsKV[K <: AbstractServiceKey] = ActorRef[ReceptionistMessages.Listing[K#Protocol]]
type SubscriptionRegistry = TypedMultiMap[AbstractServiceKey, SubscriptionsKV]
private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], ActorRef[_]]("ReceptionistKey")
private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], ActorRef[_]]
private final val ReceptionistKey = ORMultiMapKey[ServiceKey[_], Entry]("ReceptionistKey")
private final val EmptyORMultiMap = ORMultiMap.empty[ServiceKey[_], Entry]
case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends AnyVal {
// values contain system uid to make it possible to discern actors at the same
// path in different incarnations of a cluster node
final case class Entry(ref: ActorRef[_], systemUid: Long) {
def uniqueAddress(selfUniqueAddress: UniqueAddress): UniqueAddress =
if (ref.path.address.hasLocalScope) selfUniqueAddress
else UniqueAddress(ref.path.address, systemUid)
override def toString = ref.path.toString + "#" + ref.path.uid
}
final case class ServiceRegistry(map: ORMultiMap[ServiceKey[_], Entry]) extends AnyVal {
// let's hide all the ugly casts we can in here
def getOrElse[T](key: AbstractServiceKey, default: Set[ActorRef[_]]): Set[ActorRef[key.Protocol]] =
map.getOrElse(key.asServiceKey, default.asInstanceOf[Set[ActorRef[_]]]).asInstanceOf[Set[ActorRef[key.Protocol]]]
def getActorRefsFor[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] =
getEntriesFor(key).map(_.ref.asInstanceOf[ActorRef[key.Protocol]])
def getOrEmpty[T](key: AbstractServiceKey): Set[ActorRef[key.Protocol]] = getOrElse(key, Set.empty)
def getEntriesFor(key: AbstractServiceKey): Set[Entry] =
map.getOrElse(key.asServiceKey, Set.empty[Entry])
def addBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry =
def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
ServiceRegistry(map.addBinding(key, value))
def removeBinding[T](key: ServiceKey[T], value: ActorRef[T])(implicit cluster: Cluster): ServiceRegistry =
def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
ServiceRegistry(map.removeBinding(key, value))
def removeAll(removals: Map[AbstractServiceKey, Set[ActorRef[_]]])(implicit cluster: Cluster): ServiceRegistry = {
def removeAll(removals: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = {
removals.foldLeft(this) {
case (acc, (key, actors))
actors.foldLeft(acc) {
case (innerAcc, actor)
innerAcc.removeBinding[key.Protocol](key.asServiceKey, actor.asInstanceOf[ActorRef[key.Protocol]])
case (acc, (key, entries))
entries.foldLeft(acc) {
case (innerAcc, entry)
innerAcc.removeBinding[key.Protocol](key.asServiceKey, entry)
}
}
}
def toORMultiMap: ORMultiMap[ServiceKey[_], ActorRef[_]] = map
def toORMultiMap: ORMultiMap[ServiceKey[_], Entry] = map
}
object ServiceRegistry {
val empty = ServiceRegistry(EmptyORMultiMap)
final val Empty = ServiceRegistry(EmptyORMultiMap)
def collectChangedKeys(previousState: ServiceRegistry, newState: ServiceRegistry): Set[AbstractServiceKey] = {
val allKeys = previousState.toORMultiMap.entries.keySet ++ newState.toORMultiMap.entries.keySet
allKeys.foldLeft(Set.empty[AbstractServiceKey]) { (acc, key)
val oldValues = previousState.getOrEmpty(key)
val newValues = newState.getOrEmpty(key)
val oldValues = previousState.getEntriesFor(key)
val newValues = newState.getEntriesFor(key)
if (oldValues != newValues) acc + key
else acc
}
@ -72,16 +83,18 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
sealed trait InternalCommand
final case class RegisteredActorTerminated[T](key: ServiceKey[T], ref: ActorRef[T]) extends InternalCommand
final case class SubscriberTerminated[T](key: ServiceKey[T], ref: ActorRef[ReceptionistMessages.Listing[T]]) extends InternalCommand
final case class NodeRemoved(addresses: Address) extends InternalCommand
final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], ActorRef[_]]) extends InternalCommand
final case class NodeRemoved(addresses: UniqueAddress) extends InternalCommand
final case class ChangeFromReplicator(value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand
case object RemoveTick extends InternalCommand
// captures setup/dependencies so we can avoid doing it over and over again
private class Setup(ctx: ActorContext[Any]) {
class Setup(ctx: ActorContext[Any]) {
val untypedSystem = ctx.system.toUntyped
val settings = ClusterReceptionistSettings(ctx.system)
val replicator = DistributedData(untypedSystem).replicator
val selfSystemUid = AddressUidExtension(untypedSystem).longAddressUid
implicit val cluster = Cluster(untypedSystem)
def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress
}
override def behavior: Behavior[Command] = Behaviors.setup[Any] { ctx
@ -97,7 +110,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// remove entries when members are removed
val clusterEventMessageAdapter: ActorRef[MemberRemoved] =
ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) NodeRemoved(member.address) }
ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) NodeRemoved(member.uniqueAddress) }
setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
// also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update,
@ -107,7 +120,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
behavior(
setup,
ServiceRegistry.empty,
ServiceRegistry.Empty,
TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV]
)
}.narrow[Command]
@ -145,36 +158,31 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
})
def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = {
val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key))
val msg = ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
subscriptions.get(key).foreach(_ ! msg)
}
def nodesRemoved(addresses: Set[Address]): Behavior[Any] = {
def nodesRemoved(addresses: Set[UniqueAddress]): Behavior[Any] = {
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) {
import akka.actor.typed.scaladsl.adapter._
val localAddress = ctx.system.toUntyped.asInstanceOf[ExtendedActorSystem].provider.getDefaultAddress
def isOnRemovedNode(ref: ActorRef[_]): Boolean = {
if (ref.path.address.hasLocalScope) addresses(localAddress)
else addresses(ref.path.address)
}
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress))
val removals = {
state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[ActorRef[_]]]) {
case (acc, (key, values))
val removedActors = values.filter(isOnRemovedNode)
if (removedActors.isEmpty) acc // no change
else acc + (key -> removedActors)
state.map.entries.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
case (acc, (key, entries))
val removedEntries = entries.filter(isOnRemovedNode)
if (removedEntries.isEmpty) acc // no change
else acc + (key -> removedEntries)
}
}
if (removals.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug(
"Node(s) [{}] removed, updating registry [{}]",
"Node(s) [{}] removed, updating registry removing: [{}]",
addresses.mkString(","),
removals.map { case (key, actors) key.asServiceKey.id -> actors.mkString("[", ", ", "]") }.mkString(","))
removals.map {
case (key, entries) key.asServiceKey.id -> entries.mkString("[", ", ", "]")
}.mkString(","))
replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry
ServiceRegistry(registry).removeAll(removals).toORMultiMap
@ -187,26 +195,27 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def onCommand(cmd: Command): Behavior[Any] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo)
ctx.log.debug("Actor was registered: [{}] [{}]", key, serviceInstance.path)
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log.debug("Actor was registered: [{}] [{}]", key, entry)
watchWith(ctx, serviceInstance, RegisteredActorTerminated(key, serviceInstance))
maybeReplyTo match {
case Some(replyTo) replyTo ! ReceptionistMessages.Registered(key, serviceInstance)
case None
}
replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry
ServiceRegistry(registry).addBinding(key, serviceInstance).toORMultiMap
ServiceRegistry(registry).addBinding(key, entry).toORMultiMap
}
Behaviors.same
case ReceptionistMessages.Find(key, replyTo)
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key))
replyTo ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
Behaviors.same
case ReceptionistMessages.Subscribe(key, subscriber)
watchWith(ctx, subscriber, SubscriberTerminated(key, subscriber))
// immediately reply with initial listings to the new subscriber
subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getOrEmpty(key))
subscriber ! ReceptionistMessages.Listing(key.asServiceKey, state.getActorRefsFor(key))
next(newSubscriptions = subscriptions.inserted(key)(subscriber))
}
@ -217,9 +226,10 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
next(newSubscriptions = subscriptions.removed(key)(subscriber))
case RegisteredActorTerminated(key, serviceInstance)
ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, serviceInstance.path)
val entry = Entry(serviceInstance, setup.selfSystemUid)
ctx.log.debug("Registered actor terminated: [{}] [{}]", key.asServiceKey.id, entry)
replicator ! Replicator.Update(ReceptionistKey, EmptyORMultiMap, settings.writeConsistency) { registry
ServiceRegistry(registry).removeBinding(key, serviceInstance).toORMultiMap
ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
}
Behaviors.same
@ -230,9 +240,10 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
if (changedKeys.nonEmpty) {
if (ctx.log.isDebugEnabled) {
ctx.log.debug(
"Registration changed: [{}]",
"Change from replicator: [{}], changes: [{}]",
newState.map.entries,
changedKeys.map(key
key.asServiceKey.id -> newState.getOrEmpty(key).map(_.path).mkString("[", ", ", "]")
key.asServiceKey.id -> newState.getEntriesFor(key).mkString("[", ", ", "]")
).mkString(", "))
}
changedKeys.foreach(notifySubscribersFor(_, newState))
@ -241,24 +252,33 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
Behaviors.same
}
case NodeRemoved(address)
case NodeRemoved(uniqueAddress)
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress)) {
nodesRemoved(Set(address))
ctx.log.debug(s"Leader node observed removed address [{}]", uniqueAddress)
nodesRemoved(Set(uniqueAddress))
} else Behaviors.same
case RemoveTick
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress)) {
val allAddressesInState: Set[Address] = state.map.entries.flatMap {
case (_, values)
val allAddressesInState: Set[UniqueAddress] = state.map.entries.flatMap {
case (_, entries)
// don't care about local (empty host:port addresses)
values.collect { case ref if ref.path.address.hasGlobalScope ref.path.address }
entries.collect {
case entry if entry.ref.path.address.hasGlobalScope
entry.uniqueAddress(setup.selfUniqueAddress)
}
}(collection.breakOut)
val clusterAddresses = cluster.state.members.map(_.address)
val diff = allAddressesInState diff clusterAddresses
if (diff.isEmpty) Behavior.same
else nodesRemoved(diff)
val clusterAddresses = cluster.state.members.map(_.uniqueAddress)
val notInCluster = allAddressesInState diff clusterAddresses
if (notInCluster.isEmpty) Behavior.same
else {
if (ctx.log.isDebugEnabled)
ctx.log.debug("Leader node cleanup tick, removed nodes: [{}]", notInCluster.mkString(","))
nodesRemoved(notInCluster)
}
} else
Behavior.same
}

View file

@ -0,0 +1,41 @@
/**
* Copyright (C) 2009-${YEAR} Lightbend Inc. <https://www.lightbend.com>
*/
package akka.cluster.typed.internal
import akka.actor.ExtendedActorSystem
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ Behavior, TypedAkkaSpecWithShutdown }
import akka.cluster.typed.internal.receptionist.ClusterReceptionist
import akka.serialization.SerializationExtension
import akka.testkit.typed.scaladsl.ActorTestKit
class AkkaClusterTypedSerializerSpec extends ActorTestKit with TypedAkkaSpecWithShutdown {
val ref = spawn(Behavior.empty[String])
val untypedSystem = system.toUntyped
val serializer = new AkkaClusterTypedSerializer(untypedSystem.asInstanceOf[ExtendedActorSystem])
"AkkaClusterTypedSerializer" must {
Seq(
"ReceptionistEntry" ClusterReceptionist.Entry(ref, 666L)
).foreach {
case (scenario, item)
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(untypedSystem)
serializer.serializerFor(item.getClass).getClass should be(classOf[AkkaClusterTypedSerializer])
}
s"serialize and de-serialize $scenario" in {
verifySerialization(item)
}
}
}
def verifySerialization(msg: AnyRef): Unit = {
serializer.fromBinary(serializer.toBinary(msg), serializer.manifest(msg)) should be(msg)
}
}

View file

@ -6,19 +6,21 @@ package akka.cluster.typed.internal.receptionist
import java.nio.charset.StandardCharsets
import akka.actor.ExtendedActorSystem
import akka.actor.typed.{ ActorRef, ActorRefResolver, TypedAkkaSpecWithShutdown }
import akka.actor.{ ExtendedActorSystem, RootActorPath }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.cluster.typed.Cluster
import akka.actor.typed.{ ActorRef, ActorRefResolver }
import akka.cluster.MemberStatus
import akka.cluster.typed.{ Cluster, Join }
import akka.serialization.SerializerWithStringManifest
import akka.testkit.typed.TestKitSettings
import akka.testkit.typed.FishingOutcome
import akka.testkit.typed.scaladsl.{ ActorTestKit, TestProbe }
import com.typesafe.config.ConfigFactory
import scala.concurrent.duration._
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.{ Matchers, WordSpec }
import akka.cluster.typed.Join
import scala.concurrent.Await
import scala.concurrent.duration._
object ClusterReceptionistSpec {
val config = ConfigFactory.parseString(
@ -40,7 +42,6 @@ object ClusterReceptionistSpec {
}
}
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster {
@ -89,40 +90,40 @@ object ClusterReceptionistSpec {
val PingKey = ServiceKey[PingProtocol]("pingy")
}
class ClusterReceptionistSpec extends ActorTestKit
with TypedAkkaSpecWithShutdown {
override def config = ClusterReceptionistSpec.config
class ClusterReceptionistSpec extends WordSpec with Matchers {
import ClusterReceptionistSpec._
implicit val testSettings = TestKitSettings(system)
val clusterNode1 = Cluster(system)
val testKit2 = new ActorTestKit {
override def name = ClusterReceptionistSpec.this.system.name
override def config = ClusterReceptionistSpec.this.system.settings.config
}
val system2 = testKit2.system
val clusterNode2 = Cluster(system2)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
import Receptionist._
"The cluster receptionist" must {
"must eventually replicate registrations to the other side" in {
val regProbe = TestProbe[Any]()(system)
"eventually replicate registrations to the other side" in {
val testKit1 = new ActorTestKit {
override def name = super.name + "-test-1"
override def config = ClusterReceptionistSpec.config
}
val system1 = testKit1.system
val testKit2 = new ActorTestKit {
override def name = system1.name
override def config = testKit1.system.settings.config
}
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service = spawn(pingPongBehavior)
system.receptionist ! Register(PingKey, service, regProbe.ref)
regProbe.expectMessage(Registered(PingKey, service))
val service = testKit1.spawn(pingPongBehavior)
testKit1.system.receptionist ! Register(PingKey, service, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service))
val PingKey.Listing(remoteServiceRefs) = regProbe2.expectMessageType[Listing]
val theRef = remoteServiceRefs.head
@ -131,34 +132,196 @@ class ClusterReceptionistSpec extends ActorTestKit
service ! Perish
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"must remove registrations when node dies" in {
"remove registrations when node dies" in {
val testKit1 = new ActorTestKit {
override def name = super.name + "-test-2"
override def config = ClusterReceptionistSpec.config
}
val system1 = testKit1.system
val testKit2 = new ActorTestKit {
override def name = system1.name
override def config = testKit1.system.settings.config
}
val system2 = testKit2.system
try {
val regProbe = TestProbe[Any]()(system)
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
system.receptionist ! Subscribe(PingKey, regProbe.ref)
regProbe.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
val remoteServiceRefs = regProbe.expectMessageType[Listing].serviceInstances(PingKey)
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe.ref)
regProbe.expectMessage(Pong)
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// abrupt termination
system2.terminate()
regProbe.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
}
}
"work with services registered before node joins cluster" in {
val testKit1 = new ActorTestKit {
override def name = super.name + "-test-2"
override def config = ClusterReceptionistSpec.config
}
val system1 = testKit1.system
val testKit2 = new ActorTestKit {
override def name = system1.name
override def config = testKit1.system.settings.config
}
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service2 = testKit2.spawn(pingPongBehavior)
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
// then we join the cluster
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up))
// and the subscriber on node1 should see the service
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// abrupt termination
system2.terminate()
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
}
}
"handle a new incarnation of the same node well" in {
val testKit1 = new ActorTestKit {
override def name = super.name + "-test-3"
override def config = ClusterReceptionistSpec.config
}
val system1 = testKit1.system
val testKit2 = new ActorTestKit {
override def name = system1.name
override def config = testKit1.system.settings.config
}
val system2 = testKit2.system
try {
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) == 2)
system1.receptionist ! Subscribe(PingKey, regProbe1.ref)
regProbe1.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val service2 = testKit2.spawn(pingPongBehavior, "instance")
system2.receptionist ! Register(PingKey, service2, regProbe2.ref)
regProbe2.expectMessage(Registered(PingKey, service2))
// make sure we saw the first incarnation on node1
val remoteServiceRefs = regProbe1.expectMessageType[Listing].serviceInstances(PingKey)
val theRef = remoteServiceRefs.head
theRef ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
// FIXME do we need to blackhole the connection to system2 before terminating
// right now it doesn't work anyways though ;D
// abrupt termination but then a node with the same host:port comes online quickly
system1.log.debug("Terminating system2, uid: [{}]", clusterNode2.selfMember.uniqueAddress.longUid)
Await.ready(system2.terminate(), 10.seconds)
val testKit3 = new ActorTestKit {
override protected def name: String = system1.name
override def config: Config = testKit1.config
}
try {
val system3 = testKit3.system
system1.log.debug("Starting system3 at same hostname port as system2, uid: [{}]", Cluster(system3).selfMember.uniqueAddress.longUid)
val clusterNode3 = Cluster(system3)
clusterNode3.manager ! Join(clusterNode1.selfMember.address)
val regProbe3 = TestProbe[Any]()(system3)
// and registers the same service key
val service3 = testKit3.spawn(pingPongBehavior, "instance")
system3.log.debug("Spawning/registering ping service in new incarnation {}#{}", service3.path, service3.path.uid)
system3.receptionist ! Register(PingKey, service3, regProbe3.ref)
regProbe3.expectMessage(Registered(PingKey, service3))
system3.log.debug("Registered actor [{}#{}] for system3", service3.path, service3.path.uid)
// make sure it joined fine and node1 has upped it
regProbe1.awaitAssert {
clusterNode1.state.members.exists(m
m.uniqueAddress == clusterNode3.selfMember.uniqueAddress &&
m.status == MemberStatus.Up &&
!clusterNode1.state.unreachable(m)
)
}
// we should get either empty message and then updated with the new incarnation actor
// or just updated with the new service directly
val msg = regProbe1.fishForMessage(20.seconds) {
case PingKey.Listing(entries) if entries.size == 1 FishingOutcome.Complete
case _: Listing FishingOutcome.ContinueAndIgnore
}
val PingKey.Listing(entries) = msg.last
entries should have size 1
val ref = entries.head
val service3RemotePath = RootActorPath(clusterNode3.selfMember.address) / "user" / "instance"
ref.path should ===(service3RemotePath)
ref ! Ping(regProbe1.ref)
regProbe1.expectMessage(Pong)
} finally {
testKit3.shutdownTestKit()
}
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
}
}
override def afterAll(): Unit = {
super.afterAll()
ActorTestKit.shutdown(system2, 10.seconds)
}
}

View file

@ -54,7 +54,10 @@ private[akka] object TestKitUtils {
}
// sanitize for actor system name
filteredStack.next().replaceFirst(""".*\.""", "").replaceAll("[^a-zA-Z_0-9]", "_")
filteredStack.next()
.replaceFirst("""^.*\.""", "") // drop package name
.replaceAll("""\$\$?\w+""", "") // drop scala anonymous functions/classes
.replaceAll("[^a-zA-Z_0-9]", "_")
}
def shutdown(

View file

@ -408,6 +408,7 @@ lazy val clusterTyped = akkaModule("akka-cluster-typed")
distributedData,
persistence % "test->test",
persistenceTyped % "test->test",
protobuf,
typedTestkit % "test->test",
actorTypedTests % "test->test"
)