Renamed Gossiper to Node (and selfNode to vclockNode).
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
2c67a6d50d
commit
3c2f5ab93c
5 changed files with 137 additions and 137 deletions
|
|
@ -156,35 +156,35 @@ case class Gossip(
|
||||||
// FIXME ClusterCommandDaemon with FSM trait
|
// FIXME ClusterCommandDaemon with FSM trait
|
||||||
/**
|
/**
|
||||||
* Single instance. FSM managing the different cluster nodes states.
|
* Single instance. FSM managing the different cluster nodes states.
|
||||||
* Serialized access to Gossiper.
|
* Serialized access to Node.
|
||||||
*/
|
*/
|
||||||
final class ClusterCommandDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
|
final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor {
|
||||||
val log = Logging(system, "ClusterCommandDaemon")
|
val log = Logging(system, "ClusterCommandDaemon")
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case Join(address) ⇒ gossiper.joining(address)
|
case Join(address) ⇒ node.joining(address)
|
||||||
case Leave(address) ⇒ //gossiper.leaving(address)
|
case Leave(address) ⇒ //node.leaving(address)
|
||||||
case Down(address) ⇒ //gossiper.downing(address)
|
case Down(address) ⇒ //node.downing(address)
|
||||||
case Remove(address) ⇒ //gossiper.removing(address)
|
case Remove(address) ⇒ //node.removing(address)
|
||||||
case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Pooled and routed wit N number of configurable instances.
|
* Pooled and routed wit N number of configurable instances.
|
||||||
* Concurrent access to Gossiper.
|
* Concurrent access to Node.
|
||||||
*/
|
*/
|
||||||
final class ClusterGossipDaemon(system: ActorSystem, gossiper: Gossiper) extends Actor {
|
final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
|
||||||
val log = Logging(system, "ClusterGossipDaemon")
|
val log = Logging(system, "ClusterGossipDaemon")
|
||||||
|
|
||||||
def receive = {
|
def receive = {
|
||||||
case GossipEnvelope(sender, gossip) ⇒ gossiper.receive(sender, gossip)
|
case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip)
|
||||||
case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
case unknown ⇒ log.error("Unknown message sent to cluster daemon [" + unknown + "]")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME Cluster public API should be an Extension
|
// FIXME Cluster public API should be an Extension
|
||||||
// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Gossiper
|
// FIXME Add cluster Node class and refactor out all non-gossip related stuff out of Node
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
|
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
|
||||||
|
|
@ -201,10 +201,10 @@ final class ClusterGossipDaemon(system: ActorSystem, gossiper: Gossiper) extends
|
||||||
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
|
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
|
||||||
* </pre>
|
* </pre>
|
||||||
*/
|
*/
|
||||||
case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Represents the state for this Gossiper. Implemented using optimistic lockless concurrency,
|
* Represents the state for this Node. Implemented using optimistic lockless concurrency,
|
||||||
* all state is represented by this immutable case class and managed by an AtomicReference.
|
* all state is represented by this immutable case class and managed by an AtomicReference.
|
||||||
*/
|
*/
|
||||||
private case class State(
|
private case class State(
|
||||||
|
|
@ -216,7 +216,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
val clusterSettings = new ClusterSettings(system.settings.config, system.name)
|
||||||
|
|
||||||
val remoteAddress = remote.transport.address
|
val remoteAddress = remote.transport.address
|
||||||
val selfNode = VectorClock.Node(remoteAddress.toString)
|
val vclockNode = VectorClock.Node(remoteAddress.toString)
|
||||||
|
|
||||||
val gossipInitialDelay = clusterSettings.GossipInitialDelay
|
val gossipInitialDelay = clusterSettings.GossipInitialDelay
|
||||||
val gossipFrequency = clusterSettings.GossipFrequency
|
val gossipFrequency = clusterSettings.GossipFrequency
|
||||||
|
|
@ -234,7 +234,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
private val serialization = remote.serialization
|
private val serialization = remote.serialization
|
||||||
|
|
||||||
private val isRunning = new AtomicBoolean(true)
|
private val isRunning = new AtomicBoolean(true)
|
||||||
private val log = Logging(system, "Gossiper")
|
private val log = Logging(system, "Node")
|
||||||
private val random = SecureRandom.getInstance("SHA1PRNG")
|
private val random = SecureRandom.getInstance("SHA1PRNG")
|
||||||
|
|
||||||
// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
|
// Is it right to put this guy under the /system path or should we have a top-level /cluster or something else...?
|
||||||
|
|
@ -247,13 +247,13 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
|
|
||||||
private val state = {
|
private val state = {
|
||||||
val member = Member(remoteAddress, MemberStatus.Joining)
|
val member = Member(remoteAddress, MemberStatus.Joining)
|
||||||
val gossip = Gossip(members = SortedSet.empty[Member] + member) + selfNode // add me as member and update my vector clock
|
val gossip = Gossip(members = SortedSet.empty[Member] + member) + vclockNode // add me as member and update my vector clock
|
||||||
new AtomicReference[State](State(member, gossip))
|
new AtomicReference[State](State(member, gossip))
|
||||||
}
|
}
|
||||||
|
|
||||||
import Versioned.latestVersionOf
|
import Versioned.latestVersionOf
|
||||||
|
|
||||||
log.info("Node [{}] - Starting cluster Gossiper...", remoteAddress)
|
log.info("Node [{}] - Starting cluster Node...", remoteAddress)
|
||||||
|
|
||||||
// try to join the node defined in the 'akka.cluster.node-to-join' option
|
// try to join the node defined in the 'akka.cluster.node-to-join' option
|
||||||
join()
|
join()
|
||||||
|
|
@ -295,13 +295,13 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
// FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
|
// FIXME Cheating for now. Can't just shut down. Node must first gossip an Leave command, wait for Leader to do proper Handoff and then await an Exit command before switching to Removed
|
||||||
|
|
||||||
if (isRunning.compareAndSet(true, false)) {
|
if (isRunning.compareAndSet(true, false)) {
|
||||||
log.info("Node [{}] - Shutting down Gossiper and ClusterDaemon...", remoteAddress)
|
log.info("Node [{}] - Shutting down Node and ClusterDaemon...", remoteAddress)
|
||||||
|
|
||||||
try system.stop(clusterCommandDaemon) finally {
|
try system.stop(clusterCommandDaemon) finally {
|
||||||
try system.stop(clusterGossipDaemon) finally {
|
try system.stop(clusterGossipDaemon) finally {
|
||||||
try gossipCanceller.cancel() finally {
|
try gossipCanceller.cancel() finally {
|
||||||
try scrutinizeCanceller.cancel() finally {
|
try scrutinizeCanceller.cancel() finally {
|
||||||
log.info("Node [{}] - Gossiper and ClusterDaemon shut down successfully", remoteAddress)
|
log.info("Node [{}] - Node and ClusterDaemon shut down successfully", remoteAddress)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -325,7 +325,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
|
val newMembers = localMembers + Member(node, MemberStatus.Joining) // add joining node as Joining
|
||||||
val newGossip = localGossip copy (members = newMembers)
|
val newGossip = localGossip copy (members = newMembers)
|
||||||
|
|
||||||
val versionedGossip = newGossip + selfNode
|
val versionedGossip = newGossip + vclockNode
|
||||||
val seenVersionedGossip = versionedGossip seen remoteAddress
|
val seenVersionedGossip = versionedGossip seen remoteAddress
|
||||||
|
|
||||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||||
|
|
@ -354,7 +354,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
if (remoteGossip.version <> localGossip.version) {
|
if (remoteGossip.version <> localGossip.version) {
|
||||||
// concurrent
|
// concurrent
|
||||||
val mergedGossip = merge(remoteGossip, localGossip)
|
val mergedGossip = merge(remoteGossip, localGossip)
|
||||||
val versionedMergedGossip = mergedGossip + selfNode
|
val versionedMergedGossip = mergedGossip + vclockNode
|
||||||
|
|
||||||
log.debug(
|
log.debug(
|
||||||
"Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
|
"Can't establish a causal relationship between \"remote\" gossip [{}] and \"local\" gossip [{}] - merging them into [{}]",
|
||||||
|
|
@ -497,7 +497,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
|
val newMembersSortedSet = SortedSet[Member](newMembersSet.toList: _*)
|
||||||
val newGossip = localGossip copy (members = newMembersSortedSet)
|
val newGossip = localGossip copy (members = newMembersSortedSet)
|
||||||
|
|
||||||
val versionedGossip = newGossip + selfNode
|
val versionedGossip = newGossip + vclockNode
|
||||||
val seenVersionedGossip = versionedGossip seen remoteAddress
|
val seenVersionedGossip = versionedGossip seen remoteAddress
|
||||||
|
|
||||||
val newState = localState copy (self = newSelf, latestGossip = seenVersionedGossip)
|
val newState = localState copy (self = newSelf, latestGossip = seenVersionedGossip)
|
||||||
|
|
@ -556,7 +556,7 @@ case class Gossiper(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
|
||||||
val newOverview = localOverview copy (unreachable = newUnreachableAddresses)
|
val newOverview = localOverview copy (unreachable = newUnreachableAddresses)
|
||||||
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
val newGossip = localGossip copy (overview = newOverview, members = newMembers)
|
||||||
|
|
||||||
val versionedGossip = newGossip + selfNode
|
val versionedGossip = newGossip + vclockNode
|
||||||
val seenVersionedGossip = versionedGossip seen remoteAddress
|
val seenVersionedGossip = versionedGossip seen remoteAddress
|
||||||
|
|
||||||
val newState = localState copy (latestGossip = seenVersionedGossip)
|
val newState = localState copy (latestGossip = seenVersionedGossip)
|
||||||
|
|
@ -22,19 +22,19 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
|
|
||||||
var gossiper1: Gossiper = _
|
var node1: Node = _
|
||||||
var gossiper2: Gossiper = _
|
var node2: Node = _
|
||||||
var gossiper3: Gossiper = _
|
var node3: Node = _
|
||||||
|
|
||||||
var node1: ActorSystemImpl = _
|
var system1: ActorSystemImpl = _
|
||||||
var node2: ActorSystemImpl = _
|
var system2: ActorSystemImpl = _
|
||||||
var node3: ActorSystemImpl = _
|
var system3: ActorSystemImpl = _
|
||||||
|
|
||||||
try {
|
try {
|
||||||
"A Gossip-driven Failure Detector" must {
|
"A Gossip-driven Failure Detector" must {
|
||||||
|
|
||||||
// ======= NODE 1 ========
|
// ======= NODE 1 ========
|
||||||
node1 = ActorSystem("node1", ConfigFactory
|
system1 = ActorSystem("system1", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -45,13 +45,13 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper1 = Gossiper(node1, remote1)
|
node1 = Node(system1, remote1)
|
||||||
val fd1 = gossiper1.failureDetector
|
val fd1 = node1.failureDetector
|
||||||
val address1 = gossiper1.self.address
|
val address1 = node1.self.address
|
||||||
|
|
||||||
// ======= NODE 2 ========
|
// ======= NODE 2 ========
|
||||||
node2 = ActorSystem("node2", ConfigFactory
|
system2 = ActorSystem("system2", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -59,17 +59,17 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port = 5551
|
port = 5551
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node1@localhost:5550"
|
cluster.node-to-join = "akka://system1@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote2 = node2.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper2 = Gossiper(node2, remote2)
|
node2 = Node(system2, remote2)
|
||||||
val fd2 = gossiper2.failureDetector
|
val fd2 = node2.failureDetector
|
||||||
val address2 = gossiper2.self.address
|
val address2 = node2.self.address
|
||||||
|
|
||||||
// ======= NODE 3 ========
|
// ======= NODE 3 ========
|
||||||
node3 = ActorSystem("node3", ConfigFactory
|
system3 = ActorSystem("system3", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -77,17 +77,17 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port=5552
|
port=5552
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node1@localhost:5550"
|
cluster.node-to-join = "akka://system1@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote3 = node3.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper3 = Gossiper(node3, remote3)
|
node3 = Node(system3, remote3)
|
||||||
val fd3 = gossiper3.failureDetector
|
val fd3 = node3.failureDetector
|
||||||
val address3 = gossiper3.self.address
|
val address3 = node3.self.address
|
||||||
|
|
||||||
"receive gossip heartbeats so that all healthy nodes in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
"receive gossip heartbeats so that all healthy systems in the cluster are marked 'available'" taggedAs LongRunningTest in {
|
||||||
println("Let the nodes gossip for a while...")
|
println("Let the systems gossip for a while...")
|
||||||
Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
|
Thread.sleep(30.seconds.dilated.toMillis) // let them gossip for 30 seconds
|
||||||
fd1.isAvailable(address2) must be(true)
|
fd1.isAvailable(address2) must be(true)
|
||||||
fd1.isAvailable(address3) must be(true)
|
fd1.isAvailable(address3) must be(true)
|
||||||
|
|
@ -97,12 +97,12 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
fd3.isAvailable(address2) must be(true)
|
fd3.isAvailable(address2) must be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"mark node as 'unavailable' if a node in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
|
"mark system as 'unavailable' if a system in the cluster is shut down (and its heartbeats stops)" taggedAs LongRunningTest in {
|
||||||
// shut down node3
|
// shut down system3
|
||||||
gossiper3.shutdown()
|
|
||||||
node3.shutdown()
|
node3.shutdown()
|
||||||
println("Give the remaning nodes time to detect failure...")
|
system3.shutdown()
|
||||||
Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of node3
|
println("Give the remaning systems time to detect failure...")
|
||||||
|
Thread.sleep(30.seconds.dilated.toMillis) // give them 30 seconds to detect failure of system3
|
||||||
fd1.isAvailable(address2) must be(true)
|
fd1.isAvailable(address2) must be(true)
|
||||||
fd1.isAvailable(address3) must be(false)
|
fd1.isAvailable(address3) must be(false)
|
||||||
fd2.isAvailable(address1) must be(true)
|
fd2.isAvailable(address1) must be(true)
|
||||||
|
|
@ -116,13 +116,13 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
override def atTermination() {
|
override def atTermination() {
|
||||||
if (gossiper1 ne null) gossiper1.shutdown()
|
|
||||||
if (node1 ne null) node1.shutdown()
|
if (node1 ne null) node1.shutdown()
|
||||||
|
if (system1 ne null) system1.shutdown()
|
||||||
|
|
||||||
if (gossiper2 ne null) gossiper2.shutdown()
|
|
||||||
if (node2 ne null) node2.shutdown()
|
if (node2 ne null) node2.shutdown()
|
||||||
|
if (system2 ne null) system2.shutdown()
|
||||||
|
|
||||||
if (gossiper3 ne null) gossiper3.shutdown()
|
|
||||||
if (node3 ne null) node3.shutdown()
|
if (node3 ne null) node3.shutdown()
|
||||||
|
if (system3 ne null) system3.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -22,18 +22,18 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
|
|
||||||
var gossiper0: Gossiper = _
|
var node0: Node = _
|
||||||
var gossiper1: Gossiper = _
|
var node1: Node = _
|
||||||
var gossiper2: Gossiper = _
|
var node2: Node = _
|
||||||
|
|
||||||
var node0: ActorSystemImpl = _
|
var system0: ActorSystemImpl = _
|
||||||
var node1: ActorSystemImpl = _
|
var system1: ActorSystemImpl = _
|
||||||
var node2: ActorSystemImpl = _
|
var system2: ActorSystemImpl = _
|
||||||
|
|
||||||
try {
|
try {
|
||||||
"A set of connected cluster nodes" must {
|
"A set of connected cluster systems" must {
|
||||||
"(when two nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
"(when two systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||||
node0 = ActorSystem("node0", ConfigFactory
|
system0 = ActorSystem("system0", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -44,10 +44,10 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote0 = node0.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper0 = Gossiper(node0, remote0)
|
node0 = Node(system0, remote0)
|
||||||
|
|
||||||
node1 = ActorSystem("node1", ConfigFactory
|
system1 = ActorSystem("system1", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -55,21 +55,21 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port=5551
|
port=5551
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node0@localhost:5550"
|
cluster.node-to-join = "akka://system0@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper1 = Gossiper(node1, remote1)
|
node1 = Node(system1, remote1)
|
||||||
|
|
||||||
val latch = new CountDownLatch(2)
|
val latch = new CountDownLatch(2)
|
||||||
|
|
||||||
gossiper0.registerListener(new MembershipChangeListener {
|
node0.registerListener(new MembershipChangeListener {
|
||||||
def notify(members: SortedSet[Member]) {
|
def notify(members: SortedSet[Member]) {
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
gossiper1.registerListener(new MembershipChangeListener {
|
node1.registerListener(new MembershipChangeListener {
|
||||||
def notify(members: SortedSet[Member]) {
|
def notify(members: SortedSet[Member]) {
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
|
|
@ -80,14 +80,14 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
Thread.sleep(10.seconds.dilated.toMillis)
|
||||||
|
|
||||||
// check cluster convergence
|
// check cluster convergence
|
||||||
gossiper0.convergence must be('defined)
|
node0.convergence must be('defined)
|
||||||
gossiper1.convergence must be('defined)
|
node1.convergence must be('defined)
|
||||||
}
|
}
|
||||||
|
|
||||||
"(when three nodes) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
"(when three systems) after cluster convergence updates the membership table then all MembershipChangeListeners should be triggered" taggedAs LongRunningTest in {
|
||||||
|
|
||||||
// ======= NODE 2 ========
|
// ======= NODE 2 ========
|
||||||
node2 = ActorSystem("node2", ConfigFactory
|
system2 = ActorSystem("system2", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -95,25 +95,25 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port=5552
|
port=5552
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node0@localhost:5550"
|
cluster.node-to-join = "akka://system0@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote2 = node2.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper2 = Gossiper(node2, remote2)
|
node2 = Node(system2, remote2)
|
||||||
|
|
||||||
val latch = new CountDownLatch(3)
|
val latch = new CountDownLatch(3)
|
||||||
gossiper0.registerListener(new MembershipChangeListener {
|
node0.registerListener(new MembershipChangeListener {
|
||||||
def notify(members: SortedSet[Member]) {
|
def notify(members: SortedSet[Member]) {
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
gossiper1.registerListener(new MembershipChangeListener {
|
node1.registerListener(new MembershipChangeListener {
|
||||||
def notify(members: SortedSet[Member]) {
|
def notify(members: SortedSet[Member]) {
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
gossiper2.registerListener(new MembershipChangeListener {
|
node2.registerListener(new MembershipChangeListener {
|
||||||
def notify(members: SortedSet[Member]) {
|
def notify(members: SortedSet[Member]) {
|
||||||
latch.countDown()
|
latch.countDown()
|
||||||
}
|
}
|
||||||
|
|
@ -124,9 +124,9 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
Thread.sleep(10.seconds.dilated.toMillis)
|
||||||
|
|
||||||
// check cluster convergence
|
// check cluster convergence
|
||||||
gossiper0.convergence must be('defined)
|
node0.convergence must be('defined)
|
||||||
gossiper1.convergence must be('defined)
|
node1.convergence must be('defined)
|
||||||
gossiper2.convergence must be('defined)
|
node2.convergence must be('defined)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -136,13 +136,13 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
override def atTermination() {
|
override def atTermination() {
|
||||||
if (gossiper0 ne null) gossiper0.shutdown()
|
|
||||||
if (node0 ne null) node0.shutdown()
|
if (node0 ne null) node0.shutdown()
|
||||||
|
if (system0 ne null) system0.shutdown()
|
||||||
|
|
||||||
if (gossiper1 ne null) gossiper1.shutdown()
|
|
||||||
if (node1 ne null) node1.shutdown()
|
if (node1 ne null) node1.shutdown()
|
||||||
|
if (system1 ne null) system1.shutdown()
|
||||||
|
|
||||||
if (gossiper2 ne null) gossiper2.shutdown()
|
|
||||||
if (node2 ne null) node2.shutdown()
|
if (node2 ne null) node2.shutdown()
|
||||||
|
if (system2 ne null) system2.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,20 +19,20 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
|
|
||||||
var gossiper0: Gossiper = _
|
var node0: Node = _
|
||||||
var gossiper1: Gossiper = _
|
var node1: Node = _
|
||||||
var gossiper2: Gossiper = _
|
var node2: Node = _
|
||||||
|
|
||||||
var node0: ActorSystemImpl = _
|
var system0: ActorSystemImpl = _
|
||||||
var node1: ActorSystemImpl = _
|
var system1: ActorSystemImpl = _
|
||||||
var node2: ActorSystemImpl = _
|
var system2: ActorSystemImpl = _
|
||||||
|
|
||||||
try {
|
try {
|
||||||
"A set of connected cluster nodes" must {
|
"A set of connected cluster systems" must {
|
||||||
"(when two nodes) start gossiping to each other so that both nodes gets the same gossip info" taggedAs LongRunningTest in {
|
"(when two systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
||||||
|
|
||||||
// ======= NODE 0 ========
|
// ======= NODE 0 ========
|
||||||
node0 = ActorSystem("node0", ConfigFactory
|
system0 = ActorSystem("system0", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -43,11 +43,11 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote0 = node0.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper0 = Gossiper(node0, remote0)
|
node0 = Node(system0, remote0)
|
||||||
|
|
||||||
// ======= NODE 1 ========
|
// ======= NODE 1 ========
|
||||||
node1 = ActorSystem("node1", ConfigFactory
|
system1 = ActorSystem("system1", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -55,27 +55,27 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port=5551
|
port=5551
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node0@localhost:5550"
|
cluster.node-to-join = "akka://system0@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper1 = Gossiper(node1, remote1)
|
node1 = Node(system1, remote1)
|
||||||
|
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
Thread.sleep(10.seconds.dilated.toMillis)
|
||||||
|
|
||||||
// check cluster convergence
|
// check cluster convergence
|
||||||
gossiper0.convergence must be('defined)
|
node0.convergence must be('defined)
|
||||||
gossiper1.convergence must be('defined)
|
node1.convergence must be('defined)
|
||||||
|
|
||||||
val members0 = gossiper0.latestGossip.members.toArray
|
val members0 = node0.latestGossip.members.toArray
|
||||||
members0.size must be(2)
|
members0.size must be(2)
|
||||||
members0(0).address.port.get must be(5550)
|
members0(0).address.port.get must be(5550)
|
||||||
members0(0).status must be(MemberStatus.Joining)
|
members0(0).status must be(MemberStatus.Joining)
|
||||||
members0(1).address.port.get must be(5551)
|
members0(1).address.port.get must be(5551)
|
||||||
members0(1).status must be(MemberStatus.Joining)
|
members0(1).status must be(MemberStatus.Joining)
|
||||||
|
|
||||||
val members1 = gossiper1.latestGossip.members.toArray
|
val members1 = node1.latestGossip.members.toArray
|
||||||
members1.size must be(2)
|
members1.size must be(2)
|
||||||
members1(0).address.port.get must be(5550)
|
members1(0).address.port.get must be(5550)
|
||||||
members1(0).status must be(MemberStatus.Joining)
|
members1(0).status must be(MemberStatus.Joining)
|
||||||
|
|
@ -83,10 +83,10 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
members1(1).status must be(MemberStatus.Joining)
|
members1(1).status must be(MemberStatus.Joining)
|
||||||
}
|
}
|
||||||
|
|
||||||
"(when three nodes) start gossiping to each other so that both nodes gets the same gossip info" taggedAs LongRunningTest in {
|
"(when three systems) start gossiping to each other so that both systems gets the same gossip info" taggedAs LongRunningTest in {
|
||||||
|
|
||||||
// ======= NODE 2 ========
|
// ======= NODE 2 ========
|
||||||
node2 = ActorSystem("node2", ConfigFactory
|
system2 = ActorSystem("system2", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -94,22 +94,22 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
hostname = localhost
|
hostname = localhost
|
||||||
port=5552
|
port=5552
|
||||||
}
|
}
|
||||||
cluster.node-to-join = "akka://node0@localhost:5550"
|
cluster.node-to-join = "akka://system0@localhost:5550"
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote2 = node2.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper2 = Gossiper(node2, remote2)
|
node2 = Node(system2, remote2)
|
||||||
|
|
||||||
Thread.sleep(10.seconds.dilated.toMillis)
|
Thread.sleep(10.seconds.dilated.toMillis)
|
||||||
|
|
||||||
// check cluster convergence
|
// check cluster convergence
|
||||||
gossiper0.convergence must be('defined)
|
node0.convergence must be('defined)
|
||||||
gossiper1.convergence must be('defined)
|
node1.convergence must be('defined)
|
||||||
gossiper2.convergence must be('defined)
|
node2.convergence must be('defined)
|
||||||
|
|
||||||
val members0 = gossiper0.latestGossip.members.toArray
|
val members0 = node0.latestGossip.members.toArray
|
||||||
val version = gossiper0.latestGossip.version
|
val version = node0.latestGossip.version
|
||||||
members0.size must be(3)
|
members0.size must be(3)
|
||||||
members0(0).address.port.get must be(5550)
|
members0(0).address.port.get must be(5550)
|
||||||
members0(0).status must be(MemberStatus.Joining)
|
members0(0).status must be(MemberStatus.Joining)
|
||||||
|
|
@ -118,7 +118,7 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
members0(2).address.port.get must be(5552)
|
members0(2).address.port.get must be(5552)
|
||||||
members0(2).status must be(MemberStatus.Joining)
|
members0(2).status must be(MemberStatus.Joining)
|
||||||
|
|
||||||
val members1 = gossiper1.latestGossip.members.toArray
|
val members1 = node1.latestGossip.members.toArray
|
||||||
members1.size must be(3)
|
members1.size must be(3)
|
||||||
members1(0).address.port.get must be(5550)
|
members1(0).address.port.get must be(5550)
|
||||||
members1(0).status must be(MemberStatus.Joining)
|
members1(0).status must be(MemberStatus.Joining)
|
||||||
|
|
@ -127,7 +127,7 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
members1(2).address.port.get must be(5552)
|
members1(2).address.port.get must be(5552)
|
||||||
members1(2).status must be(MemberStatus.Joining)
|
members1(2).status must be(MemberStatus.Joining)
|
||||||
|
|
||||||
val members2 = gossiper2.latestGossip.members.toArray
|
val members2 = node2.latestGossip.members.toArray
|
||||||
members2.size must be(3)
|
members2.size must be(3)
|
||||||
members2(0).address.port.get must be(5550)
|
members2(0).address.port.get must be(5550)
|
||||||
members2(0).status must be(MemberStatus.Joining)
|
members2(0).status must be(MemberStatus.Joining)
|
||||||
|
|
@ -144,13 +144,13 @@ class NodeMembershipSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
override def atTermination() {
|
override def atTermination() {
|
||||||
if (gossiper0 ne null) gossiper0.shutdown()
|
|
||||||
if (node0 ne null) node0.shutdown()
|
if (node0 ne null) node0.shutdown()
|
||||||
|
if (system0 ne null) system0.shutdown()
|
||||||
|
|
||||||
if (gossiper1 ne null) gossiper1.shutdown()
|
|
||||||
if (node1 ne null) node1.shutdown()
|
if (node1 ne null) node1.shutdown()
|
||||||
|
if (system1 ne null) system1.shutdown()
|
||||||
|
|
||||||
if (gossiper2 ne null) gossiper2.shutdown()
|
|
||||||
if (node2 ne null) node2.shutdown()
|
if (node2 ne null) node2.shutdown()
|
||||||
|
if (system2 ne null) system2.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -19,14 +19,14 @@ class NodeStartupSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
""") with ImplicitSender {
|
""") with ImplicitSender {
|
||||||
|
|
||||||
var gossiper0: Gossiper = _
|
var node0: Node = _
|
||||||
var gossiper1: Gossiper = _
|
var node1: Node = _
|
||||||
var node0: ActorSystemImpl = _
|
var system0: ActorSystemImpl = _
|
||||||
var node1: ActorSystemImpl = _
|
var system1: ActorSystemImpl = _
|
||||||
|
|
||||||
try {
|
try {
|
||||||
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
"A first cluster node with a 'node-to-join' config set to empty string (singleton cluster)" must {
|
||||||
node0 = ActorSystem("NodeStartupSpec", ConfigFactory
|
system0 = ActorSystem("NodeStartupSpec", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -37,16 +37,16 @@ class NodeStartupSpec extends AkkaSpec("""
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote0 = node0.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper0 = Gossiper(node0, remote0)
|
node0 = Node(system0, remote0)
|
||||||
|
|
||||||
"be a singleton cluster when started up" in {
|
"be a singleton cluster when started up" in {
|
||||||
Thread.sleep(1.seconds.dilated.toMillis)
|
Thread.sleep(1.seconds.dilated.toMillis)
|
||||||
gossiper0.isSingletonCluster must be(true)
|
node0.isSingletonCluster must be(true)
|
||||||
}
|
}
|
||||||
|
|
||||||
"be in 'Up' phase when started up" in {
|
"be in 'Up' phase when started up" in {
|
||||||
val members = gossiper0.latestGossip.members
|
val members = node0.latestGossip.members
|
||||||
val joiningMember = members find (_.address.port.get == 5550)
|
val joiningMember = members find (_.address.port.get == 5550)
|
||||||
joiningMember must be('defined)
|
joiningMember must be('defined)
|
||||||
joiningMember.get.status must be(MemberStatus.Joining)
|
joiningMember.get.status must be(MemberStatus.Joining)
|
||||||
|
|
@ -55,7 +55,7 @@ class NodeStartupSpec extends AkkaSpec("""
|
||||||
|
|
||||||
"A second cluster node with a 'node-to-join' config defined" must {
|
"A second cluster node with a 'node-to-join' config defined" must {
|
||||||
"join the other node cluster as 'Joining' when sending a Join command" in {
|
"join the other node cluster as 'Joining' when sending a Join command" in {
|
||||||
node1 = ActorSystem("NodeStartupSpec", ConfigFactory
|
system1 = ActorSystem("NodeStartupSpec", ConfigFactory
|
||||||
.parseString("""
|
.parseString("""
|
||||||
akka {
|
akka {
|
||||||
actor.provider = "akka.remote.RemoteActorRefProvider"
|
actor.provider = "akka.remote.RemoteActorRefProvider"
|
||||||
|
|
@ -67,11 +67,11 @@ class NodeStartupSpec extends AkkaSpec("""
|
||||||
}""")
|
}""")
|
||||||
.withFallback(system.settings.config))
|
.withFallback(system.settings.config))
|
||||||
.asInstanceOf[ActorSystemImpl]
|
.asInstanceOf[ActorSystemImpl]
|
||||||
val remote1 = node1.provider.asInstanceOf[RemoteActorRefProvider]
|
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||||
gossiper1 = Gossiper(node1, remote1)
|
node1 = Node(system1, remote1)
|
||||||
|
|
||||||
Thread.sleep(1.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0
|
Thread.sleep(1.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0
|
||||||
val members = gossiper0.latestGossip.members
|
val members = node0.latestGossip.members
|
||||||
val joiningMember = members find (_.address.port.get == 5551)
|
val joiningMember = members find (_.address.port.get == 5551)
|
||||||
joiningMember must be('defined)
|
joiningMember must be('defined)
|
||||||
joiningMember.get.status must be(MemberStatus.Joining)
|
joiningMember.get.status must be(MemberStatus.Joining)
|
||||||
|
|
@ -84,10 +84,10 @@ class NodeStartupSpec extends AkkaSpec("""
|
||||||
}
|
}
|
||||||
|
|
||||||
override def atTermination() {
|
override def atTermination() {
|
||||||
if (gossiper0 ne null) gossiper0.shutdown()
|
|
||||||
if (node0 ne null) node0.shutdown()
|
if (node0 ne null) node0.shutdown()
|
||||||
|
if (system0 ne null) system0.shutdown()
|
||||||
|
|
||||||
if (gossiper1 ne null) gossiper1.shutdown()
|
|
||||||
if (node1 ne null) node1.shutdown()
|
if (node1 ne null) node1.shutdown()
|
||||||
|
if (system1 ne null) system1.shutdown()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue