diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 9c5d3ab46b..6f28ac7313 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -1268,8 +1268,13 @@ class PersistentShardCoordinator( case StateInitialized => stateInitialized() + log.debug("{}: Coordinator initialization completed", typeName) context.become(active.orElse[Any, Unit](receiveSnapshotResult)) + case Register(region) => + // region will retry so ok to ignore + log.debug("{}: Ignoring registration from region [{}] while initializing", typeName, region) + }: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult) def receiveSnapshotResult: Receive = { diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index 78f56b2fdc..c606069bb5 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -231,9 +231,13 @@ object ShardRegion { @SerialVersionUID(1L) case object GetCurrentRegions extends ShardRegionQuery with ClusterShardingSerializable /** - * Java API: + * Java API: Send this message to the `ShardRegion` actor to request for [[CurrentRegions]], + * which contains the addresses of all registered regions. + * + * Intended for testing purpose to see when cluster sharding is "ready" or to monitor + * the state of the shard regions. */ - def getCurrentRegionsInstance = GetCurrentRegions + def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions /** * Reply to `GetCurrentRegions` diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala index cf133d3088..2f8e9fb82d 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/PersistentShardingMigrationSpec.scala @@ -6,6 +6,7 @@ package akka.cluster.sharding import java.util.UUID import akka.actor.{ ActorRef, ActorSystem, Props } +import akka.cluster.sharding.ShardRegion.CurrentRegions import akka.cluster.{ Cluster, MemberStatus } import akka.persistence.PersistentActor import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } @@ -106,10 +107,11 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati import PersistentShardingMigrationSpec._ "Migration" should { - "allow migration of remembered shards and now allow going back" in { + "allow migration of remembered shards and not allow going back" in { val typeName = "Migration" withSystem(config, typeName, "OldMode") { (_, region, _) => + assertRegionRegistrationComplete(region) region ! Message(1) expectMsg("ack") region ! Message(2) @@ -119,6 +121,7 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati } withSystem(configForNewMode, typeName, "NewMode") { (system, region, rememberedEntitiesProbe) => + assertRegionRegistrationComplete(region) val probe = TestProbe()(system) region.tell(Message(1), probe.ref) probe.expectMsg("ack") @@ -172,5 +175,13 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati Await.ready(system.terminate(), 20.seconds) } } + + def assertRegionRegistrationComplete(region: ActorRef): Unit = { + awaitAssert { + region ! ShardRegion.GetCurrentRegions + expectMsgType[CurrentRegions].regions should have size (1) + } + } } + } diff --git a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala index f2cc684354..ee34ef0631 100644 --- a/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala +++ b/akka-cluster-sharding/src/test/scala/akka/cluster/sharding/RememberEntitiesShardIdExtractorChangeSpec.scala @@ -10,6 +10,7 @@ import akka.actor.ActorRef import akka.actor.ActorSystem import akka.actor.Props import akka.cluster.Cluster +import akka.cluster.sharding.ShardRegion.CurrentRegions import akka.persistence.PersistentActor import akka.testkit.AkkaSpec import akka.testkit.ImplicitSender @@ -85,6 +86,7 @@ class RememberEntitiesShardIdExtractorChangeSpec "allow a change to the shard id extractor" in { withSystem("FirstShardIdExtractor", firstExtractShardId) { (_, region) => + assertRegionRegistrationComplete(region) region ! Message(1) expectMsg("ack") region ! Message(11) @@ -132,5 +134,13 @@ class RememberEntitiesShardIdExtractorChangeSpec Await.ready(system.terminate(), 20.seconds) } } + + def assertRegionRegistrationComplete(region: ActorRef): Unit = { + awaitAssert { + region ! ShardRegion.GetCurrentRegions + expectMsgType[CurrentRegions].regions should have size (1) + } + } } + }