From 1be5bb48df89fb73b6270d5ed01eea76362907da Mon Sep 17 00:00:00 2001 From: Patrik Nordwall Date: Tue, 17 Feb 2015 20:17:06 +0100 Subject: [PATCH] +cls #16050 Support graceful shutdown of ShardRegion * possibility to define handOffStopMessage that is used for stopping the entries, both when graceful shutdown and rebalance --- .../cluster/sharding/ClusterSharding.scala | 164 +++++++++++---- ...terShardingCustomShardAllocationSpec.scala | 3 +- .../ClusterShardingGracefulShutdownSpec.scala | 197 ++++++++++++++++++ .../sharding/ClusterShardingSpec.scala | 3 +- .../cluster/sharding/ClusterShardingTest.java | 29 ++- akka-docs/rst/scala/cluster-sharding.rst | 27 ++- 6 files changed, 374 insertions(+), 49 deletions(-) create mode 100644 akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala diff --git a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala index f5e679d691..ce01d33609 100644 --- a/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala +++ b/akka-cluster-sharding/src/main/scala/akka/cluster/sharding/ClusterSharding.scala @@ -230,6 +230,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * that passed the `idExtractor` will be used * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic + * @param handOffStopMessage the message that will be sent to entries when they are to be stopped + * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( @@ -239,12 +241,14 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, - allocationStrategy: ShardAllocationStrategy): ActorRef = { + allocationStrategy: ShardAllocationStrategy, + handOffStopMessage: Any): ActorRef = { val resolvedRole = if (roleOverride == None) Role else roleOverride implicit val timeout = system.settings.CreationTimeout - val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver, allocationStrategy) + val startMsg = Start(typeName, entryProps, roleOverride, rememberEntries, + idExtractor, shardResolver, allocationStrategy, handOffStopMessage) val Started(shardRegion) = Await.result(guardian ? startMsg, timeout.duration) regions.put(typeName, shardRegion) shardRegion @@ -256,7 +260,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * for this type can later be retrieved with the [[#shardRegion]] method. * * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] - * is used. + * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. 
@@ -286,7 +290,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { shardResolver: ShardRegion.ShardResolver): ActorRef = { start(typeName, entryProps, roleOverride, rememberEntries, idExtractor, shardResolver, - new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) + new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), + PoisonPill) } /** @@ -310,6 +315,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * entry from the incoming message * @param allocationStrategy possibility to use a custom shard allocation and * rebalancing logic + * @param handOffStopMessage the message that will be sent to entries when they are to be stopped + * for a rebalance or graceful shutdown of a `ShardRegion`, e.g. `PoisonPill`. * @return the actor ref of the [[ShardRegion]] that is to be responsible for the shard */ def start( @@ -318,7 +325,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { roleOverride: Option[String], rememberEntries: Boolean, messageExtractor: ShardRegion.MessageExtractor, - allocationStrategy: ShardAllocationStrategy): ActorRef = { + allocationStrategy: ShardAllocationStrategy, + handOffStopMessage: Any): ActorRef = { start(typeName, entryProps = Option(entryProps), roleOverride, rememberEntries, idExtractor = { @@ -326,7 +334,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) }, shardResolver = msg ⇒ messageExtractor.shardId(msg), - allocationStrategy = allocationStrategy) + allocationStrategy = allocationStrategy, + handOffStopMessage = handOffStopMessage) } /** @@ -335,7 +344,7 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { * for this type can later be retrieved with the [[#shardRegion]] method. * * The default shard allocation strategy [[ShardCoordinator.LeastShardAllocationStrategy]] - * is used. + * is used. [[akka.actor.PoisonPill]] is used as `handOffStopMessage`. * * Some settings can be configured as described in the `akka.cluster.sharding` section * of the `reference.conf`. 
@@ -361,7 +370,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { messageExtractor: ShardRegion.MessageExtractor): ActorRef = { start(typeName, entryProps, roleOverride, rememberEntries, messageExtractor, - new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance)) + new LeastShardAllocationStrategy(LeastShardAllocationRebalanceThreshold, LeastShardAllocationMaxSimultaneousRebalance), + PoisonPill) } /** @@ -382,7 +392,8 @@ class ClusterSharding(system: ExtendedActorSystem) extends Extension { private[akka] object ClusterShardingGuardian { import ShardCoordinator.ShardAllocationStrategy final case class Start(typeName: String, entryProps: Option[Props], role: Option[String], rememberEntries: Boolean, - idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, allocationStrategy: ShardAllocationStrategy) + idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver, + allocationStrategy: ShardAllocationStrategy, handOffStopMessage: Any) extends NoSerializationVerificationNeeded final case class Started(shardRegion: ActorRef) extends NoSerializationVerificationNeeded } @@ -399,7 +410,7 @@ private[akka] class ClusterShardingGuardian extends Actor { import sharding.Settings._ def receive = { - case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy) ⇒ + case Start(typeName, entryProps, role, rememberEntries, idExtractor, shardResolver, allocationStrategy, handOffStopMessage) ⇒ val encName = URLEncoder.encode(typeName, "utf-8") val coordinatorSingletonManagerName = encName + "Coordinator" val coordinatorPath = (self.path / coordinatorSingletonManagerName / "singleton" / "coordinator").toStringWithoutAddress @@ -430,7 +441,8 @@ private[akka] class ClusterShardingGuardian extends Actor { bufferSize = BufferSize, rememberEntries = rememberEntries, idExtractor = idExtractor, - shardResolver = shardResolver), + shardResolver = shardResolver, + handOffStopMessage = handOffStopMessage), name = encName) } sender() ! Started(shardRegion) @@ -459,8 +471,10 @@ object ShardRegion { bufferSize: Int, rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): Props = - Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver)) + shardResolver: ShardRegion.ShardResolver, + handOffStopMessage: Any): Props = + Props(new ShardRegion(typeName, entryProps, role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, + snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)) /** * Scala API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. 
@@ -477,8 +491,10 @@ object ShardRegion { bufferSize: Int, rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): Props = - props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver) + shardResolver: ShardRegion.ShardResolver, + handOffStopMessage: Any): Props = + props(typeName, Some(entryProps), role, coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, + snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage) /** * Java API: Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor. @@ -494,14 +510,17 @@ object ShardRegion { snapshotInterval: FiniteDuration, bufferSize: Int, rememberEntries: Boolean, - messageExtractor: ShardRegion.MessageExtractor): Props = { + messageExtractor: ShardRegion.MessageExtractor, + handOffStopMessage: Any): Props = { - props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, + props(typeName, Option(entryProps), roleOption(role), coordinatorPath, retryInterval, shardFailureBackoff, + entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor = { case msg if messageExtractor.entryId(msg) ne null ⇒ (messageExtractor.entryId(msg), messageExtractor.entryMessage(msg)) }: ShardRegion.IdExtractor, - shardResolver = msg ⇒ messageExtractor.shardId(msg)) + shardResolver = msg ⇒ messageExtractor.shardId(msg), + handOffStopMessage = handOffStopMessage) } /** @@ -516,7 +535,8 @@ object ShardRegion { bufferSize: Int, idExtractor: ShardRegion.IdExtractor, shardResolver: ShardRegion.ShardResolver): Props = - Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, Duration.Zero, bufferSize, false, idExtractor, shardResolver)) + Props(new ShardRegion(typeName, None, role, coordinatorPath, retryInterval, Duration.Zero, Duration.Zero, + Duration.Zero, bufferSize, false, idExtractor, shardResolver, PoisonPill)) /** * Java API: : Factory method for the [[akka.actor.Props]] of the [[ShardRegion]] actor @@ -611,21 +631,36 @@ object ShardRegion { */ @SerialVersionUID(1L) final case class Passivate(stopMessage: Any) extends ShardRegionCommand + /* + * Send this message to the `ShardRegion` actor to handoff all shards that are hosted by + * the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch` + * it to know when it is completed. + */ + @SerialVersionUID(1L) final case object GracefulShutdown extends ShardRegionCommand + + /** + * Java API: Send this message to the `ShardRegion` actor to handoff all shards that are hosted by + * the `ShardRegion` and then the `ShardRegion` actor will be stopped. You can `watch` + * it to know when it is completed. + */ + def gracefulShutdownInstance = GracefulShutdown + private case object Retry extends ShardRegionCommand private def roleOption(role: String): Option[String] = if (role == "") None else Option(role) /** - * INTERNAL API. Sends `PoisonPill` to the entries and when all of them have terminated - * it replies with `ShardStopped`. + * INTERNAL API. Sends stopMessage (e.g. `PoisonPill`) to the entries and when all of + * them have terminated it replies with `ShardStopped`. 
*/ - private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entries: Set[ActorRef]) extends Actor { + private[akka] class HandOffStopper(shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any) + extends Actor { import ShardCoordinator.Internal.ShardStopped entries.foreach { a ⇒ context watch a - a ! PoisonPill + a ! stopMessage } var remaining = entries @@ -640,8 +675,9 @@ object ShardRegion { } } - private[akka] def handOffStopperProps(shard: String, replyTo: ActorRef, entries: Set[ActorRef]): Props = - Props(new HandOffStopper(shard, replyTo, entries)) + private[akka] def handOffStopperProps( + shard: String, replyTo: ActorRef, entries: Set[ActorRef], stopMessage: Any): Props = + Props(new HandOffStopper(shard, replyTo, entries, stopMessage)) } /** @@ -663,7 +699,8 @@ class ShardRegion( bufferSize: Int, rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver) extends Actor with ActorLogging { + shardResolver: ShardRegion.ShardResolver, + handOffStopMessage: Any) extends Actor with ActorLogging { import ShardCoordinator.Internal._ import ShardRegion._ @@ -680,6 +717,7 @@ class ShardRegion( var shards = Map.empty[ShardId, ActorRef] var shardsByRef = Map.empty[ActorRef, ShardId] var handingOff = Set.empty[ActorRef] + var gracefulShutdownInProgress = false def totalBufferSize = shardBuffers.foldLeft(0) { (sum, entry) ⇒ sum + entry._2.size } @@ -817,8 +855,15 @@ class ShardRegion( case Retry ⇒ if (coordinator.isEmpty) register() - else + else { + sendGracefulShutdownToCoordinator() requestShardBufferHomes() + } + + case GracefulShutdown ⇒ + log.debug("Starting graceful shutdown of region and all its shards") + gracefulShutdownInProgress = true + sendGracefulShutdownToCoordinator() case _ ⇒ unhandled(cmd) } @@ -844,6 +889,9 @@ class ShardRegion( } else { throw new IllegalStateException(s"Shard [$shardId] terminated while not being handed off.") } + + if (gracefulShutdownInProgress && shards.isEmpty && shardBuffers.isEmpty) + context.stop(self) // all shards have been rebalanced, complete graceful shutdown } } @@ -916,7 +964,8 @@ class ShardRegion( bufferSize, rememberEntries, idExtractor, - shardResolver), + shardResolver, + handOffStopMessage), name)) shards = shards.updated(id, shard) shardsByRef = shardsByRef.updated(shard, id) @@ -924,6 +973,10 @@ class ShardRegion( case None ⇒ throw new IllegalStateException("Shard must not be allocated to a proxy only ShardRegion") }) + + def sendGracefulShutdownToCoordinator(): Unit = + if (gracefulShutdownInProgress) + coordinator.foreach(_ ! 
GracefulShutdownReq(self)) } /** @@ -992,8 +1045,10 @@ private[akka] object Shard { bufferSize: Int, rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver): Props = - Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval, bufferSize, rememberEntries, idExtractor, shardResolver)) + shardResolver: ShardRegion.ShardResolver, + handOffStopMessage: Any): Props = + Props(new Shard(typeName, shardId, entryProps, shardFailureBackoff, entryRestartBackoff, snapshotInterval, + bufferSize, rememberEntries, idExtractor, shardResolver, handOffStopMessage)) } /** @@ -1014,7 +1069,8 @@ private[akka] class Shard( bufferSize: Int, rememberEntries: Boolean, idExtractor: ShardRegion.IdExtractor, - shardResolver: ShardRegion.ShardResolver) extends PersistentActor with ActorLogging { + shardResolver: ShardRegion.ShardResolver, + handOffStopMessage: Any) extends PersistentActor with ActorLogging { import ShardRegion.{ handOffStopperProps, EntryId, Msg, Passivate } import ShardCoordinator.Internal.{ HandOff, ShardStopped } @@ -1105,7 +1161,8 @@ private[akka] class Shard( log.debug("HandOff shard [{}]", shardId) if (state.entries.nonEmpty) { - handOffStopper = Some(context.watch(context.actorOf(handOffStopperProps(shardId, replyTo, idByRef.keySet)))) + handOffStopper = Some(context.watch(context.actorOf( + handOffStopperProps(shardId, replyTo, idByRef.keySet, handOffStopMessage)))) //During hand off we only care about watching for termination of the hand off stopper context become { @@ -1470,6 +1527,11 @@ object ShardCoordinator { */ @SerialVersionUID(1L) final case class RebalanceResult(shards: Set[ShardId]) extends CoordinatorCommand + /** + * `ShardRegion` requests full handoff to be able to shutdown gracefully. + */ + @SerialVersionUID(1L) final case class GracefulShutdownReq(shardRegion: ActorRef) extends CoordinatorCommand + // DomainEvents for the persistent state of the event sourced ShardCoordinator sealed trait DomainEvent @SerialVersionUID(1L) final case class ShardRegionRegistered(region: ActorRef) extends DomainEvent @@ -1605,6 +1667,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite var persistentState = State.empty var rebalanceInProgress = Set.empty[ShardId] var unAckedHostShards = Map.empty[ShardId, Cancellable] + // regions that have requested handoff, for graceful shutdown + var gracefulShutdownInProgress = Set.empty[ActorRef] import context.dispatcher val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick) @@ -1665,7 +1729,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite log.debug("ShardRegion registered: [{}]", region) if (persistentState.regions.contains(region)) sender() ! RegisterAck(self) - else + else { + gracefulShutdownInProgress -= region persist(ShardRegionRegistered(region)) { evt ⇒ val firstRegion = persistentState.regions.isEmpty @@ -1676,6 +1741,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite if (firstRegion) allocateShardHomes() } + } case RegisterProxy(proxy) ⇒ log.debug("ShardRegion proxy registered: [{}]", proxy) @@ -1695,6 +1761,8 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite require(persistentState.regions.contains(ref), s"Terminated region $ref not registered") persistentState.regions(ref).foreach { s ⇒ self ! 
GetShardHome(s) } + gracefulShutdownInProgress -= ref + persist(ShardRegionTerminated(ref)) { evt ⇒ persistentState = persistentState.updated(evt) allocateShardHomes() @@ -1711,9 +1779,10 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite persistentState.shards.get(shard) match { case Some(ref) ⇒ sender() ! ShardHome(shard, ref) case None ⇒ - if (persistentState.regions.nonEmpty) { + val activeRegions = persistentState.regions -- gracefulShutdownInProgress + if (activeRegions.nonEmpty) { val getShardHomeSender = sender() - val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, persistentState.regions) + val regionFuture = allocationStrategy.allocateShard(getShardHomeSender, shard, activeRegions) regionFuture.value match { case Some(Success(region)) ⇒ continueGetShardHome(shard, region, getShardHomeSender) @@ -1771,11 +1840,24 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite rebalanceInProgress -= shard log.debug("Rebalance shard [{}] done [{}]", shard, ok) // The shard could have been removed by ShardRegionTerminated - if (ok && persistentState.shards.contains(shard)) - persist(ShardHomeDeallocated(shard)) { evt ⇒ - persistentState = persistentState.updated(evt) - log.debug("Shard [{}] deallocated", evt.shard) - allocateShardHomes() + if (persistentState.shards.contains(shard)) + if (ok) + persist(ShardHomeDeallocated(shard)) { evt ⇒ + persistentState = persistentState.updated(evt) + log.debug("Shard [{}] deallocated", evt.shard) + allocateShardHomes() + } + else // rebalance not completed, graceful shutdown will be retried + gracefulShutdownInProgress -= persistentState.shards(shard) + + case GracefulShutdownReq(region) ⇒ + if (!gracefulShutdownInProgress(region)) + persistentState.regions.get(region) match { + case Some(shards) ⇒ + log.debug("Graceful shutdown of region [{}] with shards [{}]", region, shards) + gracefulShutdownInProgress += region + continueRebalance(shards.toSet) + case None ⇒ } case SnapshotTick ⇒ @@ -1818,7 +1900,7 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite persistentState.shards.get(shard) match { case Some(ref) ⇒ getShardHomeSender ! 
ShardHome(shard, ref) case None ⇒ - if (persistentState.regions.contains(region)) { + if (persistentState.regions.contains(region) && !gracefulShutdownInProgress.contains(region)) { persist(ShardHomeAllocated(shard, region)) { evt ⇒ persistentState = persistentState.updated(evt) log.debug("Shard [{}] allocated at [{}]", evt.shard, evt.region) diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala index eb1221331b..327f00a2ab 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingCustomShardAllocationSpec.scala @@ -139,7 +139,8 @@ class ClusterShardingCustomShardAllocationSpec extends MultiNodeSpec(ClusterShar rememberEntries = false, idExtractor = idExtractor, shardResolver = shardResolver, - allocationStrategy = TestAllocationStrategy(allocator)) + allocationStrategy = TestAllocationStrategy(allocator), + handOffStopMessage = PoisonPill) } lazy val region = ClusterSharding(system).shardRegion("Entity") diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala new file mode 100644 index 0000000000..a3c5406f9a --- /dev/null +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala @@ -0,0 +1,197 @@ +/** + * Copyright (C) 2009-2015 Typesafe Inc. + */ +package akka.cluster.sharding + +import scala.collection.immutable +import java.io.File +import akka.cluster.sharding.ShardRegion.Passivate +import scala.concurrent.duration._ +import org.apache.commons.io.FileUtils +import com.typesafe.config.ConfigFactory +import akka.actor._ +import akka.cluster.Cluster +import akka.cluster.ClusterEvent._ +import akka.persistence.Persistence +import akka.persistence.journal.leveldb.SharedLeveldbJournal +import akka.persistence.journal.leveldb.SharedLeveldbStore +import akka.remote.testconductor.RoleName +import akka.remote.testkit.MultiNodeConfig +import akka.remote.testkit.MultiNodeSpec +import akka.remote.testkit.STMultiNodeSpec +import akka.remote.transport.ThrottlerTransportAdapter.Direction +import akka.testkit._ +import akka.cluster.sharding.ShardCoordinator.ShardAllocationStrategy +import scala.concurrent.Future +import akka.util.Timeout +import akka.pattern.ask + +object ClusterShardingGracefulShutdownSpec extends MultiNodeConfig { + val first = role("first") + val second = role("second") + + commonConfig(ConfigFactory.parseString(""" + akka.loglevel = INFO + akka.actor.provider = "akka.cluster.ClusterActorRefProvider" + akka.remote.log-remote-lifecycle-events = off + akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared" + akka.persistence.journal.leveldb-shared { + timeout = 5s + store { + native = off + dir = "target/journal-ClusterShardingGracefulShutdownSpec" + } + } + akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local" + akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingGracefulShutdownSpec" + """)) + + case object StopEntity + + class Entity extends Actor { + def receive = { + case id: Int ⇒ sender() ! 
id + case StopEntity ⇒ + context.stop(self) + } + } + + val idExtractor: ShardRegion.IdExtractor = { + case id: Int ⇒ (id.toString, id) + } + + val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match { + case id: Int ⇒ id.toString + } + + //#graceful-shutdown + class IllustrateGracefulShutdown extends Actor { + val system = context.system + val cluster = Cluster(system) + val region = ClusterSharding(system).shardRegion("Entity") + + def receive = { + case "leave" ⇒ + context.watch(region) + region ! ShardRegion.GracefulShutdown + + case Terminated(`region`) ⇒ + cluster.registerOnMemberRemoved(system.terminate()) + cluster.leave(cluster.selfAddress) + } + } + //#graceful-shutdown + +} + +class ClusterShardingGracefulShutdownMultiJvmNode1 extends ClusterShardingGracefulShutdownSpec +class ClusterShardingGracefulShutdownMultiJvmNode2 extends ClusterShardingGracefulShutdownSpec + +class ClusterShardingGracefulShutdownSpec extends MultiNodeSpec(ClusterShardingGracefulShutdownSpec) with STMultiNodeSpec with ImplicitSender { + import ClusterShardingGracefulShutdownSpec._ + + override def initialParticipants = roles.size + + val storageLocations = List( + "akka.persistence.journal.leveldb.dir", + "akka.persistence.journal.leveldb-shared.store.dir", + "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s))) + + override protected def atStartup() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + override protected def afterTermination() { + runOn(first) { + storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir)) + } + } + + def join(from: RoleName, to: RoleName): Unit = { + runOn(from) { + Cluster(system) join node(to).address + startSharding() + } + enterBarrier(from.name + "-joined") + } + + def startSharding(): Unit = { + val allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(rebalanceThreshold = 2, maxSimultaneousRebalance = 1) + ClusterSharding(system).start( + typeName = "Entity", + entryProps = Some(Props[Entity]), + roleOverride = None, + rememberEntries = false, + idExtractor = idExtractor, + shardResolver = shardResolver, + allocationStrategy, + handOffStopMessage = StopEntity) + } + + lazy val region = ClusterSharding(system).shardRegion("Entity") + + "Cluster sharding" must { + + "setup shared journal" in { + // start the Persistence extension + Persistence(system) + runOn(first) { + system.actorOf(Props[SharedLeveldbStore], "store") + } + enterBarrier("peristence-started") + + runOn(first, second) { + system.actorSelection(node(first) / "user" / "store") ! Identify(None) + val sharedStore = expectMsgType[ActorIdentity].ref.get + SharedLeveldbJournal.setStore(sharedStore, system) + } + + enterBarrier("after-1") + } + + "start some shards in both regions" in within(30.seconds) { + join(first, first) + join(second, first) + + awaitAssert { + val p = TestProbe() + val regionAddresses = (1 to 100).map { n ⇒ + region.tell(n, p.ref) + p.expectMsg(1.second, n) + p.lastSender.path.address + }.toSet + regionAddresses.size should be(2) + } + enterBarrier("after-2") + } + + "gracefully shutdown a region" in within(30.seconds) { + runOn(second) { + region ! 
ShardRegion.GracefulShutdown + } + + runOn(first) { + awaitAssert { + val p = TestProbe() + for (n ← 1 to 200) { + region.tell(n, p.ref) + p.expectMsg(1.second, n) + p.lastSender.path should be(region.path / n.toString / n.toString) + } + } + } + enterBarrier("handoff-completed") + + runOn(second) { + watch(region) + expectTerminated(region) + } + + enterBarrier("after-3") + } + + } +} + diff --git a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala index 98d05a74a5..ba9748aad2 100644 --- a/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala +++ b/akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingSpec.scala @@ -212,7 +212,8 @@ class ClusterShardingSpec extends MultiNodeSpec(ClusterShardingSpec) with STMult bufferSize = 1000, rememberEntries = rememberEntries, idExtractor = idExtractor, - shardResolver = shardResolver), + shardResolver = shardResolver, + handOffStopMessage = PoisonPill), name = typeName + "Region") lazy val region = createRegion("counter", rememberEntries = false) diff --git a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java index 2b6d5f8a08..2db3699c52 100644 --- a/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java +++ b/akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java @@ -5,16 +5,20 @@ package akka.cluster.sharding; import java.util.concurrent.TimeUnit; - import scala.concurrent.duration.Duration; + +import akka.actor.AbstractActor; import akka.actor.ActorRef; import akka.actor.ActorSystem; import akka.actor.PoisonPill; import akka.actor.Props; +import akka.actor.Terminated; import akka.actor.ReceiveTimeout; import akka.japi.Procedure; import akka.japi.Option; import akka.persistence.UntypedPersistentActor; +import akka.cluster.Cluster; +import akka.japi.pf.ReceiveBuilder; // Doc code, compile only public class ClusterShardingTest { @@ -66,7 +70,7 @@ public class ClusterShardingTest { //#counter-start Option roleOption = Option.none(); - ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", + ActorRef startedCounterRegion = ClusterSharding.get(system).start("Counter", Props.create(Counter.class), Option.java2ScalaOption(roleOption), false, messageExtractor); //#counter-start @@ -170,4 +174,25 @@ public class ClusterShardingTest { //#counter-actor + static//#graceful-shutdown + public class IllustrateGracefulShutdown extends AbstractActor { + + public IllustrateGracefulShutdown() { + final ActorSystem system = context().system(); + final Cluster cluster = Cluster.get(system); + final ActorRef region = ClusterSharding.get(system).shardRegion("Entity"); + + receive(ReceiveBuilder. + match(String.class, s -> s.equals("leave"), s -> { + context().watch(region); + region.tell(ShardRegion.gracefulShutdownInstance(), self()); + }). 
+        match(Terminated.class, t -> t.actor().equals(region), t -> {
+          cluster.registerOnMemberRemoved(() -> system.terminate());
+          cluster.leave(cluster.selfAddress());
+        }).build());
+    }
+  }
+  //#graceful-shutdown
+
 }
diff --git a/akka-docs/rst/scala/cluster-sharding.rst b/akka-docs/rst/scala/cluster-sharding.rst
index 2814d409c5..a8d718405d 100644
--- a/akka-docs/rst/scala/cluster-sharding.rst
+++ b/akka-docs/rst/scala/cluster-sharding.rst
@@ -197,9 +197,10 @@ That means they will start buffering incoming messages for that shard, in the sa
 shard location is unknown. During the rebalance process the coordinator will not answer any
 requests for the location of shards that are being rebalanced, i.e. local buffering will
 continue until the handoff is completed. The ``ShardRegion`` responsible for the rebalanced shard
-will stop all entries in that shard by sending ``PoisonPill`` to them. When all entries have
-been terminated the ``ShardRegion`` owning the entries will acknowledge the handoff as completed
-to the coordinator. Thereafter the coordinator will reply to requests for the location of
+will stop all entries in that shard by sending the specified ``handOffStopMessage``
+(default ``PoisonPill``) to them. When all entries have been terminated the ``ShardRegion``
+owning the entries will acknowledge the handoff as completed to the coordinator.
+Thereafter the coordinator will reply to requests for the location of
 the shard and thereby allocate a new home for the shard and then buffered messages in the
 ``ShardRegion`` actors are delivered to the new location. This means that the state of the entries
 are not transferred or migrated. If the state of the entries are of importance it should be
@@ -274,6 +275,25 @@ using a ``Passivate``.
 Note that the state of the entries themselves will not be restored unless they have been made
 persistent, e.g. with ``akka-persistence``.
 
+Graceful Shutdown
+-----------------
+
+You can send the ``ShardRegion.GracefulShutdown`` message (``ShardRegion.gracefulShutdownInstance``
+in Java) to the ``ShardRegion`` actor to hand off all shards that are hosted by that ``ShardRegion``; the
+``ShardRegion`` actor will then be stopped. You can ``watch`` the ``ShardRegion`` actor to know when it is completed.
+During this period other regions will buffer messages for those shards in the same way as when a rebalance is
+triggered by the coordinator. When the shards have been stopped the coordinator will allocate these shards elsewhere.
+
+When the ``ShardRegion`` has terminated you probably want to ``leave`` the cluster, and shut down the ``ActorSystem``.
+
+This is how to do it in Java:
+
+.. includecode:: ../../../akka-cluster-sharding/src/test/java/akka/cluster/sharding/ClusterShardingTest.java#graceful-shutdown
+
+This is how to do it in Scala:
+
+.. includecode:: ../../../akka-cluster-sharding/src/multi-jvm/scala/akka/cluster/sharding/ClusterShardingGracefulShutdownSpec.scala#graceful-shutdown
+
 Dependencies
 ------------
@@ -291,7 +311,6 @@ maven::
     @version@
-
 Configuration
 -------------
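
Usage sketch (Scala, not part of the patch): the snippet below shows how the pieces added here fit together — an entry type registered with a custom ``handOffStopMessage`` and a helper actor that sends ``ShardRegion.GracefulShutdown``, waits for the region to terminate, and then leaves the cluster. The names ``GracefulShutdownExample``, ``GracefulLeaver``, ``Entity``, ``StopEntity`` and ``"ClusterSystem"`` are illustrative, and the usual cluster provider and journal configuration is assumed to be in place.

    import akka.actor._
    import akka.cluster.Cluster
    import akka.cluster.sharding.{ ClusterSharding, ShardCoordinator, ShardRegion }

    object GracefulShutdownExample {

      // custom stop message used instead of the default PoisonPill
      case object StopEntity

      class Entity extends Actor {
        def receive = {
          case id: Int    => sender() ! id
          case StopEntity => context.stop(self) // sent on rebalance and on graceful shutdown
        }
      }

      val idExtractor: ShardRegion.IdExtractor = {
        case id: Int => (id.toString, id)
      }

      val shardResolver: ShardRegion.ShardResolver = {
        case id: Int => id.toString
      }

      // hands off all shards of the local region, waits for it to terminate,
      // then leaves the cluster and shuts down the actor system
      class GracefulLeaver(region: ActorRef) extends Actor {
        val cluster = Cluster(context.system)
        context.watch(region)
        region ! ShardRegion.GracefulShutdown

        def receive = {
          case Terminated(`region`) =>
            cluster.registerOnMemberRemoved(context.system.terminate())
            cluster.leave(cluster.selfAddress)
        }
      }

      def main(args: Array[String]): Unit = {
        val system = ActorSystem("ClusterSystem")

        // StopEntity is sent to entries when they must stop, both during
        // rebalance and during graceful shutdown of the region
        val region = ClusterSharding(system).start(
          typeName = "Entity",
          entryProps = Some(Props[Entity]),
          roleOverride = None,
          rememberEntries = false,
          idExtractor = idExtractor,
          shardResolver = shardResolver,
          allocationStrategy = new ShardCoordinator.LeastShardAllocationStrategy(
            rebalanceThreshold = 2, maxSimultaneousRebalance = 1),
          handOffStopMessage = StopEntity)

        // trigger the graceful shutdown when this node is about to leave
        system.actorOf(Props(classOf[GracefulLeaver], region), "gracefulLeaver")
      }
    }

The same flow is what the new ClusterShardingGracefulShutdownSpec exercises across two nodes and what the documentation examples illustrate.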