From 17f7f1108ad5955abb9ff14f93c07696c8ac7f49 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 15 Jun 2020 11:36:20 +0200 Subject: [PATCH] Never use durable store for the cluster receptionist replicator #29231 (#29237) --- .../ClusterReceptionistSettings.scala | 9 ++- .../ClusterReceptionistSpec.scala | 64 +++++++++++++------ 2 files changed, 52 insertions(+), 21 deletions(-) diff --git a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala index 87ebe9980c..1605197695 100644 --- a/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala +++ b/akka-cluster-typed/src/main/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSettings.scala @@ -6,11 +6,10 @@ package akka.cluster.typed.internal.receptionist import scala.concurrent.duration._ import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS } - import com.typesafe.config.Config - import akka.actor.typed.ActorSystem import akka.annotation.InternalApi +import akka.cluster.ddata.Key.KeyId import akka.cluster.ddata.Replicator import akka.cluster.ddata.Replicator.WriteConsistency import akka.cluster.ddata.ReplicatorSettings @@ -38,12 +37,16 @@ private[akka] object ClusterReceptionistSettings { val replicatorSettings = ReplicatorSettings(config.getConfig("distributed-data")) + // Having durable store of entries does not make sense for receptionist, as registered + // services does not survive a full cluster stop, make sure that it is disabled + val replicatorSettingsWithoutDurableStore = replicatorSettings.withDurableKeys(Set.empty[KeyId]) + ClusterReceptionistSettings( writeConsistency, pruningInterval = config.getDuration("pruning-interval", MILLISECONDS).millis, pruneRemovedOlderThan = config.getDuration("prune-removed-older-than", MILLISECONDS).millis, config.getInt("distributed-key-count"), - replicatorSettings) + replicatorSettingsWithoutDurableStore) } } diff --git a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala index 26b9974472..73b2f47157 100644 --- a/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala +++ b/akka-cluster-typed/src/test/scala/akka/cluster/typed/internal/receptionist/ClusterReceptionistSpec.scala @@ -8,11 +8,9 @@ import java.util.concurrent.ThreadLocalRandom import scala.concurrent.Await import scala.concurrent.duration._ - import com.typesafe.config.ConfigFactory import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.AnyWordSpec - import akka.actor.RootActorPath import akka.actor.testkit.typed.FishingOutcome import akka.actor.testkit.typed.scaladsl.ActorTestKit @@ -30,6 +28,8 @@ import akka.cluster.typed.Join import akka.cluster.typed.JoinSeedNodes import akka.cluster.typed.Leave import akka.serialization.jackson.CborSerializable +import akka.testkit.LongRunningTest +import org.scalatest.concurrent.ScalaFutures object ClusterReceptionistSpec { val config = ConfigFactory.parseString(s""" @@ -50,6 +50,9 @@ object ClusterReceptionistSpec { jmx.multi-mbeans-in-same-jvm = on failure-detector.acceptable-heartbeat-pause = 3s } + + # test coverage that the durable store is not used + akka.cluster.distributed-data.durable.keys = ["*"] """) case object Pong extends CborSerializable @@ -72,14 +75,14 @@ object ClusterReceptionistSpec { val AnotherKey = ServiceKey[PingProtocol]("pingy-2") } -class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturing { +class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturing with ScalaFutures { import ClusterReceptionistSpec._ import Receptionist._ "The cluster receptionist" must { - "eventually replicate registrations to the other side" in { + "eventually replicate registrations to the other side" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-1", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -114,7 +117,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "handle registrations before joining" in { + "handle registrations before joining" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-2", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -141,11 +144,11 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "remove registrations when node dies" in { + "remove registrations when node dies" taggedAs (LongRunningTest) in { testNodeRemoval(down = true) } - "remove registrations when node leaves" in { + "remove registrations when node leaves" taggedAs (LongRunningTest) in { testNodeRemoval(down = false) } @@ -208,7 +211,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "not remove registrations when self is shutdown" in { + "not remove registrations when self is shutdown" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-4", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -257,7 +260,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } - "work with services registered before node joins cluster" in { + "work with services registered before node joins cluster" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-5", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -315,7 +318,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "handle a new incarnation of the same node well" in { + "handle a new incarnation of the same node well" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-6", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -413,7 +416,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } // reproducer of issue #26284 - "handle a new incarnation of the same node that is no longer part of same cluster" in { + "handle a new incarnation of the same node that is no longer part of same cluster" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit( "ClusterReceptionistSpec-test-7", ConfigFactory.parseString(""" @@ -517,7 +520,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "not lose removals on concurrent updates to same key" in { + "not lose removals on concurrent updates to same key" taggedAs (LongRunningTest) in { val config = ConfigFactory.parseString(""" # disable delta propagation so we can have repeatable concurrent writes # without delta reaching between nodes already @@ -588,7 +591,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "not conflict with the ClusterClient receptionist default name" in { + "not conflict with the ClusterClient receptionist default name" taggedAs (LongRunningTest) in { val testKit = ActorTestKit(s"ClusterReceptionistSpec-test-9", ClusterReceptionistSpec.config) try { testKit.system.systemActorOf(Behaviors.ignore, "receptionist") @@ -597,7 +600,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } } - "handle unregistration and re-registration of services" in { + "handle unregistration and re-registration of services" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-10", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -655,7 +658,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } - "handle unregistration per key not per actor" in { + "handle unregistration per key not per actor" taggedAs (LongRunningTest) in { val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-11", ClusterReceptionistSpec.config) val system1 = testKit1.system val testKit2 = ActorTestKit(system1.name, system1.settings.config) @@ -707,7 +710,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } - "handle concurrent unregistration and registration on different nodes" in { + "handle concurrent unregistration and registration on different nodes" taggedAs (LongRunningTest) in { // this covers the fact that with ddata a removal can be lost val testKit1 = ActorTestKit("ClusterReceptionistSpec-test-12", ClusterReceptionistSpec.config) val system1 = testKit1.system @@ -766,7 +769,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin } // Fixme concurrent registration and unregistration - "notify subscribers when registering and joining simultaneously" in { + "notify subscribers when registering and joining simultaneously" taggedAs (LongRunningTest) in { // failing test reproducer for issue #28792 // It's possible that the registry entry from the ddata update arrives before MemberJoined. val config = ConfigFactory.parseString(""" @@ -777,7 +780,7 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin akka.cluster.typed.receptionist.pruning-interval = 50ms """).withFallback(ClusterReceptionistSpec.config) val numberOfNodes = 6 // use 9 or more to stress it more - val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec", config)) + val testKits = Vector.fill(numberOfNodes)(ActorTestKit("ClusterReceptionistSpec-13", config)) try { val probes = testKits.map(t => TestProbe[Any]()(t.system)) testKits.zip(probes).foreach { case (t, p) => t.system.receptionist ! Subscribe(PingKey, p.ref) } @@ -828,5 +831,30 @@ class ClusterReceptionistSpec extends AnyWordSpec with Matchers with LogCapturin testKits.foreach(_.shutdownTestKit()) } } + + "never use durable store" taggedAs (LongRunningTest) in { + val testKit = ActorTestKit("ClusterReceptionistSpec-test-14", ClusterReceptionistSpec.config) + val system = testKit.system + try { + val regProbe = testKit.createTestProbe[Registered]() + val service = testKit.spawn(pingPongBehavior) + system.receptionist ! Register(PingKey, service, regProbe.ref) + regProbe.expectMessage(Registered(PingKey, service)) + + import akka.actor.typed.scaladsl.adapter._ + val classicSystem = system.toClassic + val replicatorPath = system.receptionist.path / "replicator" + + // double check that the replicator is running where we expect it to + classicSystem.actorSelection(replicatorPath).resolveOne(testKit.timeout.duration).futureValue + + // and that it does not have a durable store child + val durableStorePath = replicatorPath / "durableStore" + classicSystem.actorSelection(durableStorePath).resolveOne(testKit.timeout.duration).failed.futureValue + + } finally { + testKit.shutdownTestKit() + } + } } }