From fcea22faf620930ffd7aaa991cdf3b1c284d2998 Mon Sep 17 00:00:00 2001 From: Peter Veentjer Date: Wed, 29 Jun 2011 13:48:09 +0300 Subject: [PATCH] added missing storage dir --- .../src/main/scala/akka/actor/ActorRef.scala | 6 + .../scala/akka/cluster/storage/Storage.scala | 358 ++++++++++++++++++ .../RoundRobin1ReplicaMultiJvmSpec.scala | 9 + .../RoundRobin2ReplicasMultiJvmSpec.scala | 17 + project/build/MultiJvmTests.scala | 57 ++- 5 files changed, 439 insertions(+), 8 deletions(-) create mode 100755 akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 3cfea4d22b..a29eea6798 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1196,9 +1196,12 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR * Sends a message asynchronously, returning a future which may eventually hold the reply. */ def ?(message: Any, timeout: Actor.Timeout = Actor.noTimeoutGiven)(implicit channel: UntypedChannel = NullChannel, implicitTimeout: Actor.Timeout = Actor.defaultTimeout): Future[Any] = { + //todo: so it can happen that a message is posted after the actor has been shut down (the isRunning and postMessageToMailboxAndCreateFutureResultWithTimeout + //are not atomic. if (isRunning) { val realTimeout = if (timeout eq Actor.noTimeoutGiven) implicitTimeout else timeout postMessageToMailboxAndCreateFutureResultWithTimeout(message, realTimeout.duration.toMillis, channel) + //todo: there is no after check if the running state is still true.. so no 'repairing' } else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") } @@ -1209,8 +1212,11 @@ trait ScalaActorRef extends ActorRefShared with ForwardableChannel { ref: ActorR * Works with '!' and '?'/'ask'. */ def forward(message: Any)(implicit channel: ForwardableChannel) = { + //todo: so it can happen that a message is posted after the actor has been shut down (the isRunning and postMessageToMailbox + //are not atomic. if (isRunning) { postMessageToMailbox(message, channel.channel) + //todo: there is no after check if the running state is still true.. so no 'repairing' } else throw new ActorInitializationException( "Actor has not been started, you need to invoke 'actor.start()' before using it") } diff --git a/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala b/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala new file mode 100755 index 0000000000..5718d41fe5 --- /dev/null +++ b/akka-cluster/src/main/scala/akka/cluster/storage/Storage.scala @@ -0,0 +1,358 @@ +package akka.cluster.storage + +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +import akka.cluster.zookeeper.AkkaZkClient +import akka.AkkaException +import org.apache.zookeeper.{ KeeperException, CreateMode } +import org.apache.zookeeper.data.Stat +import java.util.concurrent.ConcurrentHashMap +import annotation.tailrec +import java.lang.{ RuntimeException, UnsupportedOperationException } + +/** + * Simple abstraction to store an Array of bytes based on some String key. + * + * Nothing is being said about ACID, transactions etc. It depends on the implementation + * of this Storage interface of what is and isn't done on the lowest level. + * + * The amount of data that is allowed to be insert/updated is implementation specific. The InMemoryStorage + * has no limits, but the ZooKeeperStorage has a maximum size of 1 mb. + * + * TODO: Class is up for better names. + * TODO: Instead of a String as key, perhaps also a byte-array. + */ +trait Storage { + + /** + * Loads the VersionedData for the given key. + * + * This call doesn't care about the actual version of the data. + * + * @param key: the key of the VersionedData to load. + * @return the VersionedData for the given entry. + * @throws MissingDataException if the entry with the given key doesn't exist. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def load(key: String): VersionedData + + /** + * Loads the VersionedData for the given key and expectedVersion. + * + * This call can be used for optimistic locking since the version is included. + * + * @param key: the key of the VersionedData to load + * @param expectedVersion the version the data to load should have. + * @throws MissingDataException if the data with the given key doesn't exist. + * @throws BadVersionException if the version is not the expected version. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def load(key: String, expectedVersion: Long): VersionedData + + /** + * Checks if a VersionedData with the given key exists. + * + * @param key the key to check the existence for. + * @return true if exists, false if not. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def exists(key: String): Boolean + + /** + * Inserts a byte-array based on some key. + * + * @param key the key of the Data to insert. + * @param bytes the data to insert. + * @return the version of the written data (can be used for optimistic locking). + * @throws DataExistsException when VersionedData with the given Key already exists. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def insert(key: String, bytes: Array[Byte]): Long + + /** + * Inserts the data if there is no data for that key, or overwrites it if it is there. + * + * This is the method you want to call if you just want to save something and don't + * care about any lost update issues. + * + * @param key the key of the data + * @param bytes the data to insert + * @return the version of the written data (can be used for optimistic locking). + * @throws StorageException if anything goes wrong while accessing the storage + */ + def insertOrOverwrite(key: String, bytes: Array[Byte]): Long + + /** + * Overwrites the current data for the given key. This call doesn't care about the version of the existing data. + * + * @param key the key of the data to overwrite + * @param bytes the data to insert. + * @return the version of the written data (can be used for optimistic locking). + * @throws MissingDataException when the entry with the given key doesn't exist. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def overwrite(key: String, bytes: Array[Byte]): Long + + /** + * Updates an existing value using an optimistic lock. So it expect the current data to have the expectedVersion + * and only then, it will do the update. + * + * @param key the key of the data to update + * @param bytes the content to write for the given key + * @param expectedVersion the version of the content that is expected to be there. + * @return the version of the written data (can be used for optimistic locking). + * @throws MissingDataException if no data for the given key exists + * @throws BadVersionException if the version if the found data doesn't match the expected version. So essentially + * if another update was already done. + * @throws StorageException if anything goes wrong while accessing the storage + */ + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long +} + +/** + * The VersionedData is a container of data (some bytes) and a version (a Long). + */ +class VersionedData(val data: Array[Byte], val version: Long) {} + +/** + * An AkkaException thrown by the Storage module. + */ +class StorageException(msg: String = null, cause: java.lang.Throwable = null) extends AkkaException(msg, cause) + +/** + * * + * A StorageException thrown when an operation is done on a non existing node. + */ +class MissingDataException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) + +/** + * A StorageException thrown when an operation is done on an existing node, but no node was expected. + */ +class DataExistsException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) + +/** + * A StorageException thrown when an operation causes an optimistic locking failure. + */ +class BadVersionException(msg: String = null, cause: java.lang.Throwable = null) extends StorageException(msg, cause) + +/** + * A Storage implementation based on ZooKeeper. + * + * The store method is atomic: + * - so everything is written or nothing is written + * - is isolated, so threadsafe, + * but it will not participate in any transactions. + * + */ +class ZooKeeperStorage(zkClient: AkkaZkClient, root: String = "/peter/storage") extends Storage { + + var path = "" + + //makes sure that the complete root exists on zookeeper. + root.split("/").foreach( + item ⇒ if (item.size > 0) { + + path = path + "/" + item + + if (!zkClient.exists(path)) { + //it could be that another thread is going to create this root node as well, so ignore it when it happens. + try { + zkClient.create(path, "".getBytes, CreateMode.PERSISTENT) + } catch { + case ignore: KeeperException.NodeExistsException ⇒ + } + } + }) + + def toZkPath(key: String): String = { + root + "/" + key + } + + def load(key: String) = try { + val stat = new Stat + val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) + new VersionedData(arrayOfBytes, stat.getVersion) + } catch { + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to load key [%s]", key), e) + } + + def load(key: String, expectedVersion: Long) = try { + val stat = new Stat + val arrayOfBytes = zkClient.connection.readData(root + "/" + key, stat, false) + + if (stat.getVersion != expectedVersion) throw new BadVersionException( + "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + + " but found [" + stat.getVersion + "]") + + new VersionedData(arrayOfBytes, stat.getVersion) + } catch { + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to load key [%s]", key), e) + } + + def insertOrOverwrite(key: String, bytes: Array[Byte]) = { + try { + throw new UnsupportedOperationException() + } catch { + case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( + String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to insert key [%s]", key), e) + } + } + + def insert(key: String, bytes: Array[Byte]): Long = { + try { + zkClient.connection.create(root + "/" + key, bytes, CreateMode.PERSISTENT) + //todo: how to get hold of the version. + val version: Long = 0 + version + } catch { + case e: KeeperException.NodeExistsException ⇒ throw new DataExistsException( + String.format("Failed to insert key [%s]: an entry already exists with the same key", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to insert key [%s]", key), e) + } + } + + def exists(key: String) = try { + zkClient.connection.exists(toZkPath(key), false) + } catch { + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to check existance for key [%s]", key), e) + } + + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { + try { + zkClient.connection.writeData(root + "/" + key, bytes, expectedVersion.asInstanceOf[Int]) + throw new RuntimeException() + } catch { + case e: KeeperException.BadVersionException ⇒ throw new BadVersionException( + String.format("Failed to update key [%s]: version mismatch", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to update key [%s]", key), e) + } + } + + def overwrite(key: String, bytes: Array[Byte]): Long = { + try { + zkClient.connection.writeData(root + "/" + key, bytes) + -1L + } catch { + case e: KeeperException.NoNodeException ⇒ throw new MissingDataException( + String.format("Failed to overwrite key [%s]: a previous entry already exists", key), e) + case e: KeeperException ⇒ throw new StorageException( + String.format("Failed to overwrite key [%s]", key), e) + } + } +} + +object InMemoryStorage { + val InitialVersion = 0; +} + +/** + * An in memory {@link RawStore} implementation. Useful for testing purposes. + */ +final class InMemoryStorage extends Storage { + + private val map = new ConcurrentHashMap[String, VersionedData]() + + def load(key: String) = { + val result = map.get(key) + + if (result == null) throw new MissingDataException( + String.format("Failed to load key [%s]: no data was found", key)) + + result + } + + def load(key: String, expectedVersion: Long) = { + val result = load(key) + + if (result.version != expectedVersion) throw new BadVersionException( + "Failed to load key [" + key + "]: version mismatch, expected [" + result.version + "] " + + "but found [" + expectedVersion + "]") + + result + } + + def exists(key: String) = map.containsKey(key) + + def insert(key: String, bytes: Array[Byte]): Long = { + val version: Long = InMemoryStorage.InitialVersion + val result = new VersionedData(bytes, version) + + val previous = map.putIfAbsent(key, result) + if (previous != null) throw new DataExistsException( + String.format("Failed to insert key [%s]: the key already has been inserted previously", key)) + + version + } + + @tailrec + def update(key: String, bytes: Array[Byte], expectedVersion: Long): Long = { + val found = map.get(key) + + if (found == null) throw new MissingDataException( + String.format("Failed to update key [%s], no previous entry exist", key)) + + if (expectedVersion != found.version) throw new BadVersionException( + "Failed to update key [" + key + "]: version mismatch, expected [" + expectedVersion + "]" + + " but found [" + found.version + "]") + + val newVersion: Long = expectedVersion + 1 + + if (map.replace(key, found, new VersionedData(bytes, newVersion))) newVersion + else update(key, bytes, expectedVersion) + } + + @tailrec + def overwrite(key: String, bytes: Array[Byte]): Long = { + val current = map.get(key) + + if (current == null) throw new MissingDataException( + String.format("Failed to overwrite key [%s], no previous entry exist", key)) + + val update = new VersionedData(bytes, current.version + 1) + + if (map.replace(key, current, update)) update.version + else overwrite(key, bytes) + } + + def insertOrOverwrite(key: String, bytes: Array[Byte]): Long = { + val version = InMemoryStorage.InitialVersion + val result = new VersionedData(bytes, version) + + val previous = map.putIfAbsent(key, result) + + if (previous == null) result.version + else overwrite(key, bytes) + } +} + +//TODO: To minimize the number of dependencies, should the Storage not be placed in a seperate module? +//class VoldemortRawStorage(storeClient: StoreClient) extends Storage { +// +// def load(Key: String) = { +// try { +// +// } catch { +// case +// } +// } +// +// override def insert(key: String, bytes: Array[Byte]) { +// throw new UnsupportedOperationException() +// } +// +// def update(key: String, bytes: Array[Byte]) { +// throw new UnsupportedOperationException() +// } +//} \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala index 668acb3376..977cb6505e 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala @@ -16,6 +16,9 @@ import akka.actor._ import akka.actor.Actor._ import akka.config.Config +/** + * todo: What is the main purpose of this test? + */ object RoundRobin1ReplicaMultiJvmSpec { val NrOfNodes = 2 @@ -27,6 +30,9 @@ object RoundRobin1ReplicaMultiJvmSpec { } } +/** + * This node makes use of the remote actor and + */ class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { import RoundRobin1ReplicaMultiJvmSpec._ @@ -65,6 +71,9 @@ class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with Be } } +/** + * This node checks if the basic behavior of the actor is working correctly. + */ class RoundRobin1ReplicaMultiJvmNode2 extends WordSpec with MustMatchers { import RoundRobin1ReplicaMultiJvmSpec._ diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala index a65abd2b1c..febd898a18 100644 --- a/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_2_replicas/RoundRobin2ReplicasMultiJvmSpec.scala @@ -16,6 +16,10 @@ import akka.actor._ import akka.actor.Actor._ import akka.config.Config +/** + * When a MultiJvmNode is started, will it automatically be part of the cluster (so will it automatically be eligible + * for running actors, or will it be just a 'client' talking to the cluster. + */ object RoundRobin2ReplicasMultiJvmSpec { val NrOfNodes = 3 @@ -28,6 +32,9 @@ object RoundRobin2ReplicasMultiJvmSpec { } } +/** + * What is the purpose of this node? Is this just a node for the cluster to make use of? + */ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { import RoundRobin2ReplicasMultiJvmSpec._ @@ -40,16 +47,21 @@ class RoundRobin2ReplicasMultiJvmNode1 extends WordSpec with MustMatchers with B System.getProperty("akka.cluster.nodename", "") must be("node1") System.getProperty("akka.cluster.port", "") must be("9991") + //wait till node 1 has started. Cluster.barrier("start-node1", NrOfNodes) { Cluster.node.start() } + //wait till ndoe 2 has started. Cluster.barrier("start-node2", NrOfNodes) {} + //wait till node 3 has started. Cluster.barrier("start-node3", NrOfNodes) {} + //wait till an actor reference on node 2 has become available. Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {} + //wait till the node 2 has send a message to the replica's. Cluster.barrier("send-message-from-node2-to-replicas", NrOfNodes) {} Cluster.node.shutdown() @@ -77,14 +89,18 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers { System.getProperty("akka.cluster.nodename", "") must be("node2") System.getProperty("akka.cluster.port", "") must be("9992") + //wait till node 1 has started. Cluster.barrier("start-node1", NrOfNodes) {} + //wait till node 2 has started. Cluster.barrier("start-node2", NrOfNodes) { Cluster.node.start() } + //wait till node 3 has started. Cluster.barrier("start-node3", NrOfNodes) {} + //check if the actorRef is the expected remoteActorRef. var hello: ActorRef = null Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { hello = Actor.actorOf[HelloWorld]("service-hello") @@ -94,6 +110,7 @@ class RoundRobin2ReplicasMultiJvmNode2 extends WordSpec with MustMatchers { } Cluster.barrier("send-message-from-node2-to-replicas", NrOfNodes) { + //todo: is there a reason to check for null again since it already has been done in the previous block. hello must not equal (null) val replies = collection.mutable.Map.empty[String, Int] diff --git a/project/build/MultiJvmTests.scala b/project/build/MultiJvmTests.scala index 4cf83e3076..46d3956d62 100644 --- a/project/build/MultiJvmTests.scala +++ b/project/build/MultiJvmTests.scala @@ -2,13 +2,13 @@ import sbt._ import sbt.Process import java.io.File import java.lang.{ProcessBuilder => JProcessBuilder} -import java.io.{BufferedReader, Closeable, InputStream, InputStreamReader, IOException, OutputStream} -import java.io.{PipedInputStream, PipedOutputStream} -import scala.concurrent.SyncVar +import java.io.{BufferedReader, InputStream, InputStreamReader, OutputStream} trait MultiJvmTests extends DefaultProject { def multiJvmTestName = "MultiJvm" + def multiJvmOptions: Seq[String] = Seq.empty + def multiJvmExtraOptions(className: String): Seq[String] = Seq.empty val MultiJvmTestName = multiJvmTestName @@ -29,13 +29,16 @@ trait MultiJvmTests extends DefaultProject { lazy val multiJvmTestAll = multiJvmTestAllAction def multiJvmTestAction = multiJvmMethod(getMultiJvmTests, testScalaOptions) + def multiJvmRunAction = multiJvmMethod(getMultiJvmApps, runScalaOptions) + def multiJvmTestAllAction = multiJvmTask(Nil, getMultiJvmTests, testScalaOptions) def multiJvmMethod(getMultiTestsMap: => Map[String, Seq[String]], scalaOptions: String => Seq[String]) = { - task { args => - multiJvmTask(args.toList, getMultiTestsMap, scalaOptions) - } completeWith(getMultiTestsMap.keys.toList) + task { + args => + multiJvmTask(args.toList, getMultiTestsMap, scalaOptions) + } completeWith (getMultiTestsMap.keys.toList) } def multiJvmTask(tests: List[String], getMultiTestsMap: => Map[String, Seq[String]], scalaOptions: String => Seq[String]) = { @@ -58,17 +61,26 @@ trait MultiJvmTests extends DefaultProject { } dependsOn (testCompile) } + /** + * todo: Documentation + */ def getMultiJvmTests(): Map[String, Seq[String]] = { val allTests = testCompileConditional.analysis.allTests.toList.map(_.className) filterMultiJvmTests(allTests) } + /** + * todo: Documentation + */ def getMultiJvmApps(): Map[String, Seq[String]] = { val allApps = (mainCompileConditional.analysis.allApplications.toSeq ++ - testCompileConditional.analysis.allApplications.toSeq) + testCompileConditional.analysis.allApplications.toSeq) filterMultiJvmTests(allApps) } + /** + * todo: Documentation + */ def filterMultiJvmTests(allTests: Seq[String]): Map[String, Seq[String]] = { val multiJvmTests = allTests filter (_.contains(MultiJvmTestName)) val names = multiJvmTests map { fullName => @@ -81,16 +93,25 @@ trait MultiJvmTests extends DefaultProject { Map(testPairs: _*) } + /** + * todo: Documentation + */ def testIdentifier(className: String) = { val i = className.indexOf(MultiJvmTestName) val l = MultiJvmTestName.length className.substring(i + l) } + /** + * todo: Documentation + */ def testSimpleName(className: String) = { className.split("\\.").last } + /** + * todo: Documentation + */ def testScalaOptions(testClass: String) = { val scalaTestJars = testClasspath.get.filter(_.name.contains("scalatest")) val cp = Path.makeString(scalaTestJars) @@ -98,13 +119,23 @@ trait MultiJvmTests extends DefaultProject { Seq("-cp", cp, ScalaTestRunner, ScalaTestOptions, "-s", testClass, "-p", paths) } + /** + * todo: Documentation + */ def runScalaOptions(appClass: String) = { val cp = Path.makeString(testClasspath.get) Seq("-cp", cp, appClass) } - def runMulti(testName: String, testClasses: Seq[String], scalaOptions: String => Seq[String]) = { + /** + * Runs all the test. This method blocks until all processes have completed. + * + * @return an option that return an error message if one of the tests failed, or a None in case of a success. + */ + def runMulti(testName: String, testClasses: Seq[String], scalaOptions: String => Seq[String]): Option[String] = { log.control(ControlEvent.Start, "%s multi-jvm / %s %s" format (HeaderStart, testName, HeaderEnd)) + + //spawns all the processes. val processes = testClasses.toList.zipWithIndex map { case (testClass, index) => { val jvmName = "JVM-" + testIdentifier(testClass) @@ -128,18 +159,28 @@ trait MultiJvmTests extends DefaultProject { (testClass, startJvm(jvmOptions, scalaOptions(testClass), jvmLogger, index == 0)) } } + + //places the exit code of the process belonging to a specific textClass in the exitCodes map. val exitCodes = processes map { case (testClass, process) => (testClass, process.exitValue) } + + //Checks if there are any processes that failed with an error. val failures = exitCodes flatMap { case (testClass, exit) if exit > 0 => Some("%s failed with exit code %s" format (testClass, exit)) case _ => None } + + //log the failures (if there are any). failures foreach (log.error(_)) log.control(ControlEvent.Finish, "%s multi-jvm / %s %s" format (HeaderStart, testName, HeaderEnd)) + if (!failures.isEmpty) Some("Some processes failed") else None } + /** + * Starts a JVM with the given options. + */ def startJvm(jvmOptions: Seq[String], scalaOptions: Seq[String], logger: Logger, connectInput: Boolean) = { val si = buildScalaInstance val scalaJars = Seq(si.libraryJar, si.compilerJar)