Turned cluster Node into an Extension.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2012-02-22 18:40:16 +01:00
parent e4b1d8609f
commit a62755c5da
6 changed files with 41 additions and 19 deletions

View file

@ -170,6 +170,8 @@ final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor
}
}
// FIXME create package object with implicit conversion that enables: system.node
/**
* Pooled and routed wit N number of configurable instances.
* Concurrent access to Node.
@ -183,7 +185,22 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
}
}
// FIXME Cluster public API should be an Extension
/**
* Node Extension Id and factory for creating Node extension.
* Example:
* {{{
* val node = NodeExtension(system)
*
* if (node.isLeader) { ... }
* }}}
*/
object NodeExtension extends ExtensionId[Node] with ExtensionIdProvider {
override def get(system: ActorSystem): Node = super.get(system)
override def lookup = NodeExtension
override def createExtension(system: ExtendedActorSystem): Node = new Node(system.asInstanceOf[ActorSystemImpl]) // not nice but need API in ActorSystemImpl inside Node
}
/**
* This module is responsible for Gossiping cluster information. The abstraction maintains the list of live
@ -200,7 +217,12 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor {
* gossip to random deputy with certain probability depending on number of unreachable, deputy and live members.
* </pre>
*/
case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
class Node(system: ActorSystemImpl) extends Extension {
if (!system.provider.isInstanceOf[RemoteActorRefProvider])
throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration")
val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider]
/**
* Represents the state for this Node. Implemented using optimistic lockless concurrency,
@ -372,7 +394,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners map { _ notify newMembers } // FIXME should check for cluster convergence before triggering listeners
newState.memberMembershipChangeListeners map { _ notify newMembers }
}
}
}
@ -416,7 +438,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners map { _ notify newState.latestGossip.members } // FIXME should check for cluster convergence before triggering listeners
newState.memberMembershipChangeListeners map { _ notify newState.latestGossip.members }
}
}
}
@ -571,7 +593,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) {
if (!state.compareAndSet(localState, newState)) scrutinize() // recur
else {
if (convergence(newState.latestGossip).isDefined) {
newState.memberMembershipChangeListeners map { _ notify newMembers } // FIXME should check for cluster convergence before triggering listeners
newState.memberMembershipChangeListeners map { _ notify newMembers }
}
}
}

View file

@ -46,7 +46,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Node(system1, remote1)
node1 = new Node(system1)
val fd1 = node1.failureDetector
val address1 = node1.self.address
@ -64,7 +64,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Node(system2, remote2)
node2 = new Node(system2)
val fd2 = node2.failureDetector
val address2 = node2.self.address
@ -82,7 +82,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
node3 = Node(system3, remote3)
node3 = new Node(system3)
val fd3 = node3.failureDetector
val address3 = node3.self.address

View file

@ -46,7 +46,7 @@ class LeaderElectionSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Node(system1, remote1)
node1 = new Node(system1)
val fd1 = node1.failureDetector
val address1 = node1.self.address
@ -64,7 +64,7 @@ class LeaderElectionSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Node(system2, remote2)
node2 = new Node(system2)
val fd2 = node2.failureDetector
val address2 = node2.self.address
@ -82,7 +82,7 @@ class LeaderElectionSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider]
node3 = Node(system3, remote3)
node3 = new Node(system3)
val fd3 = node3.failureDetector
val address3 = node3.self.address

View file

@ -45,7 +45,7 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Node(system0, remote0)
node0 = new Node(system0)
system1 = ActorSystem("system1", ConfigFactory
.parseString("""
@ -60,7 +60,7 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Node(system1, remote1)
node1 = new Node(system1)
val latch = new CountDownLatch(2)
@ -100,7 +100,7 @@ class MembershipChangeListenerSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Node(system2, remote2)
node2 = new Node(system2)
val latch = new CountDownLatch(3)
node0.registerListener(new MembershipChangeListener {

View file

@ -44,7 +44,7 @@ class NodeMembershipSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Node(system0, remote0)
node0 = new Node(system0)
// ======= NODE 1 ========
system1 = ActorSystem("system1", ConfigFactory
@ -60,7 +60,7 @@ class NodeMembershipSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Node(system1, remote1)
node1 = new Node(system1)
Thread.sleep(10.seconds.dilated.toMillis)
@ -99,7 +99,7 @@ class NodeMembershipSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider]
node2 = Node(system2, remote2)
node2 = new Node(system2)
Thread.sleep(10.seconds.dilated.toMillis)

View file

@ -38,7 +38,7 @@ class NodeStartupSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider]
node0 = Node(system0, remote0)
node0 = new Node(system0)
"be a singleton cluster when started up" in {
Thread.sleep(1.seconds.dilated.toMillis)
@ -68,7 +68,7 @@ class NodeStartupSpec extends AkkaSpec("""
.withFallback(system.settings.config))
.asInstanceOf[ActorSystemImpl]
val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider]
node1 = Node(system1, remote1)
node1 = new Node(system1)
Thread.sleep(1.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0
val members = node0.latestGossip.members