Added tests for storing, retrieving and removing custom configuration data in cluster storage.
Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
parent
df8c4dac89
commit
833238cd44
8 changed files with 122 additions and 32 deletions
|
|
@ -465,13 +465,22 @@ trait ClusterNode {
|
|||
*/
|
||||
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]]
|
||||
|
||||
/**
|
||||
* Stores a configuration element under a specific key.
|
||||
* If the key already exists then it will be overwritten.
|
||||
*/
|
||||
def setConfigElement(key: String, bytes: Array[Byte])
|
||||
|
||||
/**
|
||||
* Returns the config element for the key or NULL if no element exists under the key.
|
||||
* Returns <code>Some(element)</code> if it exists else <code>None</code>
|
||||
*/
|
||||
def getConfigElement(key: String): Option[Array[Byte]]
|
||||
|
||||
/**
|
||||
* Removes configuration element for a specific key.
|
||||
* Does nothing if the key does not exist.
|
||||
*/
|
||||
def removeConfigElement(key: String)
|
||||
|
||||
def getConfigElementKeys: Array[String]
|
||||
|
|
|
|||
|
|
@ -1143,6 +1143,10 @@ class DefaultClusterNode private[akka] (
|
|||
// Config
|
||||
// =======================================
|
||||
|
||||
/**
|
||||
* Stores a configuration element under a specific key.
|
||||
* If the key already exists then it will be overwritten.
|
||||
*/
|
||||
def setConfigElement(key: String, bytes: Array[Byte]) {
|
||||
val compressedBytes = if (shouldCompressData) LZF.compress(bytes) else bytes
|
||||
EventHandler.debug(this,
|
||||
|
|
@ -1168,6 +1172,7 @@ class DefaultClusterNode private[akka] (
|
|||
|
||||
/**
|
||||
* Returns the config element for the key or NULL if no element exists under the key.
|
||||
* Returns <code>Some(element)</code> if it exists else <code>None</code>
|
||||
*/
|
||||
def getConfigElement(key: String): Option[Array[Byte]] = try {
|
||||
Some(zkClient.connection.readData(configurationPathFor(key), new Stat, true))
|
||||
|
|
@ -1175,6 +1180,10 @@ class DefaultClusterNode private[akka] (
|
|||
case e: KeeperException.NoNodeException ⇒ None
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes configuration element for a specific key.
|
||||
* Does nothing if the key does not exist.
|
||||
*/
|
||||
def removeConfigElement(key: String) {
|
||||
ignore[ZkNoNodeException] {
|
||||
EventHandler.debug(this,
|
||||
|
|
|
|||
|
|
@ -305,38 +305,6 @@ class ClusterSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with
|
|||
node2.stop
|
||||
}
|
||||
|
||||
"be able to set and get config elements" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "set-get-config-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "set-get-config-2", port = 9002))
|
||||
node1.start
|
||||
node2.start
|
||||
|
||||
node1.setConfigElement("key1", "value1".getBytes)
|
||||
node2.getConfigElement("key1") must be("value1".getBytes)
|
||||
|
||||
node2.setConfigElement("key2", "value2".getBytes)
|
||||
node1.getConfigElement("key2") must be("value2".getBytes)
|
||||
|
||||
node1.stop
|
||||
node2.stop
|
||||
}
|
||||
|
||||
"be able to remove config elements" in {
|
||||
val node1 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "remove-config-1", port = 9001))
|
||||
val node2 = Cluster.newNode(nodeAddress = NodeAddress("test-cluster", "remove-config-2", port = 9002))
|
||||
node1.start
|
||||
node2.start
|
||||
|
||||
node1.setConfigElement("key1", "value1".getBytes)
|
||||
node2.getConfigElement("key1") must be("value1".getBytes)
|
||||
|
||||
node2.removeConfigElement("key1")
|
||||
node1.getConfigElement("key1") must be(null)
|
||||
|
||||
node1.stop
|
||||
node2.stop
|
||||
}
|
||||
|
||||
"be able to replicate an actor" in {
|
||||
// create actor
|
||||
val actorRef = actorOf[MyJavaSerializableActor]("actor-address").start
|
||||
|
|
|
|||
|
|
@ -0,0 +1 @@
|
|||
akka.event-handler-level = "DEBUG"
|
||||
|
|
@ -0,0 +1 @@
|
|||
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
|
||||
|
|
@ -0,0 +1 @@
|
|||
akka.event-handler-level = "DEBUG"
|
||||
|
|
@ -0,0 +1 @@
|
|||
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
|
||||
|
|
@ -0,0 +1,100 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
||||
*/
|
||||
|
||||
package akka.cluster.configuration
|
||||
|
||||
import org.scalatest.WordSpec
|
||||
import org.scalatest.matchers.MustMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
|
||||
import akka.cluster._
|
||||
import Cluster._
|
||||
|
||||
object ConfigurationStorageMultiJvmSpec {
|
||||
var NrOfNodes = 2
|
||||
}
|
||||
|
||||
class ConfigurationStorageMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
|
||||
import ConfigurationStorageMultiJvmSpec._
|
||||
|
||||
"A cluster" must {
|
||||
|
||||
"be able to store, read and remove custom configuration data" in {
|
||||
|
||||
barrier("start-node-1", NrOfNodes) {
|
||||
node.start()
|
||||
}
|
||||
|
||||
barrier("start-node-2", NrOfNodes) {
|
||||
}
|
||||
|
||||
barrier("store-config-data-node-1", NrOfNodes) {
|
||||
node.setConfigElement("key1", "value1".getBytes)
|
||||
}
|
||||
|
||||
barrier("read-config-data-node-2", NrOfNodes) {
|
||||
}
|
||||
|
||||
barrier("remove-config-data-node-2", NrOfNodes) {
|
||||
}
|
||||
|
||||
barrier("try-read-config-data-node-1", NrOfNodes) {
|
||||
val option = node.getConfigElement("key1")
|
||||
option.isDefined must be(false)
|
||||
|
||||
val elements = node.getConfigElementKeys
|
||||
elements.size must be(0)
|
||||
}
|
||||
|
||||
node.shutdown()
|
||||
}
|
||||
}
|
||||
|
||||
override def beforeAll() = {
|
||||
startLocalCluster()
|
||||
}
|
||||
|
||||
override def afterAll() = {
|
||||
shutdownLocalCluster()
|
||||
}
|
||||
}
|
||||
|
||||
class ConfigurationStorageMultiJvmNode2 extends WordSpec with MustMatchers {
|
||||
import ConfigurationStorageMultiJvmSpec._
|
||||
|
||||
"A cluster" must {
|
||||
|
||||
"be able to store, read and remove custom configuration data" in {
|
||||
|
||||
barrier("start-node-1", NrOfNodes) {
|
||||
}
|
||||
|
||||
barrier("start-node-2", NrOfNodes) {
|
||||
node.start()
|
||||
}
|
||||
|
||||
barrier("store-config-data-node-1", NrOfNodes) {
|
||||
}
|
||||
|
||||
barrier("read-config-data-node-2", NrOfNodes) {
|
||||
val option = node.getConfigElement("key1")
|
||||
option.isDefined must be(true)
|
||||
option.get must be("value1".getBytes)
|
||||
|
||||
val elements = node.getConfigElementKeys
|
||||
elements.size must be(1)
|
||||
elements.head must be("key1")
|
||||
}
|
||||
|
||||
barrier("remove-config-data-node-2", NrOfNodes) {
|
||||
node.removeConfigElement("key1")
|
||||
}
|
||||
|
||||
barrier("try-read-config-data-node-1", NrOfNodes) {
|
||||
}
|
||||
|
||||
node.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue