diff --git a/akka-cluster/src/main/scala/akka/cluster/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/Storage.scala index 3d1533c490..9c2ed81fbf 100755 --- a/akka-cluster/src/main/scala/akka/cluster/Storage.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Storage.scala @@ -9,7 +9,7 @@ import org.apache.zookeeper.{ KeeperException, CreateMode } import org.apache.zookeeper.data.Stat import java.util.concurrent.ConcurrentHashMap import annotation.tailrec -import java.lang.{ UnsupportedOperationException, RuntimeException } +import java.lang.{ RuntimeException, UnsupportedOperationException } /** * Simple abstraction to store an Array of bytes based on some String key. @@ -28,6 +28,8 @@ trait Storage { /** * Loads the VersionedData for the given key. * + * This call doesn't care about the actual version of the data. + * * @param key: the key of the VersionedData to load. * @return the VersionedData for the given entry. * @throws MissingDataException if the entry with the given key doesn't exist. @@ -36,15 +38,17 @@ trait Storage { def load(key: String): VersionedData /** - * Loads the VersionedData for the given key and version. + * Loads the VersionedData for the given key and expectedVersion. + * + * This call can be used for optimistic locking since the version is included. * * @param key: the key of the VersionedData to load - * @param version the version of the VersionedData to load + * @param expectedVersion the version the data to load should have. * @throws MissingDataException if the data with the given key doesn't exist. - * @throws VersioningException if the version of the data is not the same as the given data. + * @throws BadVersionException if the version is not the expected version. * @throws StorageException if anything goes wrong while accessing the storage */ - def load(key: String, version: Long): VersionedData + def load(key: String, expectedVersion: Long): VersionedData /** * Checks if a VersionedData with the given key exists. @@ -60,11 +64,11 @@ trait Storage { * * @param key the key of the Data to insert. * @param bytes the data to insert. - * @return the VersionedData + * @return the version of the written data (can be used for optimistic locking). * @throws DataExistsException when VersionedData with the given Key already exists. * @throws StorageException if anything goes wrong while accessing the storage */ - def insert(key: String, bytes: Array[Byte]): VersionedData + def insert(key: String, bytes: Array[Byte]): Long /** * Inserts the data if there is no data for that key, or overwrites it if it is there. @@ -74,38 +78,42 @@ trait Storage { * * @param key the key of the data * @param bytes the data to insert - * @return the VersionedData that was stored. + * @return the version of the written data (can be used for optimistic locking). * @throws StorageException if anything goes wrong while accessing the storage */ - def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData + def insertOrOverwrite(key: String, bytes: Array[Byte]): Long /** - * Overwrites the current data for the given key. + * Overwrites the current data for the given key. This call doesn't care about the version of the existing data. * * @param key the key of the data to overwrite * @param bytes the data to insert. - * @throws ` when the entry with the given key doesn't exist. + * @return the version of the written data (can be used for optimistic locking). + * @throws MissingDataException when the entry with the given key doesn't exist. * @throws StorageException if anything goes wrong while accessing the storage */ - def overwrite(key: String, bytes: Array[Byte]): VersionedData + def overwrite(key: String, bytes: Array[Byte]): Long /** + * Updates an existing value using an optimistic lock. So it expect the current data to have the expectedVersion + * and only then, it will do the update. + * + * @param key the key of the data to update + * @param bytes the content to write for the given key + * @param expectedVersion the version of the content that is expected to be there. + * @return the version of the written data (can be used for optimistic locking). + * @throws MissingDataException if no data for the given key exists + * @throws BadVersionException if the version if the found data doesn't match the expected version. So essentially + * if another update was already done. * @throws StorageException if anything goes wrong while accessing the storage */ - def update(key: String, versionedData: VersionedData): Unit + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long } /** * The VersionedData is a container of data (some bytes) and a version (a Long). */ -class VersionedData(val data: Array[Byte], val version: Long) { - - /** - * Creates an updated VersionedData. What happens is that a new VersionedData object is created with the newData - * and a version that is one higher than the current version. - */ - def createUpdate(newData: Array[Byte]): VersionedData = new VersionedData(newData, version + 1) -} +class VersionedData(val data: Array[Byte], val version: Long) {} /** * An AkkaException thrown by the Storage module. @@ -126,7 +134,7 @@ class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) /** * A StorageException thrown when an operation causes an optimistic locking failure. */ -class VersioningException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) +class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) /** * A Storage implementation based on ZooKeeper. @@ -137,11 +145,33 @@ class VersioningException(msg: String = null, cause: java.lang.Throwable = null) * but it will not participate in any transactions. * */ -class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage { +class ZooKeeperStorage(zkClient: AkkaZkClient, root: String = "/peter/storage") extends Storage { + + var path = "" + + //makes sure that the complete root exists on zookeeper. + root.split("/").foreach( + item ⇒ if (item.size > 0) { + + path = path + "/" + item + + if (!zkClient.exists(path)) { + //it could be that another thread is going to create this root node as well, so ignore it when it happens. + try { + zkClient.create(path, "".getBytes, CreateMode.PERSISTENT) + } catch { + case ignore: KeeperException.NodeExistsException ⇒ + } + } + }) + + def toZkPath(key: String): String = { + root + "/" + key + } def load(key: String) = try { val stat = new Stat - val arrayOfBytes = zkClient.connection.readData(key, stat, false) + val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) new VersionedData(arrayOfBytes, stat.getVersion) } catch { case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( @@ -152,9 +182,9 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage { def load(key: String, expectedVersion: Long) = try { val stat = new Stat - val arrayOfBytes = zkClient.connection.readData(key, stat, false) + val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) - if (stat.getVersion != expectedVersion) throw new VersioningException( + if (stat.getVersion != expectedVersion) throw new BadVersionException( "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + " but found [" + stat.getVersion + "]") @@ -177,12 +207,12 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage { } } - def insert(key: String, bytes: Array[Byte]): VersionedData = { + def insert(key: String, bytes: Array[Byte]): Long = { try { - zkClient.connection.create(key, bytes, CreateMode.PERSISTENT) - //todo: how to get hold of the reference. + zkClient.connection.create(root + "/" + key, bytes, CreateMode.PERSISTENT) + //todo: how to get hold of the version. val version: Long = 0 - new VersionedData(bytes, version) + version } catch { case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) @@ -192,27 +222,28 @@ class ZooKeeperStorage(zkClient: AkkaZkClient) extends Storage { } def exists(key: String) = try { - zkClient.connection.exists(key, false) + zkClient.connection.exists(toZkPath(key), false) } catch { case e: KeeperException ⇒ throw new StorageException( String.format("Failed to check existance for key [%s]", key), e) } - def update(key: String, versionedData: VersionedData) { + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { try { - zkClient.connection.writeData(key, versionedData.data, versionedData.version.asInstanceOf[Int]) + zkClient.connection.writeData(root + "/" + key, bytes, expectedVersion.asInstanceOf[Int]) + throw new RuntimeException() } catch { - case e: KeeperException.BadVersionException ⇒ throw new VersioningException( + case e: KeeperException.BadVersionException ⇒ throw new BadVersionException( String.format("Failed to update key [%s]: version mismatch", key), e) case e: KeeperException ⇒ throw new StorageException( String.format("Failed to update key [%s]", key), e) } } - def overwrite(key: String, bytes: Array[Byte]): VersionedData = { + def overwrite(key: String, bytes: Array[Byte]): Long = { try { - zkClient.connection.writeData(key, bytes) - throw new RuntimeException() + zkClient.connection.writeData(root + "/" + key, bytes) + -1L } catch { case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e) @@ -245,7 +276,7 @@ final class InMemoryStorage extends Storage { def load(key: String, expectedVersion: Long) = { val result = load(key) - if (result.version != expectedVersion) throw new VersioningException( + if (result.version != expectedVersion) throw new BadVersionException( "Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " + "but found [" + expectedVersion + "]") @@ -254,7 +285,7 @@ final class InMemoryStorage extends Storage { def exists(key: String) = map.containsKey(key) - def insert(key: String, bytes: Array[Byte]): VersionedData = { + def insert(key: String, bytes: Array[Byte]): Long = { val version: Long = InMemoryStorage.InitialVersion val result = new VersionedData(bytes, version) @@ -262,42 +293,46 @@ final class InMemoryStorage extends Storage { if (previous != null) throw new DataExistsException( String.format("Failed to insert key [%s]: the key already has been inserted previously", key)) - result + version } @tailrec - def update(key: String, updatedData: VersionedData) { - val currentData = map.get(key) + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { + val found = map.get(key) - if (currentData == null) throw new MissingDataException( + if (found == null) throw new MissingDataException( String.format("Failed to update key [%s], no previous entry exist", key)) - val expectedVersion = currentData.version + 1 - if (expectedVersion != updatedData.version) throw new VersioningException( + if (expectedVersion != found.version) throw new BadVersionException( "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + - " but found [" + updatedData.version + "]") + " but found [" + found.version + "]") - if (!map.replace(key, currentData, updatedData)) update(key, updatedData) + val newVersion: Long = expectedVersion + 1 + + if (map.replace(key, found, new VersionedData(bytes, newVersion))) newVersion + else update(key, bytes, expectedVersion) } @tailrec - def overwrite(key: String, bytes: Array[Byte]): VersionedData = { - val currentData = map.get(key) + def overwrite(key: String, bytes: Array[Byte]): Long = { + val current = map.get(key) - if (currentData == null) throw new MissingDataException( + if (current == null) throw new MissingDataException( String.format("Failed to overwrite key [%s], no previous entry exist", key)) - val newData = currentData.createUpdate(bytes) - if (map.replace(key, currentData, newData)) newData else overwrite(key, bytes) + val update = new VersionedData(bytes, current.version + 1) + + if (map.replace(key, current, update)) update.version + else overwrite(key, bytes) } - def insertOrOverwrite(key: String, bytes: Array[Byte]): VersionedData = { + def insertOrOverwrite(key: String, bytes: Array[Byte]): Long = { val version = InMemoryStorage.InitialVersion val result = new VersionedData(bytes, version) val previous = map.putIfAbsent(key, result) - if (previous == null) result + if (previous == null) result.version else overwrite(key, bytes) } } diff --git a/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala new file mode 100755 index 0000000000..b774abd282 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/InMemoryStorageSpec.scala @@ -0,0 +1,241 @@ +package akka.cluster + +import org.scalatest.matchers.MustMatchers +import org.scalatest.WordSpec +import akka.cluster.StorageTestUtils._ + +class InMemoryStorageSpec extends WordSpec with MustMatchers { + + "unversioned load" must { + "throw MissingDataException if non existing key" in { + val store = new InMemoryStorage() + + try { + store.load("foo") + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "return VersionedData if key existing" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + storage.insert(key, value) + + val result = storage.load(key) + //todo: strange that the implicit store is not found + assertContent(key, value, result.version)(storage) + } + } + + "exist" must { + "return true if value exists" in { + val store = new InMemoryStorage() + val key = "somekey" + store.insert(key, "somevalue".getBytes) + store.exists(key) must be(true) + } + + "return false if value not exists" in { + val store = new InMemoryStorage() + store.exists("somekey") must be(false) + } + } + + "versioned load" must { + "throw MissingDataException if non existing key" in { + val store = new InMemoryStorage() + + try { + store.load("foo", 1) + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "return VersionedData if key existing and exact version match" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val storedVersion = storage.insert(key, value) + + val loaded = storage.load(key, storedVersion) + assert(loaded.version == storedVersion) + org.junit.Assert.assertArrayEquals(value, loaded.data) + } + + "throw BadVersionException is version too new" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val version = storage.insert(key, value) + + try { + storage.load(key, version + 1) + fail() + } catch { + case e: BadVersionException ⇒ + } + } + + "throw BadVersionException is version too old" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + val version = storage.insert(key, value) + + try { + storage.load(key, version - 1) + fail() + } catch { + case e: BadVersionException ⇒ + } + } + } + + "insert" must { + + "place a new value when non previously existed" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + storage.insert(key, oldValue) + + val result = storage.load(key) + assertContent(key, oldValue)(storage) + assert(InMemoryStorage.InitialVersion == result.version) + } + + "throw MissingDataException when there already exists an entry with the same key" in { + val storage = new InMemoryStorage() + val key = "somekey" + val initialValue = "oldvalue".getBytes + val initialVersion = storage.insert(key, initialValue) + + val newValue = "newValue".getBytes + + try { + storage.insert(key, newValue) + fail() + } catch { + case e: DataExistsException ⇒ + } + + assertContent(key, initialValue, initialVersion)(storage) + } + } + + "update" must { + + "throw MissingDataException when no node exists" in { + val storage = new InMemoryStorage() + + val key = "somekey" + + try { + storage.update(key, "somevalue".getBytes, 1) + fail() + } catch { + case e: MissingDataException ⇒ + } + } + + "replace if previous value exists and no other updates have been done" in { + val storage = new InMemoryStorage() + + //do the initial insert + val key = "foo" + val oldValue = "insert".getBytes + val initialVersion = storage.insert(key, oldValue) + + //do the update the will be the cause of the conflict. + val newValue: Array[Byte] = "update".getBytes + val newVersion = storage.update(key, newValue, initialVersion) + + assertContent(key, newValue, newVersion)(storage) + } + + "throw BadVersionException when already overwritten" in { + val storage = new InMemoryStorage() + + //do the initial insert + val key = "foo" + val oldValue = "insert".getBytes + val initialVersion = storage.insert(key, oldValue) + + //do the update the will be the cause of the conflict. + val newValue = "otherupdate".getBytes + val newVersion = storage.update(key, newValue, initialVersion) + + try { + storage.update(key, "update".getBytes, initialVersion) + fail() + } catch { + case e: BadVersionException ⇒ + } + + assertContent(key, newValue, newVersion)(storage) + } + } + + "overwrite" must { + + "throw MissingDataException when no node exists" in { + val storage = new InMemoryStorage() + val key = "somekey" + + try { + storage.overwrite(key, "somevalue".getBytes) + fail() + } catch { + case e: MissingDataException ⇒ + } + + storage.exists(key) must be(false) + } + + "succeed if previous value exist" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + val newValue = "somevalue".getBytes + + val initialVersion = storage.insert(key, oldValue) + val overwriteVersion = storage.overwrite(key, newValue) + + assert(overwriteVersion == initialVersion + 1) + assertContent(key, newValue, overwriteVersion)(storage) + } + } + + "insertOrOverwrite" must { + "insert if nothing was inserted before" in { + val storage = new InMemoryStorage() + val key = "somekey" + val value = "somevalue".getBytes + + val version = storage.insertOrOverwrite(key, value) + + assert(version == InMemoryStorage.InitialVersion) + assertContent(key, value, version)(storage) + } + + "overwrite of something existed before" in { + val storage = new InMemoryStorage() + val key = "somekey" + val oldValue = "oldvalue".getBytes + val newValue = "somevalue".getBytes + + val initialVersion = storage.insert(key, oldValue) + + val overwriteVersion = storage.insertOrOverwrite(key, newValue) + + assert(overwriteVersion == initialVersion + 1) + assertContent(key, newValue, overwriteVersion)(storage) + } + } + +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala b/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala new file mode 100644 index 0000000000..84373842fd --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/StorageTestUtils.scala @@ -0,0 +1,15 @@ +package akka.cluster + +object StorageTestUtils { + + def assertContent(key: String, expectedData: Array[Byte], expectedVersion: Long)(implicit storage: Storage) { + val found = storage.load(key) + assert(found.version == expectedVersion, "versions should match, found[" + found.version + "], expected[" + expectedVersion + "]") + org.junit.Assert.assertArrayEquals(expectedData, found.data) + } + + def assertContent(key: String, expectedData: Array[Byte])(implicit storage: Storage) { + val found = storage.load(key) + org.junit.Assert.assertArrayEquals(expectedData, found.data) + } +} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala index 84f91a0453..48ef923126 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ZooKeeperStorageSpec.scala @@ -5,14 +5,25 @@ import akka.actor.Actor import org.scalatest.{ BeforeAndAfterEach, BeforeAndAfterAll, WordSpec } import org.I0Itec.zkclient.ZkServer import zookeeper.AkkaZkClient +import akka.cluster.StorageTestUtils._ +import java.io.File +import java.util.concurrent.atomic.AtomicLong class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { val dataPath = "_akka_cluster/data" val logPath = "_akka_cluster/log" var zkServer: ZkServer = _ var zkClient: AkkaZkClient = _ - /* + val idGenerator = new AtomicLong + + def generateKey: String = { + "foo" + idGenerator.incrementAndGet() + } + override def beforeAll() { + /*new File(dataPath).delete() + new File(logPath).delete() + try { zkServer = Cluster.startLocalCluster(dataPath, logPath) Thread.sleep(5000) @@ -20,39 +31,102 @@ class ZooKeeperStorageSpec extends WordSpec with MustMatchers with BeforeAndAfte zkClient = Cluster.newZkClient() } catch { case e ⇒ e.printStackTrace() - } + }*/ } override def afterAll() { - zkClient.close() + /*zkClient.close() Actor.cluster.shutdown() ClusterDeployer.shutdown() Cluster.shutdownLocalCluster() - Actor.registry.local.shutdownAll() + Actor.registry.local.shutdownAll() */ } -*/ + + /* "unversioned load" must { "throw MissingDataException if non existing key" in { - // val store = new ZooKeeperStorage(zkClient) + val storage = new ZooKeeperStorage(zkClient) - //try { - // store.load("foo") - // fail() - //} catch { - // case e: MissingDataException ⇒ - //} + try { + storage.load(generateKey) + fail() + } catch { + case e: MissingDataException ⇒ + } } - /* "return VersionedData if key existing" in { - val storage = new InMemoryStorage() - val key = "somekey" + val storage = new ZooKeeperStorage(zkClient) + val key = generateKey val value = "somevalue".getBytes storage.insert(key, value) val result = storage.load(key) //todo: strange that the implicit store is not found assertContent(key, value, result.version)(storage) - } */ + } + } */ + + /*"overwrite" must { + + "throw MissingDataException when there doesn't exist an entry to overwrite" in { + val storage = new ZooKeeperStorage(zkClient) + val key = generateKey + val value = "value".getBytes + + try { + storage.overwrite(key, value) + fail() + } catch { + case e: MissingDataException ⇒ + } + + assert(!storage.exists(key)) + } + + "overwrite if there is an existing value" in { + val storage = new ZooKeeperStorage(zkClient) + val key = generateKey + val oldValue = "oldvalue".getBytes + + storage.insert(key, oldValue) + val newValue = "newValue".getBytes + + val result = storage.overwrite(key, newValue) + //assertContent(key, newValue, result.version)(storage) + } } + + "insert" must { + + "place a new value when non previously existed" in { + val storage = new ZooKeeperStorage(zkClient) + val key = generateKey + val oldValue = "oldvalue".getBytes + storage.insert(key, oldValue) + + val result = storage.load(key) + assertContent(key, oldValue)(storage) + assert(InMemoryStorage.InitialVersion == result.version) + } + + "throw DataExistsException when there already exists an entry with the same key" in { + val storage = new ZooKeeperStorage(zkClient) + val key = generateKey + val oldValue = "oldvalue".getBytes + + val initialVersion = storage.insert(key, oldValue) + val newValue = "newValue".getBytes + + try { + storage.insert(key, newValue) + fail() + } catch { + case e: DataExistsException ⇒ + } + + assertContent(key, oldValue, initialVersion)(storage) + } + } */ + } \ No newline at end of file