sharding: actively signal 'region stopped' to the coordinator (#30402)

Arnout Engelen 2021-07-20 12:01:53 +02:00 committed by GitHub
parent 1dc345896c
commit ac70b1db38
4 changed files with 54 additions and 9 deletions
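In short: `ShardRegion.postStop` now sends the new `RegionStopped(self)` message to the coordinator, and the `ShardCoordinator` handles it exactly like a terminated region, so a gracefully stopped region is deregistered right away instead of only once DeathWatch / member removal catches up. Below is a minimal standalone sketch of that pattern with classic actors; `Coordinator`, `Worker`, `Register` and `WorkerStopped` are illustrative names, not the actual sharding internals.

```scala
import akka.actor.{ Actor, ActorRef, ActorSystem, Props, Terminated }

// Illustrative messages only; the real counterpart is RegionStopped in ShardCoordinator.
final case class Register(worker: ActorRef)
final case class WorkerStopped(worker: ActorRef)

class Coordinator extends Actor {
  private var workers = Set.empty[ActorRef]

  def receive: Receive = {
    case Register(worker) =>
      workers += worker
      context.watch(worker) // DeathWatch stays in place as a fallback
    case WorkerStopped(worker) =>
      workers -= worker // explicit stop signal: handled immediately
    case Terminated(worker) =>
      workers -= worker // fallback for abrupt stops; removal is idempotent
  }
}

class Worker(coordinator: ActorRef) extends Actor {
  override def preStart(): Unit = coordinator ! Register(self)

  // Best-effort notification: never sent if the JVM dies hard, hence the DeathWatch fallback.
  override def postStop(): Unit = coordinator ! WorkerStopped(self)

  def receive: Receive = Actor.emptyBehavior
}

object StopSignalSketch extends App {
  val system = ActorSystem("sketch")
  val coordinator = system.actorOf(Props[Coordinator](), "coordinator")
  val worker = system.actorOf(Props(new Worker(coordinator)), "worker")
  system.stop(worker) // triggers Worker.postStop, so the coordinator learns about it right away
}
```

The explicit postStop signal is best effort, which is why the coordinator keeps watching regions and still handles termination as before; the two paths are complementary rather than exclusive.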


@@ -403,6 +403,11 @@ object ShardCoordinator {
*/
@SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand
/**
* Notification when the entire shard region has stopped
*/
@SerialVersionUID(1L) final case class RegionStopped(shardRegion: ActorRef) extends CoordinatorCommand
/**
* `ShardRegion` requests full handoff to be able to shutdown gracefully.
*/
@@ -597,7 +602,7 @@ object ShardCoordinator {
shardRegionFrom ! HandOff(shard)
context.become(stoppingShard, discardOld = true)
} else {
log.debug("{}: Remaining shard regions: {}", typeName, remaining.size)
log.debug("{}: Remaining shard regions for shard [{}]: {}", typeName, shard, remaining.size)
}
}
@@ -1025,6 +1030,10 @@ abstract class ShardCoordinator(
regionProxyTerminated(ref)
}
case RegionStopped(ref) =>
log.debug("{}: ShardRegion stopped: [{}]", typeName, ref)
regionTerminated(ref)
case DelayedShardRegionTerminated(ref) =>
regionTerminated(ref)
}


@@ -655,6 +655,8 @@ private[akka] class ShardRegion(
override def postStop(): Unit = {
super.postStop()
log.debug("{}: Region stopped", typeName)
coordinator.foreach(_ ! RegionStopped(context.self))
cluster.unsubscribe(self)
gracefulShutdownProgress.trySuccess(Done)
}
@@ -836,7 +838,7 @@ private[akka] class ShardRegion(
}
sender() ! BeginHandOffAck(shard)
} else {
log.debug("{}: Ignoring begin handoff as preparing to shutdown", typeName)
log.debug("{}: Ignoring begin handoff of shard [{}] as preparing to shutdown", typeName, shard)
}
case msg @ HandOff(shard) =>


@@ -67,6 +67,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
private val HandOffManifest = "BJ"
private val ShardStoppedManifest = "BK"
private val GracefulShutdownReqManifest = "BL"
private val RegionStoppedManifest = "BM"
private val EntityStateManifest = "CA"
private val EntityStartedManifest = "CB"
@@ -152,6 +153,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
GracefulShutdownReqManifest -> { bytes =>
GracefulShutdownReq(actorRefMessageFromBinary(bytes))
},
RegionStoppedManifest -> { bytes =>
RegionStopped(actorRefMessageFromBinary(bytes))
},
GetShardStatsManifest -> { _ =>
GetShardStats
},
@@ -225,6 +229,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case _: HandOff => HandOffManifest
case _: ShardStopped => ShardStoppedManifest
case _: GracefulShutdownReq => GracefulShutdownReqManifest
case _: RegionStopped => RegionStoppedManifest
case _: StartEntity => StartEntityManifest
case _: StartEntityAck => StartEntityAckManifest
@@ -273,6 +278,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy
case ShardStopped(shardId) => shardIdMessageToProto(shardId).toByteArray
case GracefulShutdownReq(ref) =>
actorRefMessageToProto(ref).toByteArray
case RegionStopped(ref) => actorRefMessageToProto(ref).toByteArray
case m: EntityState => entityStateToProto(m).toByteArray
case m: EntitiesStarted => entitiesStartedToProto(m).toByteArray
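The serializer changes follow the existing manifest-per-message scheme: a new stable manifest string ("BM"), plus branches in the manifest lookup, `toBinary` and `fromBinary` dispatch for `RegionStopped`, delegating to the same protobuf `ActorRef` helpers as `GracefulShutdownReq`. A rough sketch of that scheme in isolation, with hypothetical message and serializer names and plain UTF-8 instead of protobuf:

```scala
import java.nio.charset.StandardCharsets.UTF_8
import akka.actor.ExtendedActorSystem
import akka.serialization.SerializerWithStringManifest

// Hypothetical stand-in for the real protobuf-backed message; illustration only.
final case class RegionStoppedLike(regionPath: String)

class SketchSerializer(val system: ExtendedActorSystem) extends SerializerWithStringManifest {
  // Must be globally unique among serializers in a real system.
  override def identifier: Int = 9999

  private val RegionStoppedManifest = "BM"

  // Every supported message type maps to a stable manifest string...
  override def manifest(o: AnyRef): String = o match {
    case _: RegionStoppedLike => RegionStoppedManifest
    case other                => throw new IllegalArgumentException(s"Unsupported: ${other.getClass}")
  }

  // ...toBinary turns the message into bytes (here just UTF-8, not protobuf)...
  override def toBinary(o: AnyRef): Array[Byte] = o match {
    case RegionStoppedLike(path) => path.getBytes(UTF_8)
    case other                   => throw new IllegalArgumentException(s"Unsupported: ${other.getClass}")
  }

  // ...and fromBinary dispatches on the manifest to rebuild the message.
  override def fromBinary(bytes: Array[Byte], manifest: String): AnyRef = manifest match {
    case RegionStoppedManifest => RegionStoppedLike(new String(bytes, UTF_8))
    case other                 => throw new IllegalArgumentException(s"Unsupported manifest: $other")
  }
}
```

Keeping the manifest string stable is what makes rolling upgrades possible: old and new nodes agree on "BM" even if class names or packages move.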


@@ -4,18 +4,28 @@
package akka.cluster.sharding
import scala.concurrent.duration._
import akka.Done
import scala.concurrent.duration._
import akka.actor._
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import akka.cluster.sharding.ShardRegion.GracefulShutdown
import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GracefulShutdown }
import akka.remote.testconductor.RoleName
import akka.testkit._
import scala.concurrent.Future
abstract class ClusterShardingGracefulShutdownSpecConfig(mode: String)
extends MultiNodeClusterShardingConfig(
mode,
additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") {
additionalConfig =
"""
akka.loglevel = info
akka.persistence.journal.leveldb-shared.store.native = off
# We set this high to allow pausing coordinated shutdown, to make sure the handoff completes 'immediately' and does not
# rely on the member removal, which could make things take longer than necessary
akka.coordinated-shutdown.phases.cluster-sharding-shutdown-region.timeout = 60s
""") {
val first = role("first")
val second = role("second")
}
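The raised `cluster-sharding-shutdown-region` timeout matters because a CoordinatedShutdown phase only completes once every task registered in it has finished (or the phase's timeout expires). Further down, the test registers a task in that phase that deliberately takes 40 seconds, so the 60 second limit keeps the phase, and with it the member's departure, pending long enough to prove that the coordinator learns about the stopped region from `RegionStopped` rather than from member removal. A minimal sketch of that kind of task registration outside the test framework (the system name is illustrative):

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.Done
import akka.actor.{ ActorSystem, CoordinatedShutdown }

object SlowShutdownTaskSketch extends App {
  implicit val system: ActorSystem = ActorSystem("sketch")

  // A phase waits for all of its registered task Futures (up to the phase's configured
  // timeout), so a deliberately slow task here holds back every later phase, including
  // the one that makes the member leave the cluster.
  CoordinatedShutdown(system).addTask("cluster-sharding-shutdown-region", "postpone-actual-stop") { () =>
    akka.pattern.after(40.seconds)(Future.successful(Done))
  }

  CoordinatedShutdown(system).run(CoordinatedShutdown.unknownReason)
}
```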
@@ -66,11 +76,10 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard
lazy val region = ClusterSharding(system).shardRegion(typeName)
s"Cluster sharding ($mode)" must {
"start some shards in both regions" in within(30.seconds) {
startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second))
join(first, first, typeName)
join(first, first, typeName) // oldest
join(second, first, typeName)
awaitAssert {
@@ -83,11 +92,20 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard
regionAddresses.size should be(2)
}
enterBarrier("after-2")
region ! ShardRegion.GetCurrentRegions
expectMsgType[CurrentRegions].regions.size should be(2)
}
"gracefully shutdown a region" in within(30.seconds) {
"gracefully shutdown the region on the newest node" in within(30.seconds) {
runOn(second) {
region ! ShardRegion.GracefulShutdown
// Make sure the 'cluster-sharding-shutdown-region' phase takes at least 40 seconds,
// to validate that region shutdown completion is propagated immediately and not postponed
// until the cluster member leaves
CoordinatedShutdown(system).addTask("cluster-sharding-shutdown-region", "postpone-actual-stop")(() => {
akka.pattern.after(40.seconds)(Future.successful(Done))
})
CoordinatedShutdown(system).run(CoordinatedShutdown.unknownReason)
}
runOn(first) {
@@ -102,6 +120,16 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard
}
enterBarrier("handoff-completed")
// Check that the coordinator is correctly notified the region has stopped:
runOn(first) {
// the coordinator side should observe that the region has stopped
awaitAssert {
region ! ShardRegion.GetCurrentRegions
expectMsgType[CurrentRegions].regions.size should be(1)
}
// without having to wait for the member to be entirely removed (as that would cause unnecessary latency)
}
runOn(second) {
watch(region)
expectTerminated(region)
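The assertions above rely on `ShardRegion.GetCurrentRegions` / `CurrentRegions`, a query answered from the coordinator's registry of active regions, which is exactly why the reported set shrinks as soon as the coordinator processes `RegionStopped`. Outside the TestKit the same check can be done with the ask pattern; a rough sketch, assuming sharding has already been started on this node for a type named "MyType":

```scala
import scala.concurrent.Future
import scala.concurrent.duration._
import akka.actor.ActorSystem
import akka.cluster.sharding.{ ClusterSharding, ShardRegion }
import akka.pattern.ask
import akka.util.Timeout

object CurrentRegionsSketch {
  // Asks the local shard region for the addresses currently hosting regions for this type.
  // Assumes ClusterSharding.start has already been called for "MyType" on this node.
  def currentRegionCount(system: ActorSystem): Future[Int] = {
    implicit val timeout: Timeout = 3.seconds
    import system.dispatcher
    val region = ClusterSharding(system).shardRegion("MyType")
    (region ? ShardRegion.GetCurrentRegions)
      .mapTo[ShardRegion.CurrentRegions]
      .map(_.regions.size)
  }
}
```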