From ddb2a69c192b24b410b9dd9789d2910fdd33a0a8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Mon, 23 May 2011 22:35:01 +0200 Subject: [PATCH] Moved ClusterNode interface, NodeAddress and ChangeListener into akka-actor as real Trait instead of using structural typing. Refactored boot dependency in Cluster/Actor/Deployer. Added multi-jvm test for testing clustered actor deployment, check out as LocalActorRef and ClusterActorRef. MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../scala/akka/actor/actor/DeployerSpec.scala | 2 +- .../src/main/scala/akka/actor/Actor.scala | 29 +- .../src/main/scala/akka/actor/ActorRef.scala | 6 +- .../src/main/scala/akka/actor/Deployer.scala | 1 + .../scala/akka/cluster/ClusterInterface.scala | 449 ++++++++++++++++++ .../src/main/scala/akka/config/Config.scala | 2 + .../scala/akka/util/ReflectiveAccess.scala | 19 +- .../src/main/scala/akka/cluster/Cluster.scala | 160 ++----- .../scala/akka/cluster/ClusterActorRef.scala | 2 +- .../scala/akka/cluster/ClusterDeployer.scala | 28 +- .../akka/cluster/ClusterDeployerSpec.scala | 6 - .../SampleMultiJvmNode1.conf} | 0 .../SampleMultiJvmNode1.opts} | 0 .../SampleMultiJvmNode2.conf} | 0 .../SampleMultiJvmNode2.opts} | 0 .../SampleMultiJvmSpec.scala} | 41 +- .../store_actor/StoreActorMultiJvmNode1.conf | 4 + .../store_actor/StoreActorMultiJvmNode1.opts | 1 + .../store_actor/StoreActorMultiJvmNode2.conf | 4 + .../store_actor/StoreActorMultiJvmNode2.opts | 1 + .../store_actor/StoreActorMultiJvmSpec.scala | 89 ++++ akka-docs/dev/multi-jvm-testing.rst | 117 ++--- config/akka-reference.conf | 13 +- 23 files changed, 709 insertions(+), 265 deletions(-) create mode 100644 akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala rename akka-cluster/src/test/scala/akka/cluster/{multi/ClusterMultiJvmNode1.conf => sample/SampleMultiJvmNode1.conf} (100%) rename akka-cluster/src/test/scala/akka/cluster/{multi/ClusterMultiJvmNode1.opts => sample/SampleMultiJvmNode1.opts} (100%) rename akka-cluster/src/test/scala/akka/cluster/{multi/ClusterMultiJvmNode2.conf => sample/SampleMultiJvmNode2.conf} (100%) rename akka-cluster/src/test/scala/akka/cluster/{multi/ClusterMultiJvmNode2.opts => sample/SampleMultiJvmNode2.opts} (100%) rename akka-cluster/src/test/scala/akka/cluster/{multi/ClusterMultiJvmSpec.scala => sample/SampleMultiJvmSpec.scala} (60%) create mode 100644 akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf create mode 100644 akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts create mode 100644 akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index d92b1a5a67..bd56a79a8e 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -20,7 +20,7 @@ class DeployerSpec extends WordSpec with MustMatchers { LeastCPU, "akka.serialization.Format$Default$", Clustered( - Node("test-1"), + Node("node1"), Replicate(3), Stateless)))) } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8bf98a9bcd..8560e951b2 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -6,7 +6,7 @@ package akka.actor import DeploymentConfig._ import akka.dispatch._ -import akka.config.Config +import akka.config._ import Config._ import akka.util.{ ListenerManagement, ReflectiveAccess, Duration, Helpers } import ReflectiveAccess._ @@ -15,6 +15,7 @@ import akka.remoteinterface.RemoteSupport import akka.japi.{ Creator, Procedure } import akka.AkkaException import akka.serialization.{ Format, Serializer } +import akka.cluster.ClusterNode import akka.event.EventHandler import scala.reflect.BeanProperty @@ -137,7 +138,11 @@ object Actor extends ListenerManagement { /** * Handle to the ClusterNode. API for the cluster client. */ - lazy val cluster: ClusterModule.ClusterNode = ClusterModule.node + lazy val cluster: ClusterNode = { + val node = ClusterModule.node + node.start() + node + } /** * Handle to the RemoteSupport. API for the remote client/server. @@ -146,7 +151,7 @@ object Actor extends ListenerManagement { private[akka] lazy val remote: RemoteSupport = cluster.remoteService // start up a cluster node to join the ZooKeeper cluster - if (ClusterModule.isEnabled) cluster.start() + //if (ClusterModule.isEnabled) cluster.start() /** * Creates an ActorRef out of the Actor with type T. @@ -388,10 +393,10 @@ object Actor extends ListenerManagement { if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") - val hostname = home match { - case Host(hostname) ⇒ hostname - case IP(address) ⇒ address - case Node(nodeName) ⇒ Config.hostname + val isHomeNode = home match { + case Host(hostname) ⇒ hostname == Config.hostname + case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing + case Node(nodename) ⇒ nodename == Config.nodename } val replicas = replication match { @@ -402,7 +407,7 @@ object Actor extends ListenerManagement { case NoReplicas() ⇒ 0 } - if (hostname == Config.hostname) { // home node for clustered actor + if (isHomeNode) { // home node for clustered actor def serializerErrorDueTo(reason: String) = throw new akka.config.ConfigurationException( @@ -433,7 +438,9 @@ object Actor extends ListenerManagement { if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) - cluster.use(address, serializer) + cluster + .use(address, serializer) + .getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) } else { val routerType = router match { @@ -453,7 +460,7 @@ object Actor extends ListenerManagement { cluster.ref(address, routerType) } - /* + /* Misc stuff: - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - ClusterNode API and Actor.remote API should be made private[akka] @@ -464,7 +471,7 @@ object Actor extends ListenerManagement { */ - RemoteActorRef(address, Actor.TIMEOUT, None) + // RemoteActorRef(address, Actor.TIMEOUT, None) case invalid ⇒ throw new IllegalActorStateException( "Could not create actor with address [" + address + diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 61fbf56be9..5094ebed01 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -1006,8 +1006,10 @@ private[akka] case class RemoteActorRef private[akka] ( case Node(nodeName) ⇒ Config.hostname } new InetSocketAddress(hostname, Config.remoteServerPort) - case _ ⇒ throw new IllegalStateException( - "Actor with Address [" + address + "] is not bound to a Clustered Deployment") + case _ ⇒ + new InetSocketAddress(Config.hostname, Config.remoteServerPort) + //throw new IllegalStateException( + // "Actor with Address [" + address + "] is not bound to a Clustered Deployment") } start() diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 0fb15e7071..5879656ca3 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -265,6 +265,7 @@ object Deployer { // -------------------------------- // akka.actor.deployment.
.clustered.home // -------------------------------- + val home = clusteredConfig.getString("home", "") match { case "" ⇒ Host("localhost") case home ⇒ diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala new file mode 100644 index 0000000000..9b31b28de0 --- /dev/null +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -0,0 +1,449 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.cluster + +import akka.remoteinterface.RemoteSupport +import akka.serialization.Serializer +import akka.actor._ +import akka.dispatch.Future +import akka.config.Config +import akka.util._ + +import com.eaio.uuid.UUID + +import java.net.InetSocketAddress +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } +import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap } + +import scala.collection.mutable.ConcurrentMap +import scala.collection.JavaConversions._ + +object ChangeListener { + + /** + * Cluster membership change listener. + * For Scala API. + */ + trait ChangeListener { + def notify(event: ChangeNotification, client: ClusterNode) { + event match { + case NodeConnected(name) ⇒ nodeConnected(name, client) + case NodeDisconnected(name) ⇒ nodeDisconnected(name, client) + case NewLeader(name: String) ⇒ newLeader(name, client) + case NewSession ⇒ thisNodeNewSession(client) + case ThisNode.Connected ⇒ thisNodeConnected(client) + case ThisNode.Disconnected ⇒ thisNodeDisconnected(client) + case ThisNode.Expired ⇒ thisNodeExpired(client) + } + } + + def nodeConnected(node: String, client: ClusterNode) {} + + def nodeDisconnected(node: String, client: ClusterNode) {} + + def newLeader(name: String, client: ClusterNode) {} + + def thisNodeNewSession(client: ClusterNode) {} + + def thisNodeConnected(client: ClusterNode) {} + + def thisNodeDisconnected(client: ClusterNode) {} + + def thisNodeExpired(client: ClusterNode) {} + } + + /** + * Cluster membership change listener. + * For Java API. + */ + abstract class ChangeListenerAdapter extends ChangeListener + + sealed trait ChangeNotification + + case class NodeConnected(node: String) extends ChangeNotification + + case class NodeDisconnected(node: String) extends ChangeNotification + + case class NewLeader(name: String) extends ChangeNotification + + case object NewSession extends ChangeNotification + + object ThisNode { + + case object Connected extends ChangeNotification + + case object Disconnected extends ChangeNotification + + case object Expired extends ChangeNotification + + } +} + +/** + * Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance. + * + * @author Jonas Bonér + */ +class NodeAddress( + val clusterName: String, + val nodeName: String, + val hostname: String, + val port: Int) { + if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string") + if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string") + if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string") + if (port < 1) throw new NullPointerException("Port can not be negative") + + override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port) + + override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.## + + override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other) +} + +object NodeAddress { + + def apply( + clusterName: String = Config.clusterName, + nodeName: String = Config.nodename, + hostname: String = Config.hostname, + port: Int = Config.remoteServerPort): NodeAddress = new NodeAddress(clusterName, nodeName, hostname, port) + + def unapply(other: Any) = other match { + case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName, address.hostname, address.port)) + case _ ⇒ None + } +} + +/** + * Interface for cluster node. + * + * @author Jonas Bonér + */ +trait ClusterNode { + import ChangeListener._ + + val nodeAddress: NodeAddress + val zkServerAddresses: String + + val remoteClientLifeCycleListener: ActorRef + val remoteDaemon: ActorRef + val remoteService: RemoteSupport + val remoteServerAddress: InetSocketAddress + + val isConnected = new Switch(false) + val isLeader = new AtomicBoolean(false) + val electionNumber = new AtomicInteger(Int.MaxValue) + + private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() + private[cluster] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress] + private[cluster] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]] + + def membershipNodes: Array[String] + + def isRunning: Boolean = isConnected.isOn + + def start(): ClusterNode + + def shutdown() + + def disconnect(): ClusterNode + + def reconnect(): ClusterNode + + /** + * Registers a cluster change listener. + */ + def register(listener: ChangeListener): ClusterNode + + /** + * Returns the name of the current leader. + */ + def leader: String + + /** + * Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op. + */ + def resign() + + /** + * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode + + /** + * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode + + /** + * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode + + /** + * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode + + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, format: Serializer): ClusterNode + + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode + + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode + + /** + * Needed to have reflection through structural typing work. + */ + def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode + + /** + * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated + * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly + * available durable store. + */ + def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode + + /** + * Removes actor with uuid from the cluster. + */ + def remove(uuid: UUID) + + /** + * Removes actor with address from the cluster. + */ + def remove(address: String): ClusterNode + + /** + * Is the actor with uuid clustered or not? + */ + def isClustered(actorAddress: String): Boolean + + /** + * Is the actor with uuid in use on 'this' node or not? + */ + def isInUseOnNode(actorAddress: String): Boolean + + /** + * Is the actor with uuid in use or not? + */ + def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean + + /** + * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available + * for remote access through lookup by its UUID. + */ + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] + + /** + * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available + * for remote access through lookup by its UUID. + */ + def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] + + /** + * Using (checking out) all actors with a specific UUID on all nodes in the cluster. + */ + def useActorOnAllNodes(uuid: UUID) + + /** + * Using (checking out) specific UUID on a specefic node. + */ + def useActorOnNode(node: String, uuid: UUID) + + /** + * Checks in an actor after done using it on this node. + */ + def release(actorAddress: String) + + /** + * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'. + */ + def releaseActorOnAllNodes(uuid: UUID) + + /** + * Creates an ActorRef with a Router to a set of clustered actors. + */ + def ref(actorAddress: String, router: RouterType): ActorRef + + /** + * Migrate the actor from 'this' node to node 'to'. + */ + def migrate(to: NodeAddress, actorAddress: String) + + /** + * Migrate the actor from node 'from' to node 'to'. + */ + def migrate(from: NodeAddress, to: NodeAddress, actorAddress: String) + + /** + * Returns the UUIDs of all actors checked out on this node. + */ + def uuidsForActorsInUse: Array[UUID] + + /** + * Returns the addresses of all actors checked out on this node. + */ + def addressesForActorsInUse: Array[String] + + /** + * Returns the UUIDs of all actors registered in this cluster. + */ + def uuidsForClusteredActors: Array[UUID] + + /** + * Returns the addresses of all actors registered in this cluster. + */ + def addressesForClusteredActors: Array[String] + + /** + * Returns the actor id for the actor with a specific UUID. + */ + def actorAddressForUuid(uuid: UUID): String + + /** + * Returns the actor ids for all the actors with a specific UUID. + */ + def actorAddressForUuids(uuids: Array[UUID]): Array[String] + + /** + * Returns the actor UUIDs for actor ID. + */ + def uuidsForActorAddress(actorAddress: String): Array[UUID] + + /** + * Returns the node names of all actors in use with UUID. + */ + def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] + + /** + * Returns the UUIDs of all actors in use registered on a specific node. + */ + def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] + + /** + * Returns the addresses of all actors in use registered on a specific node. + */ + def addressesForActorsInUseOnNode(nodeName: String): Array[String] + + /** + * Returns Format for actor with UUID. + */ + def formatForActor(actorAddress: String): Serializer + + /** + * Returns home address for actor with UUID. + */ + def addressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)] + + /** + * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument). + */ + def send(f: Function0[Unit], replicationFactor: Int) + + /** + * Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument). + * Returns an 'Array' with all the 'Future's from the computation. + */ + def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] + + /** + * Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument) + * with the argument speficied. + */ + def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int) + + /** + * Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument) + * with the argument speficied. + * Returns an 'Array' with all the 'Future's from the computation. + */ + def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] + + def setConfigElement(key: String, bytes: Array[Byte]) + + /** + * Returns the config element for the key or NULL if no element exists under the key. + */ + def getConfigElement(key: String): Array[Byte] + + def removeConfigElement(key: String) + + def getConfigElementKeys: Array[String] + + private[cluster] def initializeNode() + + private[cluster] def addressForNode(node: String): InetSocketAddress + + private[cluster] def publish(change: ChangeNotification) + + private[cluster] def findFailedNodes(nodes: List[String]): List[String] + + private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String] + + private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String] + + private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String] + + private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] + + private[cluster] def joinMembershipNode() + + private[cluster] def joinActorsAtAddressNode() + + private[cluster] def joinLeaderElection: Boolean + + private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) + + private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) + + private[cluster] def membershipPathFor(node: String): String + + private[cluster] def configurationPathFor(key: String): String + + private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String + + private[cluster] def actorLocationsPathFor(uuid: UUID): String + + private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress): String + + private[cluster] def actorsAtNodePathFor(node: String): String + + private[cluster] def actorAtNodePathFor(node: String, uuid: UUID): String + + private[cluster] def actorRegistryPathFor(uuid: UUID): String + + private[cluster] def actorRegistryFormatPathFor(uuid: UUID): String + + private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID): String + + private[cluster] def actorRegistryNodePathFor(uuid: UUID): String + + private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String +} + diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala index 18d054ab15..16daea4c88 100644 --- a/akka-actor/src/main/scala/akka/config/Config.scala +++ b/akka-actor/src/main/scala/akka/config/Config.scala @@ -115,6 +115,8 @@ object Config { case value ⇒ value.toInt } + val clusterName = config.getString("akka.cluster.name", "default") + val startTime = System.currentTimeMillis def uptime = (System.currentTimeMillis - startTime) / 1000 } diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 3efdb67a63..35196813ae 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -11,6 +11,7 @@ import akka.actor._ import DeploymentConfig.Deploy import akka.event.EventHandler import akka.serialization.Format +import akka.cluster.ClusterNode import java.net.InetSocketAddress @@ -71,24 +72,6 @@ object ReflectiveAccess { clusterDeployerInstance.get } - type ClusterNode = { - def start() - def shutdown() - - def remoteService: RemoteSupport - - def store(address: String, actorClass: Class[_ <: Actor], replicas: Int, serializeMailbox: Boolean, format: Serializer) - def store(actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean, format: Serializer) - - def remove(address: String) - - def use(address: String, format: Serializer): Array[ActorRef] - def ref(address: String, router: RouterType): ActorRef - - def isClustered(address: String): Boolean - def nrOfActors: Int - } - type ClusterDeployer = { def init(deployments: List[Deploy]) def shutdown() diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index a32c462441..1da99f88b5 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -38,6 +38,7 @@ import Compression.LZF import akka.AkkaException import akka.cluster.zookeeper._ +import akka.cluster.ChangeListener._ import com.eaio.uuid.UUID @@ -153,7 +154,7 @@ object Cluster { val UUID_PREFIX = "uuid:".intern // config options - val name = config.getString("akka.cluster.name", "default") + val name = Config.clusterName val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552) val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt @@ -162,64 +163,6 @@ object Cluster { val shouldCompressData = config.getBool("akka.cluster.use-compression", false) val enableJMX = config.getBool("akka.enable-jmx", true) - /** - * Cluster membership change listener. - * For Scala API. - */ - trait ChangeListener { - def notify(event: ChangeNotification, client: ClusterNode) { - event match { - case NodeConnected(name) ⇒ nodeConnected(name, client) - case NodeDisconnected(name) ⇒ nodeDisconnected(name, client) - case NewLeader(name: String) ⇒ newLeader(name, client) - case NewSession ⇒ thisNodeNewSession(client) - case ThisNode.Connected ⇒ thisNodeConnected(client) - case ThisNode.Disconnected ⇒ thisNodeDisconnected(client) - case ThisNode.Expired ⇒ thisNodeExpired(client) - } - } - - def nodeConnected(node: String, client: ClusterNode) {} - - def nodeDisconnected(node: String, client: ClusterNode) {} - - def newLeader(name: String, client: ClusterNode) {} - - def thisNodeNewSession(client: ClusterNode) {} - - def thisNodeConnected(client: ClusterNode) {} - - def thisNodeDisconnected(client: ClusterNode) {} - - def thisNodeExpired(client: ClusterNode) {} - } - - /** - * Cluster membership change listener. - * For Java API. - */ - abstract class ChangeListenerAdapter extends ChangeListener - - sealed trait ChangeNotification - - case class NodeConnected(node: String) extends ChangeNotification - - case class NodeDisconnected(node: String) extends ChangeNotification - - case class NewLeader(name: String) extends ChangeNotification - - case object NewSession extends ChangeNotification - - object ThisNode { - - case object Connected extends ChangeNotification - - case object Disconnected extends ChangeNotification - - case object Expired extends ChangeNotification - - } - @volatile private var properties = Map.empty[String, String] @@ -249,14 +192,14 @@ object Cluster { /** * The node address. */ - lazy val nodeAddress = NodeAddress(name, nodename, hostname, port) + val nodeAddress = NodeAddress(name, nodename, hostname, port) /** * The reference to the running ClusterNode. */ - lazy val node: ClusterNode = { + val node = { if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null") - new ClusterNode(nodeAddress, zooKeeperServers, defaultSerializer) + new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer) } /** @@ -319,6 +262,14 @@ object Cluster { */ def newZkClient: AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer) + def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking) + + def barrier(name: String, count: Int) = + ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count) + + def barrier(name: String, count: Int, timeout: Duration) = + ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count, timeout) + def uuidToString(uuid: UUID): String = uuid.toString def stringToUuid(uuid: String): UUID = { @@ -348,35 +299,26 @@ object Cluster { * * @author Jonas Bonér */ -class ClusterNode private[akka] ( +class DefaultClusterNode private[akka] ( val nodeAddress: NodeAddress, val zkServerAddresses: String, - val serializer: ZkSerializer) extends ErrorHandler { + val serializer: ZkSerializer) extends ErrorHandler with ClusterNode { self ⇒ if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'") + val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster") + import Cluster._ - EventHandler.info(this, - ("\nCreating cluster node with" + - "\n\tcluster name = [%s]" + - "\n\tnode name = [%s]" + - "\n\tport = [%s]" + - "\n\tzookeeper server addresses = [%s]" + - "\n\tserializer = [%s]") - .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer)) - - val remoteClientLifeCycleListener = actorOf(new Actor { + lazy val remoteClientLifeCycleListener = actorOf(new Actor { def receive = { case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule() case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule() case _ ⇒ //ignore other } }, "akka.cluster.remoteClientLifeCycleListener").start() - lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start() - lazy val remoteService: RemoteSupport = { val remote = new akka.remote.netty.NettyRemoteSupport remote.start(nodeAddress.hostname, nodeAddress.port) @@ -386,8 +328,6 @@ class ClusterNode private[akka] ( } lazy val remoteServerAddress: InetSocketAddress = remoteService.address - val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster") - // static nodes val CLUSTER_NODE = "/" + nodeAddress.clusterName val MEMBERSHIP_NODE = CLUSTER_NODE + "/members" @@ -409,17 +349,8 @@ class ClusterNode private[akka] ( val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader" // should NOT be part of 'baseNodes' only used by 'leaderLock' - val isConnected = new Switch(false) - val isLeader = new AtomicBoolean(false) - val electionNumber = new AtomicInteger(Integer.MAX_VALUE) - private val membershipNodePath = membershipPathFor(nodeAddress.nodeName) - // local caches of ZK data - private[akka] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() - private[akka] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress] - private[akka] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]] - def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]] private[akka] val replicaConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = @@ -436,13 +367,13 @@ class ClusterNode private[akka] ( private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef] // resources - private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer) + lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer) - private[cluster] val leaderElectionCallback = new LockListener { + lazy private[cluster] val leaderElectionCallback = new LockListener { override def lockAcquired() { EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName)) self.isLeader.set(true) - self.publish(Cluster.NewLeader(self.nodeAddress.nodeName)) + self.publish(NewLeader(self.nodeAddress.nodeName)) } override def lockReleased() { @@ -453,7 +384,7 @@ class ClusterNode private[akka] ( } } - private[cluster] val leaderLock = new WriteLock( + lazy private[cluster] val leaderLock = new WriteLock( zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) { // ugly hack, but what do you do? <--- haha epic private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId") @@ -468,8 +399,6 @@ class ClusterNode private[akka] ( // Node // ======================================= - def isRunning: Boolean = isConnected.isOn - def start(): ClusterNode = { isConnected switchOn { initializeNode() @@ -738,13 +667,13 @@ class ClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String): Array[LocalActorRef] = use(actorAddress, formatForActor(actorAddress)) + def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, formatForActor(actorAddress)) /** * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String, format: Serializer): Array[LocalActorRef] = if (isConnected.isOn) { + def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] = if (isConnected.isOn) { import akka.serialization.ActorSerialization._ @@ -780,8 +709,8 @@ class ClusterNode private[akka] ( actor.asInstanceOf[LocalActorRef] case Right(exception) ⇒ throw exception } - } - } else Array.empty[LocalActorRef] + } headOption // FIXME should not be an array at all coming here + } else None /** * Using (checking out) all actors with a specific UUID on all nodes in the cluster. @@ -1151,22 +1080,6 @@ class ClusterNode private[akka] ( def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]] - // ======================================= - // Queue - // ======================================= - - def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(zkClient, rootPath, blocking) - - // ======================================= - // Barrier - // ======================================= - - def barrier(name: String, count: Int) = - ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count) - - def barrier(name: String, count: Int, timeout: Duration) = - ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count, timeout) - // ======================================= // Private // ======================================= @@ -1198,7 +1111,14 @@ class ClusterNode private[akka] ( "%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort) private[cluster] def initializeNode() { - EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress)) + EventHandler.info(this, + ("\nCreating cluster node with" + + "\n\tcluster name = [%s]" + + "\n\tnode name = [%s]" + + "\n\tport = [%s]" + + "\n\tzookeeper server addresses = [%s]" + + "\n\tserializer = [%s]") + .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer)) EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString)) createRootClusterNode() val isLeader = joinLeaderElection @@ -1499,12 +1419,12 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E .format(self.nodeAddress.nodeName, childList.mkString(" "))) self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒ self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map - self.publish(Cluster.NodeConnected(name)) + self.publish(NodeConnected(name)) } self.findNewlyDisconnectedMembershipNodes(childList) foreach { name ⇒ self.nodeNameToAddress.remove(name) // update 'nodename-address' map - self.publish(Cluster.NodeDisconnected(name)) + self.publish(NodeDisconnected(name)) } self.locallyCachedMembershipNodes.clear() @@ -1522,13 +1442,13 @@ class StateListener(self: ClusterNode) extends IZkStateListener { state match { case KeeperState.SyncConnected ⇒ EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Connected) + self.publish(ThisNode.Connected) case KeeperState.Disconnected ⇒ EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Disconnected) + self.publish(ThisNode.Disconnected) case KeeperState.Expired ⇒ EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress)) - self.publish(Cluster.ThisNode.Expired) + self.publish(ThisNode.Expired) } } @@ -1538,7 +1458,7 @@ class StateListener(self: ClusterNode) extends IZkStateListener { def handleNewSession() { EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress)) self.initializeNode() - self.publish(Cluster.NewSession) + self.publish(NewSession) } } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index c8ac1012f4..abcbbb58ee 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -56,5 +56,5 @@ class ClusterActorRef private[akka] ( // clustered refs are always registered and looked up by UUID private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) = - RemoteActorRef(UUID_PREFIX + uuidToString(uuid), Actor.TIMEOUT, None) + RemoteActorRef(uuidToString(uuid), Actor.TIMEOUT, None) } diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index c8d22ad616..ceadbd8585 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -4,7 +4,7 @@ package akka.cluster -import akka.actor.{ DeploymentConfig, Deployer, DeploymentException } +import akka.actor.{ DeploymentConfig, Deployer, LocalDeployer, DeploymentException } import DeploymentConfig._ import akka.event.EventHandler import akka.config.Config @@ -119,17 +119,23 @@ object ClusterDeployer { private[akka] def deploy(deployment: Deploy) { ensureRunning { - val path = deploymentAddressPath.format(deployment.address) - try { - ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) - zkClient.writeData(path, deployment) + deployment match { + case Deploy(_, _, _, Local) ⇒ // local deployment + LocalDeployer.deploy(deployment) - // FIXME trigger cluster-wide deploy action - } catch { - case e: NullPointerException ⇒ - handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed")) - case e: Exception ⇒ - handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e)) + case _ ⇒ // cluster deployment + val path = deploymentAddressPath.format(deployment.address) + try { + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) + zkClient.writeData(path, deployment) + + // FIXME trigger cluster-wide deploy action + } catch { + case e: NullPointerException ⇒ + handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed")) + case e: Exception ⇒ + handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e)) + } } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index bc5c189a1c..c5f9addc36 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -39,12 +39,6 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter oldDeployment must equal(newDeployment.get) } } - - "be able to create an actor deployed using ClusterDeployer, add it to ZooKeeper and then check the actor out for use" in { - val pi = Actor.actorOf[HelloWorld]("service-hello") - pi must not equal (null) - pi.address must equal("service-hello") - } } override def beforeAll() { diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.conf similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.conf rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.conf diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.opts rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.conf similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.conf rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.conf diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.opts rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala similarity index 60% rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala index fe1e07109f..d949d99d26 100644 --- a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.cluster.multi +package akka.cluster.sample import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers @@ -10,16 +10,15 @@ import org.scalatest.BeforeAndAfterAll import akka.cluster._ -object ClusterMultiJvmSpec { +object SampleMultiJvmSpec { val NrOfNodes = 2 } -class ClusterMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { - import ClusterMultiJvmSpec._ +class SampleMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import SampleMultiJvmSpec._ override def beforeAll() = { Cluster.startLocalCluster() - // resetCluster() } override def afterAll() = { @@ -45,23 +44,17 @@ class ClusterMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfte } "be able to start all nodes" in { - Cluster.node.barrier("start", NrOfNodes) { - // Cluster.node.start() + Cluster.barrier("start", NrOfNodes) { + Cluster.node.start() } - // Cluster.node.isRunning must be(true) - } - - "be able to shutdown all nodes" in { - Cluster.node.barrier("shutdown", NrOfNodes) { - // Cluster.node.shutdown() - } - // Cluster.node.isRunning must be(false) + Cluster.node.isRunning must be(true) + Cluster.node.shutdown() } } } -class ClusterMultiJvmNode2 extends WordSpec with MustMatchers { - import ClusterMultiJvmSpec._ +class SampleMultiJvmNode2 extends WordSpec with MustMatchers { + import SampleMultiJvmSpec._ "A cluster" must { @@ -72,17 +65,11 @@ class ClusterMultiJvmNode2 extends WordSpec with MustMatchers { } "be able to start all nodes" in { - Cluster.node.barrier("start", NrOfNodes) { - // Cluster.node.start() + Cluster.barrier("start", NrOfNodes) { + Cluster.node.start() } - // Cluster.node.isRunning must be(true) - } - - "be able to shutdown all nodes" in { - Cluster.node.barrier("shutdown", NrOfNodes) { - // Cluster.node.shutdown() - } - // Cluster.node.isRunning must be(false) + Cluster.node.isRunning must be(true) + Cluster.node.shutdown() } } } diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf new file mode 100644 index 0000000000..deaf85b42c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf @@ -0,0 +1,4 @@ +akka.actor.deployment.service-hello.router = "round-robin" +akka.actor.deployment.service-hello.clustered.home = "node:node1" +akka.actor.deployment.service-hello.clustered.replicas = 2 +akka.actor.deployment.service-hello.clustered.stateless = on diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts new file mode 100644 index 0000000000..a88c260d8c --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf new file mode 100644 index 0000000000..0c1a16ea15 --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf @@ -0,0 +1,4 @@ +akka.actor.deployment.service-hello.router = "round-robin" +akka.actor.deployment.service-hello.clustered.home = "node:node1" +akka.actor.deployment.service-hello.clustered.replicas = 2 +akka.actor.deployment.service-hello.clustered.stateless = on \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts new file mode 100644 index 0000000000..f1e01f253d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts @@ -0,0 +1 @@ +-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala new file mode 100644 index 0000000000..8e32f3960b --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.cluster.store_actor + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.BeforeAndAfterAll + +import akka.cluster._ +import akka.actor._ +import Actor._ + +object StoreActorMultiJvmSpec { + val NrOfNodes = 2 + + class HelloWorld extends Actor with Serializable { + def receive = { + case "Hello" ⇒ self.reply("World") + } + } +} + +class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import StoreActorMultiJvmSpec._ + + "A cluster" must { + + "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { + System.getProperty("akka.cluster.nodename", "") must be("node1") + System.getProperty("akka.cluster.port", "") must be("9991") + + Cluster.barrier("start-node1", NrOfNodes) { + Cluster.node.start() + } + + Cluster.barrier("start-node2", NrOfNodes) {} + + Cluster.barrier("create-clustered-actor-node1", NrOfNodes) { + val pi = Actor.actorOf[HelloWorld]("service-hello") + pi must not equal (null) + pi.address must equal("service-hello") + pi.isInstanceOf[LocalActorRef] must be(true) + } + + Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {} + + Cluster.node.shutdown() + } + } + + override def beforeAll() = { + Cluster.startLocalCluster() + } + + override def afterAll() = { + Cluster.shutdownLocalCluster() + } +} + +class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers { + import StoreActorMultiJvmSpec._ + + "A cluster" must { + + "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in { + System.getProperty("akka.cluster.nodename", "") must be("node2") + System.getProperty("akka.cluster.port", "") must be("9992") + + Cluster.barrier("start-node1", NrOfNodes) {} + + Cluster.barrier("start-node2", NrOfNodes) { + Cluster.node.start() + } + + Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {} + + Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { + val pi = Actor.actorOf[HelloWorld]("service-hello") + pi must not equal (null) + pi.address must equal("service-hello") + pi.isInstanceOf[ClusterActorRef] must be(true) + } + + Cluster.node.shutdown() + } + } +} diff --git a/akka-docs/dev/multi-jvm-testing.rst b/akka-docs/dev/multi-jvm-testing.rst index c69ad34adb..84a3e5de32 100644 --- a/akka-docs/dev/multi-jvm-testing.rst +++ b/akka-docs/dev/multi-jvm-testing.rst @@ -41,19 +41,19 @@ like the following:: package example - object TestMultiJvmNode1 { + object SampleMultiJvmNode1 { def main(args: Array[String]) { println("Hello from node 1") } } - object TestMultiJvmNode2 { + object SampleMultiJvmNode2 { def main(args: Array[String]) { println("Hello from node 2") } } - object TestMultiJvmNode3 { + object SampleMultiJvmNode3 { def main(args: Array[String]) { println("Hello from node 3") } @@ -68,9 +68,9 @@ spawned, one for each node. It will look like this: ... [info] == multi-jvm-run == [info] == multi-jvm / Test == - [info] Starting JVM-Node1 for example.TestMultiJvmNode1 - [info] Starting JVM-Node2 for example.TestMultiJvmNode2 - [info] Starting JVM-Node3 for example.TestMultiJvmNode3 + [info] Starting JVM-Node1 for example.SampleMultiJvmNode1 + [info] Starting JVM-Node2 for example.SampleMultiJvmNode2 + [info] Starting JVM-Node3 for example.SampleMultiJvmNode3 [JVM-Node1] Hello from node 1 [JVM-Node2] Hello from node 2 [JVM-Node3] Hello from node 3 @@ -98,21 +98,22 @@ Setting JVM options ------------------- You can define specific JVM options for each of the spawned JVMs. You do that by creating -a file named after the node in the test with suffix ``.opts``. +a file named after the node in the test with suffix ``.opts`` and put them in the same +directory as the test. For example, to feed the JVM options ``-Dakka.cluster.nodename=node1`` and -``-Dakka.cluster.port=9991`` to the ``TestMultiJvmNode1`` let's create three ``*.opts`` files +``-Dakka.cluster.port=9991`` to the ``SampleMultiJvmNode1`` let's create three ``*.opts`` files and add the options to them. -``TestMultiJvmNode1.opts``:: +``SampleMultiJvmNode1.opts``:: -Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991 -``TestMultiJvmNode2.opts``:: +``SampleMultiJvmNode2.opts``:: -Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992 -``TestMultiJvmNode3.opts``:: +``SampleMultiJvmNode3.opts``:: -Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993 @@ -120,20 +121,21 @@ Overriding akka.conf options ---------------------------- You can also override the options in the ``akka.conf`` file with different options for each -spawned JVM. You do that by creating a file named after the node in the test with suffix ``.conf``. +spawned JVM. You do that by creating a file named after the node in the test with suffix +``.conf`` and put them in the same directory as the test . -For example, to override the configuration option ``akka.cluster.name`` let's create three ``*.conf`` files -and add the option to them. +For example, to override the configuration option ``akka.cluster.name`` let's create three +``*.conf`` files and add the option to them. -``TestMultiJvmNode1.conf``:: +``SampleMultiJvmNode1.conf``:: akka.cluster.name = "test-cluster" -``TestMultiJvmNode2.conf``:: +``SampleMultiJvmNode2.conf``:: akka.cluster.name = "test-cluster" -``TestMultiJvmNode3.conf``:: +``SampleMultiJvmNode3.conf``:: akka.cluster.name = "test-cluster" @@ -175,10 +177,10 @@ Zookeeper Barrier ~~~~~~~~~~~~~~~~~ When running multi-JVM tests it's common to need to coordinate timing across -nodes. To do this there is a Zookeeper-based double-barrier (there is both an +nodes. To do this there is a ZooKeeper-based double-barrier (there is both an entry barrier and an exit barrier). ClusterNodes also have support for creating barriers easily. To wait at the entry use the ``enter`` method. To wait at the -exit use the ``leave`` method. It's also possible to pass a block of code which +exit use the ``leave`` method. It's also possible t pass a block of code which will be run between the barriers. When creating a barrier you pass it a name and the number of nodes that are @@ -188,60 +190,63 @@ timeout is 60 seconds. Here is an example of coordinating the starting of two nodes and then running something in coordination:: - package example + import org.scalatest.WordSpec + import org.scalatest.matchers.MustMatchers + import org.scalatest.BeforeAndAfterAll import akka.cluster._ - import akka.actor._ - object TestMultiJvmNode1 { + object SampleMultiJvmSpec { val NrOfNodes = 2 + } - def main(args: Array[String]) { + class SampleMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import SampleMultiJvmSpec._ + + override def beforeAll() = { Cluster.startLocalCluster() + } - val node = Cluster.newNode(NodeAddress("example", "node1", port = 9991)) + override def afterAll() = { + Cluster.shutdownLocalCluster() + } - node.barrier("start-node1", NrOfNodes) { - node.start + "A cluster" must { + + "have jvm options" in { + System.getProperty("akka.cluster.nodename", "") must be("node1") + System.getProperty("akka.cluster.port", "") must be("9991") + akka.config.Config.config.getString("test.name", "") must be("node1") } - node.barrier("start-node2", NrOfNodes) { - // wait for node 2 to start + "be able to start all nodes" in { + Cluster.barrier("start", NrOfNodes) { + Cluster.node.start() + } + Cluster.node.isRunning must be(true) + Cluster.node.shutdown() } - - node.barrier("hello", NrOfNodes) { - println("Hello from node 1") - } - - Actor.registry.local.shutdownAll - - node.stop - - Cluster.shutdownLocalCluster } } - object TestMultiJvmNode2 { - val NrOfNodes = 2 + class SampleMultiJvmNode2 extends WordSpec with MustMatchers { + import SampleMultiJvmSpec._ - def main(args: Array[String]) { - val node = Cluster.newNode(NodeAddress("example", "node2", port = 9992)) + "A cluster" must { - node.barrier("start-node1", NrOfNodes) { - // wait for node 1 to start + "have jvm options" in { + System.getProperty("akka.cluster.nodename", "") must be("node2") + System.getProperty("akka.cluster.port", "") must be("9992") + akka.config.Config.config.getString("test.name", "") must be("node2") } - node.barrier("start-node2", NrOfNodes) { - node.start + "be able to start all nodes" in { + Cluster.barrier("start", NrOfNodes) { + Cluster.node.start() + } + Cluster.node.isRunning must be(true) + Cluster.node.shutdown() } - - node.barrier("hello", NrOfNodes) { - println("Hello from node 2") - } - - Actor.registry.local.shutdownAll - - node.stop } } @@ -253,8 +258,8 @@ An example output from this would be: ... [info] == multi-jvm-run == [info] == multi-jvm / Test == - [info] Starting JVM-Node1 for example.TestMultiJvmNode1 - [info] Starting JVM-Node2 for example.TestMultiJvmNode2 + [info] Starting JVM-Node1 for example.SampleMultiJvmNode1 + [info] Starting JVM-Node2 for example.SampleMultiJvmNode2 [JVM-Node1] Loading config [akka.conf] from the application classpath. [JVM-Node2] Loading config [akka.conf] from the application classpath. ... diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 8e5a39b280..40188f1dd8 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -48,7 +48,7 @@ akka { format = "akka.serialization.Format$Default$" clustered { # makes the actor available in the cluster registry # default (if omitted) is local non-clustered actor - home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor + home = "node:node1" # defines the hostname, IP-address or node name of the "home" node for clustered actor # available: "host:", "ip:" and "node:" # default is "host:localhost" replicas = 3 # number of actor replicas in the cluster @@ -61,17 +61,6 @@ akka { # default is 'off' } } - - service-pong {} # local actor - - service-hello { - router = "round-robin" - clustered { - home = "host:localhost" - replicas = 3 - stateless = on - } - } } default-dispatcher {