Renamed 'Node' to 'Cluster'.
Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
parent
f7ca01a26b
commit
7c03f8df6c
10 changed files with 103 additions and 103 deletions
|
|
@ -242,21 +242,21 @@ case class Gossip(
|
|||
|
||||
/**
|
||||
* Manages routing of the different cluster commands.
|
||||
* Instantiated as a single instance for each Node - e.g. commands are serialized to Node message after message.
|
||||
* Instantiated as a single instance for each Cluster - e.g. commands are serialized to Cluster message after message.
|
||||
*/
|
||||
final class ClusterCommandDaemon extends Actor {
|
||||
import ClusterAction._
|
||||
|
||||
val node = Node(context.system)
|
||||
val cluster = Cluster(context.system)
|
||||
val log = Logging(context.system, this)
|
||||
|
||||
def receive = {
|
||||
case Join(address) ⇒ node.joining(address)
|
||||
case Up(address) ⇒ node.up(address)
|
||||
case Down(address) ⇒ node.downing(address)
|
||||
case Leave(address) ⇒ node.leaving(address)
|
||||
case Exit(address) ⇒ node.exiting(address)
|
||||
case Remove(address) ⇒ node.removing(address)
|
||||
case Join(address) ⇒ cluster.joining(address)
|
||||
case Up(address) ⇒ cluster.up(address)
|
||||
case Down(address) ⇒ cluster.downing(address)
|
||||
case Leave(address) ⇒ cluster.leaving(address)
|
||||
case Exit(address) ⇒ cluster.exiting(address)
|
||||
case Remove(address) ⇒ cluster.removing(address)
|
||||
}
|
||||
|
||||
override def unhandled(unknown: Any) = log.error("Illegal command [{}]", unknown)
|
||||
|
|
@ -264,29 +264,29 @@ final class ClusterCommandDaemon extends Actor {
|
|||
|
||||
/**
|
||||
* Pooled and routed with N number of configurable instances.
|
||||
* Concurrent access to Node.
|
||||
* Concurrent access to Cluster.
|
||||
*/
|
||||
final class ClusterGossipDaemon extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
val node = Node(context.system)
|
||||
val cluster = Cluster(context.system)
|
||||
|
||||
def receive = {
|
||||
case GossipEnvelope(sender, gossip) ⇒ node.receive(sender, gossip)
|
||||
case GossipEnvelope(sender, gossip) ⇒ cluster.receive(sender, gossip)
|
||||
}
|
||||
|
||||
override def unhandled(unknown: Any) = log.error("[/system/cluster/gossip] can not respond to messages - received [{}]", unknown)
|
||||
}
|
||||
|
||||
/**
|
||||
* Supervisor managing the different cluste daemons.
|
||||
* Supervisor managing the different Cluster daemons.
|
||||
*/
|
||||
final class ClusterDaemonSupervisor extends Actor {
|
||||
val log = Logging(context.system, this)
|
||||
val node = Node(context.system)
|
||||
val cluster = Cluster(context.system)
|
||||
|
||||
private val commands = context.actorOf(Props[ClusterCommandDaemon], "commands")
|
||||
private val gossip = context.actorOf(
|
||||
Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(node.clusterSettings.NrOfGossipDaemons)), "gossip")
|
||||
Props[ClusterGossipDaemon].withRouter(RoundRobinRouter(cluster.clusterSettings.NrOfGossipDaemons)), "gossip")
|
||||
|
||||
def receive = Actor.emptyBehavior
|
||||
|
||||
|
|
@ -294,20 +294,20 @@ final class ClusterDaemonSupervisor extends Actor {
|
|||
}
|
||||
|
||||
/**
|
||||
* Node Extension Id and factory for creating Node extension.
|
||||
* Cluster Extension Id and factory for creating Cluster extension.
|
||||
* Example:
|
||||
* {{{
|
||||
* val node = Node(system)
|
||||
* val cluster = Cluster(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* if (cluster.isLeader) { ... }
|
||||
* }}}
|
||||
*/
|
||||
object Node extends ExtensionId[Node] with ExtensionIdProvider {
|
||||
override def get(system: ActorSystem): Node = super.get(system)
|
||||
object Cluster extends ExtensionId[Cluster] with ExtensionIdProvider {
|
||||
override def get(system: ActorSystem): Cluster = super.get(system)
|
||||
|
||||
override def lookup = Node
|
||||
override def lookup = Cluster
|
||||
|
||||
override def createExtension(system: ExtendedActorSystem): Node = new Node(system)
|
||||
override def createExtension(system: ExtendedActorSystem): Cluster = new Cluster(system)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -327,17 +327,17 @@ object Node extends ExtensionId[Node] with ExtensionIdProvider {
|
|||
*
|
||||
* Example:
|
||||
* {{{
|
||||
* val node = Node(system)
|
||||
* val cluster = Cluster(system)
|
||||
*
|
||||
* if (node.isLeader) { ... }
|
||||
* if (cluster.isLeader) { ... }
|
||||
* }}}
|
||||
*/
|
||||
class Node(system: ExtendedActorSystem) extends Extension {
|
||||
class Cluster(system: ExtendedActorSystem) extends Extension {
|
||||
|
||||
// FIXME rename Node to Cluster
|
||||
// FIXME rename Cluster to Cluster
|
||||
|
||||
/**
|
||||
* Represents the state for this Node. Implemented using optimistic lockless concurrency.
|
||||
* Represents the state for this Cluster. Implemented using optimistic lockless concurrency.
|
||||
* All state is represented by this immutable case class and managed by an AtomicReference.
|
||||
*/
|
||||
private case class State(
|
||||
|
|
@ -376,7 +376,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
private val log = Logging(system, "Node")
|
||||
private val random = SecureRandom.getInstance("SHA1PRNG")
|
||||
|
||||
log.info("Node [{}] - is JOINING cluster...", remoteAddress)
|
||||
log.info("Cluster Node [{}] - is JOINING cluster...", remoteAddress)
|
||||
|
||||
// create superisor for daemons under path "/system/cluster"
|
||||
private val clusterDaemons = {
|
||||
|
|
@ -415,7 +415,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
leaderActions()
|
||||
}
|
||||
|
||||
log.info("Node [{}] - has JOINED cluster successfully", remoteAddress)
|
||||
log.info("Cluster Node [{}] - has JOINED cluster successfully", remoteAddress)
|
||||
|
||||
// ======================================================
|
||||
// ===================== PUBLIC API =====================
|
||||
|
|
@ -465,7 +465,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
def shutdown() {
|
||||
if (isRunning.compareAndSet(true, false)) {
|
||||
log.info("Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)
|
||||
log.info("Cluster Node [{}] - Shutting down cluster Node and cluster daemons...", remoteAddress)
|
||||
gossipCanceller.cancel()
|
||||
failureDetectorReaperCanceller.cancel()
|
||||
leaderActionsCanceller.cancel()
|
||||
|
|
@ -533,7 +533,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
@tailrec
|
||||
private[cluster] final def joining(node: Address) {
|
||||
log.info("Node [{}] - Node [{}] is JOINING", remoteAddress, node)
|
||||
log.info("Cluster Node [{}] - Node [{}] is JOINING", remoteAddress, node)
|
||||
|
||||
val localState = state.get
|
||||
val localGossip = localState.latestGossip
|
||||
|
|
@ -566,28 +566,28 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
* State transition to UP.
|
||||
*/
|
||||
private[cluster] final def up(address: Address) {
|
||||
log.info("Node [{}] - Marking node [{}] as UP", remoteAddress, address)
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as UP", remoteAddress, address)
|
||||
}
|
||||
|
||||
/**
|
||||
* State transition to LEAVING.
|
||||
*/
|
||||
private[cluster] final def leaving(address: Address) {
|
||||
log.info("Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as LEAVING", remoteAddress, address)
|
||||
}
|
||||
|
||||
/**
|
||||
* State transition to EXITING.
|
||||
*/
|
||||
private[cluster] final def exiting(address: Address) {
|
||||
log.info("Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as EXITING", remoteAddress, address)
|
||||
}
|
||||
|
||||
/**
|
||||
* State transition to REMOVED.
|
||||
*/
|
||||
private[cluster] final def removing(address: Address) {
|
||||
log.info("Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as REMOVED", remoteAddress, address)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -612,7 +612,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
localMembers
|
||||
.map { member ⇒
|
||||
if (member.address == address) {
|
||||
log.info("Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
|
||||
log.info("Cluster Node [{}] - Marking node [{}] as DOWN", remoteAddress, member.address)
|
||||
val newMember = member copy (status = MemberStatus.Down)
|
||||
downedMember = Some(newMember)
|
||||
newMember
|
||||
|
|
@ -626,7 +626,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
|
||||
.map { member ⇒
|
||||
if (member.address == address) {
|
||||
log.info("Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
|
||||
log.info("Cluster Node [{}] - Marking unreachable node [{}] as DOWN", remoteAddress, member.address)
|
||||
member copy (status = MemberStatus.Down)
|
||||
} else member
|
||||
}
|
||||
|
|
@ -689,7 +689,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
// if we won the race then update else try again
|
||||
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
|
||||
else {
|
||||
log.debug("Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
|
||||
log.debug("Cluster Node [{}] - Receiving gossip from [{}]", remoteAddress, sender.address)
|
||||
|
||||
failureDetector heartbeat sender.address // update heartbeat in failure detector
|
||||
|
||||
|
|
@ -705,7 +705,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
private def autoJoin() = nodeToJoin foreach { address ⇒
|
||||
val connection = clusterCommandConnectionFor(address)
|
||||
val command = ClusterAction.Join(remoteAddress)
|
||||
log.info("Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
|
||||
log.info("Cluster Node [{}] - Sending [{}] to [{}] through connection [{}]", remoteAddress, command, address, connection)
|
||||
connection ! command
|
||||
}
|
||||
|
||||
|
|
@ -717,7 +717,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
* @return the updated new state with the new member status
|
||||
*/
|
||||
private def switchMemberStatusTo(newStatus: MemberStatus, state: State): State = {
|
||||
log.info("Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
|
||||
log.info("Cluster Node [{}] - Switching membership status to [{}]", remoteAddress, newStatus)
|
||||
|
||||
val localSelf = self
|
||||
|
||||
|
|
@ -749,7 +749,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
*/
|
||||
private def gossipTo(address: Address) {
|
||||
val connection = clusterGossipConnectionFor(address)
|
||||
log.debug("Node [{}] - Gossiping to [{}]", remoteAddress, connection)
|
||||
log.debug("Cluster Node [{}] - Gossiping to [{}]", remoteAddress, connection)
|
||||
connection ! GossipEnvelope(self, latestGossip)
|
||||
}
|
||||
|
||||
|
|
@ -779,7 +779,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
if (!isSingletonCluster(localState) && isAvailable(localState)) {
|
||||
// only gossip if we are a non-singleton cluster and available
|
||||
|
||||
log.debug("Node [{}] - Initiating new round of gossip", remoteAddress)
|
||||
log.debug("Cluster Node [{}] - Initiating new round of gossip", remoteAddress)
|
||||
|
||||
val localGossip = localState.latestGossip
|
||||
val localMembers = localGossip.members
|
||||
|
|
@ -844,7 +844,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
// if we won the race then update else try again
|
||||
if (!state.compareAndSet(localState, newState)) reapUnreachableMembers() // recur
|
||||
else {
|
||||
log.info("Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))
|
||||
log.info("Cluster Node [{}] - Marking node(s) as UNREACHABLE [{}]", remoteAddress, newlyDetectedUnreachableMembers.mkString(", "))
|
||||
|
||||
if (convergence(newState.latestGossip).isDefined) {
|
||||
newState.memberMembershipChangeListeners foreach { _ notify newMembers }
|
||||
|
|
@ -889,14 +889,14 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
localMembers map { member ⇒
|
||||
// 1. Move JOINING => UP
|
||||
if (member.status == MemberStatus.Joining) {
|
||||
log.info("Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from JOINING to UP", remoteAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = MemberStatus.Up)
|
||||
} else member
|
||||
} map { member ⇒
|
||||
// 2. Move EXITING => REMOVED
|
||||
if (member.status == MemberStatus.Exiting) {
|
||||
log.info("Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
|
||||
log.info("Cluster Node [{}] - Leader is moving node [{}] from EXITING to REMOVED", remoteAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = MemberStatus.Removed)
|
||||
} else member
|
||||
|
|
@ -912,7 +912,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
localUnreachableMembers
|
||||
.filter(_.status != MemberStatus.Down) // no need to DOWN members already DOWN
|
||||
.map { member ⇒
|
||||
log.info("Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
|
||||
log.info("Cluster Node [{}] - Leader is marking unreachable node [{}] as DOWN", remoteAddress, member.address)
|
||||
hasChangedState = true
|
||||
member copy (status = MemberStatus.Down)
|
||||
}
|
||||
|
|
@ -966,7 +966,7 @@ class Node(system: ExtendedActorSystem) extends Extension {
|
|||
val views = Set.empty[VectorClock] ++ seen.values
|
||||
|
||||
if (views.size == 1) {
|
||||
log.debug("Node [{}] - Cluster convergence reached", remoteAddress)
|
||||
log.debug("Cluster Node [{}] - Cluster convergence reached", remoteAddress)
|
||||
Some(gossip)
|
||||
} else None
|
||||
} else None
|
||||
|
|
|
|||
|
|
@ -16,10 +16,10 @@ import java.net.InetSocketAddress
|
|||
|
||||
class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with ImplicitSender {
|
||||
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node3: Node = _
|
||||
var node4: Node = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
var node4: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
|
|
@ -38,7 +38,7 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
val fd1 = node1.failureDetector
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
|
|
@ -52,7 +52,7 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
val fd2 = node2.failureDetector
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node3 = Node(system3)
|
||||
node3 = Cluster(system3)
|
||||
val fd3 = node3.failureDetector
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ class ClientDowningSpec extends ClusterSpec("akka.cluster.auto-down = off") with
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node4 = Node(system4)
|
||||
node4 = Cluster(system4)
|
||||
val fd4 = node4.failureDetector
|
||||
val address4 = node4.remoteAddress
|
||||
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ abstract class ClusterSpec(_system: ActorSystem) extends AkkaSpec(_system) {
|
|||
|
||||
def this() = this(ActorSystem(AkkaSpec.getCallerName, ClusterSpec.testConf))
|
||||
|
||||
def awaitConvergence(nodes: Iterable[Node], maxWaitTime: Duration = 60 seconds) {
|
||||
def awaitConvergence(nodes: Iterable[Cluster], maxWaitTime: Duration = 60 seconds) {
|
||||
val deadline = maxWaitTime.fromNow
|
||||
while (nodes map (_.convergence.isDefined) exists (_ == false)) {
|
||||
if (deadline.isOverdue) throw new IllegalStateException("Convergence could no be reached within " + maxWaitTime)
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@ import java.net.InetSocketAddress
|
|||
|
||||
class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node3: Node = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
|
|
@ -32,7 +32,7 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
val fd1 = node1.failureDetector
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
|
|
@ -46,7 +46,7 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
val fd2 = node2.failureDetector
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ class GossipingAccrualFailureDetectorSpec extends ClusterSpec with ImplicitSende
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node3 = Node(system3)
|
||||
node3 = Cluster(system3)
|
||||
val fd3 = node3.failureDetector
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
|
|
|
|||
|
|
@ -16,12 +16,12 @@ import java.net.InetSocketAddress
|
|||
|
||||
class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.threshold = 5") with ImplicitSender {
|
||||
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node3: Node = _
|
||||
var node4: Node = _
|
||||
var node5: Node = _
|
||||
var node6: Node = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
var node4: Cluster = _
|
||||
var node5: Cluster = _
|
||||
var node6: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
|
|
@ -41,7 +41,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
|
||||
// ======= NODE 2 ========
|
||||
system2 = ActorSystem("system2", ConfigFactory
|
||||
|
|
@ -52,7 +52,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
|
||||
// ======= NODE 3 ========
|
||||
system3 = ActorSystem("system3", ConfigFactory
|
||||
|
|
@ -62,7 +62,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node3 = Node(system3)
|
||||
node3 = Cluster(system3)
|
||||
|
||||
// ======= NODE 4 ========
|
||||
system4 = ActorSystem("system4", ConfigFactory
|
||||
|
|
@ -73,7 +73,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node4 = Node(system4)
|
||||
node4 = Cluster(system4)
|
||||
|
||||
// ======= NODE 5 ========
|
||||
system5 = ActorSystem("system5", ConfigFactory
|
||||
|
|
@ -83,7 +83,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node5 = Node(system5)
|
||||
node5 = Cluster(system5)
|
||||
|
||||
// ======= NODE 6 ========
|
||||
system6 = ActorSystem("system6", ConfigFactory
|
||||
|
|
@ -94,7 +94,7 @@ class JoinTwoClustersSpec extends ClusterSpec("akka.cluster.failure-detector.thr
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node6 = Node(system6)
|
||||
node6 = Cluster(system6)
|
||||
|
||||
"be able to 'elect' a single leader after joining (A -> B)" taggedAs LongRunningTest in {
|
||||
|
||||
|
|
|
|||
|
|
@ -16,10 +16,10 @@ import java.net.InetSocketAddress
|
|||
|
||||
class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node3: Node = _
|
||||
var node4: Node = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
var node4: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
|
|
@ -38,7 +38,7 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
val fd1 = node1.failureDetector
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
|
|
@ -52,7 +52,7 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
val fd2 = node2.failureDetector
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
|
|
@ -66,7 +66,7 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node3 = Node(system3)
|
||||
node3 = Cluster(system3)
|
||||
val fd3 = node3.failureDetector
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
|
|
@ -80,7 +80,7 @@ class LeaderDowningSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote4 = system4.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node4 = Node(system4)
|
||||
node4 = Cluster(system4)
|
||||
val fd4 = node4.failureDetector
|
||||
val address4 = node4.remoteAddress
|
||||
|
||||
|
|
|
|||
|
|
@ -16,9 +16,9 @@ import java.net.InetSocketAddress
|
|||
|
||||
class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node3: Node = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
var node3: Cluster = _
|
||||
|
||||
var system1: ActorSystemImpl = _
|
||||
var system2: ActorSystemImpl = _
|
||||
|
|
@ -35,7 +35,7 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
val address1 = node1.remoteAddress
|
||||
|
||||
// ======= NODE 2 ========
|
||||
|
|
@ -47,7 +47,7 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
val address2 = node2.remoteAddress
|
||||
|
||||
// ======= NODE 3 ========
|
||||
|
|
@ -59,7 +59,7 @@ class LeaderElectionSpec extends ClusterSpec with ImplicitSender {
|
|||
}""")
|
||||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
node3 = Node(system3)
|
||||
node3 = Cluster(system3)
|
||||
val address3 = node3.remoteAddress
|
||||
|
||||
"be able to 'elect' a single leader" taggedAs LongRunningTest in {
|
||||
|
|
|
|||
|
|
@ -18,9 +18,9 @@ import com.typesafe.config._
|
|||
|
||||
class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node0: Node = _
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node0: Cluster = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
|
||||
var system0: ActorSystemImpl = _
|
||||
var system1: ActorSystemImpl = _
|
||||
|
|
@ -34,7 +34,7 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node0 = Node(system0)
|
||||
node0 = Cluster(system0)
|
||||
|
||||
system1 = ActorSystem("system1", ConfigFactory
|
||||
.parseString("""
|
||||
|
|
@ -45,7 +45,7 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
|
||||
val latch = new CountDownLatch(2)
|
||||
|
||||
|
|
@ -81,7 +81,7 @@ class MembershipChangeListenerSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
|
||||
val latch = new CountDownLatch(3)
|
||||
node0.registerListener(new MembershipChangeListener {
|
||||
|
|
|
|||
|
|
@ -15,9 +15,9 @@ import com.typesafe.config._
|
|||
|
||||
class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node0: Node = _
|
||||
var node1: Node = _
|
||||
var node2: Node = _
|
||||
var node0: Cluster = _
|
||||
var node1: Cluster = _
|
||||
var node2: Cluster = _
|
||||
|
||||
var system0: ActorSystemImpl = _
|
||||
var system1: ActorSystemImpl = _
|
||||
|
|
@ -33,7 +33,7 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node0 = Node(system0)
|
||||
node0 = Cluster(system0)
|
||||
|
||||
// ======= NODE 1 ========
|
||||
system1 = ActorSystem("system1", ConfigFactory
|
||||
|
|
@ -45,7 +45,7 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
|
||||
// check cluster convergence
|
||||
awaitConvergence(node0 :: node1 :: Nil)
|
||||
|
|
@ -77,7 +77,7 @@ class NodeMembershipSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node2 = Node(system2)
|
||||
node2 = Cluster(system2)
|
||||
|
||||
awaitConvergence(node0 :: node1 :: node2 :: Nil)
|
||||
|
||||
|
|
|
|||
|
|
@ -15,8 +15,8 @@ import com.typesafe.config._
|
|||
|
||||
class NodeStartupSpec extends ClusterSpec with ImplicitSender {
|
||||
|
||||
var node0: Node = _
|
||||
var node1: Node = _
|
||||
var node0: Cluster = _
|
||||
var node1: Cluster = _
|
||||
var system0: ActorSystemImpl = _
|
||||
var system1: ActorSystemImpl = _
|
||||
|
||||
|
|
@ -27,7 +27,7 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node0 = Node(system0)
|
||||
node0 = Cluster(system0)
|
||||
|
||||
"be a singleton cluster when started up" taggedAs LongRunningTest in {
|
||||
Thread.sleep(1.seconds.dilated.toMillis)
|
||||
|
|
@ -53,7 +53,7 @@ class NodeStartupSpec extends ClusterSpec with ImplicitSender {
|
|||
.withFallback(system.settings.config))
|
||||
.asInstanceOf[ActorSystemImpl]
|
||||
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
|
||||
node1 = Node(system1)
|
||||
node1 = Cluster(system1)
|
||||
|
||||
Thread.sleep(10.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0 and leader to move him to UP
|
||||
val members = node0.latestGossip.members
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue