From 76d9c3c33a78a185a159ed2871c8d8bfbf1957ce Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bone=CC=81r?=
Date: Thu, 19 May 2011 10:58:30 +0200
Subject: [PATCH] 1. Added docs on how to run the multi-jvm tests
 2. Fixed cyclic dependency in deployer/cluster boot up
 3. Refactored actorOf for clustered actor deployment; all actorOf variants
 now work

---
 .../scala/akka/dataflow/DataFlowSpec.scala    | 166 -------------
 .../src/main/scala/akka/actor/Actor.scala     |  36 ++-
 .../main/scala/akka/dataflow/DataFlow.scala   | 165 -------------
 .../src/main/scala/akka/cluster/Cluster.scala |   6 +-
 akka-docs/dev/index.rst                       |   3 +-
 akka-docs/dev/multi-jvm-testing.rst           | 220 ++++++++++++++++++
 6 files changed, 241 insertions(+), 355 deletions(-)
 delete mode 100644 akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala
 delete mode 100644 akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
 create mode 100644 akka-docs/dev/multi-jvm-testing.rst

diff --git a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala b/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala
deleted file mode 100644
index 6b6d8effb5..0000000000
--- a/akka-actor-tests/src/test/scala/akka/dataflow/DataFlowSpec.scala
+++ /dev/null
@@ -1,166 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB
- */
-
-package akka.dataflow
-
-import org.scalatest.Spec
-import org.scalatest.Assertions
-import org.scalatest.matchers.ShouldMatchers
-import org.scalatest.BeforeAndAfterAll
-import org.scalatest.junit.JUnitRunner
-import org.junit.runner.RunWith
-
-import akka.dispatch.DefaultCompletableFuture
-import java.util.concurrent.{ TimeUnit, CountDownLatch }
-import annotation.tailrec
-import java.util.concurrent.atomic.{ AtomicLong, AtomicReference, AtomicInteger }
-import akka.actor.ActorRegistry
-
-@RunWith(classOf[JUnitRunner])
-class DataFlowTest extends Spec with ShouldMatchers with BeforeAndAfterAll {
-  describe("DataflowVariable") {
-    it("should be able to set the value of one variable from other variables") {
-      import DataFlow._
-
-      val latch = new CountDownLatch(1)
-      val result = new AtomicInteger(0)
-      val x, y, z = new DataFlowVariable[Int]
-      thread {
-        z << x() + y()
-        result.set(z())
-        latch.countDown()
-      }
-      thread { x << 40 }
-      thread { y << 2 }
-
-      latch.await(10, TimeUnit.SECONDS) should equal(true)
-      result.get should equal(42)
-      List(x, y, z).foreach(_.shutdown())
-    }
-
-    it("should be able to sum a sequence of ints") {
-      import DataFlow._
-
-      def ints(n: Int, max: Int): List[Int] =
-        if (n == max) Nil
-        else n :: ints(n + 1, max)
-
-      def sum(s: Int, stream: List[Int]): List[Int] = stream match {
-        case Nil    ⇒ s :: Nil
-        case h :: t ⇒ s :: sum(h + s, t)
-      }
-
-      val latch = new CountDownLatch(1)
-      val result = new AtomicReference[List[Int]](Nil)
-      val x = new DataFlowVariable[List[Int]]
-      val y = new DataFlowVariable[List[Int]]
-      val z = new DataFlowVariable[List[Int]]
-
-      thread { x << ints(0, 1000) }
-      thread { y << sum(0, x()) }
-
-      thread {
-        z << y()
-        result.set(z())
-        latch.countDown()
-      }
-
-      latch.await(10, TimeUnit.SECONDS) should equal(true)
-      result.get should equal(sum(0, ints(0, 1000)))
-      List(x, y, z).foreach(_.shutdown())
-    }
-    /*
-    it("should be able to join streams") {
-      import DataFlow._
-      Actor.registry.shutdownAll()
-
-      def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
-        stream <<< n
-        ints(n + 1, max, stream)
-      }
-
-      def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
-        out <<< s
-        sum(in() + s, in, out)
-      }
-
-      val producer = new DataFlowStream[Int]
-      val consumer = new DataFlowStream[Int]
-      val latch = new CountDownLatch(1)
-      val result = new AtomicInteger(0)
-
-      val t1 = thread { ints(0, 1000, producer) }
-      val t2 = thread {
-        Thread.sleep(1000)
-        result.set(producer.map(x => x * x).foldLeft(0)(_ + _))
-        latch.countDown()
-      }
-
-      latch.await(3,TimeUnit.SECONDS) should equal (true)
-      result.get should equal (332833500)
-    }
-
-    it("should be able to sum streams recursively") {
-      import DataFlow._
-
-      def ints(n: Int, max: Int, stream: DataFlowStream[Int]): Unit = if (n != max) {
-        stream <<< n
-        ints(n + 1, max, stream)
-      }
-
-      def sum(s: Int, in: DataFlowStream[Int], out: DataFlowStream[Int]): Unit = {
-        out <<< s
-        sum(in() + s, in, out)
-      }
-
-      val result = new AtomicLong(0)
-
-      val producer = new DataFlowStream[Int]
-      val consumer = new DataFlowStream[Int]
-      val latch = new CountDownLatch(1)
-
-      @tailrec def recurseSum(stream: DataFlowStream[Int]): Unit = {
-        val x = stream()
-
-        if (result.addAndGet(x) == 166666500)
-          latch.countDown()
-
-        recurseSum(stream)
-      }
-
-      thread { ints(0, 1000, producer) }
-      thread { sum(0, producer, consumer) }
-      thread { recurseSum(consumer) }
-
-      latch.await(15,TimeUnit.SECONDS) should equal (true)
-    }
-*/
-    /* Test not ready for prime time, causes some sort of deadlock */
-    /*  it("should be able to conditionally set variables") {
-
-      import DataFlow._
-      Actor.registry.shutdownAll()
-
-      val latch = new CountDownLatch(1)
-      val x, y, z, v = new DataFlowVariable[Int]
-
-      val main = thread {
-        x << 1
-        z << Math.max(x(),y())
-        latch.countDown()
-      }
-
-      val setY = thread {
-        // Thread.sleep(2000)
-        y << 2
-      }
-
-      val setV = thread {
-        v << y
-      }
-      List(x,y,z,v) foreach (_.shutdown())
-      latch.await(2,TimeUnit.SECONDS) should equal (true)
-    }*/
-  }
-}
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 7da3f0ea85..3e42931fed 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -208,15 +208,16 @@ object Actor extends ListenerManagement {
    */
   def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = {
     Address.validate(address)
+    val actorRefFactory = () ⇒ newLocalActorRef(clazz, address)
     try {
       Deployer.deploymentFor(address) match {
-        case Deploy(_, router, _, Local) ⇒ newLocalActorRef(clazz, address) // FIXME handle 'router' in 'Local' actors
-        case deploy                      ⇒ newClusterActorRef[T](clazz, address, deploy)
+        case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors
+        case deploy                      ⇒ newClusterActorRef[T](actorRefFactory, address, deploy)
       }
     } catch {
       case e: DeploymentException ⇒
         EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
-        newLocalActorRef(clazz, address) // if deployment fails, fall back to local actors
+        actorRefFactory() // if deployment fails, fall back to local actors
     }
   }

@@ -260,16 +261,16 @@ object Actor extends ListenerManagement {
    */
   def actorOf[T <: Actor](creator: ⇒ T, address: String): ActorRef = {
     Address.validate(address)
-    val factory = () ⇒ creator
+    val actorRefFactory = () ⇒ new LocalActorRef(() ⇒ creator, address)
     try {
       Deployer.deploymentFor(address) match {
-        case Deploy(_, router, _, Local) ⇒ new LocalActorRef(factory, address) // FIXME handle 'router' in 'Local' actors
-        case deploy                      ⇒ newClusterActorRef[T](factory, address, deploy)
+        case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors
+        case deploy                      ⇒ newClusterActorRef[T](actorRefFactory, address, deploy)
       }
     } catch {
       case e: DeploymentException ⇒
         EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
-        new LocalActorRef(factory, address) // if deployment fails, fall back to local actors
+        actorRefFactory() // if deployment fails, fall back to local actors
     }
   }

@@ -294,16 +295,16 @@ object Actor extends ListenerManagement {
    */
   def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = {
     Address.validate(address)
-    val factory = () ⇒ creator.create
+    val actorRefFactory = () ⇒ new LocalActorRef(() ⇒ creator.create, address)
     try {
       Deployer.deploymentFor(address) match {
-        case Deploy(_, router, _, Local) ⇒ new LocalActorRef(factory, address) // FIXME handle 'router' in 'Local' actors
-        case deploy                      ⇒ newClusterActorRef[T](factory, address, deploy)
+        case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors
+        case deploy                      ⇒ newClusterActorRef[T](actorRefFactory, address, deploy)
       }
     } catch {
       case e: DeploymentException ⇒
         EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
-        new LocalActorRef(factory, address) // if deployment fails, fall back to local actors
+        actorRefFactory() // if deployment fails, fall back to local actors
     }
   }

@@ -371,11 +372,7 @@ object Actor extends ListenerManagement {
     }, address)
   }

-  private def newClusterActorRef[T <: Actor](factory: () ⇒ T, address: String, deploy: Deploy): ActorRef = {
-    newClusterActorRef(factory().getClass.asInstanceOf[Class[T]], address, deploy)
-  }
-
-  private def newClusterActorRef[T <: Actor](clazz: Class[T], address: String, deploy: Deploy): ActorRef = {
+  private def newClusterActorRef[T <: Actor](factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = {
     deploy match {
       case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒
         ClusterModule.ensureEnabled()
@@ -397,7 +394,6 @@ object Actor extends ListenerManagement {
         }

         import ClusterModule.node
-        node.start() // start cluster node

         if (hostname == Config.hostname) {
           // home node for clustered actor
@@ -428,7 +424,7 @@ object Actor extends ListenerManagement {
           implicit val format: Format[T] = null
           sys.error("FIXME use the serializer above instead of dummy Format, but then the ClusterNode AND ActorRef serialization needs to be rewritten")

-          if (!node.isClustered(address)) node.store(address, clazz, replicas, false)
+          if (!node.isClustered(address)) node.store(address, factory(), replicas, false)
           node.use(address)
         } else {
           val routerType = router match {
@@ -460,9 +456,9 @@ object Actor extends ListenerManagement {
          */
         RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor)
+
       case invalid ⇒ throw new IllegalActorStateException(
-        "Could not create actor [" + clazz.getName +
-        "] with address [" + address +
+        "Could not create actor with address [" + address +
         "], not bound to a valid deployment scheme [" + invalid + "]")
     }
   }
diff --git a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala b/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
deleted file mode 100644
index 44cef7e13d..0000000000
--- a/akka-actor/src/main/scala/akka/dataflow/DataFlow.scala
+++ /dev/null
@@ -1,165 +0,0 @@
-/**
- * Copyright (C) 2009-2011 Scalable Solutions AB
- */
-
-package akka.dataflow
-
-import java.util.concurrent.atomic.AtomicReference
-import java.util.concurrent.{ ConcurrentLinkedQueue, LinkedBlockingQueue }
-
-import akka.event.EventHandler
-import akka.actor.{ Actor, ActorRef }
-import akka.actor.Actor._
-import akka.dispatch.CompletableFuture
-import akka.AkkaException
-import akka.japi.{ Function, Effect }
-
-/**
- * Implements Oz-style dataflow (single assignment) variables.
- *
- * @author Jonas Bonér
- */
-object DataFlow {
-  object Start
-  object Exit
-
-  class DataFlowVariableException(message: String, cause: Throwable = null) extends AkkaException(message, cause)
-
-  /**
-   * Executes the supplied thunk in another thread.
-   */
-  def thread(body: ⇒ Unit): Unit = spawn(body)
-
-  /**
-   * JavaAPI.
-   * Executes the supplied Effect in another thread.
-   */
-  def thread(body: Effect): Unit = spawn(body.apply)
-
-  /**
-   * Executes the supplied function in another thread.
-   */
-  def thread[A <: AnyRef, R <: AnyRef](body: A ⇒ R) =
-    actorOf(new ReactiveEventBasedThread(body)).start()
-
-  /**
-   * JavaAPI.
-   * Executes the supplied Function in another thread.
-   */
-  def thread[A <: AnyRef, R <: AnyRef](body: Function[A, R]) =
-    actorOf(new ReactiveEventBasedThread(body.apply)).start()
-
-  private class ReactiveEventBasedThread[A <: AnyRef, T <: AnyRef](body: A ⇒ T)
-    extends Actor {
-    def receive = {
-      case Exit    ⇒ self.stop()
-      case message ⇒ self.reply(body(message.asInstanceOf[A]))
-    }
-  }
-
-  private object DataFlowVariable {
-    private sealed abstract class DataFlowVariableMessage
-    private case class Set[T <: Any](value: T) extends DataFlowVariableMessage
-    private object Get extends DataFlowVariableMessage
-  }
-
-  /**
-   * @author Jonas Bonér
-   */
-  @deprecated("Superceeded by Future and CompletableFuture as of 1.1", "1.1")
-  sealed class DataFlowVariable[T <: Any](timeoutMs: Long) {
-    import DataFlowVariable._
-
-    def this() = this(1000 * 60)
-
-    private val value = new AtomicReference[Option[T]](None)
-    private val blockedReaders = new ConcurrentLinkedQueue[ActorRef]
-
-    private class In[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
-      self.timeout = timeoutMs
-      def receive = {
-        case s@Set(v) ⇒
-          if (dataFlow.value.compareAndSet(None, Some(v.asInstanceOf[T]))) {
-            while (dataFlow.blockedReaders.peek ne null)
-              dataFlow.blockedReaders.poll ! s
-          } else throw new DataFlowVariableException(
-            "Attempt to change data flow variable (from [" + dataFlow.value.get + "] to [" + v + "])")
-        case Exit ⇒ self.stop()
-      }
-    }
-
-    private class Out[T <: Any](dataFlow: DataFlowVariable[T]) extends Actor {
-      self.timeout = timeoutMs
-      private var readerFuture: Option[CompletableFuture[Any]] = None
-      def receive = {
-        case Get ⇒ dataFlow.value.get match {
-          case Some(value) ⇒ self reply value
-          case None        ⇒ readerFuture = self.senderFuture
-        }
-        case Set(v: T) ⇒ readerFuture.map(_ completeWithResult v)
-        case Exit      ⇒ self.stop()
-      }
-    }
-
-    private[this] val in = actorOf(new In(this)).start()
-
-    /**
-     * Sets the value of this variable (if unset) with the value of the supplied variable.
-     */
-    def <<(ref: DataFlowVariable[T]) {
-      if (this.value.get.isEmpty) in ! Set(ref())
-      else throw new DataFlowVariableException(
-        "Attempt to change data flow variable (from [" + this.value.get + "] to [" + ref() + "])")
-    }
-
-    /**
-     * JavaAPI.
-     * Sets the value of this variable (if unset) with the value of the supplied variable.
-     */
-    def set(ref: DataFlowVariable[T]) { this << ref }
-
-    /**
-     * Sets the value of this variable (if unset).
-     */
-    def <<(value: T) {
-      if (this.value.get.isEmpty) in ! Set(value)
-      else throw new DataFlowVariableException(
-        "Attempt to change data flow variable (from [" + this.value.get + "] to [" + value + "])")
-    }
-
-    /**
-     * JavaAPI.
-     * Sets the value of this variable (if unset) with the value of the supplied variable.
-     */
-    def set(value: T) { this << value }
-
-    /**
-     * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
-     */
-    def get(): T = this()
-
-    /**
-     * Retrieves the value of variable, throws a DataFlowVariableException if it times out.
-     */
-    def apply(): T = {
-      value.get getOrElse {
-        val out = actorOf(new Out(this)).start()
-
-        val result = try {
-          blockedReaders offer out
-          (out !! Get).as[T]
-        } catch {
-          case e: Exception ⇒
-            EventHandler.error(e, this, e.getMessage)
-            out ! Exit
-            throw e
-        }
-
-        result.getOrElse(throw new DataFlowVariableException(
-          "Timed out (after " + timeoutMs + " milliseconds) while waiting for result"))
-      }
-    }
-
-    def shutdown() { in ! Exit }
-  }
-}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 58faa0fdd5..ce97e8960d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -418,16 +418,16 @@ class ClusterNode private[akka] (
       }
     }, "akka.cluster.remoteClientLifeCycleListener").start()

-  val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
+  lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()

-  val remoteService: RemoteSupport = {
+  lazy val remoteService: RemoteSupport = {
     val remote = new akka.remote.netty.NettyRemoteSupport
     remote.start(nodeAddress.hostname, nodeAddress.port)
     remote.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
     remote.addListener(remoteClientLifeCycleListener)
     remote
   }

-  val remoteServerAddress: InetSocketAddress = remoteService.address
+  lazy val remoteServerAddress: InetSocketAddress = remoteService.address

   val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")

diff --git a/akka-docs/dev/index.rst b/akka-docs/dev/index.rst
index 690ea88664..17fcb2427c 100644
--- a/akka-docs/dev/index.rst
+++ b/akka-docs/dev/index.rst
@@ -5,7 +5,8 @@ Information for Developers
    :maxdepth: 2

    building-akka
+   multi-jvm-testing
    developer-guidelines
    documentation
    team
-
+
diff --git a/akka-docs/dev/multi-jvm-testing.rst b/akka-docs/dev/multi-jvm-testing.rst
new file mode 100644
index 0000000000..ce5a2d39a0
--- /dev/null
+++ b/akka-docs/dev/multi-jvm-testing.rst
@@ -0,0 +1,220 @@
+Multi-JVM Testing
+=================
+
+Included in the Akka build is an sbt trait for multi-JVM testing which will
+fork JVMs for multi-node testing. There is support for running applications
+(objects with main methods) and for running ScalaTest tests.
+
+Using the multi-JVM testing is straightforward. First, mix the
+``MultiJvmTests`` trait into your sbt project::
+
+  class SomeProject(info: ProjectInfo) extends DefaultProject(info) with MultiJvmTests
+
+You can specify JVM options for the forked JVMs::
+
+  class SomeProject(info: ProjectInfo) extends DefaultProject(info) with MultiJvmTests {
+    override def multiJvmOptions = Seq("-Xmx256M")
+  }
+
+There are two sbt commands: ``multi-jvm-run`` for running applications and
+``multi-jvm-test`` for running ScalaTest tests.
+
+
+Creating application tests
+~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+The tests are discovered through a naming convention. A test is named with the
+following pattern:
+
+.. code-block:: none
+
+  {TestName}MultiJvm{NodeName}
+
+That is, each test has ``MultiJvm`` in the middle of its name. The part before
+it groups together tests/applications under a single ``TestName`` that will
+run together. The part after, the ``NodeName``, is a distinguishing name for
+each forked JVM.
+
+So to create a 3-node test called ``Test``, you can create three applications
+like the following::
+
+  package example
+
+  object TestMultiJvmNode1 {
+    def main(args: Array[String]) {
+      println("Hello from node 1")
+    }
+  }
+
+  object TestMultiJvmNode2 {
+    def main(args: Array[String]) {
+      println("Hello from node 2")
+    }
+  }
+
+  object TestMultiJvmNode3 {
+    def main(args: Array[String]) {
+      println("Hello from node 3")
+    }
+  }
+
+When you call ``multi-jvm-run Test`` at the sbt prompt, three JVMs will be
+spawned, one for each node. It will look like this:
+
+.. code-block:: shell
+
+  > multi-jvm-run Test
+  ...
+  [info] == multi-jvm-run ==
+  [info] == multi-jvm / Test ==
+  [info] Starting JVM-Node1 for example.TestMultiJvmNode1
+  [info] Starting JVM-Node2 for example.TestMultiJvmNode2
+  [info] Starting JVM-Node3 for example.TestMultiJvmNode3
+  [JVM-Node1] Hello from node 1
+  [JVM-Node2] Hello from node 2
+  [JVM-Node3] Hello from node 3
+  [info] == multi-jvm / Test ==
+  [info] == multi-jvm-run ==
+  [success] Successful.
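+
+To make the naming convention concrete, here is a small self-contained sketch
+(purely illustrative, not the plugin's actual implementation) of how a class
+name can be split on the ``MultiJvm`` marker into the ``TestName`` that groups
+the JVMs and the ``NodeName`` that distinguishes them::
+
+  object MultiJvmNaming {
+    // Split "{TestName}MultiJvm{NodeName}" into its two parts.
+    // Returns None when the marker is absent, i.e. not a multi-JVM test.
+    def split(className: String, marker: String = "MultiJvm"): Option[(String, String)] = {
+      val i = className.indexOf(marker)
+      if (i < 0) None
+      else Some((className.substring(0, i), className.substring(i + marker.length)))
+    }
+
+    def main(args: Array[String]) {
+      println(split("TestMultiJvmNode1")) // Some((Test,Node1))
+      println(split("SpecMultiJvmNode2")) // Some((Spec,Node2))
+      println(split("PlainSpec"))         // None
+    }
+  }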
+
+
+Naming
+~~~~~~
+
+You can change what the ``MultiJvm`` identifier is. For example, to change it
+to ``ClusterTest``, override the ``multiJvmTestName`` method::
+
+  class SomeProject(info: ProjectInfo) extends DefaultProject(info) with MultiJvmTests {
+    override def multiJvmTestName = "ClusterTest"
+  }
+
+Your tests should now be named ``{TestName}ClusterTest{NodeName}``.
+
+
+ScalaTest
+~~~~~~~~~
+
+There is also support for creating ScalaTest tests rather than applications.
+To do this, use the same naming convention as above, but create ScalaTest
+suites rather than objects with main methods. You need to have ScalaTest on
+the classpath. Here is a similar example to the one above, but using
+ScalaTest::
+
+  package example
+
+  import org.scalatest.WordSpec
+  import org.scalatest.matchers.MustMatchers
+
+  class SpecMultiJvmNode1 extends WordSpec with MustMatchers {
+    "A node" should {
+      "be able to say hello" in {
+        val message = "Hello from node 1"
+        message must be("Hello from node 1")
+      }
+    }
+  }
+
+  class SpecMultiJvmNode2 extends WordSpec with MustMatchers {
+    "A node" should {
+      "be able to say hello" in {
+        val message = "Hello from node 2"
+        message must be("Hello from node 2")
+      }
+    }
+  }
+
+To run these tests you would call ``multi-jvm-test Spec`` at the sbt prompt.
+
+
+ZooKeeper Barrier
+~~~~~~~~~~~~~~~~~
+
+When running multi-JVM tests it's common to need to coordinate timing across
+nodes. To do this, there is a ZooKeeper-based double barrier (there is both an
+entry barrier and an exit barrier). ``ClusterNode`` also has support for
+creating barriers easily. To wait at the entry, use the ``enter`` method; to
+wait at the exit, use the ``leave`` method. It's also possible to pass a block
+of code which will be run between the barriers.
+
+When creating a barrier, you pass it a name and the number of nodes that are
+expected to arrive at the barrier. You can also pass a timeout. The default
+timeout is 60 seconds.
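+
+As a sketch of how a barrier is used on its own (the block form is the one
+used in the full example below; the explicit ``enter``/``leave`` calls and the
+timeout argument are written as assumptions based on the description above,
+so check the ``ClusterNode`` API for the exact signatures)::
+
+  // Block form: enter the barrier, run the block, leave the barrier.
+  node.barrier("phase-1", NrOfNodes) {
+    // coordinated work for this phase
+  }
+
+  // Explicit form (assumed API), for when entry and exit need to
+  // happen at different points in the code:
+  val barrier = node.barrier("phase-2", NrOfNodes)
+  barrier.enter() // wait until all NrOfNodes nodes have arrived
+  // ... work ...
+  barrier.leave() // wait until all nodes are done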
+
+Here is an example of coordinating the starting of two nodes and then running
+something in coordination::
+
+  package example
+
+  import akka.cluster._
+  import akka.actor._
+
+  object TestMultiJvmNode1 {
+    val NrOfNodes = 2
+
+    def main(args: Array[String]) {
+      Cluster.startLocalCluster()
+
+      val node = Cluster.newNode(NodeAddress("example", "node1", port = 9991))
+
+      node.barrier("start-node1", NrOfNodes) {
+        node.start
+      }
+
+      node.barrier("start-node2", NrOfNodes) {
+        // wait for node 2 to start
+      }
+
+      node.barrier("hello", NrOfNodes) {
+        println("Hello from node 1")
+      }
+
+      Actor.registry.shutdownAll
+
+      node.stop
+
+      Cluster.shutdownLocalCluster
+    }
+  }
+
+  object TestMultiJvmNode2 {
+    val NrOfNodes = 2
+
+    def main(args: Array[String]) {
+      val node = Cluster.newNode(NodeAddress("example", "node2", port = 9992))
+
+      node.barrier("start-node1", NrOfNodes) {
+        // wait for node 1 to start
+      }
+
+      node.barrier("start-node2", NrOfNodes) {
+        node.start
+      }
+
+      node.barrier("hello", NrOfNodes) {
+        println("Hello from node 2")
+      }
+
+      Actor.registry.shutdownAll
+
+      node.stop
+    }
+  }
+
+An example output from this would be:
+
+.. code-block:: shell
+
+  > multi-jvm-run Test
+  ...
+  [info] == multi-jvm-run ==
+  [info] == multi-jvm / Test ==
+  [info] Starting JVM-Node1 for example.TestMultiJvmNode1
+  [info] Starting JVM-Node2 for example.TestMultiJvmNode2
+  [JVM-Node1] Loading config [akka.conf] from the application classpath.
+  [JVM-Node2] Loading config [akka.conf] from the application classpath.
+  ...
+  [JVM-Node2] Hello from node 2
+  [JVM-Node1] Hello from node 1
+  [info] == multi-jvm / Test ==
+  [info] == multi-jvm-run ==
+  [success] Successful.
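+
+Note that in this example only node 1 starts and shuts down the local
+ZooKeeper server that backs the barriers (``Cluster.startLocalCluster()`` and
+``Cluster.shutdownLocalCluster``), while node 2 only creates and stops its own
+cluster node. The barrier names and node counts must match across the JVMs,
+otherwise the nodes will wait on each other until the timeout expires.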