2011-06-17 10:25:02 +02:00
|
|
|
|
2011-04-27 01:10:00 +02:00
|
|
|
/**
|
|
|
|
|
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
|
|
|
|
|
*/
|
2011-04-28 20:12:37 +02:00
|
|
|
package akka.cluster
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
import org.apache.zookeeper._
|
|
|
|
|
import org.apache.zookeeper.Watcher.Event._
|
|
|
|
|
import org.apache.zookeeper.data.Stat
|
2011-05-18 17:25:30 +02:00
|
|
|
import org.apache.zookeeper.recipes.lock.{ WriteLock, LockListener }
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
import org.I0Itec.zkclient._
|
|
|
|
|
import org.I0Itec.zkclient.serialize._
|
|
|
|
|
import org.I0Itec.zkclient.exception._
|
|
|
|
|
|
2011-06-14 19:35:18 +02:00
|
|
|
import java.util.{ List ⇒ JList }
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicReference, AtomicInteger }
|
|
|
|
|
import java.util.concurrent.{ ConcurrentSkipListSet, CopyOnWriteArrayList, Callable, ConcurrentHashMap }
|
2011-04-27 01:10:00 +02:00
|
|
|
import java.net.InetSocketAddress
|
|
|
|
|
import javax.management.StandardMBean
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
import scala.collection.immutable.{ HashMap, HashSet }
|
2011-04-27 01:10:00 +02:00
|
|
|
import scala.collection.mutable.ConcurrentMap
|
|
|
|
|
import scala.collection.JavaConversions._
|
|
|
|
|
|
|
|
|
|
import ClusterProtocol._
|
|
|
|
|
import RemoteDaemonMessageType._
|
|
|
|
|
|
|
|
|
|
import akka.util._
|
2011-04-28 20:12:37 +02:00
|
|
|
import Helpers._
|
2011-06-14 19:35:18 +02:00
|
|
|
|
2011-04-27 01:10:00 +02:00
|
|
|
import akka.actor._
|
2011-05-20 09:08:11 +02:00
|
|
|
import Actor._
|
2011-05-25 16:18:35 +02:00
|
|
|
import Status._
|
2011-06-10 16:31:24 +01:00
|
|
|
import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
|
2011-06-14 19:35:18 +02:00
|
|
|
|
2011-04-27 01:10:00 +02:00
|
|
|
import akka.event.EventHandler
|
2011-05-18 17:25:30 +02:00
|
|
|
import akka.dispatch.{ Dispatchers, Future }
|
2011-04-27 01:10:00 +02:00
|
|
|
import akka.remoteinterface._
|
2011-05-24 19:04:25 +02:00
|
|
|
import akka.routing.RouterType
|
2011-06-14 19:35:18 +02:00
|
|
|
|
2011-05-30 10:53:25 +02:00
|
|
|
import akka.config.{ Config, Supervision }
|
|
|
|
|
import Supervision._
|
2011-05-20 09:08:11 +02:00
|
|
|
import Config._
|
2011-06-14 19:35:18 +02:00
|
|
|
|
|
|
|
|
import akka.serialization.{ Serialization, Serializer, Compression }
|
2011-05-20 09:08:11 +02:00
|
|
|
import Compression.LZF
|
2011-04-27 01:10:00 +02:00
|
|
|
import akka.AkkaException
|
|
|
|
|
|
2011-04-28 20:12:37 +02:00
|
|
|
import akka.cluster.zookeeper._
|
2011-05-23 22:35:01 +02:00
|
|
|
import akka.cluster.ChangeListener._
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
import com.eaio.uuid.UUID
|
|
|
|
|
|
|
|
|
|
import com.google.protobuf.ByteString
|
|
|
|
|
|
|
|
|
|
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
|
|
|
|
|
// FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk
|
|
|
|
|
|
|
|
|
|
/**
 * JMX MBean for the cluster service.
 *
 * Declares the management operations and read-only attributes of a cluster node. Only types
 * from the standard library (String, Int, Boolean, Array[String], AnyRef) appear in the
 * signatures.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait ClusterNodeMBean {

  // ---- operations ----

  def start(): Unit

  def stop(): Unit

  def disconnect(): Unit

  def reconnect(): Unit

  def resign(): Unit

  // ---- connection, node and cluster attributes ----

  def isConnected: Boolean

  def getRemoteServerHostname: String

  def getRemoteServerPort: Int

  def getNodeName: String

  def getClusterName: String

  def getZooKeeperServerAddresses: String

  def getMemberNodes: Array[String]

  def getLeader: String

  // ---- clustered actors, by UUID and by address ----

  def getUuidsForClusteredActors: Array[String]

  def getAddressesForClusteredActors: Array[String]

  def getUuidsForActorsInUse: Array[String]

  def getAddressesForActorsInUse: Array[String]

  def getNodesForActorInUseWithUuid(uuid: String): Array[String]

  def getNodesForActorInUseWithAddress(address: String): Array[String]

  def getUuidsForActorsInUseOnNode(nodeName: String): Array[String]

  def getAddressesForActorsInUseOnNode(nodeName: String): Array[String]

  // ---- configuration elements ----

  def setConfigElement(key: String, value: String): Unit

  def getConfigElement(key: String): AnyRef

  def removeConfigElement(key: String): Unit

  def getConfigElementKeys: Array[String]
}
|
|
|
|
|
|
|
|
|
|
/**
 * Module for the ClusterNode. Also holds global state such as configuration data etc.
 *
 * The vals below are initialised in declaration order: the configuration options first, then
 * the serializer, the node address and finally the ClusterNode itself.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Cluster {
  val EMPTY_STRING = "".intern
  val UUID_PREFIX = "uuid:".intern

  /**
   * Reads an integer setting, interprets it in the configured TIME_UNIT and returns it as
   * milliseconds.
   */
  private def configuredMillis(key: String, defaultValue: Int): Int =
    Duration(config.getInt(key, defaultValue), TIME_UNIT).toMillis.toInt

  // config options
  val name = Config.clusterName
  val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
  val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
  val sessionTimeout = configuredMillis("akka.cluster.session-timeout", 60)
  val connectionTimeout = configuredMillis("akka.cluster.connection-timeout", 60)
  val maxTimeToWaitUntilConnected = configuredMillis("akka.cluster.max-time-to-wait-until-connected", 30)
  val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
  val enableJMX = config.getBool("akka.enable-jmx", true)
  val remoteDaemonAckTimeout = configuredMillis("akka.cluster.remote-daemon-ack-timeout", 30)
  val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true)

  // Programmatic overrides for the node name, hostname and port (see 'setProperty').
  @volatile
  private var properties = Map.empty[String, String]

  /**
   * Adds (or replaces) a property override. 'nodeAddress' is a val, so only overrides that are
   * set before it is initialised are reflected in it.
   */
  def setProperty(property: (String, String)): Unit = {
    properties += property
  }

  // Each accessor prefers the override in 'properties' and falls back to the Config value.
  private def nodename: String =
    properties.getOrElse("akka.cluster.nodename", Config.nodename)

  private def hostname: String =
    properties.getOrElse("akka.cluster.hostname", Config.hostname)

  // NOTE(review): falls back to Config.remoteServerPort, not to the 'remoteServerPort' val
  // declared above - confirm that the two are meant to be the same value.
  private def port: Int =
    properties.get("akka.cluster.port").map(_.toInt).getOrElse(Config.remoteServerPort)

  val defaultSerializer = new SerializableSerializer

  // Holds the ZooKeeper server started through 'startLocalCluster', if any.
  private val _zkServer = new AtomicReference[Option[ZkServer]](None)

  /**
   * The node address.
   */
  val nodeAddress = NodeAddress(name, nodename, hostname, port)

  /**
   * The reference to the running ClusterNode.
   */
  val node = {
    if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
    new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
  }

  /**
   * Looks up the local hostname.
   */
  def lookupLocalhostName = NetworkUtil.getLocalhostName

  /**
   * Starts up a local ZooKeeper server on port 2181 with tick time 5000, using the default
   * data and log paths. Should only be used for testing purposes.
   */
  def startLocalCluster(): ZkServer = startLocalCluster(2181, 5000)

  /**
   * Starts up a local ZooKeeper server on the given port and tick time, using the default
   * data and log paths. Should only be used for testing purposes.
   */
  def startLocalCluster(port: Int, tickTime: Int): ZkServer =
    startLocalCluster("_akka_cluster/data", "_akka_cluster/log", port, tickTime)

  /**
   * Starts up a local ZooKeeper server on port 2181 with the given tick time, using the
   * default data and log paths. Should only be used for testing purposes.
   */
  def startLocalCluster(tickTime: Int): ZkServer = startLocalCluster(2181, tickTime)

  /**
   * Starts up a local ZooKeeper server on port 2181 with the given data and log paths.
   * Should only be used for testing purposes.
   */
  // NOTE(review): the tick time here is 500 while the no-argument overload uses 5000 -
  // confirm that the difference is intended.
  def startLocalCluster(dataPath: String, logPath: String): ZkServer =
    startLocalCluster(dataPath, logPath, 2181, 500)

  /**
   * Starts up a local ZooKeeper server and remembers it so that 'shutdownLocalCluster' can
   * stop it. Any failure is logged and then rethrown. Should only be used for testing purposes.
   */
  def startLocalCluster(dataPath: String, logPath: String, port: Int, tickTime: Int): ZkServer =
    try {
      val server = AkkaZooKeeper.startLocalServer(dataPath, logPath, port, tickTime)
      _zkServer.set(Some(server))
      server
    } catch {
      case failure: Throwable ⇒
        EventHandler.error(failure, this, "Could not start local ZooKeeper cluster")
        throw failure
    }

  /**
   * Shut down the local ZooKeeper server.
   */
  def shutdownLocalCluster(): Unit = withPrintStackTraceOnError {
    EventHandler.info(this, "Shuts down local cluster")
    _zkServer.get foreach { server ⇒ server.shutdown() }
    _zkServer.set(None)
  }

  /**
   * Creates a new AkkaZkClient.
   */
  def newZkClient(): AkkaZkClient =
    new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)

  /**
   * Creates a ZooKeeperQueue rooted at 'rootPath' on top of the cluster node's ZooKeeper client.
   */
  def createQueue(rootPath: String, blocking: Boolean = true) = {
    val client = node.zkClient
    new ZooKeeperQueue(client, rootPath, blocking)
  }

  /**
   * Creates a ZooKeeperBarrier with the given name and count for this node.
   */
  def barrier(name: String, count: Int) = {
    val address = node.nodeAddress
    ZooKeeperBarrier(node.zkClient, address.clusterName, name, address.nodeName, count)
  }

  /**
   * Creates a ZooKeeperBarrier with the given name, count and timeout for this node.
   */
  def barrier(name: String, count: Int, timeout: Duration) = {
    val address = node.nodeAddress
    ZooKeeperBarrier(node.zkClient, address.clusterName, name, address.nodeName, count, timeout)
  }

  def uuidToString(uuid: UUID): String = uuid.toString

  /**
   * Parses a UUID from its string form.
   *
   * Throws a ClusterException when the string is null, empty, or when parsing fails with a
   * StringIndexOutOfBoundsException (in which case the error is logged first).
   */
  def stringToUuid(uuid: String): UUID = uuid match {
    case null ⇒ throw new ClusterException("UUID is null")
    case ""   ⇒ throw new ClusterException("UUID is an empty string")
    case _ ⇒
      try {
        new UUID(uuid)
      } catch {
        case _: StringIndexOutOfBoundsException ⇒
          val invalid = new ClusterException("UUID not valid [" + uuid + "]")
          EventHandler.error(invalid, this, "")
          throw invalid
      }
  }

  /**
   * Rebuilds a UUID from the high/low pair of its protocol representation.
   */
  def uuidProtocolToUuid(uuid: UuidProtocol) = {
    val high = uuid.getHigh
    val low = uuid.getLow
    new UUID(high, low)
  }

  /**
   * Builds the protocol representation of a UUID: 'high' carries the time part and 'low' the
   * clock-sequence-and-node part.
   */
  def uuidToUuidProtocol(uuid: UUID) = {
    val builder = UuidProtocol.newBuilder
    builder.setHigh(uuid.getTime)
    builder.setLow(uuid.getClockSeqAndNode)
    builder.build
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * A cluster is made up of a bunch of JVMs; each of them is a ClusterNode.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
|
2011-05-23 22:35:01 +02:00
|
|
|
class DefaultClusterNode private[akka] (
|
2011-05-18 17:25:30 +02:00
|
|
|
val nodeAddress: NodeAddress,
|
|
|
|
|
val zkServerAddresses: String,
|
2011-05-23 22:35:01 +02:00
|
|
|
val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
|
2011-05-18 17:25:30 +02:00
|
|
|
self ⇒
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
// Fail fast on a missing address: the members below dereference 'nodeAddress'.
if (nodeAddress eq null) {
  throw new IllegalArgumentException("'nodeAddress' can not be 'null'")
}

// JMX object name for this node, derived from the node's hostname.
val clusterJmxObjectName =
  JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
|
2011-04-27 01:10:00 +02:00
|
|
|
|
2011-05-23 22:35:01 +02:00
|
|
|
import Cluster._
|
2011-04-27 01:10:00 +02:00
|
|
|
|
2011-05-23 22:35:01 +02:00
|
|
|
/**
 * Listener actor that shuts down the client module of a remote client when that client
 * reports an error or a disconnect. All other messages are ignored.
 */
lazy val remoteClientLifeCycleListener = actorOf(new Actor {
  def receive = {
    case RemoteClientError(_, failedClient, _)   ⇒ failedClient.shutdownClientModule()
    case RemoteClientDisconnected(lostClient, _) ⇒ lostClient.shutdownClientModule()
    case _                                       ⇒ //ignore other
  }
}, "akka.cluster.RemoteClientLifeCycleListener").start()
|
|
|
|
|
|
2011-05-19 10:58:30 +02:00
|
|
|
/**
 * The RemoteClusterDaemon actor of this node, created at RemoteClusterDaemon.ADDRESS and
 * started on first access.
 */
lazy val remoteDaemon = {
  val daemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS)
  daemon.start()
}
|
2011-05-30 10:53:25 +02:00
|
|
|
|
|
|
|
|
/**
 * Supervisor for the remote daemon: a one-for-one strategy that traps Exception and
 * supervises the daemon as a Permanent child.
 */
lazy val remoteDaemonSupervisor = {
  // is infinite restart what we want?
  val restartStrategy = OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue)
  Supervisor(
    SupervisorConfig(
      restartStrategy,
      Supervise(remoteDaemon, Permanent) :: Nil))
}
|
|
|
|
|
|
2011-05-19 10:58:30 +02:00
|
|
|
/**
 * The remote service of this node. On first access a NettyRemoteSupport is started on the
 * node's hostname and port, the remote daemon is registered under
 * RemoteClusterDaemon.ADDRESS and the client life-cycle listener is attached.
 */
