From 2bbbeba82e163b1cd3b5996f3144e83a93cc7699 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Mon, 13 Jun 2011 16:48:03 +0300 Subject: [PATCH] - more work on the storage functionality --- .../src/main/scala/akka/cluster/Storage.scala | 182 ++++++++----- .../akka/cluster/InMemoryStorageSpec.scala | 251 ++++++++++++++++++ .../test/scala/akka/cluster/StorageSpec.scala | 165 ------------ .../scala/akka/cluster/StorageTestUtils.scala | 15 ++ .../akka/cluster/ZooKeeperStorageSpec.scala | 58 ++++ 5 files changed, 444 insertions(+), 227 deletions(-) mode change 100644 => 100755 akka-cluster/src/main/scala/akka/cluster/Storage.scala create mode 100755 akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala delete mode 100644 akka-cluster/src/test/scala/akka/cluster/StorageSpec.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala create mode 100644 akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala diff --git a/akka-cluster/src/main/scala/akka/cluster/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/Storage.scala old mode 100644 new mode 100755 index 7c39863f56..32519e56d7 --- a/akka-cluster/src/main/scala/akka/cluster/Storage.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Storage.scala @@ -5,9 +5,8 @@ import akka.AkkaException import org.apache.zookeeper.{ KeeperException, CreateMode } import org.apache.zookeeper.data.Stat import java.util.concurrent.ConcurrentHashMap -import org.apache.zookeeper.KeeperException.NoNodeException -import java.lang.UnsupportedOperationException import annotation.tailrec +import java.lang.{ UnsupportedOperationException, RuntimeException } /** * Simple abstraction to store an Array of bytes based on some String key. @@ -15,36 +14,37 @@ import annotation.tailrec * Nothing is being said about ACID, transactions etc. It depends on the implementation * of this Storage interface of what is and isn't done on the lowest level. * - * TODO: Perhaps add a version to the store to prevent lost updates using optimistic locking. - * (This is supported by ZooKeeper). + * The amount of data that is allowed to be insert/updated is implementation specific. The InMemoryStorage + * has no limits, but the ZooKeeperStorage has a maximum size of 1 mb. + * * TODO: Class is up for better names. * TODO: Instead of a String as key, perhaps also a byte-array. */ trait Storage { /** - * Loads the given entry. + * Loads the VersionedData for the given key. * - * @param key: the key of the data to load. - * @return the VersionedData for the given key. - * @throws NoNodeExistsException if the data with the given key doesn't exist. + * @param key: the key of the VersionedData to load. + * @return the VersionedData for the given entry. + * @throws MissingDataException if the entry with the given key doesn't exist. * @throws StorageException if anything goes wrong while accessing the storage */ def load(key: String): VersionedData /** - * Loads the data for the given key and version. + * Loads the VersionedData for the given key and version. * - * @param key: the key of the data to load - * @param version the version of the data to load - * @throws NoNodeExistsException if the data with the given key doesn't exist. - * @throws VersioningMismatchStorageException if the version of the data is not the same as the given data. + * @param key: the key of the VersionedData to load + * @param version the version of the VersionedData to load + * @throws MissingDataException if the data with the given key doesn't exist. + * @throws VersioningException if the version of the data is not the same as the given data. * @throws StorageException if anything goes wrong while accessing the storage */ def load(key: String, version: Long): VersionedData /** - * Checks if a value with the given key exists. + * Checks if a VersionedData with the given key exists. * * @param key the key to check the existence for. * @return true if exists, false if not. @@ -57,19 +57,34 @@ trait Storage { * * @param key the key of the Data to insert. * @param bytes the data to insert. - * @return the version of the inserted data - * @throws NodeExistsException when a Node with the given Key already exists. + * @return the VersionedData + * @throws DataExistsException when VersionedData with the given Key already exists. * @throws StorageException if anything goes wrong while accessing the storage */ def insert(key: String, bytes: Array[Byte]): VersionedData /** - * Stores a array of bytes based on some key. + * Inserts the data if there is no data for that key, or overwrites it if it is there. * - * @throws MissingNodeException when the Node with the given key doesn't exist. + * This is the method you want to call if you just want to save something and don't + * care about any lost update issues. + * + * @param key the key of the data + * @param bytes the data to insert + * @return the VersionedData that was stored. * @throws StorageException if anything goes wrong while accessing the storage */ - def update(key: String, bytes: Array[Byte]): VersionedData + def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData + + /** + * Overwrites the current data for the given key. + * + * @param key the key of the data to overwrite + * @param bytes the data to insert. + * @throws ` when the entry with the given key doesn't exist. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def overwrite(key: String, bytes: Array[Byte]): VersionedData /** * @throws StorageException if anything goes wrong while accessing the storage @@ -98,17 +113,17 @@ class StorageException(msg: String = null, cause: java.lang.Throwable = null) ex * * * A StorageException thrown when an operation is done on a non existing node. */ -class MissingNodeException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) +class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) /** * A StorageException thrown when an operation is done on an existing node, but no node was expected. */ -class NodeExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) +class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) /** * A StorageException thrown when an operation causes an optimistic locking failure. */ -class VersioningMismatchStorageException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) +class VersioningException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) /** * A Storage implementation based on ZooKeeper. @@ -117,59 +132,89 @@ class VersioningMismatchStorageException(msg: String = null, cause: java.lang.Th * - so everything is written or nothing is written * - is isolated, so threadsafe, * but it will not participate in any transactions. - * //todo: unclear, is only a single connection used in the JVM?? * */ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage { def load(key: String) = try { - val arrayOfBytes: Array[Byte] = zkClient.connection.readData(key, new Stat, false) - //Some(arrayOfBytes) - throw new UnsupportedOperationException() + val stat = new Stat + val arrayOfBytes = zkClient.connection.readData(key, stat, false) + new VersionedData(arrayOfBytes, stat.getVersion) } catch { - //todo: improved error messaged - case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException("Failed to load key", e) - case e: KeeperException ⇒ throw new StorageException("failed to load key " + key, e) + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to load key [%s]", key), e) } - def load(key: String, version: Long) = { - throw new UnsupportedOperationException() + def load(key: String, expectedVersion: Long) = try { + val stat = new Stat + val arrayOfBytes = zkClient.connection.readData(key, stat, false) + + if (stat.getVersion != expectedVersion) throw new VersioningException( + "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + + " but found [" + stat.getVersion + "]") + + new VersionedData(arrayOfBytes, stat.getVersion) + } catch { + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to load key [%s]", key), e) + } + + def insertOrOverwrite(key: String, bytes: Array[Byte]) = { + try { + throw new UnsupportedOperationException() + } catch { + case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( + String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to insert key [%s]", key), e) + } } def insert(key: String, bytes: Array[Byte]): VersionedData = { try { - zkClient.connection.create(key, bytes, CreateMode.PERSISTENT); - throw new UnsupportedOperationException() + zkClient.connection.create(key, bytes, CreateMode.PERSISTENT) + //todo: how to get hold of the reference. + val version: Long = 0 + new VersionedData(bytes, version) } catch { - //todo: improved error messaged - case e: KeeperException.NodeExistsException ⇒ throw new NodeExistsException("failed to insert key " + key, e) - case e: KeeperException ⇒ throw new StorageException("failed to insert key " + key, e) + case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( + String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to insert key [%s]", key), e) } } def exists(key: String) = try { zkClient.connection.exists(key, false) } catch { - //todo: improved error messaged - case e: KeeperException ⇒ throw new StorageException("failed to check for existance on key " + key, e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to check existance for key [%s]", key), e) } - def update(key: String, versionedData: VersionedData) = try { - zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int]) - } catch { - //todo: improved error messaged - case e: KeeperException.BadVersionException ⇒ throw new VersioningMismatchStorageException() - case e: KeeperException ⇒ throw new StorageException("failed to check for existance on key " + key, e) + def update(key: String, versionedData: VersionedData) { + try { + zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int]) + } catch { + case e: KeeperException.BadVersionException ⇒ throw new VersioningException( + String.format("Failed to update key [%s]: version mismatch", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to update key [%s]", key), e) + } } - def update(key: String, bytes: Array[Byte]): VersionedData = { + def overwrite(key: String, bytes: Array[Byte]): VersionedData = { try { zkClient.connection.writeData(key, bytes) throw new RuntimeException() } catch { - //todo: improved error messaged - case e: KeeperException.NoNodeException ⇒ throw new MissingNodeException("failed to update key ", e) - case e: KeeperException ⇒ throw new StorageException("failed to update key ", e) + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to overwrite key [%s]", key), e) } } } @@ -188,8 +233,8 @@ final class InMemoryStorage extends Storage { def load(key: String) = { val result = map.get(key) - if (result == null) throw new MissingNodeException( - String.format("Failed to load data for key [%s]: no data was found", key)) + if (result == null) throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key)) result } @@ -197,8 +242,8 @@ final class InMemoryStorage extends Storage { def load(key: String, expectedVersion: Long) = { val result = load(key) - if (result.version != expectedVersion) throw new VersioningMismatchStorageException( - "Failed to load data for key [" + key + "]: version mismatch, expected [" + result.version + "] " + + if (result.version != expectedVersion) throw new VersioningException( + "Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " + "but found [" + expectedVersion + "]") result @@ -211,7 +256,7 @@ final class InMemoryStorage extends Storage { val result = new VersionedData(bytes, version) val previous = map.putIfAbsent(key, result) - if (previous != null) throw new NodeExistsException( + if (previous != null) throw new DataExistsException( String.format("Failed to insert key [%s]: the key already has been inserted previously", key)) result @@ -221,23 +266,36 @@ final class InMemoryStorage extends Storage { def update(key: String, updatedData: VersionedData) { val currentData = map.get(key) - if (currentData == null) throw new MissingNodeException( - String.format("Failed to update data for key [%s], no previous entry exist", key)) + if (currentData == null) throw new MissingDataException( + String.format("Failed to update key [%s], no previous entry exist", key)) val expectedVersion = currentData.version + 1 - if (expectedVersion != updatedData.version) throw new VersioningMismatchStorageException( - "Failed to update data for key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + + if (expectedVersion != updatedData.version) throw new VersioningException( + "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + " but found [" + updatedData.version + "]") if (!map.replace(key, currentData, updatedData)) update(key, updatedData) } - def update(key: String, bytes: Array[Byte]): VersionedData = { - if (map.get(key) == null) throw new NoNodeException( - String.format("Failed to update key [%s]: no previous insert of this key exists", key)) + @tailrec + def overwrite(key: String, bytes: Array[Byte]): VersionedData = { + val currentData = map.get(key) - //smap.put(key, bytes) - throw new UnsupportedOperationException() + if (currentData == null) throw new MissingDataException( + String.format("Failed to overwrite key [%s], no previous entry exist", key)) + + val newData = currentData.createUpdate(bytes) + if (map.replace(key, currentData, newData)) newData else overwrite(key, bytes) + } + + def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData = { + val version = InMemoryStorage.InitialVersion + val result = new VersionedData(bytes, version) + + val previous = map.putIfAbsent(key, result) + + if (previous == null) result + else overwrite(key, bytes) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala new file mode 100755 index 0000000000..7f3c6a26cd --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala @@ -0,0 +1,251 @@ +package akka.cluster + +import org.scalatest.matchers.MustMatchers +import org.scalatest.WordSpec +import akka.cluster.StorageTestUtils._ + +class InMemoryStorageSpec extends WordSpec with MustMatchers { + + "unversioned load" must { + "throw MissingDataException if non existing key" in { + val store = new InMemoryStorage() + + try { + store.load("foo") + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "return VersionedData if key existing" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + storage.insert(key, value) + + val result = storage.load(key) + //todo: strange that the implicit store is not found + assertContent(key, value, result.version)(storage) + } + } + + "exist" must { + "return true if value exists" in { + val store = new InMemoryStorage() + val key = "somekey" + store.insert(key, "somevalue".getBytes) + store.exists(key) must be(true) + } + + "return false if value not exists" in { + val store = new InMemoryStorage() + store.exists("somekey") must be(false) + } + } + + "versioned load" must { + "throw MissingDataException if non existing key" in { + val store = new InMemoryStorage() + + try { + store.load("foo", 1) + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "return VersionedData if key existing and exact version match" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val stored = storage.insert(key, value) + + val result = storage.load(key, stored.version) + assert(result.version == stored.version) + assert(result.data == stored.data) + } + + "throw VersioningException is version too new" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val stored = storage.insert(key, value) + + try { + storage.load(key, stored.version + 1) + fail() + } catch { + case e: VersioningException ⇒ + } + } + + "throw VersioningException is version too old" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val stored = storage.insert(key, value) + + try { + storage.load(key, stored.version - 1) + fail() + } catch { + case e: VersioningException ⇒ + } + } + } + + "insert" must { + + "place a new value when non previously existed" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + storage.insert(key, oldValue) + + val result = storage.load(key) + assertContent(key, oldValue)(storage) + assert(InMemoryStorage.InitialVersion == result.version) + } + + "throw MissingDataException when there already exists an entry with the same key" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + + val oldVersionedData = storage.insert(key, oldValue) + + val newValue = "newValue".getBytes + + try { + storage.insert(key, newValue) + fail() + } catch { + case e: DataExistsException ⇒ + } + + //make sure that the old value was not changed + assert(oldVersionedData == storage.load(key)) + } + } + + "update" must { + + "throw MissingDataException when no node exists" in { + val storage = new InMemoryStorage() + + val key = "somekey" + + try { + storage.update(key, new VersionedData("somevalue".getBytes, 1)) + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "replace if previous value exists and no other updates have been done" in { + val storage = new InMemoryStorage() + + //do the initial insert + val key = "foo" + val oldValue = "insert".getBytes + val insert = storage.insert(key, oldValue) + + //do the update the will be the cause of the conflict. + val updateValue = "update".getBytes + val update = insert.createUpdate(updateValue) + storage.update(key, update) + + assertContent(key, update.data, update.version)(storage) + } + + "throw VersioningException when already overwritten" in { + val storage = new InMemoryStorage() + + //do the initial insert + val key = "foo" + val oldValue = "insert".getBytes + val insert = storage.insert(key, oldValue) + + //do the update the will be the cause of the conflict. + val otherUpdateValue = "otherupdate".getBytes + val otherUpdate = insert.createUpdate(otherUpdateValue) + storage.update(key, otherUpdate) + + val update = insert.createUpdate("update".getBytes) + + try { + storage.update(key, update) + fail() + } catch { + case e: VersioningException ⇒ + } + + assertContent(key, otherUpdate.data, otherUpdate.version)(storage) + } + } + + "overwrite" must { + + "throw MissingDataException when no node exists" in { + val storage = new InMemoryStorage() + val key = "somekey" + + try { + storage.overwrite(key, "somevalue".getBytes) + fail() + } catch { + case e: MissingDataException ⇒ + } + + storage.exists(key) must be(false) + } + + "succeed if previous value exist" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + val newValue: Array[Byte] = "somevalue".getBytes + + val initialInsert: VersionedData = storage.insert(key, oldValue) + + val result: VersionedData = storage.overwrite(key, newValue) + + assert(result.version == initialInsert.version + 1) + assert(result.data == newValue) + storage.load(key) must be eq (result) + } + } + + "insertOrOverwrite" must { + "insert if nothing was inserted before" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + + val result = storage.insertOrOverwrite(key, value) + + assert(result.version == InMemoryStorage.InitialVersion) + assert(result.data == value) + storage.load(key) must be eq (result) + } + + "overwrite of something existed before" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + val newValue = "somevalue".getBytes + + val initialInsert = storage.insert(key, oldValue) + + val result = storage.insertOrOverwrite(key, newValue) + + assert(result.version == initialInsert.version + 1) + assert(result.data == newValue) + storage.load(key) must be eq (result) + } + } + +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/StorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/StorageSpec.scala deleted file mode 100644 index 535313c3a5..0000000000 --- a/akka-cluster/src/test/scala/akka/cluster/StorageSpec.scala +++ /dev/null @@ -1,165 +0,0 @@ -package akka.cluster - -import org.scalatest.matchers.MustMatchers -import org.scalatest.WordSpec - -class InMemoryStorageSpec extends WordSpec with MustMatchers { - - "unversioned load" must { - "throw MissingNodeException if non existing key" in { - val store = new InMemoryStorage() - - try { - store.load("foo") - fail() - } catch { - case e: MissingNodeException ⇒ - } - } - - "return VersionedData if key existing" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - storage.insert(key, value) - - val result = storage.load(key) - //todo: strange that the implicit store is not found - assertContent(key, value, result.version)(storage) - } - } - - "exist" must { - "return true if value exists" in { - val store = new InMemoryStorage() - val key = "somekey" - store.insert(key, "somevalue".getBytes) - store.exists(key) must be(true) - } - - "return false if value not exists" in { - val store = new InMemoryStorage() - store.exists("somekey") must be(false) - } - } - - "versioned load" must { - "throw MissingNodeException if non existing key" in { - val store = new InMemoryStorage() - - try { - store.load("foo", 1) - fail() - } catch { - case e: MissingNodeException ⇒ - } - } - - "return VersionedData if key existing and exact version match" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val stored = storage.insert(key, value) - - val result = storage.load(key, stored.version) - assert(result.version == stored.version) - assert(result.data == stored.data) - } - - "throw VersioningMismatchStorageException is version too new" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val stored = storage.insert(key, value) - - try { - storage.load(key, stored.version + 1) - fail() - } catch { - case e: VersioningMismatchStorageException ⇒ - } - } - - "throw VersioningMismatchStorageException is version too old" in { - val storage = new InMemoryStorage() - val key = "somekey" - val value = "somevalue".getBytes - val stored = storage.insert(key, value) - - try { - storage.load(key, stored.version - 1) - fail() - } catch { - case e: VersioningMismatchStorageException ⇒ - } - } - } - - "insert" must { - - "place a new value when non previously existed" in { - val storage = new InMemoryStorage() - val key = "somekey" - val oldValue = "oldvalue".getBytes - storage.insert(key, oldValue) - - val result = storage.load(key) - assertContent(key, oldValue)(storage) - assert(InMemoryStorage.InitialVersion == result.version) - } - - "throw MissingNodeException when there already exists an entry with the same key" in { - val storage = new InMemoryStorage() - val key = "somekey" - val oldValue = "oldvalue".getBytes - - val oldVersionedData = storage.insert(key, oldValue) - - val newValue = "newValue".getBytes - - try { - storage.insert(key, newValue) - fail() - } catch { - case e: NodeExistsException ⇒ - } - - //make sure that the old value was not changed - assert(oldVersionedData == storage.load(key)) - } - } - - "update with versioning" must { - - "throw NoNodeException when no node exists" in { - val storage = new InMemoryStorage() - - val key = "somekey" - - try { - storage.update(key, new VersionedData("somevalue".getBytes, 1)) - fail() - } catch { - case e: MissingNodeException ⇒ - } - } - - "throw OptimisticLockException when ..." in { - - } - - "replace" in { - } - } - - def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: InMemoryStorage) { - val found = storage.load(key) - assert(found.version == expectedVersion) - assert(expectedData == found.data) //todo: structural equals - } - - def assertContent(key: String, expectedData: Array[Byte])(implicit storage: InMemoryStorage) { - val found = storage.load(key) - assert(expectedData == found.data) //todo: structural equals - } -} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala b/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala new file mode 100644 index 0000000000..99ceaf0070 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala @@ -0,0 +1,15 @@ +package akka.cluster + +object StorageTestUtils { + + def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: InMemoryStorage) { + val found = storage.load(key) + assert(found.version == expectedVersion) + assert(expectedData == found.data) //todo: structural equals + } + + def assertContent(key: String, expectedData: Array[Byte])(implicit storage: InMemoryStorage) { + val found = storage.load(key) + assert(expectedData == found.data) //todo: structural equals + } +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala new file mode 100644 index 0000000000..a4bda26c77 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala @@ -0,0 +1,58 @@ +package akka.cluster + +import org.scalatest.matchers.MustMatchers +import akka.actor.Actor +import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } +import org.I0Itec.zkclient.ZkServer +import zookeeper.AkkaZkClient + +class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { + val dataPath = "_akka_cluster/data" + val logPath = "_akka_cluster/log" + var zkServer: ZkServer = _ + var zkClient: AkkaZkClient = _ + + override def beforeAll() { + try { + zkServer = Cluster.startLocalCluster(dataPath, logPath) + Thread.sleep(5000) + Actor.cluster.start() + zkClient = Cluster.newZkClient() + } catch { + case e ⇒ e.printStackTrace() + } + } + + override def afterAll() { + zkClient.close() + Actor.cluster.shutdown() + ClusterDeployer.shutdown() + Cluster.shutdownLocalCluster() + Actor.registry.local.shutdownAll() + } + + "unversioned load" must { + "throw MissingDataException if non existing key" in { + val store = new ZooKeeperStorage(zkClient) + + //try { + // store.load("foo") + // fail() + //} catch { + // case e: MissingDataException ⇒ + //} + } + + /* + "return VersionedData if key existing" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + storage.insert(key, value) + + val result = storage.load(key) + //todo: strange that the implicit store is not found + assertContent(key, value, result.version)(storage) + } */ + } +} \ No newline at end of file