+cls #16050 Support graceful shutdown of ShardRegion

* possibility to define a handOffStopMessage that is used
  for stopping the entries, both during graceful shutdown
  and during rebalance
Patrik Nordwall 2015-02-17 20:17:06 +01:00
parent 1dac401099
commit 1be5bb48df
6 changed files with 374 additions and 49 deletions

View file

@ -230,6 +230,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* that passed the `idExtractor` will be used
* @param allocationStrategy possibility to use a custom shard allocation and
* rebalancing logic
* @param handOffStopMessage the message that will be sent to entries when they are to be stopped
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
@ -239,12 +241,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver,
allocationStrategy: ShardAllocationStrategy): ActorRef = {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
val resolvedRole = if (roleOverride == None) Role else roleOverride
implicit val timeout = system.settings.CreationTimeout
val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver, allocationStrategy)
val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries,
idExtractor, shardResolver, allocationStrategy, handOffStopMessage)
val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration)
regions.put(typeName, shardRegion)
shardRegion
@ -256,7 +260,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* for this type can later be retrieved with the [[#shardRegion]] method.
*
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used.
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
@ -286,7 +290,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
shardResolver: ShardRegion.ShardResolver): ActorRef = {
start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
PoisonPill)
}
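For reference, a minimal sketch of how the extended `start` signature can be called with a custom `handOffStopMessage`. The `Counter` entity, `idExtractor`, `shardResolver` and the strategy settings below are hypothetical placeholders (mirroring the usage in the new ClusterShardingGracefulShutdownSpec), not part of this commit:

  val counterRegion: ActorRef = ClusterSharding(system).start(
    typeName = "Counter",
    entryProps = Some(Props[Counter]),   // hypothetical entity actor
    roleOverride = None,
    rememberEntries = false,
    idExtractor = idExtractor,           // hypothetical IdExtractor
    shardResolver = shardResolver,       // hypothetical ShardResolver
    allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(
      rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
    handOffStopMessage = Counter.Stop)   // sent to entries instead of the default PoisonPill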
/**
@ -310,6 +315,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* entry from the incoming message
* @param allocationStrategy possibility to use a custom shard allocation and
* rebalancing logic
* @param handOffStopMessage the message that will be sent to entries when they are to be stopped
* for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`.
* @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard
*/
def start(
@ -318,7 +325,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
roleOverride: Option[String],
rememberEntries: Boolean,
messageExtractor: ShardRegion.MessageExtractor,
allocationStrategy: ShardAllocationStrategy): ActorRef = {
allocationStrategy: ShardAllocationStrategy,
handOffStopMessage: Any): ActorRef = {
start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries,
idExtractor = {
@ -326,7 +334,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
},
shardResolver = msg ⇒ messageExtractor.shardId(msg),
allocationStrategy = allocationStrategy)
allocationStrategy = allocationStrategy,
handOffStopMessage = handOffStopMessage)
}
/**
@ -335,7 +344,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
* for this type can later be retrieved with the [[#shardRegion]] method.
*
* The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]]
* is used.
* is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`.
*
* Some settings can be configured as described in the `akka.cluster.sharding` section
* of the `reference.conf`.
@ -361,7 +370,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
messageExtractor: ShardRegion.MessageExtractor): ActorRef = {
start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor,
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance))
new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance),
PoisonPill)
}
/**
@ -382,7 +392,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension {
private[akka] object ClusterShardingGuardian {
import ShardCoordinator.ShardAllocationStrategy
final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy)
idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver,
allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any)
extends NoSerializationVerificationNeeded
final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded
}
@ -399,7 +410,7 @@ private[akka] class ClusterShardingGuardian extends Actor {
import sharding.Settings._
def receive = {
case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy) ⇒
case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒
val encName = URLEncoder.encode(typeName, "utf-8")
val coordinatorSingletonManagerName = encName + "Coordinator"
val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress
@ -430,7 +441,8 @@ private[akka] class ClusterShardingGuardian extends Actor {
bufferSize = BufferSize,
rememberEntries = rememberEntries,
idExtractor = idExtractor,
shardResolver = shardResolver),
shardResolver = shardResolver,
handOffStopMessage = handOffStopMessage),
name = encName)
}
sender() ! Started(shardRegion)
@ -459,8 +471,10 @@ object ShardRegion {
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): Props =
Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver))
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any): Props =
Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage))
/**
* Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
@ -477,8 +491,10 @@ object ShardRegion {
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): Props =
props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver)
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any): Props =
props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff,
snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)
/**
* Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor.
@ -494,14 +510,17 @@ object ShardRegion {
snapshotInterval: FiniteDuration,
bufferSize: Int,
rememberEntries: Boolean,
messageExtractor: ShardRegion.MessageExtractor): Props = {
messageExtractor: ShardRegion.MessageExtractor,
handOffStopMessage: Any): Props = {
props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries,
props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff,
entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries,
idExtractor = {
case msg if messageExtractor.entryId(msg) ne null ⇒
(messageExtractor.entryId(msg), messageExtractor.entryMessage(msg))
}: ShardRegion.IdExtractor,
shardResolver = msg ⇒ messageExtractor.shardId(msg))
shardResolver = msg ⇒ messageExtractor.shardId(msg),
handOffStopMessage = handOffStopMessage)
}
/**
@ -516,7 +535,8 @@ object ShardRegion {
bufferSize: Int,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): Props =
Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, Duration.Zero, bufferSize, false, idExtractor, shardResolver))
Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero,
Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill))
/**
* Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor
@ -611,21 +631,36 @@ object ShardRegion {
*/
@SerialVersionUID(1L) final case class Passivate(stopMessage: Any) extends ShardRegionCommand
/**
* Send this message to the `ShardRegion` actor to hand off all shards that are hosted by
* the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch`
* it to know when it is completed.
*/
@SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand
/**
* Java API: Send this message to the `ShardRegion` actor to hand off all shards that are hosted by
* the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch`
* it to know when it is completed.
*/
def gracefulShutdownInstance = GracefulShutdown
private case object Retry extends ShardRegionCommand
private def roleOption(role: String): Option[String] =
if (role == "") None else Option(role)
/**
* INTERNAL API. Sends `PoisonPill` to the entries and when all of them have terminated
* it replies with `ShardStopped`.
* INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entries and when all of
* them have terminated it replies with `ShardStopped`.
*/
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entries: Set[ActorRef]) extends Actor {
private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any)
extends Actor {
import ShardCoordinator.Internal.ShardStopped
entries.foreach { a ⇒
context watch a
a ! PoisonPill
a ! stopMessage
}
var remaining = entries
@ -640,8 +675,9 @@ object ShardRegion {
}
}
private[akka] def handOffStopperProps(shard: String, replyTo: ActorRef, entries: Set[ActorRef]): Props =
Props(new HandOffStopper(shard, replyTo, entries))
private[akka] def handOffStopperProps(
shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any): Props =
Props(new HandOffStopper(shard, replyTo, entries, stopMessage))
}
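As a usage sketch (run from some actor on the node being taken down; the "Entity" type name is only an example), a graceful shutdown can be requested and observed like this:

  val region = ClusterSharding(context.system).shardRegion("Entity")
  context.watch(region)
  region ! ShardRegion.GracefulShutdown
  // a later Terminated(region) means all hosted shards were handed off and the
  // region actor stopped; the node can then leave the cluster and shut down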
/**
@ -663,7 +699,8 @@ class ShardRegion(
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver) extends Actor with ActorLogging {
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any) extends Actor with ActorLogging {
import ShardCoordinator.Internal._
import ShardRegion._
@ -680,6 +717,7 @@ class ShardRegion(
var shards = Map.empty[ShardId, ActorRef]
var shardsByRef = Map.empty[ActorRef, ShardId]
var handingOff = Set.empty[ActorRef]
var gracefulShutdownInProgress = false
def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size }
@ -817,8 +855,15 @@ class ShardRegion(
case Retry ⇒
if (coordinator.isEmpty)
register()
else
else {
sendGracefulShutdownToCoordinator()
requestShardBufferHomes()
}
case GracefulShutdown ⇒
log.debug("Starting graceful shutdown of region and all its shards")
gracefulShutdownInProgress = true
sendGracefulShutdownToCoordinator()
case _ ⇒ unhandled(cmd)
}
@ -844,6 +889,9 @@ class ShardRegion(
} else {
throw new IllegalStateException(s"Shard [$shardId] terminated while not being handed off.")
}
if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty)
context.stop(self) // all shards have been rebalanced, complete graceful shutdown
}
}
@ -916,7 +964,8 @@ class ShardRegion(
bufferSize,
rememberEntries,
idExtractor,
shardResolver),
shardResolver,
handOffStopMessage),
name))
shards = shards.updated(id, shard)
shardsByRef = shardsByRef.updated(shard, id)
@ -924,6 +973,10 @@ class ShardRegion(
case None ⇒
throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion")
})
def sendGracefulShutdownToCoordinator(): Unit =
if (gracefulShutdownInProgress)
coordinator.foreach(_ ! GracefulShutdownReq(self))
}
/**
@ -992,8 +1045,10 @@ private[akka] object Shard {
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver): Props =
Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver))
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any): Props =
Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval,
bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage))
}
/**
@ -1014,7 +1069,8 @@ private[akka] class Shard(
bufferSize: Int,
rememberEntries: Boolean,
idExtractor: ShardRegion.IdExtractor,
shardResolver: ShardRegion.ShardResolver) extends PersistentActor with ActorLogging {
shardResolver: ShardRegion.ShardResolver,
handOffStopMessage: Any) extends PersistentActor with ActorLogging {
import ShardRegion.{ handOffStopperProps, EntryId, Msg, Passivate }
import ShardCoordinator.Internal.{ HandOff, ShardStopped }
@ -1105,7 +1161,8 @@ private[akka] class Shard(
log.debug("HandOff shard [{}]", shardId)
if (state.entries.nonEmpty) {
handOffStopper = Some(context.watch(context.actorOf(handOffStopperProps(shardId, replyTo, idByRef.keySet))))
handOffStopper = Some(context.watch(context.actorOf(
handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage))))
//During hand off we only care about watching for termination of the hand off stopper
context become {
@ -1470,6 +1527,11 @@ object ShardCoordinator {
*/
@SerialVersionUID(1L) final case class RebalanceResult(shards: Set[ShardId]) extends CoordinatorCommand
/**
* `ShardRegion` requests full handoff to be able to shut down gracefully.
*/
@SerialVersionUID(1L) final case class GracefulShutdownReq(shardRegion: ActorRef) extends CoordinatorCommand
// DomainEvents for the persistent state of the event sourced ShardCoordinator
sealed trait DomainEvent
@SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent
@ -1605,6 +1667,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
var persistentState = State.empty
var rebalanceInProgress = Set.empty[ShardId]
var unAckedHostShards = Map.empty[ShardId, Cancellable]
// regions that have requested handoff, for graceful shutdown
var gracefulShutdownInProgress = Set.empty[ActorRef]
import context.dispatcher
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
@ -1665,7 +1729,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
log.debug("ShardRegion registered: [{}]", region)
if (persistentState.regions.contains(region))
sender() ! RegisterAck(self)
else
else {
gracefulShutdownInProgress -= region
persist(ShardRegionRegistered(region)) { evt ⇒
val firstRegion = persistentState.regions.isEmpty
@ -1676,6 +1741,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
if (firstRegion)
allocateShardHomes()
}
}
case RegisterProxy(proxy) ⇒
log.debug("ShardRegion proxy registered: [{}]", proxy)
@ -1695,6 +1761,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
require(persistentState.regions.contains(ref), s"Terminated region $ref not registered")
persistentState.regions(ref).foreach { s ⇒ self ! GetShardHome(s) }
gracefulShutdownInProgress -= ref
persist(ShardRegionTerminated(ref)) { evt ⇒
persistentState = persistentState.updated(evt)
allocateShardHomes()
@ -1711,9 +1779,10 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
persistentState.shards.get(shard) match {
case Some(ref) ⇒ sender() ! ShardHome(shard, ref)
case None ⇒
if (persistentState.regions.nonEmpty) {
val activeRegions = persistentState.regions -- gracefulShutdownInProgress
if (activeRegions.nonEmpty) {
val getShardHomeSender = sender()
val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, persistentState.regions)
val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions)
regionFuture.value match {
case Some(Success(region)) ⇒
continueGetShardHome(shard, region, getShardHomeSender)
@ -1771,11 +1840,24 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
rebalanceInProgress -= shard
log.debug("Rebalance shard [{}] done [{}]", shard, ok)
// The shard could have been removed by ShardRegionTerminated
if (ok && persistentState.shards.contains(shard))
persist(ShardHomeDeallocated(shard)) { evt ⇒
persistentState = persistentState.updated(evt)
log.debug("Shard [{}] deallocated", evt.shard)
allocateShardHomes()
if (persistentState.shards.contains(shard))
if (ok)
persist(ShardHomeDeallocated(shard)) { evt ⇒
persistentState = persistentState.updated(evt)
log.debug("Shard [{}] deallocated", evt.shard)
allocateShardHomes()
}
else // rebalance not completed, graceful shutdown will be retried
gracefulShutdownInProgress -= persistentState.shards(shard)
case GracefulShutdownReq(region) ⇒
if (!gracefulShutdownInProgress(region))
persistentState.regions.get(region) match {
case Some(shards) ⇒
log.debug("Graceful shutdown of region [{}] with shards [{}]", region, shards)
gracefulShutdownInProgress += region
continueRebalance(shards.toSet)
case None ⇒
}
case SnapshotTick ⇒
@ -1818,7 +1900,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
persistentState.shards.get(shard) match {
case Some(ref) ⇒ getShardHomeSender ! ShardHome(shard, ref)
case None ⇒
if (persistentState.regions.contains(region)) {
if (persistentState.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) {
persist(ShardHomeAllocated(shard, region)) { evt ⇒
persistentState = persistentState.updated(evt)
log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region)

View file

@ -139,7 +139,8 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver,
allocationStrategy = TestAllocationStrategy(allocator))
allocationStrategy = TestAllocationStrategy(allocator),
handOffStopMessage = PoisonPill)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")

View file

@ -0,0 +1,197 @@
/**
* Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster.sharding
import scala.collection.immutable
import java.io.File
import akka.cluster.sharding.ShardRegion.Passivate
import scala.concurrent.duration._
import org.apache.commons.io.FileUtils
import com.typesafe.config.ConfigFactory
import akka.actor._
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.remote.transport.ThrottlerTransportAdapter.Direction
import akka.testkit._
import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy
import scala.concurrent.Future
import akka.util.Timeout
import akka.pattern.ask
object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = INFO
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingGracefulShutdownSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec"
"""))
case object StopEntity
class Entity extends Actor {
def receive = {
case id: Int ⇒ sender() ! id
case StopEntity ⇒
context.stop(self)
}
}
val idExtractor: ShardRegion.IdExtractor = {
case id: Int ⇒ (id.toString, id)
}
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
case id: Int ⇒ id.toString
}
//#graceful-shutdown
class IllustrateGracefulShutdown extends Actor {
val system = context.system
val cluster = Cluster(system)
val region = ClusterSharding(system).shardRegion("Entity")
def receive = {
case "leave"
context.watch(region)
region ! ShardRegion.GracefulShutdown
case Terminated(`region`)
cluster.registerOnMemberRemoved(system.terminate())
cluster.leave(cluster.selfAddress)
}
}
//#graceful-shutdown
}
class ClusterShardingGracefulShutdownMultiJvmNode1 extends ClusterShardingGracefulShutdownSpec
class ClusterShardingGracefulShutdownMultiJvmNode2 extends ClusterShardingGracefulShutdownSpec
class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingGracefulShutdownSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingGracefulShutdownSpec._
override def initialParticipants = roles.size
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
override protected def afterTermination() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
Cluster(system) join node(to).address
startSharding()
}
enterBarrier(from.name + "-joined")
}
def startSharding(): Unit = {
val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1)
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver,
allocationStrategy,
handOffStopMessage = StopEntity)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding" must {
"setup shared journal" in {
// start the Persistence extension
Persistence(system)
runOn(first) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("peristence-started")
runOn(first, second) {
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
val sharedStore = expectMsgType[ActorIdentity].ref.get
SharedLeveldbJournal.setStore(sharedStore, system)
}
enterBarrier("after-1")
}
"start some shards in both regions" in within(30.seconds) {
join(first, first)
join(second, first)
awaitAssert {
val p = TestProbe()
val regionAddresses = (1 to 100).map { n ⇒
region.tell(n, p.ref)
p.expectMsg(1.second, n)
p.lastSender.path.address
}.toSet
regionAddresses.size should be(2)
}
enterBarrier("after-2")
}
"gracefully shutdown a region" in within(30.seconds) {
runOn(second) {
region ! ShardRegion.GracefulShutdown
}
runOn(first) {
awaitAssert {
val p = TestProbe()
for (n ← 1 to 200) {
region.tell(n, p.ref)
p.expectMsg(1.second, n)
p.lastSender.path should be(region.path / n.toString / n.toString)
}
}
}
enterBarrier("handoff-completed")
runOn(second) {
watch(region)
expectTerminated(region)
}
enterBarrier("after-3")
}
}
}

View file

@ -212,7 +212,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult
bufferSize = 1000,
rememberEntries = rememberEntries,
idExtractor = idExtractor,
shardResolver = shardResolver),
shardResolver = shardResolver,
handOffStopMessage = PoisonPill),
name = typeName + "Region")
lazy val region = createRegion("counter", rememberEntries = false)

View file

@ -5,16 +5,20 @@
package akka.cluster.sharding;
import java.util.concurrent.TimeUnit;
import scala.concurrent.duration.Duration;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.actor.ReceiveTimeout;
import akka.japi.Procedure;
import akka.japi.Option;
import akka.persistence.UntypedPersistentActor;
import akka.cluster.Cluster;
import akka.japi.pf.ReceiveBuilder;
// Doc code, compile only
public class ClusterShardingTest {
@ -66,7 +70,7 @@ public class ClusterShardingTest {
//#counter-start
Option<String> roleOption = Option.none();
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter",
Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor);
//#counter-start
@ -170,4 +174,25 @@ public class ClusterShardingTest {
//#counter-actor
static//#graceful-shutdown
public class IllustrateGracefulShutdown extends AbstractActor {
public IllustrateGracefulShutdown() {
final ActorSystem system = context().system();
final Cluster cluster = Cluster.get(system);
final ActorRef region = ClusterSharding.get(system).shardRegion("Entity");
receive(ReceiveBuilder.
match(String.class, s -> s.equals("leave"), s -> {
context().watch(region);
region.tell(ShardRegion.gracefulShutdownInstance(), self());
}).
match(Terminated.class, t -> t.actor().equals(region), t -> {
cluster.registerOnMemberRemoved(() -> system.terminate());
cluster.leave(cluster.selfAddress());
}).build());
}
}
//#graceful-shutdown
}

View file

@ -197,9 +197,10 @@ That means they will start buffering incoming messages for that shard, in the same way as if the
shard location is unknown. During the rebalance process the coordinator will not answer any
requests for the location of shards that are being rebalanced, i.e. local buffering will
continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard
will stop all entries in that shard by sending ``PoisonPill`` to them. When all entries have
been terminated the ``ShardRegion`` owning the entries will acknowledge the handoff as completed
to the coordinator. Thereafter the coordinator will reply to requests for the location of
will stop all entries in that shard by sending the specified ``handOffStopMessage``
(default ``PoisonPill``) to them. When all entries have been terminated the ``ShardRegion``
owning the entries will acknowledge the handoff as completed to the coordinator.
Thereafter the coordinator will reply to requests for the location of
the shard and thereby allocate a new home for the shard and then buffered messages in the
``ShardRegion`` actors are delivered to the new location. This means that the state of the entries
is not transferred or migrated. If the state of the entries is of importance it should be
@ -274,6 +275,25 @@ using a ``Passivate``.
Note that the state of the entries themselves will not be restored unless they have been made persistent,
e.g. with ``akka-persistence``.
Graceful Shutdown
-----------------
You can send the ``ShardRegion.GracefulShutdown`` message (``ShardRegion.gracefulShutdownInstance`` in Java)
to the ``ShardRegion`` actor to hand off all shards that are hosted by that ``ShardRegion`` and then the
``ShardRegion`` actor will be stopped. You can ``watch`` the ``ShardRegion`` actor to know when it is completed.
During this period other regions will buffer messages for those shards in the same way as when a rebalance is
triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
This is how to do it in Java:
.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown
This is how to do it in Scala:
.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown
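The entries themselves decide how to react to the configured ``handOffStopMessage``. A minimal sketch, assuming a custom ``StopEntity`` message is supplied as ``handOffStopMessage`` when starting sharding (mirroring the entity in the graceful shutdown test above)::

  case object StopEntity

  class Entity extends Actor {
    def receive = {
      case id: Int    ⇒ sender() ! id
      case StopEntity ⇒
        // clean up if needed, then stop; the shard acknowledges the hand off
        // with ShardStopped once all of its entries have terminated
        context.stop(self)
    }
  }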
Dependencies
------------
@ -291,7 +311,6 @@ maven::
<version>@version@</version>
</dependency>
Configuration
-------------