diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
index d478a4a3f5..6e154c3f5a 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala
@@ -171,6 +171,17 @@ object ClusterEvent {
     def getLeader: Address = leader orNull
   }

+  /**
+   * This event is published when the cluster node is shutting down,
+   * before the final [[MemberRemoved]] events are published.
+   */
+  final case object ClusterShuttingDown extends ClusterDomainEvent
+
+  /**
+   * Java API: get the singleton instance of the [[ClusterShuttingDown]] event
+   */
+  def getClusterShuttingDownInstance = ClusterShuttingDown
+
   /**
    * Marker interface to facilitate subscription of
    * both [[UnreachableMember]] and [[ReachableMember]].
@@ -328,6 +339,7 @@ private[cluster] final class ClusterDomainEventPublisher extends Actor with Acto

   override def postStop(): Unit = {
     // publish the final removed state before shutting down
+    publish(ClusterShuttingDown)
     publishChanges(Gossip.empty)
   }

diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
index c0dfcb3e40..3501957900 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala
@@ -77,6 +77,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable {
             _state = _state.copy(roleLeaderMap = _state.roleLeaderMap + (role -> leader))
          case stats: CurrentInternalStats ⇒ _latestStats = stats
          case ClusterMetricsChanged(nodes) ⇒ _clusterMetrics = nodes
+          case ClusterShuttingDown ⇒
         }
      case s: CurrentClusterState ⇒ _state = s
     }
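Not part of the patch, but for context: a minimal sketch of how application code could observe the new event. The listener class is hypothetical; note that `Cluster.subscribe` delivers a `CurrentClusterState` snapshot as its first message.

```scala
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ ClusterShuttingDown, CurrentClusterState }

// Hypothetical subscriber (not in this patch): receives ClusterShuttingDown
// before the final MemberRemoved events when the node shuts down.
class ShutdownListener extends Actor {
  val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, ClusterShuttingDown.getClass)

  override def postStop(): Unit =
    cluster.unsubscribe(self)

  def receive = {
    case _: CurrentClusterState ⇒ // initial snapshot, ignore
    case ClusterShuttingDown ⇒
      // e.g. flush state or notify co-located services before removal
  }
}
```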
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
index bff5a180c3..edba2b5b87 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDomainEventPublisherSpec.scala
@@ -61,6 +61,7 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
     memberSubscriber = TestProbe()
     system.eventStream.subscribe(memberSubscriber.ref, classOf[MemberEvent])
     system.eventStream.subscribe(memberSubscriber.ref, classOf[LeaderChanged])
+    system.eventStream.subscribe(memberSubscriber.ref, ClusterShuttingDown.getClass)

     publisher = system.actorOf(Props[ClusterDomainEventPublisher])
     publisher ! PublishChanges(g0)
@@ -167,8 +168,9 @@ class ClusterDomainEventPublisherSpec extends AkkaSpec(ClusterDomainEventPublish
       subscriber.expectNoMsg(500 millis)
     }

-    "publish Removed when stopped" in {
+    "publish ClusterShuttingDown and Removed when stopped" in {
       publisher ! PoisonPill
+      memberSubscriber.expectMsg(ClusterShuttingDown)
       memberSubscriber.expectMsg(MemberRemoved(aRemoved, Up))
     }

diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
index fdaa3aa900..d4fe73c869 100644
--- a/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
+++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ClusterSharding.scala
@@ -32,6 +32,7 @@ import akka.cluster.ClusterEvent.CurrentClusterState
 import akka.cluster.ClusterEvent.MemberEvent
 import akka.cluster.ClusterEvent.MemberRemoved
 import akka.cluster.ClusterEvent.MemberUp
+import akka.cluster.ClusterEvent.ClusterShuttingDown
 import akka.cluster.Member
 import akka.cluster.MemberStatus
 import akka.pattern.ask
@@ -734,7 +735,9 @@ class ShardRegion(
       changeMembers(membersByAge + m)

     case MemberRemoved(m, _) ⇒
-      if (matchingRole(m))
+      if (m.uniqueAddress == cluster.selfUniqueAddress)
+        context.stop(self)
+      else if (matchingRole(m))
         changeMembers(membersByAge - m)

     case _ ⇒ unhandled(evt)
@@ -1588,10 +1591,13 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
   val rebalanceTask = context.system.scheduler.schedule(rebalanceInterval, rebalanceInterval, self, RebalanceTick)
   val snapshotTask = context.system.scheduler.schedule(snapshotInterval, snapshotInterval, self, SnapshotTick)

+  Cluster(context.system).subscribe(self, ClusterShuttingDown.getClass)
+
   override def postStop(): Unit = {
     super.postStop()
     rebalanceTask.cancel()
     snapshotTask.cancel()
+    Cluster(context.system).unsubscribe(self)
   }

   override def receiveRecover: Receive = {
@@ -1750,6 +1756,17 @@ class ShardCoordinator(handOffTimeout: FiniteDuration, shardStartTimeout: Finite
       //On rebalance, we send ourselves a GetShardHome message to reallocate a
       // shard. This recieve handles the "response" from that message. i.e. Ingores it.

+    case ClusterShuttingDown ⇒
+      log.debug("Shutting down ShardCoordinator")
+      // can't stop immediately, because the supervisor would start it again;
+      // it will soon be stopped when the singleton itself is stopped
+      context.become(shuttingDown)
+
+    case _: CurrentClusterState ⇒
+  }
+
+  def shuttingDown: Receive = {
+    case _ ⇒ // ignore all
   }

   def sendHostShardMsg(shard: ShardId, region: ActorRef): Unit = {
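The coordinator deliberately does not stop itself on `ClusterShuttingDown`: the cluster singleton manager supervising it would just restart it. Instead it swaps in an ignore-everything behavior and waits to be stopped together with the singleton. A stripped-down sketch of that pattern (class and message names are illustrative, not from this patch):

```scala
import akka.actor.Actor

// Illustrative restart-safe shutdown: on the shutdown signal the actor
// stops doing work but stays alive until its supervisor stops it.
class SupervisedWorker extends Actor {
  def receive: Receive = {
    case "shutdown" ⇒
      // stopping self here would only trigger a supervisor restart,
      // so switch behavior and silently drop everything that follows
      context.become(shuttingDown)
    case work ⇒ // ... normal processing
  }

  def shuttingDown: Receive = {
    case _ ⇒ // ignore all
  }
}
```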
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
index 4d387db4a4..ac1f44c52e 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingFailureSpec.scala
@@ -67,7 +67,7 @@ object ClusterShardingFailureSpec extends MultiNodeConfig {
     case m @ Add(id, _) ⇒ (id, m)
   }

-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
     case Get(id)    ⇒ id.charAt(0).toString
     case Add(id, _) ⇒ id.charAt(0).toString
   }
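The `shardResolver` cleanups in this and the following specs rely on `ShardRegion.ShardResolver` being a plain function type, so a pattern-matching anonymous function is equivalent to the old `msg ⇒ msg match { ... }` form, just shorter. For illustration (types simplified; both forms throw `MatchError` on unmatched input):

```scala
// Both values are the same Function1; the pattern-matching literal
// desugars to exactly the `msg ⇒ msg match { ... }` expansion.
val verbose: Any ⇒ String = msg ⇒ msg match {
  case s: String ⇒ s.toUpperCase
}
val idiomatic: Any ⇒ String = {
  case s: String ⇒ s.toUpperCase
}
```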
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala
new file mode 100644
index 0000000000..034f112bee
--- /dev/null
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingLeavingSpec.scala
@@ -0,0 +1,207 @@
+/**
+ * Copyright (C) 2015 Typesafe Inc. <http://www.typesafe.com>
+ */
+package akka.contrib.pattern
+
+import java.io.File
+
+import scala.concurrent.duration._
+
+import akka.actor.Actor
+import akka.actor.ActorIdentity
+import akka.actor.ActorRef
+import akka.actor.Identify
+import akka.actor.Props
+import akka.cluster.Cluster
+import akka.cluster.ClusterEvent._
+import akka.cluster.MemberStatus
+import akka.persistence.Persistence
+import akka.persistence.journal.leveldb.SharedLeveldbJournal
+import akka.persistence.journal.leveldb.SharedLeveldbStore
+import akka.remote.testconductor.RoleName
+import akka.remote.testkit.MultiNodeConfig
+import akka.remote.testkit.MultiNodeSpec
+import akka.remote.testkit.STMultiNodeSpec
+import akka.testkit._
+import com.typesafe.config.ConfigFactory
+import org.apache.commons.io.FileUtils
+
+object ClusterShardingLeavingSpec extends MultiNodeConfig {
+  val first = role("first")
+  val second = role("second")
+  val third = role("third")
+  val fourth = role("fourth")
+
+  commonConfig(ConfigFactory.parseString("""
+    akka.loglevel = DEBUG
+    akka.actor.provider = "akka.cluster.ClusterActorRefProvider"
+    akka.remote.log-remote-lifecycle-events = off
+    akka.cluster.auto-down-unreachable-after = 0s
+    akka.persistence.journal.plugin = "akka.persistence.journal.leveldb-shared"
+    akka.persistence.journal.leveldb-shared {
+      timeout = 5s
+      store {
+        native = off
+        dir = "target/journal-ClusterShardingLeavingSpec"
+      }
+    }
+    akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
+    akka.persistence.snapshot-store.local.dir = "target/snapshots-ClusterShardingLeavingSpec"
+    """))
+
+  case class Ping(id: String)
+
+  class Entity extends Actor {
+    def receive = {
+      case Ping(_) ⇒ sender() ! self
+    }
+  }
+
+  case object GetLocations
+  case class Locations(locations: Map[String, ActorRef])
+
+  class ShardLocations extends Actor {
+    var locations: Locations = _
+    def receive = {
+      case GetLocations ⇒ sender() ! locations
+      case l: Locations ⇒ locations = l
+    }
+  }
+
+  val idExtractor: ShardRegion.IdExtractor = {
+    case m @ Ping(id) ⇒ (id, m)
+  }
+
+  val shardResolver: ShardRegion.ShardResolver = {
+    case Ping(id: String) ⇒ id.charAt(0).toString
+  }
+
+}
+
+class ClusterShardingLeavingMultiJvmNode1 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode2 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode3 extends ClusterShardingLeavingSpec
+class ClusterShardingLeavingMultiJvmNode4 extends ClusterShardingLeavingSpec
+
+class ClusterShardingLeavingSpec extends MultiNodeSpec(ClusterShardingLeavingSpec) with STMultiNodeSpec with ImplicitSender {
+  import ClusterShardingLeavingSpec._
+
+  override def initialParticipants = roles.size
+
+  val storageLocations = List(
+    "akka.persistence.journal.leveldb.dir",
+    "akka.persistence.journal.leveldb-shared.store.dir",
+    "akka.persistence.snapshot-store.local.dir").map(s ⇒ new File(system.settings.config.getString(s)))
+
+  override protected def atStartup() {
+    runOn(first) {
+      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
+    }
+  }
+
+  override protected def afterTermination() {
+    runOn(first) {
+      storageLocations.foreach(dir ⇒ if (dir.exists) FileUtils.deleteDirectory(dir))
+    }
+  }
+
+  val cluster = Cluster(system)
+
+  def join(from: RoleName, to: RoleName): Unit = {
+    runOn(from) {
+      cluster join node(to).address
+      startSharding()
+      within(5.seconds) {
+        awaitAssert(cluster.state.members.exists { m ⇒
+          m.uniqueAddress == cluster.selfUniqueAddress && m.status == MemberStatus.Up
+        } should be(true))
+      }
+    }
+    enterBarrier(from.name + "-joined")
+  }
+
+  def startSharding(): Unit = {
+    ClusterSharding(system).start(
+      typeName = "Entity",
+      entryProps = Some(Props[Entity]),
+      roleOverride = None,
+      rememberEntries = false,
+      idExtractor = idExtractor,
+      shardResolver = shardResolver)
+  }
+
+  lazy val region = ClusterSharding(system).shardRegion("Entity")
+
+  "Cluster sharding with leaving member" must {
+
+    "setup shared journal" in {
+      // start the Persistence extension
+      Persistence(system)
+      runOn(first) {
+        system.actorOf(Props[SharedLeveldbStore], "store")
+      }
+      enterBarrier("persistence-started")
+
+      system.actorSelection(node(first) / "user" / "store") ! Identify(None)
+      val sharedStore = expectMsgType[ActorIdentity].ref.get
+      SharedLeveldbJournal.setStore(sharedStore, system)
+
+      enterBarrier("after-1")
+    }
+
+    "join cluster" in within(20.seconds) {
+      join(first, first)
+      join(second, first)
+      join(third, first)
+      join(fourth, first)
+
+      enterBarrier("after-2")
+    }
+
+    "initialize shards" in {
+      runOn(first) {
+        val shardLocations = system.actorOf(Props[ShardLocations], "shardLocations")
+        val locations = (for (n ← 1 to 10) yield {
+          val id = n.toString
+          region ! Ping(id)
+          id -> expectMsgType[ActorRef]
+        }).toMap
+        shardLocations ! Locations(locations)
+      }
+      enterBarrier("after-3")
+    }
+
+    "recover after leaving coordinator node" in within(30.seconds) {
+      runOn(third) {
+        cluster.leave(node(first).address)
+      }
+
+      runOn(first) {
+        watch(region)
+        expectTerminated(region, 5.seconds)
+      }
+      enterBarrier("stopped")
+
+      runOn(second, third, fourth) {
+        system.actorSelection(node(first) / "user" / "shardLocations") ! GetLocations
+        val Locations(locations) = expectMsgType[Locations]
+        val firstAddress = node(first).address
+        awaitAssert {
+          val probe = TestProbe()
+          locations.foreach {
+            case (id, ref) ⇒
+              region.tell(Ping(id), probe.ref)
+              if (ref.path.address == firstAddress)
+                probe.expectMsgType[ActorRef](1.second) should not be (ref)
+              else
+                probe.expectMsg(1.second, ref) // should not move
+          }
+        }
+      }
+
+      enterBarrier("after-4")
+    }
+
+  }
+}
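The new spec exercises what an operator-initiated graceful leave looks like to sharding. Outside of tests, the trigger side could look roughly like this hypothetical helper, using only the public API of this era (`system.shutdown()` is the pre-2.4 termination call):

```scala
import akka.actor.Actor
import akka.cluster.Cluster
import akka.cluster.ClusterEvent.{ CurrentClusterState, MemberRemoved }

// Hypothetical helper (not in this patch): ask this node to leave and
// shut the ActorSystem down once its own removal has been observed.
class LeaveWatcher extends Actor {
  val cluster = Cluster(context.system)

  override def preStart(): Unit =
    cluster.subscribe(self, classOf[MemberRemoved])

  def receive = {
    case _: CurrentClusterState ⇒
      cluster.leave(cluster.selfAddress)
    case MemberRemoved(m, _) if m.address == cluster.selfAddress ⇒
      context.system.shutdown()
  }
}
```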
diff --git a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
index eb43ea15bf..644b4fc817 100644
--- a/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
+++ b/akka-contrib/src/multi-jvm/scala/akka/contrib/pattern/ClusterShardingSpec.scala
@@ -116,7 +116,7 @@ object ClusterShardingSpec extends MultiNodeConfig {

   val numberOfShards = 12

-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
     case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
     case Get(id)              ⇒ (id % numberOfShards).toString
   }
@@ -135,7 +135,7 @@ object ClusterShardingDocCode {

   val numberOfShards = 100

-  val shardResolver: ShardRegion.ShardResolver = msg ⇒ msg match {
+  val shardResolver: ShardRegion.ShardResolver = {
     case EntryEnvelope(id, _) ⇒ (id % numberOfShards).toString
     case Get(id)              ⇒ (id % numberOfShards).toString
   }
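For reference, resolvers like the ones above are wired into a region with the contrib-era `start` signature used elsewhere in this patch. A sketch, assuming `system`, `idExtractor`, `shardResolver`, and a `Counter` entry actor are in scope as in the surrounding spec:

```scala
import akka.actor.{ ActorRef, Props }
import akka.contrib.pattern.ClusterSharding

// Sketch: starting a shard region with the extractor/resolver pair,
// mirroring the parameters used in ClusterShardingLeavingSpec above.
val counterRegion: ActorRef = ClusterSharding(system).start(
  typeName = "Counter",
  entryProps = Some(Props[Counter]), // Counter assumed defined in the spec
  roleOverride = None,
  rememberEntries = false,
  idExtractor = idExtractor,
  shardResolver = shardResolver)
```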