lazy val remoteService: RemoteSupport = {
  val nettySupport = new akka.remote.netty.NettyRemoteSupport
  // Order matters: start the server before registering the daemon and the listener.
  nettySupport.start(nodeAddress.hostname, nodeAddress.port)
  nettySupport.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
  nettySupport.addListener(remoteClientLifeCycleListener)
  nettySupport
}
|
2011-05-30 10:53:25 +02:00
|
|
|
|
2011-05-19 10:58:30 +02:00
|
|
|
/**
 * The socket address reported by the remote service. Accessing it forces 'remoteService'.
 */
lazy val remoteServerAddress: InetSocketAddress = {
  val service = remoteService
  service.address
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
// static nodes - every path lives below the node named after the cluster
val CLUSTER_NODE = "/" + nodeAddress.clusterName
val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
val CONFIGURATION_NODE = CLUSTER_NODE + "/config"
val PROVISIONING_NODE = CLUSTER_NODE + "/provisioning"
val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry"
val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations"
val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids"
val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address"

// The base paths, in the same order as before (the cluster root comes first).
val baseNodes =
  CLUSTER_NODE ::
    MEMBERSHIP_NODE ::
    ACTOR_REGISTRY_NODE ::
    ACTOR_LOCATIONS_NODE ::
    ACTORS_AT_NODE_NODE ::
    ACTOR_ADDRESS_TO_UUIDS_NODE ::
    CONFIGURATION_NODE ::
    PROVISIONING_NODE ::
    Nil

// should NOT be part of 'baseNodes' only used by 'leaderLock'
val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader"

// Membership path of this very node.
private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns a copy of the locally cached membership nodes as an array.
 */
def membershipNodes: Array[String] = {
  val cached = locallyCachedMembershipNodes.toList
  cached.toArray.asInstanceOf[Array[String]]
}
|
|
|
|
|
|
|
|
|
|
// Connections keyed by a String; each value pairs a socket address with an ActorRef.
// The ConcurrentHashMap is exposed as a Scala ConcurrentMap (via the JavaConversions import).
// NOTE(review): the key is presumably the node name - confirm against the code that fills it.
private[akka] val replicaConnections: ConcurrentMap[String, (InetSocketAddress, ActorRef)] =
  new ConcurrentHashMap[String, (InetSocketAddress, ActorRef)]
|
|
|
|
|
|
|
|
|
|
// zookeeper listeners (both receive a reference to this node)
private val stateListener = new StateListener(self)
private val membershipListener = new MembershipChildListener(self)

// cluster node listeners, notified by 'publish'
private val changeListeners = new CopyOnWriteArrayList[ChangeListener]

// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
// resources

// ZooKeeper client for this node, created on first access with the timeouts from 'Cluster'.
private[cluster] lazy val zkClient =
  new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
|
2011-04-27 01:10:00 +02:00
|
|
|
|
2011-05-23 22:35:01 +02:00
|
|
|
/**
 * Callback handed to the leader lock. It keeps 'isLeader' in sync with lock ownership and
 * publishes a NewLeader notification when this node acquires the lock.
 */
private[cluster] lazy val leaderElectionCallback = new LockListener {

  override def lockAcquired(): Unit = {
    val nodeName = self.nodeAddress.nodeName
    EventHandler.info(this, "Node [%s] is the new leader".format(nodeName))
    self.isLeader.set(true)
    self.publish(NewLeader(nodeName))
  }

  override def lockReleased(): Unit = {
    val nodeName = self.nodeAddress.nodeName
    EventHandler.info(this, "Node [%s] is *NOT* the leader anymore".format(nodeName))
    self.isLeader.set(false)
    // self.publish(Cluster.LeaderChange)
  }
}
|
|
|
|
|
|
2011-05-23 22:35:01 +02:00
|
|
|
/**
 * Write lock on LEADER_ELECTION_NODE used for leader election.
 *
 * The type is deliberately left inferred: the anonymous subclass adds 'leader', and callers
 * reach it through the resulting structural type.
 */
private[cluster] lazy val leaderLock = new WriteLock(
  zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) {

  // ugly hack, but what do you do? <--- haha epic
  // Reads the 'ownerId' field declared on WriteLock reflectively.
  private val ownerIdAccessor = {
    val field = classOf[WriteLock].getDeclaredField("ownerId")
    field.setAccessible(true)
    field
  }

  def leader: String = ownerIdAccessor.get(this).asInstanceOf[String]
}
|
|
|
|
|
|
|
|
|
|
// Runs 'createMBean' during construction when the 'akka.enable-jmx' option is on.
if (enableJMX) {
  createMBean
}
|
|
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Node
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Starts the node: flips the 'isConnected' switch on and runs 'initializeNode()' as the
 * switch action.
 *
 * @return this node, to allow chaining
 */
def start(): ClusterNode = {
  isConnected.switchOn {
    initializeNode()
  }
  this
}
|
|
|
|
|
|
2011-05-20 14:36:26 +02:00
|
|
|
/**
 * Shuts the node down. Everything runs as the action of switching 'isConnected' off, in
 * this order: the membership entry is deleted, local caches are cleared, replica client
 * connections and the remote server are shut down, the helper actors are stopped and
 * finally the ZooKeeper client is disconnected.
 */
def shutdown(): Unit = {
  isConnected.switchOff {
    // A missing membership node is fine here.
    ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))

    locallyCachedMembershipNodes.clear()
    locallyCheckedOutActors.clear()

    // shut down client connections (iterates over a snapshot of the map)
    replicaConnections.toList foreach {
      case (_, (address, _)) ⇒ Actor.remote.shutdownClientConnection(address)
    }

    remoteService.shutdown() // shutdown server

    remoteClientLifeCycleListener.stop()
    remoteDaemon.stop()

    // for monitoring remote listener
    val listenerActors = registry.local.actors.filter(remoteService.hasListener)
    listenerActors.foreach(listenerActor ⇒ listenerActor.stop())

    replicaConnections.clear()

    disconnect()
    EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Removes all subscriptions from the ZooKeeper client and closes it.
 *
 * @return this node, to allow chaining
 */
def disconnect(): ClusterNode = {
  val client = zkClient
  client.unsubscribeAll()
  client.close()
  this
}
|
|
|
|
|
|
|
|
|
|
/**
 * Asks the ZooKeeper client to reconnect.
 *
 * @return this node, to allow chaining
 */
def reconnect(): ClusterNode = {
  val client = zkClient
  client.reconnect()
  this
}
|
|
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Change notification
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Registers a cluster change listener.
 *
 * Listeners can only be added while the node is not connected; once the node has been
 * started an IllegalStateException is thrown.
 *
 * @return this node, to allow chaining
 */
def register(listener: ChangeListener): ClusterNode = {
  if (isConnected.isOn)
    throw new IllegalStateException("Can not register 'ChangeListener' after the cluster node has been started")

  changeListeners.add(listener)
  this
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/**
 * Hands the change notification, together with this node, to every registered change listener.
 */
private[cluster] def publish(change: ChangeNotification): Unit = {
  val listeners = changeListeners.iterator
  listeners.foreach(listener ⇒ listener.notify(change, this))
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Leader
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the name of the current leader, as reported by the leader lock.
 */
def leader: String = {
  val lock = leaderLock
  lock.leader
}
|
|
|
|
|
|
|
|
|
|
/**
 * Explicitly resign from being a leader by releasing the leader lock. If this node is not a
 * leader then this operation is a no-op.
 */
def resign(): Unit = {
  val currentlyLeader = isLeader.get
  if (currentlyLeader) leaderLock.unlock()
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Actor
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with
 * replication factor 0, the Transient replication scheme and 'serializeMailbox' set to false.
 */
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, 0, Transient, false, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with
 * replication factor 0, the given replication scheme and 'serializeMailbox' set to false.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, 0, replicationScheme, false, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with the
 * given replication factor, the Transient replication scheme and 'serializeMailbox' set to false.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, replicationFactor, Transient, false, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with the
 * given replication factor and replication scheme and 'serializeMailbox' set to false.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, replicationFactor, replicationScheme, false, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with
 * replication factor 0, the Transient replication scheme and the given 'serializeMailbox' flag.
 */
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, 0, Transient, serializeMailbox, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with
 * replication factor 0, the given replication scheme and the given 'serializeMailbox' flag.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, 0, replicationScheme, serializeMailbox, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with the
 * given replication factor, the Transient replication scheme and the given 'serializeMailbox' flag.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, replicationFactor, Transient, serializeMailbox, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * An actor of class 'actorClass' is created at 'address' and started, then stored with
 * exactly the replication factor, replication scheme and 'serializeMailbox' flag given.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  val startedActor = Actor.actorOf(actorClass, address).start()
  store(startedActor, replicationFactor, replicationScheme, serializeMailbox, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses replication factor 0, the Transient replication scheme and does not serialize the mailbox.
 */
def store(actorRef: ActorRef, format: Serializer): ClusterNode = {
  store(actorRef, 0, Transient, false, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses replication factor 0, the given replication scheme and does not serialize the mailbox.
 */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = {
  store(actorRef, 0, replicationScheme, false, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses the given replication factor, the Transient replication scheme and does not
 * serialize the mailbox.
 */
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, Transient, false, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses the given replication factor and replication scheme and does not serialize the mailbox.
 */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, replicationScheme, false, format)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses replication factor 0, the Transient replication scheme and the given
 * 'serializeMailbox' flag.
 */
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  store(actorRef, 0, Transient, serializeMailbox, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses the given replication factor, the Transient replication scheme and the given
 * 'serializeMailbox' flag.
 */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, Transient, serializeMailbox, format)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * Uses replication factor 0, the given replication scheme and the given 'serializeMailbox' flag.
 */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode = {
  store(actorRef, 0, replicationScheme, serializeMailbox, format)
}
|
2011-06-07 11:10:29 -07:00
|
|
|
|
|
|
|
|
/**
 * Needed to have reflection through structural typing work.
 *
 * Accepts the serializer as a plain 'AnyRef', casts it to 'Serializer' and delegates to the
 * typed five-argument 'store'. A 'format' that is not a 'Serializer' makes the cast fail.
 */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode = {
  val serializer = format.asInstanceOf[Serializer]
  store(actorRef, replicationFactor, replicationScheme, serializeMailbox, serializer)
}
|
2011-05-20 09:08:11 +02:00
|
|
|
|
|
|
|
|
/**
 * Needed to have reflection through structural typing work.
 *
 * Accepts the serializer as a plain 'AnyRef' and stores the actor with the 'Transient'
 * replication scheme. A 'format' that is not a 'Serializer' makes the cast fail.
 */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode = {
  val serializer = format.asInstanceOf[Serializer]
  store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Stores the serialized actor in the cluster registry and asks replica nodes to use it.
 *
 * The actor is serialized (and LZF-compressed when 'shouldCompressData' is set) and written to
 * its registry path. If that path does not exist yet, the node is created together with the
 * format, address, node, location and address-to-UUID entries. Afterwards a 'USE' command is
 * sent to every connection returned by 'replicaConnectionsForReplicationFactor' and each reply
 * is inspected.
 *
 * Throws 'ClusterException' when not connected to the cluster or when a replica gives no reply,
 * 'IllegalArgumentException' when 'actorRef' is not a 'LocalActorRef', and rethrows the cause
 * of a 'Failure' reply.
 */
def store(
  actorRef: ActorRef,
  replicationFactor: Int,
  replicationScheme: ReplicationScheme,
  serializeMailbox: Boolean,
  format: Serializer): ClusterNode = {

  if (!isConnected.isOn) throw new ClusterException("Not connected to cluster")

  import akka.serialization.ActorSerialization._

  if (!actorRef.isInstanceOf[LocalActorRef]) throw new IllegalArgumentException(
    "'actorRef' must be an instance of 'LocalActorRef' [" + actorRef.getClass.getName + "]")

  val uuid = actorRef.uuid
  EventHandler.debug(this,
    "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))

  val serializedActor = toBinary(actorRef, serializeMailbox, replicationScheme)
  val actorBytes = if (shouldCompressData) LZF.compress(serializedActor) else serializedActor

  val actorRegistryPath = actorRegistryPathFor(uuid)

  // UUID -> Array[Byte] entry in the actor registry
  try {
    zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
  } catch {
    case _: ZkNoNodeException ⇒ // if not stored yet, store the actor
      val createActorNode = new Callable[Either[String, Exception]]() {
        def call: Either[String, Exception] =
          try {
            Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT))
          } catch {
            case e: KeeperException.NodeExistsException ⇒ Right(e)
          }
      }
      // an already existing node is tolerated; the outcome of the create is not used
      zkClient.retryUntilConnected(createActorNode)

      // UUID -> Format entry
      val formatPath = actorRegistryFormatPathFor(uuid)
      try {
        zkClient.createPersistent(formatPath, format)
      } catch {
        case _: ZkNodeExistsException ⇒ zkClient.writeData(formatPath, format)
      }

      // UUID -> actor address entry
      val addressPath = actorRegistryActorAddressPathFor(uuid)
      try {
        zkClient.createPersistent(addressPath, actorRef.address)
      } catch {
        case _: ZkNodeExistsException ⇒ zkClient.writeData(addressPath, actorRef.address)
      }

      // UUID -> node addresses parent (children are 'hostname:port' entries)
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))

      // UUID -> locations parent (children are node names)
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid)))

      // actor address -> UUIDs entries
      val addressToUuidsPath = actorAddressToUuidsPathFor(actorRef.address)
      ignore[ZkNodeExistsException](zkClient.createPersistent(addressToUuidsPath))
      ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(addressToUuidsPath, uuid)))
  }

  import RemoteClusterDaemon._
  val command = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(USE)
    .setActorUuid(uuidToUuidProtocol(uuid))
    .build

  for (connection ← replicaConnectionsForReplicationFactor(replicationFactor)) {
    (connection ? (command, remoteDaemonAckTimeout)).as[Status] match {

      case Some(Success) ⇒
        EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address))

      case Some(Failure(cause)) ⇒
        EventHandler.error(cause, this, cause.toString)
        throw cause

      case None ⇒
        val error = new ClusterException(
          "Operation to instantiate replicas throughout the cluster timed out")
        EventHandler.error(error, this, error.toString)
        throw error
    }
  }

  this
}
|
|
|
|
|
|
|
|
|
|
/**
 * Removes the actor with the given UUID from the cluster.
 *
 * Releases the actor on all nodes, drops it from the locally checked out actors and then
 * deletes its ZooKeeper entries. Missing nodes are ignored.
 */
