diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala index 1361637cc0..778aa401ea 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardCoordinator.scala @@ -403,6 +403,11 @@ object ShardCoordinator { */ @SerialVersionUID(1L) final case class ShardStopped(shard: ShardId) extends CoordinatorCommand + /** + * Notification when the entire shard region has stopped + */ + @SerialVersionUID(1L) final case class RegionStopped(shardRegion: ActorRef) extends CoordinatorCommand + /** * `ShardRegion` requests full handoff to be able to shutdown gracefully. */ @@ -597,7 +602,7 @@ object ShardCoordinator { shardRegionFrom ! HandOff(shard) context.become(stoppingShard, discardOld = true) } else { - log.debug("{}: Remaining shard regions: {}", typeName, remaining.size) + log.debug("{}: Remaining shard regions for shard [{}]: {}", typeName, shard, remaining.size) } } @@ -1025,6 +1030,10 @@ abstract class ShardCoordinator( regionProxyTerminated(ref) } + case RegionStopped(ref) => + log.debug("{}: ShardRegion stopped: [{}]", typeName, ref) + regionTerminated(ref) + case DelayedShardRegionTerminated(ref) => regionTerminated(ref) } diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index f311786dbd..70e76af612 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -655,6 +655,8 @@ private[akka] class ShardRegion( override def postStop(): Unit = { super.postStop() + log.debug("{}: Region stopped", typeName) + coordinator.foreach(_ ! 
RegionStopped(context.self)) cluster.unsubscribe(self) gracefulShutdownProgress.trySuccess(Done) } @@ -836,7 +838,7 @@ private[akka] class ShardRegion( } sender() ! BeginHandOffAck(shard) } else { - log.debug("{}: Ignoring begin handoff as preparing to shutdown", typeName) + log.debug("{}: Ignoring begin handoff of shard [{}] as preparing to shutdown", typeName, shard) } case msg @ HandOff(shard) => diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala index ac31778b43..86ffc67331 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/protobuf/ClusterShardingMessageSerializer.scala @@ -67,6 +67,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy private val HandOffManifest = "BJ" private val ShardStoppedManifest = "BK" private val GracefulShutdownReqManifest = "BL" + private val RegionStoppedManifest = "BM" private val EntityStateManifest = "CA" private val EntityStartedManifest = "CB" @@ -152,6 +153,9 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy GracefulShutdownReqManifest -> { bytes => GracefulShutdownReq(actorRefMessageFromBinary(bytes)) }, + RegionStoppedManifest -> { bytes => + RegionStopped(actorRefMessageFromBinary(bytes)) + }, GetShardStatsManifest -> { _ => GetShardStats }, @@ -225,6 +229,7 @@ private[akka] class ClusterShardingMessageSerializer(val system: ExtendedActorSy case _: HandOff => HandOffManifest case _: ShardStopped => ShardStoppedManifest case _: GracefulShutdownReq => GracefulShutdownReqManifest + case _: RegionStopped => RegionStoppedManifest case _: StartEntity => StartEntityManifest case _: StartEntityAck => StartEntityAckManifest @@ -273,6 +278,7 @@ private[akka] 
class ClusterShardingMessageSerializer(val system: ExtendedActorSy case ShardStopped(shardId) => shardIdMessageToProto(shardId).toByteArray case GracefulShutdownReq(ref) => actorRefMessageToProto(ref).toByteArray + case RegionStopped(ref) => actorRefMessageToProto(ref).toByteArray case m: EntityState => entityStateToProto(m).toByteArray case m: EntitiesStarted => entitiesStartedToProto(m).toByteArray diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index 772f45654d..5d4a80e500 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -4,18 +4,28 @@ package akka.cluster.sharding -import scala.concurrent.duration._ +import akka.Done +import scala.concurrent.duration._ import akka.actor._ import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import akka.cluster.sharding.ShardRegion.GracefulShutdown +import akka.cluster.sharding.ShardRegion.{ CurrentRegions, GracefulShutdown } import akka.remote.testconductor.RoleName import akka.testkit._ +import scala.concurrent.Future + abstract class ClusterShardingGracefulShutdownSpecConfig(mode: String) extends MultiNodeClusterShardingConfig( mode, - additionalConfig = "akka.persistence.journal.leveldb-shared.store.native = off") { + additionalConfig = + """ + akka.loglevel = info + akka.persistence.journal.leveldb-shared.store.native = off + # We set this high to allow pausing coordinated shutdown to make sure the handoff completes 'immediately' and not + # rely on the member removal, which could make things take longer than necessary + akka.coordinated-shutdown.phases.cluster-sharding-shutdown-region.timeout = 60s + """) { val first = role("first") val second = 
role("second") } @@ -66,11 +76,10 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard lazy val region = ClusterSharding(system).shardRegion(typeName) s"Cluster sharding ($mode)" must { - "start some shards in both regions" in within(30.seconds) { startPersistenceIfNeeded(startOn = first, setStoreOn = Seq(first, second)) - join(first, first, typeName) + join(first, first, typeName) // oldest join(second, first, typeName) awaitAssert { @@ -83,11 +92,20 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard regionAddresses.size should be(2) } enterBarrier("after-2") + + region ! ShardRegion.GetCurrentRegions + expectMsgType[CurrentRegions].regions.size should be(2) } - "gracefully shutdown a region" in within(30.seconds) { + "gracefully shutdown the region on the newest node" in within(30.seconds) { runOn(second) { - region ! ShardRegion.GracefulShutdown + // Make sure the 'cluster-sharding-shutdown-region' phase takes at least 40 seconds, + // to validate region shutdown completion is propagated immediately and not postponed + // until when the cluster member leaves + CoordinatedShutdown(system).addTask("cluster-sharding-shutdown-region", "postpone-actual-stop")(() => { + akka.pattern.after(40.seconds)(Future.successful(Done)) + }) + CoordinatedShutdown(system).run(CoordinatedShutdown.unknownReason) } runOn(first) { @@ -102,6 +120,16 @@ abstract class ClusterShardingGracefulShutdownSpec(multiNodeConfig: ClusterShard } enterBarrier("handoff-completed") + // Check that the coordinator is correctly notified the region has stopped: + runOn(first) { + // the coordinator side should observe that the region has stopped + awaitAssert { + region ! ShardRegion.GetCurrentRegions + expectMsgType[CurrentRegions].regions.size should be(1) + } + // without having to wait for the member to be entirely removed (as that would cause unnecessary latency) + } + runOn(second) { watch(region) expectTerminated(region)