diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index d9450c4ba8..5b6dd82ab6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -237,10 +237,10 @@ object Cluster { def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking) - def barrier(name: String, count: Int) = + def barrier(name: String, count: Int): ZooKeeperBarrier = ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count) - def barrier(name: String, count: Int, timeout: Duration) = + def barrier(name: String, count: Int, timeout: Duration): ZooKeeperBarrier = ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count, timeout) def uuidToString(uuid: UUID): String = uuid.toString @@ -258,9 +258,9 @@ object Cluster { } } - def uuidProtocolToUuid(uuid: UuidProtocol) = new UUID(uuid.getHigh, uuid.getLow) + def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow) - def uuidToUuidProtocol(uuid: UUID) = + def uuidToUuidProtocol(uuid: UUID): UuidProtocol = UuidProtocol.newBuilder .setHigh(uuid.getTime) .setLow(uuid.getClockSeqAndNode) diff --git a/akka-cluster/src/main/scala/akka/cluster/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/Storage.scala deleted file mode 100755 index 9c2ed81fbf..0000000000 --- a/akka-cluster/src/main/scala/akka/cluster/Storage.scala +++ /dev/null @@ -1,358 +0,0 @@ -/** - * Copyright (C) 2009-2011 Scalable Solutions AB - */ -package akka.cluster - -import zookeeper.AkkaZkClient -import akka.AkkaException -import org.apache.zookeeper.{ KeeperException, CreateMode } -import org.apache.zookeeper.data.Stat -import java.util.concurrent.ConcurrentHashMap -import annotation.tailrec -import java.lang.{ RuntimeException, UnsupportedOperationException } - -/** - * Simple abstraction to store an Array of bytes based on some String key. - * - * Nothing is being said about ACID, transactions etc. It depends on the implementation - * of this Storage interface of what is and isn't done on the lowest level. - * - * The amount of data that is allowed to be insert/updated is implementation specific. The InMemoryStorage - * has no limits, but the ZooKeeperStorage has a maximum size of 1 mb. - * - * TODO: Class is up for better names. - * TODO: Instead of a String as key, perhaps also a byte-array. - */ -trait Storage { - - /** - * Loads the VersionedData for the given key. - * - * This call doesn't care about the actual version of the data. - * - * @param key: the key of the VersionedData to load. - * @return the VersionedData for the given entry. - * @throws MissingDataException if the entry with the given key doesn't exist. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def load(key: String): VersionedData - - /** - * Loads the VersionedData for the given key and expectedVersion. - * - * This call can be used for optimistic locking since the version is included. - * - * @param key: the key of the VersionedData to load - * @param expectedVersion the version the data to load should have. - * @throws MissingDataException if the data with the given key doesn't exist. - * @throws BadVersionException if the version is not the expected version. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def load(key: String, expectedVersion: Long): VersionedData - - /** - * Checks if a VersionedData with the given key exists. - * - * @param key the key to check the existence for. - * @return true if exists, false if not. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def exists(key: String): Boolean - - /** - * Inserts a byte-array based on some key. - * - * @param key the key of the Data to insert. - * @param bytes the data to insert. - * @return the version of the written data (can be used for optimistic locking). - * @throws DataExistsException when VersionedData with the given Key already exists. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def insert(key: String, bytes: Array[Byte]): Long - - /** - * Inserts the data if there is no data for that key, or overwrites it if it is there. - * - * This is the method you want to call if you just want to save something and don't - * care about any lost update issues. - * - * @param key the key of the data - * @param bytes the data to insert - * @return the version of the written data (can be used for optimistic locking). - * @throws StorageException if anything goes wrong while accessing the storage - */ - def insertOrOverwrite(key: String, bytes: Array[Byte]): Long - - /** - * Overwrites the current data for the given key. This call doesn't care about the version of the existing data. - * - * @param key the key of the data to overwrite - * @param bytes the data to insert. - * @return the version of the written data (can be used for optimistic locking). - * @throws MissingDataException when the entry with the given key doesn't exist. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def overwrite(key: String, bytes: Array[Byte]): Long - - /** - * Updates an existing value using an optimistic lock. So it expect the current data to have the expectedVersion - * and only then, it will do the update. - * - * @param key the key of the data to update - * @param bytes the content to write for the given key - * @param expectedVersion the version of the content that is expected to be there. - * @return the version of the written data (can be used for optimistic locking). - * @throws MissingDataException if no data for the given key exists - * @throws BadVersionException if the version if the found data doesn't match the expected version. So essentially - * if another update was already done. - * @throws StorageException if anything goes wrong while accessing the storage - */ - def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long -} - -/** - * The VersionedData is a container of data (some bytes) and a version (a Long). - */ -class VersionedData(val data: Array[Byte], val version: Long) {} - -/** - * An AkkaException thrown by the Storage module. - */ -class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause) - -/** - * * - * A StorageException thrown when an operation is done on a non existing node. - */ -class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) - -/** - * A StorageException thrown when an operation is done on an existing node, but no node was expected. - */ -class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) - -/** - * A StorageException thrown when an operation causes an optimistic locking failure. - */ -class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) - -/** - * A Storage implementation based on ZooKeeper. - * - * The store method is atomic: - * - so everything is written or nothing is written - * - is isolated, so threadsafe, - * but it will not participate in any transactions. - * - */ -class ZooKeeperStorage(zkClient: AkkaZkClient, root: String = "/peter/storage") extends Storage { - - var path = "" - - //makes sure that the complete root exists on zookeeper. - root.split("/").foreach( - item ⇒ if (item.size > 0) { - - path = path + "/" + item - - if (!zkClient.exists(path)) { - //it could be that another thread is going to create this root node as well, so ignore it when it happens. - try { - zkClient.create(path, "".getBytes, CreateMode.PERSISTENT) - } catch { - case ignore: KeeperException.NodeExistsException ⇒ - } - } - }) - - def toZkPath(key: String): String = { - root + "/" + key - } - - def load(key: String) = try { - val stat = new Stat - val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) - new VersionedData(arrayOfBytes, stat.getVersion) - } catch { - case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( - String.format("Failed to load key [%s]: no data was found", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to load key [%s]", key), e) - } - - def load(key: String, expectedVersion: Long) = try { - val stat = new Stat - val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) - - if (stat.getVersion != expectedVersion) throw new BadVersionException( - "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + - " but found [" + stat.getVersion + "]") - - new VersionedData(arrayOfBytes, stat.getVersion) - } catch { - case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( - String.format("Failed to load key [%s]: no data was found", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to load key [%s]", key), e) - } - - def insertOrOverwrite(key: String, bytes: Array[Byte]) = { - try { - throw new UnsupportedOperationException() - } catch { - case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( - String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to insert key [%s]", key), e) - } - } - - def insert(key: String, bytes: Array[Byte]): Long = { - try { - zkClient.connection.create(root + "/" + key, bytes, CreateMode.PERSISTENT) - //todo: how to get hold of the version. - val version: Long = 0 - version - } catch { - case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( - String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to insert key [%s]", key), e) - } - } - - def exists(key: String) = try { - zkClient.connection.exists(toZkPath(key), false) - } catch { - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to check existance for key [%s]", key), e) - } - - def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { - try { - zkClient.connection.writeData(root + "/" + key, bytes, expectedVersion.asInstanceOf[Int]) - throw new RuntimeException() - } catch { - case e: KeeperException.BadVersionException ⇒ throw new BadVersionException( - String.format("Failed to update key [%s]: version mismatch", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to update key [%s]", key), e) - } - } - - def overwrite(key: String, bytes: Array[Byte]): Long = { - try { - zkClient.connection.writeData(root + "/" + key, bytes) - -1L - } catch { - case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( - String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e) - case e: KeeperException ⇒ throw new StorageException( - String.format("Failed to overwrite key [%s]", key), e) - } - } -} - -object InMemoryStorage { - val InitialVersion = 0; -} - -/** - * An in memory {@link RawStore} implementation. Useful for testing purposes. - */ -final class InMemoryStorage extends Storage { - - private val map = new ConcurrentHashMap[String, VersionedData]() - - def load(key: String) = { - val result = map.get(key) - - if (result == null) throw new MissingDataException( - String.format("Failed to load key [%s]: no data was found", key)) - - result - } - - def load(key: String, expectedVersion: Long) = { - val result = load(key) - - if (result.version != expectedVersion) throw new BadVersionException( - "Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " + - "but found [" + expectedVersion + "]") - - result - } - - def exists(key: String) = map.containsKey(key) - - def insert(key: String, bytes: Array[Byte]): Long = { - val version: Long = InMemoryStorage.InitialVersion - val result = new VersionedData(bytes, version) - - val previous = map.putIfAbsent(key, result) - if (previous != null) throw new DataExistsException( - String.format("Failed to insert key [%s]: the key already has been inserted previously", key)) - - version - } - - @tailrec - def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { - val found = map.get(key) - - if (found == null) throw new MissingDataException( - String.format("Failed to update key [%s], no previous entry exist", key)) - - if (expectedVersion != found.version) throw new BadVersionException( - "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + - " but found [" + found.version + "]") - - val newVersion: Long = expectedVersion + 1 - - if (map.replace(key, found, new VersionedData(bytes, newVersion))) newVersion - else update(key, bytes, expectedVersion) - } - - @tailrec - def overwrite(key: String, bytes: Array[Byte]): Long = { - val current = map.get(key) - - if (current == null) throw new MissingDataException( - String.format("Failed to overwrite key [%s], no previous entry exist", key)) - - val update = new VersionedData(bytes, current.version + 1) - - if (map.replace(key, current, update)) update.version - else overwrite(key, bytes) - } - - def insertOrOverwrite(key: String, bytes: Array[Byte]): Long = { - val version = InMemoryStorage.InitialVersion - val result = new VersionedData(bytes, version) - - val previous = map.putIfAbsent(key, result) - - if (previous == null) result.version - else overwrite(key, bytes) - } -} - -//TODO: To minimize the number of dependencies, should the Storage not be placed in a seperate module? -//class VoldemortRawStorage(storeClient: StoreClient) extends Storage { -// -// def load(Key: String) = { -// try { -// -// } catch { -// case -// } -// } -// -// override def insert(key: String, bytes: Array[Byte]) { -// throw new UnsupportedOperationException() -// } -// -// def update(key: String, bytes: Array[Byte]) { -// throw new UnsupportedOperationException() -// } -//} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala deleted file mode 100755 index b774abd282..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala +++ /dev/null @@ -1,241 +0,0 @@ -package akka.cluster - -import org.scalatest.matchers.MustMatchers -import org.scalatest.WordSpec -import akka.cluster.StorageTestUtils._ - -class InMemoryStorageSpec extends WordSpec with MustMatchers { - - "unversioned load" must { - "throw MissingDataException if non existing key" in { - val store = new InMemoryStorage() - - try { - store.load("foo") - fail() - } catch { - case e: MissingDataException ⇒ - } - } - - "return VersionedData if key existing" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - storage.insert(key, value) - - val result = storage.load(key) - //todo: strange that the implicit store is not found - assertContent(key, value, result.version)(storage) - } - } - - "exist" must { - "return true if value exists" in { - val store = new InMemoryStorage() - val key = "somekey" - store.insert(key, "somevalue".getBytes) - store.exists(key) must be(true) - } - - "return false if value not exists" in { - val store = new InMemoryStorage() - store.exists("somekey") must be(false) - } - } - - "versioned load" must { - "throw MissingDataException if non existing key" in { - val store = new InMemoryStorage() - - try { - store.load("foo", 1) - fail() - } catch { - case e: MissingDataException ⇒ - } - } - - "return VersionedData if key existing and exact version match" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val storedVersion = storage.insert(key, value) - - val loaded = storage.load(key, storedVersion) - assert(loaded.version == storedVersion) - org.junit.Assert.assertArrayEquals(value, loaded.data) - } - - "throw BadVersionException is version too new" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val version = storage.insert(key, value) - - try { - storage.load(key, version + 1) - fail() - } catch { - case e: BadVersionException ⇒ - } - } - - "throw BadVersionException is version too old" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val version = storage.insert(key, value) - - try { - storage.load(key, version - 1) - fail() - } catch { - case e: BadVersionException ⇒ - } - } - } - - "insert" must { - - "place a new value when non previously existed" in { - val storage = new InMemoryStorage() - val key = "somekey" - val oldValue = "oldvalue".getBytes - storage.insert(key, oldValue) - - val result = storage.load(key) - assertContent(key, oldValue)(storage) - assert(InMemoryStorage.InitialVersion == result.version) - } - - "throw MissingDataException when there already exists an entry with the same key" in { - val storage = new InMemoryStorage() - val key = "somekey" - val initialValue = "oldvalue".getBytes - val initialVersion = storage.insert(key, initialValue) - - val newValue = "newValue".getBytes - - try { - storage.insert(key, newValue) - fail() - } catch { - case e: DataExistsException ⇒ - } - - assertContent(key, initialValue, initialVersion)(storage) - } - } - - "update" must { - - "throw MissingDataException when no node exists" in { - val storage = new InMemoryStorage() - - val key = "somekey" - - try { - storage.update(key, "somevalue".getBytes, 1) - fail() - } catch { - case e: MissingDataException ⇒ - } - } - - "replace if previous value exists and no other updates have been done" in { - val storage = new InMemoryStorage() - - //do the initial insert - val key = "foo" - val oldValue = "insert".getBytes - val initialVersion = storage.insert(key, oldValue) - - //do the update the will be the cause of the conflict. - val newValue: Array[Byte] = "update".getBytes - val newVersion = storage.update(key, newValue, initialVersion) - - assertContent(key, newValue, newVersion)(storage) - } - - "throw BadVersionException when already overwritten" in { - val storage = new InMemoryStorage() - - //do the initial insert - val key = "foo" - val oldValue = "insert".getBytes - val initialVersion = storage.insert(key, oldValue) - - //do the update the will be the cause of the conflict. - val newValue = "otherupdate".getBytes - val newVersion = storage.update(key, newValue, initialVersion) - - try { - storage.update(key, "update".getBytes, initialVersion) - fail() - } catch { - case e: BadVersionException ⇒ - } - - assertContent(key, newValue, newVersion)(storage) - } - } - - "overwrite" must { - - "throw MissingDataException when no node exists" in { - val storage = new InMemoryStorage() - val key = "somekey" - - try { - storage.overwrite(key, "somevalue".getBytes) - fail() - } catch { - case e: MissingDataException ⇒ - } - - storage.exists(key) must be(false) - } - - "succeed if previous value exist" in { - val storage = new InMemoryStorage() - val key = "somekey" - val oldValue = "oldvalue".getBytes - val newValue = "somevalue".getBytes - - val initialVersion = storage.insert(key, oldValue) - val overwriteVersion = storage.overwrite(key, newValue) - - assert(overwriteVersion == initialVersion + 1) - assertContent(key, newValue, overwriteVersion)(storage) - } - } - - "insertOrOverwrite" must { - "insert if nothing was inserted before" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - - val version = storage.insertOrOverwrite(key, value) - - assert(version == InMemoryStorage.InitialVersion) - assertContent(key, value, version)(storage) - } - - "overwrite of something existed before" in { - val storage = new InMemoryStorage() - val key = "somekey" - val oldValue = "oldvalue".getBytes - val newValue = "somevalue".getBytes - - val initialVersion = storage.insert(key, oldValue) - - val overwriteVersion = storage.insertOrOverwrite(key, newValue) - - assert(overwriteVersion == initialVersion + 1) - assertContent(key, newValue, overwriteVersion)(storage) - } - } - -} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala b/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala deleted file mode 100644 index 84373842fd..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala +++ /dev/null @@ -1,15 +0,0 @@ -package akka.cluster - -object StorageTestUtils { - - def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: Storage) { - val found = storage.load(key) - assert(found.version == expectedVersion, "versions should match, found[" + found.version + "], expected[" + expectedVersion + "]") - org.junit.Assert.assertArrayEquals(expectedData, found.data) - } - - def assertContent(key: String, expectedData: Array[Byte])(implicit storage: Storage) { - val found = storage.load(key) - org.junit.Assert.assertArrayEquals(expectedData, found.data) - } -} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala deleted file mode 100644 index 48ef923126..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala +++ /dev/null @@ -1,132 +0,0 @@ -package akka.cluster - -import org.scalatest.matchers.MustMatchers -import akka.actor.Actor -import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } -import org.I0Itec.zkclient.ZkServer -import zookeeper.AkkaZkClient -import akka.cluster.StorageTestUtils._ -import java.io.File -import java.util.concurrent.atomic.AtomicLong - -class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { - val dataPath = "_akka_cluster/data" - val logPath = "_akka_cluster/log" - var zkServer: ZkServer = _ - var zkClient: AkkaZkClient = _ - val idGenerator = new AtomicLong - - def generateKey: String = { - "foo" + idGenerator.incrementAndGet() - } - - override def beforeAll() { - /*new File(dataPath).delete() - new File(logPath).delete() - - try { - zkServer = Cluster.startLocalCluster(dataPath, logPath) - Thread.sleep(5000) - Actor.cluster.start() - zkClient = Cluster.newZkClient() - } catch { - case e ⇒ e.printStackTrace() - }*/ - } - - override def afterAll() { - /*zkClient.close() - Actor.cluster.shutdown() - ClusterDeployer.shutdown() - Cluster.shutdownLocalCluster() - Actor.registry.local.shutdownAll() */ - } - - /* - "unversioned load" must { - "throw MissingDataException if non existing key" in { - val storage = new ZooKeeperStorage(zkClient) - - try { - storage.load(generateKey) - fail() - } catch { - case e: MissingDataException ⇒ - } - } - - "return VersionedData if key existing" in { - val storage = new ZooKeeperStorage(zkClient) - val key = generateKey - val value = "somevalue".getBytes - storage.insert(key, value) - - val result = storage.load(key) - //todo: strange that the implicit store is not found - assertContent(key, value, result.version)(storage) - } - } */ - - /*"overwrite" must { - - "throw MissingDataException when there doesn't exist an entry to overwrite" in { - val storage = new ZooKeeperStorage(zkClient) - val key = generateKey - val value = "value".getBytes - - try { - storage.overwrite(key, value) - fail() - } catch { - case e: MissingDataException ⇒ - } - - assert(!storage.exists(key)) - } - - "overwrite if there is an existing value" in { - val storage = new ZooKeeperStorage(zkClient) - val key = generateKey - val oldValue = "oldvalue".getBytes - - storage.insert(key, oldValue) - val newValue = "newValue".getBytes - - val result = storage.overwrite(key, newValue) - //assertContent(key, newValue, result.version)(storage) - } - } - - "insert" must { - - "place a new value when non previously existed" in { - val storage = new ZooKeeperStorage(zkClient) - val key = generateKey - val oldValue = "oldvalue".getBytes - storage.insert(key, oldValue) - - val result = storage.load(key) - assertContent(key, oldValue)(storage) - assert(InMemoryStorage.InitialVersion == result.version) - } - - "throw DataExistsException when there already exists an entry with the same key" in { - val storage = new ZooKeeperStorage(zkClient) - val key = generateKey - val oldValue = "oldvalue".getBytes - - val initialVersion = storage.insert(key, oldValue) - val newValue = "newValue".getBytes - - try { - storage.insert(key, newValue) - fail() - } catch { - case e: DataExistsException ⇒ - } - - assertContent(key, oldValue, initialVersion)(storage) - } - } */ - -} \ No newline at end of file