def remove(uuid: UUID): Unit = {
  releaseActorOnAllNodes(uuid)

  locallyCheckedOutActors.remove(uuid)

  // warning: ordering matters here
  // (the address is read from the actor registry entry, which is deleted further down)
  // FIXME remove ADDRESS to UUID mapping?
  // NOTE(review): this deletes the whole address node, not just this UUID's child - confirm
  actorAddressForUuid(uuid).foreach { address ⇒
    ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(address)))
  }

  val remainingPaths = List(
    actorAtNodePathFor(nodeAddress.nodeName, uuid),
    actorRegistryPathFor(uuid),
    actorLocationsPathFor(uuid))

  remainingPaths.foreach { path ⇒
    ignore[ZkNoNodeException](zkClient.deleteRecursive(path))
  }
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Removes all actors registered under the given address from the cluster.
 *
 * Does nothing when not connected to the cluster. Returns this node.
 */
def remove(address: String): ClusterNode = {
  isConnected.ifOn {
    EventHandler.debug(this,
      "Removing actor(s) with address [%s] from cluster".format(address))
    for (uuid ← uuidsForActorAddress(address)) remove(uuid)
  }
  this
}
|
|
|
|
|
|
|
|
|
|
/**
 * Is an actor with the given address clustered or not?
 *
 * Returns 'true' as soon as a registry entry is found for one of the UUIDs registered under
 * the address. Returns 'false' when none is found or when not connected to the cluster.
 */
