diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index 15316f727d..7149c6c984 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -20,7 +20,7 @@ class DeployerSpec extends WordSpec with MustMatchers { LeastCPU, "akka.serialization.Format$Default$", Clustered( - Node("node1"), + Vector(Node("node1")), Replicate(3), Replication( TransactionLog, diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 93b46be127..8b284b52bc 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -144,6 +144,7 @@ object Actor extends ListenerManagement { def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS)) def this(length: Long, unit: TimeUnit) = this(Duration(length, unit)) } + object Timeout { def apply(timeout: Long) = new Timeout(timeout) def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit) @@ -183,7 +184,7 @@ object Actor extends ListenerManagement { class LoggingReceive(source: AnyRef, r: Receive) extends Receive { def isDefinedAt(o: Any) = { val handled = r.isDefinedAt(o) - EventHandler.debug(source, "received " + (if (handled) "handled" else "unhandled") + " message " + o) + EventHandler.debug(source, "Received " + (if (handled) "handled" else "unhandled") + " message " + o) handled } def apply(o: Any): Unit = r(o) @@ -443,7 +444,7 @@ object Actor extends ListenerManagement { case Deploy( configAdress, router, serializerClassName, Clustered( - home, + preferredHomeNodes, replicas, replication)) ⇒ @@ -454,7 +455,7 @@ object Actor extends ListenerManagement { if (!Actor.remote.isRunning) throw new IllegalStateException( "Remote server is not running") - val isHomeNode = DeploymentConfig.isHomeNode(home) + val isHomeNode = preferredHomeNodes exists (home ⇒ DeploymentConfig.isHomeNode(home)) val nrOfReplicas = DeploymentConfig.replicaValueFor(replicas) def serializerErrorDueTo(reason: String) = diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 685197820b..85bfd26dec 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -31,7 +31,7 @@ object DeploymentConfig { case class Deploy( address: String, routing: Routing = Direct, - format: String = Serializer.defaultSerializerName, // Format.defaultSerializerName, + format: String = Serializer.defaultSerializerName, scope: Scope = Local) // -------------------------------- @@ -61,7 +61,7 @@ object DeploymentConfig { // -------------------------------- sealed trait Scope case class Clustered( - home: Home = Host("localhost"), + preferredNodes: Iterable[Home] = Vector(Host("localhost")), replicas: Replicas = NoReplicas, replication: ReplicationScheme = Transient) extends Scope @@ -139,12 +139,19 @@ object DeploymentConfig { // --- Helper methods for parsing // -------------------------------- - def isHomeNode(home: Home): Boolean = home match { - case Host(hostname) ⇒ hostname == Config.hostname - case IP(address) ⇒ address == "0.0.0.0" || address == "127.0.0.1" // FIXME look up IP address from the system - case Node(nodename) ⇒ nodename == Config.nodename + def nodeNameFor(home: Home): String = { + home match { + case Node(nodename) ⇒ nodename + case Host("localhost") ⇒ Config.nodename + case IP("0.0.0.0") ⇒ Config.nodename + case IP("127.0.0.1") ⇒ Config.nodename + case Host(hostname) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]") + case IP(address) ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]") + } } + def isHomeNode(home: Home): Boolean = nodeNameFor(home) == Config.nodeName + def replicaValueFor(replicas: Replicas): Int = replicas match { case Replicate(replicas) ⇒ replicas case AutoReplicate ⇒ -1 @@ -166,7 +173,7 @@ object DeploymentConfig { case LeastRAM() ⇒ RouterType.LeastRAM case LeastMessages ⇒ RouterType.LeastMessages case LeastMessages() ⇒ RouterType.LeastMessages - case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c) + case c: CustomRouter ⇒ throw new UnsupportedOperationException("Unknown Router [" + c + "]") } def isReplicationAsync(strategy: ReplicationStrategy): Boolean = strategy match { @@ -245,8 +252,10 @@ object Deployer { private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { val deployment_? = instance.lookupDeploymentFor(address) + if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_? else { + val newDeployment = try { lookupInConfig(address) @@ -255,6 +264,7 @@ object Deployer { EventHandler.error(e, this, e.getMessage) throw e } + newDeployment foreach { d ⇒ if (d eq null) { val e = new IllegalStateException("Deployment for address [" + address + "] is null") @@ -263,6 +273,7 @@ object Deployer { } deploy(d) // deploy and cache it } + newDeployment } } @@ -334,28 +345,30 @@ object Deployer { case Some(clusteredConfig) ⇒ // -------------------------------- - // akka.actor.deployment.
.clustered.home + // akka.actor.deployment..clustered.preferred-nodes // -------------------------------- - val home = clusteredConfig.getString("home", "") match { - case "" ⇒ Host("localhost") - case home ⇒ + val preferredNodes = clusteredConfig.getList("preferred-nodes") match { + case Nil ⇒ Vector(Host("localhost")) + case homes ⇒ def raiseHomeConfigError() = throw new ConfigurationException( "Config option [" + addressPath + - ".clustered.home] needs to be on format 'host:Supervisor.apply(..) like in:
+ * + * Supervisor supervisor = Supervisor.apply( + * SupervisorConfig( + * .. + * )) + *+ * @author Jonas Bonér */ object Supervisor { diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index 1e8dc035c4..3b92fd04a5 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -122,7 +122,7 @@ object NodeAddress { trait ClusterNode { import ChangeListener._ - val isConnected = new Switch(false) + val isConnected = new AtomicBoolean(false) private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]() @@ -136,7 +136,7 @@ trait ClusterNode { def remoteServerAddress: InetSocketAddress - def isRunning: Boolean = isConnected.isOn + def isRunning: Boolean = isConnected.get def start(): ClusterNode @@ -324,6 +324,11 @@ trait ClusterNode { */ def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] + /** + * Using (checking out) actor on a specific set of nodes. + */ + def useActorOnNodes(nodes: Array[String], actorAddress: String) + /** * Using (checking out) actor on all nodes in the cluster. */ diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index adf39d8fe6..2aadf0bd2d 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -31,7 +31,7 @@ import Helpers._ import akka.actor._ import Actor._ import Status._ -import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind } +import DeploymentConfig._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future } @@ -54,7 +54,6 @@ import com.eaio.uuid.UUID import com.google.protobuf.ByteString // FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down -// FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk /** * JMX MBean for the cluster service. @@ -131,11 +130,20 @@ object Cluster { val shouldCompressData = config.getBool("akka.cluster.use-compression", false) val enableJMX = config.getBool("akka.enable-jmx", true) val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt - val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true) + val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true) @volatile private var properties = Map.empty[String, String] + /** + * Use to override JVM options such as
-Dakka.cluster.nodename=node1 etc.
+ * Currently supported options are:
+ *
+ * Cluster setProperty ("akka.cluster.nodename", "node1")
+ * Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
+ * Cluster setProperty ("akka.cluster.port", "1234")
+ *
+ */
def setProperty(property: (String, String)) {
properties = properties + property
}
@@ -155,7 +163,7 @@ object Cluster {
case None ⇒ Config.remoteServerPort
}
- val defaultSerializer = new SerializableSerializer
+ val defaultZooKeeperSerializer = new SerializableSerializer
private val _zkServer = new AtomicReference[Option[ZkServer]](None)
@@ -169,7 +177,7 @@ object Cluster {
*/
val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
- new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultSerializer)
+ new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
}
/**
@@ -230,7 +238,7 @@ object Cluster {
/**
* Creates a new AkkaZkClient.
*/
- def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
+ def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultZooKeeperSerializer)
def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
@@ -364,7 +372,8 @@ class DefaultClusterNode private[akka] (
private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
- conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
+ if (includeRefNodeInReplicaSet)
+ conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
conns
}
@@ -406,14 +415,14 @@ class DefaultClusterNode private[akka] (
// =======================================
def start(): ClusterNode = {
- isConnected switchOn {
+ if (isConnected.compareAndSet(false, true)) {
initializeNode()
}
this
}
def shutdown() {
- isConnected switchOff {
+ if (isConnected.compareAndSet(true, false)) {
ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))
locallyCachedMembershipNodes.clear()
@@ -633,9 +642,7 @@ class DefaultClusterNode private[akka] (
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
- serializer: Serializer): ClusterNode = if (isConnected.isOn) {
-
- import akka.serialization.ActorSerialization._
+ serializer: Serializer): ClusterNode = if (isConnected.get) {
val serializerClassName = serializer.getClass.getName
@@ -654,7 +661,7 @@ class DefaultClusterNode private[akka] (
// create ADDRESS -> Array[Byte] for actor registry
try {
- zkClient.writeData(actorAddressRegistryPath, actorFactoryBytes) // FIXME store actor factory bytes in Data Grid not ZooKeeper
+ zkClient.writeData(actorAddressRegistryPath, actorFactoryBytes)
} catch {
case e: ZkNoNodeException ⇒ // if not stored yet, store the actor
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
@@ -684,13 +691,7 @@ class DefaultClusterNode private[akka] (
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
}
- import RemoteClusterDaemon._
- val command = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(USE)
- .setActorAddress(actorAddress)
- .build
-
- nodeConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒ sendCommandToNode(connection, command, async = false) }
+ useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
this
} else throw new ClusterException("Not connected to cluster")
@@ -717,7 +718,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid clustered or not?
*/
- def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
+ def isClustered(actorAddress: String): Boolean = if (isConnected.get) {
zkClient.exists(actorAddressRegistryPathFor(actorAddress))
} else false
@@ -729,7 +730,7 @@ class DefaultClusterNode private[akka] (
/**
* Is the actor with uuid in use or not?
*/
- def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
+ def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) {
zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
} else false
@@ -743,13 +744,11 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
- def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.isOn) {
+ def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
- // FIXME should not grab bytes from ZK but load the class and instantiate it with newInstance
-
val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ ActorRef]]() {
def call: Either[Exception, () ⇒ ActorRef] = {
@@ -824,41 +823,40 @@ class DefaultClusterNode private[akka] (
} else None
/**
- * Using (checking out) actor on all nodes in the cluster.
+ * Using (checking out) actor on a specific set of nodes.
*/
- def useActorOnAllNodes(actorAddress: String) {
- isConnected ifOn {
- EventHandler.debug(this,
- "Using (checking out) actor with address [%s] on all nodes in cluster".format(actorAddress))
+ def useActorOnNodes(nodes: Array[String], actorAddress: String) {
+ EventHandler.debug(this,
+ "Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
+
+ if (isConnected.get) {
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
.build
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒ sendCommandToNode(connection, command, async = false)
+ nodes foreach { node ⇒
+ nodeConnections.get(node) foreach {
+ case (_, connection) ⇒
+ sendCommandToNode(connection, command, async = false)
+ }
}
}
}
+ /**
+ * Using (checking out) actor on all nodes in the cluster.
+ */
+ def useActorOnAllNodes(actorAddress: String) {
+ useActorOnNodes(membershipNodes, actorAddress)
+ }
+
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(node: String, actorAddress: String) {
- EventHandler.debug(this,
- "Sending command to node [%s] for checking out actor [%s]".format(node, actorAddress))
-
- isConnected ifOn {
- nodeConnections.get(node) foreach {
- case (_, connection) ⇒
- val command = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(USE)
- .setActorAddress(actorAddress)
- .build
- sendCommandToNode(connection, command, async = false)
- }
- }
+ useActorOnNodes(Array(node), actorAddress)
}
/**
@@ -875,7 +873,7 @@ class DefaultClusterNode private[akka] (
// FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
- isConnected ifOn {
+ if (isConnected.get) {
ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))
uuidsForActorAddress(actorAddress) foreach { uuid ⇒
@@ -892,13 +890,13 @@ class DefaultClusterNode private[akka] (
* Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
*/
private[akka] def releaseActorOnAllNodes(actorAddress: String) {
- isConnected ifOn {
+ if (isConnected.get) {
EventHandler.debug(this,
"Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(RELEASE)
- .setActorAddress(actorAddress) // FIXME rename to actorAddress in protobuf protocol
+ .setActorAddress(actorAddress)
.build
nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒
@@ -912,7 +910,7 @@ class DefaultClusterNode private[akka] (
/**
* Creates an ActorRef with a Router to a set of clustered actors.
*/
- def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
+ def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
@@ -936,7 +934,7 @@ class DefaultClusterNode private[akka] (
*/
def migrate(
from: NodeAddress, to: NodeAddress, actorAddress: String) {
- isConnected ifOn {
+ if (isConnected.get) {
if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
if (isInUseOnNode(actorAddress, from)) {
@@ -960,7 +958,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors registered in this cluster.
*/
- private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) {
zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
} else Array.empty[UUID]
@@ -972,7 +970,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor id for the actor with a specific UUID.
*/
- private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
+ private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) {
try {
Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
} catch {
@@ -989,7 +987,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the actor UUIDs for actor ID.
*/
- private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) {
try {
zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
case c: CharSequence ⇒ new UUID(c)
@@ -1002,7 +1000,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the node names of all actors in use with UUID.
*/
- private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
+ private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) {
try {
zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
} catch {
@@ -1013,7 +1011,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the UUIDs of all actors in use registered on a specific node.
*/
- private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
+ private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) {
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
case c: CharSequence ⇒ new UUID(c)
@@ -1026,7 +1024,7 @@ class DefaultClusterNode private[akka] (
/**
* Returns the addresses of all actors in use registered on a specific node.
*/
- def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
+ def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) {
val uuids =
try {
zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
@@ -1042,8 +1040,6 @@ class DefaultClusterNode private[akka] (
* Returns Serializer for actor with specific address.
*/
def serializerForActor(actorAddress: String): Serializer = {
- // FIXME should only be 1 single class name per actor address - FIX IT
-
val serializerClassName =
try {
zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String]
@@ -1276,34 +1272,68 @@ class DefaultClusterNode private[akka] (
}
/**
- * Returns a random set with replica connections of size 'replicationFactor'.
- * Default replicationFactor is 0, which returns the empty set.
+ * Returns a random set with node names of size 'replicationFactor'.
+ * Default replicationFactor is 0, which returns the empty Vector.
*/
- private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
- var replicas = HashSet.empty[ActorRef]
- if (replicationFactor < 1) return replicas
+ private def nodesForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Vector[String] = {
+ var replicaNames = Vector.empty[String]
+ val nrOfClusterNodes = nodeConnections.size
- val numberOfReplicas = nodeConnections.size
- val nodeConnectionsAsArray = nodeConnections.toList map {
- case (node, (address, actorRef)) ⇒ actorRef
- } // the ActorRefs
+ if (replicationFactor < 1) return replicaNames
+ if (nrOfClusterNodes < replicationFactor) throw new IllegalArgumentException(
+ "Replication factor [" + replicationFactor +
+ "] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]")
- if (numberOfReplicas < replicationFactor) {
- throw new IllegalArgumentException(
- "Replication factor [" + replicationFactor +
- "] is greater than the number of available nodes [" + numberOfReplicas + "]")
- } else if (numberOfReplicas == replicationFactor) {
- replicas = replicas ++ nodeConnectionsAsArray
- } else {
- val random = new java.util.Random(System.currentTimeMillis)
- while (replicas.size < replicationFactor) {
- val index = random.nextInt(numberOfReplicas)
- replicas = replicas + nodeConnectionsAsArray(index)
+ val preferredNodes =
+ if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
+ Deployer.deploymentFor(actorAddress.get) match {
+ case Deploy(_, _, _, Clustered(nodes, _, _)) ⇒
+ nodes map (node ⇒ Deployer.nodeNameFor(node)) take replicationFactor
+ case _ ⇒
+ throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
+ }
+ } else Vector.empty[String]
+
+ for {
+ nodeName ← preferredNodes
+ key ← nodeConnections.keys
+ if key == nodeName
+ } replicaNames = replicaNames :+ nodeName
+
+ val nrOfCurrentReplicaNames = replicaNames.size
+
+ val replicaSet =
+ if (nrOfCurrentReplicaNames > replicationFactor) throw new IllegalStateException("Replica set is larger than replication factor")
+ else if (nrOfCurrentReplicaNames == replicationFactor) replicaNames
+ else {
+ val random = new java.util.Random(System.currentTimeMillis)
+ while (replicaNames.size < replicationFactor) {
+ replicaNames = replicaNames :+ membershipNodes(random.nextInt(nrOfClusterNodes))
+ }
+ replicaNames
}
- }
- replicas
+
+ EventHandler.debug(this,
+ "Picked out replica set [%s] for actor [%s]".format(replicaSet.mkString(", "), actorAddress))
+
+ replicaSet
}
+ /**
+ * Returns a random set with replica connections of size 'replicationFactor'.
+ * Default replicationFactor is 0, which returns the empty Vector.
+ */
+ private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Vector[ActorRef] = {
+ for {
+ node ← nodesForReplicationFactor(replicationFactor, actorAddress)
+ connectionOption ← nodeConnections.get(node)
+ connection ← connectionOption
+ actorRef ← connection._2
+ } yield actorRef
+ }
+
+ private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)
+
/**
* Update the list of connections to other nodes in the cluster.
*
@@ -1311,7 +1341,7 @@ class DefaultClusterNode private[akka] (
*/
private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
newlyConnectedMembershipNodes: Traversable[String],
- newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { // to prevent race in startup (fetchMembershipNodes vs MembershipChildListener)
+ newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {
// cache the disconnected connections in a map, needed for fail-over of these connections later
var disconnectedConnections = Map.empty[String, InetSocketAddress]
@@ -1319,25 +1349,29 @@ class DefaultClusterNode private[akka] (
disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) ⇒ address }))
}
- // remove connections to failed nodes
- newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
+ if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
+ try {
+ // remove connections to failed nodes
+ newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
- // add connections newly arrived nodes
- newlyConnectedMembershipNodes foreach { node ⇒
- if (!nodeConnections.contains(node)) { // only connect to each replica once
+ // add connections newly arrived nodes
+ newlyConnectedMembershipNodes foreach { node ⇒
+ if (!nodeConnections.contains(node)) { // only connect to each replica once
- val addressOption = remoteSocketAddressForNode(node)
- if (addressOption.isDefined) {
- val address = addressOption.get
+ remoteSocketAddressForNode(node) foreach { address ⇒
+ EventHandler.debug(this,
+ "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
- EventHandler.debug(this,
- "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
-
- val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
- nodeConnections.put(node, (address, clusterDaemon))
+ val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
+ nodeConnections.put(node, (address, clusterDaemon))
+ }
+ }
}
+ } finally {
+ connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
}
}
+
disconnectedConnections
}
@@ -1530,7 +1564,7 @@ class DefaultClusterNode private[akka] (
override def resign(): Unit = self.resign()
- override def isConnected = self.isConnected.isOn
+ override def isConnected = self.isConnected.get
override def getRemoteServerHostname = self.hostname
@@ -1661,7 +1695,7 @@ object RemoteClusterDaemon {
val ADDRESS = "akka-cluster-daemon".intern
// FIXME configure computeGridDispatcher to what?
- val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
+ val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
}
/**
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index 93c58d1f32..55e1fb2c33 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -52,7 +52,7 @@ object ClusterDeployer {
Cluster.zooKeeperServers,
Cluster.sessionTimeout,
Cluster.connectionTimeout,
- Cluster.defaultSerializer)
+ Cluster.defaultZooKeeperSerializer)
private val deploymentInProgressLockListener = new LockListener {
def lockAcquired() {
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 89a9c811d9..510fd9415e 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -344,7 +344,7 @@ object TransactionLog {
Cluster.zooKeeperServers,
Cluster.sessionTimeout,
Cluster.connectionTimeout,
- Cluster.defaultSerializer)
+ Cluster.defaultZooKeeperSerializer)
try {
zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
diff --git a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
index c168de4022..42df10ee63 100644
--- a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
@@ -7,8 +7,6 @@ import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
-//import akka.event.EventHandler
-
/**
* ZooKeeper client. Holds the ZooKeeper connection and manages its session.
*/
@@ -17,7 +15,6 @@ class AkkaZkClient(zkServers: String,
connectionTimeout: Int,
zkSerializer: ZkSerializer = new SerializableSerializer)
extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
- // EventHandler.debug(this, "Connecting to ZooKeeper ensamble [%s]" format zkServers)
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
diff --git a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala
index 1c1be57a0c..06e201497c 100644
--- a/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/api/migration/explicit/MigrationExplicitMultiJvmSpec.scala
@@ -17,7 +17,6 @@ import akka.config.Config
import akka.serialization.Serialization
import java.util.concurrent._
-
/*
object MigrationExplicitMultiJvmSpec {
var NrOfNodes = 2
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
similarity index 50%
rename from akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode1.conf
rename to akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
index 3dbd80a663..44cfd2f725 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode1.conf
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.conf
@@ -1,4 +1,4 @@
-akka.event-handler-level = "INFO"
+akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["host:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode1.opts
rename to akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode1.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf
similarity index 50%
rename from akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode2.conf
rename to akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf
index 3dbd80a663..7b150cfb06 100644
--- a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode2.conf
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.conf
@@ -1,4 +1,4 @@
-akka.event-handler-level = "INFO"
+akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
akka.actor.deployment.service-hello.clustered.replicas = 1
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.opts
similarity index 100%
rename from akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmNode2.opts
rename to akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmNode2.opts
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmSpec.scala
new file mode 100644
index 0000000000..e4aae69f8f
--- /dev/null
+++ b/akka-cluster/src/test/scala/akka/cluster/routing/homenode/HomeNodeMultiJvmSpec.scala
@@ -0,0 +1,59 @@
+package akka.cluster.routing.homenode
+
+import akka.config.Config
+import akka.actor.{ ActorRef, Actor }
+import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
+import Cluster._
+
+object HomeNodeMultiJvmSpec {
+
+ val NrOfNodes = 2
+
+ class SomeActor extends Actor with Serializable {
+ def receive = {
+ case "identify" ⇒ {
+ self.reply(Config.nodename)
+ }
+ }
+ }
+}
+
+class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ val testNodes = NrOfNodes
+
+ "A Router" must {
+ "obey 'home-node' config option when instantiated actor in cluster" in {
+
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+
+ barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
+
+ node.shutdown()
+ }
+ }
+}
+
+class HomeNodeMultiJvmNode2 extends ClusterTestNode {
+
+ import HomeNodeMultiJvmSpec._
+
+ "A Router" must {
+ "obey 'home-node' config option when instantiated actor in cluster" in {
+
+ node.start()
+ barrier("waiting-for-begin", NrOfNodes).await()
+
+ barrier("get-ref-to-actor-on-node2", NrOfNodes) {
+ val actor = Actor.actorOf[SomeActor]("service-hello")
+ val name = (actor ? "identify").get.asInstanceOf[String]
+ name must equal("node1")
+ }
+
+ node.shutdown()
+ }
+ }
+}
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmSpec.scala
deleted file mode 100644
index 7f755339b5..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/routing_identity_problem/RoutingIdentityProblemMultiJvmSpec.scala
+++ /dev/null
@@ -1,68 +0,0 @@
-package akka.cluster.routing.routing_identity_problem
-
-import akka.config.Config
-import akka.actor.{ ActorRef, Actor }
-import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
-
-object RoutingIdentityProblemMultiJvmSpec {
-
- val NrOfNodes = 2
-
- class SomeActor extends Actor with Serializable {
- println("---------------------------------------------------------------------------")
- println("SomeActor has been created on node [" + Config.nodename + "]")
- println("---------------------------------------------------------------------------")
-
- def receive = {
- case "identify" ⇒ {
- println("The node received the 'identify' command: " + Config.nodename)
- self.reply(Config.nodename)
- }
- }
- }
-}
-
-class RoutingIdentityProblemMultiJvmNode1 extends MasterClusterTestNode {
-
- import RoutingIdentityProblemMultiJvmSpec._
-
- val testNodes = NrOfNodes
-
- "foo" must {
- "bla" in {
- Cluster.node.start()
-
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
- var hello: ActorRef = null
- Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
- hello = Actor.actorOf[SomeActor]("service-hello")
- }
-
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
-
-class RoutingIdentityProblemMultiJvmNode2 extends ClusterTestNode {
-
- import RoutingIdentityProblemMultiJvmSpec._
-
- "foo" must {
- "bla" in {
- Cluster.node.start()
- Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
- Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
-
- val actor = Actor.actorOf[SomeActor]("service-hello")
- val name: String = (actor ? "identify").get.asInstanceOf[String]
- //todo: Jonas: this is the line that needs to be uncommented to get the test to fail.
- //name must equal("node1")
-
- Cluster.barrier("waiting-to-end", NrOfNodes).await()
- Cluster.node.shutdown()
- }
- }
-}
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.conf
deleted file mode 100644
index f3a3da248a..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.conf
+++ /dev/null
@@ -1,4 +0,0 @@
-akka.event-handler-level = "DEBUG"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 2
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.opts
deleted file mode 100644
index a88c260d8c..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode1.opts
+++ /dev/null
@@ -1 +0,0 @@
--Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.conf
deleted file mode 100644
index 746f608425..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.conf
+++ /dev/null
@@ -1,4 +0,0 @@
-akka.event-handler-level = "DEBUG"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node2"
-akka.actor.deployment.service-hello.clustered.replicas = 2
\ No newline at end of file
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.opts
deleted file mode 100644
index f1e01f253d..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmNode2.opts
+++ /dev/null
@@ -1 +0,0 @@
--Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
diff --git a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmSpec.scala
deleted file mode 100644
index b99b7c671b..0000000000
--- a/akka-cluster/src/test/scala/akka/cluster/routing/use_homenode_as_replica/UseHomeNodeAsReplicaMultiJvmSpec.scala
+++ /dev/null
@@ -1,70 +0,0 @@
-package akka.cluster.routing.use_homenode_as_replica
-
-import org.scalatest.matchers.MustMatchers
-import akka.config.Config
-import org.scalatest.{ BeforeAndAfterAll, WordSpec }
-import akka.cluster._
-import Cluster._
-import akka.actor.{ ActorRef, Actor }
-
-object UseHomeNodeAsReplicaMultiJvmSpec {
- val NrOfNodes = 2
-
- class HelloWorld extends Actor with Serializable {
- def receive = {
- case x: String ⇒ {
- println("Hello message was received")
- }
- }
- }
-}
-
-class UseHomeNodeAsReplicaMultiJvmNode1 extends MasterClusterTestNode {
-
- import UseHomeNodeAsReplicaMultiJvmSpec._
-
- val testNodes = NrOfNodes
-
- "foo" must {
- "bla" in {
- println("Node 1 has started")
-
- barrier("start-node1", NrOfNodes) {
- node.start()
- }
-
- barrier("start-node2", NrOfNodes) {}
-
- println("Getting reference to service-hello actor")
- var hello: ActorRef = null
- barrier("get-ref-to-actor-on-node2", NrOfNodes) {
- hello = Actor.actorOf[HelloWorld]("service-hello")
- }
-
- println("Saying hello to actor")
- hello ! "say hello"
- node.shutdown()
- }
- }
-}
-
-class UseHomeNodeAsReplicaMultiJvmNode2 extends ClusterTestNode {
-
- import UseHomeNodeAsReplicaMultiJvmSpec._
- "foo" must {
- "bla" in {
- println("Waiting for Node 1 to start")
- barrier("start-node1", NrOfNodes) {}
-
- println("Waiting for himself to start???")
- barrier("start-node2", NrOfNodes) {
- node.start()
- }
-
- barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
-
- println("Shutting down JVM Node 2")
- node.shutdown()
- }
- }
-}
diff --git a/akka-docs/java/fault-tolerance.rst b/akka-docs/java/fault-tolerance.rst
index b89b3978b4..512e914d2b 100644
--- a/akka-docs/java/fault-tolerance.rst
+++ b/akka-docs/java/fault-tolerance.rst
@@ -117,7 +117,7 @@ The Actor’s supervision can be declaratively defined by creating a ‘Supervis
import static akka.config.Supervision.*;
import static akka.actor.Actors.*;
- Supervisor supervisor = new Supervisor(
+ Supervisor supervisor = Supervisor.apply(
new SupervisorConfig(
new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
new Supervise[] {
@@ -141,13 +141,14 @@ MaximumNumberOfRestartsWithinTimeRangeReached message.
import static akka.actor.Actors.*;
import akka.event.JavaEventHandler;
- Procedure2