From c991d5f1d1c7a10602333573231fb933c298f822 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 15 Apr 2015 11:07:12 +0200
Subject: [PATCH] =str #17200 Stop shard region when MemberRemoved

Two issues:
1) ShardRegion actor must stop itself when the node is shutting down,
   i.e. when receiving MemberRemoved(selfAddress)
2) ShardCoordinator must not persist anything when the node is shutting
   down. MemberRemoved of other shard regions will trigger Terminated,
   which must not be persisted, because then the next coordinator will
   replay those events and end up in the wrong state.

This problem announced itself when using leaving, as illustrated in the
new test.

To solve the second issue I have added a new ClusterShuttingDown event
that is published before the MemberRemoved events. Note that Terminated
is triggered by MemberRemoved.

(cherry picked from commit 1b272c72597beece9d93f0054f4b58e3d25f9ae2)
---
 .../scala/akka/cluster/ClusterEvent.scala          |  12 +
 .../scala/akka/cluster/ClusterReadView.scala       |   1 +
 .../ClusterDomainEventPublisherSpec.scala          |   4 +-
 .../contrib/pattern/ClusterSharding.scala          |  19 +-
 .../pattern/ClusterShardingFailureSpec.scala       |   2 +-
 .../pattern/ClusterShardingLeavingSpec.scala       | 207 ++++++++++++++++++
 .../contrib/pattern/ClusterShardingSpec.scala      |   4 +-
 7 files changed, 244 insertions(+), 5 deletions(-)
 create mode 100644 akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala

diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index d478a4a3f5..6e154c3f5a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -171,6 +171,17 @@ object ClusterEvent {
     def getLeader: Address = leader orNull
   }
 
+  /**
+   * This event is published when the cluster node is shutting down,
+   * before the final [[MemberRemoved]] events are published.
+   */
+  final case object ClusterShuttingDown extends ClusterDomainEvent
+
+  /**
+   * Java API: get the singleton instance of [[ClusterShuttingDown]] event
+   */
+  def getClusterShuttingDownInstance = ClusterShuttingDown
+
   /**
    * Marker interface to facilitate subscription of
    * both [[UnreachableMember]] and [[ReachableMember]].
@@ -328,6 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto
 
   override def postStop(): Unit = {
     // publish the final removed state before shutting down
+    publish(ClusterShuttingDown)
     publishChanges(Gossip.empty)
   }
 
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
index c0dfcb3e40..3501957900 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
@@ -77,6 +77,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
             _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader))
           case stats: CurrentInternalStats ⇒ _latestStats = stats
           case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
+          case ClusterShuttingDown ⇒
         }
       case s: CurrentClusterState ⇒ _state = s
     }
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
index bff5a180c3..edba2b5b87 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
@@ -61,6 +61,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
     memberSubscriber = TestProbe()
     system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent])
     system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged])
+    system.eventStream.subscribe(memberSubscriber.ref, ClusterShuttingDown.getClass)
 
     publisher = system.actorOf(Props[ClusterDomainEventPublisher])
     publisher ! PublishChanges(g0)
@@ -167,8 +168,9 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
       subscriber.expectNoMsg(500 millis)
     }
 
-    "publish Removed when stopped" in {
+    "publish ClusterShuttingDown and Removed when stopped" in {
       publisher ! PoisonPill
+      memberSubscriber.expectMsg(ClusterShuttingDown)
       memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up))
     }
 
diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
index fdaa3aa900..d4fe73c869 100644
--- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
+++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
@@ -32,6 +32,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
 import akka.cluster.ClusterEvent.MemberEvent
 import akka.cluster.ClusterEvent.MemberRemoved
 import akka.cluster.ClusterEvent.MemberUp
+import akka.cluster.ClusterEvent.ClusterShuttingDown
 import akka.cluster.Member
 import akka.cluster.MemberStatus
 import akka.pattern.ask
@@ -734,7 +735,9 @@ class ShardRegion(
       changeMembers(membersByAge + m)
 
     case MemberRemoved(m, _) ⇒
-      if (matchingRole(m))
+      if (m.uniqueAddress == cluster.selfUniqueAddress)
+        context.stop(self)
+      else if (matchingRole(m))
         changeMembers(membersByAge - m)
 
     case _ ⇒ unhandled(evt)
@@ -1588,10 +1591,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
   val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick)
 
+  Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
+
   override def postStop(): Unit = {
     super.postStop()
     rebalanceTask.cancel()
     snapshotTask.cancel()
+    Cluster(context.system).unsubscribe(self)
   }
 
   override def receiveRecover: Receive = {
@@ -1750,6 +1756,17 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
     //On rebalance, we send ourselves a GetShardHome message to reallocate a
     // shard. This recieve handles the "response" from that message. i.e. Ingores it.
 
+    case ClusterShuttingDown ⇒
+      log.debug("Shutting down ShardCoordinator")
+      // can't stop because supervisor will start it again,
+      // it will soon be stopped when singleton is stopped
+      context.become(shuttingDown)
+
+    case _: CurrentClusterState ⇒
+  }
+
+  def shuttingDown: Receive = {
+    case _ ⇒ // ignore all
   }
 
   def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
index 4d387db4a4..ac1f44c52e 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
@@ -67,7 +67,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
     case m @ Add(id, _) ⇒ (id, m)
   }
 
-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
     case Get(id)    ⇒ id.charAt(0).toString
     case Add(id, _) ⇒ id.charAt(0).toString
   }
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala
new file mode 100644
index 0000000000..034f112bee
--- /dev/null
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc.
+ */
+package akka.contrib.pattern
+
+import java.io.File
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorIdentity
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent._
+import akka.cluster.MemberStatus
+import akka.persistence.Persistence
+import akka.persistence.journal.leveldb.SharedLeveldbJournal
+import akka.persistence.journal.leveldb.SharedLeveldbStore
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+
+object ClusterShardingLeavingSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = DEBUG
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.auto-down-unreachable-after = 0s
+    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
+    akka.persistence.journal.leveldb-shared {
+      timeout = 5s
+      store {
+        native = off
+        dir = "target/journal-ClusterShardingLeavingSpec"
+      }
+    }
+    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
+    """))
+
+  case class Ping(id: String)
+
+  class Entity extends Actor {
+    def receive = {
+      case Ping(_) ⇒ sender() ! self
+    }
+  }
+
+  case object GetLocations
+  case class Locations(locations: Map[String, ActorRef])
+
+  class ShardLocations extends Actor {
+    var locations: Locations = _
+    def receive = {
+      case GetLocations ⇒ sender() ! locations
+      case l: Locations ⇒ locations = l
+    }
+  }
+
+  val idExtractor: ShardRegion.IdExtractor = {
+    case m @ Ping(id) ⇒ (id, m)
+  }
+
+  val shardResolver: ShardRegion.ShardResolver = {
+    case Ping(id: String) ⇒ id.charAt(0).toString
+  }
+
+}
+
+class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec
+
+class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender {
+  import ClusterShardingLeavingSpec._
+
+  override def initialParticipants = roles.size
+
+  val storageLocations = List(
+    "akka.persistence.journal.leveldb.dir",
+    "akka.persistence.journal.leveldb-shared.store.dir",
+    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
+
+  override protected def atStartup() {
+    runOn(first) {
+      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
+    }
+  }
+
+  override protected def afterTermination() {
+    runOn(first) {
+      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
+    }
+  }
+
+  val cluster = Cluster(system)
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster join node(to).address
+      startSharding()
+      within(5.seconds) {
+        awaitAssert(cluster.state.members.exists { m ⇒
+          m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
+        } should be(true))
+      }
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  def startSharding(): Unit = {
+    ClusterSharding(system).start(
+      typeName = "Entity",
+      entryProps = Some(Props[Entity]),
+      roleOverride = None,
+      rememberEntries = false,
+      idExtractor = idExtractor,
+      shardResolver = shardResolver)
+  }
+
+  lazy val region = ClusterSharding(system).shardRegion("Entity")
+
+  "Cluster sharding with leaving member" must {
+
+    "setup shared journal" in {
+      // start the Persistence extension
+      Persistence(system)
+      runOn(first) {
+        system.actorOf(Props[SharedLeveldbStore], "store")
+      }
+      enterBarrier("peristence-started")
+
+      system.actorSelection(node(first) / "user" / "store") ! Identify(None)
+      val sharedStore = expectMsgType[ActorIdentity].ref.get
+      SharedLeveldbJournal.setStore(sharedStore, system)
+
+      enterBarrier("after-1")
+    }
+
+    "join cluster" in within(20.seconds) {
+      join(first, first)
+      join(second, first)
+      join(third, first)
+      join(fourth, first)
+
+      enterBarrier("after-2")
+    }
+
+    "initialize shards" in {
+      runOn(first) {
+        val shardLocations = system.actorOf(Props[ShardLocations], "shardLocations")
+        val locations = (for (n ← 1 to 10) yield {
+          val id = n.toString
+          region ! Ping(id)
+          id -> expectMsgType[ActorRef]
+        }).toMap
+        shardLocations ! Locations(locations)
+      }
+      enterBarrier("after-3")
+    }
+
+    "recover after leaving coordinator node" in within(30.seconds) {
+      runOn(third) {
+        cluster.leave(node(first).address)
+      }
+
+      runOn(first) {
+        watch(region)
+        expectTerminated(region, 5.seconds)
+      }
+      enterBarrier("stopped")
+
+      runOn(second, third, fourth) {
+        system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
+        val Locations(locations) = expectMsgType[Locations]
+        val firstAddress = node(first).address
+        awaitAssert {
+          val probe = TestProbe()
+          locations.foreach {
+            case (id, ref) ⇒
+              region.tell(Ping(id), probe.ref)
+              if (ref.path.address == firstAddress)
+                probe.expectMsgType[ActorRef](1.second) should not be (ref)
+              else
+                probe.expectMsg(1.second, ref) // should not move
+          }
+        }
+      }
+
+      enterBarrier("after-4")
+    }
+
+  }
+}
+
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index eb43ea15bf..644b4fc817 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -116,7 +116,7 @@ object ClusterShardingSpec extends MultiNodeConfig {
 
   val numberOfShards = 12
 
-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
     case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
     case Get(id) ⇒ (id % numberOfShards).toString
   }
@@ -135,7 +135,7 @@ object ClusterShardingDocCode {
 
   val numberOfShards = 100
 
-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
    case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
    case Get(id) ⇒ (id % numberOfShards).toString
   }
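
Usage sketch (illustrative only, not part of the cherry-picked diff): the snippet below shows how a subscriber could rely on the ordering guarantee described in the commit message, i.e. that ClusterShuttingDown is published before the final MemberRemoved events. The actor name ShutdownAware is hypothetical; the subscribe call with ClusterShuttingDown.getClass and the become(shuttingDown) pattern mirror the ShardCoordinator changes above, and the self-removal check mirrors the ShardRegion change, using the public address comparison rather than the internal uniqueAddress one.

import akka.actor.Actor
import akka.actor.ActorLogging
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.ClusterShuttingDown
import akka.cluster.ClusterEvent.CurrentClusterState
import akka.cluster.ClusterEvent.MemberRemoved

class ShutdownAware extends Actor with ActorLogging {
  val cluster = Cluster(context.system)

  // same subscription style as the ShardCoordinator above
  override def preStart(): Unit =
    cluster.subscribe(self, ClusterShuttingDown.getClass, classOf[MemberRemoved])

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive = {
    case ClusterShuttingDown ⇒
      // arrives before the final MemberRemoved events, so stop persisting
      // or otherwise mutating durable state from here on
      log.debug("Cluster is shutting down")
      context.become(shuttingDown)

    case MemberRemoved(m, _) if m.address == cluster.selfAddress ⇒
      // self-removal check; the patched ShardRegion compares unique addresses
      context.stop(self)

    case _: CurrentClusterState ⇒ // initial snapshot from subscribe, ignore
  }

  def shuttingDown: Receive = {
    case _ ⇒ // ignore everything, as the ShardCoordinator does
  }
}

Switching behavior instead of stopping follows the coordinator's reasoning in the patch: it cannot simply stop itself, because its supervisor (the cluster singleton manager) would restart it; it is stopped shortly afterwards together with the singleton.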