def isClustered(actorAddress: String): Boolean =
  if (isConnected.isOn) {
    // 'exists' stops at the first hit instead of probing ZooKeeper for every UUID
    actorUuidsForActorAddress(actorAddress) exists { uuid ⇒
      zkClient.exists(actorRegistryPathFor(uuid))
    }
  } else false
|
|
|
|
|
|
|
|
|
|
/**
 * Is an actor with the given address in use on 'this' node or not?
 *
 * Delegates to the two-argument overload with this node's 'nodeAddress'.
 */
def isInUseOnNode(actorAddress: String): Boolean = {
  val thisNode = nodeAddress
  isInUseOnNode(actorAddress, thisNode)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Is an actor with the given address in use on the given node or not?
 *
 * Returns 'true' as soon as a location entry for 'node' is found for one of the UUIDs
 * registered under the address. Returns 'false' when none is found or when not connected
 * to the cluster.
 */
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean =
  if (isConnected.isOn) {
    // 'exists' stops at the first hit instead of probing ZooKeeper for every UUID
    actorUuidsForActorAddress(actorAddress) exists { uuid ⇒
      zkClient.exists(actorLocationsPathFor(uuid, node))
    }
  } else false
|
|
|
|
|
|
|
|
|
|
/**
 * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes
 * it available for remote access through lookup by its UUID.
 *
 * Looks up the serializer with 'formatForActor' and delegates to the two-argument 'use'.
 */
def use[T <: Actor](actorAddress: String): Option[ActorRef] = {
  val format = formatForActor(actorAddress)
  use(actorAddress, format)
}
|
2011-05-20 17:13:39 +02:00
|
|
|
|
|
|
|
|
/**
 * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes
 * it available for remote access through lookup by its UUID.
 *
 * For every UUID registered under the address this registers the node in ZooKeeper, reads the
 * stored actor bytes (uncompressing them when 'shouldCompressData' is set), deserializes the
 * actor and starts it. Returns the first actor checked out, or 'None' when there is none or
 * when not connected to the cluster. A missing registry entry is rethrown as
 * 'KeeperException.NoNodeException'.
 */
def use[T <: Actor](actorAddress: String, format: Serializer): Option[ActorRef] =
  if (!isConnected.isOn) None
  else {

    import akka.serialization.ActorSerialization._

    val checkedOutActors = actorUuidsForActorAddress(actorAddress) map { uuid ⇒

      ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
      ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))

      // set home address
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
      ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteServerAddress)))

      val actorPath = actorRegistryPathFor(uuid)
      val readActorBytes = new Callable[Either[Array[Byte], Exception]]() {
        def call: Either[Array[Byte], Exception] =
          try {
            val stored = zkClient.connection.readData(actorPath, new Stat, false)
            Left(if (shouldCompressData) LZF.uncompress(stored) else stored)
          } catch {
            case e: KeeperException.NoNodeException ⇒ Right(e)
          }
      }

      zkClient.retryUntilConnected(readActorBytes) match {
        case Left(bytes) ⇒
          locallyCheckedOutActors += (uuid -> bytes)
          val actor = fromBinary[T](bytes, remoteServerAddress)
          EventHandler.debug(this,
            "Checking out actor [%s] to be used on node [%s] as local actor"
              .format(actor, nodeAddress.nodeName))
          actor.start()
          actor
        case Right(exception) ⇒ throw exception
      }
    }

    // FIXME should not be an array at all coming here but an Option[ActorRef]
    checkedOutActors.headOption
  }
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Using (checking out) the actor with a specific UUID on all nodes in the cluster.
 *
 * Sends a 'USE' command to the replica connection of every membership node that has one.
 * Does nothing when not connected to the cluster.
 */
