Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-06-13 16:47:27 +02:00
commit 3d54c09994
5 changed files with 444 additions and 227 deletions

182
akka-cluster/src/main/scala/akka/cluster/Storage.scala Normal file → Executable file
View file

@ -5,9 +5,8 @@ import akka.AkkaException
import org.apache.zookeeper.{ KeeperException, CreateMode }
import org.apache.zookeeper.data.Stat
import java.util.concurrent.ConcurrentHashMap
import org.apache.zookeeper.KeeperException.NoNodeException
import java.lang.UnsupportedOperationException
import annotation.tailrec
import java.lang.{ UnsupportedOperationException, RuntimeException }
/**
* Simple abstraction to store an Array of bytes based on some String key.
@ -15,36 +14,37 @@ import annotation.tailrec
* Nothing is being said about ACID, transactions etc. It depends on the implementation
* of this Storage interface of what is and isn't done on the lowest level.
*
* TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking.
* (This is supported by ZooKeeper).
* The amount of data that is allowed to be insert/updated is implementation specific. The InMemoryStorage
* has no limits, but the ZooKeeperStorage has a maximum size of 1 mb.
*
* TODO: Class is up for better names.
* TODO: Instead of a String as key, perhaps also a byte-array.
*/
trait Storage {
/**
* Loads the given entry.
* Loads the VersionedData for the given key.
*
* @param key: the key of the data to load.
* @return the VersionedData for the given key.
* @throws NoNodeExistsException if the data with the given key doesn't exist.
* @param key: the key of the VersionedData to load.
* @return the VersionedData for the given entry.
* @throws MissingDataException if the entry with the given key doesn't exist.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def load(key: String): VersionedData
/**
* Loads the data for the given key and version.
* Loads the VersionedData for the given key and version.
*
* @param key: the key of the data to load
* @param version the version of the data to load
* @throws NoNodeExistsException if the data with the given key doesn't exist.
* @throws VersioningMismatchStorageException if the version of the data is not the same as the given data.
* @param key: the key of the VersionedData to load
* @param version the version of the VersionedData to load
* @throws MissingDataException if the data with the given key doesn't exist.
* @throws VersioningException if the version of the data is not the same as the given data.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def load(key: String, version: Long): VersionedData
/**
* Checks if a value with the given key exists.
* Checks if a VersionedData with the given key exists.
*
* @param key the key to check the existence for.
* @return true if exists, false if not.
@ -57,19 +57,34 @@ trait Storage {
*
* @param key the key of the Data to insert.
* @param bytes the data to insert.
* @return the version of the inserted data
* @throws NodeExistsException when a Node with the given Key already exists.
* @return the VersionedData
* @throws DataExistsException when VersionedData with the given Key already exists.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def insert(key: String, bytes: Array[Byte]): VersionedData
/**
* Stores a array of bytes based on some key.
* Inserts the data if there is no data for that key, or overwrites it if it is there.
*
* @throws MissingNodeException when the Node with the given key doesn't exist.
* This is the method you want to call if you just want to save something and don't
* care about any lost update issues.
*
* @param key the key of the data
* @param bytes the data to insert
* @return the VersionedData that was stored.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def update(key: String, bytes: Array[Byte]): VersionedData
def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData
/**
* Overwrites the current data for the given key.
*
* @param key the key of the data to overwrite
* @param bytes the data to insert.
* @throws ` when the entry with the given key doesn't exist.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def overwrite(key: String, bytes: Array[Byte]): VersionedData
/**
* @throws StorageException if anything goes wrong while accessing the storage
@ -98,17 +113,17 @@ class StorageException(msg: String = null, cause: java.lang.Throwable = null) ex
* *
* A StorageException thrown when an operation is done on a non existing node.
*/
class MissingNodeException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
/**
* A StorageException thrown when an operation is done on an existing node, but no node was expected.
*/
class NodeExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
/**
* A StorageException thrown when an operation causes an optimistic locking failure.
*/
class VersioningMismatchStorageException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class VersioningException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
/**
* A Storage implementation based on ZooKeeper.
@ -117,59 +132,89 @@ class VersioningMismatchStorageException(msg: String = null, cause: java.lang.Th
* - so everything is written or nothing is written
* - is isolated, so threadsafe,
* but it will not participate in any transactions.
* //todo: unclear, is only a single connection used in the JVM??
*
*/
class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage {
def load(key: String) = try {
val arrayOfBytes: Array[Byte] = zkClient.connection.readData(key, new Stat, false)
//Some(arrayOfBytes)
throw new UnsupportedOperationException()
val stat = new Stat
val arrayOfBytes = zkClient.connection.readData(key, stat, false)
new VersionedData(arrayOfBytes, stat.getVersion)
} catch {
//todo: improved error messaged
case e: KeeperException.NoNodeException throw new MissingNodeException("Failed to load key", e)
case e: KeeperException throw new StorageException("failed to load key " + key, e)
case e: KeeperException.NoNodeException throw new MissingDataException(
String.format("Failed to load key [%s]: no data was found", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to load key [%s]", key), e)
}
def load(key: String, version: Long) = {
throw new UnsupportedOperationException()
def load(key: String, expectedVersion: Long) = try {
val stat = new Stat
val arrayOfBytes = zkClient.connection.readData(key, stat, false)
if (stat.getVersion != expectedVersion) throw new VersioningException(
"Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" +
" but found [" + stat.getVersion + "]")
new VersionedData(arrayOfBytes, stat.getVersion)
} catch {
case e: KeeperException.NoNodeException throw new MissingDataException(
String.format("Failed to load key [%s]: no data was found", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to load key [%s]", key), e)
}
def insertOrOverwrite(key: String, bytes: Array[Byte]) = {
try {
throw new UnsupportedOperationException()
} catch {
case e: KeeperException.NodeExistsException throw new DataExistsException(
String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to insert key [%s]", key), e)
}
}
def insert(key: String, bytes: Array[Byte]): VersionedData = {
try {
zkClient.connection.create(key, bytes, CreateMode.PERSISTENT);
throw new UnsupportedOperationException()
zkClient.connection.create(key, bytes, CreateMode.PERSISTENT)
//todo: how to get hold of the reference.
val version: Long = 0
new VersionedData(bytes, version)
} catch {
//todo: improved error messaged
case e: KeeperException.NodeExistsException throw new NodeExistsException("failed to insert key " + key, e)
case e: KeeperException throw new StorageException("failed to insert key " + key, e)
case e: KeeperException.NodeExistsException throw new DataExistsException(
String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to insert key [%s]", key), e)
}
}
def exists(key: String) = try {
zkClient.connection.exists(key, false)
} catch {
//todo: improved error messaged
case e: KeeperException throw new StorageException("failed to check for existance on key " + key, e)
case e: KeeperException throw new StorageException(
String.format("Failed to check existance for key [%s]", key), e)
}
def update(key: String, versionedData: VersionedData) = try {
zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int])
} catch {
//todo: improved error messaged
case e: KeeperException.BadVersionException throw new VersioningMismatchStorageException()
case e: KeeperException throw new StorageException("failed to check for existance on key " + key, e)
def update(key: String, versionedData: VersionedData) {
try {
zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int])
} catch {
case e: KeeperException.BadVersionException throw new VersioningException(
String.format("Failed to update key [%s]: version mismatch", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to update key [%s]", key), e)
}
}
def update(key: String, bytes: Array[Byte]): VersionedData = {
def overwrite(key: String, bytes: Array[Byte]): VersionedData = {
try {
zkClient.connection.writeData(key, bytes)
throw new RuntimeException()
} catch {
//todo: improved error messaged
case e: KeeperException.NoNodeException throw new MissingNodeException("failed to update key ", e)
case e: KeeperException throw new StorageException("failed to update key ", e)
case e: KeeperException.NoNodeException throw new MissingDataException(
String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to overwrite key [%s]", key), e)
}
}
}
@ -188,8 +233,8 @@ final class InMemoryStorage extends Storage {
def load(key: String) = {
val result = map.get(key)
if (result == null) throw new MissingNodeException(
String.format("Failed to load data for key [%s]: no data was found", key))
if (result == null) throw new MissingDataException(
String.format("Failed to load key [%s]: no data was found", key))
result
}
@ -197,8 +242,8 @@ final class InMemoryStorage extends Storage {
def load(key: String, expectedVersion: Long) = {
val result = load(key)
if (result.version != expectedVersion) throw new VersioningMismatchStorageException(
"Failed to load data for key [" + key + "]: version mismatch, expected [" + result.version + "] " +
if (result.version != expectedVersion) throw new VersioningException(
"Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " +
"but found [" + expectedVersion + "]")
result
@ -211,7 +256,7 @@ final class InMemoryStorage extends Storage {
val result = new VersionedData(bytes, version)
val previous = map.putIfAbsent(key, result)
if (previous != null) throw new NodeExistsException(
if (previous != null) throw new DataExistsException(
String.format("Failed to insert key [%s]: the key already has been inserted previously", key))
result
@ -221,23 +266,36 @@ final class InMemoryStorage extends Storage {
def update(key: String, updatedData: VersionedData) {
val currentData = map.get(key)
if (currentData == null) throw new MissingNodeException(
String.format("Failed to update data for key [%s], no previous entry exist", key))
if (currentData == null) throw new MissingDataException(
String.format("Failed to update key [%s], no previous entry exist", key))
val expectedVersion = currentData.version + 1
if (expectedVersion != updatedData.version) throw new VersioningMismatchStorageException(
"Failed to update data for key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" +
if (expectedVersion != updatedData.version) throw new VersioningException(
"Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" +
" but found [" + updatedData.version + "]")
if (!map.replace(key, currentData, updatedData)) update(key, updatedData)
}
def update(key: String, bytes: Array[Byte]): VersionedData = {
if (map.get(key) == null) throw new NoNodeException(
String.format("Failed to update key [%s]: no previous insert of this key exists", key))
@tailrec
def overwrite(key: String, bytes: Array[Byte]): VersionedData = {
val currentData = map.get(key)
//smap.put(key, bytes)
throw new UnsupportedOperationException()
if (currentData == null) throw new MissingDataException(
String.format("Failed to overwrite key [%s], no previous entry exist", key))
val newData = currentData.createUpdate(bytes)
if (map.replace(key, currentData, newData)) newData else overwrite(key, bytes)
}
def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData = {
val version = InMemoryStorage.InitialVersion
val result = new VersionedData(bytes, version)
val previous = map.putIfAbsent(key, result)
if (previous == null) result
else overwrite(key, bytes)
}
}

View file

@ -0,0 +1,251 @@
package akka.cluster
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.cluster.StorageTestUtils._
class InMemoryStorageSpec extends WordSpec with MustMatchers {
"unversioned load" must {
"throw MissingDataException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo")
fail()
} catch {
case e: MissingDataException
}
}
"return VersionedData if key existing" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
storage.insert(key, value)
val result = storage.load(key)
//todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage)
}
}
"exist" must {
"return true if value exists" in {
val store = new InMemoryStorage()
val key = "somekey"
store.insert(key, "somevalue".getBytes)
store.exists(key) must be(true)
}
"return false if value not exists" in {
val store = new InMemoryStorage()
store.exists("somekey") must be(false)
}
}
"versioned load" must {
"throw MissingDataException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo", 1)
fail()
} catch {
case e: MissingDataException
}
}
"return VersionedData if key existing and exact version match" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
val result = storage.load(key, stored.version)
assert(result.version == stored.version)
assert(result.data == stored.data)
}
"throw VersioningException is version too new" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
try {
storage.load(key, stored.version + 1)
fail()
} catch {
case e: VersioningException
}
}
"throw VersioningException is version too old" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
try {
storage.load(key, stored.version - 1)
fail()
} catch {
case e: VersioningException
}
}
}
"insert" must {
"place a new value when non previously existed" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue)
val result = storage.load(key)
assertContent(key, oldValue)(storage)
assert(InMemoryStorage.InitialVersion == result.version)
}
"throw MissingDataException when there already exists an entry with the same key" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val oldVersionedData = storage.insert(key, oldValue)
val newValue = "newValue".getBytes
try {
storage.insert(key, newValue)
fail()
} catch {
case e: DataExistsException
}
//make sure that the old value was not changed
assert(oldVersionedData == storage.load(key))
}
}
"update" must {
"throw MissingDataException when no node exists" in {
val storage = new InMemoryStorage()
val key = "somekey"
try {
storage.update(key, new VersionedData("somevalue".getBytes, 1))
fail()
} catch {
case e: MissingDataException
}
}
"replace if previous value exists and no other updates have been done" in {
val storage = new InMemoryStorage()
//do the initial insert
val key = "foo"
val oldValue = "insert".getBytes
val insert = storage.insert(key, oldValue)
//do the update the will be the cause of the conflict.
val updateValue = "update".getBytes
val update = insert.createUpdate(updateValue)
storage.update(key, update)
assertContent(key, update.data, update.version)(storage)
}
"throw VersioningException when already overwritten" in {
val storage = new InMemoryStorage()
//do the initial insert
val key = "foo"
val oldValue = "insert".getBytes
val insert = storage.insert(key, oldValue)
//do the update the will be the cause of the conflict.
val otherUpdateValue = "otherupdate".getBytes
val otherUpdate = insert.createUpdate(otherUpdateValue)
storage.update(key, otherUpdate)
val update = insert.createUpdate("update".getBytes)
try {
storage.update(key, update)
fail()
} catch {
case e: VersioningException
}
assertContent(key, otherUpdate.data, otherUpdate.version)(storage)
}
}
"overwrite" must {
"throw MissingDataException when no node exists" in {
val storage = new InMemoryStorage()
val key = "somekey"
try {
storage.overwrite(key, "somevalue".getBytes)
fail()
} catch {
case e: MissingDataException
}
storage.exists(key) must be(false)
}
"succeed if previous value exist" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val newValue: Array[Byte] = "somevalue".getBytes
val initialInsert: VersionedData = storage.insert(key, oldValue)
val result: VersionedData = storage.overwrite(key, newValue)
assert(result.version == initialInsert.version + 1)
assert(result.data == newValue)
storage.load(key) must be eq (result)
}
}
"insertOrOverwrite" must {
"insert if nothing was inserted before" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val result = storage.insertOrOverwrite(key, value)
assert(result.version == InMemoryStorage.InitialVersion)
assert(result.data == value)
storage.load(key) must be eq (result)
}
"overwrite of something existed before" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val newValue = "somevalue".getBytes
val initialInsert = storage.insert(key, oldValue)
val result = storage.insertOrOverwrite(key, newValue)
assert(result.version == initialInsert.version + 1)
assert(result.data == newValue)
storage.load(key) must be eq (result)
}
}
}

View file

@ -1,165 +0,0 @@
package akka.cluster
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
class InMemoryStorageSpec extends WordSpec with MustMatchers {
"unversioned load" must {
"throw MissingNodeException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo")
fail()
} catch {
case e: MissingNodeException
}
}
"return VersionedData if key existing" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
storage.insert(key, value)
val result = storage.load(key)
//todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage)
}
}
"exist" must {
"return true if value exists" in {
val store = new InMemoryStorage()
val key = "somekey"
store.insert(key, "somevalue".getBytes)
store.exists(key) must be(true)
}
"return false if value not exists" in {
val store = new InMemoryStorage()
store.exists("somekey") must be(false)
}
}
"versioned load" must {
"throw MissingNodeException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo", 1)
fail()
} catch {
case e: MissingNodeException
}
}
"return VersionedData if key existing and exact version match" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
val result = storage.load(key, stored.version)
assert(result.version == stored.version)
assert(result.data == stored.data)
}
"throw VersioningMismatchStorageException is version too new" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
try {
storage.load(key, stored.version + 1)
fail()
} catch {
case e: VersioningMismatchStorageException
}
}
"throw VersioningMismatchStorageException is version too old" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val stored = storage.insert(key, value)
try {
storage.load(key, stored.version - 1)
fail()
} catch {
case e: VersioningMismatchStorageException
}
}
}
"insert" must {
"place a new value when non previously existed" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue)
val result = storage.load(key)
assertContent(key, oldValue)(storage)
assert(InMemoryStorage.InitialVersion == result.version)
}
"throw MissingNodeException when there already exists an entry with the same key" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val oldVersionedData = storage.insert(key, oldValue)
val newValue = "newValue".getBytes
try {
storage.insert(key, newValue)
fail()
} catch {
case e: NodeExistsException
}
//make sure that the old value was not changed
assert(oldVersionedData == storage.load(key))
}
}
"update with versioning" must {
"throw NoNodeException when no node exists" in {
val storage = new InMemoryStorage()
val key = "somekey"
try {
storage.update(key, new VersionedData("somevalue".getBytes, 1))
fail()
} catch {
case e: MissingNodeException
}
}
"throw OptimisticLockException when ..." in {
}
"replace" in {
}
}
def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: InMemoryStorage) {
val found = storage.load(key)
assert(found.version == expectedVersion)
assert(expectedData == found.data) //todo: structural equals
}
def assertContent(key: String, expectedData: Array[Byte])(implicit storage: InMemoryStorage) {
val found = storage.load(key)
assert(expectedData == found.data) //todo: structural equals
}
}

