diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala index d137c7f7aa..0936e5acde 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ShardRegion.scala @@ -515,12 +515,14 @@ class ShardRegion( else { sendGracefulShutdownToCoordinator() requestShardBufferHomes() + tryCompleteGracefulShutdown() } case GracefulShutdown ⇒ log.debug("Starting graceful shutdown of region and all its shards") gracefulShutdownInProgress = true sendGracefulShutdownToCoordinator() + tryCompleteGracefulShutdown() case _ ⇒ unhandled(cmd) } @@ -569,8 +571,7 @@ class ShardRegion( } } - if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) - context.stop(self) // all shards have been rebalanced, complete graceful shutdown + tryCompleteGracefulShutdown() } } @@ -601,6 +602,10 @@ class ShardRegion( }) } + private def tryCompleteGracefulShutdown() = + if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) + context.stop(self) // all shards have been rebalanced, complete graceful shutdown + def register(): Unit = { coordinatorSelection.foreach(_ ! registrationMessage) if (shardBuffers.nonEmpty && retryCount >= 5) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala index b7fb193d1d..f6b9164445 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -3,28 +3,20 @@ */ package akka.cluster.sharding -import scala.collection.immutable import java.io.File -import akka.cluster.sharding.ShardRegion.Passivate -import scala.concurrent.duration._ -import org.apache.commons.io.FileUtils -import com.typesafe.config.ConfigFactory + import akka.actor._ import akka.cluster.Cluster -import akka.cluster.ClusterEvent._ +import akka.cluster.sharding.ShardRegion.GracefulShutdown import akka.persistence.Persistence -import akka.persistence.journal.leveldb.SharedLeveldbJournal -import akka.persistence.journal.leveldb.SharedLeveldbStore +import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore} import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec -import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec} import akka.testkit._ -import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy -import scala.concurrent.Future -import akka.util.Timeout -import akka.pattern.ask +import com.typesafe.config.ConfigFactory +import org.apache.commons.io.FileUtils + +import scala.concurrent.duration._ object ClusterShardingGracefulShutdownSpec { case object StopEntity @@ -204,6 +196,24 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracef enterBarrier("after-3") } + "gracefully shutdown empty region" in within(30.seconds) { + runOn(first) { + val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + val regionEmpty = ClusterSharding(system).start( + typeName = "EntityEmpty", + entityProps = Props[Entity], + settings = ClusterShardingSettings(system), + extractEntityId = extractEntityId, + extractShardId = extractShardId, + allocationStrategy, + handOffStopMessage = StopEntity) + + watch(regionEmpty) + regionEmpty ! GracefulShutdown + expectTerminated(regionEmpty, 5.seconds) + } + } + } }