def useActorOnAllNodes(uuid: UUID): Unit = {
  isConnected.ifOn {
    EventHandler.debug(this,
      "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))

    val command = RemoteDaemonMessageProtocol.newBuilder
      .setMessageType(USE)
      .setActorUuid(uuidToUuidProtocol(uuid))
      .build

    for {
      node ← membershipNodes
      entry ← replicaConnections.get(node)
    } {
      val connection = entry._2
      connection ! command
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Using (checking out) a specific UUID on a specific node.
 *
 * Sends a 'USE' command to the replica connection of 'node', if there is one.
 * Does nothing when not connected to the cluster.
 */
def useActorOnNode(node: String, uuid: UUID): Unit = {
  isConnected.ifOn {
    for (entry ← replicaConnections.get(node)) {
      val connection = entry._2
      val command = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(USE)
        .setActorUuid(uuidToUuidProtocol(uuid))
        .build
      connection ! command
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Checks in an actor after done using it on this node.
 *
 * For every UUID registered under the address this drops the actor from the locally checked
 * out actors and deletes this node's ZooKeeper entries for it. Missing nodes are ignored.
 * Does nothing when not connected to the cluster.
 */
def release(actorAddress: String) {
  isConnected ifOn {
    actorUuidsForActorAddress(actorAddress) foreach { uuid ⇒
      EventHandler.debug(this,
        "Releasing actor with UUID [%s] after usage".format(uuid))
      locallyCheckedOutActors.remove(uuid)
      // 'deleteRecursive' removes the node itself as well as its children,
      // so no separate 'delete' of the same path is needed
      ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
      ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress)))
      ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress)))
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Releases (checking in) the actor with a specific UUID on all nodes in the cluster where the
 * actor is in 'use'.
 *
 * Sends a 'RELEASE' command to the replica connection of every node returned by
 * 'nodesForActorsInUseWithUuid' that has one. Does nothing when not connected to the cluster.
 */
def releaseActorOnAllNodes(uuid: UUID): Unit = {
  isConnected.ifOn {
    EventHandler.debug(this,
      "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid))

    val command = RemoteDaemonMessageProtocol.newBuilder
      .setMessageType(RELEASE)
      .setActorUuid(uuidToUuidProtocol(uuid))
      .build

    for {
      node ← nodesForActorsInUseWithUuid(uuid)
      entry ← replicaConnections.get(node)
    } {
      val connection = entry._2
      connection ! command
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Creates an ActorRef with a Router to a set of clustered actors.
 *
 * The router is created over the addresses returned by 'addressesForActor', registered in
 * 'clusterActorRefs' under each of those addresses and started.
 * Throws 'ClusterException' when not connected to the cluster.
 */
def ref(actorAddress: String, router: RouterType): ActorRef = {
  if (!isConnected.isOn) throw new ClusterException("Not connected to cluster")

  val addresses = addressesForActor(actorAddress)
  EventHandler.debug(this,
    "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
      .format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))

  val actorRef = Router.newRouter(router, addresses, actorAddress, Actor.TIMEOUT)
  addresses.foreach { entry ⇒
    clusterActorRefs.put(entry._2, actorRef)
  }
  actorRef.start()
}
|
|
|
|
|
|
|
|
|
|
/**
 * Migrate the actor from 'this' node to node 'to'.
 *
 * Delegates to the three-argument 'migrate' with this node's 'nodeAddress' as the source.
 */
def migrate(to: NodeAddress, actorAddress: String): Unit = {
  val from = nodeAddress
  migrate(from, to, actorAddress)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Migrate the actor from node 'from' to node 'to'.
 *
 * Does nothing when not connected to the cluster. Throws 'IllegalArgumentException' when
 * 'from' or 'to' is 'null' and 'ClusterException' when the actor is not in use on 'from'.
 */
def migrate(
  from: NodeAddress, to: NodeAddress, actorAddress: String): Unit = {
  isConnected.ifOn {
    if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
    if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
    if (!isInUseOnNode(actorAddress, from))
      throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")

    migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the UUIDs of all actors checked out on this node.
 *
 * Delegates to 'uuidsForActorsInUseOnNode' with this node's name.
 */
def uuidsForActorsInUse: Array[UUID] = {
  val thisNodeName = nodeAddress.nodeName
  uuidsForActorsInUseOnNode(thisNodeName)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the addresses of all actors checked out on this node.
 *
 * Resolves the UUIDs from 'uuidsForActorsInUse' through 'actorAddressForUuids'.
 */
def addressesForActorsInUse: Array[String] = {
  val uuids = uuidsForActorsInUse
  actorAddressForUuids(uuids)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns the UUIDs of all actors registered in this cluster.
 *
 * The UUIDs are the child names of the actor registry node. Returns an empty array when not
 * connected to the cluster.
 */
def uuidsForClusteredActors: Array[UUID] =
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    val childNames = zkClient.getChildren(ACTOR_REGISTRY_NODE).toList
    childNames.map(name ⇒ new UUID(name)).toArray[UUID]
  }
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the addresses of all actors registered in this cluster.
 *
 * Resolves the UUIDs from 'uuidsForClusteredActors' through 'actorAddressForUuids'.
 */
def addressesForClusteredActors: Array[String] = {
  val uuids = uuidsForClusteredActors
  actorAddressForUuids(uuids)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns the actor address for the actor with a specific UUID.
 *
 * Returns 'None' when no address entry exists for the UUID or when not connected to the cluster.
 */
def actorAddressForUuid(uuid: UUID): Option[String] =
  if (!isConnected.isOn) None
  else {
    val addressPath = actorRegistryActorAddressPathFor(uuid)
    try {
      Some(zkClient.readData(addressPath).asInstanceOf[String])
    } catch {
      case _: ZkNoNodeException ⇒ None
    }
  }
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns the actor addresses for all the actors with the given UUIDs.
 *
 * UUIDs for which 'actorAddressForUuid' finds no address are left out of the result.
 */
def actorAddressForUuids(uuids: Array[UUID]): Array[String] =
  uuids flatMap { uuid ⇒ actorAddressForUuid(uuid).toList }
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns the actor UUIDs registered under an actor address.
 *
 * Returns an empty array when the address has no entry or when not connected to the cluster.
 */
def uuidsForActorAddress(actorAddress: String): Array[UUID] =
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    val uuidsPath = actorAddressToUuidsPathFor(actorAddress)
    try {
      zkClient.getChildren(uuidsPath).toArray map {
        case name: CharSequence ⇒ new UUID(name)
      }
    } catch {
      case _: ZkNoNodeException ⇒ Array[UUID]()
    }
  }
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the node names of all nodes where the actor with the given UUID is in use.
 *
 * The node names are the child names of the actor's locations node. Returns an empty array
 * when the UUID has no locations entry or when not connected to the cluster.
 */
def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = if (isConnected.isOn) {
  try {
    // Convert through a Scala List: the Java 'toArray()' yields an 'Object[]', which can not
    // be cast to 'Array[String]' at runtime.
    zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray[String]
  } catch {
    case e: ZkNoNodeException ⇒ Array[String]()
  }
} else Array.empty[String]
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the node names of all nodes where actors with the given address are in use.
 *
 * Collects the child names of the locations node of every UUID registered under the address.
 * UUIDs without a locations entry contribute nothing. Returns an empty array when not
 * connected to the cluster.
 */
def nodesForActorsInUseWithAddress(address: String): Array[String] = if (isConnected.isOn) {
  flatten {
    actorUuidsForActorAddress(address) map { uuid ⇒
      try {
        // Convert through a Scala List: the Java 'toArray()' yields an 'Object[]', which can
        // not be cast to 'Array[String]' at runtime.
        zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray[String]
      } catch {
        case e: ZkNoNodeException ⇒ Array[String]()
      }
    }
  }
} else Array.empty[String]
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the UUIDs of all actors in use registered on a specific node.
 *
 * Returns an empty array when the node has no entry or when not connected to the cluster.
 */
def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] =
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    val actorsPath = actorsAtNodePathFor(nodeName)
    try {
      zkClient.getChildren(actorsPath).toArray map {
        case name: CharSequence ⇒ new UUID(name)
      }
    } catch {
      case _: ZkNoNodeException ⇒ Array[UUID]()
    }
  }
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the addresses of all actors in use registered on a specific node.
 *
 * Resolves the UUIDs from 'uuidsForActorsInUseOnNode' through 'actorAddressForUuids'. The
 * result is empty when the node has no entry or when not connected to the cluster.
 */
def addressesForActorsInUseOnNode(nodeName: String): Array[String] =
  actorAddressForUuids(uuidsForActorsInUseOnNode(nodeName))
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the Serializer (format) for the actor with the given address.
 *
 * Reads the format entry of every UUID registered under the address; UUIDs without a format
 * entry are skipped. Throws 'IllegalStateException' when no serializer is found or when the
 * serializers found are not all equal.
 */
def formatForActor(actorAddress: String): Serializer = {

  val formats = actorUuidsForActorAddress(actorAddress) flatMap { uuid ⇒
    try {
      List(zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer])
    } catch {
      case _: ZkNoNodeException ⇒ List.empty[Serializer]
    }
  }

  if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress))
  if (!formats.forall(_ == formats.head)) throw new IllegalStateException("Multiple Serializer classes found for [%s]".format(actorAddress))

  formats.head
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the (UUID, socket address) pairs for the actor with the given address.
 *
 * For every UUID registered under the address, each child of its registry node entry is parsed
 * as 'hostname:port'. Returns an empty array when a registry node entry is missing.
 */
def addressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)] = {

  // Parses a 'hostname:port' child name into a socket address.
  def toSocketAddress(hostAndPort: String): InetSocketAddress = {
    val tokens = new java.util.StringTokenizer(hostAndPort, ":")
    val hostname = tokens.nextToken
    val port = tokens.nextToken.toInt
    new InetSocketAddress(hostname, port)
  }

  try {
    for {
      uuid ← actorUuidsForActorAddress(actorAddress)
      address ← zkClient.getChildren(actorRegistryNodePathFor(uuid)).toList
    } yield (uuid, toSocketAddress(address))
  } catch {
    case _: ZkNoNodeException ⇒ Array[(UUID, InetSocketAddress)]()
  }
}
|
|
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Compute Grid
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Send a function 'Function0[Unit]' to be invoked on a random number of nodes (defined by
 * 'replicationFactor' argument).
 *
 * The function is serialized and sent as a 'FUNCTION_FUN0_UNIT' message. A serialization
 * error is rethrown.
 */
def send(f: Function0[Unit], replicationFactor: Int): Unit = {
  val payload = Serialization.serialize(f) match {
    case Left(error)  ⇒ throw error
    case Right(bytes) ⇒ bytes
  }

  val message = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(FUNCTION_FUN0_UNIT)
    .setPayload(ByteString.copyFrom(payload))
    .build

  replicaConnectionsForReplicationFactor(replicationFactor).foreach { connection ⇒
    connection ! message
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Send a function 'Function0[Any]' to be invoked on a random number of nodes (defined by
 * 'replicationFactor' argument).
 *
 * The function is serialized and sent as a 'FUNCTION_FUN0_ANY' message. A serialization
 * error is rethrown. Returns a 'List' with all the 'Future's from the computation.
 */
def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
  val payload = Serialization.serialize(f) match {
    case Left(error)  ⇒ throw error
    case Right(bytes) ⇒ bytes
  }

  val message = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(FUNCTION_FUN0_ANY)
    .setPayload(ByteString.copyFrom(payload))
    .build

  val results = replicaConnectionsForReplicationFactor(replicationFactor).map(_ ? message)
  results.toList.asInstanceOf[List[Future[Any]]]
}
|
|
|
|
|
|
|
|
|
|
/**
 * Send a function 'Function1[Any, Unit]' to be invoked on a random number of nodes (defined by
 * 'replicationFactor' argument) with the argument specified.
 *
 * The pair of function and argument is serialized and sent as a 'FUNCTION_FUN1_ARG_UNIT'
 * message. A serialization error is rethrown.
 */
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = {
  val payload = Serialization.serialize((f, arg)) match {
    case Left(error)  ⇒ throw error
    case Right(bytes) ⇒ bytes
  }

  val message = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(FUNCTION_FUN1_ARG_UNIT)
    .setPayload(ByteString.copyFrom(payload))
    .build

  replicaConnectionsForReplicationFactor(replicationFactor).foreach { connection ⇒
    connection ! message
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Send a function 'Function1[Any, Any]' to be invoked on a random number of nodes (defined by
 * 'replicationFactor' argument) with the argument specified.
 *
 * The pair of function and argument is serialized and sent as a 'FUNCTION_FUN1_ARG_ANY'
 * message. A serialization error is rethrown. Returns a 'List' with all the 'Future's from
 * the computation.
 */
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
  val payload = Serialization.serialize((f, arg)) match {
    case Left(error)  ⇒ throw error
    case Right(bytes) ⇒ bytes
  }

  val message = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(FUNCTION_FUN1_ARG_ANY)
    .setPayload(ByteString.copyFrom(payload))
    .build

  val results = replicaConnectionsForReplicationFactor(replicationFactor).map(_ ? message)
  results.toList.asInstanceOf[List[Future[Any]]]
}
|
|
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Config
|
|
|
|
|
// =======================================
|
|
|
|
|
|
|
|
|
|
/**
 * Stores a config element under the given key in the cluster registry.
 *
 * The bytes are LZF-compressed when 'shouldCompressData' is set. The element is created, or
 * overwritten when it already exists. An exception raised while overwriting is rethrown.
 */
def setConfigElement(key: String, bytes: Array[Byte]) {
  val compressedBytes = if (shouldCompressData) LZF.compress(bytes) else bytes
  // arguments follow the placeholders: value first, then key
  EventHandler.debug(this,
    "Adding config value [%s] under key [%s] in cluster registry".format(compressedBytes, key))
  zkClient.retryUntilConnected(new Callable[Either[Unit, Exception]]() {
    def call: Either[Unit, Exception] = {
      try {
        Left(zkClient.connection.create(configurationPathFor(key), compressedBytes, CreateMode.PERSISTENT))
      } catch {
        case e: KeeperException.NodeExistsException ⇒
          // the element already exists: overwrite it instead
          try {
            Left(zkClient.connection.writeData(configurationPathFor(key), compressedBytes))
          } catch {
            case e: Exception ⇒ Right(e)
          }
      }
    }
  }) match {
    case Left(_)          ⇒ /* do nothing */
    case Right(exception) ⇒ throw exception
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the config element for the key, or 'None' if no element exists under the key.
 *
 * The stored bytes are uncompressed when 'shouldCompressData' is set, mirroring the
 * compression applied by 'setConfigElement'.
 */
def getConfigElement(key: String): Option[Array[Byte]] = try {
  val stored = zkClient.connection.readData(configurationPathFor(key), new Stat, true)
  Some(if (shouldCompressData) LZF.uncompress(stored) else stored)
} catch {
  case e: KeeperException.NoNodeException ⇒ None
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/**
 * Removes the config element stored under the given key from the cluster registry.
 *
 * A missing element is ignored.
 */
def removeConfigElement(key: String): Unit =
  ignore[ZkNoNodeException] {
    EventHandler.debug(this,
      "Removing config element with key [%s] from cluster registry".format(key))
    val elementPath = configurationPathFor(key)
    zkClient.deleteRecursive(elementPath)
  }
|
|
|
|
|
|
|
|
|
|
/**
 * Returns the keys of all config elements, i.e. the child names of the configuration node.
 */
def getConfigElementKeys: Array[String] = {
  val keys = zkClient.getChildren(CONFIGURATION_NODE).toList
  keys.toArray.asInstanceOf[Array[String]]
}
|
|
|
|
|
|
|
|
|
|
// =======================================
|
|
|
|
|
// Private
|
|
|
|
|
// =======================================
|
|
|
|
|
|
2011-05-18 12:25:27 +02:00
|
|
|
// ZooKeeper path builders. Each one formats a path below one of the cluster's base nodes.

/** Path of the membership node for the node named 'node'. */
private[cluster] def membershipPathFor(node: String) =
  "%s/%s".format(MEMBERSHIP_NODE, node)

/** Path of the config element stored under 'key'. */
private[cluster] def configurationPathFor(key: String) =
  "%s/%s".format(CONFIGURATION_NODE, key)

/** Path of the address-to-UUIDs node for 'actorAddress'; dots in the address are replaced by underscores. */
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String) =
  "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_NODE, actorAddress.replace('.', '_'))

/** Path of the locations node for the actor with the given UUID. */
private[cluster] def actorLocationsPathFor(uuid: UUID) =
  "%s/%s".format(ACTOR_LOCATIONS_NODE, uuid)

/** Path of the location entry for the actor with the given UUID on the given node. */
private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress) =
  "%s/%s/%s".format(ACTOR_LOCATIONS_NODE, uuid, node.nodeName)

/** Path of the actors-at-node node for the node named 'node'. */
private[cluster] def actorsAtNodePathFor(node: String) =
  "%s/%s".format(ACTORS_AT_NODE_NODE, node)

/** Path of the entry for the actor with the given UUID below the actors-at-node node of 'node'. */
private[cluster] def actorAtNodePathFor(node: String, uuid: UUID) =
  "%s/%s/%s".format(ACTORS_AT_NODE_NODE, node, uuid)

/** Path of the registry node for the actor with the given UUID. */
private[cluster] def actorRegistryPathFor(uuid: UUID) =
  "%s/%s".format(ACTOR_REGISTRY_NODE, uuid)

/** Path of the "format" child of the actor's registry node. */
private[cluster] def actorRegistryFormatPathFor(uuid: UUID) =
  "%s/%s".format(actorRegistryPathFor(uuid), "format")

/** Path of the "address" child of the actor's registry node. */
private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID) =
  "%s/%s".format(actorRegistryPathFor(uuid), "address")

/** Path of the "node" child of the actor's registry node. */
private[cluster] def actorRegistryNodePathFor(uuid: UUID): String =
  "%s/%s".format(actorRegistryPathFor(uuid), "node")

/** Path of the "hostname:port" entry below the "node" child of the actor's registry node. */
private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String = {
  val parent = actorRegistryNodePathFor(uuid)
  "%s/%s:%s".format(parent, address.getHostName, address.getPort)
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/**
 * Initializes this cluster node. In order: creates the root cluster node, joins the
 * leader election (the elected leader also creates the base node structure), registers
 * the ZooKeeper listeners, joins the membership and actors-at-node nodes and finally
 * populates the local membership cache.
 */
private[cluster] def initializeNode() {
  EventHandler.info(this,
    List(
      "\nCreating cluster node with",
      "\n\tcluster name = [%s]",
      "\n\tnode name = [%s]",
      "\n\tport = [%s]",
      "\n\tzookeeper server addresses = [%s]",
      "\n\tserializer = [%s]")
      .mkString
      .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
  EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))

  createRootClusterNode()
  // only the node that wins the election sets up the base structure
  if (joinLeaderElection()) createNodeStructureIfNeeded()

  registerListeners()
  joinMembershipNode()
  joinActorsAtAddressNode()
  fetchMembershipChildrenNodes()

  EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
|
|
|
|
|
|
2011-06-17 10:25:02 +02:00
|
|
|
/**
 * Resolves the socket address of the node named 'node' from the data stored in its
 * membership node. The data is read as a ':'-separated string whose third and fourth
 * tokens are the hostname and the port.
 *
 * @return Some(address), or None if the membership node does not exist
 */
private[cluster] def addressForNode(node: String): Option[InetSocketAddress] =
  try {
    val data = zkClient.readData(membershipPathFor(node)).asInstanceOf[String]
    val tokens = new java.util.StringTokenizer(data, ":")
    tokens.nextToken // skip the cluster name
    tokens.nextToken // skip the node name
    val host = tokens.nextToken
    val portNumber = tokens.nextToken.toInt
    Some(new InetSocketAddress(host, portNumber))
  } catch {
    case _: ZkNoNodeException ⇒ None
  }
|
|
|
|
|
|
2011-05-25 16:18:35 +02:00
|
|
|
/** Returns the UUIDs registered for 'actorAddress', with null entries dropped. */
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] =
  uuidsForActorAddress(actorAddress).filter(uuid ⇒ uuid ne null)
|
2011-04-27 01:10:00 +02:00
|
|
|
|
|
|
|
|
/**
 * Returns a random set with replica connections of size 'replicationFactor'.
 * Default replicationFactor is 0, which returns the empty set.
 *
 * The candidates are taken from a single snapshot of 'replicaConnections', so the
 * candidate count and the indexed candidates always agree even if the map is
 * modified while this method runs.
 *
 * @throws IllegalArgumentException if fewer replica connections than 'replicationFactor' are available
 */
private def replicaConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
  if (replicationFactor < 1) HashSet.empty[ActorRef]
  else {
    connectToAllMembershipNodesInCluster()

    // one snapshot of the ActorRefs, indexed so that random access is cheap
    val candidates = (replicaConnections.toList map {
      case (node, (address, actorRef)) ⇒ actorRef
    }).toIndexedSeq
    val numberOfReplicas = candidates.size

    if (numberOfReplicas < replicationFactor) {
      throw new IllegalArgumentException(
        "Replication factor [" + replicationFactor +
          "] is greater than the number of available nodes [" + numberOfReplicas + "]")
    } else if (numberOfReplicas == replicationFactor) {
      HashSet.empty[ActorRef] ++ candidates
    } else {
      // draw random candidates until enough distinct connections have been collected
      val random = new java.util.Random(System.currentTimeMillis)
      var replicas = HashSet.empty[ActorRef]
      while (replicas.size < replicationFactor) {
        replicas = replicas + candidates(random.nextInt(numberOfReplicas))
      }
      replicas
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Connects to the cluster daemon of every membership node (unless already connected)
 * and records the connection in 'replicaConnections'. The node named by Config.nodename
 * is skipped, as are nodes whose address cannot be resolved.
 */
private def connectToAllMembershipNodesInCluster() {
  membershipNodes foreach { node ⇒
    val isOtherNode = node != Config.nodename // no replica on the "home" node of the ref
    if (isOtherNode && !replicaConnections.contains(node)) { // only connect to each replica once
      addressForNode(node) foreach { address ⇒
        EventHandler.debug(this,
          "Connecting to replica with nodename [%s] and address [%s]".format(node, address))
        val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
        replicaConnections.put(node, (address, clusterDaemon))
      }
    }
  }
}
|
|
|
|
|
|
2011-05-18 12:25:27 +02:00
|
|
|
/**
 * Registers this node in the cluster by creating its ephemeral membership node,
 * after recording its own address in 'nodeNameToAddress'.
 *
 * @throws ClusterException if a membership node with this node's name already exists
 */
private[cluster] def joinMembershipNode() {
  nodeNameToAddress.put(nodeAddress.nodeName, remoteServerAddress)
  try {
    EventHandler.info(this,
      "Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath))
    zkClient.createEphemeral(membershipNodePath, nodeAddress.toString)
  } catch {
    case e: ZkNodeExistsException ⇒
      val error = new ClusterException(
        "Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in use by another node")
      EventHandler.error(error, this, error.toString)
      throw error
  }
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/** Creates the persistent actors-at-node node for this node; an already existing node is ignored. */
private[cluster] def joinActorsAtAddressNode() {
  ignore[ZkNodeExistsException] {
    zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName))
  }
}
|
2011-04-27 01:10:00 +02:00
|
|
|
|
2011-05-30 10:53:25 +02:00
|
|
|
/**
 * Joins the leader election by calling 'leaderLock.lock'.
 *
 * @return the result of 'leaderLock.lock'
 */
private[cluster] def joinLeaderElection(): Boolean = {
  val electionMessage = "Node [%s] is joining leader election".format(nodeAddress.nodeName)
  EventHandler.info(this, electionMessage)
  leaderLock.lock
}
|
|
|
|
|
|
|
|
|
|
/** Asks every cluster actor ref registered for the 'from' address to fail over from 'from' to 'to'. */
private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) {
  for (clusterActorRef ← clusterActorRefs.values(from)) clusterActorRef.failOver(from, to)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Migrates the actors of every failed node to this node, if this node is the one
 * responsible for the failed node, and then tells all connected replicas to fail over
 * their connections from the failed node's address to this node's address.
 *
 * Responsibility is decided on one snapshot of the locally cached membership nodes:
 * this node takes over when it sits directly before the failed node in that list, or
 * when the failed node is first in the list and this node is last (wrap-around).
 *
 * @param currentSetOfClusterNodes the node names currently present in the cluster;
 *                                 cached nodes missing from it are treated as failed
 */
private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) = {
  findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName ⇒

    // All index arithmetic below uses this one snapshot of the cached membership nodes.
    val allNodes = locallyCachedMembershipNodes.toList
    // Exact match: a suffix match could pick up another node whose name merely ends with ours.
    val myIndex = allNodes.indexWhere(_ == nodeAddress.nodeName)
    val failedNodeIndex = allNodes.indexWhere(_ == failedNodeName)

    val failedNodeIsFirstAndIAmLast = failedNodeIndex == 0 && myIndex == allNodes.size - 1
    val failedNodeFollowsMe = failedNodeIndex == myIndex + 1

    // Migrate to the neighbour of the failed node (using a sorted circular list of the node names).
    // 'myIndex >= 0' guards against this node being absent from the snapshot, in which case
    // -1 + 1 would otherwise match a failed node at index 0.
    if (myIndex >= 0 && (failedNodeIsFirstAndIAmLast || failedNodeFollowsMe)) {

      // Yes I am the node to migrate the actor to (can only be one in the cluster)
      val actorUuidsForFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName))
      EventHandler.debug(this,
        "Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]"
          .format(failedNodeName, nodeAddress.nodeName, actorUuidsForFailedNode))

      actorUuidsForFailedNode.foreach { uuid ⇒
        EventHandler.debug(this,
          "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
            .format(failedNodeName, uuid, nodeAddress.nodeName))

        actorAddressForUuid(uuidFrom(uuid)) foreach { actorAddress ⇒
          migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
            NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)

          implicit val format: Serializer = formatForActor(actorAddress)
          use(actorAddress) foreach { actor ⇒
            // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
            //actor.homeAddress = remoteServerAddress
            val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
            homeAddress.setAccessible(true)
            homeAddress.set(actor, Some(remoteServerAddress))

            remoteService.register(actorAddress, actor)
          }
        }
      }

      // notify all available nodes that they should fail-over all connections from 'from' to 'to'
      // NOTE(review): 'from' is whatever 'nodeNameToAddress' currently holds for the failed node;
      // confirm the entry cannot already have been removed by the membership listener at this point.
      val from = nodeNameToAddress.get(failedNodeName)
      val to = remoteServerAddress
      Serialization.serialize((from, to)) match {
        case Left(error) ⇒ throw error
        case Right(bytes) ⇒
          val command = RemoteDaemonMessageProtocol.newBuilder
            .setMessageType(FAIL_OVER_CONNECTIONS)
            .setPayload(ByteString.copyFrom(bytes))
            .build
          membershipNodes foreach { node ⇒
            replicaConnections.get(node) foreach {
              case (_, connection) ⇒
                connection ! command
            }
          }
      }
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Used when the ephemeral "home" node is already gone, so we can't check.
 *
 * For every UUID registered under 'actorAddress' whose actor is not already in use on
 * 'to': releases the actor locally, creates the registry, location and actors-at-node
 * entries for the new home, deletes the entries of the 'from' node and finally checks
 * the actor out on the 'to' node.
 *
 * @param from         the node the actor is migrated away from
 * @param to           the node the actor is migrated to
 * @param actorAddress the address of the actor(s) to migrate
 */
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
  from: NodeAddress, to: NodeAddress, actorAddress: String) {

  actorUuidsForActorAddress(actorAddress) foreach { uuid ⇒
    // the address is looked up again per UUID; UUIDs without a registered address are skipped
    actorAddressForUuid(uuid) foreach { address ⇒

      if (!isInUseOnNode(address, to)) {
        release(address)

        val newAddress = new InetSocketAddress(to.hostname, to.port)
        ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
        ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
        ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
        // NOTE(review): this entry is keyed by this node's own name rather than 'to.nodeName';
        // the two coincide for the caller visible here - confirm before calling with another 'to'.
        ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))

        ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from)))
        ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid)))

        // 'use' (check out) actor on the remote 'to' node
        useActorOnNode(to.nodeName, uuid)
      }
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/** Cached membership nodes that are missing from 'nodes'. */
private[cluster] def findFailedNodes(nodes: List[String]): List[String] = {
  val current = Set(nodes: _*)
  (locallyCachedMembershipNodes diff current).toList
}

/** Entries of 'nodes' that are not yet in the membership cache. */
private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String] = {
  val current = Set(nodes: _*)
  (current diff locallyCachedMembershipNodes).toList
}

