Fix for PersistentShardingMigrationSpec failing #29234 #29690

and same fix for RememberEntitiesShardIdExtractorChangeSpec
This commit is contained in:
Johan Andrén 2020-10-12 12:41:21 +02:00 committed by GitHub
parent 0e0680bd82
commit 3a03fd5f39
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 33 additions and 3 deletions

View file

@ -1268,8 +1268,13 @@ class PersistentShardCoordinator(
case StateInitialized => case StateInitialized =>
stateInitialized() stateInitialized()
log.debug("{}: Coordinator initialization completed", typeName)
context.become(active.orElse[Any, Unit](receiveSnapshotResult)) context.become(active.orElse[Any, Unit](receiveSnapshotResult))
case Register(region) =>
// region will retry so ok to ignore
log.debug("{}: Ignoring registration from region [{}] while initializing", typeName, region)
}: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult) }: Receive).orElse[Any, Unit](receiveTerminated).orElse[Any, Unit](receiveSnapshotResult)
def receiveSnapshotResult: Receive = { def receiveSnapshotResult: Receive = {

View file

@ -231,9 +231,13 @@ object ShardRegion {
@SerialVersionUID(1L) case object GetCurrentRegions extends ShardRegionQuery with ClusterShardingSerializable @SerialVersionUID(1L) case object GetCurrentRegions extends ShardRegionQuery with ClusterShardingSerializable
/** /**
* Java API: * Java API: Send this message to the `ShardRegion` actor to request for [[CurrentRegions]],
* which contains the addresses of all registered regions.
*
* Intended for testing purpose to see when cluster sharding is "ready" or to monitor
* the state of the shard regions.
*/ */
def getCurrentRegionsInstance = GetCurrentRegions def getCurrentRegionsInstance: GetCurrentRegions.type = GetCurrentRegions
/** /**
* Reply to `GetCurrentRegions` * Reply to `GetCurrentRegions`

View file

@ -6,6 +6,7 @@ package akka.cluster.sharding
import java.util.UUID import java.util.UUID
import akka.actor.{ ActorRef, ActorSystem, Props } import akka.actor.{ ActorRef, ActorSystem, Props }
import akka.cluster.sharding.ShardRegion.CurrentRegions
import akka.cluster.{ Cluster, MemberStatus } import akka.cluster.{ Cluster, MemberStatus }
import akka.persistence.PersistentActor import akka.persistence.PersistentActor
import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe }
@ -106,10 +107,11 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati
import PersistentShardingMigrationSpec._ import PersistentShardingMigrationSpec._
"Migration" should { "Migration" should {
"allow migration of remembered shards and now allow going back" in { "allow migration of remembered shards and not allow going back" in {
val typeName = "Migration" val typeName = "Migration"
withSystem(config, typeName, "OldMode") { (_, region, _) => withSystem(config, typeName, "OldMode") { (_, region, _) =>
assertRegionRegistrationComplete(region)
region ! Message(1) region ! Message(1)
expectMsg("ack") expectMsg("ack")
region ! Message(2) region ! Message(2)
@ -119,6 +121,7 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati
} }
withSystem(configForNewMode, typeName, "NewMode") { (system, region, rememberedEntitiesProbe) => withSystem(configForNewMode, typeName, "NewMode") { (system, region, rememberedEntitiesProbe) =>
assertRegionRegistrationComplete(region)
val probe = TestProbe()(system) val probe = TestProbe()(system)
region.tell(Message(1), probe.ref) region.tell(Message(1), probe.ref)
probe.expectMsg("ack") probe.expectMsg("ack")
@ -172,5 +175,13 @@ class PersistentShardingMigrationSpec extends AkkaSpec(PersistentShardingMigrati
Await.ready(system.terminate(), 20.seconds) Await.ready(system.terminate(), 20.seconds)
} }
} }
def assertRegionRegistrationComplete(region: ActorRef): Unit = {
awaitAssert {
region ! ShardRegion.GetCurrentRegions
expectMsgType[CurrentRegions].regions should have size (1)
}
}
} }
} }

View file

@ -10,6 +10,7 @@ import akka.actor.ActorRef
import akka.actor.ActorSystem import akka.actor.ActorSystem
import akka.actor.Props import akka.actor.Props
import akka.cluster.Cluster import akka.cluster.Cluster
import akka.cluster.sharding.ShardRegion.CurrentRegions
import akka.persistence.PersistentActor import akka.persistence.PersistentActor
import akka.testkit.AkkaSpec import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender import akka.testkit.ImplicitSender
@ -85,6 +86,7 @@ class RememberEntitiesShardIdExtractorChangeSpec
"allow a change to the shard id extractor" in { "allow a change to the shard id extractor" in {
withSystem("FirstShardIdExtractor", firstExtractShardId) { (_, region) => withSystem("FirstShardIdExtractor", firstExtractShardId) { (_, region) =>
assertRegionRegistrationComplete(region)
region ! Message(1) region ! Message(1)
expectMsg("ack") expectMsg("ack")
region ! Message(11) region ! Message(11)
@ -132,5 +134,13 @@ class RememberEntitiesShardIdExtractorChangeSpec
Await.ready(system.terminate(), 20.seconds) Await.ready(system.terminate(), 20.seconds)
} }
} }
def assertRegionRegistrationComplete(region: ActorRef): Unit = {
awaitAssert {
region ! ShardRegion.GetCurrentRegions
expectMsgType[CurrentRegions].regions should have size (1)
}
}
} }
} }