=cls #18762 fix graceful shutdown of empty region
This commit is contained in:
parent
712233f725
commit
5c418efef2
2 changed files with 33 additions and 18 deletions
|
|
@ -515,12 +515,14 @@ class ShardRegion(
|
|||
else {
|
||||
sendGracefulShutdownToCoordinator()
|
||||
requestShardBufferHomes()
|
||||
tryCompleteGracefulShutdown()
|
||||
}
|
||||
|
||||
case GracefulShutdown ⇒
|
||||
log.debug("Starting graceful shutdown of region and all its shards")
|
||||
gracefulShutdownInProgress = true
|
||||
sendGracefulShutdownToCoordinator()
|
||||
tryCompleteGracefulShutdown()
|
||||
|
||||
case _ ⇒ unhandled(cmd)
|
||||
}
|
||||
|
|
@ -569,8 +571,7 @@ class ShardRegion(
|
|||
}
|
||||
}
|
||||
|
||||
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty)
|
||||
context.stop(self) // all shards have been rebalanced, complete graceful shutdown
|
||||
tryCompleteGracefulShutdown()
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -601,6 +602,10 @@ class ShardRegion(
|
|||
})
|
||||
}
|
||||
|
||||
private def tryCompleteGracefulShutdown() =
|
||||
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty)
|
||||
context.stop(self) // all shards have been rebalanced, complete graceful shutdown
|
||||
|
||||
def register(): Unit = {
|
||||
coordinatorSelection.foreach(_ ! registrationMessage)
|
||||
if (shardBuffers.nonEmpty && retryCount >= 5)
|
||||
|
|
|
|||
|
|
@ -3,28 +3,20 @@
|
|||
*/
|
||||
package akka.cluster.sharding
|
||||
|
||||
import scala.collection.immutable
|
||||
import java.io.File
|
||||
import akka.cluster.sharding.ShardRegion.Passivate
|
||||
import scala.concurrent.duration._
|
||||
import org.apache.commons.io.FileUtils
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import akka.actor._
|
||||
import akka.cluster.Cluster
|
||||
import akka.cluster.ClusterEvent._
|
||||
import akka.cluster.sharding.ShardRegion.GracefulShutdown
|
||||
import akka.persistence.Persistence
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbJournal
|
||||
import akka.persistence.journal.leveldb.SharedLeveldbStore
|
||||
import akka.persistence.journal.leveldb.{SharedLeveldbJournal, SharedLeveldbStore}
|
||||
import akka.remote.testconductor.RoleName
|
||||
import akka.remote.testkit.MultiNodeConfig
|
||||
import akka.remote.testkit.MultiNodeSpec
|
||||
import akka.remote.testkit.STMultiNodeSpec
|
||||
import akka.remote.transport.ThrottlerTransportAdapter.Direction
|
||||
import akka.remote.testkit.{MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec}
|
||||
import akka.testkit._
|
||||
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
|
||||
import scala.concurrent.Future
|
||||
import akka.util.Timeout
|
||||
import akka.pattern.ask
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.apache.commons.io.FileUtils
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object ClusterShardingGracefulShutdownSpec {
|
||||
case object StopEntity
|
||||
|
|
@ -204,6 +196,24 @@ abstract class ClusterShardingGracefulShutdownSpec(config: ClusterShardingGracef
|
|||
enterBarrier("after-3")
|
||||
}
|
||||
|
||||
"gracefully shutdown empty region" in within(30.seconds) {
|
||||
runOn(first) {
|
||||
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
|
||||
val regionEmpty = ClusterSharding(system).start(
|
||||
typeName = "EntityEmpty",
|
||||
entityProps = Props[Entity],
|
||||
settings = ClusterShardingSettings(system),
|
||||
extractEntityId = extractEntityId,
|
||||
extractShardId = extractShardId,
|
||||
allocationStrategy,
|
||||
handOffStopMessage = StopEntity)
|
||||
|
||||
watch(regionEmpty)
|
||||
regionEmpty ! GracefulShutdown
|
||||
expectTerminated(regionEmpty, 5.seconds)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue