diff --git a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java
index 609da28988..99e40644c0 100644
--- a/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java
+++ b/akka-cluster-typed/src/main/java/akka/cluster/typed/internal/protobuf/ClusterMessages.java
@@ -49,6 +49,17 @@ public final class ClusterMessages {
* @return The systemUid.
*/
long getSystemUid();
+
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return Whether the createdTimestamp field is set.
+ */
+ boolean hasCreatedTimestamp();
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return The createdTimestamp.
+ */
+ long getCreatedTimestamp();
}
/**
* Protobuf type {@code akka.cluster.typed.ReceptionistEntry}
@@ -108,6 +119,11 @@ public final class ClusterMessages {
systemUid_ = input.readUInt64();
break;
}
+ case 24: {
+ bitField0_ |= 0x00000004;
+ createdTimestamp_ = input.readInt64();
+ break;
+ }
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@@ -203,6 +219,23 @@ public final class ClusterMessages {
return systemUid_;
}
+ public static final int CREATEDTIMESTAMP_FIELD_NUMBER = 3;
+ private long createdTimestamp_;
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return Whether the createdTimestamp field is set.
+ */
+ public boolean hasCreatedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return The createdTimestamp.
+ */
+ public long getCreatedTimestamp() {
+ return createdTimestamp_;
+ }
+
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -231,6 +264,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000002) != 0)) {
output.writeUInt64(2, systemUid_);
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ output.writeInt64(3, createdTimestamp_);
+ }
unknownFields.writeTo(output);
}
@@ -247,6 +283,10 @@ public final class ClusterMessages {
size += akka.protobufv3.internal.CodedOutputStream
.computeUInt64Size(2, systemUid_);
}
+ if (((bitField0_ & 0x00000004) != 0)) {
+ size += akka.protobufv3.internal.CodedOutputStream
+ .computeInt64Size(3, createdTimestamp_);
+ }
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -272,6 +312,11 @@ public final class ClusterMessages {
if (getSystemUid()
!= other.getSystemUid()) return false;
}
+ if (hasCreatedTimestamp() != other.hasCreatedTimestamp()) return false;
+ if (hasCreatedTimestamp()) {
+ if (getCreatedTimestamp()
+ != other.getCreatedTimestamp()) return false;
+ }
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@@ -292,6 +337,11 @@ public final class ClusterMessages {
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong(
getSystemUid());
}
+ if (hasCreatedTimestamp()) {
+ hash = (37 * hash) + CREATEDTIMESTAMP_FIELD_NUMBER;
+ hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong(
+ getCreatedTimestamp());
+ }
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -429,6 +479,8 @@ public final class ClusterMessages {
bitField0_ = (bitField0_ & ~0x00000001);
systemUid_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
+ createdTimestamp_ = 0L;
+ bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -465,6 +517,10 @@ public final class ClusterMessages {
result.systemUid_ = systemUid_;
to_bitField0_ |= 0x00000002;
}
+ if (((from_bitField0_ & 0x00000004) != 0)) {
+ result.createdTimestamp_ = createdTimestamp_;
+ to_bitField0_ |= 0x00000004;
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -522,6 +578,9 @@ public final class ClusterMessages {
if (other.hasSystemUid()) {
setSystemUid(other.getSystemUid());
}
+ if (other.hasCreatedTimestamp()) {
+ setCreatedTimestamp(other.getCreatedTimestamp());
+ }
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -678,6 +737,43 @@ public final class ClusterMessages {
onChanged();
return this;
}
+
+ private long createdTimestamp_ ;
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return Whether the createdTimestamp field is set.
+ */
+ public boolean hasCreatedTimestamp() {
+ return ((bitField0_ & 0x00000004) != 0);
+ }
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return The createdTimestamp.
+ */
+ public long getCreatedTimestamp() {
+ return createdTimestamp_;
+ }
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @param value The createdTimestamp to set.
+ * @return This builder for chaining.
+ */
+ public Builder setCreatedTimestamp(long value) {
+ bitField0_ |= 0x00000004;
+ createdTimestamp_ = value;
+ onChanged();
+ return this;
+ }
+ /**
+ * optional int64 createdTimestamp = 3;
+ * @return This builder for chaining.
+ */
+ public Builder clearCreatedTimestamp() {
+ bitField0_ = (bitField0_ & ~0x00000004);
+ createdTimestamp_ = 0L;
+ onChanged();
+ return this;
+ }
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -1394,11 +1490,12 @@ public final class ClusterMessages {
static {
java.lang.String[] descriptorData = {
"\n\025ClusterMessages.proto\022\022akka.cluster.ty" +
- "ped\032\026ContainerFormats.proto\"8\n\021Reception" +
+ "ped\032\026ContainerFormats.proto\"R\n\021Reception" +
"istEntry\022\020\n\010actorRef\030\001 \002(\t\022\021\n\tsystemUid\030" +
- "\002 \002(\004\"3\n\026PubSubMessagePublished\022\031\n\007messa" +
- "ge\030\001 \002(\0132\010.PayloadB(\n$akka.cluster.typed" +
- ".internal.protobufH\001"
+ "\002 \002(\004\022\030\n\020createdTimestamp\030\003 \001(\003\"3\n\026PubSu" +
+ "bMessagePublished\022\031\n\007message\030\001 \002(\0132\010.Pay" +
+ "loadB(\n$akka.cluster.typed.internal.prot" +
+ "obufH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -1410,7 +1507,7 @@ public final class ClusterMessages {
internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor,
- new java.lang.String[] { "ActorRef", "SystemUid", });
+ new java.lang.String[] { "ActorRef", "SystemUid", "CreatedTimestamp", });
internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable = new
diff --git a/akka-cluster-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-28792-ClusterReceptionist.excludes b/akka-cluster-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-28792-ClusterReceptionist.excludes
new file mode 100644
index 0000000000..7c62477881
--- /dev/null
+++ b/akka-cluster-typed/src/main/mima-filters/2.6.4.backwards.excludes/issue-28792-ClusterReceptionist.excludes
@@ -0,0 +1,3 @@
+# #28792 Changes to internals of ClusterReceptionist
+ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.receptionist.*")
+ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.protobuf.*")
diff --git a/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto
index 7b77eafc47..6127847db6 100644
--- a/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto
+++ b/akka-cluster-typed/src/main/protobuf/ClusterMessages.proto
@@ -14,8 +14,9 @@ import "ContainerFormats.proto";
message ReceptionistEntry {
required string actorRef = 1;
required uint64 systemUid = 2;
+ optional int64 createdTimestamp = 3;
}
message PubSubMessagePublished {
required Payload message = 1;
-}
\ No newline at end of file
+}
diff --git a/akka-cluster-typed/src/main/resources/reference.conf b/akka-cluster-typed/src/main/resources/reference.conf
index 45d36c6eca..4cd45a5d24 100644
--- a/akka-cluster-typed/src/main/resources/reference.conf
+++ b/akka-cluster-typed/src/main/resources/reference.conf
@@ -14,6 +14,11 @@ akka.cluster.typed.receptionist {
# in case of abrupt termination.
pruning-interval = 3 s
+  # The periodic task that removes actor references hosted on removed nodes
+  # will only remove entries older than this duration. This is to avoid removing
+  # entries registered from nodes that this node has not yet seen as joining.
+ prune-removed-older-than = 60 s
+
# Shard the services over this many Distributed Data keys, with large amounts of different
# service keys storing all of them in the same Distributed Data entry would lead to large updates
# etc. instead the keys are sharded across this number of keys. This must be the same on all nodes
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala
index 38d19b9661..660c6fc4e7 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializer.scala
@@ -61,13 +61,17 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
.toByteArray
}
- private def receptionistEntryToBinary(e: Entry): Array[Byte] =
- ClusterMessages.ReceptionistEntry
+ private def receptionistEntryToBinary(e: Entry): Array[Byte] = {
+ val b = ClusterMessages.ReceptionistEntry
.newBuilder()
.setActorRef(resolver.toSerializationFormat(e.ref))
.setSystemUid(e.systemUid)
- .build()
- .toByteArray
+
+ if (e.createdTimestamp != 0L)
+ b.setCreatedTimestamp(e.createdTimestamp)
+
+ b.build().toByteArray
+ }
private def pubSubMessageFromBinary(bytes: Array[Byte]): TopicImpl.MessagePublished[_] = {
val parsed = ClusterMessages.PubSubMessagePublished.parseFrom(bytes)
@@ -77,6 +81,7 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = {
val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes)
- Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)
+ val createdTimestamp = if (re.hasCreatedTimestamp) re.getCreatedTimestamp else 0L
+ Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)(createdTimestamp)
}
}
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
index 8f9a70f225..5a6054ffe8 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala
@@ -48,13 +48,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// values contain system uid to make it possible to discern actors at the same
// path in different incarnations of a cluster node
- final case class Entry(ref: ActorRef[_], systemUid: Long) {
+ final case class Entry(ref: ActorRef[_], systemUid: Long)(val createdTimestamp: Long) {
def uniqueAddress(selfAddress: Address): UniqueAddress =
if (ref.path.address.hasLocalScope) UniqueAddress(selfAddress, systemUid)
else UniqueAddress(ref.path.address, systemUid)
+
override def toString: String =
s"${ref.path.toString}#${ref.path.uid} @ $systemUid"
-
}
private sealed trait InternalCommand extends Command
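The createdTimestamp is deliberately placed in a second parameter list: Scala case classes derive equals, hashCode and unapply from the first parameter list only, so two entries with the same ref and systemUid compare equal regardless of their timestamps. That is what lets the Deregister, Terminated and tombstone paths further down construct lookup entries with a 0L timestamp. A self-contained sketch of that property, using String in place of ActorRef purely for illustration:

final case class Entry(ref: String, systemUid: Long)(val createdTimestamp: Long)

val registered = Entry("akka://Sys/user/service", 42L)(System.currentTimeMillis())
val lookupKey  = Entry("akka://Sys/user/service", 42L)(0L)
assert(registered == lookupKey) // timestamps differ, entries still compare equal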
@@ -262,56 +262,67 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
cluster.state.leader.contains(cluster.selfAddress)
}
- def nodesRemoved(addresses: Set[UniqueAddress]): Unit = {
+ def nodesRemoved(addresses: Set[UniqueAddress], onlyRemoveOldEntries: Boolean): Unit = {
// ok to update from several nodes but more efficient to try to do it from one node
- if (isLeader) {
- def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
+ def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
- val removals = {
- state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
- case (acc, (key, entries)) =>
- val removedEntries = entries.filter(isOnRemovedNode)
- if (removedEntries.isEmpty) acc // no change
- else acc + (key -> removedEntries)
- }
+ val now = System.currentTimeMillis()
+
+      // it is possible that an entry is added before MemberJoined is visible, and such entries should not be removed
+ def isOld(entry: Entry): Boolean = (now - entry.createdTimestamp) >= settings.pruneRemovedOlderThan.toMillis
+
+ val removals = {
+ state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
+ case (acc, (key, entries)) =>
+ val removedEntries =
+ entries.filter(entry => isOnRemovedNode(entry) && (!onlyRemoveOldEntries || isOld(entry)))
+
+ if (removedEntries.isEmpty) acc // no change
+ else acc + (key -> removedEntries)
}
+ }
- if (removals.nonEmpty) {
- if (ctx.log.isDebugEnabled)
- ctx.log.debugN(
- "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
- cluster.selfAddress,
- addresses.mkString(","),
- removals
- .map {
- case (key, entries) => key.asServiceKey.id -> entries.mkString("[", ", ", "]")
- }
- .mkString(","))
-
- // shard changes over the ddata keys they belong to
- val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
-
- removalsPerDdataKey.foreach {
- case (ddataKey, removalForKey) =>
- replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
- ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
+ if (removals.nonEmpty) {
+ if (ctx.log.isDebugEnabled)
+ ctx.log.debugN(
+ "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
+ cluster.selfAddress,
+ addresses.mkString(","),
+ removals
+ .map {
+ case (key, entries) => key.asServiceKey.id -> entries.mkString("[", ", ", "]")
}
- }
+ .mkString(","))
+ // shard changes over the ddata keys they belong to
+ val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
+
+ removalsPerDdataKey.foreach {
+ case (ddataKey, removalForKey) =>
+ replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
+ ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
+ }
}
}
}
def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit = {
- keysForNode.foreach { changedKey =>
+ notifySubscribers(keysForNode, servicesWereAddedOrRemoved = false, newState)
+ }
+
+ def notifySubscribers(
+ changedKeys: Set[AbstractServiceKey],
+ servicesWereAddedOrRemoved: Boolean,
+ newState: State): Unit = {
+ changedKeys.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
- val subscribers = state.subscriptions.get(changedKey)
+ val subscribers = newState.subscriptions.get(changedKey)
if (subscribers.nonEmpty) {
val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
val listing =
- ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false)
+ ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved)
subscribers.foreach(_ ! listing)
}
}
@@ -320,7 +331,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def onCommand(cmd: Command): Behavior[Command] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
if (serviceInstance.path.address.hasLocalScope) {
- val entry = Entry(serviceInstance, setup.selfSystemUid)
+ val entry = Entry(serviceInstance, setup.selfSystemUid)(System.currentTimeMillis())
ctx.log
.debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
// actor already watched after one service key registration
@@ -343,7 +354,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
if (serviceInstance.path.address.hasLocalScope) {
- val entry = Entry(serviceInstance, setup.selfSystemUid)
+ val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
ctx.log.debugN(
"ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
cluster.selfAddress,
@@ -400,7 +411,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
behavior(setup, state.removeSubscriber(subscriber))
case LocalServiceActorTerminated(serviceInstance) =>
- val entry = Entry(serviceInstance, setup.selfSystemUid)
+ val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
// could be empty if there was a race between termination and unregistration
val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
@@ -439,17 +450,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
state.tombstones.mkString(", "))
}
+ notifySubscribers(changedKeys, servicesWereAddedOrRemoved = true, newState)
+
changedKeys.foreach { changedKey =>
val serviceKey = changedKey.asServiceKey
- val subscribers = state.subscriptions.get(changedKey)
- if (subscribers.nonEmpty) {
- val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
- val listing =
- ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true)
- subscribers.foreach(_ ! listing)
- }
-
// because of how ORMultiMap/ORset works, we could have a case where an actor we removed
// is re-introduced because of a concurrent update, in that case we need to re-remove it
val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
@@ -463,7 +468,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
tombstonedButReAdded
.foldLeft(ServiceRegistry(registry)) { (acc, ref) =>
- acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid))
+ acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid)(0L))
}
.toORMultiMap
}
@@ -476,7 +481,23 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
case NodeAdded(uniqueAddress) =>
- behavior(setup, state.copy(registry = state.registry.addNode(uniqueAddress)))
+ if (state.registry.nodes.contains(uniqueAddress)) {
+ Behaviors.same
+ } else {
+ val newState = state.copy(registry = state.registry.addNode(uniqueAddress))
+ val keysForNode = newState.registry.keysFor(uniqueAddress)
+ if (keysForNode.nonEmpty) {
+ ctx.log.debug2(
+ "ClusterReceptionist [{}] - Node with registered services added [{}]",
+ cluster.selfAddress,
+ uniqueAddress)
+ notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
+ } else {
+ ctx.log.debug2("ClusterReceptionist [{}] - Node added [{}]", cluster.selfAddress, uniqueAddress)
+ }
+
+ behavior(setup, newState)
+ }
case NodeRemoved(uniqueAddress) =>
if (uniqueAddress == selfUniqueAddress) {
@@ -484,17 +505,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// If self cluster node is shutting down our own entries should have been removed via
// watch-Terminated or will be removed by other nodes. This point is anyway too late.
Behaviors.stopped
- } else {
+ } else if (state.registry.nodes.contains(uniqueAddress)) {
+
+ val keysForNode = state.registry.keysFor(uniqueAddress)
+ val newState = state.copy(registry = state.registry.removeNode(uniqueAddress))
+ if (keysForNode.nonEmpty) {
+ ctx.log.debug2(
+ "ClusterReceptionist [{}] - Node with registered services removed [{}]",
+ cluster.selfAddress,
+ uniqueAddress)
+ notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
+ }
+
// Ok to update from several nodes but more efficient to try to do it from one node.
if (isLeader) {
ctx.log.debug2(
"ClusterReceptionist [{}] - Leader node observed removed node [{}]",
cluster.selfAddress,
uniqueAddress)
- nodesRemoved(Set(uniqueAddress))
+ nodesRemoved(Set(uniqueAddress), onlyRemoveOldEntries = false)
}
- behavior(setup, state.copy(registry = state.registry.removeNode(uniqueAddress)))
+ behavior(setup, newState)
+ } else {
+ Behaviors.same
}
case NodeUnreachable(uniqueAddress) =>
@@ -528,14 +562,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
val notInCluster = allAddressesInState.diff(state.registry.nodes)
- if (notInCluster.isEmpty) Behaviors.same
- else {
+ if (notInCluster.nonEmpty) {
if (ctx.log.isDebugEnabled)
ctx.log.debug2(
"ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]",
cluster.selfAddress,
notInCluster.mkString(","))
- nodesRemoved(notInCluster)
+ nodesRemoved(notInCluster, onlyRemoveOldEntries = true)
}
}
Behaviors.same
diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala
index 9e7659c1b2..f30452aabe 100644
--- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala
+++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala
@@ -40,6 +40,7 @@ private[akka] object ClusterReceptionistSettings {
ClusterReceptionistSettings(
writeConsistency,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
+ pruneRemovedOlderThan = config.getDuration("prune-removed-older-than", MILLISECONDS).millis,
config.getInt("distributed-key-count"),
replicatorSettings)
}
@@ -52,5 +53,6 @@ private[akka] object ClusterReceptionistSettings {
private[akka] case class ClusterReceptionistSettings(
writeConsistency: WriteConsistency,
pruningInterval: FiniteDuration,
+ pruneRemovedOlderThan: FiniteDuration,
distributedKeyCount: Int,
replicatorSettings: ReplicatorSettings)
diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala
index 470c783c0c..2592b53bbf 100644
--- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala
+++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/AkkaClusterTypedSerializerSpec.scala
@@ -21,7 +21,7 @@ class AkkaClusterTypedSerializerSpec extends ScalaTestWithActorTestKit with AnyW
"AkkaClusterTypedSerializer" must {
- Seq("ReceptionistEntry" -> ClusterReceptionist.Entry(ref, 666L)).foreach {
+ Seq("ReceptionistEntry" -> ClusterReceptionist.Entry(ref, 666L)(System.currentTimeMillis())).foreach {
case (scenario, item) =>
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(classicSystem)
diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
index 365568228c..c455eade12 100644
--- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
+++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala
@@ -4,6 +4,8 @@
package akka.cluster.typed.internal.receptionist
+import java.util.concurrent.ThreadLocalRandom
+
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -111,6 +113,33 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
}
}
+ "handle registrations before joining" in {
+ val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config)
+ val system1 = testKit1.system
+ val testKit2 = ActorTestKit(system1.name, system1.settings.config)
+ val system2 = testKit2.system
+ try {
+ val regProbe1 = TestProbe[Any]()(system1)
+ val regProbe2 = TestProbe[Any]()(system2)
+ val service = testKit1.spawn(pingPongBehavior)
+ testKit1.system.receptionist ! Register(PingKey, service, regProbe1.ref)
+ regProbe1.expectMessage(Registered(PingKey, service))
+ system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
+ regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
+
+ val clusterNode1 = Cluster(system1)
+ clusterNode1.manager ! Join(clusterNode1.selfMember.address)
+ val clusterNode2 = Cluster(system2)
+ clusterNode2.manager ! Join(clusterNode1.selfMember.address)
+
+ val PingKey.Listing(remoteServiceRefs) = regProbe2.expectMessageType[Listing](10.seconds)
+ remoteServiceRefs.head.path.address should ===(Cluster(system1).selfMember.address)
+ } finally {
+ testKit1.shutdownTestKit()
+ testKit2.shutdownTestKit()
+ }
+ }
+
"remove registrations when node dies" in {
testNodeRemoval(down = true)
}
@@ -735,5 +764,68 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
}
// Fixme concurrent registration and unregistration
+
+ "notify subscribers when registering and joining simultaneously" in {
+ // failing test reproducer for issue #28792
+ // It's possible that the registry entry from the ddata update arrives before MemberJoined.
+ val config = ConfigFactory.parseString("""
+ # quick dissemination to increase the chance of the race condition
+ akka.cluster.typed.receptionist.distributed-data.write-consistency = all
+ akka.cluster.typed.receptionist.distributed-data.gossip-interval = 500ms
+ # run the RemoveTick cleanup often to exercise that scenario
+ akka.cluster.typed.receptionist.pruning-interval = 50ms
+ """).withFallback(ClusterReceptionistSpec.config)
+ val numberOfNodes = 6 // use 9 or more to stress it more
+ val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec", config))
+ try {
+ val probes = testKits.map(t => TestProbe[Any]()(t.system))
+ testKits.zip(probes).foreach { case (t, p) => t.system.receptionist ! Subscribe(PingKey, p.ref) }
+
+ val clusterNode1 = Cluster(testKits.head.system)
+ // join 3 first
+ (0 until 3).foreach { i =>
+ val t = testKits(i)
+ Cluster(t.system).manager ! Join(clusterNode1.selfMember.address)
+ val ref = t.spawn(pingPongBehavior)
+ t.system.receptionist ! Register(PingKey, ref)
+ }
+ // wait until all those are Up
+ (0 until 3).foreach { i =>
+ probes(i).awaitAssert(
+ Cluster(testKits(i).system).state.members.count(_.status == MemberStatus.Up) should ===(3),
+ 10.seconds)
+ }
+
+ // then join the rest randomly to the first 3
+      // important not to join them all to the first node, in order to reproduce the problem
+ testKits.drop(3).foreach { t =>
+ val i = ThreadLocalRandom.current().nextInt(3)
+ Cluster(t.system).manager ! Join(Cluster(testKits(i).system).selfMember.address)
+ val ref = t.spawn(pingPongBehavior)
+ Thread.sleep(100) // increase chance of the race condition
+ t.system.receptionist ! Register(PingKey, ref)
+ }
+
+ (0 until numberOfNodes).foreach { i =>
+ probes(i).awaitAssert(
+ Cluster(testKits(i).system).state.members.count(_.status == MemberStatus.Up) should ===(numberOfNodes),
+ 30.seconds)
+ }
+
+ // eventually, all should be included in the Listing
+ (0 until numberOfNodes).foreach { i =>
+ probes(i).fishForMessage(10.seconds, s"$i") {
+ case PingKey.Listing(actors) if actors.size == numberOfNodes => FishingOutcomes.complete
+ case PingKey.Listing(_) => FishingOutcomes.continue
+ }
+ }
+ testKits.head.system.log.debug("All expected listings found.")
+
+ } finally {
+ // faster to terminate all at the same time
+ testKits.foreach(_.system.terminate())
+ testKits.foreach(_.shutdownTestKit())
+ }
+ }
}
}