Handle lost typed receptionist removals #24887

Keep track of removed actors and re-remove them when an ORMultiMap conflict has reintroduced them
Johan Andrén 2018-11-09 10:58:18 +01:00 committed by GitHub
parent ea80ce10fa
commit f66ee1cbe8
15 changed files with 229 additions and 64 deletions
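
The approach, in short: the receptionist keeps a local tombstone for every actor it has removed, filters tombstoned refs out of the listings it pushes to subscribers, and issues the removal again whenever a replicated change brings a tombstoned ref back. Below is a minimal, self-contained sketch of that decision, with plain Strings standing in for the ORMultiMap entries and ActorRefs (none of these names come from the actual code):

// Toy model of the re-removal decision, not the real ClusterReceptionist code.
object TombstoneSketch {
  final case class Decision(showToSubscribers: Set[String], reRemove: Set[String])

  // `seen` is what the replicator last reported for a service key,
  // `tombstones` are refs this node already removed and never wants back
  def onReplicatorChange(seen: Set[String], tombstones: Set[String]): Decision =
    Decision(
      showToSubscribers = seen.filterNot(tombstones), // never surface a removed ref
      reRemove = seen.filter(tombstones)) // a concurrent merge resurrected these

  def main(args: Array[String]): Unit = {
    // node A removed "actor-1", but a concurrent update on node B merged it back in
    val afterMerge = Set("actor-1", "actor-2")
    println(onReplicatorChange(afterMerge, tombstones = Set("actor-1")))
    // Decision(Set(actor-2),Set(actor-1))
  }
}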


@@ -13,10 +13,11 @@ import akka.actor.typed.{ ActorRef, Behavior, Terminated }
import akka.annotation.InternalApi
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ddata.{ DistributedData, ORMultiMap, ORMultiMapKey, Replicator }
import akka.cluster.{ Cluster, ClusterEvent, UniqueAddress }
import akka.cluster.{ Cluster, ClusterEvent, MemberStatus, UniqueAddress }
import akka.remote.AddressUidExtension
import akka.util.TypedMultiMap
import scala.concurrent.duration._
import scala.language.existentials
/** INTERNAL API */
@@ -46,6 +47,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
key: DDataKey,
value: ORMultiMap[ServiceKey[_], Entry]) extends InternalCommand
private case object RemoveTick extends InternalCommand
private case object PruneTombstonesTick extends InternalCommand
// captures setup/dependencies so we can avoid doing it over and over again
final class Setup(ctx: ActorContext[Command]) {
@@ -53,44 +55,55 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
val settings = ClusterReceptionistSettings(ctx.system)
val replicator = DistributedData(untypedSystem).replicator
val selfSystemUid = AddressUidExtension(untypedSystem).longAddressUid
lazy val keepTombstonesFor = cluster.settings.PruneGossipTombstonesAfter match {
case f: FiniteDuration ⇒ f
case _ ⇒ throw new IllegalStateException("Cannot actually happen")
}
implicit val cluster = Cluster(untypedSystem)
def newTombstoneDeadline() = Deadline(keepTombstonesFor)
def selfUniqueAddress: UniqueAddress = cluster.selfUniqueAddress
}
override def behavior: Behavior[Command] = Behaviors.setup { ctx ⇒
override def behavior: Behavior[Command] =
Behaviors.setup { ctx ⇒
Behaviors.withTimers { timers ⇒
val setup = new Setup(ctx)
val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount)
val setup = new Setup(ctx)
val registry = ShardedServiceRegistry(setup.settings.distributedKeyCount)
// subscribe to changes from other nodes
val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.messageAdapter[Replicator.ReplicatorMessage] {
case changed: Replicator.Changed[_] @unchecked ⇒
ChangeFromReplicator(
changed.key.asInstanceOf[DDataKey],
changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]])
// subscribe to changes from other nodes
val replicatorMessageAdapter: ActorRef[Replicator.ReplicatorMessage] =
ctx.messageAdapter[Replicator.ReplicatorMessage] {
case changed: Replicator.Changed[_] @unchecked ⇒
ChangeFromReplicator(
changed.key.asInstanceOf[DDataKey],
changed.dataValue.asInstanceOf[ORMultiMap[ServiceKey[_], Entry]])
}
registry.allDdataKeys.foreach(key ⇒
setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped)
)
// remove entries when members are removed
val clusterEventMessageAdapter: ActorRef[MemberRemoved] =
ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.uniqueAddress) }
setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
// also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update,
// which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message
timers.startPeriodicTimer("remove-nodes", RemoveTick, setup.settings.pruningInterval)
// default tombstone keep-alive is 24h (based on prune-gossip-tombstones-after) and keeping the ActorRefs
// around isn't very costly so don't prune often
timers.startPeriodicTimer("prune-tombstones", PruneTombstonesTick, setup.keepTombstonesFor / 24)
behavior(
setup,
registry,
TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV]
)
}
registry.allDdataKeys.foreach(key ⇒
setup.replicator ! Replicator.Subscribe(key, replicatorMessageAdapter.toUntyped)
)
// remove entries when members are removed
val clusterEventMessageAdapter: ActorRef[MemberRemoved] =
ctx.messageAdapter[MemberRemoved] { case MemberRemoved(member, _) ⇒ NodeRemoved(member.uniqueAddress) }
setup.cluster.subscribe(clusterEventMessageAdapter.toUntyped, ClusterEvent.InitialStateAsEvents, classOf[MemberRemoved])
// also periodic cleanup in case removal from ORMultiMap is skipped due to concurrent update,
// which is possible for OR CRDTs - done with an adapter to leverage the existing NodesRemoved message
ctx.system.scheduler.schedule(setup.settings.pruningInterval, setup.settings.pruningInterval,
ctx.self.toUntyped, RemoveTick)(ctx.system.executionContext)
behavior(
setup,
registry,
TypedMultiMap.empty[AbstractServiceKey, SubscriptionsKV]
)
}
}
/**
* @param registry The last seen state from the replicator - only updated when we get an update from the replicator
@@ -125,7 +138,13 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
})
def notifySubscribersFor(key: AbstractServiceKey, state: ServiceRegistry): Unit = {
val msg = ReceptionistMessages.Listing(key.asServiceKey, state.actorRefsFor(key))
// filter tombstoned refs to avoid an extra update
// to subscribers in the case of lost removals (because of how ORMultiMap works)
val refsForKey = state.actorRefsFor(key)
val refsWithoutTombstoned =
if (registry.tombstones.isEmpty) refsForKey
else refsForKey.filterNot(registry.hasTombstone)
val msg = ReceptionistMessages.Listing(key.asServiceKey, refsWithoutTombstoned)
subscriptions.get(key).foreach(_ ! msg)
}
@@ -133,6 +152,7 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
// ok to update from several nodes but more efficient to try to do it from one node
if (cluster.state.leader.contains(cluster.selfAddress) && addresses.nonEmpty) {
def isOnRemovedNode(entry: Entry): Boolean = addresses(entry.uniqueAddress(setup.selfUniqueAddress))
val removals = {
registry.allServices.foldLeft(Map.empty[AbstractServiceKey, Set[Entry]]) {
case (acc, (key, entries)) ⇒
@@ -160,10 +180,11 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
ServiceRegistry(registry).removeAll(removalForKey).toORMultiMap
}
}
}
Behaviors.same
} else Behaviors.same
}
}
Behaviors.same
}
def onCommand(cmd: Command): Behavior[Command] = cmd match {
@@ -206,7 +227,9 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
ServiceRegistry(registry).removeBinding(key, entry).toORMultiMap
}
Behaviors.same
// tombstone removals so they are not re-added by merging with other concurrent
// registrations for the same key
next(newState = registry.addTombstone(serviceInstance, setup.newTombstoneDeadline()))
case ChangeFromReplicator(ddataKey, value) ⇒
// every change will come back this way - this is where the local notifications happens
@@ -216,13 +239,30 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
if (changedKeys.nonEmpty) {
if (ctx.log.isDebugEnabled) {
ctx.log.debug(
"Change from replicator: [{}], changes: [{}]",
"Change from replicator: [{}], changes: [{}], tombstones [{}]",
newState.entries.entries,
changedKeys.map(key ⇒
key.asServiceKey.id -> newState.entriesFor(key).mkString("[", ", ", "]")
).mkString(", "))
).mkString(", "),
registry.tombstones.mkString(", ")
)
}
changedKeys.foreach(notifySubscribersFor(_, newState))
changedKeys.foreach { changedKey ⇒
notifySubscribersFor(changedKey, newState)
// because of how ORMultiMap/ORSet works, we could have a case where an actor we removed
// is re-introduced because of a concurrent update; in that case we need to re-remove it
val serviceKey = changedKey.asServiceKey
val tombstonedButReAdded =
newRegistry.actorRefsFor(serviceKey).filter(newRegistry.hasTombstone)
tombstonedButReAdded.foreach { actorRef ⇒
ctx.log.debug("Saw actor ref that was tombstoned {}, re-removing.", actorRef)
replicator ! Replicator.Update(ddataKey, EmptyORMultiMap, settings.writeConsistency) { registry ⇒
ServiceRegistry(registry).removeBinding(serviceKey, Entry(actorRef, setup.selfSystemUid)).toORMultiMap
}
}
}
next(newRegistry)
} else {
Behaviors.same
@@ -250,6 +290,14 @@ private[typed] object ClusterReceptionist extends ReceptionistBehaviorProvider {
}
} else
Behavior.same
case PruneTombstonesTick ⇒
val prunedRegistry = registry.pruneTombstones()
if (prunedRegistry eq registry) Behaviors.same
else {
ctx.log.debug(s"Pruning tombstones")
next(prunedRegistry)
}
}
Behaviors.receive[Command] { (ctx, msg) ⇒
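
The behavior above also switches from ctx.system.scheduler.schedule to Behaviors.withTimers, so the periodic RemoveTick and PruneTombstonesTick messages are owned by the actor and cancelled when it stops. A minimal standalone sketch of that timer pattern (the message, timer key and interval here are invented for illustration):

import scala.concurrent.duration._
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors

object PeriodicCleanupSketch {
  sealed trait Command
  case object CleanupTick extends Command

  def behavior: Behavior[Command] =
    Behaviors.withTimers[Command] { timers ⇒
      // the timer lives and dies with the actor
      timers.startPeriodicTimer("cleanup", CleanupTick, 3.seconds)
      Behaviors.receive[Command] { (ctx, msg) ⇒
        msg match {
          case CleanupTick ⇒
            ctx.log.debug("Running periodic cleanup")
            Behaviors.same
        }
      }
    }
}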


@@ -11,6 +11,9 @@ import akka.annotation.InternalApi
import akka.cluster.{ Cluster, UniqueAddress }
import akka.cluster.ddata.{ ORMultiMap, ORMultiMapKey }
import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey, EmptyORMultiMap, Entry }
import akka.util.Timeout
import scala.concurrent.duration.Deadline
/**
* INTERNAL API
@@ -21,7 +24,7 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey,
val key = ORMultiMapKey[ServiceKey[_], Entry](s"ReceptionistKey_$n")
key -> new ServiceRegistry(EmptyORMultiMap)
}.toMap
new ShardedServiceRegistry(emptyRegistries)
new ShardedServiceRegistry(emptyRegistries, Map.empty)
}
}
@@ -30,9 +33,14 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey,
* Two-level structure for keeping the service registry, making it possible to shard entries over multiple ddata keys
* (so that individual ddata messages do not get too large)
*
* @param tombstones Local actors that were stopped and should not be re-added to the available set of actors
* for a key. Since the only way to unregister is to stop, we don't need to keep track of
* the service key
* INTERNAL API
*/
@InternalApi private[akka] final class ShardedServiceRegistry(serviceRegistries: Map[DDataKey, ServiceRegistry]) {
@InternalApi private[akka] final case class ShardedServiceRegistry(
serviceRegistries: Map[DDataKey, ServiceRegistry],
tombstones: Map[ActorRef[_], Deadline]) {
private val keys = serviceRegistries.keySet.toArray
@@ -54,7 +62,7 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey,
}
def withServiceRegistry(dDataKey: DDataKey, registry: ServiceRegistry): ShardedServiceRegistry =
new ShardedServiceRegistry(serviceRegistries + (dDataKey -> registry))
copy(serviceRegistries + (dDataKey -> registry), tombstones)
def allUniqueAddressesInState(selfUniqueAddress: UniqueAddress): Set[UniqueAddress] =
allEntries.collect {
@@ -76,6 +84,18 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey,
acc + (ddataKey -> updated)
}
def addTombstone(actorRef: ActorRef[_], deadline: Deadline): ShardedServiceRegistry =
copy(tombstones = tombstones + (actorRef -> deadline))
def hasTombstone(actorRef: ActorRef[_]): Boolean =
tombstones.contains(actorRef)
def pruneTombstones(): ShardedServiceRegistry = {
copy(tombstones = tombstones.filter {
case (ref, deadline) ⇒ deadline.hasTimeLeft
})
}
}
/**
@@ -91,10 +111,10 @@ import akka.cluster.typed.internal.receptionist.ClusterReceptionist.{ DDataKey,
entries.getOrElse(key.asServiceKey, Set.empty[Entry])
def addBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
ServiceRegistry(entries.addBinding(key, value))
copy(entries = entries.addBinding(key, value))
def removeBinding[T](key: ServiceKey[T], value: Entry)(implicit cluster: Cluster): ServiceRegistry =
ServiceRegistry(entries.removeBinding(key, value))
copy(entries = entries.removeBinding(key, value))
def removeAll(entries: Map[AbstractServiceKey, Set[Entry]])(implicit cluster: Cluster): ServiceRegistry = {
entries.foldLeft(this) {
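
The tombstones added to ShardedServiceRegistry above carry a Deadline (derived from prune-gossip-tombstones-after) so stopped actors do not accumulate forever; pruneTombstones drops the expired ones. A simplified sketch of that bookkeeping, keyed by plain Strings instead of ActorRefs (the method names mirror the diff, everything else is invented):

import scala.concurrent.duration._

// Simplified stand-in for the tombstone handling in ShardedServiceRegistry.
final case class Tombstones(entries: Map[String, Deadline] = Map.empty) {
  def addTombstone(ref: String, keepFor: FiniteDuration): Tombstones =
    copy(entries + (ref -> (Deadline.now + keepFor)))

  def hasTombstone(ref: String): Boolean = entries.contains(ref)

  // drop tombstones whose deadline has passed so the map stays small
  def pruneTombstones(): Tombstones =
    copy(entries.filter { case (_, deadline) ⇒ deadline.hasTimeLeft() })
}

object TombstonesExample extends App {
  val tombstones = Tombstones().addTombstone("actor-1", 24.hours)
  println(tombstones.hasTombstone("actor-1")) // true
  println(tombstones.pruneTombstones().entries.size) // still 1, deadline far away
}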


@@ -20,7 +20,6 @@ public class ClusterApiTest extends JUnitSuite {
public void joinLeaveAndObserve() throws Exception {
Config config = ConfigFactory.parseString(
"akka.actor.provider = cluster \n" +
"akka.remote.artery.enabled = true \n"+
"akka.remote.netty.tcp.port = 0 \n"+
"akka.remote.artery.canonical.port = 0 \n"+
"akka.remote.artery.canonical.hostname = 127.0.0.1 \n" +


@@ -18,7 +18,6 @@ object ClusterApiSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1


@@ -39,7 +39,6 @@ object ClusterSingletonApiSpec {
}
}
akka.remote.netty.tcp.port = 0
akka.remote.artery.enabled = true
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster.jmx.multi-mbeans-in-same-jvm = on


@@ -16,7 +16,6 @@ object ClusterSingletonPersistenceSpec {
val config = ConfigFactory.parseString(
"""
akka.actor.provider = cluster
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1


@@ -59,8 +59,9 @@ object RemoteContextAskSpec {
"akka.cluster.typed.RemoteContextAskSpec$$Pong$$" = test
}
}
remote.netty.tcp.port = 0
remote.netty.tcp.host = 127.0.0.1
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0


@@ -24,8 +24,8 @@ object RemoteDeployNotAllowedSpec {
warn-about-java-serializer-usage = off
serialize-creators = off
}
remote.netty.tcp.port = 0
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0


@@ -47,8 +47,8 @@ object RemoteMessageSpec {
"akka.cluster.typed.RemoteMessageSpec$$Ping" = test
}
}
remote.netty.tcp.port = 0
remote.artery {
enabled = on
canonical {
hostname = 127.0.0.1
port = 0


@@ -10,19 +10,18 @@ import akka.actor.{ ExtendedActorSystem, RootActorPath }
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter._
import akka.actor.typed.{ ActorRef, ActorRefResolver }
import akka.actor.typed.{ ActorRef, ActorRefResolver, scaladsl }
import akka.cluster.MemberStatus
import akka.cluster.typed.{ Cluster, Join }
import akka.serialization.SerializerWithStringManifest
import akka.actor.testkit.typed.FishingOutcome
import akka.actor.testkit.typed.scaladsl.TestProbe
import akka.actor.testkit.typed.scaladsl.{ ActorTestKit, FishingOutcomes, TestProbe }
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
import scala.concurrent.Await
import scala.concurrent.duration._
import akka.actor.testkit.typed.scaladsl.ActorTestKit
object ClusterReceptionistSpec {
val config = ConfigFactory.parseString(
s"""
@@ -40,7 +39,8 @@ object ClusterReceptionistSpec {
"akka.cluster.typed.internal.receptionist.ClusterReceptionistSpec$$Perish$$" = test
}
}
akka.remote.artery.enabled = true
akka.remote.netty.tcp.port = 0
akka.remote.netty.tcp.host = 127.0.0.1
akka.remote.artery.canonical.port = 0
akka.remote.artery.canonical.hostname = 127.0.0.1
akka.cluster {
@@ -165,7 +165,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
@@ -207,7 +207,7 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
regProbe1.expectMessage(10.seconds, Listing(PingKey, Set.empty[ActorRef[PingProtocol]]))
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
@@ -292,7 +292,83 @@ class ClusterReceptionistSpec extends WordSpec with Matchers {
}
} finally {
testKit1.shutdownTestKit()
if (!system1.whenTerminated.isCompleted) testKit2.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
"not lose removals on concurrent updates to same key" in {
val config = ConfigFactory.parseString(
"""
# disable delta propagation so we can have repeatable concurrent writes
# without a delta reaching the other node first
akka.cluster.distributed-data.delta-crdt.enabled=false
""").withFallback(ClusterReceptionistSpec.config)
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", config)
val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config)
val system2 = testKit2.system
val TheKey = ServiceKey[AnyRef]("whatever")
try {
val clusterNode1 = Cluster(system1)
val clusterNode2 = Cluster(system2)
clusterNode1.manager ! Join(clusterNode1.selfMember.address)
clusterNode2.manager ! Join(clusterNode1.selfMember.address)
val regProbe1 = TestProbe[AnyRef]()(system1)
val regProbe2 = TestProbe[AnyRef]()(system2)
regProbe1.awaitAssert(clusterNode1.state.members.count(_.status == MemberStatus.Up) shouldBe 2)
// one actor on each node up front
val actor1 = testKit1.spawn(Behaviors.receive[AnyRef] {
case (ctx, "stop")
ctx.log.info("Stopping")
Behaviors.stopped
case _ Behaviors.same
}, "actor1")
val actor2 = testKit2.spawn(Behaviors.empty[AnyRef], "actor2")
system1.receptionist ! Register(TheKey, actor1)
system1.receptionist ! Subscribe(TheKey, regProbe1.ref)
regProbe1.awaitAssert(
regProbe1.expectMessage(Listing(TheKey, Set(actor1))),
5.seconds
)
system2.receptionist ! Subscribe(TheKey, regProbe2.ref)
regProbe2.fishForMessage(10.seconds) {
case TheKey.Listing(actors) if actors.nonEmpty ⇒
println(actors)
FishingOutcomes.complete
case _ ⇒ FishingOutcomes.continue
}
system1.log.info("Saw actor on both nodes")
// TheKey -> Set(actor1) seen by both nodes, now,
// remove on node1 and add on node2 (hopefully) concurrently
system2.receptionist ! Register(TheKey, actor2, regProbe2.ref)
actor1 ! "stop"
regProbe2.expectMessage(Registered(TheKey, actor2))
system2.log.info("actor2 registered")
// we should now, eventually, see the removal on both nodes
regProbe1.fishForMessage(10.seconds) {
case TheKey.Listing(actors) if actors.size == 1 ⇒
FishingOutcomes.complete
case _ ⇒
FishingOutcomes.continue
}
regProbe2.fishForMessage(10.seconds) {
case TheKey.Listing(actors) if actors.size == 1 ⇒
FishingOutcomes.complete
case _ ⇒
FishingOutcomes.continue
}
} finally {
testKit1.shutdownTestKit()
testKit2.shutdownTestKit()
}
}
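
For reference, the Register and Subscribe messages the test drives are the same public API application code uses with the typed receptionist: a subscriber keeps receiving updated Listings as registrations come and go, and with this fix a removed actor no longer reappears in them. A small usage sketch with an invented service key and behaviors:

import akka.actor.typed.Behavior
import akka.actor.typed.receptionist.{ Receptionist, ServiceKey }
import akka.actor.typed.scaladsl.Behaviors

object EchoRegistrationSketch {
  // hypothetical service key, not part of the receptionist code above
  val EchoKey = ServiceKey[String]("echo")

  val echo: Behavior[String] =
    Behaviors.receive[String] { case (ctx, msg) ⇒
      ctx.log.info("echo: {}", msg)
      Behaviors.same
    }

  // a guardian that registers the service and watches the listing for the key
  val guardian: Behavior[Any] =
    Behaviors.setup[Any] { ctx ⇒
      val service = ctx.spawn(echo, "echo")
      ctx.system.receptionist ! Receptionist.Register(EchoKey, service)
      ctx.system.receptionist ! Receptionist.Subscribe(EchoKey, ctx.self)
      Behaviors.receive[Any] {
        case (c, EchoKey.Listing(instances)) ⇒
          c.log.info("echo services currently registered: {}", instances.size)
          Behaviors.same
        case _ ⇒
          Behaviors.same
      }
    }
}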