Never use durable store for the cluster receptionist replicator #29231 (#29237)

This commit is contained in:
Johan Andrén 2020-06-15 11:36:20 +02:00 committed by GitHub
parent 4f2e82e5f6
commit 17f7f1108a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 52 additions and 21 deletions

View file

@ -6,11 +6,10 @@ package akka.cluster.typed.internal.receptionist
import scala.concurrent.duration._ import scala.concurrent.duration._
import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS } import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS }
import com.typesafe.config.Config import com.typesafe.config.Config
import akka.actor.typed.ActorSystem import akka.actor.typed.ActorSystem
import akka.annotation.InternalApi import akka.annotation.InternalApi
import akka.cluster.ddata.Key.KeyId
import akka.cluster.ddata.Replicator import akka.cluster.ddata.Replicator
import akka.cluster.ddata.Replicator.WriteConsistency import akka.cluster.ddata.Replicator.WriteConsistency
import akka.cluster.ddata.ReplicatorSettings import akka.cluster.ddata.ReplicatorSettings
@ -38,12 +37,16 @@ private[akka] object ClusterReceptionistSettings {
val replicatorSettings = ReplicatorSettings(config.getConfig("distributed-data")) val replicatorSettings = ReplicatorSettings(config.getConfig("distributed-data"))
// Having durable store of entries does not make sense for receptionist, as registered
// services does not survive a full cluster stop, make sure that it is disabled
val replicatorSettingsWithoutDurableStore = replicatorSettings.withDurableKeys(Set.empty[KeyId])
ClusterReceptionistSettings( ClusterReceptionistSettings(
writeConsistency, writeConsistency,
pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis, pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis,
pruneRemovedOlderThan = config.getDuration("prune-removed-older-than", MILLISECONDS).millis, pruneRemovedOlderThan = config.getDuration("prune-removed-older-than", MILLISECONDS).millis,
config.getInt("distributed-key-count"), config.getInt("distributed-key-count"),
replicatorSettings) replicatorSettingsWithoutDurableStore)
} }
} }

View file

