diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
index d92b1a5a67..bd56a79a8e 100644
--- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
+++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala
@@ -20,7 +20,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
LeastCPU,
"akka.serialization.Format$Default$",
Clustered(
- Node("test-1"),
+ Node("node1"),
Replicate(3),
Stateless))))
}
diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala
index 8bf98a9bcd..8560e951b2 100644
--- a/akka-actor/src/main/scala/akka/actor/Actor.scala
+++ b/akka-actor/src/main/scala/akka/actor/Actor.scala
@@ -6,7 +6,7 @@ package akka.actor
import DeploymentConfig._
import akka.dispatch._
-import akka.config.Config
+import akka.config._
import Config._
import akka.util.{ ListenerManagement, ReflectiveAccess, Duration, Helpers }
import ReflectiveAccess._
@@ -15,6 +15,7 @@ import akka.remoteinterface.RemoteSupport
import akka.japi.{ Creator, Procedure }
import akka.AkkaException
import akka.serialization.{ Format, Serializer }
+import akka.cluster.ClusterNode
import akka.event.EventHandler
import scala.reflect.BeanProperty
@@ -137,7 +138,11 @@ object Actor extends ListenerManagement {
/**
* Handle to the ClusterNode. API for the cluster client.
*/
- lazy val cluster: ClusterModule.ClusterNode = ClusterModule.node
+ lazy val cluster: ClusterNode = {
+ val node = ClusterModule.node
+ node.start()
+ node
+ }
/**
* Handle to the RemoteSupport. API for the remote client/server.
@@ -146,7 +151,7 @@ object Actor extends ListenerManagement {
private[akka] lazy val remote: RemoteSupport = cluster.remoteService
// start up a cluster node to join the ZooKeeper cluster
- if (ClusterModule.isEnabled) cluster.start()
+ //if (ClusterModule.isEnabled) cluster.start()
/**
* Creates an ActorRef out of the Actor with type T.
@@ -388,10 +393,10 @@ object Actor extends ListenerManagement {
if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
- val hostname = home match {
- case Host(hostname) ⇒ hostname
- case IP(address) ⇒ address
- case Node(nodeName) ⇒ Config.hostname
+ val isHomeNode = home match {
+ case Host(hostname) ⇒ hostname == Config.hostname
+ case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing
+ case Node(nodename) ⇒ nodename == Config.nodename
}
val replicas = replication match {
@@ -402,7 +407,7 @@ object Actor extends ListenerManagement {
case NoReplicas() ⇒ 0
}
- if (hostname == Config.hostname) { // home node for clustered actor
+ if (isHomeNode) { // home node for clustered actor
def serializerErrorDueTo(reason: String) =
throw new akka.config.ConfigurationException(
@@ -433,7 +438,9 @@ object Actor extends ListenerManagement {
if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
- cluster.use(address, serializer)
+ cluster
+ .use(address, serializer)
+ .getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
val routerType = router match {
@@ -453,7 +460,7 @@ object Actor extends ListenerManagement {
cluster.ref(address, routerType)
}
- /*
+ /*
Misc stuff:
- How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
- ClusterNode API and Actor.remote API should be made private[akka]
@@ -464,7 +471,7 @@ object Actor extends ListenerManagement {
*/
- RemoteActorRef(address, Actor.TIMEOUT, None)
+ // RemoteActorRef(address, Actor.TIMEOUT, None)
case invalid ⇒ throw new IllegalActorStateException(
"Could not create actor with address [" + address +
diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
index 61fbf56be9..5094ebed01 100644
--- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala
+++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala
@@ -1006,8 +1006,10 @@ private[akka] case class RemoteActorRef private[akka] (
case Node(nodeName) ⇒ Config.hostname
}
new InetSocketAddress(hostname, Config.remoteServerPort)
- case _ ⇒ throw new IllegalStateException(
- "Actor with Address [" + address + "] is not bound to a Clustered Deployment")
+ case _ ⇒
+ new InetSocketAddress(Config.hostname, Config.remoteServerPort)
+ //throw new IllegalStateException(
+ // "Actor with Address [" + address + "] is not bound to a Clustered Deployment")
}
start()
diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index 0fb15e7071..5879656ca3 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -265,6 +265,7 @@ object Deployer {
// --------------------------------
// akka.actor.deployment.
.clustered.home
// --------------------------------
+
val home = clusteredConfig.getString("home", "") match {
case "" ⇒ Host("localhost")
case home ⇒
diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
new file mode 100644
index 0000000000..9b31b28de0
--- /dev/null
+++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala
@@ -0,0 +1,449 @@
+/**
+ * Copyright (C) 2009-2010 Scalable Solutions AB
+ */
+
+package akka.cluster
+
+import akka.remoteinterface.RemoteSupport
+import akka.serialization.Serializer
+import akka.actor._
+import akka.dispatch.Future
+import akka.config.Config
+import akka.util._
+
+import com.eaio.uuid.UUID
+
+import java.net.InetSocketAddress
+import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
+import java.util.concurrent.{ ConcurrentSkipListSet, ConcurrentHashMap }
+
+import scala.collection.mutable.ConcurrentMap
+import scala.collection.JavaConversions._
+
+object ChangeListener {
+
+ /**
+ * Cluster membership change listener.
+ * For Scala API.
+ */
+ trait ChangeListener {
+ def notify(event: ChangeNotification, client: ClusterNode) {
+ event match {
+ case NodeConnected(name) ⇒ nodeConnected(name, client)
+ case NodeDisconnected(name) ⇒ nodeDisconnected(name, client)
+ case NewLeader(name: String) ⇒ newLeader(name, client)
+ case NewSession ⇒ thisNodeNewSession(client)
+ case ThisNode.Connected ⇒ thisNodeConnected(client)
+ case ThisNode.Disconnected ⇒ thisNodeDisconnected(client)
+ case ThisNode.Expired ⇒ thisNodeExpired(client)
+ }
+ }
+
+ def nodeConnected(node: String, client: ClusterNode) {}
+
+ def nodeDisconnected(node: String, client: ClusterNode) {}
+
+ def newLeader(name: String, client: ClusterNode) {}
+
+ def thisNodeNewSession(client: ClusterNode) {}
+
+ def thisNodeConnected(client: ClusterNode) {}
+
+ def thisNodeDisconnected(client: ClusterNode) {}
+
+ def thisNodeExpired(client: ClusterNode) {}
+ }
+
+ /**
+ * Cluster membership change listener.
+ * For Java API.
+ */
+ abstract class ChangeListenerAdapter extends ChangeListener
+
+ sealed trait ChangeNotification
+
+ case class NodeConnected(node: String) extends ChangeNotification
+
+ case class NodeDisconnected(node: String) extends ChangeNotification
+
+ case class NewLeader(name: String) extends ChangeNotification
+
+ case object NewSession extends ChangeNotification
+
+ object ThisNode {
+
+ case object Connected extends ChangeNotification
+
+ case object Disconnected extends ChangeNotification
+
+ case object Expired extends ChangeNotification
+
+ }
+}
+
+/**
+ * Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance.
+ *
+ * @author Jonas Bonér
+ */
+class NodeAddress(
+ val clusterName: String,
+ val nodeName: String,
+ val hostname: String,
+ val port: Int) {
+ if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
+ if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string")
+ if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string")
+ if (port < 1) throw new NullPointerException("Port can not be negative")
+
+ override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port)
+
+ override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.##
+
+ override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other)
+}
+
+object NodeAddress {
+
+ def apply(
+ clusterName: String = Config.clusterName,
+ nodeName: String = Config.nodename,
+ hostname: String = Config.hostname,
+ port: Int = Config.remoteServerPort): NodeAddress = new NodeAddress(clusterName, nodeName, hostname, port)
+
+ def unapply(other: Any) = other match {
+ case address: NodeAddress ⇒ Some((address.clusterName, address.nodeName, address.hostname, address.port))
+ case _ ⇒ None
+ }
+}
+
+/**
+ * Interface for cluster node.
+ *
+ * @author Jonas Bonér
+ */
+trait ClusterNode {
+ import ChangeListener._
+
+ val nodeAddress: NodeAddress
+ val zkServerAddresses: String
+
+ val remoteClientLifeCycleListener: ActorRef
+ val remoteDaemon: ActorRef
+ val remoteService: RemoteSupport
+ val remoteServerAddress: InetSocketAddress
+
+ val isConnected = new Switch(false)
+ val isLeader = new AtomicBoolean(false)
+ val electionNumber = new AtomicInteger(Int.MaxValue)
+
+ private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
+ private[cluster] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress]
+ private[cluster] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
+
+ def membershipNodes: Array[String]
+
+ def isRunning: Boolean = isConnected.isOn
+
+ def start(): ClusterNode
+
+ def shutdown()
+
+ def disconnect(): ClusterNode
+
+ def reconnect(): ClusterNode
+
+ /**
+ * Registers a cluster change listener.
+ */
+ def register(listener: ChangeListener): ClusterNode
+
+ /**
+ * Returns the name of the current leader.
+ */
+ def leader: String
+
+ /**
+ * Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
+ */
+ def resign()
+
+ /**
+ * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store(actorRef: ActorRef, format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode
+
+ /**
+ * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode
+
+ /**
+ * Needed to have reflection through structural typing work.
+ */
+ def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode
+
+ /**
+ * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
+ * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
+ * available durable store.
+ */
+ def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode
+
+ /**
+ * Removes actor with uuid from the cluster.
+ */
+ def remove(uuid: UUID)
+
+ /**
+ * Removes actor with address from the cluster.
+ */
+ def remove(address: String): ClusterNode
+
+ /**
+ * Is the actor with uuid clustered or not?
+ */
+ def isClustered(actorAddress: String): Boolean
+
+ /**
+ * Is the actor with uuid in use on 'this' node or not?
+ */
+ def isInUseOnNode(actorAddress: String): Boolean
+
+ /**
+ * Is the actor with uuid in use or not?
+ */
+ def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean
+
+ /**
+ * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
+ * for remote access through lookup by its UUID.
+ */
+ def use[T <: Actor](actorAddress: String): Option[LocalActorRef]
+
+ /**
+ * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
+ * for remote access through lookup by its UUID.
+ */
+ def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef]
+
+ /**
+ * Using (checking out) all actors with a specific UUID on all nodes in the cluster.
+ */
+ def useActorOnAllNodes(uuid: UUID)
+
+ /**
+ * Using (checking out) specific UUID on a specefic node.
+ */
+ def useActorOnNode(node: String, uuid: UUID)
+
+ /**
+ * Checks in an actor after done using it on this node.
+ */
+ def release(actorAddress: String)
+
+ /**
+ * Releases (checking in) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use'.
+ */
+ def releaseActorOnAllNodes(uuid: UUID)
+
+ /**
+ * Creates an ActorRef with a Router to a set of clustered actors.
+ */
+ def ref(actorAddress: String, router: RouterType): ActorRef
+
+ /**
+ * Migrate the actor from 'this' node to node 'to'.
+ */
+ def migrate(to: NodeAddress, actorAddress: String)
+
+ /**
+ * Migrate the actor from node 'from' to node 'to'.
+ */
+ def migrate(from: NodeAddress, to: NodeAddress, actorAddress: String)
+
+ /**
+ * Returns the UUIDs of all actors checked out on this node.
+ */
+ def uuidsForActorsInUse: Array[UUID]
+
+ /**
+ * Returns the addresses of all actors checked out on this node.
+ */
+ def addressesForActorsInUse: Array[String]
+
+ /**
+ * Returns the UUIDs of all actors registered in this cluster.
+ */
+ def uuidsForClusteredActors: Array[UUID]
+
+ /**
+ * Returns the addresses of all actors registered in this cluster.
+ */
+ def addressesForClusteredActors: Array[String]
+
+ /**
+ * Returns the actor id for the actor with a specific UUID.
+ */
+ def actorAddressForUuid(uuid: UUID): String
+
+ /**
+ * Returns the actor ids for all the actors with a specific UUID.
+ */
+ def actorAddressForUuids(uuids: Array[UUID]): Array[String]
+
+ /**
+ * Returns the actor UUIDs for actor ID.
+ */
+ def uuidsForActorAddress(actorAddress: String): Array[UUID]
+
+ /**
+ * Returns the node names of all actors in use with UUID.
+ */
+ def nodesForActorsInUseWithUuid(uuid: UUID): Array[String]
+
+ /**
+ * Returns the UUIDs of all actors in use registered on a specific node.
+ */
+ def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID]
+
+ /**
+ * Returns the addresses of all actors in use registered on a specific node.
+ */
+ def addressesForActorsInUseOnNode(nodeName: String): Array[String]
+
+ /**
+ * Returns Format for actor with UUID.
+ */
+ def formatForActor(actorAddress: String): Serializer
+
+ /**
+ * Returns home address for actor with UUID.
+ */
+ def addressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)]
+
+ /**
+ * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
+ */
+ def send(f: Function0[Unit], replicationFactor: Int)
+
+ /**
+ * Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument).
+ * Returns an 'Array' with all the 'Future's from the computation.
+ */
+ def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]]
+
+ /**
+ * Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument)
+ * with the argument speficied.
+ */
+ def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int)
+
+ /**
+ * Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by 'replicationFactor' argument)
+ * with the argument speficied.
+ * Returns an 'Array' with all the 'Future's from the computation.
+ */
+ def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]]
+
+ def setConfigElement(key: String, bytes: Array[Byte])
+
+ /**
+ * Returns the config element for the key or NULL if no element exists under the key.
+ */
+ def getConfigElement(key: String): Array[Byte]
+
+ def removeConfigElement(key: String)
+
+ def getConfigElementKeys: Array[String]
+
+ private[cluster] def initializeNode()
+
+ private[cluster] def addressForNode(node: String): InetSocketAddress
+
+ private[cluster] def publish(change: ChangeNotification)
+
+ private[cluster] def findFailedNodes(nodes: List[String]): List[String]
+
+ private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String]
+
+ private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String]
+
+ private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String]
+
+ private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String]
+
+ private[cluster] def joinMembershipNode()
+
+ private[cluster] def joinActorsAtAddressNode()
+
+ private[cluster] def joinLeaderElection: Boolean
+
+ private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress)
+
+ private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String])
+
+ private[cluster] def membershipPathFor(node: String): String
+
+ private[cluster] def configurationPathFor(key: String): String
+
+ private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String
+
+ private[cluster] def actorLocationsPathFor(uuid: UUID): String
+
+ private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress): String
+
+ private[cluster] def actorsAtNodePathFor(node: String): String
+
+ private[cluster] def actorAtNodePathFor(node: String, uuid: UUID): String
+
+ private[cluster] def actorRegistryPathFor(uuid: UUID): String
+
+ private[cluster] def actorRegistryFormatPathFor(uuid: UUID): String
+
+ private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID): String
+
+ private[cluster] def actorRegistryNodePathFor(uuid: UUID): String
+
+ private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String
+}
+
diff --git a/akka-actor/src/main/scala/akka/config/Config.scala b/akka-actor/src/main/scala/akka/config/Config.scala
index 18d054ab15..16daea4c88 100644
--- a/akka-actor/src/main/scala/akka/config/Config.scala
+++ b/akka-actor/src/main/scala/akka/config/Config.scala
@@ -115,6 +115,8 @@ object Config {
case value ⇒ value.toInt
}
+ val clusterName = config.getString("akka.cluster.name", "default")
+
val startTime = System.currentTimeMillis
def uptime = (System.currentTimeMillis - startTime) / 1000
}
diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
index 3efdb67a63..35196813ae 100644
--- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
+++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala
@@ -11,6 +11,7 @@ import akka.actor._
import DeploymentConfig.Deploy
import akka.event.EventHandler
import akka.serialization.Format
+import akka.cluster.ClusterNode
import java.net.InetSocketAddress
@@ -71,24 +72,6 @@ object ReflectiveAccess {
clusterDeployerInstance.get
}
- type ClusterNode = {
- def start()
- def shutdown()
-
- def remoteService: RemoteSupport
-
- def store(address: String, actorClass: Class[_ <: Actor], replicas: Int, serializeMailbox: Boolean, format: Serializer)
- def store(actorRef: ActorRef, replicas: Int, serializeMailbox: Boolean, format: Serializer)
-
- def remove(address: String)
-
- def use(address: String, format: Serializer): Array[ActorRef]
- def ref(address: String, router: RouterType): ActorRef
-
- def isClustered(address: String): Boolean
- def nrOfActors: Int
- }
-
type ClusterDeployer = {
def init(deployments: List[Deploy])
def shutdown()
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index a32c462441..1da99f88b5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -38,6 +38,7 @@ import Compression.LZF
import akka.AkkaException
import akka.cluster.zookeeper._
+import akka.cluster.ChangeListener._
import com.eaio.uuid.UUID
@@ -153,7 +154,7 @@ object Cluster {
val UUID_PREFIX = "uuid:".intern
// config options
- val name = config.getString("akka.cluster.name", "default")
+ val name = Config.clusterName
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
@@ -162,64 +163,6 @@ object Cluster {
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
val enableJMX = config.getBool("akka.enable-jmx", true)
- /**
- * Cluster membership change listener.
- * For Scala API.
- */
- trait ChangeListener {
- def notify(event: ChangeNotification, client: ClusterNode) {
- event match {
- case NodeConnected(name) ⇒ nodeConnected(name, client)
- case NodeDisconnected(name) ⇒ nodeDisconnected(name, client)
- case NewLeader(name: String) ⇒ newLeader(name, client)
- case NewSession ⇒ thisNodeNewSession(client)
- case ThisNode.Connected ⇒ thisNodeConnected(client)
- case ThisNode.Disconnected ⇒ thisNodeDisconnected(client)
- case ThisNode.Expired ⇒ thisNodeExpired(client)
- }
- }
-
- def nodeConnected(node: String, client: ClusterNode) {}
-
- def nodeDisconnected(node: String, client: ClusterNode) {}
-
- def newLeader(name: String, client: ClusterNode) {}
-
- def thisNodeNewSession(client: ClusterNode) {}
-
- def thisNodeConnected(client: ClusterNode) {}
-
- def thisNodeDisconnected(client: ClusterNode) {}
-
- def thisNodeExpired(client: ClusterNode) {}
- }
-
- /**
- * Cluster membership change listener.
- * For Java API.
- */
- abstract class ChangeListenerAdapter extends ChangeListener
-
- sealed trait ChangeNotification
-
- case class NodeConnected(node: String) extends ChangeNotification
-
- case class NodeDisconnected(node: String) extends ChangeNotification
-
- case class NewLeader(name: String) extends ChangeNotification
-
- case object NewSession extends ChangeNotification
-
- object ThisNode {
-
- case object Connected extends ChangeNotification
-
- case object Disconnected extends ChangeNotification
-
- case object Expired extends ChangeNotification
-
- }
-
@volatile
private var properties = Map.empty[String, String]
@@ -249,14 +192,14 @@ object Cluster {
/**
* The node address.
*/
- lazy val nodeAddress = NodeAddress(name, nodename, hostname, port)
+ val nodeAddress = NodeAddress(name, nodename, hostname, port)
/**
* The reference to the running ClusterNode.
*/
- lazy val node: ClusterNode = {
+ val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
- new ClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
+ new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
}
/**
@@ -319,6 +262,14 @@ object Cluster {
*/
def newZkClient: AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
+ def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
+
+ def barrier(name: String, count: Int) =
+ ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count)
+
+ def barrier(name: String, count: Int, timeout: Duration) =
+ ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count, timeout)
+
def uuidToString(uuid: UUID): String = uuid.toString
def stringToUuid(uuid: String): UUID = {
@@ -348,35 +299,26 @@ object Cluster {
*
* @author Jonas Bonér
*/
-class ClusterNode private[akka] (
+class DefaultClusterNode private[akka] (
val nodeAddress: NodeAddress,
val zkServerAddresses: String,
- val serializer: ZkSerializer) extends ErrorHandler {
+ val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
self ⇒
if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'")
+ val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
+
import Cluster._
- EventHandler.info(this,
- ("\nCreating cluster node with" +
- "\n\tcluster name = [%s]" +
- "\n\tnode name = [%s]" +
- "\n\tport = [%s]" +
- "\n\tzookeeper server addresses = [%s]" +
- "\n\tserializer = [%s]")
- .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
-
- val remoteClientLifeCycleListener = actorOf(new Actor {
+ lazy val remoteClientLifeCycleListener = actorOf(new Actor {
def receive = {
case RemoteClientError(cause, client, address) ⇒ client.shutdownClientModule()
case RemoteClientDisconnected(client, address) ⇒ client.shutdownClientModule()
case _ ⇒ //ignore other
}
}, "akka.cluster.remoteClientLifeCycleListener").start()
-
lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
-
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(nodeAddress.hostname, nodeAddress.port)
@@ -386,8 +328,6 @@ class ClusterNode private[akka] (
}
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
- val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
-
// static nodes
val CLUSTER_NODE = "/" + nodeAddress.clusterName
val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
@@ -409,17 +349,8 @@ class ClusterNode private[akka] (
val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader" // should NOT be part of 'baseNodes' only used by 'leaderLock'
- val isConnected = new Switch(false)
- val isLeader = new AtomicBoolean(false)
- val electionNumber = new AtomicInteger(Integer.MAX_VALUE)
-
private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
- // local caches of ZK data
- private[akka] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
- private[akka] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress]
- private[akka] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
-
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
private[akka] val replicaConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] =
@@ -436,13 +367,13 @@ class ClusterNode private[akka] (
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// resources
- private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
+ lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
- private[cluster] val leaderElectionCallback = new LockListener {
+ lazy private[cluster] val leaderElectionCallback = new LockListener {
override def lockAcquired() {
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
self.isLeader.set(true)
- self.publish(Cluster.NewLeader(self.nodeAddress.nodeName))
+ self.publish(NewLeader(self.nodeAddress.nodeName))
}
override def lockReleased() {
@@ -453,7 +384,7 @@ class ClusterNode private[akka] (
}
}
- private[cluster] val leaderLock = new WriteLock(
+ lazy private[cluster] val leaderLock = new WriteLock(
zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) {
// ugly hack, but what do you do? <--- haha epic
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
@@ -468,8 +399,6 @@ class ClusterNode private[akka] (
// Node
// =======================================
- def isRunning: Boolean = isConnected.isOn
-
def start(): ClusterNode = {
isConnected switchOn {
initializeNode()
@@ -738,13 +667,13 @@ class ClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String): Array[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
+ def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String, format: Serializer): Array[LocalActorRef] = if (isConnected.isOn) {
+ def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@@ -780,8 +709,8 @@ class ClusterNode private[akka] (
actor.asInstanceOf[LocalActorRef]
case Right(exception) ⇒ throw exception
}
- }
- } else Array.empty[LocalActorRef]
+ } headOption // FIXME should not be an array at all coming here
+ } else None
/**
* Using (checking out) all actors with a specific UUID on all nodes in the cluster.
@@ -1151,22 +1080,6 @@ class ClusterNode private[akka] (
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]]
- // =======================================
- // Queue
- // =======================================
-
- def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(zkClient, rootPath, blocking)
-
- // =======================================
- // Barrier
- // =======================================
-
- def barrier(name: String, count: Int) =
- ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count)
-
- def barrier(name: String, count: Int, timeout: Duration) =
- ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count, timeout)
-
// =======================================
// Private
// =======================================
@@ -1198,7 +1111,14 @@ class ClusterNode private[akka] (
"%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort)
private[cluster] def initializeNode() {
- EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress))
+ EventHandler.info(this,
+ ("\nCreating cluster node with" +
+ "\n\tcluster name = [%s]" +
+ "\n\tnode name = [%s]" +
+ "\n\tport = [%s]" +
+ "\n\tzookeeper server addresses = [%s]" +
+ "\n\tserializer = [%s]")
+ .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createRootClusterNode()
val isLeader = joinLeaderElection
@@ -1499,12 +1419,12 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒
self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map
- self.publish(Cluster.NodeConnected(name))
+ self.publish(NodeConnected(name))
}
self.findNewlyDisconnectedMembershipNodes(childList) foreach { name ⇒
self.nodeNameToAddress.remove(name) // update 'nodename-address' map
- self.publish(Cluster.NodeDisconnected(name))
+ self.publish(NodeDisconnected(name))
}
self.locallyCachedMembershipNodes.clear()
@@ -1522,13 +1442,13 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
state match {
case KeeperState.SyncConnected ⇒
EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
- self.publish(Cluster.ThisNode.Connected)
+ self.publish(ThisNode.Connected)
case KeeperState.Disconnected ⇒
EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
- self.publish(Cluster.ThisNode.Disconnected)
+ self.publish(ThisNode.Disconnected)
case KeeperState.Expired ⇒
EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
- self.publish(Cluster.ThisNode.Expired)
+ self.publish(ThisNode.Expired)
}
}
@@ -1538,7 +1458,7 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
def handleNewSession() {
EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress))
self.initializeNode()
- self.publish(Cluster.NewSession)
+ self.publish(NewSession)
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index c8ac1012f4..abcbbb58ee 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -56,5 +56,5 @@ class ClusterActorRef private[akka] (
// clustered refs are always registered and looked up by UUID
private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) =
- RemoteActorRef(UUID_PREFIX + uuidToString(uuid), Actor.TIMEOUT, None)
+ RemoteActorRef(uuidToString(uuid), Actor.TIMEOUT, None)
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index c8d22ad616..ceadbd8585 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -4,7 +4,7 @@
package akka.cluster
-import akka.actor.{ DeploymentConfig, Deployer, DeploymentException }
+import akka.actor.{ DeploymentConfig, Deployer, LocalDeployer, DeploymentException }
import DeploymentConfig._
import akka.event.EventHandler
import akka.config.Config
@@ -119,17 +119,23 @@ object ClusterDeployer {
private[akka] def deploy(deployment: Deploy) {
ensureRunning {
- val path = deploymentAddressPath.format(deployment.address)
- try {
- ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
- zkClient.writeData(path, deployment)
+ deployment match {
+ case Deploy(_, _, _, Local) ⇒ // local deployment
+ LocalDeployer.deploy(deployment)
- // FIXME trigger cluster-wide deploy action
- } catch {
- case e: NullPointerException ⇒
- handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
- case e: Exception ⇒
- handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
+ case _ ⇒ // cluster deployment
+ val path = deploymentAddressPath.format(deployment.address)
+ try {
+ ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
+ zkClient.writeData(path, deployment)
+
+ // FIXME trigger cluster-wide deploy action
+ } catch {
+ case e: NullPointerException ⇒
+ handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
+ case e: Exception ⇒
+ handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
+ }
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
index bc5c189a1c..c5f9addc36 100644
--- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala
@@ -39,12 +39,6 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
oldDeployment must equal(newDeployment.get)
}
}
-
- "be able to create an actor deployed using ClusterDeployer, add it to ZooKeeper and then check the actor out for use" in {
- val pi = Actor.actorOf[HelloWorld]("service-hello")
- pi must not equal (null)
- pi.address must equal("service-hello")
- }
}
override def beforeAll() {
diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.conf
rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode1.opts
rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode1.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.conf
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.conf
rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.conf
diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmNode2.opts
rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmNode2.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala
similarity index 60%
rename from akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala
rename to akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala
index fe1e07109f..d949d99d26 100644
--- a/akka-cluster/src/test/scala/akka/cluster/multi/ClusterMultiJvmSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/sample/SampleMultiJvmSpec.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009-2011 Scalable Solutions AB
*/
-package akka.cluster.multi
+package akka.cluster.sample
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
@@ -10,16 +10,15 @@ import org.scalatest.BeforeAndAfterAll
import akka.cluster._
-object ClusterMultiJvmSpec {
+object SampleMultiJvmSpec {
val NrOfNodes = 2
}
-class ClusterMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
- import ClusterMultiJvmSpec._
+class SampleMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
+ import SampleMultiJvmSpec._
override def beforeAll() = {
Cluster.startLocalCluster()
- // resetCluster()
}
override def afterAll() = {
@@ -45,23 +44,17 @@ class ClusterMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfte
}
"be able to start all nodes" in {
- Cluster.node.barrier("start", NrOfNodes) {
- // Cluster.node.start()
+ Cluster.barrier("start", NrOfNodes) {
+ Cluster.node.start()
}
- // Cluster.node.isRunning must be(true)
- }
-
- "be able to shutdown all nodes" in {
- Cluster.node.barrier("shutdown", NrOfNodes) {
- // Cluster.node.shutdown()
- }
- // Cluster.node.isRunning must be(false)
+ Cluster.node.isRunning must be(true)
+ Cluster.node.shutdown()
}
}
}
-class ClusterMultiJvmNode2 extends WordSpec with MustMatchers {
- import ClusterMultiJvmSpec._
+class SampleMultiJvmNode2 extends WordSpec with MustMatchers {
+ import SampleMultiJvmSpec._
"A cluster" must {
@@ -72,17 +65,11 @@ class ClusterMultiJvmNode2 extends WordSpec with MustMatchers {
}
"be able to start all nodes" in {
- Cluster.node.barrier("start", NrOfNodes) {
- // Cluster.node.start()
+ Cluster.barrier("start", NrOfNodes) {
+ Cluster.node.start()
}
- // Cluster.node.isRunning must be(true)
- }
-
- "be able to shutdown all nodes" in {
- Cluster.node.barrier("shutdown", NrOfNodes) {
- // Cluster.node.shutdown()
- }
- // Cluster.node.isRunning must be(false)
+ Cluster.node.isRunning must be(true)
+ Cluster.node.shutdown()
}
}
}
diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf
new file mode 100644
index 0000000000..deaf85b42c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf
@@ -0,0 +1,4 @@
+akka.actor.deployment.service-hello.router = "round-robin"
+akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.replicas = 2
+akka.actor.deployment.service-hello.clustered.stateless = on
diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts
new file mode 100644
index 0000000000..a88c260d8c
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf
new file mode 100644
index 0000000000..0c1a16ea15
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf
@@ -0,0 +1,4 @@
+akka.actor.deployment.service-hello.router = "round-robin"
+akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.replicas = 2
+akka.actor.deployment.service-hello.clustered.stateless = on
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts
new file mode 100644
index 0000000000..f1e01f253d
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts
@@ -0,0 +1 @@
+-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala
new file mode 100644
index 0000000000..8e32f3960b
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2009-2011 Scalable Solutions AB
+ */
+
+package akka.cluster.store_actor
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.BeforeAndAfterAll
+
+import akka.cluster._
+import akka.actor._
+import Actor._
+
+object StoreActorMultiJvmSpec {
+ val NrOfNodes = 2
+
+ class HelloWorld extends Actor with Serializable {
+ def receive = {
+ case "Hello" ⇒ self.reply("World")
+ }
+ }
+}
+
+class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
+ import StoreActorMultiJvmSpec._
+
+ "A cluster" must {
+
+ "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
+ System.getProperty("akka.cluster.nodename", "") must be("node1")
+ System.getProperty("akka.cluster.port", "") must be("9991")
+
+ Cluster.barrier("start-node1", NrOfNodes) {
+ Cluster.node.start()
+ }
+
+ Cluster.barrier("start-node2", NrOfNodes) {}
+
+ Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {
+ val pi = Actor.actorOf[HelloWorld]("service-hello")
+ pi must not equal (null)
+ pi.address must equal("service-hello")
+ pi.isInstanceOf[LocalActorRef] must be(true)
+ }
+
+ Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
+
+ Cluster.node.shutdown()
+ }
+ }
+
+ override def beforeAll() = {
+ Cluster.startLocalCluster()
+ }
+
+ override def afterAll() = {
+ Cluster.shutdownLocalCluster()
+ }
+}
+
+class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers {
+ import StoreActorMultiJvmSpec._
+
+ "A cluster" must {
+
+ "create clustered actor, get a 'local' actor on 'home' node and a 'ref' to actor on remote node" in {
+ System.getProperty("akka.cluster.nodename", "") must be("node2")
+ System.getProperty("akka.cluster.port", "") must be("9992")
+
+ Cluster.barrier("start-node1", NrOfNodes) {}
+
+ Cluster.barrier("start-node2", NrOfNodes) {
+ Cluster.node.start()
+ }
+
+ Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {}
+
+ Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
+ val pi = Actor.actorOf[HelloWorld]("service-hello")
+ pi must not equal (null)
+ pi.address must equal("service-hello")
+ pi.isInstanceOf[ClusterActorRef] must be(true)
+ }
+
+ Cluster.node.shutdown()
+ }
+ }
+}
diff --git a/akka-docs/dev/multi-jvm-testing.rst b/akka-docs/dev/multi-jvm-testing.rst
index c69ad34adb..84a3e5de32 100644
--- a/akka-docs/dev/multi-jvm-testing.rst
+++ b/akka-docs/dev/multi-jvm-testing.rst
@@ -41,19 +41,19 @@ like the following::
package example
- object TestMultiJvmNode1 {
+ object SampleMultiJvmNode1 {
def main(args: Array[String]) {
println("Hello from node 1")
}
}
- object TestMultiJvmNode2 {
+ object SampleMultiJvmNode2 {
def main(args: Array[String]) {
println("Hello from node 2")
}
}
- object TestMultiJvmNode3 {
+ object SampleMultiJvmNode3 {
def main(args: Array[String]) {
println("Hello from node 3")
}
@@ -68,9 +68,9 @@ spawned, one for each node. It will look like this:
...
[info] == multi-jvm-run ==
[info] == multi-jvm / Test ==
- [info] Starting JVM-Node1 for example.TestMultiJvmNode1
- [info] Starting JVM-Node2 for example.TestMultiJvmNode2
- [info] Starting JVM-Node3 for example.TestMultiJvmNode3
+ [info] Starting JVM-Node1 for example.SampleMultiJvmNode1
+ [info] Starting JVM-Node2 for example.SampleMultiJvmNode2
+ [info] Starting JVM-Node3 for example.SampleMultiJvmNode3
[JVM-Node1] Hello from node 1
[JVM-Node2] Hello from node 2
[JVM-Node3] Hello from node 3
@@ -98,21 +98,22 @@ Setting JVM options
-------------------
You can define specific JVM options for each of the spawned JVMs. You do that by creating
-a file named after the node in the test with suffix ``.opts``.
+a file named after the node in the test with suffix ``.opts`` and put them in the same
+directory as the test.
For example, to feed the JVM options ``-Dakka.cluster.nodename=node1`` and
-``-Dakka.cluster.port=9991`` to the ``TestMultiJvmNode1`` let's create three ``*.opts`` files
+``-Dakka.cluster.port=9991`` to the ``SampleMultiJvmNode1`` let's create three ``*.opts`` files
and add the options to them.
-``TestMultiJvmNode1.opts``::
+``SampleMultiJvmNode1.opts``::
-Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
-``TestMultiJvmNode2.opts``::
+``SampleMultiJvmNode2.opts``::
-Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
-``TestMultiJvmNode3.opts``::
+``SampleMultiJvmNode3.opts``::
-Dakka.cluster.nodename=node3 -Dakka.cluster.port=9993
@@ -120,20 +121,21 @@ Overriding akka.conf options
----------------------------
You can also override the options in the ``akka.conf`` file with different options for each
-spawned JVM. You do that by creating a file named after the node in the test with suffix ``.conf``.
+spawned JVM. You do that by creating a file named after the node in the test with suffix
+``.conf`` and put them in the same directory as the test .
-For example, to override the configuration option ``akka.cluster.name`` let's create three ``*.conf`` files
-and add the option to them.
+For example, to override the configuration option ``akka.cluster.name`` let's create three
+``*.conf`` files and add the option to them.
-``TestMultiJvmNode1.conf``::
+``SampleMultiJvmNode1.conf``::
akka.cluster.name = "test-cluster"
-``TestMultiJvmNode2.conf``::
+``SampleMultiJvmNode2.conf``::
akka.cluster.name = "test-cluster"
-``TestMultiJvmNode3.conf``::
+``SampleMultiJvmNode3.conf``::
akka.cluster.name = "test-cluster"
@@ -175,10 +177,10 @@ Zookeeper Barrier
~~~~~~~~~~~~~~~~~
When running multi-JVM tests it's common to need to coordinate timing across
-nodes. To do this there is a Zookeeper-based double-barrier (there is both an
+nodes. To do this there is a ZooKeeper-based double-barrier (there is both an
entry barrier and an exit barrier). ClusterNodes also have support for creating
barriers easily. To wait at the entry use the ``enter`` method. To wait at the
-exit use the ``leave`` method. It's also possible to pass a block of code which
+exit use the ``leave`` method. It's also possible t pass a block of code which
will be run between the barriers.
When creating a barrier you pass it a name and the number of nodes that are
@@ -188,60 +190,63 @@ timeout is 60 seconds.
Here is an example of coordinating the starting of two nodes and then running
something in coordination::
- package example
+ import org.scalatest.WordSpec
+ import org.scalatest.matchers.MustMatchers
+ import org.scalatest.BeforeAndAfterAll
import akka.cluster._
- import akka.actor._
- object TestMultiJvmNode1 {
+ object SampleMultiJvmSpec {
val NrOfNodes = 2
+ }
- def main(args: Array[String]) {
+ class SampleMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll {
+ import SampleMultiJvmSpec._
+
+ override def beforeAll() = {
Cluster.startLocalCluster()
+ }
- val node = Cluster.newNode(NodeAddress("example", "node1", port = 9991))
+ override def afterAll() = {
+ Cluster.shutdownLocalCluster()
+ }
- node.barrier("start-node1", NrOfNodes) {
- node.start
+ "A cluster" must {
+
+ "have jvm options" in {
+ System.getProperty("akka.cluster.nodename", "") must be("node1")
+ System.getProperty("akka.cluster.port", "") must be("9991")
+ akka.config.Config.config.getString("test.name", "") must be("node1")
}
- node.barrier("start-node2", NrOfNodes) {
- // wait for node 2 to start
+ "be able to start all nodes" in {
+ Cluster.barrier("start", NrOfNodes) {
+ Cluster.node.start()
+ }
+ Cluster.node.isRunning must be(true)
+ Cluster.node.shutdown()
}
-
- node.barrier("hello", NrOfNodes) {
- println("Hello from node 1")
- }
-
- Actor.registry.local.shutdownAll
-
- node.stop
-
- Cluster.shutdownLocalCluster
}
}
- object TestMultiJvmNode2 {
- val NrOfNodes = 2
+ class SampleMultiJvmNode2 extends WordSpec with MustMatchers {
+ import SampleMultiJvmSpec._
- def main(args: Array[String]) {
- val node = Cluster.newNode(NodeAddress("example", "node2", port = 9992))
+ "A cluster" must {
- node.barrier("start-node1", NrOfNodes) {
- // wait for node 1 to start
+ "have jvm options" in {
+ System.getProperty("akka.cluster.nodename", "") must be("node2")
+ System.getProperty("akka.cluster.port", "") must be("9992")
+ akka.config.Config.config.getString("test.name", "") must be("node2")
}
- node.barrier("start-node2", NrOfNodes) {
- node.start
+ "be able to start all nodes" in {
+ Cluster.barrier("start", NrOfNodes) {
+ Cluster.node.start()
+ }
+ Cluster.node.isRunning must be(true)
+ Cluster.node.shutdown()
}
-
- node.barrier("hello", NrOfNodes) {
- println("Hello from node 2")
- }
-
- Actor.registry.local.shutdownAll
-
- node.stop
}
}
@@ -253,8 +258,8 @@ An example output from this would be:
...
[info] == multi-jvm-run ==
[info] == multi-jvm / Test ==
- [info] Starting JVM-Node1 for example.TestMultiJvmNode1
- [info] Starting JVM-Node2 for example.TestMultiJvmNode2
+ [info] Starting JVM-Node1 for example.SampleMultiJvmNode1
+ [info] Starting JVM-Node2 for example.SampleMultiJvmNode2
[JVM-Node1] Loading config [akka.conf] from the application classpath.
[JVM-Node2] Loading config [akka.conf] from the application classpath.
...
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 8e5a39b280..40188f1dd8 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -48,7 +48,7 @@ akka {
format = "akka.serialization.Format$Default$"
clustered { # makes the actor available in the cluster registry
# default (if omitted) is local non-clustered actor
- home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor
+ home = "node:node1" # defines the hostname, IP-address or node name of the "home" node for clustered actor
# available: "host:", "ip:" and "node:"
# default is "host:localhost"
replicas = 3 # number of actor replicas in the cluster
@@ -61,17 +61,6 @@ akka {
# default is 'off'
}
}
-
- service-pong {} # local actor
-
- service-hello {
- router = "round-robin"
- clustered {
- home = "host:localhost"
- replicas = 3
- stateless = on
- }
- }
}
default-dispatcher {