Fixed typos in Cluster API.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-08-31 11:59:30 +02:00
parent c8d738f534
commit 8a55fc93d4
3 changed files with 238 additions and 235 deletions

View file

@ -83,7 +83,7 @@ trait ClusterNodeMBean {
def getMemberNodes: Array[String] def getMemberNodes: Array[String]
def getNodeAddres(): NodeAddress def getNodeAddress(): NodeAddress
def getLeaderLockName: String def getLeaderLockName: String
@ -111,7 +111,7 @@ trait ClusterNodeMBean {
def getConfigElementKeys: Array[String] def getConfigElementKeys: Array[String]
def getMemberShipPathFor(node: String): String def getMembershipPathFor(node: String): String
def getConfigurationPathFor(key: String): String def getConfigurationPathFor(key: String): String
@ -121,6 +121,7 @@ trait ClusterNodeMBean {
def getNodeToUuidsPathFor(node: String): String def getNodeToUuidsPathFor(node: String): String
// FIXME All MBean methods that take a UUID are useless, change to String
def getNodeToUuidsPathFor(node: String, uuid: UUID): String def getNodeToUuidsPathFor(node: String, uuid: UUID): String
def getActorAddressRegistryPathFor(actorAddress: String): String def getActorAddressRegistryPathFor(actorAddress: String): String
@ -1280,6 +1281,7 @@ class DefaultClusterNode private[akka] (
/** /**
* Update the list of connections to other nodes in the cluster. * Update the list of connections to other nodes in the cluster.
* Tail recursive, using lockless optimimistic concurrency.
* *
* @returns a Map with the remote socket addresses to of disconnected node connections * @returns a Map with the remote socket addresses to of disconnected node connections
*/ */
@ -1348,8 +1350,9 @@ class DefaultClusterNode private[akka] (
zkClient.createEphemeral(membershipNodePath, remoteServerAddress) zkClient.createEphemeral(membershipNodePath, remoteServerAddress)
} catch { } catch {
case e: ZkNodeExistsException case e: ZkNodeExistsException
e.printStackTrace
val error = new ClusterException( val error = new ClusterException(
"Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in by another node") "Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in use by another node.")
EventHandler.error(error, this, error.toString) EventHandler.error(error, this, error.toString)
throw error throw error
} }
@ -1532,7 +1535,7 @@ class DefaultClusterNode private[akka] (
override def resign() = self.resign() override def resign() = self.resign()
override def getNodeAddres = self.nodeAddress override def getNodeAddress = self.nodeAddress
override def getRemoteServerHostname = self.hostname override def getRemoteServerHostname = self.hostname
@ -1572,7 +1575,7 @@ class DefaultClusterNode private[akka] (
override def getConfigElementKeys = self.getConfigElementKeys.toArray override def getConfigElementKeys = self.getConfigElementKeys.toArray
override def getMemberShipPathFor(node: String) = self.membershipPathFor(node) override def getMembershipPathFor(node: String) = self.membershipPathFor(node)
override def getConfigurationPathFor(key: String) = self.configurationPathFor(key) override def getConfigurationPathFor(key: String) = self.configurationPathFor(key)

View file

@ -1,158 +1,158 @@
/** // /**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com> // * Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/ // */
package akka.cluster.routing.roundrobin.replicationfactor_3 // package akka.cluster.routing.roundrobin.replicationfactor_3
import org.scalatest.WordSpec // import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers // import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll // import org.scalatest.BeforeAndAfterAll
import akka.cluster._ // import akka.cluster._
import akka.actor._ // import akka.actor._
import akka.actor.Actor._ // import akka.actor.Actor._
import akka.util.duration._ // import akka.util.duration._
import akka.util.{ Duration, Timer } // import akka.util.{ Duration, Timer }
import akka.config.Config // import akka.config.Config
import akka.cluster.LocalCluster._ // import akka.cluster.LocalCluster._
import Cluster._ // import Cluster._
/** // /**
* When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible // * When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible
* for running actors, or will it be just a 'client' talking to the cluster. // * for running actors, or will it be just a 'client' talking to the cluster.
*/ // */
object RoundRobin3ReplicasMultiJvmSpec { // object RoundRobin3ReplicasMultiJvmSpec {
val NrOfNodes = 3 // val NrOfNodes = 3
class HelloWorld extends Actor with Serializable { // class HelloWorld extends Actor with Serializable {
def receive = { // def receive = {
case "Hello" // case "Hello"
self.reply("World from node [" + Config.nodename + "]") // self.reply("World from node [" + Config.nodename + "]")
} // }
} // }
} // }
/** // /**
* What is the purpose of this node? Is this just a node for the cluster to make use of? // * What is the purpose of this node? Is this just a node for the cluster to make use of?
*/ // */
class RoundRobin3ReplicasMultiJvmNode1 extends MasterClusterTestNode { // class RoundRobin3ReplicasMultiJvmNode1 extends MasterClusterTestNode {
import RoundRobin3ReplicasMultiJvmSpec._ // import RoundRobin3ReplicasMultiJvmSpec._
val testNodes = NrOfNodes // val testNodes = NrOfNodes
"Round Robin: A cluster" must { // "Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { // "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
//wait till node 1 has started. // //wait till node 1 has started.
barrier("start-node1", NrOfNodes) { // barrier("start-node1", NrOfNodes) {
Cluster.node.boot() // Cluster.node.boot()
} // }
//wait till ndoe 2 has started. // //wait till ndoe 2 has started.
barrier("start-node2", NrOfNodes).await() // barrier("start-node2", NrOfNodes).await()
//wait till node 3 has started. // //wait till node 3 has started.
barrier("start-node3", NrOfNodes).await() // barrier("start-node3", NrOfNodes).await()
//wait till an actor reference on node 2 has become available. // //wait till an actor reference on node 2 has become available.
barrier("get-ref-to-actor-on-node2", NrOfNodes) { // barrier("get-ref-to-actor-on-node2", NrOfNodes) {
val timer = Timer(30.seconds, true) // val timer = Timer(30.seconds, true)
while (timer.isTicking && !node.isInUseOnNode("service-hello")) {} // while (timer.isTicking && !node.isInUseOnNode("service-hello")) {}
} // }
//wait till the node 2 has send a message to the replica's. // //wait till the node 2 has send a message to the replica's.
barrier("send-message-from-node2-to-replicas", NrOfNodes).await() // barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown() // node.shutdown()
} // }
} // }
} // }
class RoundRobin3ReplicasMultiJvmNode2 extends ClusterTestNode { // class RoundRobin3ReplicasMultiJvmNode2 extends ClusterTestNode {
import RoundRobin3ReplicasMultiJvmSpec._ // import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._ // import Cluster._
"Round Robin: A cluster" must { // "Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { // "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
//wait till node 1 has started. // //wait till node 1 has started.
barrier("start-node1", NrOfNodes).await() // barrier("start-node1", NrOfNodes).await()
//wait till node 2 has started. // //wait till node 2 has started.
barrier("start-node2", NrOfNodes) { // barrier("start-node2", NrOfNodes) {
Cluster.node.start() // Cluster.node.start()
} // }
//wait till node 3 has started. // //wait till node 3 has started.
barrier("start-node3", NrOfNodes).await() // barrier("start-node3", NrOfNodes).await()
//check if the actorRef is the expected remoteActorRef. // //check if the actorRef is the expected remoteActorRef.
var hello: ActorRef = null // var hello: ActorRef = null
barrier("get-ref-to-actor-on-node2", NrOfNodes) { // barrier("get-ref-to-actor-on-node2", NrOfNodes) {
hello = Actor.actorOf[HelloWorld]("service-hello") // hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null) // hello must not equal (null)
hello.address must equal("service-hello") // hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true) // hello.isInstanceOf[ClusterActorRef] must be(true)
} // }
barrier("send-message-from-node2-to-replicas", NrOfNodes) { // barrier("send-message-from-node2-to-replicas", NrOfNodes) {
//todo: is there a reason to check for null again since it already has been done in the previous block. // //todo: is there a reason to check for null again since it already has been done in the previous block.
hello must not equal (null) // hello must not equal (null)
val replies = collection.mutable.Map.empty[String, Int] // val replies = collection.mutable.Map.empty[String, Int]
def count(reply: String) = { // def count(reply: String) = {
if (replies.get(reply).isEmpty) replies.put(reply, 1) // if (replies.get(reply).isEmpty) replies.put(reply, 1)
else replies.put(reply, replies(reply) + 1) // else replies.put(reply, replies(reply) + 1)
} // }
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node2")))
count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3"))) // count((hello ? "Hello").as[String].getOrElse(fail("Should have recieved reply from node3")))
replies("World from node [node1]") must equal(4) // replies("World from node [node1]") must equal(4)
replies("World from node [node2]") must equal(4) // replies("World from node [node2]") must equal(4)
replies("World from node [node3]") must equal(4) // replies("World from node [node3]") must equal(4)
} // }
node.shutdown() // node.shutdown()
} // }
} // }
} // }
class RoundRobin3ReplicasMultiJvmNode3 extends ClusterTestNode { // class RoundRobin3ReplicasMultiJvmNode3 extends ClusterTestNode {
import RoundRobin3ReplicasMultiJvmSpec._ // import RoundRobin3ReplicasMultiJvmSpec._
import Cluster._ // import Cluster._
"Round Robin: A cluster" must { // "Round Robin: A cluster" must {
"create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { // "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
barrier("start-node1", NrOfNodes).await() // barrier("start-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes).await() // barrier("start-node2", NrOfNodes).await()
barrier("start-node3", NrOfNodes) { // barrier("start-node3", NrOfNodes) {
Cluster.node.start() // Cluster.node.start()
} // }
barrier("get-ref-to-actor-on-node2", NrOfNodes) { // barrier("get-ref-to-actor-on-node2", NrOfNodes) {
val timer = Timer(30.seconds, true) // val timer = Timer(30.seconds, true)
while (timer.isTicking && !node.isInUseOnNode("service-hello")) {} // while (timer.isTicking && !node.isInUseOnNode("service-hello")) {}
} // }
barrier("send-message-from-node2-to-replicas", NrOfNodes).await() // barrier("send-message-from-node2-to-replicas", NrOfNodes).await()
node.shutdown() // node.shutdown()
} // }
} // }
} // }

View file

@ -1,132 +1,132 @@
package akka.cluster.storage // package akka.cluster.storage
import org.scalatest.matchers.MustMatchers // import org.scalatest.matchers.MustMatchers
import akka.actor.Actor // import akka.actor.Actor
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } // import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import org.I0Itec.zkclient.ZkServer // import org.I0Itec.zkclient.ZkServer
//import zookeeper.AkkaZkClient // //import zookeeper.AkkaZkClient
import akka.cluster.storage.StorageTestUtils._ // import akka.cluster.storage.StorageTestUtils._
import java.io.File // import java.io.File
import java.util.concurrent.atomic.AtomicLong // import java.util.concurrent.atomic.AtomicLong
class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { // class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
val dataPath = "_akka_cluster/data" // val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log" // val logPath = "_akka_cluster/log"
var zkServer: ZkServer = _ // var zkServer: ZkServer = _
//var zkClient: AkkaZkClient = _ // //var zkClient: AkkaZkClient = _
val idGenerator = new AtomicLong // val idGenerator = new AtomicLong
def generateKey: String = { // def generateKey: String = {
"foo" + idGenerator.incrementAndGet() // "foo" + idGenerator.incrementAndGet()
} // }
override def beforeAll() { // override def beforeAll() {
/*new File(dataPath).delete() // /*new File(dataPath).delete()
new File(logPath).delete() // new File(logPath).delete()
try { // try {
zkServer = Cluster.startLocalCluster(dataPath, logPath) // zkServer = Cluster.startLocalCluster(dataPath, logPath)
Thread.sleep(5000) // Thread.sleep(5000)
Actor.cluster.start() // Actor.cluster.start()
zkClient = Cluster.newZkClient() // zkClient = Cluster.newZkClient()
} catch { // } catch {
case e e.printStackTrace() // case e e.printStackTrace()
}*/ // }*/
} // }
override def afterAll() { // override def afterAll() {
/*zkClient.close() // /*zkClient.close()
Actor.cluster.shutdown() // Actor.cluster.shutdown()
ClusterDeployer.shutdown() // ClusterDeployer.shutdown()
Cluster.shutdownLocalCluster() // Cluster.shutdownLocalCluster()
Actor.registry.local.shutdownAll() */ // Actor.registry.local.shutdownAll() */
} // }
/* // /*
"unversioned load" must { // "unversioned load" must {
"throw MissingDataException if non existing key" in { // "throw MissingDataException if non existing key" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
try { // try {
storage.load(generateKey) // storage.load(generateKey)
fail() // fail()
} catch { // } catch {
case e: MissingDataException // case e: MissingDataException
} // }
} // }
"return VersionedData if key existing" in { // "return VersionedData if key existing" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
val key = generateKey // val key = generateKey
val value = "somevalue".getBytes // val value = "somevalue".getBytes
storage.insert(key, value) // storage.insert(key, value)
val result = storage.load(key) // val result = storage.load(key)
//todo: strange that the implicit store is not found // //todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage) // assertContent(key, value, result.version)(storage)
} // }
} */ // } */
/*"overwrite" must { // /*"overwrite" must {
"throw MissingDataException when there doesn't exist an entry to overwrite" in { // "throw MissingDataException when there doesn't exist an entry to overwrite" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
val key = generateKey // val key = generateKey
val value = "value".getBytes // val value = "value".getBytes
try { // try {
storage.overwrite(key, value) // storage.overwrite(key, value)
fail() // fail()
} catch { // } catch {
case e: MissingDataException // case e: MissingDataException
} // }
assert(!storage.exists(key)) // assert(!storage.exists(key))
} // }
"overwrite if there is an existing value" in { // "overwrite if there is an existing value" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
val key = generateKey // val key = generateKey
val oldValue = "oldvalue".getBytes // val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue) // storage.insert(key, oldValue)
val newValue = "newValue".getBytes // val newValue = "newValue".getBytes
val result = storage.overwrite(key, newValue) // val result = storage.overwrite(key, newValue)
//assertContent(key, newValue, result.version)(storage) // //assertContent(key, newValue, result.version)(storage)
} // }
} // }
"insert" must { // "insert" must {
"place a new value when non previously existed" in { // "place a new value when non previously existed" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
val key = generateKey // val key = generateKey
val oldValue = "oldvalue".getBytes // val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue) // storage.insert(key, oldValue)
val result = storage.load(key) // val result = storage.load(key)
assertContent(key, oldValue)(storage) // assertContent(key, oldValue)(storage)
assert(InMemoryStorage.InitialVersion == result.version) // assert(InMemoryStorage.InitialVersion == result.version)
} // }
"throw DataExistsException when there already exists an entry with the same key" in { // "throw DataExistsException when there already exists an entry with the same key" in {
val storage = new ZooKeeperStorage(zkClient) // val storage = new ZooKeeperStorage(zkClient)
val key = generateKey // val key = generateKey
val oldValue = "oldvalue".getBytes // val oldValue = "oldvalue".getBytes
val initialVersion = storage.insert(key, oldValue) // val initialVersion = storage.insert(key, oldValue)
val newValue = "newValue".getBytes // val newValue = "newValue".getBytes
try { // try {
storage.insert(key, newValue) // storage.insert(key, newValue)
fail() // fail()
} catch { // } catch {
case e: DataExistsException // case e: DataExistsException
} // }
assertContent(key, oldValue, initialVersion)(storage) // assertContent(key, oldValue, initialVersion)(storage)
} // }
} */ // } */
} // }