@ -8,11 +8,9 @@ import java.util.concurrent.ThreadLocalRandom
import scala.concurrent.Await import scala.concurrent.Await
import scala.concurrent.duration._ import scala.concurrent.duration._
import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigFactory
import org.scalatest.matchers.should.Matchers import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec import org.scalatest.wordspec.AnyWordSpec
import akka.actor.RootActorPath import akka.actor.RootActorPath
import akka.actor.testkit.typed.FishingOutcome import akka.actor.testkit.typed.FishingOutcome
import akka.actor.testkit.typed.scaladsl.ActorTestKit import akka.actor.testkit.typed.scaladsl.ActorTestKit
@ -30,6 +28,8 @@ import akka.cluster.typed.Join
import akka.cluster.typed.JoinSeedNodes import akka.cluster.typed.JoinSeedNodes
import akka.cluster.typed.Leave import akka.cluster.typed.Leave
import akka.serialization.jackson.CborSerializable import akka.serialization.jackson.CborSerializable
import akka.testkit.LongRunningTest
import org.scalatest.concurrent.ScalaFutures
object ClusterReceptionistSpec { object ClusterReceptionistSpec {
val config = ConfigFactory.parseString(s""" val config = ConfigFactory.parseString(s"""
@ -50,6 +50,9 @@ object ClusterReceptionistSpec {
jmx.multi-mbeans-in-same-jvm = on jmx.multi-mbeans-in-same-jvm = on
failure-detector.acceptable-heartbeat-pause = 3s failure-detector.acceptable-heartbeat-pause = 3s
} }
# test coverage that the durable store is not used
akka.cluster.distributed-data.durable.keys = ["*"]
""") """)
case object Pong extends CborSerializable case object Pong extends CborSerializable
@ -72,14 +75,14 @@ object ClusterReceptionistSpec {
val AnotherKey = ServiceKey[PingProtocol]("pingy-2") val AnotherKey = ServiceKey[PingProtocol]("pingy-2")
} }
class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturing { class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturing with ScalaFutures {
import ClusterReceptionistSpec._ import ClusterReceptionistSpec._
import Receptionist._ import Receptionist._
"The cluster receptionist" must { "The cluster receptionist" must {
"eventually replicate registrations to the other side" in { "eventually replicate registrations to the other side" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-1", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-1", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -114,7 +117,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"handle registrations before joining" in { "handle registrations before joining" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -141,11 +144,11 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"remove registrations when node dies" in { "remove registrations when node dies" taggedAs (LongRunningTest) in {
testNodeRemoval(down = true) testNodeRemoval(down = true)
} }
"remove registrations when node leaves" in { "remove registrations when node leaves" taggedAs (LongRunningTest) in {
testNodeRemoval(down = false) testNodeRemoval(down = false)
} }
@ -208,7 +211,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"not remove registrations when self is shutdown" in { "not remove registrations when self is shutdown" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-4", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-4", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -257,7 +260,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
"work with services registered before node joins cluster" in { "work with services registered before node joins cluster" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -315,7 +318,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"handle a new incarnation of the same node well" in { "handle a new incarnation of the same node well" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-6", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-6", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -413,7 +416,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
// reproducer of issue #26284 // reproducer of issue #26284
"handle a new incarnation of the same node that is no longer part of same cluster" in { "handle a new incarnation of the same node that is no longer part of same cluster" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit( val testKit1 = ActorTestKit(
"ClusterReceptionistSpec-test-7", "ClusterReceptionistSpec-test-7",
ConfigFactory.parseString(""" ConfigFactory.parseString("""
@ -517,7 +520,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"not lose removals on concurrent updates to same key" in { "not lose removals on concurrent updates to same key" taggedAs (LongRunningTest) in {
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
# disable delta propagation so we can have repeatable concurrent writes # disable delta propagation so we can have repeatable concurrent writes
# without delta reaching between nodes already # without delta reaching between nodes already
@ -588,7 +591,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"not conflict with the ClusterClient receptionist default name" in { "not conflict with the ClusterClient receptionist default name" taggedAs (LongRunningTest) in {
val testKit = ActorTestKit(s"ClusterReceptionistSpec-test-9", ClusterReceptionistSpec.config) val testKit = ActorTestKit(s"ClusterReceptionistSpec-test-9", ClusterReceptionistSpec.config)
try { try {
testKit.system.systemActorOf(Behaviors.ignore, "receptionist") testKit.system.systemActorOf(Behaviors.ignore, "receptionist")
@ -597,7 +600,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
} }
"handle unregistration and re-registration of services" in { "handle unregistration and re-registration of services" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -655,7 +658,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
"handle unregistration per key not per actor" in { "handle unregistration per key not per actor" taggedAs (LongRunningTest) in {
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-11", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-11", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
val testKit2 = ActorTestKit(system1.name, system1.settings.config) val testKit2 = ActorTestKit(system1.name, system1.settings.config)
@ -707,7 +710,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
"handle concurrent unregistration and registration on different nodes" in { "handle concurrent unregistration and registration on different nodes" taggedAs (LongRunningTest) in {
// this covers the fact that with ddata a removal can be lost // this covers the fact that with ddata a removal can be lost
val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-12", ClusterReceptionistSpec.config) val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-12", ClusterReceptionistSpec.config)
val system1 = testKit1.system val system1 = testKit1.system
@ -766,7 +769,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
} }
// Fixme concurrent registration and unregistration // Fixme concurrent registration and unregistration
"notify subscribers when registering and joining simultaneously" in { "notify subscribers when registering and joining simultaneously" taggedAs (LongRunningTest) in {
// failing test reproducer for issue #28792 // failing test reproducer for issue #28792
// It's possible that the registry entry from the ddata update arrives before MemberJoined. // It's possible that the registry entry from the ddata update arrives before MemberJoined.
val config = ConfigFactory.parseString(""" val config = ConfigFactory.parseString("""
@ -777,7 +780,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
akka.cluster.typed.receptionist.pruning-interval = 50ms akka.cluster.typed.receptionist.pruning-interval = 50ms
""").withFallback(ClusterReceptionistSpec.config) """).withFallback(ClusterReceptionistSpec.config)
val numberOfNodes = 6 // use 9 or more to stress it more val numberOfNodes = 6 // use 9 or more to stress it more
val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec", config)) val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec-13", config))
try { try {
val probes = testKits.map(t => TestProbe[Any]()(t.system)) val probes = testKits.map(t => TestProbe[Any]()(t.system))
testKits.zip(probes).foreach { case (t, p) => t.system.receptionist ! Subscribe(PingKey, p.ref) } testKits.zip(probes).foreach { case (t, p) => t.system.receptionist ! Subscribe(PingKey, p.ref) }
@ -828,5 +831,30 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin
testKits.foreach(_.shutdownTestKit()) testKits.foreach(_.shutdownTestKit())
} }
} }
"never use durable store" taggedAs (LongRunningTest) in {
val testKit = ActorTestKit("ClusterReceptionistSpec-test-14", ClusterReceptionistSpec.config)
val system = testKit.system
try {
val regProbe = testKit.createTestProbe[Registered]()
val service = testKit.spawn(pingPongBehavior)
system.receptionist ! Register(PingKey, service, regProbe.ref)
regProbe.expectMessage(Registered(PingKey, service))
import akka.actor.typed.scaladsl.adapter._
val classicSystem = system.toClassic
val replicatorPath = system.receptionist.path / "replicator"
// double check that the replicator is running where we expect it to
classicSystem.actorSelection(replicatorPath).resolveOne(testKit.timeout.duration).futureValue
// and that it does not have a durable store child
val durableStorePath = replicatorPath / "durableStore"
classicSystem.actorSelection(durableStorePath).resolveOne(testKit.timeout.duration).failed.futureValue
} finally {
testKit.shutdownTestKit()
}
}
} }
} }