Merge branch 'master' of github.com:bwmcadams/akka
commit 7eecff5963
58 changed files with 336 additions and 336 deletions
@@ -20,7 +20,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
           LeastCPU,
           "akka.serialization.Format$Default$",
           Clustered(
-            Node("node1"),
+            Vector(Node("node1")),
             Replicate(3),
             Replication(
               TransactionLog,

@@ -144,6 +144,7 @@ object Actor extends ListenerManagement {
   def this(timeout: Long) = this(Duration(timeout, TimeUnit.MILLISECONDS))
+  def this(length: Long, unit: TimeUnit) = this(Duration(length, unit))
 }

 object Timeout {
   def apply(timeout: Long) = new Timeout(timeout)
   def apply(length: Long, unit: TimeUnit) = new Timeout(length, unit)
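The constructor and factory overloads mirror each other. A minimal usage sketch (assuming, as the self-delegating constructors above imply, that Timeout's primary constructor takes an akka.util.Duration):

    import java.util.concurrent.TimeUnit
    import akka.util.Duration

    val t1 = Timeout(3000)                              // milliseconds, via apply(timeout: Long)
    val t2 = Timeout(3, TimeUnit.SECONDS)               // explicit unit, via apply(length, unit)
    val t3 = new Timeout(Duration(3, TimeUnit.SECONDS)) // primary constructor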
@@ -443,7 +444,7 @@ object Actor extends ListenerManagement {
       case Deploy(
         configAdress, router, serializerClassName,
         Clustered(
-          home,
+          preferredHomeNodes,
           replicas,
           replication)) ⇒

@@ -454,7 +455,7 @@ object Actor extends ListenerManagement {
         if (!Actor.remote.isRunning) throw new IllegalStateException(
           "Remote server is not running")

-        val isHomeNode = DeploymentConfig.isHomeNode(home)
+        val isHomeNode = preferredHomeNodes exists (home ⇒ DeploymentConfig.isHomeNode(home))
         val nrOfReplicas = DeploymentConfig.replicaValueFor(replicas)

         def serializerErrorDueTo(reason: String) =

@@ -31,7 +31,7 @@ object DeploymentConfig {
   case class Deploy(
     address: String,
     routing: Routing = Direct,
-    format: String = Serializer.defaultSerializerName, // Format.defaultSerializerName,
+    format: String = Serializer.defaultSerializerName,
     scope: Scope = Local)

   // --------------------------------

@@ -61,7 +61,7 @@ object DeploymentConfig {
   // --------------------------------
   sealed trait Scope
   case class Clustered(
-    home: Home = Host("localhost"),
+    preferredNodes: Iterable[Home] = Vector(Host("localhost")),
     replicas: Replicas = NoReplicas,
     replication: ReplicationScheme = Transient) extends Scope
@@ -139,12 +139,19 @@ object DeploymentConfig {
   // --- Helper methods for parsing
   // --------------------------------

-  def isHomeNode(home: Home): Boolean = home match {
-    case Host(hostname) ⇒ hostname == Config.hostname
-    case IP(address)    ⇒ address == "0.0.0.0" || address == "127.0.0.1" // FIXME look up IP address from the system
-    case Node(nodename) ⇒ nodename == Config.nodename
-  }
+  def nodeNameFor(home: Home): String = {
+    home match {
+      case Node(nodename)    ⇒ nodename
+      case Host("localhost") ⇒ Config.nodename
+      case IP("0.0.0.0")     ⇒ Config.nodename
+      case IP("127.0.0.1")   ⇒ Config.nodename
+      case Host(hostname)    ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'hostname' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+      case IP(address)       ⇒ throw new UnsupportedOperationException("Specifying preferred node name by 'IP address' is not yet supported. Use the node name like: preferred-nodes = [\"node:node1\"]")
+    }
+  }
+
+  def isHomeNode(home: Home): Boolean = nodeNameFor(home) == Config.nodename

   def replicaValueFor(replicas: Replicas): Int = replicas match {
     case Replicate(replicas) ⇒ replicas
     case AutoReplicate       ⇒ -1
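After this rewrite every Home value funnels through nodeNameFor, so isHomeNode reduces to a plain name comparison. A sketch of the resolution rules (illustrative results, assuming Config.nodename is "node1"):

    nodeNameFor(Node("node1"))     // "node1"
    nodeNameFor(Host("localhost")) // Config.nodename, i.e. "node1"
    nodeNameFor(IP("127.0.0.1"))   // Config.nodename as well
    nodeNameFor(Host("darkstar"))  // throws UnsupportedOperationException
    isHomeNode(Node("node1"))      // true: nodeNameFor(...) == Config.nodename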
@@ -166,7 +173,7 @@ object DeploymentConfig {
     case LeastRAM()      ⇒ RouterType.LeastRAM
-    case LeastMessages   ⇒ RouterType.LeastMessages
-    case c: CustomRouter ⇒ throw new UnsupportedOperationException("routerTypeFor: " + c)
+    case LeastMessages() ⇒ RouterType.LeastMessages
+    case c: CustomRouter ⇒ throw new UnsupportedOperationException("Unknown Router [" + c + "]")
   }

   def isReplicationAsync(strategy: ReplicationStrategy): Boolean = strategy match {

@@ -245,8 +252,10 @@ object Deployer {

   private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
     val deployment_? = instance.lookupDeploymentFor(address)

     if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_?
     else {
+
+      val newDeployment =
       try {
         lookupInConfig(address)

@@ -255,6 +264,7 @@ object Deployer {
         EventHandler.error(e, this, e.getMessage)
         throw e
       }
+
       newDeployment foreach { d ⇒
         if (d eq null) {
           val e = new IllegalStateException("Deployment for address [" + address + "] is null")

@@ -263,6 +273,7 @@ object Deployer {
         }
+        deploy(d) // deploy and cache it
       }

       newDeployment
     }
   }
@@ -334,28 +345,30 @@ object Deployer {
       case Some(clusteredConfig) ⇒

         // --------------------------------
-        // akka.actor.deployment.<address>.clustered.home
+        // akka.actor.deployment.<address>.clustered.preferred-nodes
         // --------------------------------

-        val home = clusteredConfig.getString("home", "") match {
-          case "" ⇒ Host("localhost")
-          case home ⇒
+        val preferredNodes = clusteredConfig.getList("preferred-nodes") match {
+          case Nil ⇒ Vector(Host("localhost"))
+          case homes ⇒
             def raiseHomeConfigError() = throw new ConfigurationException(
               "Config option [" + addressPath +
-              ".clustered.home] needs to be on format 'host:<hostname>', 'ip:<ip address>'' or 'node:<node name>', was [" +
-              home + "]")
+              ".clustered.preferred-nodes] needs to be a list with elements on format\n'host:<hostname>', 'ip:<ip address>' or 'node:<node name>', was [" +
+              homes + "]")

-            if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()
+            homes map { home ⇒
+              if (!(home.startsWith("host:") || home.startsWith("node:") || home.startsWith("ip:"))) raiseHomeConfigError()

-            val tokenizer = new java.util.StringTokenizer(home, ":")
-            val protocol = tokenizer.nextElement
-            val address = tokenizer.nextElement.asInstanceOf[String]
+              val tokenizer = new java.util.StringTokenizer(home, ":")
+              val protocol = tokenizer.nextElement
+              val address = tokenizer.nextElement.asInstanceOf[String]

-            protocol match {
-              case "host" ⇒ Host(address)
-              case "node" ⇒ Node(address)
-              case "ip"   ⇒ IP(address)
-              case _      ⇒ raiseHomeConfigError()
+              protocol match {
+                case "host" ⇒ Host(address)
+                case "node" ⇒ Node(address)
+                case "ip"   ⇒ IP(address)
+                case _      ⇒ raiseHomeConfigError()
+              }
             }
           }
         }
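Each list element must carry one of the three schemes; the tokenizer splits the scheme from the value and maps it onto a Home case class. For example, a hypothetical deployment address "my-service" configured as

    akka.actor.deployment.my-service.clustered.preferred-nodes = ["node:node1", "ip:10.0.0.2"]

would parse to Vector(Node("node1"), IP("10.0.0.2")), while any element missing the "host:", "ip:" or "node:" prefix hits raiseHomeConfigError() and aborts with the ConfigurationException shown above.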
@@ -382,7 +395,7 @@ object Deployer {
       // --------------------------------
       clusteredConfig.getSection("replication") match {
         case None ⇒
-          Some(Deploy(address, router, format, Clustered(home, replicas, Transient)))
+          Some(Deploy(address, router, format, Clustered(preferredNodes, replicas, Transient)))

         case Some(replicationConfig) ⇒
           val storage = replicationConfig.getString("storage", "transaction-log") match {

@@ -401,7 +414,7 @@ object Deployer {
               ".clustered.replication.strategy] needs to be either [\"write-through\"] or [\"write-behind\"] - was [" +
               unknown + "]")
           }
-          Some(Deploy(address, router, format, Clustered(home, replicas, Replication(storage, strategy))))
+          Some(Deploy(address, router, format, Clustered(preferredNodes, replicas, Replication(storage, strategy))))
       }
     }
   }

@@ -41,6 +41,14 @@ class SupervisorException private[akka] (message: String, cause: Throwable = nul
  * supervisor.unlink(child)
  * </pre>
  *
+ * If you are using it from Java you have to use <code>Supervisor.apply(..)</code> like in:
+ * <pre>
+ * Supervisor supervisor = Supervisor.apply(
+ *   SupervisorConfig(
+ *     ..
+ *   ))
+ * </pre>
+ *
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
 object Supervisor {
@@ -122,7 +122,7 @@ object NodeAddress {
 trait ClusterNode {
   import ChangeListener._

-  val isConnected = new Switch(false)
+  val isConnected = new AtomicBoolean(false)

   private[cluster] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()

@@ -136,7 +136,7 @@ trait ClusterNode {

   def remoteServerAddress: InetSocketAddress

-  def isRunning: Boolean = isConnected.isOn
+  def isRunning: Boolean = isConnected.get

   def start(): ClusterNode

@@ -324,6 +324,11 @@ trait ClusterNode {
    */
   def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef]

+  /**
+   * Using (checking out) actor on a specific set of nodes.
+   */
+  def useActorOnNodes(nodes: Array[String], actorAddress: String)
+
   /**
    * Using (checking out) actor on all nodes in the cluster.
    */

@@ -500,8 +505,6 @@ trait ClusterNode {

   private[cluster] def remoteSocketAddressForNode(node: String): Option[InetSocketAddress]

-  private[cluster] def createActorsAtAddressPath()
-
   private[cluster] def membershipPathFor(node: String): String
   private[cluster] def configurationPathFor(key: String): String
@@ -96,6 +96,8 @@ object Config {

   val TIME_UNIT = config.getString("akka.time-unit", "seconds")

+  val isClusterEnabled = config.getList("akka.enabled-modules").exists(_ == "cluster")
+
   lazy val nodename = System.getProperty("akka.cluster.nodename") match {
     case null | "" ⇒ new UUID().toString
     case value     ⇒ value

@@ -30,12 +30,12 @@ object ReflectiveAccess {
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
 object ClusterModule {
-  lazy val isEnabled = clusterInstance.isDefined
+  lazy val isEnabled = Config.isClusterEnabled && clusterInstance.isDefined

   def ensureEnabled() {
     if (!isEnabled) {
       val e = new ModuleNotAvailableException(
-        "Can't load the cluster module, make sure that akka-cluster.jar is on the classpath")
+        "Can't load the cluster module, make sure it is enabled in the config ('akka.enabled-modules = [\"cluster\"])' and that akka-cluster.jar is on the classpath")
       EventHandler.debug(this, e.toString)
       throw e
     }

@@ -31,7 +31,7 @@ import Helpers._
 import akka.actor._
 import Actor._
 import Status._
-import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
+import DeploymentConfig._

 import akka.event.EventHandler
 import akka.dispatch.{ Dispatchers, Future }

@@ -54,7 +54,6 @@ import com.eaio.uuid.UUID
 import com.google.protobuf.ByteString

-// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
 // FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk

 /**
  * JMX MBean for the cluster service.
@@ -131,11 +130,20 @@ object Cluster {
   val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
   val enableJMX = config.getBool("akka.enable-jmx", true)
   val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
-  val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true)
+  val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)

   @volatile
   private var properties = Map.empty[String, String]

   /**
    * Use to override JVM options such as <code>-Dakka.cluster.nodename=node1</code> etc.
    * Currently supported options are:
    * <pre>
    *   Cluster setProperty ("akka.cluster.nodename", "node1")
    *   Cluster setProperty ("akka.cluster.hostname", "darkstar.lan")
    *   Cluster setProperty ("akka.cluster.port", "1234")
    * </pre>
    */
   def setProperty(property: (String, String)) {
     properties = properties + property
   }

@@ -155,7 +163,7 @@ object Cluster {
     case None ⇒ Config.remoteServerPort
   }

-  val defaultSerializer = new SerializableSerializer
+  val defaultZooKeeperSerializer = new SerializableSerializer

   private val _zkServer = new AtomicReference[Option[ZkServer]](None)

@@ -169,7 +177,7 @@ object Cluster {
    */
   val node = {
     if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
-    new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultSerializer)
+    new DefaultClusterNode(nodeAddress, hostname, port, zooKeeperServers, defaultZooKeeperSerializer)
   }

   /**

@@ -230,7 +238,7 @@ object Cluster {
   /**
    * Creates a new AkkaZkClient.
    */
-  def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
+  def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultZooKeeperSerializer)

   def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
@@ -364,7 +372,8 @@ class DefaultClusterNode private[akka] (

   private[akka] val nodeConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] = {
     val conns = new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
-    conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
+    if (includeRefNodeInReplicaSet)
+      conns.put(nodeAddress.nodeName, (remoteServerAddress, remoteDaemon)) // add the remote connection to 'this' node as well, but as a 'local' actor
     conns
   }

@@ -406,14 +415,14 @@ class DefaultClusterNode private[akka] (
   // =======================================

   def start(): ClusterNode = {
-    isConnected switchOn {
+    if (isConnected.compareAndSet(false, true)) {
       initializeNode()
     }
     this
   }

   def shutdown() {
-    isConnected switchOff {
+    if (isConnected.compareAndSet(true, false)) {
       ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))

       locallyCachedMembershipNodes.clear()
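Replacing the Switch with a plain AtomicBoolean makes start and shutdown idempotent: only the caller that wins the compare-and-set runs the body. A self-contained sketch of the same idiom (class and method names are illustrative, not from the codebase):

    import java.util.concurrent.atomic.AtomicBoolean

    class Lifecycle {
      val isConnected = new AtomicBoolean(false)

      def start(): this.type = {
        // only the first caller flips false -> true and runs the initialization
        if (isConnected.compareAndSet(false, true)) println("initializing once")
        this
      }

      def shutdown() {
        // only runs if we were actually started; concurrent callers race safely
        if (isConnected.compareAndSet(true, false)) println("cleaning up once")
      }
    }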
@@ -633,9 +642,7 @@ class DefaultClusterNode private[akka] (
     replicationFactor: Int,
     replicationScheme: ReplicationScheme,
     serializeMailbox: Boolean,
-    serializer: Serializer): ClusterNode = if (isConnected.isOn) {
-
-    import akka.serialization.ActorSerialization._
+    serializer: Serializer): ClusterNode = if (isConnected.get) {

     val serializerClassName = serializer.getClass.getName

@@ -654,7 +661,7 @@ class DefaultClusterNode private[akka] (

     // create ADDRESS -> Array[Byte] for actor registry
     try {
-      zkClient.writeData(actorAddressRegistryPath, actorFactoryBytes) // FIXME store actor factory bytes in Data Grid not ZooKeeper
+      zkClient.writeData(actorAddressRegistryPath, actorFactoryBytes)
     } catch {
       case e: ZkNoNodeException ⇒ // if not stored yet, store the actor
         zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {

@@ -684,13 +691,7 @@ class DefaultClusterNode private[akka] (
       ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
     }

-    import RemoteClusterDaemon._
-    val command = RemoteDaemonMessageProtocol.newBuilder
-      .setMessageType(USE)
-      .setActorAddress(actorAddress)
-      .build
-
-    nodeConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒ sendCommandToNode(connection, command, async = false) }
+    useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)

     this
   } else throw new ClusterException("Not connected to cluster")

@@ -717,7 +718,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Is the actor with uuid clustered or not?
    */
-  def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
+  def isClustered(actorAddress: String): Boolean = if (isConnected.get) {
     zkClient.exists(actorAddressRegistryPathFor(actorAddress))
   } else false

@@ -729,7 +730,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Is the actor with uuid in use or not?
    */
-  def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
+  def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.get) {
     zkClient.exists(actorAddressToNodesPathFor(actorAddress, node.nodeName))
   } else false

@@ -743,13 +744,11 @@ class DefaultClusterNode private[akka] (
    * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
    * for remote access through lookup by its UUID.
    */
-  def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.isOn) {
+  def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) {
     val nodeName = nodeAddress.nodeName

     ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))

-    // FIXME should not grab bytes from ZK but load the class and instantiate it with newInstance
-
     val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
     zkClient.retryUntilConnected(new Callable[Either[Exception, () ⇒ ActorRef]]() {
       def call: Either[Exception, () ⇒ ActorRef] = {
@@ -824,41 +823,40 @@ class DefaultClusterNode private[akka] (
   } else None

   /**
-   * Using (checking out) actor on all nodes in the cluster.
+   * Using (checking out) actor on a specific set of nodes.
    */
-  def useActorOnAllNodes(actorAddress: String) {
-    isConnected ifOn {
-      EventHandler.debug(this,
-        "Using (checking out) actor with address [%s] on all nodes in cluster".format(actorAddress))
+  def useActorOnNodes(nodes: Array[String], actorAddress: String) {
+    EventHandler.debug(this,
+      "Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
+
+    if (isConnected.get) {

       val command = RemoteDaemonMessageProtocol.newBuilder
         .setMessageType(USE)
         .setActorAddress(actorAddress)
         .build

-      nodeConnections.get(node) foreach {
-        case (_, connection) ⇒ sendCommandToNode(connection, command, async = false)
+      nodes foreach { node ⇒
+        nodeConnections.get(node) foreach {
+          case (_, connection) ⇒
+            sendCommandToNode(connection, command, async = false)
+        }
       }
     }
   }

+  /**
+   * Using (checking out) actor on all nodes in the cluster.
+   */
+  def useActorOnAllNodes(actorAddress: String) {
+    useActorOnNodes(membershipNodes, actorAddress)
+  }
+
   /**
    * Using (checking out) actor on a specific node.
    */
   def useActorOnNode(node: String, actorAddress: String) {
-    EventHandler.debug(this,
-      "Sending command to node [%s] for checking out actor [%s]".format(node, actorAddress))
-
-    isConnected ifOn {
-      nodeConnections.get(node) foreach {
-        case (_, connection) ⇒
-          val command = RemoteDaemonMessageProtocol.newBuilder
-            .setMessageType(USE)
-            .setActorAddress(actorAddress)
-            .build
-          sendCommandToNode(connection, command, async = false)
-      }
-    }
+    useActorOnNodes(Array(node), actorAddress)
   }

   /**
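The three "use actor" entry points now share one primitive: useActorOnNodes builds the USE command once and fans it out, and the other two become thin wrappers. A compressed view of the delegation, with bodies elided:

    def useActorOnNodes(nodes: Array[String], actorAddress: String) { /* build USE command, send to each node */ }
    def useActorOnAllNodes(actorAddress: String) { useActorOnNodes(membershipNodes, actorAddress) }
    def useActorOnNode(node: String, actorAddress: String) { useActorOnNodes(Array(node), actorAddress) }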
@@ -875,7 +873,7 @@ class DefaultClusterNode private[akka] (

     // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?

-    isConnected ifOn {
+    if (isConnected.get) {
       ignore[ZkNoNodeException](zkClient.delete(actorAddressToNodesPathFor(actorAddress, nodeAddress.nodeName)))

       uuidsForActorAddress(actorAddress) foreach { uuid ⇒

@@ -892,13 +890,13 @@ class DefaultClusterNode private[akka] (
    * Releases (checking in) all actors with a specific address on all nodes in the cluster where the actor is in 'use'.
    */
   private[akka] def releaseActorOnAllNodes(actorAddress: String) {
-    isConnected ifOn {
+    if (isConnected.get) {
       EventHandler.debug(this,
         "Releasing (checking in) all actors with address [%s] on all nodes in cluster".format(actorAddress))

       val command = RemoteDaemonMessageProtocol.newBuilder
         .setMessageType(RELEASE)
-        .setActorAddress(actorAddress) // FIXME rename to actorAddress in protobuf protocol
+        .setActorAddress(actorAddress)
         .build

       nodesForActorsInUseWithAddress(actorAddress) foreach { node ⇒

@@ -912,7 +910,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Creates an ActorRef with a Router to a set of clustered actors.
    */
-  def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
+  def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.get) {
     val addresses = addressesForActor(actorAddress)
     EventHandler.debug(this,
       "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"

@@ -936,7 +934,7 @@ class DefaultClusterNode private[akka] (
    */
   def migrate(
     from: NodeAddress, to: NodeAddress, actorAddress: String) {
-    isConnected ifOn {
+    if (isConnected.get) {
       if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
       if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
       if (isInUseOnNode(actorAddress, from)) {

@@ -960,7 +958,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the UUIDs of all actors registered in this cluster.
    */
-  private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.isOn) {
+  private[akka] def uuidsForClusteredActors: Array[UUID] = if (isConnected.get) {
     zkClient.getChildren(ACTOR_UUID_REGISTRY_PATH).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
   } else Array.empty[UUID]

@@ -972,7 +970,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the actor id for the actor with a specific UUID.
    */
-  private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.isOn) {
+  private[akka] def actorAddressForUuid(uuid: UUID): Option[String] = if (isConnected.get) {
     try {
       Some(zkClient.readData(actorUuidRegistryAddressPathFor(uuid)).asInstanceOf[String])
     } catch {

@@ -989,7 +987,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the actor UUIDs for actor ID.
    */
-  private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
+  private[akka] def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.get) {
     try {
       zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.toArray map {
         case c: CharSequence ⇒ new UUID(c)

@@ -1002,7 +1000,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the node names of all actors in use with UUID.
    */
-  private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.isOn) {
+  private[akka] def nodesForActorsInUseWithAddress(actorAddress: String): Array[String] = if (isConnected.get) {
     try {
       zkClient.getChildren(actorAddressToNodesPathFor(actorAddress)).toList.toArray.asInstanceOf[Array[String]]
     } catch {

@@ -1013,7 +1011,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the UUIDs of all actors in use registered on a specific node.
    */
-  private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
+  private[akka] def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.get) {
     try {
       zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {
         case c: CharSequence ⇒ new UUID(c)

@@ -1026,7 +1024,7 @@ class DefaultClusterNode private[akka] (
   /**
    * Returns the addresses of all actors in use registered on a specific node.
    */
-  def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
+  def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.get) {
     val uuids =
       try {
         zkClient.getChildren(nodeToUuidsPathFor(nodeName)).toList.toArray map {

@@ -1042,8 +1040,6 @@ class DefaultClusterNode private[akka] (
    * Returns Serializer for actor with specific address.
    */
   def serializerForActor(actorAddress: String): Serializer = {
-    // FIXME should only be 1 single class name per actor address - FIX IT
-
     val serializerClassName =
       try {
         zkClient.readData(actorAddressRegistrySerializerPathFor(actorAddress), new Stat).asInstanceOf[String]
@@ -1265,45 +1261,77 @@ class DefaultClusterNode private[akka] (
         "\n\tserializer = [%s]")
         .format(nodeAddress.clusterName, nodeAddress.nodeName, port, zkServerAddresses, serializer))
     EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
-    createRootClusterNode()
-    val isLeader = joinLeaderElection()
-    if (isLeader) createNodeStructureIfNeeded()
+    createZooKeeperPathStructureIfNeeded()
     registerListeners()
+    joinCluster()
+    createActorsAtAddressPath()
+    joinLeaderElection()
     fetchMembershipNodes()
     EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
   }

   /**
-   * Returns a random set with replica connections of size 'replicationFactor'.
-   * Default replicationFactor is 0, which returns the empty set.
+   * Returns a random set with node names of size 'replicationFactor'.
+   * Default replicationFactor is 0, which returns the empty Set.
    */
-  private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
-    var replicas = HashSet.empty[ActorRef]
-    if (replicationFactor < 1) return replicas
-
-    val numberOfReplicas = nodeConnections.size
-    val nodeConnectionsAsArray = nodeConnections.toList map {
-      case (node, (address, actorRef)) ⇒ actorRef
-    } // the ActorRefs
-
-    if (numberOfReplicas < replicationFactor) {
-      throw new IllegalArgumentException(
-        "Replication factor [" + replicationFactor +
-        "] is greater than the number of available nodes [" + numberOfReplicas + "]")
-    } else if (numberOfReplicas == replicationFactor) {
-      replicas = replicas ++ nodeConnectionsAsArray
-    } else {
-      val random = new java.util.Random(System.currentTimeMillis)
-      while (replicas.size < replicationFactor) {
-        val index = random.nextInt(numberOfReplicas)
-        replicas = replicas + nodeConnectionsAsArray(index)
-      }
-    }
-    replicas
+  private def nodesForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[String] = {
+    var replicaNames = Set.empty[String]
+    val nrOfClusterNodes = nodeConnections.size
+
+    if (replicationFactor < 1) return replicaNames
+    if (nrOfClusterNodes < replicationFactor) throw new IllegalArgumentException(
+      "Replication factor [" + replicationFactor +
+      "] is greater than the number of available nodeNames [" + nrOfClusterNodes + "]")
+
+    val preferredNodes =
+      if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
+        Deployer.deploymentFor(actorAddress.get) match {
+          case Deploy(_, _, _, Clustered(nodes, _, _)) ⇒
+            nodes map (node ⇒ DeploymentConfig.nodeNameFor(node)) take replicationFactor
+          case _ ⇒
+            throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
+        }
+      } else Vector.empty[String]
+
+    for {
+      nodeName ← preferredNodes
+      key ← nodeConnections.keys
+      if key == nodeName
+    } replicaNames = replicaNames + nodeName
+
+    val nrOfCurrentReplicaNames = replicaNames.size
+
+    val replicaSet =
+      if (nrOfCurrentReplicaNames > replicationFactor) throw new IllegalStateException("Replica set is larger than replication factor")
+      else if (nrOfCurrentReplicaNames == replicationFactor) replicaNames
+      else {
+        val random = new java.util.Random(System.currentTimeMillis)
+        while (replicaNames.size < replicationFactor) {
+          replicaNames = replicaNames + membershipNodes(random.nextInt(nrOfClusterNodes))
+        }
+        replicaNames
+      }
+
+    EventHandler.debug(this,
+      "Picked out replica set [%s] for actor [%s]".format(replicaSet.mkString(", "), actorAddress))
+
+    replicaSet
   }

+  /**
+   * Returns a random set with replica connections of size 'replicationFactor'.
+   * Default replicationFactor is 0, which returns the empty Set.
+   */
+  private def nodeConnectionsForReplicationFactor(replicationFactor: Int = 0, actorAddress: Option[String] = None): Set[ActorRef] = {
+    for {
+      node ← nodesForReplicationFactor(replicationFactor, actorAddress)
+      connectionOption ← nodeConnections.get(node)
+      connection ← connectionOption
+      actorRef ← connection._2
+    } yield actorRef
+  }
+
+  private val connectToAllNewlyArrivedMembershipNodesInClusterLock = new AtomicBoolean(false)

   /**
    * Update the list of connections to other nodes in the cluster.
    *
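The selection policy above has two phases: the actor's configured preferred nodes (resolved through DeploymentConfig.nodeNameFor) are taken first, then the set is topped up with random cluster members until it reaches the replication factor. A standalone sketch of the same policy on plain collections (names hypothetical; no ZooKeeper or Deployer involved):

    import scala.util.Random

    def pickReplicas(preferred: Seq[String], members: IndexedSeq[String], factor: Int): Set[String] = {
      require(factor <= members.size, "replication factor larger than the cluster")
      // phase 1: preferred nodes that are actually cluster members, up to 'factor'
      var picked = preferred.filter(members.contains).take(factor).toSet
      // phase 2: fill the remainder with random members
      val random = new Random
      while (picked.size < factor)
        picked += members(random.nextInt(members.size))
      picked
    }

    // pickReplicas(Seq("node1"), Vector("node1", "node2", "node3"), 2)
    // => Set("node1", <one random other member>)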
@@ -1311,7 +1339,7 @@ class DefaultClusterNode private[akka] (
    */
   private[cluster] def connectToAllNewlyArrivedMembershipNodesInCluster(
     newlyConnectedMembershipNodes: Traversable[String],
-    newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = { // to prevent race in startup (fetchMembershipNodes vs MembershipChildListener)
+    newlyDisconnectedMembershipNodes: Traversable[String]): Map[String, InetSocketAddress] = {

     // cache the disconnected connections in a map, needed for fail-over of these connections later
     var disconnectedConnections = Map.empty[String, InetSocketAddress]

@@ -1319,25 +1347,29 @@ class DefaultClusterNode private[akka] (
       disconnectedConnections += (node -> (nodeConnections(node) match { case (address, _) ⇒ address }))
     }

-    // remove connections to failed nodes
-    newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
-
-    // add connections newly arrived nodes
-    newlyConnectedMembershipNodes foreach { node ⇒
-      if (!nodeConnections.contains(node)) { // only connect to each replica once
-
-        val addressOption = remoteSocketAddressForNode(node)
-        if (addressOption.isDefined) {
-          val address = addressOption.get
-
-          EventHandler.debug(this,
-            "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
-
-          val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
-          nodeConnections.put(node, (address, clusterDaemon))
-        }
-      }
-    }
+    if (connectToAllNewlyArrivedMembershipNodesInClusterLock.compareAndSet(false, true)) {
+      try {
+        // remove connections to failed nodes
+        newlyDisconnectedMembershipNodes foreach (nodeConnections.remove(_))
+
+        // add connections newly arrived nodes
+        newlyConnectedMembershipNodes foreach { node ⇒
+          if (!nodeConnections.contains(node)) { // only connect to each replica once
+
+            remoteSocketAddressForNode(node) foreach { address ⇒
+              EventHandler.debug(this,
+                "Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
+
+              val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
+              nodeConnections.put(node, (address, clusterDaemon))
+            }
+          }
+        }
+      } finally {
+        connectToAllNewlyArrivedMembershipNodesInClusterLock.set(false)
+      }
+    }

     disconnectedConnections
   }
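Note that this guard serializes the connection update without blocking: one caller wins the compare-and-set and does the work inside try/finally; concurrent callers simply fall through. A minimal sketch of the pattern (names illustrative):

    import java.util.concurrent.atomic.AtomicBoolean

    val lock = new AtomicBoolean(false)

    def updateConnections() {
      if (lock.compareAndSet(false, true)) {
        try {
          // ... mutate the shared connection map here ...
        } finally {
          lock.set(false) // always release, even if the update throws
        }
      }
    }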
@@ -1353,6 +1385,7 @@ class DefaultClusterNode private[akka] (
         EventHandler.error(error, this, error.toString)
         throw error
     }
+    ignore[ZkNodeExistsException](zkClient.createPersistent(nodeToUuidsPathFor(nodeAddress.nodeName)))
   }

   private[cluster] def joinLeaderElection(): Boolean = {

@@ -1372,10 +1405,6 @@ class DefaultClusterNode private[akka] (
     }
   }

-  private[cluster] def createActorsAtAddressPath() {
-    ignore[ZkNodeExistsException](zkClient.createPersistent(nodeToUuidsPathFor(nodeAddress.nodeName)))
-  }
-
   private[cluster] def failOverClusterActorRefConnections(from: InetSocketAddress, to: InetSocketAddress) {
     clusterActorRefs.values(from) foreach (_.failOver(from, to))
   }

@@ -1477,14 +1506,12 @@ class DefaultClusterNode private[akka] (
     }
   }

-  private def createRootClusterNode() {
+  private def createZooKeeperPathStructureIfNeeded() {
     ignore[ZkNodeExistsException] {
       zkClient.create(CLUSTER_PATH, null, CreateMode.PERSISTENT)
       EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
     }
   }

   private def createNodeStructureIfNeeded() {
     basePaths.foreach { path ⇒
       try {
         ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))

@@ -1530,7 +1557,7 @@ class DefaultClusterNode private[akka] (

   override def resign(): Unit = self.resign()

-  override def isConnected = self.isConnected.isOn
+  override def isConnected = self.isConnected.get

   override def getRemoteServerHostname = self.hostname

@@ -1661,7 +1688,7 @@ object RemoteClusterDaemon {
   val ADDRESS = "akka-cluster-daemon".intern

   // FIXME configure computeGridDispatcher to what?
-  val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
+  val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
 }

 /**
@@ -21,6 +21,9 @@ import java.util.{ Map ⇒ JMap }
 import com.eaio.uuid.UUID

 /**
+ * ActorRef representing a one or many instances of a clustered, load-balanced and sometimes replicated actor
+ * where the instances can reside on other nodes in the cluster.
+ *
  * @author <a href="http://jonasboner.com">Jonas Bonér</a>
  */
 class ClusterActorRef private[akka] (

@@ -36,7 +39,6 @@ class ClusterActorRef private[akka] (
   })

   ClusterModule.ensureEnabled()
-  start()

   def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get

@@ -91,33 +93,48 @@ class ClusterActorRef private[akka] (
   def dispatcher_=(md: MessageDispatcher) {
     unsupported
   }

   def dispatcher: MessageDispatcher = unsupported

   def link(actorRef: ActorRef) {
     unsupported
   }

   def unlink(actorRef: ActorRef) {
     unsupported
   }

   def startLink(actorRef: ActorRef): ActorRef = unsupported

   def supervisor: Option[ActorRef] = unsupported

   def linkedActors: JMap[Uuid, ActorRef] = unsupported

   protected[akka] def mailbox: AnyRef = unsupported

   protected[akka] def mailbox_=(value: AnyRef): AnyRef = unsupported

   protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
     unsupported
   }

   protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
     unsupported
   }

   protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
     unsupported
   }

   protected[akka] def invoke(messageHandle: MessageInvocation) {
     unsupported
   }

   protected[akka] def supervisor_=(sup: Option[ActorRef]) {
     unsupported
   }

   protected[akka] def actorInstance: AtomicReference[Actor] = unsupported

   private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
 }
@@ -52,7 +52,7 @@ object ClusterDeployer {
     Cluster.zooKeeperServers,
     Cluster.sessionTimeout,
     Cluster.connectionTimeout,
-    Cluster.defaultSerializer)
+    Cluster.defaultZooKeeperSerializer)

   private val deploymentInProgressLockListener = new LockListener {
     def lockAcquired() {

@@ -344,7 +344,7 @@ object TransactionLog {
     Cluster.zooKeeperServers,
     Cluster.sessionTimeout,
     Cluster.connectionTimeout,
-    Cluster.defaultSerializer)
+    Cluster.defaultZooKeeperSerializer)

   try {
     zk.create(transactionLogNode, null, CreateMode.PERSISTENT)

@@ -7,8 +7,6 @@ import org.I0Itec.zkclient._
 import org.I0Itec.zkclient.serialize._
 import org.I0Itec.zkclient.exception._

-//import akka.event.EventHandler
-
 /**
  * ZooKeeper client. Holds the ZooKeeper connection and manages its session.
  */

@@ -17,7 +15,6 @@ class AkkaZkClient(zkServers: String,
                    connectionTimeout: Int,
                    zkSerializer: ZkSerializer = new SerializableSerializer)
   extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
-  // EventHandler.debug(this, "Connecting to ZooKeeper ensamble [%s]" format zkServers)

   def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
@@ -17,7 +17,6 @@ import akka.config.Config
 import akka.serialization.Serialization

-import java.util.concurrent._

 /*
 object MigrationExplicitMultiJvmSpec {
   var NrOfNodes = 2

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"

@@ -1 +1,2 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
@@ -1,4 +1,4 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,4 +1,4 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -0,0 +1,5 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.service-hello.router = "round-robin"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["host:node1"]
+akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -0,0 +1,5 @@
+akka.enabled-modules = ["cluster"]
+akka.event-handler-level = "DEBUG"
+akka.actor.deployment.service-hello.router = "round-robin"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
+akka.actor.deployment.service-hello.clustered.replicas = 1
@@ -0,0 +1,59 @@
+package akka.cluster.routing.homenode
+
+import akka.config.Config
+import akka.actor.{ ActorRef, Actor }
+import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
+import Cluster._
+
+object HomeNodeMultiJvmSpec {
+
+  val NrOfNodes = 2
+
+  class SomeActor extends Actor with Serializable {
+    def receive = {
+      case "identify" ⇒ {
+        self.reply(Config.nodename)
+      }
+    }
+  }
+}
+
+class HomeNodeMultiJvmNode1 extends MasterClusterTestNode {
+
+  import HomeNodeMultiJvmSpec._
+
+  val testNodes = NrOfNodes
+
+  "A Router" must {
+    "obey 'home-node' config option when instantiated actor in cluster" in {
+
+      node.start()
+      barrier("waiting-for-begin", NrOfNodes).await()
+
+      barrier("get-ref-to-actor-on-node2", NrOfNodes).await()
+
+      node.shutdown()
+    }
+  }
+}
+
+class HomeNodeMultiJvmNode2 extends ClusterTestNode {
+
+  import HomeNodeMultiJvmSpec._
+
+  "A Router" must {
+    "obey 'home-node' config option when instantiated actor in cluster" in {
+
+      node.start()
+      barrier("waiting-for-begin", NrOfNodes).await()
+
+      barrier("get-ref-to-actor-on-node2", NrOfNodes) {
+        val actor = Actor.actorOf[SomeActor]("service-hello")
+        val name = (actor ? "identify").get.asInstanceOf[String]
+        name must equal("node1")
+      }
+
+      node.shutdown()
+    }
+  }
+}
@@ -1,4 +1,4 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,5 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
 akka.actor.deployment.service-hello.clustered.replicas = 2
 akka.actor.deployment.service-hello.clustered.stateless = on

@@ -1,5 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1","node:node2"]
 akka.actor.deployment.service-hello.clustered.replicas = 2
 akka.actor.deployment.service-hello.clustered.stateless = on

@@ -1,5 +1,4 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 3
 akka.actor.deployment.service-hello.clustered.stateless = on

@@ -1,5 +1,4 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 3
 akka.actor.deployment.service-hello.clustered.stateless = on

@@ -1,5 +1,4 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
 akka.actor.deployment.service-hello.clustered.replicas = 3
 akka.actor.deployment.service-hello.clustered.stateless = on

@@ -1,4 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,4 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,4 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
 akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,4 +1,5 @@
+akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "DEBUG"
 akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
+akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
 akka.actor.deployment.service-hello.clustered.replicas = 1
@@ -4,7 +4,7 @@ object SomeNode extends ClusterNodeWithConf{
   def config() = "
     akka.event-handler-level = "DEBUG"
     akka.actor.deployment.service-hello.router = "round-robin"
-    akka.actor.deployment.service-hello.clustered.home = "node:node1"
+    akka.actor.deployment.service-hello.clustered.preferred-nodes = ["node:node1"]
     akka.actor.deployment.service-hello.clustered.replicas = 1"
   }
 }
@@ -1,4 +0,0 @@
-akka.event-handler-level = "INFO"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,4 +0,0 @@
-akka.event-handler-level = "INFO"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 1

@@ -1,68 +0,0 @@
-package akka.cluster.routing.routing_identity_problem
-
-import akka.config.Config
-import akka.actor.{ ActorRef, Actor }
-import akka.cluster.{ ClusterTestNode, MasterClusterTestNode, Cluster }
-
-object RoutingIdentityProblemMultiJvmSpec {
-
-  val NrOfNodes = 2
-
-  class SomeActor extends Actor with Serializable {
-    println("---------------------------------------------------------------------------")
-    println("SomeActor has been created on node [" + Config.nodename + "]")
-    println("---------------------------------------------------------------------------")
-
-    def receive = {
-      case "identify" ⇒ {
-        println("The node received the 'identify' command: " + Config.nodename)
-        self.reply(Config.nodename)
-      }
-    }
-  }
-}
-
-class RoutingIdentityProblemMultiJvmNode1 extends MasterClusterTestNode {
-
-  import RoutingIdentityProblemMultiJvmSpec._
-
-  val testNodes = NrOfNodes
-
-  "foo" must {
-    "bla" in {
-      Cluster.node.start()
-
-      Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
-      var hello: ActorRef = null
-      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
-        hello = Actor.actorOf[SomeActor]("service-hello")
-      }
-
-      Cluster.barrier("waiting-to-end", NrOfNodes).await()
-      Cluster.node.shutdown()
-    }
-  }
-}
-
-class RoutingIdentityProblemMultiJvmNode2 extends ClusterTestNode {
-
-  import RoutingIdentityProblemMultiJvmSpec._
-
-  "foo" must {
-    "bla" in {
-      Cluster.node.start()
-      Cluster.barrier("waiting-for-begin", NrOfNodes).await()
-
-      Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
-
-      val actor = Actor.actorOf[SomeActor]("service-hello")
-      val name: String = (actor ? "identify").get.asInstanceOf[String]
-      //todo: Jonas: this is the line that needs to be uncommented to get the test to fail.
-      //name must equal("node1")
-
-      Cluster.barrier("waiting-to-end", NrOfNodes).await()
-      Cluster.node.shutdown()
-    }
-  }
-}
@@ -1,4 +0,0 @@
-akka.event-handler-level = "DEBUG"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node1"
-akka.actor.deployment.service-hello.clustered.replicas = 2

@@ -1 +0,0 @@
--Dakka.cluster.nodename=node1 -Dakka.cluster.port=9991

@@ -1,4 +0,0 @@
-akka.event-handler-level = "DEBUG"
-akka.actor.deployment.service-hello.router = "round-robin"
-akka.actor.deployment.service-hello.clustered.home = "node:node2"
-akka.actor.deployment.service-hello.clustered.replicas = 2

@@ -1 +0,0 @@
--Dakka.cluster.nodename=node2 -Dakka.cluster.port=9992
@@ -1,70 +0,0 @@
-package akka.cluster.routing.use_homenode_as_replica
-
-import org.scalatest.matchers.MustMatchers
-import akka.config.Config
-import org.scalatest.{ BeforeAndAfterAll, WordSpec }
-import akka.cluster._
-import Cluster._
-import akka.actor.{ ActorRef, Actor }
-
-object UseHomeNodeAsReplicaMultiJvmSpec {
-  val NrOfNodes = 2
-
-  class HelloWorld extends Actor with Serializable {
-    def receive = {
-      case x: String ⇒ {
-        println("Hello message was received")
-      }
-    }
-  }
-}
-
-class UseHomeNodeAsReplicaMultiJvmNode1 extends MasterClusterTestNode {
-
-  import UseHomeNodeAsReplicaMultiJvmSpec._
-
-  val testNodes = NrOfNodes
-
-  "foo" must {
-    "bla" in {
-      println("Node 1 has started")
-
-      barrier("start-node1", NrOfNodes) {
-        node.start()
-      }
-
-      barrier("start-node2", NrOfNodes) {}
-
-      println("Getting reference to service-hello actor")
-      var hello: ActorRef = null
-      barrier("get-ref-to-actor-on-node2", NrOfNodes) {
-        hello = Actor.actorOf[HelloWorld]("service-hello")
-      }
-
-      println("Saying hello to actor")
-      hello ! "say hello"
-      node.shutdown()
-    }
-  }
-}
-
-class UseHomeNodeAsReplicaMultiJvmNode2 extends ClusterTestNode {
-
-  import UseHomeNodeAsReplicaMultiJvmSpec._
-  "foo" must {
-    "bla" in {
-      println("Waiting for Node 1 to start")
-      barrier("start-node1", NrOfNodes) {}
-
-      println("Waiting for himself to start???")
-      barrier("start-node2", NrOfNodes) {
-        node.start()
-      }
-
-      barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
-
-      println("Shutting down JVM Node 2")
-      node.shutdown()
-    }
-  }
-}
@@ -117,7 +117,7 @@ The Actor’s supervision can be declaratively defined by creating a ‘Supervis
   import static akka.config.Supervision.*;
   import static akka.actor.Actors.*;

-  Supervisor supervisor = new Supervisor(
+  Supervisor supervisor = Supervisor.apply(
     new SupervisorConfig(
       new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
       new Supervise[] {

@@ -141,13 +141,14 @@ MaximumNumberOfRestartsWithinTimeRangeReached message.
   import static akka.actor.Actors.*;
   import akka.event.JavaEventHandler;

-  Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached> handler = new Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached>() {
-    public void apply(ActorRef ref, MaximumNumberOfRestartsWithinTimeRangeReached max) {
-      JavaEventHandler.error(ref, max);
-    }
-  };
+  Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached> handler =
+    new Procedure2<ActorRef, MaximumNumberOfRestartsWithinTimeRangeReached>() {
+      public void apply(ActorRef ref, MaximumNumberOfRestartsWithinTimeRangeReached max) {
+        JavaEventHandler.error(ref, max);
+      }
+    };

-  Supervisor supervisor = new Supervisor(
+  Supervisor supervisor = Supervisor.apply(
     new SupervisorConfig(
       new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
       new Supervise[] {

@@ -165,7 +166,7 @@ You can link and unlink actors from a declaratively defined supervisor using the

 .. code-block:: java

-  Supervisor supervisor = new Supervisor(...);
+  Supervisor supervisor = Supervisor.apply(...);
   supervisor.link(..);
   supervisor.unlink(..);

@@ -209,7 +210,7 @@ Here is an example:
   import static akka.config.Supervision.*;
   import static akka.actor.Actors.*;

-  Supervisor supervisor = new Supervisor(
+  Supervisor supervisor = Supervisor.apply(
     new SupervisorConfig(
       new AllForOneStrategy(new Class[]{Exception.class}, 3, 5000),
       new Supervise[] {
@@ -51,7 +51,8 @@ akka {
         clustered {  # makes the actor available in the cluster registry
                      # default (if omitted) is local non-clustered actor

-          home = "node:node1"  # hostname, IP-address or node name of the "home" node for clustered actor
+          preferred-nodes = ["node:node1"]  # a list of preferred nodes for instantiating the actor instances on
+                                            # defined as: hostname, IP-address or node name
                                             # available: "host:<hostname>", "ip:<ip address>" and "node:<node name>"
                                             # default is "host:localhost"

@@ -59,6 +60,7 @@ akka {
          # available: positivoe integer (0-N) or the string "auto" for auto-scaling
          # if "auto" is used then 'home' has no meaning
          # default is '0', meaning no replicas;
+         # if the "direct" router is used then this configuration element is ignored

          replication {  # use replication or not?

@@ -101,9 +103,9 @@ akka {
       debug {
         receive = "false"      # enable function of Actor.loggable(), which is
-                               # to log any received message at DEBUG level
+                                # to log any received message at DEBUG level
         autoreceive = "false"  # enable DEBUG logging of all AutoReceiveMessages
-                               # (Kill, PoisonPill and the like)
+                                # (Kill, PoisonPill and the like)
         lifecycle = "false"    # enable DEBUG logging of actor lifecycle changes
       }

@@ -182,8 +184,7 @@ akka {
       connection-timeout = 60
       use-compression = off
       remote-daemon-ack-timeout = 30        # Timeout for ACK of cluster operations, lik checking actor out etc.
-      exclude-ref-node-in-replica-set = on  # Should a replica be instantiated on the same node as the
-                                            # cluster reference to the actor
+      include-ref-node-in-replica-set = on  # Can a replica be instantiated on the same node as the cluster reference to the actor
+                                            # Default: on
       compression-scheme = "zlib"           # Options: "zlib" (lzf to come), leave out for no compression
       zlib-compression-level = 6            # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
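The replica-placement flag is inverted by this change: exclude-ref-node-in-replica-set said whether to keep a replica off the node holding the cluster reference, while include-ref-node-in-replica-set says whether one may be placed there (default on, matching the getBool default in Cluster.scala above). A hypothetical override that keeps replicas off the local ref node would now read:

    akka.cluster.include-ref-node-in-replica-set = off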
@@ -260,6 +261,6 @@ akka {
   }

   test {
-    timefactor = "1.0"  # factor by which to scale timeouts during tests, e.g. to account for shared build system load
+    timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load
   }
 }