/** Cached membership nodes that are missing from 'nodes'. */
private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String] = {
  val current = Set(nodes: _*)
  (locallyCachedMembershipNodes diff current).toList
}

/** Entries of 'nodes' that are not yet in the membership cache. */
private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String] = {
  val current = Set(nodes: _*)
  (current diff locallyCachedMembershipNodes).toList
}

/** Cached membership nodes that are missing from 'nodes'. */
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] = {
  val current = Set(nodes: _*)
  (locallyCachedMembershipNodes diff current).toList
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/**
 * Creates the persistent root node CLUSTER_NODE and logs its creation.
 * If the node already exists the ZkNodeExistsException is ignored and nothing is logged.
 */
private def createRootClusterNode() {
  ignore[ZkNodeExistsException] {
    zkClient.create(CLUSTER_NODE, null, CreateMode.PERSISTENT)
    val createdMessage = "Created node [%s]".format(CLUSTER_NODE)
    EventHandler.info(this, createdMessage)
  }
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/**
 * Creates every persistent base node in 'baseNodes' that does not exist yet.
 *
 * "Created node" is only logged for nodes that were actually created; an already
 * existing node (ZkNodeExistsException) is skipped silently.
 *
 * @throws ClusterException wrapping the description of any other failure
 */
private def createNodeStructureIfNeeded() {
  baseNodes.foreach { path ⇒
    try {
      ignore[ZkNodeExistsException] {
        zkClient.create(path, null, CreateMode.PERSISTENT)
        // only reached when the create call succeeded
        EventHandler.debug(this, "Created node [%s]".format(path))
      }
    } catch {
      case e ⇒
        val error = new ClusterException(e.toString)
        EventHandler.error(error, this)
        throw error
    }
  }
}
|
|
|
|
|
|
2011-05-30 10:53:25 +02:00
|
|
|
/** Subscribes 'stateListener' to state changes and 'membershipListener' to child changes of MEMBERSHIP_NODE. */
private def registerListeners() = {
  zkClient subscribeStateChanges stateListener
  zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
}

/** Removes the subscriptions made by 'registerListeners'. */
private def unregisterListeners() = {
  zkClient unsubscribeStateChanges stateListener
  zkClient.unsubscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
}
|
|
|
|
|
|
2011-05-18 08:37:58 +02:00
|
|
|
/** Replaces the content of the local membership cache with the current children of MEMBERSHIP_NODE. */
private def fetchMembershipChildrenNodes() {
  val currentMembers = zkClient.getChildren(MEMBERSHIP_NODE)
  locallyCachedMembershipNodes.clear()
  for (member ← currentMembers.iterator) locallyCachedMembershipNodes.add(member)
}
|
|
|
|
|
|
|
|
|
|
/**
 * Builds the ClusterNodeMBean for this node, every operation of which delegates to
 * the node itself, and registers it with JMX under 'clusterJmxObjectName'.
 */
private def createMBean = {
  val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {

    import Cluster._

    // --- lifecycle operations ---

    override def start(): Unit = self.start()

    override def stop(): Unit = self.shutdown()

    override def disconnect() = self.disconnect()

    override def reconnect(): Unit = self.reconnect()

    override def resign(): Unit = self.resign()

    // --- node and cluster information ---

    override def isConnected = self.isConnected.isOn

    override def getRemoteServerHostname = self.nodeAddress.hostname

    override def getRemoteServerPort = self.nodeAddress.port

    override def getNodeName = self.nodeAddress.nodeName

    override def getClusterName = self.nodeAddress.clusterName

    override def getZooKeeperServerAddresses = self.zkServerAddresses

    override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray

    override def getLeader = self.leader.toString

    // --- actor lookups (results are exposed as strings) ---

    override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray

    override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray

    override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray

    override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray

    override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))

    override def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id)

    override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray

    override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray

    // --- config elements (values are converted to and from UTF-8 strings) ---

    override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8"))

    // a missing element is reported as the empty string
    override def getConfigElement(key: String) = new String(self.getConfigElement(key).getOrElse(Array[Byte]()), "UTF-8")

    override def removeConfigElement(key: String): Unit = self.removeConfigElement(key)

    override def getConfigElementKeys = self.getConfigElementKeys.toArray
  }

  JMX.register(clusterJmxObjectName, mbean)

  // FIXME need monitoring to lookup the cluster MBean dynamically
  // Monitoring.registerLocalMBean(clusterJmxObjectName, clusterMBean)
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
 * ZooKeeper child listener for the membership node. On every change it publishes
 * NodeConnected / NodeDisconnected events for the differences against the node's
 * local membership cache, keeps the 'nodename-address' map up to date and then
 * replaces the cache with the new list of children.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class MembershipChildListener(self: ClusterNode) extends IZkChildListener with ErrorHandler {
  def handleChildChange(parentPath: String, currentChilds: JList[String]) {
    withErrorHandler {
      if (currentChilds ne null) {
        val childList = currentChilds.toList
        if (!childList.isEmpty) EventHandler.debug(this,
          "MembershipChildListener at [%s] has children [%s]"
            .format(self.nodeAddress.nodeName, childList.mkString(" ")))

        self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒
          // update 'nodename-address' map; 'addressForNode' yields an Option, so only a
          // resolved address is stored (never the Option wrapper itself)
          self.addressForNode(name) foreach (address ⇒ self.nodeNameToAddress.put(name, address))
          self.publish(NodeConnected(name))
        }

        self.findNewlyDisconnectedMembershipNodes(childList) foreach { name ⇒
          self.nodeNameToAddress.remove(name) // update 'nodename-address' map
          self.publish(NodeDisconnected(name))
        }

        // the diffs above are computed against the old cache, so refresh it last
        self.locallyCachedMembershipNodes.clear()
        childList.foreach(self.locallyCachedMembershipNodes.add)
      }
    }
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * ZooKeeper state listener that logs connection state changes of the cluster node and
 * publishes them as ThisNode.* events.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class StateListener(self: ClusterNode) extends IZkStateListener {
  def handleStateChanged(state: KeeperState) {
    state match {
      case KeeperState.SyncConnected ⇒
        EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
        self.publish(ThisNode.Connected)
      case KeeperState.Disconnected ⇒
        EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
        self.publish(ThisNode.Disconnected)
      case KeeperState.Expired ⇒
        EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
        self.publish(ThisNode.Expired)
      case other ⇒
        // Any other state is only logged, so that it does not surface as a MatchError
        // on the thread delivering the ZooKeeper event.
        EventHandler.debug(this, "Cluster node [%s] - Unhandled state [%s]".format(self.nodeAddress, other))
    }
  }

  /**
   * Re-initialize after the zookeeper session has expired and a new session has been created.
   */
  def handleNewSession() {
    EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress))
    self.initializeNode()
    self.publish(NewSession)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Mixin that runs a block with uniform error handling.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait ErrorHandler {

  /**
   * Runs 'body'. A ZkInterruptedException is ignored; any other Throwable is logged
   * through the EventHandler and then rethrown.
   */
  def withErrorHandler[T](body: ⇒ T) =
    try {
      ignore[ZkInterruptedException] { body }
    } catch {
      case failure: Throwable ⇒
        EventHandler.error(failure, this, failure.toString)
        throw failure
    }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Constants shared by all RemoteClusterDaemon instances.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RemoteClusterDaemon {

  /** Address under which the cluster daemon actor is looked up remotely. */
  val ADDRESS: String = "akka-cluster-daemon".intern

  // FIXME configure computeGridDispatcher to what?
  /** Dispatcher assigned to the short-lived actors that execute functions sent to the daemon. */
  val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
}
|
|
|
|
|
|
|
|
|
|
/**
 * Actor that executes RemoteDaemonMessageProtocol commands sent to this node:
 * use/release of actors, cluster lifecycle commands, connection fail-over and the
 * execution of serialized functions. Any other message is logged as unknown.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {

  import RemoteClusterDaemon._
  import Cluster._

  self.dispatcher = Dispatchers.newPinnedDispatcher(self)

  override def preRestart(reason: Throwable) {
    EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason))
  }

  def receive: Receive = {
    case message: RemoteDaemonMessageProtocol ⇒
      EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message))

      message.getMessageType match {

        // Check out an actor on this node, identified by UUID or by address.
        // The sender gets Success, or Failure(error) before the error is rethrown.
        case USE ⇒
          try {
            if (message.hasActorUuid) {
              for {
                address ← cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
                format ← cluster.formatForActor(address)
              } cluster.use(address, format)

            } else if (message.hasActorAddress) {
              val address = message.getActorAddress
              cluster.formatForActor(address) foreach (format ⇒ cluster.use(address, format))

            } else {
              EventHandler.warning(this,
                "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]"
                  .format(message))
            }
            self.reply(Success)

          } catch {
            case error ⇒
              self.reply(Failure(error))
              throw error
          }

        // Release an actor on this node, identified by UUID or by address.
        case RELEASE ⇒
          if (message.hasActorUuid) {
            cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
              cluster.release(address)
            }
          } else if (message.hasActorAddress) {
            cluster release message.getActorAddress
          } else {
            EventHandler.warning(this,
              "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]"
                .format(message))
          }

        case START      ⇒ cluster.start()

        case STOP       ⇒ cluster.shutdown()

        case DISCONNECT ⇒ cluster.disconnect()

        case RECONNECT  ⇒ cluster.reconnect()

        case RESIGN     ⇒ cluster.resign()

        case FAIL_OVER_CONNECTIONS ⇒
          val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
          cluster.failOverConnections(from, to)

        // The FUNCTION_* commands run the deserialized function in a one-shot actor on the
        // compute grid dispatcher; the actor stops itself once the function has run.

        case FUNCTION_FUN0_UNIT ⇒
          actorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case f: Function0[Unit] ⇒ try {
                f()
              } finally {
                self.stop()
              }
            }
          }).start ! payloadFor(message, classOf[Function0[Unit]])

        case FUNCTION_FUN0_ANY ⇒
          actorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case f: Function0[Any] ⇒ try {
                self.reply(f())
              } finally {
                self.stop()
              }
            }
          }).start forward payloadFor(message, classOf[Function0[Any]])

        case FUNCTION_FUN1_ARG_UNIT ⇒
          actorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case (fun: Function[Any, Unit], param: Any) ⇒ try {
                fun(param)
              } finally {
                self.stop()
              }
            }
          }).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])

        case FUNCTION_FUN1_ARG_ANY ⇒
          actorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              // The function must be applied as 'Any ⇒ Any': typing it with a Unit result
              // would discard its return value and always reply with ().
              case (fun: Function[_, _], param: Any) ⇒ try {
                self.reply(fun.asInstanceOf[Any ⇒ Any](param))
              } finally {
                self.stop()
              }
            }
          }).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
      }

    case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
  }

  /**
   * Deserializes the payload of 'message' into an instance of 'clazz'.
   * A deserialization error is rethrown.
   */
  private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
    Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
      case Left(error)     ⇒ throw error
      case Right(instance) ⇒ instance.asInstanceOf[T]
    }
  }
}
|