From a62755c5daefaad3838ff1e552ba0f72877b218e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 22 Feb 2012 18:40:16 +0100 Subject: [PATCH] Turned cluster Node into an Extension. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/cluster/Node.scala | 32 ++++++++++++++++--- .../GossipingAccrualFailureDetectorSpec.scala | 6 ++-- .../akka/cluster/LeaderElectionSpec.scala | 6 ++-- .../MembershipChangeListenerSpec.scala | 6 ++-- .../akka/cluster/NodeMembershipSpec.scala | 6 ++-- .../scala/akka/cluster/NodeStartupSpec.scala | 4 +-- 6 files changed, 41 insertions(+), 19 deletions(-) diff --git a/akka-cluster/src/main/scala/akka/cluster/Node.scala b/akka-cluster/src/main/scala/akka/cluster/Node.scala index bb8bec4d31..33c1ad840e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Node.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Node.scala @@ -170,6 +170,8 @@ final class ClusterCommandDaemon(system: ActorSystem, node: Node) extends Actor } } +// FIXME create package object with implicit conversion that enables: system.node + /** * Pooled and routed wit N number of configurable instances. * Concurrent access to Node. @@ -183,7 +185,22 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor { } } -// FIXME Cluster public API should be an Extension +/** + * Node Extension Id and factory for creating Node extension. + * Example: + * {{{ + * val node = NodeExtension(system) + * + * if (node.isLeader) { ... } + * }}} + */ +object NodeExtension extends ExtensionId[Node] with ExtensionIdProvider { + override def get(system: ActorSystem): Node = super.get(system) + + override def lookup = NodeExtension + + override def createExtension(system: ExtendedActorSystem): Node = new Node(system.asInstanceOf[ActorSystemImpl]) // not nice but need API in ActorSystemImpl inside Node +} /** * This module is responsible for Gossiping cluster information. The abstraction maintains the list of live @@ -200,7 +217,12 @@ final class ClusterGossipDaemon(system: ActorSystem, node: Node) extends Actor { * gossip to random deputy with certain probability depending on number of unreachable, deputy and live members. * */ -case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { +class Node(system: ActorSystemImpl) extends Extension { + + if (!system.provider.isInstanceOf[RemoteActorRefProvider]) + throw new ConfigurationException("ActorSystem[" + system + "] needs to have a 'RemoteActorRefProvider' enabled in the configuration") + + val remote: RemoteActorRefProvider = system.provider.asInstanceOf[RemoteActorRefProvider] /** * Represents the state for this Node. Implemented using optimistic lockless concurrency, @@ -372,7 +394,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { if (!state.compareAndSet(localState, newState)) joining(node) // recur if we failed update else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newMembers } // FIXME should check for cluster convergence before triggering listeners + newState.memberMembershipChangeListeners map { _ notify newMembers } } } } @@ -416,7 +438,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { if (!state.compareAndSet(localState, newState)) receive(sender, remoteGossip) // recur if we fail the update else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newState.latestGossip.members } // FIXME should check for cluster convergence before triggering listeners + newState.memberMembershipChangeListeners map { _ notify newState.latestGossip.members } } } } @@ -571,7 +593,7 @@ case class Node(system: ActorSystemImpl, remote: RemoteActorRefProvider) { if (!state.compareAndSet(localState, newState)) scrutinize() // recur else { if (convergence(newState.latestGossip).isDefined) { - newState.memberMembershipChangeListeners map { _ notify newMembers } // FIXME should check for cluster convergence before triggering listeners + newState.memberMembershipChangeListeners map { _ notify newMembers } } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala index e92b21dbfb..f4deb77706 100644 --- a/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/GossipingAccrualFailureDetectorSpec.scala @@ -46,7 +46,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Node(system1, remote1) + node1 = new Node(system1) val fd1 = node1.failureDetector val address1 = node1.self.address @@ -64,7 +64,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Node(system2, remote2) + node2 = new Node(system2) val fd2 = node2.failureDetector val address2 = node2.self.address @@ -82,7 +82,7 @@ class GossipingAccrualFailureDetectorSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] - node3 = Node(system3, remote3) + node3 = new Node(system3) val fd3 = node3.failureDetector val address3 = node3.self.address diff --git a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala index dc0d8632a1..e3da64cfa0 100644 --- a/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/LeaderElectionSpec.scala @@ -46,7 +46,7 @@ class LeaderElectionSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Node(system1, remote1) + node1 = new Node(system1) val fd1 = node1.failureDetector val address1 = node1.self.address @@ -64,7 +64,7 @@ class LeaderElectionSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Node(system2, remote2) + node2 = new Node(system2) val fd2 = node2.failureDetector val address2 = node2.self.address @@ -82,7 +82,7 @@ class LeaderElectionSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote3 = system3.provider.asInstanceOf[RemoteActorRefProvider] - node3 = Node(system3, remote3) + node3 = new Node(system3) val fd3 = node3.failureDetector val address3 = node3.self.address diff --git a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala index 197fa22b71..e2487de3c8 100644 --- a/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/MembershipChangeListenerSpec.scala @@ -45,7 +45,7 @@ class MembershipChangeListenerSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] - node0 = Node(system0, remote0) + node0 = new Node(system0) system1 = ActorSystem("system1", ConfigFactory .parseString(""" @@ -60,7 +60,7 @@ class MembershipChangeListenerSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Node(system1, remote1) + node1 = new Node(system1) val latch = new CountDownLatch(2) @@ -100,7 +100,7 @@ class MembershipChangeListenerSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Node(system2, remote2) + node2 = new Node(system2) val latch = new CountDownLatch(3) node0.registerListener(new MembershipChangeListener { diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala index dc24485507..56f053fc6c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/NodeMembershipSpec.scala @@ -44,7 +44,7 @@ class NodeMembershipSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] - node0 = Node(system0, remote0) + node0 = new Node(system0) // ======= NODE 1 ======== system1 = ActorSystem("system1", ConfigFactory @@ -60,7 +60,7 @@ class NodeMembershipSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Node(system1, remote1) + node1 = new Node(system1) Thread.sleep(10.seconds.dilated.toMillis) @@ -99,7 +99,7 @@ class NodeMembershipSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote2 = system2.provider.asInstanceOf[RemoteActorRefProvider] - node2 = Node(system2, remote2) + node2 = new Node(system2) Thread.sleep(10.seconds.dilated.toMillis) diff --git a/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala index 3d98260c4d..ed4b893619 100644 --- a/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/NodeStartupSpec.scala @@ -38,7 +38,7 @@ class NodeStartupSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote0 = system0.provider.asInstanceOf[RemoteActorRefProvider] - node0 = Node(system0, remote0) + node0 = new Node(system0) "be a singleton cluster when started up" in { Thread.sleep(1.seconds.dilated.toMillis) @@ -68,7 +68,7 @@ class NodeStartupSpec extends AkkaSpec(""" .withFallback(system.settings.config)) .asInstanceOf[ActorSystemImpl] val remote1 = system1.provider.asInstanceOf[RemoteActorRefProvider] - node1 = Node(system1, remote1) + node1 = new Node(system1) Thread.sleep(1.seconds.dilated.toMillis) // give enough time for node1 to JOIN node0 val members = node0.latestGossip.members