Notify Receptionist subscribers when node added and removed, #28792 (#28795)

* Notify subscribers when a node is added or removed.
* Handle the case where the change from ddata arrives before the join
  information.
* One difficulty is that the removal tick may trigger removal of entries
  that are in the ddata state but not yet in the membership.
* It's possible that the Entry arrives before MemberJoined, but such
  entries should not be removed by the RemoveTick.
* By adding a timestamp of when the entry is created we can avoid
  removing such early entries; the MemberJoined should be visible before
  that duration has elapsed. A sketch of the guard follows below.
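
Concretely, the guard amounts to an age check. A minimal sketch with a
simplified signature (shouldPrune is a hypothetical helper, not a name from
this commit):

// Sketch only: the cleanup tick prunes an entry for a node that is absent
// from the membership only once the entry is older than the configured
// prune-removed-older-than duration, leaving room for a late MemberJoined.
def shouldPrune(createdTimestamp: Long, pruneRemovedOlderThanMillis: Long): Boolean = {
  val ageMillis = System.currentTimeMillis() - createdTimestamp
  ageMillis >= pruneRemovedOlderThanMillis
}
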
Patrik Nordwall 2020-04-02 15:19:05 +02:00 committed by GitHub
parent 2b06bb676a
commit abbef000ab
9 changed files with 304 additions and 66 deletions


@@ -49,6 +49,17 @@ public final class ClusterMessages {
* @return The systemUid.
*/
long getSystemUid();
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return Whether the createdTimestamp field is set.
*/
boolean hasCreatedTimestamp();
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return The createdTimestamp.
*/
long getCreatedTimestamp();
}
/**
* Protobuf type {@code akka.cluster.typed.ReceptionistEntry}
@@ -108,6 +119,11 @@ public final class ClusterMessages {
systemUid_ = input.readUInt64();
break;
}
case 24: {
bitField0_ |= 0x00000004;
createdTimestamp_ = input.readInt64();
break;
}
default: {
if (!parseUnknownField(
input, unknownFields, extensionRegistry, tag)) {
@@ -203,6 +219,23 @@ public final class ClusterMessages {
return systemUid_;
}
public static final int CREATEDTIMESTAMP_FIELD_NUMBER = 3;
private long createdTimestamp_;
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return Whether the createdTimestamp field is set.
*/
public boolean hasCreatedTimestamp() {
return ((bitField0_ & 0x00000004) != 0);
}
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return The createdTimestamp.
*/
public long getCreatedTimestamp() {
return createdTimestamp_;
}
private byte memoizedIsInitialized = -1;
@java.lang.Override
public final boolean isInitialized() {
@@ -231,6 +264,9 @@ public final class ClusterMessages {
if (((bitField0_ & 0x00000002) != 0)) {
output.writeUInt64(2, systemUid_);
}
if (((bitField0_ & 0x00000004) != 0)) {
output.writeInt64(3, createdTimestamp_);
}
unknownFields.writeTo(output);
}
@@ -247,6 +283,10 @@ public final class ClusterMessages {
size += akka.protobufv3.internal.CodedOutputStream
.computeUInt64Size(2, systemUid_);
}
if (((bitField0_ & 0x00000004) != 0)) {
size += akka.protobufv3.internal.CodedOutputStream
.computeInt64Size(3, createdTimestamp_);
}
size += unknownFields.getSerializedSize();
memoizedSize = size;
return size;
@@ -272,6 +312,11 @@ public final class ClusterMessages {
if (getSystemUid()
!= other.getSystemUid()) return false;
}
if (hasCreatedTimestamp() != other.hasCreatedTimestamp()) return false;
if (hasCreatedTimestamp()) {
if (getCreatedTimestamp()
!= other.getCreatedTimestamp()) return false;
}
if (!unknownFields.equals(other.unknownFields)) return false;
return true;
}
@@ -292,6 +337,11 @@ public final class ClusterMessages {
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong(
getSystemUid());
}
if (hasCreatedTimestamp()) {
hash = (37 * hash) + CREATEDTIMESTAMP_FIELD_NUMBER;
hash = (53 * hash) + akka.protobufv3.internal.Internal.hashLong(
getCreatedTimestamp());
}
hash = (29 * hash) + unknownFields.hashCode();
memoizedHashCode = hash;
return hash;
@@ -429,6 +479,8 @@ public final class ClusterMessages {
bitField0_ = (bitField0_ & ~0x00000001);
systemUid_ = 0L;
bitField0_ = (bitField0_ & ~0x00000002);
createdTimestamp_ = 0L;
bitField0_ = (bitField0_ & ~0x00000004);
return this;
}
@@ -465,6 +517,10 @@ public final class ClusterMessages {
result.systemUid_ = systemUid_;
to_bitField0_ |= 0x00000002;
}
if (((from_bitField0_ & 0x00000004) != 0)) {
result.createdTimestamp_ = createdTimestamp_;
to_bitField0_ |= 0x00000004;
}
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -522,6 +578,9 @@ public final class ClusterMessages {
if (other.hasSystemUid()) {
setSystemUid(other.getSystemUid());
}
if (other.hasCreatedTimestamp()) {
setCreatedTimestamp(other.getCreatedTimestamp());
}
this.mergeUnknownFields(other.unknownFields);
onChanged();
return this;
@@ -678,6 +737,43 @@ public final class ClusterMessages {
onChanged();
return this;
}
private long createdTimestamp_ ;
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return Whether the createdTimestamp field is set.
*/
public boolean hasCreatedTimestamp() {
return ((bitField0_ & 0x00000004) != 0);
}
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return The createdTimestamp.
*/
public long getCreatedTimestamp() {
return createdTimestamp_;
}
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @param value The createdTimestamp to set.
* @return This builder for chaining.
*/
public Builder setCreatedTimestamp(long value) {
bitField0_ |= 0x00000004;
createdTimestamp_ = value;
onChanged();
return this;
}
/**
* <code>optional int64 createdTimestamp = 3;</code>
* @return This builder for chaining.
*/
public Builder clearCreatedTimestamp() {
bitField0_ = (bitField0_ & ~0x00000004);
createdTimestamp_ = 0L;
onChanged();
return this;
}
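
The generated accessors expose explicit field presence for the new optional
field. A small usage sketch of this generated API from Scala (the actorRef
value is a made-up example path, not from the diff):

// Build an entry with the new field set; hasCreatedTimestamp reports
// whether field 3 was present on the wire.
val entry = ClusterMessages.ReceptionistEntry
  .newBuilder()
  .setActorRef("akka://sys@host:2552/user/svc#1")
  .setSystemUid(42L)
  .setCreatedTimestamp(System.currentTimeMillis())
  .build()
assert(entry.hasCreatedTimestamp)
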
@java.lang.Override
public final Builder setUnknownFields(
final akka.protobufv3.internal.UnknownFieldSet unknownFields) {
@@ -1394,11 +1490,12 @@ public final class ClusterMessages {
static {
java.lang.String[] descriptorData = {
"\n\025ClusterMessages.proto\022\022akka.cluster.ty" +
"ped\032\026ContainerFormats.proto\"8\n\021Reception" +
"ped\032\026ContainerFormats.proto\"R\n\021Reception" +
"istEntry\022\020\n\010actorRef\030\001 \002(\t\022\021\n\tsystemUid\030" +
"\002 \002(\004\"3\n\026PubSubMessagePublished\022\031\n\007messa" +
"ge\030\001 \002(\0132\010.PayloadB(\n$akka.cluster.typed" +
".internal.protobufH\001"
"\002 \002(\004\022\030\n\020createdTimestamp\030\003 \001(\003\"3\n\026PubSu" +
"bMessagePublished\022\031\n\007message\030\001 \002(\0132\010.Pay" +
"loadB(\n$akka.cluster.typed.internal.prot" +
"obufH\001"
};
descriptor = akka.protobufv3.internal.Descriptors.FileDescriptor
.internalBuildGeneratedFileFrom(descriptorData,
@@ -1410,7 +1507,7 @@ public final class ClusterMessages {
internal_static_akka_cluster_typed_ReceptionistEntry_fieldAccessorTable = new
akka.protobufv3.internal.GeneratedMessageV3.FieldAccessorTable(
internal_static_akka_cluster_typed_ReceptionistEntry_descriptor,
new java.lang.String[] { "ActorRef", "SystemUid", });
new java.lang.String[] { "ActorRef", "SystemUid", "CreatedTimestamp", });
internal_static_akka_cluster_typed_PubSubMessagePublished_descriptor =
getDescriptor().getMessageTypes().get(1);
internal_static_akka_cluster_typed_PubSubMessagePublished_fieldAccessorTable = new


@@ -0,0 +1,3 @@
# #28792 Changes to internals of ClusterReceptionist
ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.receptionist.*")
ProblemFilters.exclude[Problem]("akka.cluster.typed.internal.protobuf.*")


@@ -14,6 +14,7 @@ import "ContainerFormats.proto";
message ReceptionistEntry {
required string actorRef = 1;
required uint64 systemUid = 2;
optional int64 createdTimestamp = 3;
}
message PubSubMessagePublished {


@@ -14,6 +14,11 @@ akka.cluster.typed.receptionist {
# in case of abrupt termination.
pruning-interval = 3 s
# The periodic task that removes actor references hosted on removed nodes
# will only remove entries older than this duration. The reason for this
# is to avoid removing entries for nodes that haven't yet been seen as joining.
prune-removed-older-than = 60 s
# Shard the services over this many Distributed Data keys. With a large number
# of different service keys, storing all of them in the same Distributed Data
# entry would lead to large updates etc. Instead the keys are sharded across
# this number of keys. This must be the same on all nodes
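
Deployments where joining can take longer than the default 60 s window could
raise the threshold in their own configuration; a hypothetical
application.conf override (the value is illustrative, only the setting itself
comes from the block above):

akka.cluster.typed.receptionist {
  # allow more time for MemberJoined to become visible before pruning
  prune-removed-older-than = 120 s
}
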


@@ -61,13 +61,17 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
.toByteArray
}
- private def receptionistEntryToBinary(e: Entry): Array[Byte] =
-   ClusterMessages.ReceptionistEntry
+ private def receptionistEntryToBinary(e: Entry): Array[Byte] = {
+   val b = ClusterMessages.ReceptionistEntry
      .newBuilder()
      .setActorRef(resolver.toSerializationFormat(e.ref))
      .setSystemUid(e.systemUid)
-     .build()
-     .toByteArray
+   if (e.createdTimestamp != 0L)
+     b.setCreatedTimestamp(e.createdTimestamp)
+   b.build().toByteArray
+ }
private def pubSubMessageFromBinary(bytes: Array[Byte]): TopicImpl.MessagePublished[_] = {
val parsed = ClusterMessages.PubSubMessagePublished.parseFrom(bytes)
@@ -77,6 +81,7 @@ private[akka] final class AkkaClusterTypedSerializer(override val system: Extend
private def receptionistEntryFromBinary(bytes: Array[Byte]): Entry = {
val re = ClusterMessages.ReceptionistEntry.parseFrom(bytes)
- Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)
+ val createdTimestamp = if (re.hasCreatedTimestamp) re.getCreatedTimestamp else 0L
+ Entry(resolver.resolveActorRef(re.getActorRef), re.getSystemUid)(createdTimestamp)
}
}
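
Because createdTimestamp is optional on the wire, entries serialized by nodes
running an older version parse without the field and fall back to 0L, and a
zero timestamp always counts as old enough to prune, so behavior for
pre-upgrade entries is unchanged. A sketch of that fallback, assuming oldBytes
were produced by a node without this commit:

// oldBytes lack field 3, so hasCreatedTimestamp is false
val re = ClusterMessages.ReceptionistEntry.parseFrom(oldBytes)
val createdTimestamp = if (re.hasCreatedTimestamp) re.getCreatedTimestamp else 0L
// (now - 0L) exceeds any sensible prune-removed-older-than, so the
// restored entry is immediately eligible for pruning, as before
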


@@ -48,13 +48,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// values contain system uid to make it possible to discern actors at the same
// path in different incarnations of a cluster node
- final case class Entry(ref: ActorRef[_], systemUid: Long) {
+ final case class Entry(ref: ActorRef[_], systemUid: Long)(val createdTimestamp: Long) {
def uniqueAddress(selfAddress: Address): UniqueAddress =
if (ref.path.address.hasLocalScope) UniqueAddress(selfAddress, systemUid)
else UniqueAddress(ref.path.address, systemUid)
override def toString: String =
s"${ref.path.toString}#${ref.path.uid} @ $systemUid"
}
private sealed trait InternalCommand extends Command
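
Note that createdTimestamp sits in a second parameter list: Scala generates
equals, hashCode and unapply from the first parameter list only, so two Entry
instances with the same ref and systemUid compare equal regardless of when
they were created. This is why the deregistration paths below can construct a
lookup key as Entry(serviceInstance, setup.selfSystemUid)(0L). A minimal
illustration (Example is a stand-in class, not from this commit):

// equals/hashCode cover only the first parameter list, so ts is ignored
final case class Example(id: Long)(val ts: Long)
assert(Example(1L)(100L) == Example(1L)(999L)) // passes although ts differs
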
@@ -262,56 +262,67 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
cluster.state.leader.contains(cluster.selfAddress)
}
-     def nodesRemoved(addresses: Set[UniqueAddress]): Unit = {
+     def nodesRemoved(addresses: Set[UniqueAddress], onlyRemoveOldEntries: Boolean): Unit = {
        // ok to update from several nodes but more efficient to try to do it from one node
        if (isLeader) {
-         def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
+         def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress.address))
-         val removals = {
-           state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
-             case (acc, (key, entries)) =>
-               val removedEntries = entries.filter(isOnRemovedNode)
-               if (removedEntries.isEmpty) acc // no change
-               else acc + (key -> removedEntries)
-           }
+         val now = System.currentTimeMillis()
+         // it is possible that an entry is added before MemberJoined is visible; such entries should not be removed
+         def isOld(entry: Entry): Boolean = (now - entry.createdTimestamp) >= settings.pruneRemovedOlderThan.toMillis
+         val removals = {
+           state.registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
+             case (acc, (key, entries)) =>
+               val removedEntries =
+                 entries.filter(entry => isOnRemovedNode(entry) && (!onlyRemoveOldEntries || isOld(entry)))
+               if (removedEntries.isEmpty) acc // no change
+               else acc + (key -> removedEntries)
+           }
+         }
-         if (removals.nonEmpty) {
-           if (ctx.log.isDebugEnabled)
-             ctx.log.debugN(
-               "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
-               cluster.selfAddress,
-               addresses.mkString(","),
-               removals
-                 .map {
-                   case (key, entries) => key.asServiceKey.id -> entries.mkString("[", ", ", "]")
-                 }
-                 .mkString(","))
-           // shard changes over the ddata keys they belong to
-           val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
-           removalsPerDdataKey.foreach {
-             case (ddataKey, removalForKey) =>
-               replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
-                 ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
+         if (removals.nonEmpty) {
+           if (ctx.log.isDebugEnabled)
+             ctx.log.debugN(
+               "ClusterReceptionist [{}] - Node(s) removed [{}], updating registry removing entries: [{}]",
+               cluster.selfAddress,
+               addresses.mkString(","),
+               removals
+                 .map {
+                   case (key, entries) => key.asServiceKey.id -> entries.mkString("[", ", ", "]")
+                 }
+                 .mkString(","))
+           // shard changes over the ddata keys they belong to
+           val removalsPerDdataKey = state.registry.entriesPerDdataKey(removals)
+           removalsPerDdataKey.foreach {
+             case (ddataKey, removalForKey) =>
+               replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
+                 ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
+               }
+           }
+         }
}
}
      def reachabilityChanged(keysForNode: Set[AbstractServiceKey], newState: State): Unit = {
-       keysForNode.foreach { changedKey =>
+       notifySubscribers(keysForNode, servicesWereAddedOrRemoved = false, newState)
+     }
+     def notifySubscribers(
+         changedKeys: Set[AbstractServiceKey],
+         servicesWereAddedOrRemoved: Boolean,
+         newState: State): Unit = {
+       changedKeys.foreach { changedKey =>
          val serviceKey = changedKey.asServiceKey
-         val subscribers = state.subscriptions.get(changedKey)
+         val subscribers = newState.subscriptions.get(changedKey)
          if (subscribers.nonEmpty) {
            val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
            val listing =
-             ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = false)
+             ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved)
            subscribers.foreach(_ ! listing)
          }
        }
@@ -320,7 +331,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
def onCommand(cmd: Command): Behavior[Command] = cmd match {
case ReceptionistMessages.Register(key, serviceInstance, maybeReplyTo) =>
if (serviceInstance.path.address.hasLocalScope) {
-         val entry = Entry(serviceInstance, setup.selfSystemUid)
+         val entry = Entry(serviceInstance, setup.selfSystemUid)(System.currentTimeMillis())
ctx.log
.debugN("ClusterReceptionist [{}] - Actor was registered: [{}] [{}]", cluster.selfAddress, key, entry)
// actor already watched after one service key registration
@@ -343,7 +354,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
case ReceptionistMessages.Deregister(key, serviceInstance, maybeReplyTo) =>
if (serviceInstance.path.address.hasLocalScope) {
-         val entry = Entry(serviceInstance, setup.selfSystemUid)
+         val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
ctx.log.debugN(
"ClusterReceptionist [{}] - Unregister actor: [{}] [{}]",
cluster.selfAddress,
@@ -400,7 +411,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
behavior(setup, state.removeSubscriber(subscriber))
case LocalServiceActorTerminated(serviceInstance) =>
-       val entry = Entry(serviceInstance, setup.selfSystemUid)
+       val entry = Entry(serviceInstance, setup.selfSystemUid)(0L)
// could be empty if there was a race between termination and unregistration
val keys = state.servicesPerActor.getOrElse(serviceInstance, Set.empty)
@@ -439,17 +450,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
state.tombstones.mkString(", "))
}
+         notifySubscribers(changedKeys, servicesWereAddedOrRemoved = true, newState)
-         changedKeys.foreach { changedKey =>
-           val serviceKey = changedKey.asServiceKey
-           val subscribers = state.subscriptions.get(changedKey)
-           if (subscribers.nonEmpty) {
-             val (reachable, all) = newState.activeActorRefsFor(serviceKey, selfUniqueAddress)
-             val listing =
-               ReceptionistMessages.Listing(serviceKey, reachable, all, servicesWereAddedOrRemoved = true)
-             subscribers.foreach(_ ! listing)
-           }
          // because of how ORMultiMap/ORset works, we could have a case where an actor we removed
          // is re-introduced because of a concurrent update; in that case we need to re-remove it
          val tombstonedButReAdded = newRegistry.actorRefsFor(serviceKey).filter(state.hasTombstone(serviceKey))
@@ -463,7 +468,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry =>
tombstonedButReAdded
.foldLeft(ServiceRegistry(registry)) { (acc, ref) =>
-               acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid))
+               acc.removeBinding(serviceKey, Entry(ref, setup.selfSystemUid)(0L))
}
.toORMultiMap
}
@@ -476,7 +481,23 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
case NodeAdded(uniqueAddress) =>
-         behavior(setup, state.copy(registry = state.registry.addNode(uniqueAddress)))
+         if (state.registry.nodes.contains(uniqueAddress)) {
+           Behaviors.same
+         } else {
+           val newState = state.copy(registry = state.registry.addNode(uniqueAddress))
+           val keysForNode = newState.registry.keysFor(uniqueAddress)
+           if (keysForNode.nonEmpty) {
+             ctx.log.debug2(
+               "ClusterReceptionist [{}] - Node with registered services added [{}]",
+               cluster.selfAddress,
+               uniqueAddress)
+             notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
+           } else {
+             ctx.log.debug2("ClusterReceptionist [{}] - Node added [{}]", cluster.selfAddress, uniqueAddress)
+           }
+           behavior(setup, newState)
+         }
case NodeRemoved(uniqueAddress) =>
if (uniqueAddress == selfUniqueAddress) {
@@ -484,17 +505,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// If self cluster node is shutting down our own entries should have been removed via
// watch-Terminated or will be removed by other nodes. This point is anyway too late.
Behaviors.stopped
-       } else {
+       } else if (state.registry.nodes.contains(uniqueAddress)) {
+         val keysForNode = state.registry.keysFor(uniqueAddress)
+         val newState = state.copy(registry = state.registry.removeNode(uniqueAddress))
+         if (keysForNode.nonEmpty) {
+           ctx.log.debug2(
+             "ClusterReceptionist [{}] - Node with registered services removed [{}]",
+             cluster.selfAddress,
+             uniqueAddress)
+           notifySubscribers(keysForNode, servicesWereAddedOrRemoved = true, newState)
+         }
          // Ok to update from several nodes but more efficient to try to do it from one node.
          if (isLeader) {
            ctx.log.debug2(
              "ClusterReceptionist [{}] - Leader node observed removed node [{}]",
              cluster.selfAddress,
              uniqueAddress)
-           nodesRemoved(Set(uniqueAddress))
+           nodesRemoved(Set(uniqueAddress), onlyRemoveOldEntries = false)
          }
-         behavior(setup, state.copy(registry = state.registry.removeNode(uniqueAddress)))
+         behavior(setup, newState)
+       } else {
+         Behaviors.same
+       }
case NodeUnreachable(uniqueAddress) =>
@@ -528,14 +562,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
state.registry.allUniqueAddressesInState(setup.selfUniqueAddress)
val notInCluster = allAddressesInState.diff(state.registry.nodes)
-           if (notInCluster.isEmpty) Behaviors.same
-           else {
+           if (notInCluster.nonEmpty) {
              if (ctx.log.isDebugEnabled)
                ctx.log.debug2(
                  "ClusterReceptionist [{}] - Leader node cleanup tick, removed nodes: [{}]",
                  cluster.selfAddress,
                  notInCluster.mkString(","))
-             nodesRemoved(notInCluster)
+             nodesRemoved(notInCluster, onlyRemoveOldEntries = true)
}
}
Behaviors.same
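
The subscriber-facing effect of these changes surfaces through the ordinary
typed Receptionist API. A minimal sketch of a subscriber that, after this
commit, also receives updated listings when a node hosting registered
services joins or leaves the cluster (PingKey is a placeholder ServiceKey,
not a name from the diff):

import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors

object ListingWatcher {
  val PingKey: ServiceKey[String] = ServiceKey[String]("ping") // placeholder

  def apply(): Behavior[Receptionist.Listing] = Behaviors.setup { ctx =>
    // each node addition/removal with registered services now triggers a Listing
    ctx.system.receptionist ! Receptionist.Subscribe(PingKey, ctx.self)
    Behaviors.receiveMessage {
      case PingKey.Listing(refs) =>
        ctx.log.info("ping services now: {}", refs.size)
        Behaviors.same
    }
  }
}
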


@@ -40,6 +40,7 @@ private[akka] object ClusterReceptionistSettings {
ClusterReceptionistSettings(
writeConsistency,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
pruneRemovedOlderThan = config.getDuration("prune-removed-older-than", MILLISECONDS).millis,
config.getInt("distributed-key-count"),
replicatorSettings)
}
@@ -52,5 +53,6 @@ private[akka] object ClusterReceptionistSettings {
private[akka] case class ClusterReceptionistSettings(
writeConsistency: WriteConsistency,
pruningInterval: FiniteDuration,
pruneRemovedOlderThan: FiniteDuration,
distributedKeyCount: Int,
replicatorSettings: ReplicatorSettings)
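
The millisecond round trip used above is the standard Typesafe Config idiom;
a self-contained sketch of the same conversion (standalone, not the actual
settings loader):

import java.util.concurrent.TimeUnit.MILLISECONDS
import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory

val cfg = ConfigFactory.parseString("prune-removed-older-than = 60 s")
// getDuration returns a Long in the requested unit; .millis lifts it back
// into a FiniteDuration for comparisons like the isOld check above
val pruneRemovedOlderThan: FiniteDuration =
  cfg.getDuration("prune-removed-older-than", MILLISECONDS).millis
assert(pruneRemovedOlderThan == 60.seconds)
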


@@ -21,7 +21,7 @@ class AkkaClusterTypedSerializerSpec extends ScalaTestWithActorTestKit with AnyW
"AkkaClusterTypedSerializer" must {
Seq("ReceptionistEntry" -> ClusterReceptionist.Entry(ref, 666L)).foreach {
Seq("ReceptionistEntry" -> ClusterReceptionist.Entry(ref, 666L)(System.currentTimeMillis())).foreach {
case (scenario, item) =>
s"resolve serializer for $scenario" in {
val serializer = SerializationExtension(classicSystem)


@@ -4,6 +4,8 @@
package akka.cluster.typed.internal.receptionist
import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Await
import scala.concurrent.duration._
@@ -111,6 +113,33 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
}
}
"handle registrations before joining" in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
try {
val regProbe1 = TestProbe[Any]()(system1)
val regProbe2 = TestProbe[Any]()(system2)
val service = testKit1.spawn(pingPongBehavior)
testKit1.system.receptionist ! Register(PingKey, service, regProbe1.ref)
regProbe1.expectMessage(Registered(PingKey, service))
system2.receptionist ! Subscribe(PingKey, regProbe2.ref)
regProbe2.expectMessage(Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
val clusterNode1 = Cluster(system1)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
val clusterNode2 = Cluster(system2)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val PingKey.Listing(remoteServiceRefs) = regProbe2.expectMessageType[Listing](10.seconds)
remoteServiceRefs.head.path.address should ===(Cluster(system1).selfMember.address)
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"remove registrations when node dies" in {
testNodeRemoval(down = true)
}
@@ -735,5 +764,68 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
}
// Fixme concurrent registration and unregistration
"notify subscribers when registering and joining simultaneously" in {
// failing test reproducer for issue #28792
// It's possible that the registry entry from the ddata update arrives before MemberJoined.
val config = ConfigFactory.parseString("""
# quick dissemination to increase the chance of the race condition
akka.cluster.typed.receptionist.distributed-data.write-consistency = all
akka.cluster.typed.receptionist.distributed-data.gossip-interval = 500ms
# run the RemoveTick cleanup often to exercise that scenario
akka.cluster.typed.receptionist.pruning-interval = 50ms
""").withFallback(ClusterReceptionistSpec.config)
val numberOfNodes = 6 // use 9 or more to stress it more
val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec", config))
try {
val probes = testKits.map(t => TestProbe[Any]()(t.system))
testKits.zip(probes).foreach { case (t, p) => t.system.receptionist ! Subscribe(PingKey, p.ref) }
val clusterNode1 = Cluster(testKits.head.system)
// join 3 first
(0 until 3).foreach { i =>
val t = testKits(i)
Cluster(t.system).manager ! Join(clusterNode1.selfMember.address)
val ref = t.spawn(pingPongBehavior)
t.system.receptionist ! Register(PingKey, ref)
}
// wait until all those are Up
(0 until 3).foreach { i =>
probes(i).awaitAssert(
Cluster(testKits(i).system).state.members.count(_.status == MemberStatus.Up) should ===(3),
10.seconds)
}
// then join the rest randomly to the first 3
// important to not join all to first to be able to reproduce the problem
testKits.drop(3).foreach { t =>
val i = ThreadLocalRandom.current().nextInt(3)
Cluster(t.system).manager ! Join(Cluster(testKits(i).system).selfMember.address)
val ref = t.spawn(pingPongBehavior)
Thread.sleep(100) // increase chance of the race condition
t.system.receptionist ! Register(PingKey, ref)
}
(0 until numberOfNodes).foreach { i =>
probes(i).awaitAssert(
Cluster(testKits(i).system).state.members.count(_.status == MemberStatus.Up) should ===(numberOfNodes),
30.seconds)
}
// eventually, all should be included in the Listing
(0 until numberOfNodes).foreach { i =>
probes(i).fishForMessage(10.seconds, s"$i") {
case PingKey.Listing(actors) if actors.size == numberOfNodes => FishingOutcomes.complete
case PingKey.Listing(_) => FishingOutcomes.continue
}
}
testKits.head.system.log.debug("All expected listings found.")
} finally {
// faster to terminate all at the same time
testKits.foreach(_.system.terminate())
testKits.foreach(_.shutdownTestKit())
}
}
}
}