View file

@ -0,0 +1,15 @@
package akka.cluster
object StorageTestUtils {
def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: InMemoryStorage) {
val found = storage.load(key)
assert(found.version == expectedVersion)
assert(expectedData == found.data) //todo: structural equals
}
def assertContent(key: String, expectedData: Array[Byte])(implicit storage: InMemoryStorage) {
val found = storage.load(key)
assert(expectedData == found.data) //todo: structural equals
}
}

View file

@ -0,0 +1,58 @@
package akka.cluster
import org.scalatest.matchers.MustMatchers
import akka.actor.Actor
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import org.I0Itec.zkclient.ZkServer
import zookeeper.AkkaZkClient
class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
var zkServer: ZkServer = _
var zkClient: AkkaZkClient = _
override def beforeAll() {
try {
zkServer = Cluster.startLocalCluster(dataPath, logPath)
Thread.sleep(5000)
Actor.cluster.start()
zkClient = Cluster.newZkClient()
} catch {
case e e.printStackTrace()
}
}
override def afterAll() {
zkClient.close()
Actor.cluster.shutdown()
ClusterDeployer.shutdown()
Cluster.shutdownLocalCluster()
Actor.registry.local.shutdownAll()
}
"unversioned load" must {
"throw MissingDataException if non existing key" in {
val store = new ZooKeeperStorage(zkClient)
//try {
// store.load("foo")
// fail()
//} catch {
// case e: MissingDataException
//}
}
/*
"return VersionedData if key existing" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
storage.insert(key, value)
val result = storage.load(key)
//todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage)
} */
}
}