From 83c97d08da714718eb33a2f959fdded6b8879b12 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 20 Feb 2012 17:22:07 +0100 Subject: [PATCH] Added support for "leader election", the isLeader method and leader election tests. Also fixed bug in scrutinizer not maintaining the 'seen' map. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Node.scala | 13 +- .../akka/cluster/LeaderElectionSpec.scala | 155 ++++++++++++++++++ 2 files changed, 167 insertions(+), 1 deletion(-) create mode 100644 akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index 0eaa6b1d16..bcb9d1ecbc 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -282,6 +282,14 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { */ def self: Member = state.get.self + /** + * Is this node the leader? + */ + def isLeader: Boolean = { + val currentState = state.get + remoteAddress == currentState.latestGossip.members.head.address + } + /** * Is this node a singleton cluster? */ @@ -540,6 +548,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { val localGossip = localState.latestGossip val localOverview = localGossip.overview + val localSeen = localOverview.seen val localMembers = localGossip.members val localUnreachableAddresses = localGossip.overview.unreachable @@ -553,7 +562,9 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { log.info("Node [{}] - Marking node(s) an unreachable [{}]", remoteAddress, newlyDetectedUnreachableAddresses.mkString(", ")) - val newOverview = localOverview copy (unreachable = newUnreachableAddresses) + val newSeen = newUnreachableAddresses.foldLeft(localSeen)((currentSeen, address) ⇒ currentSeen - address) + + val newOverview = localOverview copy (seen = newSeen, unreachable = newUnreachableAddresses) val newGossip = localGossip copy (overview = newOverview, members = newMembers) val versionedGossip = newGossip + vclockNode diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala new file mode 100644 index 0000000000..dc0d8632a1 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala @@ -0,0 +1,155 @@ +/** + * Copyright (C) 2009-2012 Typesafe Inc. + */ +package akka.cluster + +import akka.testkit._ +import akka.dispatch._ +import akka.actor._ +import akka.remote._ +import akka.util.duration._ + +import com.typesafe.config._ + +import java.net.InetSocketAddress + +class LeaderElectionSpec extends AkkaSpec(""" + akka { + loglevel = "DEBUG" + actor.debug.lifecycle = on + actor.debug.autoreceive = on + cluster.failure-detector.threshold = 3 + } + """) with ImplicitSender { + + var node1: Node = _ + var node2: Node = _ + var node3: Node = _ + + var system1: ActorSystemImpl = _ + var system2: ActorSystemImpl = _ + var system3: ActorSystemImpl = _ + + try { + "A cluster of three nodes" must { + + // ======= NODE 1 ======== + system1 = ActorSystem("system1", ConfigFactory + .parseString(""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + port=5550 + } + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] + node1 = Node(system1, remote1) + val fd1 = node1.failureDetector + val address1 = node1.self.address + + // ======= NODE 2 ======== + system2 = ActorSystem("system2", ConfigFactory + .parseString(""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + port = 5551 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] + node2 = Node(system2, remote2) + val fd2 = node2.failureDetector + val address2 = node2.self.address + + // ======= NODE 3 ======== + system3 = ActorSystem("system3", ConfigFactory + .parseString(""" + akka { + actor.provider = "akka.remote.RemoteActorRefProvider" + remote.netty { + hostname = localhost + port=5552 + } + cluster.node-to-join = "akka://system1@localhost:5550" + }""") + .withFallback(system.settings.config)) + .asInstanceOf[ActorSystemImpl] + val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] + node3 = Node(system3, remote3) + val fd3 = node3.failureDetector + val address3 = node3.self.address + + "be able to 'elect' a single leader" taggedAs LongRunningTest in { + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds + + // check cluster convergence + node1.convergence must be('defined) + node2.convergence must be('defined) + node3.convergence must be('defined) + + // check leader + node1.isLeader must be(true) + node2.isLeader must be(false) + node3.isLeader must be(false) + } + + "be able to 're-elect' a single leader after leader has left" taggedAs LongRunningTest in { + + // shut down system1 - the leader + node1.shutdown() + system1.shutdown() + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3 + + // check cluster convergence + node2.convergence must be('defined) + node3.convergence must be('defined) + + // check leader + node2.isLeader must be(true) + node3.isLeader must be(false) + } + + "be able to 're-elect' a single leader after leader has left (again, leaving a single node)" taggedAs LongRunningTest in { + + // shut down system1 - the leader + node2.shutdown() + system2.shutdown() + + println("Give the system time to converge...") + Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3 + + // check cluster convergence + node3.convergence must be('defined) + + // check leader + node3.isLeader must be(true) + } + } + } catch { + case e: Exception ⇒ + e.printStackTrace + fail(e.toString) + } + + override def atTermination() { + if (node1 ne null) node1.shutdown() + if (system1 ne null) system1.shutdown() + + if (node2 ne null) node2.shutdown() + if (system2 ne null) system2.shutdown() + + if (node3 ne null) node3.shutdown() + if (system3 ne null) system3.shutdown() + } +}