Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-06-27 18:58:01 +02:00
commit c145dcb818
4 changed files with 435 additions and 70 deletions

View file

@ -9,7 +9,7 @@ import org.apache.zookeeper.{ KeeperException, CreateMode }
import org.apache.zookeeper.data.Stat
import java.util.concurrent.ConcurrentHashMap
import annotation.tailrec
import java.lang.{ UnsupportedOperationException, RuntimeException }
import java.lang.{ RuntimeException, UnsupportedOperationException }
/**
* Simple abstraction to store an Array of bytes based on some String key.
@ -28,6 +28,8 @@ trait Storage {
/**
* Loads the VersionedData for the given key.
*
* This call doesn't care about the actual version of the data.
*
* @param key: the key of the VersionedData to load.
* @return the VersionedData for the given entry.
* @throws MissingDataException if the entry with the given key doesn't exist.
@ -36,15 +38,17 @@ trait Storage {
def load(key: String): VersionedData
/**
* Loads the VersionedData for the given key and version.
* Loads the VersionedData for the given key and expectedVersion.
*
* This call can be used for optimistic locking since the version is included.
*
* @param key: the key of the VersionedData to load
* @param version the version of the VersionedData to load
* @param expectedVersion the version the data to load should have.
* @throws MissingDataException if the data with the given key doesn't exist.
* @throws VersioningException if the version of the data is not the same as the given data.
* @throws BadVersionException if the version is not the expected version.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def load(key: String, version: Long): VersionedData
def load(key: String, expectedVersion: Long): VersionedData
/**
* Checks if a VersionedData with the given key exists.
@ -60,11 +64,11 @@ trait Storage {
*
* @param key the key of the Data to insert.
* @param bytes the data to insert.
* @return the VersionedData
* @return the version of the written data (can be used for optimistic locking).
* @throws DataExistsException when VersionedData with the given Key already exists.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def insert(key: String, bytes: Array[Byte]): VersionedData
def insert(key: String, bytes: Array[Byte]): Long
/**
* Inserts the data if there is no data for that key, or overwrites it if it is there.
@ -74,38 +78,42 @@ trait Storage {
*
* @param key the key of the data
* @param bytes the data to insert
* @return the VersionedData that was stored.
* @return the version of the written data (can be used for optimistic locking).
* @throws StorageException if anything goes wrong while accessing the storage
*/
def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData
def insertOrOverwrite(key: String, bytes: Array[Byte]): Long
/**
* Overwrites the current data for the given key.
* Overwrites the current data for the given key. This call doesn't care about the version of the existing data.
*
* @param key the key of the data to overwrite
* @param bytes the data to insert.
* @throws ` when the entry with the given key doesn't exist.
* @return the version of the written data (can be used for optimistic locking).
* @throws MissingDataException when the entry with the given key doesn't exist.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def overwrite(key: String, bytes: Array[Byte]): VersionedData
def overwrite(key: String, bytes: Array[Byte]): Long
/**
* Updates an existing value using an optimistic lock. So it expect the current data to have the expectedVersion
* and only then, it will do the update.
*
* @param key the key of the data to update
* @param bytes the content to write for the given key
* @param expectedVersion the version of the content that is expected to be there.
* @return the version of the written data (can be used for optimistic locking).
* @throws MissingDataException if no data for the given key exists
* @throws BadVersionException if the version if the found data doesn't match the expected version. So essentially
* if another update was already done.
* @throws StorageException if anything goes wrong while accessing the storage
*/
def update(key: String, versionedData: VersionedData): Unit
def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long
}
/**
* The VersionedData is a container of data (some bytes) and a version (a Long).
*/
class VersionedData(val data: Array[Byte], val version: Long) {
/**
* Creates an updated VersionedData. What happens is that a new VersionedData object is created with the newData
* and a version that is one higher than the current version.
*/
def createUpdate(newData: Array[Byte]): VersionedData = new VersionedData(newData, version + 1)
}
class VersionedData(val data: Array[Byte], val version: Long) {}
/**
* An AkkaException thrown by the Storage module.
@ -126,7 +134,7 @@ class DataExistsException(msg: String = null, cause: java.lang.Throwable = null)
/**
* A StorageException thrown when an operation causes an optimistic locking failure.
*/
class VersioningException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause)
/**
* A Storage implementation based on ZooKeeper.
@ -137,11 +145,33 @@ class VersioningException(msg: String = null, cause: java.lang.Throwable = null)
* but it will not participate in any transactions.
*
*/
class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage {
class ZooKeeperStorage(zkClient: AkkaZkClient, root: String = "/peter/storage") extends Storage {
var path = ""
//makes sure that the complete root exists on zookeeper.
root.split("/").foreach(
item if (item.size > 0) {
path = path + "/" + item
if (!zkClient.exists(path)) {
//it could be that another thread is going to create this root node as well, so ignore it when it happens.
try {
zkClient.create(path, "".getBytes, CreateMode.PERSISTENT)
} catch {
case ignore: KeeperException.NodeExistsException
}
}
})
def toZkPath(key: String): String = {
root + "/" + key
}
def load(key: String) = try {
val stat = new Stat
val arrayOfBytes = zkClient.connection.readData(key, stat, false)
val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false)
new VersionedData(arrayOfBytes, stat.getVersion)
} catch {
case e: KeeperException.NoNodeException throw new MissingDataException(
@ -152,9 +182,9 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage {
def load(key: String, expectedVersion: Long) = try {
val stat = new Stat
val arrayOfBytes = zkClient.connection.readData(key, stat, false)
val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false)
if (stat.getVersion != expectedVersion) throw new VersioningException(
if (stat.getVersion != expectedVersion) throw new BadVersionException(
"Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" +
" but found [" + stat.getVersion + "]")
@ -177,12 +207,12 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage {
}
}
def insert(key: String, bytes: Array[Byte]): VersionedData = {
def insert(key: String, bytes: Array[Byte]): Long = {
try {
zkClient.connection.create(key, bytes, CreateMode.PERSISTENT)
//todo: how to get hold of the reference.
zkClient.connection.create(root + "/" + key, bytes, CreateMode.PERSISTENT)
//todo: how to get hold of the version.
val version: Long = 0
new VersionedData(bytes, version)
version
} catch {
case e: KeeperException.NodeExistsException throw new DataExistsException(
String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e)
@ -192,27 +222,28 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage {
}
def exists(key: String) = try {
zkClient.connection.exists(key, false)
zkClient.connection.exists(toZkPath(key), false)
} catch {
case e: KeeperException throw new StorageException(
String.format("Failed to check existance for key [%s]", key), e)
}
def update(key: String, versionedData: VersionedData) {
def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = {
try {
zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int])
zkClient.connection.writeData(root + "/" + key, bytes, expectedVersion.asInstanceOf[Int])
throw new RuntimeException()
} catch {
case e: KeeperException.BadVersionException throw new VersioningException(
case e: KeeperException.BadVersionException throw new BadVersionException(
String.format("Failed to update key [%s]: version mismatch", key), e)
case e: KeeperException throw new StorageException(
String.format("Failed to update key [%s]", key), e)
}
}
def overwrite(key: String, bytes: Array[Byte]): VersionedData = {
def overwrite(key: String, bytes: Array[Byte]): Long = {
try {
zkClient.connection.writeData(key, bytes)
throw new RuntimeException()
zkClient.connection.writeData(root + "/" + key, bytes)
-1L
} catch {
case e: KeeperException.NoNodeException throw new MissingDataException(
String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e)
@ -245,7 +276,7 @@ final class InMemoryStorage extends Storage {
def load(key: String, expectedVersion: Long) = {
val result = load(key)
if (result.version != expectedVersion) throw new VersioningException(
if (result.version != expectedVersion) throw new BadVersionException(
"Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " +
"but found [" + expectedVersion + "]")
@ -254,7 +285,7 @@ final class InMemoryStorage extends Storage {
def exists(key: String) = map.containsKey(key)
def insert(key: String, bytes: Array[Byte]): VersionedData = {
def insert(key: String, bytes: Array[Byte]): Long = {
val version: Long = InMemoryStorage.InitialVersion
val result = new VersionedData(bytes, version)
@ -262,42 +293,46 @@ final class InMemoryStorage extends Storage {
if (previous != null) throw new DataExistsException(
String.format("Failed to insert key [%s]: the key already has been inserted previously", key))
result
version
}
@tailrec
def update(key: String, updatedData: VersionedData) {
val currentData = map.get(key)
def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = {
val found = map.get(key)
if (currentData == null) throw new MissingDataException(
if (found == null) throw new MissingDataException(
String.format("Failed to update key [%s], no previous entry exist", key))
val expectedVersion = currentData.version + 1
if (expectedVersion != updatedData.version) throw new VersioningException(
if (expectedVersion != found.version) throw new BadVersionException(
"Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" +
" but found [" + updatedData.version + "]")
" but found [" + found.version + "]")
if (!map.replace(key, currentData, updatedData)) update(key, updatedData)
val newVersion: Long = expectedVersion + 1
if (map.replace(key, found, new VersionedData(bytes, newVersion))) newVersion
else update(key, bytes, expectedVersion)
}
@tailrec
def overwrite(key: String, bytes: Array[Byte]): VersionedData = {
val currentData = map.get(key)
def overwrite(key: String, bytes: Array[Byte]): Long = {
val current = map.get(key)
if (currentData == null) throw new MissingDataException(
if (current == null) throw new MissingDataException(
String.format("Failed to overwrite key [%s], no previous entry exist", key))
val newData = currentData.createUpdate(bytes)
if (map.replace(key, currentData, newData)) newData else overwrite(key, bytes)
val update = new VersionedData(bytes, current.version + 1)
if (map.replace(key, current, update)) update.version
else overwrite(key, bytes)
}
def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData = {
def insertOrOverwrite(key: String, bytes: Array[Byte]): Long = {
val version = InMemoryStorage.InitialVersion
val result = new VersionedData(bytes, version)
val previous = map.putIfAbsent(key, result)
if (previous == null) result
if (previous == null) result.version
else overwrite(key, bytes)
}
}

View file

@ -0,0 +1,241 @@
package akka.cluster
import org.scalatest.matchers.MustMatchers
import org.scalatest.WordSpec
import akka.cluster.StorageTestUtils._
class InMemoryStorageSpec extends WordSpec with MustMatchers {
"unversioned load" must {
"throw MissingDataException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo")
fail()
} catch {
case e: MissingDataException
}
}
"return VersionedData if key existing" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
storage.insert(key, value)
val result = storage.load(key)
//todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage)
}
}
"exist" must {
"return true if value exists" in {
val store = new InMemoryStorage()
val key = "somekey"
store.insert(key, "somevalue".getBytes)
store.exists(key) must be(true)
}
"return false if value not exists" in {
val store = new InMemoryStorage()
store.exists("somekey") must be(false)
}
}
"versioned load" must {
"throw MissingDataException if non existing key" in {
val store = new InMemoryStorage()
try {
store.load("foo", 1)
fail()
} catch {
case e: MissingDataException
}
}
"return VersionedData if key existing and exact version match" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val storedVersion = storage.insert(key, value)
val loaded = storage.load(key, storedVersion)
assert(loaded.version == storedVersion)
org.junit.Assert.assertArrayEquals(value, loaded.data)
}
"throw BadVersionException is version too new" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val version = storage.insert(key, value)
try {
storage.load(key, version + 1)
fail()
} catch {
case e: BadVersionException
}
}
"throw BadVersionException is version too old" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val version = storage.insert(key, value)
try {
storage.load(key, version - 1)
fail()
} catch {
case e: BadVersionException
}
}
}
"insert" must {
"place a new value when non previously existed" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue)
val result = storage.load(key)
assertContent(key, oldValue)(storage)
assert(InMemoryStorage.InitialVersion == result.version)
}
"throw MissingDataException when there already exists an entry with the same key" in {
val storage = new InMemoryStorage()
val key = "somekey"
val initialValue = "oldvalue".getBytes
val initialVersion = storage.insert(key, initialValue)
val newValue = "newValue".getBytes
try {
storage.insert(key, newValue)
fail()
} catch {
case e: DataExistsException
}
assertContent(key, initialValue, initialVersion)(storage)
}
}
"update" must {
"throw MissingDataException when no node exists" in {
val storage = new InMemoryStorage()
val key = "somekey"
try {
storage.update(key, "somevalue".getBytes, 1)
fail()
} catch {
case e: MissingDataException
}
}
"replace if previous value exists and no other updates have been done" in {
val storage = new InMemoryStorage()
//do the initial insert
val key = "foo"
val oldValue = "insert".getBytes
val initialVersion = storage.insert(key, oldValue)
//do the update the will be the cause of the conflict.
val newValue: Array[Byte] = "update".getBytes
val newVersion = storage.update(key, newValue, initialVersion)
assertContent(key, newValue, newVersion)(storage)
}
"throw BadVersionException when already overwritten" in {
val storage = new InMemoryStorage()
//do the initial insert
val key = "foo"
val oldValue = "insert".getBytes
val initialVersion = storage.insert(key, oldValue)
//do the update the will be the cause of the conflict.
val newValue = "otherupdate".getBytes
val newVersion = storage.update(key, newValue, initialVersion)
try {
storage.update(key, "update".getBytes, initialVersion)
fail()
} catch {
case e: BadVersionException
}
assertContent(key, newValue, newVersion)(storage)
}
}
"overwrite" must {
"throw MissingDataException when no node exists" in {
val storage = new InMemoryStorage()
val key = "somekey"
try {
storage.overwrite(key, "somevalue".getBytes)
fail()
} catch {
case e: MissingDataException
}
storage.exists(key) must be(false)
}
"succeed if previous value exist" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val newValue = "somevalue".getBytes
val initialVersion = storage.insert(key, oldValue)
val overwriteVersion = storage.overwrite(key, newValue)
assert(overwriteVersion == initialVersion + 1)
assertContent(key, newValue, overwriteVersion)(storage)
}
}
"insertOrOverwrite" must {
"insert if nothing was inserted before" in {
val storage = new InMemoryStorage()
val key = "somekey"
val value = "somevalue".getBytes
val version = storage.insertOrOverwrite(key, value)
assert(version == InMemoryStorage.InitialVersion)
assertContent(key, value, version)(storage)
}
"overwrite of something existed before" in {
val storage = new InMemoryStorage()
val key = "somekey"
val oldValue = "oldvalue".getBytes
val newValue = "somevalue".getBytes
val initialVersion = storage.insert(key, oldValue)
val overwriteVersion = storage.insertOrOverwrite(key, newValue)
assert(overwriteVersion == initialVersion + 1)
assertContent(key, newValue, overwriteVersion)(storage)
}
}
}

View file

@ -0,0 +1,15 @@
package akka.cluster
object StorageTestUtils {
def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: Storage) {
val found = storage.load(key)
assert(found.version == expectedVersion, "versions should match, found[" + found.version + "], expected[" + expectedVersion + "]")
org.junit.Assert.assertArrayEquals(expectedData, found.data)
}
def assertContent(key: String, expectedData: Array[Byte])(implicit storage: Storage) {
val found = storage.load(key)
org.junit.Assert.assertArrayEquals(expectedData, found.data)
}
}

View file

@ -5,14 +5,25 @@ import akka.actor.Actor
import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec }
import org.I0Itec.zkclient.ZkServer
import zookeeper.AkkaZkClient
import akka.cluster.StorageTestUtils._
import java.io.File
import java.util.concurrent.atomic.AtomicLong
class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
val dataPath = "_akka_cluster/data"
val logPath = "_akka_cluster/log"
var zkServer: ZkServer = _
var zkClient: AkkaZkClient = _
/*
val idGenerator = new AtomicLong
def generateKey: String = {
"foo" + idGenerator.incrementAndGet()
}
override def beforeAll() {
/*new File(dataPath).delete()
new File(logPath).delete()
try {
zkServer = Cluster.startLocalCluster(dataPath, logPath)
Thread.sleep(5000)
@ -20,39 +31,102 @@ class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfte
zkClient = Cluster.newZkClient()
} catch {
case e e.printStackTrace()
}
}*/
}
override def afterAll() {
zkClient.close()
/*zkClient.close()
Actor.cluster.shutdown()
ClusterDeployer.shutdown()
Cluster.shutdownLocalCluster()
Actor.registry.local.shutdownAll()
Actor.registry.local.shutdownAll() */
}
*/
/*
"unversioned load" must {
"throw MissingDataException if non existing key" in {
// val store = new ZooKeeperStorage(zkClient)
val storage = new ZooKeeperStorage(zkClient)
//try {
// store.load("foo")
// fail()
//} catch {
// case e: MissingDataException
//}
try {
storage.load(generateKey)
fail()
} catch {
case e: MissingDataException
}
}
/*
"return VersionedData if key existing" in {
val storage = new InMemoryStorage()
val key = "somekey"
val storage = new ZooKeeperStorage(zkClient)
val key = generateKey
val value = "somevalue".getBytes
storage.insert(key, value)
val result = storage.load(key)
//todo: strange that the implicit store is not found
assertContent(key, value, result.version)(storage)
} */
}
} */
/*"overwrite" must {
"throw MissingDataException when there doesn't exist an entry to overwrite" in {
val storage = new ZooKeeperStorage(zkClient)
val key = generateKey
val value = "value".getBytes
try {
storage.overwrite(key, value)
fail()
} catch {
case e: MissingDataException
}
assert(!storage.exists(key))
}
"overwrite if there is an existing value" in {
val storage = new ZooKeeperStorage(zkClient)
val key = generateKey
val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue)
val newValue = "newValue".getBytes
val result = storage.overwrite(key, newValue)
//assertContent(key, newValue, result.version)(storage)
}
}
"insert" must {
"place a new value when non previously existed" in {
val storage = new ZooKeeperStorage(zkClient)
val key = generateKey
val oldValue = "oldvalue".getBytes
storage.insert(key, oldValue)
val result = storage.load(key)
assertContent(key, oldValue)(storage)
assert(InMemoryStorage.InitialVersion == result.version)
}
"throw DataExistsException when there already exists an entry with the same key" in {
val storage = new ZooKeeperStorage(zkClient)
val key = generateKey
val oldValue = "oldvalue".getBytes
val initialVersion = storage.insert(key, oldValue)
val newValue = "newValue".getBytes
try {
storage.insert(key, newValue)
fail()
} catch {
case e: DataExistsException
}
assertContent(key, oldValue, initialVersion)(storage)
}
} */
}