=str #17200 Stop shard region when MemberRemoved

Two issues:

1) ShardRegion actor must stop itself when the node is shutting down,
   i.e. when receiving MemberRemoved(selfAddress); see the sketch after
   this list.
2) ShardCoordinator must not persist anything when the node is shutting
   down. MemberRemoved of other shard regions will trigger Terminated,
   which must not be persisted, because then the next coordinator will
   replay those events and end up in the wrong state. This problem
   announced itself when using leaving, as illustrated in the new test.
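
A minimal sketch of the first fix, for illustration only (the actor name
and structure are assumptions, not the actual ShardRegion code): subscribe
to member events and stop yourself once your own member is removed.

    import akka.actor.Actor
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberRemoved }

    // hypothetical actor illustrating the self-stop pattern
    class SelfStoppingActor extends Actor {
      val cluster = Cluster(context.system)

      override def preStart(): Unit =
        cluster.subscribe(self, classOf[MemberRemoved])

      override def postStop(): Unit =
        cluster.unsubscribe(self)

      def receive = {
        case MemberRemoved(m, _) if m.uniqueAddress == cluster.selfUniqueAddress ⇒
          // this node is being removed from the cluster, stop the actor
          context.stop(self)
        case _: CurrentClusterState ⇒ // initial state snapshot, ignored
      }
    }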

To solve the second issue I have added a new ClusterShuttingDown event
that is published before the MemberRemoved events. Note that Terminated
is triggered by MemberRemoved.
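
For illustration, a hedged sketch of how a subscriber can consume the new
event (the actor name is an assumption; the real change is in the
ShardCoordinator diff below): on ClusterShuttingDown, switch to a mode
that persists nothing and ignores further messages.

    import akka.actor.Actor
    import akka.cluster.Cluster
    import akka.cluster.ClusterEvent.{ ClusterShuttingDown, CurrentClusterState }

    // hypothetical subscriber mirroring the ShardCoordinator change below
    class ShutdownAwareActor extends Actor {
      override def preStart(): Unit =
        Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)

      override def postStop(): Unit =
        Cluster(context.system).unsubscribe(self)

      def receive = {
        case ClusterShuttingDown ⇒
          // the local node is going away: stop persisting, ignore the rest
          context.become(shuttingDown)
        case _: CurrentClusterState ⇒ // initial state snapshot, ignored
      }

      def shuttingDown: Receive = {
        case _ ⇒ // ignore all
      }
    }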

(cherry picked from commit 1b272c72597beece9d93f0054f4b58e3d25f9ae2)
Patrik Nordwall 2015-04-15 11:07:12 +02:00
parent db8d02ff06
commit c991d5f1d1
7 changed files with 244 additions and 5 deletions

View file

@@ -171,6 +171,17 @@ object ClusterEvent {
def getLeader: Address = leader orNull
}
/**
* This event is published when the cluster node is shutting down,
* before the final [[MemberRemoved]] events are published.
*/
final case object ClusterShuttingDown extends ClusterDomainEvent
/**
* Java API: get the singleton instance of [[ClusterShuttingDown]] event
*/
def getClusterShuttingDownInstance = ClusterShuttingDown
/**
* Marker interface to facilitate subscription of
* both [[UnreachableMember]] and [[ReachableMember]].
@@ -328,6 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
override def postStop(): Unit = {
// publish the final removed state before shutting down
publish(ClusterShuttingDown)
publishChanges(Gossip.empty)
}

View file

@@ -77,6 +77,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
_state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader))
case stats: CurrentInternalStats ⇒ _latestStats = stats
case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
case ClusterShuttingDown ⇒
}
case s: CurrentClusterState ⇒ _state = s
}

View file

@@ -61,6 +61,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
memberSubscriber = TestProbe()
system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent])
system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged])
system.eventStream.subscribe(memberSubscriber.ref, ClusterShuttingDown.getClass)
publisher = system.actorOf(Props[ClusterDomainEventPublisher])
publisher ! PublishChanges(g0)
@@ -167,8 +168,9 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
subscriber.expectNoMsg(500 millis)
}
"publish Removed when stopped" in {
"publish ClusterShuttingDown and Removed when stopped" in {
publisher ! PoisonPill
memberSubscriber.expectMsg(ClusterShuttingDown)
memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up))
}

View file

@@ -32,6 +32,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberEvent
import akka.cluster.ClusterEvent.MemberRemoved
import akka.cluster.ClusterEvent.MemberUp
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.Member
import akka.cluster.MemberStatus
import akka.pattern.ask
@@ -734,7 +735,9 @@ class ShardRegion(
changeMembers(membersByAge + m)
case MemberRemoved(m, _) ⇒
if (matchingRole(m))
if (m.uniqueAddress == cluster.selfUniqueAddress)
context.stop(self)
else if (matchingRole(m))
changeMembers(membersByAge - m)
case _ ⇒ unhandled(evt)
@@ -1588,10 +1591,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick)
Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
override def postStop(): Unit = {
super.postStop()
rebalanceTask.cancel()
snapshotTask.cancel()
Cluster(context.system).unsubscribe(self)
}
override def receiveRecover: Receive = {
@@ -1750,6 +1756,17 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
// On rebalance, we send ourselves a GetShardHome message to reallocate a
// shard. This receive handles the "response" from that message, i.e. ignores it.
case ClusterShuttingDown ⇒
log.debug("Shutting down ShardCoordinator")
// can't stop because supervisor will start it again,
// it will soon be stopped when singleton is stopped
context.become(shuttingDown)
case _: CurrentClusterState ⇒
}
def shuttingDown: Receive = {
case _ ⇒ // ignore all
}
def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {

View file

@@ -67,7 +67,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
case m @ Add(id, _) ⇒ (id, m)
}
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
val shardResolver: ShardRegion.ShardResolver = {
case Get(id) ⇒ id.charAt(0).toString
case Add(id, _) ⇒ id.charAt(0).toString
}

View file

@@ -0,0 +1,207 @@
/**
* Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.contrib.pattern
import java.io.File
import scala.concurrent.duration._
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
import akka.actor.Identify
import akka.actor.Props
import akka.cluster.Cluster
import akka.cluster.ClusterEvent._
import akka.cluster.MemberStatus
import akka.persistence.Persistence
import akka.persistence.journal.leveldb.SharedLeveldbJournal
import akka.persistence.journal.leveldb.SharedLeveldbStore
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
import com.typesafe.config.ConfigFactory
import org.apache.commons.io.FileUtils
object ClusterShardingLeavingSpec extends MultiNodeConfig {
val first = role("first")
val second = role("second")
val third = role("third")
val fourth = role("fourth")
commonConfig(ConfigFactory.parseString("""
akka.loglevel = DEBUG
akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
akka.remote.log-remote-lifecycle-events = off
akka.cluster.auto-down-unreachable-after = 0s
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
akka.persistence.journal.leveldb-shared {
timeout = 5s
store {
native = off
dir = "target/journal-ClusterShardingLeavingSpec"
}
}
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
"""))
case class Ping(id: String)
class Entity extends Actor {
def receive = {
case Ping(_) ⇒ sender() ! self
}
}
case object GetLocations
case class Locations(locations: Map[String, ActorRef])
class ShardLocations extends Actor {
var locations: Locations = _
def receive = {
case GetLocations ⇒ sender() ! locations
case l: Locations ⇒ locations = l
}
}
val idExtractor: ShardRegion.IdExtractor = {
case m @ Ping(id) ⇒ (id, m)
}
val shardResolver: ShardRegion.ShardResolver = {
case Ping(id: String) ⇒ id.charAt(0).toString
}
}
class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec
class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender {
import ClusterShardingLeavingSpec._
override def initialParticipants = roles.size
val storageLocations = List(
"akka.persistence.journal.leveldb.dir",
"akka.persistence.journal.leveldb-shared.store.dir",
"akka.persistence.snapshot-store.local.dir").map(s new File(system.settings.config.getString(s)))
override protected def atStartup() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
override protected def afterTermination() {
runOn(first) {
storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
}
}
val cluster = Cluster(system)
def join(from: RoleName, to: RoleName): Unit = {
runOn(from) {
cluster join node(to).address
startSharding()
within(5.seconds) {
awaitAssert(cluster.state.members.exists { m ⇒
m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
} should be(true))
}
}
enterBarrier(from.name + "-joined")
}
def startSharding(): Unit = {
ClusterSharding(system).start(
typeName = "Entity",
entryProps = Some(Props[Entity]),
roleOverride = None,
rememberEntries = false,
idExtractor = idExtractor,
shardResolver = shardResolver)
}
lazy val region = ClusterSharding(system).shardRegion("Entity")
"Cluster sharding with leaving member" must {
"setup shared journal" in {
// start the Persistence extension
Persistence(system)
runOn(first) {
system.actorOf(Props[SharedLeveldbStore], "store")
}
enterBarrier("peristence-started")
system.actorSelection(node(first) / "user" / "store") ! Identify(None)
val sharedStore = expectMsgType[ActorIdentity].ref.get
SharedLeveldbJournal.setStore(sharedStore, system)
enterBarrier("after-1")
}
"join cluster" in within(20.seconds) {
join(first, first)
join(second, first)
join(third, first)
join(fourth, first)
enterBarrier("after-2")
}
"initialize shards" in {
runOn(first) {
val shardLocations = system.actorOf(Props[ShardLocations], "shardLocations")
val locations = (for (n ← 1 to 10) yield {
val id = n.toString
region ! Ping(id)
id -> expectMsgType[ActorRef]
}).toMap
shardLocations ! Locations(locations)
}
enterBarrier("after-3")
}
"recover after leaving coordinator node" in within(30.seconds) {
runOn(third) {
cluster.leave(node(first).address)
}
runOn(first) {
watch(region)
expectTerminated(region, 5.seconds)
}
enterBarrier("stopped")
runOn(second, third, fourth) {
system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
val Locations(locations) = expectMsgType[Locations]
val firstAddress = node(first).address
awaitAssert {
val probe = TestProbe()
locations.foreach {
case (id, ref) ⇒
region.tell(Ping(id), probe.ref)
if (ref.path.address == firstAddress)
probe.expectMsgType[ActorRef](1.second) should not be (ref)
else
probe.expectMsg(1.second, ref) // should not move
}
}
}
enterBarrier("after-4")
}
}
}

View file

@@ -116,7 +116,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
val numberOfShards = 12
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
val shardResolver: ShardRegion.ShardResolver = {
case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
}
@@ -135,7 +135,7 @@ object ClusterShardingDocCode {
val numberOfShards = 100
val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
val shardResolver: ShardRegion.ShardResolver = {
case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
case Get(id) ⇒ (id % numberOfShards).toString
}