diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala index 2227f96d17..e1932f03e1 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingPersistenceSpec.scala @@ -39,6 +39,7 @@ object ClusterShardingPersistenceSpec { #akka.persistence.typed.log-stashing = on akka.actor.provider = cluster + akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 diff --git a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala index 71f6e858fc..fe6553f4c3 100644 --- a/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala +++ b/akka-cluster-sharding-typed/src/test/scala/akka/cluster/sharding/typed/scaladsl/ClusterShardingSpec.scala @@ -39,8 +39,6 @@ object ClusterShardingSpec { akka.actor.provider = cluster // akka.loglevel = debug - - akka.remote.artery.enabled = true akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala index 99dcfb2341..1710c46422 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionist.scala @@ -13,10 +13,11 @@ import akka.actor.typed.{ ActorRef, Behavior, Terminated } import akka.annotation.InternalApi import akka.cluster.ClusterEvent.MemberRemoved import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator } -import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress } +import akka.cluster.{ Cluster, ClusterEvent, MemberStatus, UniqueAddress } import akka.remote.AddressUidExtension import akka.util.TypedMultiMap +import scala.concurrent.duration._ import scala.language.existentials /** INTERNAL API */ @@ -46,6 +47,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { key: DDataKey, value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand private case object RemoveTick extends InternalCommand + private case object PruneTombstonesTick extends InternalCommand // captures setup/dependencies so we can avoid doing it over and over again final class Setup(ctx: ActorContext[Command]) { @@ -53,44 +55,55 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { val settings = ClusterReceptionistSettings(ctx.system) val replicator = DistributedData(untypedSystem).replicator val selfSystemUid = AddressUidExtension(untypedSystem).longAddressUid + lazy val keepTombstonesFor = cluster.settings.PruneGossipTombstonesAfter match { + case f: FiniteDuration ⇒ f + case _ ⇒ throw new IllegalStateException("Cannot actually happen") + } implicit val cluster = Cluster(untypedSystem) + def newTombstoneDeadline() = Deadline(keepTombstonesFor) def 
selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress } - override def behavior: Behavior[Command] = Behaviors.setup { ctx ⇒ + override def behavior: Behavior[Command] = + Behaviors.setup { ctx ⇒ + Behaviors.withTimers { timers ⇒ - val setup = new Setup(ctx) - val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount) + val setup = new Setup(ctx) + val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount) - // subscribe to changes from other nodes - val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] = - ctx.messageAdapter[Replicator.ReplicatorMessage] { - case changed: Replicator.Changed[_] @unchecked ⇒ - ChangeFromReplicator( - changed.key.asInstanceOf[DDataKey], - changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]]) + // subscribe to changes from other nodes + val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] = + ctx.messageAdapter[Replicator.ReplicatorMessage] { + case changed: Replicator.Changed[_] @unchecked ⇒ + ChangeFromReplicator( + changed.key.asInstanceOf[DDataKey], + changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]]) + } + + registry.allDdataKeys.foreach(key ⇒ + setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped) + ) + + // remove entries when members are removed + val clusterEventMessageAdapter: ActorRef[MemberRemoved] = + ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.uniqueAddress) } + setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) + + // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, + // which is possible for OR CRDTs - handled by a periodic RemoveTick timer + timers.startPeriodicTimer("remove-nodes", RemoveTick, setup.settings.pruningInterval) + + // default tombstone keep-alive is 24h (based on prune-gossip-tombstones-after) and keeping the actor refs + // around isn't very costly, so don't prune often + timers.startPeriodicTimer("prune-tombstones", PruneTombstonesTick, setup.keepTombstonesFor / 24) + + behavior( + setup, + registry, + TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] + ) } - - registry.allDdataKeys.foreach(key ⇒ - setup.replicator ! 
Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped) - ) - - // remove entries when members are removed - val clusterEventMessageAdapter: ActorRef[MemberRemoved] = - ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.uniqueAddress) } - setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved]) - - // also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update, - // which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message - ctx.system.scheduler.schedule(setup.settings.pruningInterval, setup.settings.pruningInterval, - ctx.self.toUntyped, RemoveTick)(ctx.system.executionContext) - - behavior( - setup, - registry, - TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV] - ) - } + } /** * @param registry The last seen state from the replicator - only updated when we get an update from th replicator @@ -125,7 +138,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { }) def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = { - val msg = ReceptionistMessages.Listing(key.asServiceKey, state.actorRefsFor(key)) + // filter tombstoned refs to avoid an extra update + // to subscribers in the case of lost removals (because of how ORMultiMap works) + val refsForKey = state.actorRefsFor(key) + val refsWithoutTombstoned = + if (registry.tombstones.isEmpty) refsForKey + else refsForKey.filterNot(registry.hasTombstone) + val msg = ReceptionistMessages.Listing(key.asServiceKey, refsWithoutTombstoned) subscriptions.get(key).foreach(_ ! msg) } @@ -133,6 +152,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { // ok to update from several nodes but more efficient to try to do it from one node if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) { def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress)) + val removals = { registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) { case (acc, (key, entries)) ⇒ @@ -160,10 +180,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap } } - } - Behaviors.same - } else Behaviors.same + } + + } + Behaviors.same } def onCommand(cmd: Command): Behavior[Command] = cmd match { @@ -206,7 +227,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { replicator ! 
Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap } - Behaviors.same + // tombstone removals so they are not re-added by merging with other concurrent + // registrations for the same key + next(newState = registry.addTombstone(serviceInstance, setup.newTombstoneDeadline())) case ChangeFromReplicator(ddataKey, value) ⇒ // every change will come back this way - this is where the local notifications happens @@ -216,13 +239,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { if (changedKeys.nonEmpty) { if (ctx.log.isDebugEnabled) { ctx.log.debug( - "Change from replicator: [{}], changes: [{}]", + "Change from replicator: [{}], changes: [{}], tombstones [{}]", newState.entries.entries, changedKeys.map(key ⇒ key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]") - ).mkString(", ")) + ).mkString(", "), + registry.tombstones.mkString(", ") + ) } - changedKeys.foreach(notifySubscribersFor(_, newState)) + changedKeys.foreach { changedKey ⇒ + notifySubscribersFor(changedKey, newState) + + // because of how ORMultiMap/ORSet works, we could have a case where an actor we removed + // is re-introduced by a concurrent update; in that case we need to re-remove it + val serviceKey = changedKey.asServiceKey + val tombstonedButReAdded = + newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone) + tombstonedButReAdded.foreach { actorRef ⇒ + ctx.log.debug("Saw ActorRef that was tombstoned {}, re-removing.", actorRef) + replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒ + ServiceRegistry(registry).removeBinding(serviceKey, Entry(actorRef, setup.selfSystemUid)).toORMultiMap + } + } + } + next(newRegistry) } else { Behaviors.same @@ -250,6 +290,14 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider { } } else Behavior.same + + case PruneTombstonesTick ⇒ + val prunedRegistry = registry.pruneTombstones() + if (prunedRegistry eq registry) Behaviors.same + else { + ctx.log.debug("Pruning tombstones") + next(prunedRegistry) + } } Behaviors.receive[Command] { (ctx, msg) ⇒ diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala index 420bae83a6..9681268887 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/Registry.scala @@ -11,6 +11,9 @@ import akka.annotation.InternalApi import akka.cluster.{ Cluster, UniqueAddress } import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey } import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry } +import akka.util.Timeout + +import scala.concurrent.duration.Deadline /** * INTERNAL API @@ -21,7 +24,7 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n") key -> new ServiceRegistry(EmptyORMultiMap) }.toMap - new ShardedServiceRegistry(emptyRegistries) + new ShardedServiceRegistry(emptyRegistries, Map.empty) } } @@ -30,9 +33,14 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, * Two level structure for keeping service registry to be able to shard entries over multiple ddata keys (to not * get too large 
ddata messages) * + * @param tombstones Local actors that were stopped and should not be re-added to the available set of actors + * for a key. Since the only way to unregister is to stop, we don't need to keep track of + * the service key * INTERNAL API */ -@InternalApi private[akka] final class ShardedServiceRegistry(serviceRegistries: Map[DDataKey, ServiceRegistry]) { +@InternalApi private[akka] final case class ShardedServiceRegistry( + serviceRegistries: Map[DDataKey, ServiceRegistry], + tombstones: Map[ActorRef[_], Deadline]) { private val keys = serviceRegistries.keySet.toArray @@ -54,7 +62,7 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, } def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry = - new ShardedServiceRegistry(serviceRegistries + (dDataKey -> registry)) + copy(serviceRegistries + (dDataKey -> registry), tombstones) def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] = allEntries.collect { @@ -76,6 +84,18 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, acc + (ddataKey -> updated) } + def addTombstone(actorRef: ActorRef[_], deadline: Deadline): ShardedServiceRegistry = + copy(tombstones = tombstones + (actorRef -> deadline)) + + def hasTombstone(actorRef: ActorRef[_]): Boolean = + tombstones.contains(actorRef) + + def pruneTombstones(): ShardedServiceRegistry = { + copy(tombstones = tombstones.filter { + case (ref, deadline) ⇒ deadline.hasTimeLeft + }) + } + } /** @@ -91,10 +111,10 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, entries.getOrElse(key.asServiceKey, Set.empty[Entry]) def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - ServiceRegistry(entries.addBinding(key, value)) + copy(entries = entries.addBinding(key, value)) def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry = - ServiceRegistry(entries.removeBinding(key, value)) + copy(entries = entries.removeBinding(key, value)) def removeAll(entries: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = { entries.foldLeft(this) { diff --git a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java index 3fbbaac428..e283c06855 100644 --- a/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java +++ b/akka-cluster-typed/src/test/java/akka/cluster/typed/ClusterApiTest.java @@ -20,7 +20,6 @@ public class ClusterApiTest extends JUnitSuite { public void joinLeaveAndObserve() throws Exception { Config config = ConfigFactory.parseString( "akka.actor.provider = cluster \n" + - "akka.remote.artery.enabled = true \n"+ "akka.remote.netty.tcp.port = 0 \n"+ "akka.remote.artery.canonical.port = 0 \n"+ "akka.remote.artery.canonical.hostname = 127.0.0.1 \n" + diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala index f5dd6f5307..ba21c8ec29 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterApiSpec.scala @@ -18,7 +18,6 @@ object ClusterApiSpec { val config = ConfigFactory.parseString( """ akka.actor.provider = cluster - akka.remote.artery.enabled = true akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 
akka.remote.artery.canonical.hostname = 127.0.0.1 diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala index 741d07661e..74509956da 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonApiSpec.scala @@ -39,7 +39,6 @@ object ClusterSingletonApiSpec { } } akka.remote.netty.tcp.port = 0 - akka.remote.artery.enabled = true akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster.jmx.multi-mbeans-in-same-jvm = on diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala index 27079dc783..0e7cecb1ae 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/ClusterSingletonPersistenceSpec.scala @@ -16,7 +16,6 @@ object ClusterSingletonPersistenceSpec { val config = ConfigFactory.parseString( """ akka.actor.provider = cluster - akka.remote.artery.enabled = true akka.remote.netty.tcp.port = 0 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala index a331f82b80..ef2c06a2e9 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteContextAskSpec.scala @@ -59,8 +59,9 @@ object RemoteContextAskSpec { "akka.cluster.typed.RemoteContextAskSpec$$Pong$$" = test } } + remote.netty.tcp.port = 0 + remote.netty.tcp.host = 127.0.0.1 remote.artery { - enabled = on canonical { hostname = 127.0.0.1 port = 0 diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteDeployNotAllowedSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteDeployNotAllowedSpec.scala index 933024b1bc..07a5237985 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteDeployNotAllowedSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteDeployNotAllowedSpec.scala @@ -24,8 +24,8 @@ object RemoteDeployNotAllowedSpec { warn-about-java-serializer-usage = off serialize-creators = off } + remote.netty.tcp.port = 0 remote.artery { - enabled = on canonical { hostname = 127.0.0.1 port = 0 diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteMessageSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteMessageSpec.scala index cc35431c51..b8f907c24a 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteMessageSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/RemoteMessageSpec.scala @@ -47,8 +47,8 @@ object RemoteMessageSpec { "akka.cluster.typed.RemoteMessageSpec$$Ping" = test } } + remote.netty.tcp.port = 0 remote.artery { - enabled = on canonical { hostname = 127.0.0.1 port = 0 diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 5f5ed69324..10efddfa11 100644 --- 
a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -10,19 +10,18 @@ import akka.actor.{ ExtendedActorSystem, RootActorPath } import akka.actor.typed.receptionist.{ Receptionist, ServiceKey } import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter._ -import akka.actor.typed.{ ActorRef, ActorRefResolver } +import akka.actor.typed.{ ActorRef, ActorRefResolver, scaladsl } import akka.cluster.MemberStatus import akka.cluster.typed.{ Cluster, Join } import akka.serialization.SerializerWithStringManifest import akka.actor.testkit.typed.FishingOutcome -import akka.actor.testkit.typed.scaladsl.TestProbe +import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, FishingOutcomes, TestProbe } import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } + import scala.concurrent.Await import scala.concurrent.duration._ -import akka.actor.testkit.typed.scaladsl.ActorTestKit - object ClusterReceptionistSpec { val config = ConfigFactory.parseString( s""" @@ -40,7 +39,8 @@ object ClusterReceptionistSpec { "akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$Perish$$" = test } } - akka.remote.artery.enabled = true + akka.remote.netty.tcp.port = 0 + akka.remote.netty.tcp.host = 127.0.0.1 akka.remote.artery.canonical.port = 0 akka.remote.artery.canonical.hostname = 127.0.0.1 akka.cluster { @@ -165,7 +165,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) } finally { testKit1.shutdownTestKit() - if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + testKit2.shutdownTestKit() } } @@ -207,7 +207,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]])) } finally { testKit1.shutdownTestKit() - if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + testKit2.shutdownTestKit() } } @@ -292,7 +292,83 @@ class ClusterReceptionistSpec extends WordSpec with Matchers { } } finally { testKit1.shutdownTestKit() - if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit() + testKit2.shutdownTestKit() + } + } + + "not lose removals on concurrent updates to same key" in { + val config = ConfigFactory.parseString( + """ + # disable delta propagation so we can have repeatable concurrent writes + # without delta reaching between nodes already + akka.cluster.distributed-data.delta-crdt.enabled=false + """).withFallback(ClusterReceptionistSpec.config) + val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", config) + val system1 = testKit1.system + val testKit2 = ActorTestKit(system1.name, system1.settings.config) + val system2 = testKit2.system + + val TheKey = ServiceKey[AnyRef]("whatever") + try { + val clusterNode1 = Cluster(system1) + val clusterNode2 = Cluster(system2) + clusterNode1.manager ! Join(clusterNode1.selfMember.address) + clusterNode2.manager ! 
Join(clusterNode1.selfMember.address) + + val regProbe1 = TestProbe[AnyRef]()(system1) + val regProbe2 = TestProbe[AnyRef]()(system2) + + regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) should ===(2)) + + // one actor on each node up front + val actor1 = testKit1.spawn(Behaviors.receive[AnyRef] { + case (ctx, "stop") ⇒ + ctx.log.info("Stopping") + Behaviors.stopped + case _ ⇒ Behaviors.same + }, "actor1") + val actor2 = testKit2.spawn(Behaviors.empty[AnyRef], "actor2") + + system1.receptionist ! Register(TheKey, actor1) + system1.receptionist ! Subscribe(TheKey, regProbe1.ref) + regProbe1.awaitAssert( + regProbe1.expectMessage(Listing(TheKey, Set(actor1))), + 5.seconds + ) + + system2.receptionist ! Subscribe(TheKey, regProbe2.ref) + regProbe2.fishForMessage(10.seconds) { + case TheKey.Listing(actors) if actors.nonEmpty ⇒ + println(actors) + FishingOutcomes.complete + case _ ⇒ FishingOutcomes.continue + } + system1.log.info("Saw actor on both nodes") + + // TheKey -> Set(actor1) seen by both nodes, now, + // remove on node1 and add on node2 (hopefully) concurrently + system2.receptionist ! Register(TheKey, actor2, regProbe2.ref) + actor1 ! "stop" + regProbe2.expectMessage(Registered(TheKey, actor2)) + system2.log.info("actor2 registered") + + // we should now, eventually, see the removal on both nodes + regProbe1.fishForMessage(10.seconds) { + case TheKey.Listing(actors) if actors.size == 1 ⇒ + FishingOutcomes.complete + case _ ⇒ + FishingOutcomes.continue + } + regProbe2.fishForMessage(10.seconds) { + case TheKey.Listing(actors) if actors.size == 1 ⇒ + FishingOutcomes.complete + case _ ⇒ + FishingOutcomes.continue + } + + } finally { + testKit1.shutdownTestKit() + testKit2.shutdownTestKit() } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala index 891df639dd..60b3aa9d9f 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterSettings.scala @@ -105,6 +105,9 @@ final class ClusterSettings(val config: Config, val systemName: String) { } } + /** + * Is in fact always a `FiniteDuration` but needs to stay `Duration` for binary compatibility + */ val PruneGossipTombstonesAfter: Duration = { val key = "prune-gossip-tombstones-after" cc.getMillisDuration(key) requiring (_ >= Duration.Zero, key + " >= 0s") diff --git a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala index ae7a8374c4..b7c69b262d 100644 --- a/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala +++ b/akka-distributed-data/src/main/scala/akka/cluster/ddata/ORMultiMap.scala @@ -58,6 +58,8 @@ object ORMultiMap { * [[ORMap]] with an [[ORSet]] for the map's value. * * This class is immutable, i.e. "modifying" methods return a new instance. + + * Note that on concurrent adds and removals for the same key (on the same set), removals can be lost. 
*/ @SerialVersionUID(1L) final class ORMultiMap[A, B] private[akka] ( diff --git a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala index 2b236a28a2..f505410ff5 100644 --- a/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala +++ b/akka-distributed-data/src/test/scala/akka/cluster/ddata/ORMultiMapSpec.scala @@ -11,8 +11,8 @@ import org.scalatest.{ Matchers, WordSpec } class ORMultiMapSpec extends WordSpec with Matchers { - val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1) - val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2) + val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L) + val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L) "A ORMultiMap" must { @@ -34,6 +34,26 @@ class ORMultiMapSpec extends WordSpec with Matchers { m.entries should be(Map("a" → Set("B"))) } + "not handle concurrent updates to the same set" in { + val m = ORMultiMap().addBinding(node1, "a", "A") + + val m1 = m.removeBinding(node1, "a", "A") + + val m2 = m.addBinding(node2, "a", "B") + + val merged1 = m1.merge(m2) + val merged2 = m2.merge(m1) + + // more to document that the concurrent removal from the set may be lost + // than asserting anything + merged1.entries should be(Map( + "a" -> Set("A", "B") + )) + merged2.entries should be(Map( + "a" -> Set("A", "B") + )) + } + "be able to have its entries correctly merged with another ORMultiMap with other entries" in { val m1 = ORMultiMap().addBinding(node1, "a", "A").addBinding(node1, "b", "B") val m2 = ORMultiMap().addBinding(node2, "c", "C")
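
Side note on the ORMultiMap semantics this change works around: the lost removal can be reproduced with ORMultiMap alone, outside the receptionist. Below is a minimal sketch, not part of this change; it assumes it sits in the akka.cluster.ddata package (like ORMultiMapSpec above) so the UniqueAddress-taking overloads of addBinding/removeBinding are accessible, and the addresses are made up for illustration.

package akka.cluster.ddata

import akka.actor.Address
import akka.cluster.UniqueAddress

object LostRemovalSketch extends App {
  // two fictitious cluster members (example addresses only)
  val node1 = UniqueAddress(Address("akka.tcp", "Sys", "localhost", 2551), 1L)
  val node2 = UniqueAddress(node1.address.copy(port = Some(2552)), 2L)

  // both nodes start from the same converged state: "a" -> Set("A")
  val converged = ORMultiMap.empty[String, String].addBinding(node1, "a", "A")

  // node 1 removes "A" while node 2 concurrently adds "B" under the same key
  val afterRemove = converged.removeBinding(node1, "a", "A")
  val afterAdd = converged.addBinding(node2, "a", "B")

  // once the two updates meet through gossip, the removal of "A" is lost,
  // which is why the receptionist keeps tombstones and re-removes such entries
  println(afterRemove.merge(afterAdd).entries) // Map(a -> Set(A, B))
  println(afterAdd.merge(afterRemove).entries) // Map(a -> Set(A, B))
}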