2011-06-17 10:25:02 +02:00
2011-04-27 01:10:00 +02:00
/**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */
2011-04-28 20:12:37 +02:00
package akka.cluster
2011-04-27 01:10:00 +02:00
import org.apache.zookeeper._
import org.apache.zookeeper.Watcher.Event._
import org.apache.zookeeper.data.Stat
2011-05-18 17:25:30 +02:00
import org.apache.zookeeper.recipes.lock. { WriteLock , LockListener }
2011-04-27 01:10:00 +02:00
import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
2011-06-14 19:35:18 +02:00
import java.util. { List ⇒ JList }
2011-05-18 17:25:30 +02:00
import java.util.concurrent.atomic. { AtomicBoolean , AtomicReference , AtomicInteger }
import java.util.concurrent. { ConcurrentSkipListSet , CopyOnWriteArrayList , Callable , ConcurrentHashMap }
2011-04-27 01:10:00 +02:00
import java.net.InetSocketAddress
import javax.management.StandardMBean
2011-05-18 17:25:30 +02:00
import scala.collection.immutable. { HashMap , HashSet }
2011-04-27 01:10:00 +02:00
import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._
import ClusterProtocol._
import RemoteDaemonMessageType._
import akka.util._
2011-04-28 20:12:37 +02:00
import Helpers._
2011-06-14 19:35:18 +02:00
2011-04-27 01:10:00 +02:00
import akka.actor._
2011-05-20 09:08:11 +02:00
import Actor._
2011-05-25 16:18:35 +02:00
import Status._
2011-06-10 16:31:24 +01:00
import DeploymentConfig. { ReplicationScheme , ReplicationStrategy , Transient , WriteThrough , WriteBehind }
2011-06-14 19:35:18 +02:00
2011-04-27 01:10:00 +02:00
import akka.event.EventHandler
2011-05-18 17:25:30 +02:00
import akka.dispatch. { Dispatchers , Future }
2011-04-27 01:10:00 +02:00
import akka.remoteinterface._
2011-05-24 19:04:25 +02:00
import akka.routing.RouterType
2011-06-14 19:35:18 +02:00
2011-05-30 10:53:25 +02:00
import akka.config. { Config , Supervision }
import Supervision._
2011-05-20 09:08:11 +02:00
import Config._
2011-06-14 19:35:18 +02:00
import akka.serialization. { Serialization , Serializer , Compression }
2011-05-20 09:08:11 +02:00
import Compression.LZF
2011-04-27 01:10:00 +02:00
import akka.AkkaException
2011-04-28 20:12:37 +02:00
import akka.cluster.zookeeper._
2011-05-23 22:35:01 +02:00
import akka.cluster.ChangeListener._
2011-04-27 01:10:00 +02:00
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
// FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk
/**
 * JMX MBean for the cluster service.
 *
 * Parameterless members without parentheses are read-only accessors; members declared
 * with `()` or returning `Unit` are operations.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait ClusterNodeMBean {

  // ---- operations on the node itself ----

  def start(): Unit

  def stop(): Unit

  def disconnect(): Unit

  def reconnect(): Unit

  def resign(): Unit

  // ---- node / cluster accessors ----

  def isConnected: Boolean

  def getRemoteServerHostname: String

  def getRemoteServerPort: Int

  def getNodeName: String

  def getClusterName: String

  def getZooKeeperServerAddresses: String

  def getMemberNodes: Array[String]

  def getLeader: String

  // ---- clustered actors, addressed either by UUID or by actor address ----

  def getUuidsForClusteredActors: Array[String]

  def getAddressesForClusteredActors: Array[String]

  def getUuidsForActorsInUse: Array[String]

  def getAddressesForActorsInUse: Array[String]

  def getNodesForActorInUseWithUuid(uuid: String): Array[String]

  def getNodesForActorInUseWithAddress(address: String): Array[String]

  def getUuidsForActorsInUseOnNode(nodeName: String): Array[String]

  def getAddressesForActorsInUseOnNode(nodeName: String): Array[String]

  // ---- configuration elements, keyed by string ----

  def setConfigElement(key: String, value: String): Unit

  def getConfigElement(key: String): AnyRef

  def removeConfigElement(key: String): Unit

  def getConfigElementKeys: Array[String]
}
/**
 * Module for the ClusterNode. Also holds global state such as configuration data etc.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object Cluster {
  val EMPTY_STRING = "".intern
  val UUID_PREFIX = "uuid:".intern

  // Config options. Each value is read from the configuration under the given key, falling back
  // to the default passed as second argument. Time-outs are read as a count of TIME_UNIT and
  // converted to milliseconds.
  val name = Config.clusterName
  val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
  val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
  val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
  val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
  val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
  val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
  val enableJMX = config.getBool("akka.enable-jmx", true)
  val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
  val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true)

  // Programmatic overrides for node name, host name and port (see 'nodename', 'hostname', 'port').
  @volatile
  private var properties = Map.empty[String, String]

  /**
   * Adds (or replaces) one override entry.
   *
   * The overrides are only read while 'nodeAddress' is being initialised, so an entry set after
   * that point has no effect on it. The update is a plain read-modify-write of a volatile field,
   * so concurrent callers may lose each other's entries.
   *
   * @param property the (key, value) pair, e.g. ("akka.cluster.port", "2553")
   */
  def setProperty(property: (String, String)): Unit = {
    properties = properties + property
  }

  // Node name: the "akka.cluster.nodename" override if present, else the configured node name.
  private def nodename: String =
    properties.get("akka.cluster.nodename").getOrElse(Config.nodename)

  // Host name: the "akka.cluster.hostname" override if present, else the configured host name.
  private def hostname: String =
    properties.get("akka.cluster.hostname").getOrElse(Config.hostname)

  // Port: the "akka.cluster.port" override parsed as an Int if present (a non-numeric value
  // makes 'toInt' throw NumberFormatException), else the configured remote server port.
  private def port: Int =
    properties.get("akka.cluster.port").map(_.toInt).getOrElse(Config.remoteServerPort)

  val defaultSerializer = new SerializableSerializer

  // The local ZooKeeper server started through 'startLocalCluster', if any.
  private val _zkServer = new AtomicReference[Option[ZkServer]](None)

  /**
   * The node address.
   */
  val nodeAddress = NodeAddress(name, nodename, hostname, port)

  /**
   * The reference to the running ClusterNode.
   */
  val node = {
    if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
    new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
  }

  /**
   * Looks up the local hostname.
   */
  def lookupLocalhostName = NetworkUtil.getLocalhostName

  /**
   * Starts up a local ZooKeeper server. Should only be used for testing purposes.
   * Uses the default data/log paths, port 2181 and a tick time of 5000.
   */
  def startLocalCluster(): ZkServer =
    startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, 5000)

  /**
   * Starts up a local ZooKeeper server. Should only be used for testing purposes.
   * Uses the default data/log paths.
   */
  def startLocalCluster(port: Int, tickTime: Int): ZkServer =
    startLocalCluster("_akka_cluster/data", "_akka_cluster/log", port, tickTime)

  /**
   * Starts up a local ZooKeeper server. Should only be used for testing purposes.
   * Uses the default data/log paths and port 2181.
   */
  def startLocalCluster(tickTime: Int): ZkServer =
    startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, tickTime)

  /**
   * Starts up a local ZooKeeper server. Should only be used for testing purposes.
   * Uses port 2181 and a tick time of 500.
   */
  def startLocalCluster(dataPath: String, logPath: String): ZkServer =
    // NOTE(review): tick time is 500 here but 5000 in the no-arg overload - confirm which is intended.
    startLocalCluster(dataPath, logPath, 2181, 500)

  /**
   * Starts up a local ZooKeeper server. Should only be used for testing purposes.
   *
   * The started server is remembered so that 'shutdownLocalCluster' can stop it. Any failure is
   * logged and then rethrown unchanged.
   */
  def startLocalCluster(dataPath: String, logPath: String, port: Int, tickTime: Int): ZkServer = {
    try {
      val zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath, port, tickTime)
      _zkServer.set(Some(zkServer))
      zkServer
    } catch {
      case e: Throwable ⇒
        EventHandler.error(e, this, "Could not start local ZooKeeper cluster")
        throw e
    }
  }

  /**
   * Shut down the local ZooKeeper server.
   *
   * The stored reference is cleared and fetched in one atomic step, so a given server instance
   * is shut down at most once even if this method is called concurrently.
   */
  def shutdownLocalCluster(): Unit = {
    withPrintStackTraceOnError {
      EventHandler.info(this, "Shuts down local cluster")
      _zkServer.getAndSet(None).foreach(_.shutdown())
    }
  }

  /**
   * Creates a new AkkaZkClient.
   */
  def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)

  /** Creates a ZooKeeperQueue at 'rootPath' using the ZooKeeper client of 'node'. */
  def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)

  /** Creates a ZooKeeperBarrier named 'name' for 'count' participants, scoped to this node's cluster. */
  def barrier(name: String, count: Int) =
    ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count)

  /** Same as the two-argument 'barrier', additionally passing 'timeout' to the barrier. */
  def barrier(name: String, count: Int, timeout: Duration) =
    ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count, timeout)

  def uuidToString(uuid: UUID): String = uuid.toString

  /**
   * Parses a UUID from its string form.
   *
   * Throws ClusterException if 'uuid' is null, empty, or makes the UUID constructor fail
   * with a StringIndexOutOfBoundsException.
   */
  def stringToUuid(uuid: String): UUID = {
    if (uuid eq null) throw new ClusterException("UUID is null")
    if (uuid == "") throw new ClusterException("UUID is an empty string")
    try {
      new UUID(uuid)
    } catch {
      case e: StringIndexOutOfBoundsException ⇒
        val error = new ClusterException("UUID not valid [" + uuid + "]")
        // log with the error text rather than an empty message
        EventHandler.error(error, this, error.toString)
        throw error
    }
  }

  /** Builds a UUID from the high/low parts of its protocol representation. */
  def uuidProtocolToUuid(uuid: UuidProtocol) = new UUID(uuid.getHigh, uuid.getLow)

  /** Builds the protocol representation: 'time' goes into 'high', 'clockSeqAndNode' into 'low'. */
  def uuidToUuidProtocol(uuid: UUID) =
    UuidProtocol.newBuilder
      .setHigh(uuid.getTime)
      .setLow(uuid.getClockSeqAndNode)
      .build
}
/**
 * A Cluster is made up of a bunch of JVMs, each of them a ClusterNode.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
2011-05-23 22:35:01 +02:00
class DefaultClusterNode private [ akka ] (
2011-05-18 17:25:30 +02:00
val nodeAddress : NodeAddress ,
val zkServerAddresses : String ,
2011-05-23 22:35:01 +02:00
val serializer : ZkSerializer ) extends ErrorHandler with ClusterNode {
2011-05-18 17:25:30 +02:00
self ⇒
2011-04-27 01:10:00 +02:00
// Reject a missing node address up front; the members below dereference it.
if (nodeAddress eq null)
  throw new IllegalArgumentException("'nodeAddress' can not be 'null'")

// JMX name derived from this node's hostname.
val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
2011-04-27 01:10:00 +02:00
2011-05-23 22:35:01 +02:00
import Cluster._
2011-04-27 01:10:00 +02:00
2011-06-17 12:07:45 +02:00
/**
 * Actor that shuts down the client module of a remote client when it reports
 * `RemoteClientError` or `RemoteClientDisconnected`; every other message is ignored.
 * Created and started on first access.
 */
lazy val remoteClientLifeCycleListener = localActorOf(new Actor {
  def receive = {
    case RemoteClientError(_, client, _)     ⇒ client.shutdownClientModule()
    case RemoteClientDisconnected(client, _) ⇒ client.shutdownClientModule()
    case _                                   ⇒ // ignore other
  }
}, "akka.cluster.RemoteClientLifeCycleListener").start()
2011-06-17 12:07:45 +02:00
/** The `RemoteClusterDaemon` actor of this node, created with address `RemoteClusterDaemon.ADDRESS` and started on first access. */
lazy val remoteDaemon =
  localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
2011-05-30 10:53:25 +02:00
/**
 * Supervisor for `remoteDaemon`: one-for-one strategy trapping `Exception`, with both
 * numeric limits set to `Int.MaxValue`; the daemon is supervised as `Permanent`.
 */
lazy val remoteDaemonSupervisor = Supervisor(
  SupervisorConfig(
    OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
    List(Supervise(remoteDaemon, Permanent))))
2011-05-19 10:58:30 +02:00
/**
 * The remote support of this node. On first access a `NettyRemoteSupport` is started on the
 * node's hostname/port, the remote daemon is registered under `RemoteClusterDaemon.ADDRESS`
 * and the client life-cycle listener is attached.
 */
lazy val remoteService: RemoteSupport = {
  val support = new akka.remote.netty.NettyRemoteSupport
  support.start(nodeAddress.hostname, nodeAddress.port)
  support.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
  support.addListener(remoteClientLifeCycleListener)
  support
}
2011-05-30 10:53:25 +02:00
2011-05-19 10:58:30 +02:00
/** Socket address reported by `remoteService`; accessing it forces `remoteService` to initialise. */
lazy val remoteServerAddress: InetSocketAddress =
  remoteService.address
2011-04-27 01:10:00 +02:00
// Static ZooKeeper nodes. Everything lives below "/<clusterName>".
val CLUSTER_PATH                = "/" + nodeAddress.clusterName
val MEMBERSHIP_PATH             = CLUSTER_PATH + "/members"
val CONFIGURATION_PATH          = CLUSTER_PATH + "/config"
val PROVISIONING_PATH           = CLUSTER_PATH + "/provisioning"
val ACTOR_REGISTRY_PATH         = CLUSTER_PATH + "/actor-registry"
val ACTOR_LOCATIONS_PATH        = CLUSTER_PATH + "/actor-locations"
val ACTOR_ADDRESS_TO_UUIDS_PATH = CLUSTER_PATH + "/actor-address-to-uuids"
val ACTORS_AT_PATH_PATH         = CLUSTER_PATH + "/actors-at-address"

// The paths listed here, in this order (cluster root first).
val basePaths = List(
  CLUSTER_PATH,
  MEMBERSHIP_PATH,
  ACTOR_REGISTRY_PATH,
  ACTOR_LOCATIONS_PATH,
  ACTORS_AT_PATH_PATH,
  ACTOR_ADDRESS_TO_UUIDS_PATH,
  CONFIGURATION_PATH,
  PROVISIONING_PATH)

val LEADER_ELECTION_PATH = CLUSTER_PATH + "/leader" // should NOT be part of 'basePaths' only used by 'leaderLock'
2011-04-27 01:10:00 +02:00
2011-05-18 12:25:27 +02:00
// ZooKeeper path of this node's own membership entry.
private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)

/** Snapshot of the locally cached membership node names as an array. */
def membershipNodes: Array[String] =
  locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]

// String key -> (remote address, actor ref) of a replica connection.
private[akka] val replicaConnections: ConcurrentMap[String, (InetSocketAddress, ActorRef)] =
  new ConcurrentHashMap[String, (InetSocketAddress, ActorRef)]

// zookeeper listeners
private val stateListener = new StateListener(this)
private val membershipListener = new MembershipChildListener(this)

// cluster node listeners
private val changeListeners = new CopyOnWriteArrayList[ChangeListener]()

// Address -> ClusterActorRef
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]

// resources
private[cluster] lazy val zkClient =
  new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
2011-04-27 01:10:00 +02:00
2011-05-23 22:35:01 +02:00
/**
 * Lock listener passed to `leaderLock`. When the lock is acquired it logs the fact and
 * publishes a `NewLeader` notification carrying this node's name; when the lock is released
 * it only logs.
 */
private[cluster] lazy val leaderElectionCallback = new LockListener {
  override def lockAcquired(): Unit = {
    EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
    self.publish(NewLeader(self.nodeAddress.nodeName))
  }

  override def lockReleased(): Unit = {
    EventHandler.info(this, "Node [%s] is *NOT* the leader anymore".format(self.nodeAddress.nodeName))
  }
}
2011-05-23 22:35:01 +02:00
/**
 * Write lock on `LEADER_ELECTION_PATH`, created on first access from the ZooKeeper handle of
 * `zkClient` with `leaderElectionCallback` as listener (the third constructor argument is null).
 */
private[cluster] lazy val leaderLock = new WriteLock(
  zkClient.connection.getZookeeper,
  LEADER_ELECTION_PATH,
  null,
  leaderElectionCallback)

// Runs as part of construction: create the MBean only when JMX is enabled in the configuration.
if (enableJMX) createMBean
// =======================================
// Node
// =======================================
/**
 * Starts this node by running `initializeNode()` under the `isConnected` switch.
 * NOTE(review): presumably a no-op when the switch is already on - confirm against the switch implementation.
 *
 * @return this node
 */
def start(): ClusterNode = {
  isConnected.switchOn {
    initializeNode()
  }
  this
}
2011-05-20 14:36:26 +02:00
/**
 * Shuts this node down under the `isConnected` switch. In order: deletes this node's
 * membership entry, clears the local caches, closes the client connection of every replica,
 * shuts down the remote server, stops the helper actors and registered listener actors,
 * clears the replica connections and finally disconnects from ZooKeeper.
 */
def shutdown(): Unit = {
  isConnected.switchOff {
    // a membership entry that is already gone is not an error
    ignore[ZkNoNodeException](zkClient.deleteRecursive(membershipNodePath))

    locallyCachedMembershipNodes.clear()
    locallyCheckedOutActors.clear()

    // shut down client connections (iterating over a snapshot of the map)
    replicaConnections.toList foreach {
      case (_, (address, _)) ⇒ Actor.remote.shutdownClientConnection(address)
    }

    remoteService.shutdown() // shutdown server

    remoteClientLifeCycleListener.stop()
    remoteDaemon.stop()

    // for monitoring remote listener
    registry.local.actors
      .filter(remoteService.hasListener)
      .foreach(_.stop())

    replicaConnections.clear()

    disconnect()
    EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
  }
}
/**
 * Removes all subscriptions from the ZooKeeper client and closes it.
 *
 * @return this node
 */
def disconnect(): ClusterNode = {
  val client = zkClient
  client.unsubscribeAll()
  client.close()
  this
}
/**
 * Asks the ZooKeeper client to reconnect.
 *
 * @return this node
 */
def reconnect(): ClusterNode = {
  zkClient.reconnect()
  this
}
// =======================================
// Change notification
// =======================================
/**
 * Registers a cluster change listener.
 *
 * @param listener the listener to append to the list notified by `publish`
 * @return this node
 */
def register(listener: ChangeListener): ClusterNode = {
  changeListeners add listener
  this
}
2011-04-27 01:10:00 +02:00
2011-05-18 08:37:58 +02:00
/** Passes `change`, together with this node, to every registered change listener in registration order. */
private[cluster] def publish(change: ChangeNotification): Unit = {
  val listeners = changeListeners.iterator
  while (listeners.hasNext) listeners.next().notify(change, this)
}
2011-04-27 01:10:00 +02:00
// =======================================
// Leader
// =======================================
/**
 * Returns the name of the current leader lock, i.e. the id reported by `leaderLock`.
 */
def leader: String =
  leaderLock.getId
/**
 * Returns true if 'this' node is the current leader, i.e. if it owns `leaderLock`.
 */
def isLeader: Boolean =
  leaderLock.isOwner
2011-04-27 01:10:00 +02:00
/**
 * Explicitly resign from being a leader by releasing the leader lock.
 * If this node is not a leader then this operation is a no-op.
 */
def resign(): Unit = {
  if (isLeader) {
    leaderLock.unlock()
  }
}
2011-04-27 01:10:00 +02:00
// =======================================
// Actor
// =======================================
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with replication
 * factor 0, the `Transient` replication scheme and without serializing the mailbox.
 */
def store[T <: Actor](address: String, actorClass: Class[T], serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, 0, Transient, false, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with the given
 * replication scheme, replication factor 0 and without serializing the mailbox.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, 0, replicationScheme, false, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with the given
 * replication factor, the `Transient` replication scheme and without serializing the mailbox.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, replicationFactor, Transient, false, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with the given
 * replication factor and scheme, without serializing the mailbox.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, replicationFactor, replicationScheme, false, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with replication
 * factor 0 and the `Transient` replication scheme; `serializeMailbox` is passed through.
 */
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, 0, Transient, serializeMailbox, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with replication
 * factor 0; `replicationScheme` and `serializeMailbox` are passed through.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, 0, replicationScheme, serializeMailbox, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with the
 * `Transient` replication scheme; `replicationFactor` and `serializeMailbox` are passed through.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor of a specific type. If the actor is already clustered then the clustered
 * version will be updated with the new actor.
 *
 * Creates and starts an actor of `actorClass` at `address`, then stores it with all the
 * given settings passed through unchanged.
 */
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  val actorRef = Actor.actorOf(actorClass, address).start
  store(actorRef, replicationFactor, replicationScheme, serializeMailbox, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Defaults applied: replication factor 0, `Transient` replication scheme, mailbox not serialized.
 */
def store(actorRef: ActorRef, serializer: Serializer): ClusterNode = {
  store(actorRef, 0, Transient, false, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Defaults applied: replication factor 0 and `Transient` replication scheme.
 */
def store(actorRef: ActorRef, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  store(actorRef, 0, Transient, serializeMailbox, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Defaults applied: replication factor 0, mailbox not serialized.
 */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode = {
  store(actorRef, 0, replicationScheme, false, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Defaults applied: `Transient` replication scheme, mailbox not serialized.
 */
def store(actorRef: ActorRef, replicationFactor: Int, serializer: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, Transient, false, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Default applied: mailbox not serialized.
 */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializer: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, replicationScheme, false, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Default applied: `Transient` replication scheme.
 */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version
 * will be updated with the actor passed in as argument.
 *
 * Default applied: replication factor 0.
 */
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: Serializer): ClusterNode = {
  store(actorRef, 0, replicationScheme, serializeMailbox, serializer)
}
2011-06-07 11:10:29 -07:00
/**
 * Needed to have reflection through structural typing work.
 *
 * Takes the serializer as `AnyRef`, casts it to `Serializer` (a `ClassCastException` is thrown
 * if it is not one) and delegates to the `Serializer`-typed overload.
 */
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode = {
  val typedSerializer = serializer.asInstanceOf[Serializer]
  store(actorRef, replicationFactor, replicationScheme, serializeMailbox, typedSerializer)
}
2011-05-20 09:08:11 +02:00
/**
 * Needed to have reflection through structural typing work.
 *
 * Applies the `Transient` replication scheme and forwards the serializer, still typed as
 * `AnyRef`, to the five-argument `AnyRef` overload.
 */
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, serializer: AnyRef): ClusterNode = {
  store(actorRef, replicationFactor, Transient, serializeMailbox, serializer)
}
2011-04-27 01:10:00 +02:00
/**
 * Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
 * with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
 * available durable store.
 *
 * The serialized actor is first written to its registry path. Only when that path does not
 * exist yet (`ZkNoNodeException`) is the node created and are the accompanying registry
 * entries (serializer class name, actor address, node/location entries, address-to-UUID
 * entries) created as well. Afterwards a `USE` command for the actor's UUID is sent to every
 * connection returned by `replicaConnectionsForReplicationFactor(replicationFactor)`, waiting
 * up to `remoteDaemonAckTimeout` for each reply.
 *
 * Throws `IllegalArgumentException` if `actorRef` is not a `LocalActorRef`, and
 * `ClusterException` if the node is not connected or a replica does not answer in time.
 * A failure reported by a replica is logged and its cause rethrown.
 *
 * @param actorRef          the actor to store; must be a `LocalActorRef`
 * @param replicationFactor passed to `replicaConnectionsForReplicationFactor`
 * @param replicationScheme passed to `toBinary` when serializing the actor
 * @param serializeMailbox  passed to `toBinary` when serializing the actor
 * @param serializer        its class name is recorded in the registry
 * @return this node
 */
def store(
  actorRef: ActorRef,
  replicationFactor: Int,
  replicationScheme: ReplicationScheme,
  serializeMailbox: Boolean,
  serializer: Serializer): ClusterNode = {
  if (!isConnected.isOn) throw new ClusterException("Not connected to cluster")

  import akka.serialization.ActorSerialization._

  if (!actorRef.isInstanceOf[LocalActorRef]) throw new IllegalArgumentException(
    "'actorRef' must be an instance of 'LocalActorRef' [" + actorRef.getClass.getName + "]")

  val serializerClassName = serializer.getClass.getName
  val uuid = actorRef.uuid

  EventHandler.debug(this,
    "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))

  // serialize once, compress only when configured to
  val serializedActor = toBinary(actorRef, serializeMailbox, replicationScheme)
  val actorBytes =
    if (shouldCompressData) LZF.compress(serializedActor)
    else serializedActor

  val actorRegistryPath = actorRegistryPathFor(uuid)

  // create UUID -> Array[Byte] for actor registry
  try {
    zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
  } catch {
    case notStored: ZkNoNodeException ⇒ // if not stored yet, store the actor
      val created = zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
        def call: Either[String, Exception] = {
          try {
            Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT))
          } catch {
            // somebody else created the node in the meantime
            case alreadyThere: KeeperException.NodeExistsException ⇒ Right(alreadyThere)
          }
        }
      })
      // either outcome leaves the node in place; the resulting path is not used further
      created match {
        case Left(path) ⇒ path
        case Right(_)   ⇒ actorRegistryPath
      }

      // create UUID -> serializer class name registry
      val serializerPath = actorRegistrySerializerPathFor(uuid)
      try {
        zkClient.createPersistent(serializerPath, serializerClassName)
      } catch {
        case exists: ZkNodeExistsException ⇒ zkClient.writeData(serializerPath, serializerClassName)
      }

      // create UUID -> ADDRESS registry
      val addressPath = actorRegistryActorAddressPathFor(uuid)
      try {
        zkClient.createPersistent(addressPath, actorRef.address)
      } catch {
        case exists: ZkNodeExistsException ⇒ zkClient.writeData(addressPath, actorRef.address)
      }

      // create UUID -> Address registry
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))

      // create UUID -> Node registry
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid)))

      // create ADDRESS -> UUIDs registry
      val addressToUuidsPath = actorAddressToUuidsPathFor(actorRef.address)
      ignore[ZkNodeExistsException](zkClient.createPersistent(addressToUuidsPath))
      ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(addressToUuidsPath, uuid)))
  }

  import RemoteClusterDaemon._

  val command = RemoteDaemonMessageProtocol.newBuilder
    .setMessageType(USE)
    .setActorUuid(uuidToUuidProtocol(uuid))
    .build

  // ask every selected replica node to use the actor and wait for its acknowledgement
  replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒
    (connection ? (command, remoteDaemonAckTimeout)).as[Status] match {
      case Some(Success) ⇒
        EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address))

      case Some(Failure(cause)) ⇒
        EventHandler.error(cause, this, cause.toString)
        throw cause

      case None ⇒
        val error = new ClusterException(
          "Operation to instantiate replicas throughout the cluster timed out")
        EventHandler.error(error, this, error.toString)
        throw error
    }
  }

  this
}
2011-06-22 11:31:01 +02:00
/* *
* Removes actor from the cluster .
*/
def remove(actorRef: ActorRef): Unit = {
  // Delegates to the UUID-based overload, using the UUID carried by the reference.
  val uuid = actorRef.uuid
  remove(uuid)
}
2011-04-27 01:10:00 +02:00
/* *
2011-04-29 15:47:56 +02:00
* Removes actor with uuid from the cluster .
2011-04-27 01:10:00 +02:00
*/
2011-05-18 08:37:58 +02:00
def remove(uuid: UUID): Unit = {
  // Tell every node that has the actor in use to release it, then drop the local check-out entry.
  releaseActorOnAllNodes(uuid)
  locallyCheckedOutActors.remove(uuid)

  // warning: ordering matters here
  // The address lookup reads a child of the registry node for this UUID, so it has to run
  // before that registry node is deleted below.
  // Only this UUID's entry under the ADDRESS -> UUIDs node is deleted; removing the whole
  // address node would also drop the mappings of every other UUID stored under the same address.
  actorAddressForUuid(uuid) foreach { address ⇒
    val uuidEntryPath = "%s/%s".format(actorAddressToUuidsPathFor(address), uuid)
    ignore[ZkNoNodeException](zkClient.deleteRecursive(uuidEntryPath))
  }

  ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
  ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid)))
  ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid)))
}
2011-04-27 01:10:00 +02:00
/* *
2011-04-29 15:47:56 +02:00
* Removes actor with address from the cluster .
2011-04-27 01:10:00 +02:00
*/
2011-04-29 15:47:56 +02:00
def remove(address: String): ClusterNode = {
  // Nothing is removed unless the node is connected; 'this' is returned either way.
  isConnected ifOn {
    val logMessage = "Removing actor(s) with address [%s] from cluster".format(address)
    EventHandler.debug(this, logMessage)
    // Every UUID registered under the address is removed individually.
    for (uuid ← uuidsForActorAddress(address)) remove(uuid)
  }
  this
}
/* *
* Is the actor with the given address clustered or not ?
*/
2011-04-29 15:47:56 +02:00
def isClustered(actorAddress: String): Boolean = {
  // True when the node is connected and at least one UUID registered for the address
  // still has a node in the actor registry; always false when not connected.
  // 'exists' stops at the first hit, so no further ZooKeeper lookups are made once
  // the answer is known.
  isConnected.isOn && actorUuidsForActorAddress(actorAddress).exists { uuid ⇒
    zkClient.exists(actorRegistryPathFor(uuid))
  }
}
/* *
* Is the actor with the given address in use on 'this' node or not ?
*/
2011-04-29 15:47:56 +02:00
def isInUseOnNode(actorAddress: String): Boolean = {
  // Delegates to the two-argument overload, passing this node's own 'nodeAddress'.
  val thisNode = nodeAddress
  isInUseOnNode(actorAddress, thisNode)
}
2011-04-27 01:10:00 +02:00
/* *
* Is the actor with the given address in use on the given node or not ?
*/
2011-04-29 15:47:56 +02:00
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = {
  // True when the node is connected and at least one UUID registered for the address
  // has a location entry for 'node'; always false when not connected.
  // 'exists' stops at the first hit, so no further ZooKeeper lookups are made once
  // the answer is known.
  isConnected.isOn && actorUuidsForActorAddress(actorAddress).exists { uuid ⇒
    zkClient.exists(actorLocationsPathFor(uuid, node))
  }
}
/* *
* Checks out an actor for use on this node , e . g . checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID .
*/
2011-06-22 09:59:00 +02:00
def use[T <: Actor](actorAddress: String): Option[ActorRef] = {
  // Looks up the serializer registered for the address and delegates to the two-argument overload.
  val serializer = serializerForActor(actorAddress)
  use(actorAddress, serializer)
}
2011-05-20 17:13:39 +02:00
/* *
* Checks out an actor for use on this node , e . g . checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID .
*/
2011-06-22 09:59:00 +02:00
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = {
  // NOTE(review): 'serializer' is not referenced anywhere in this body - confirm whether
  // 'fromBinary' is supposed to receive it.
  if (isConnected.isOn) {
    import akka.serialization.ActorSerialization._

    // Every UUID registered for the address is checked out; only the first result is returned.
    val checkedOut = actorUuidsForActorAddress(actorAddress) map { uuid ⇒
      // record that the actor is in use on this node
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
      ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))

      // set home address
      ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
      ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteServerAddress)))

      // read the stored actor bytes; Left carries the bytes, Right a missing-node failure
      val registryPath = actorRegistryPathFor(uuid)
      val readResult = zkClient.retryUntilConnected(new Callable[Either[Array[Byte], Exception]]() {
        def call: Either[Array[Byte], Exception] =
          try {
            val stored = zkClient.connection.readData(registryPath, new Stat, false)
            Left(if (shouldCompressData) LZF.uncompress(stored) else stored)
          } catch {
            case e: KeeperException.NoNodeException ⇒ Right(e)
          }
      })

      readResult match {
        case Right(exception) ⇒ throw exception
        case Left(bytes) ⇒
          locallyCheckedOutActors += (uuid -> bytes)
          val actor = fromBinary[T](bytes, remoteServerAddress)
          EventHandler.debug(this,
            "Checking out actor [%s] to be used on node [%s] as local actor"
              .format(actor, nodeAddress.nodeName))
          actor.start()
          actor
      }
    }

    // FIXME should not be an array at all coming here but an Option[ActorRef]
    checkedOut.headOption
  } else None
}
2011-04-27 01:10:00 +02:00
/* *
* Using ( checking out ) all actors with a specific UUID on all nodes in the cluster .
*/
2011-05-18 08:37:58 +02:00
def useActorOnAllNodes(uuid: UUID): Unit = {
  isConnected ifOn {
    EventHandler.debug(this,
      "Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))

    // One USE message is built up front and sent (fire-and-forget) to every membership
    // node for which a replica connection is present.
    val useMessage = RemoteDaemonMessageProtocol.newBuilder
      .setMessageType(USE)
      .setActorUuid(uuidToUuidProtocol(uuid))
      .build

    membershipNodes foreach { node ⇒
      replicaConnections.get(node) foreach { entry ⇒
        entry._2 ! useMessage
      }
    }
  }
}
/* *
* Using ( checking out ) a specific UUID on a specific node .
*/
2011-05-18 08:37:58 +02:00
def useActorOnNode(node: String, uuid: UUID): Unit = {
  isConnected ifOn {
    // The USE message is only built and sent when a replica connection for 'node' is present.
    replicaConnections.get(node) foreach { entry ⇒
      val useMessage = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(USE)
        .setActorUuid(uuidToUuidProtocol(uuid))
        .build
      entry._2 ! useMessage
    }
  }
}
2011-06-22 11:31:01 +02:00
/* *
* Checks in an actor after done using it on this node .
*/
def release(actorRef: ActorRef): Unit = {
  // Delegates to the address-based overload, using the address carried by the reference.
  val actorAddress = actorRef.address
  release(actorAddress)
}
2011-04-27 01:10:00 +02:00
/* *
* Checks in an actor after done using it on this node .
*/
2011-05-18 08:37:58 +02:00
def release(actorAddress: String): Unit = {
  // FIXME 'Cluster.release' needs to notify all existing ClusterActorRef's that are using the instance that it is no longer available. Then what to do? Should we even remove this method?
  isConnected ifOn {
    actorUuidsForActorAddress(actorAddress) foreach { uuid ⇒
      EventHandler.debug(this,
        "Releasing actor with UUID [%s] after usage".format(uuid))

      // forget the locally cached bytes of the checked-out actor
      locallyCheckedOutActors.remove(uuid)

      // Remove this node's "in use" marker. The recursive delete already removes the node
      // itself, so the plain delete of the same path that used to follow it was redundant.
      ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))

      // remove this node from the actor's locations and from its home-address registry
      ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, nodeAddress)))
      ignore[ZkNoNodeException](zkClient.delete(actorRegistryNodePathFor(uuid, remoteServerAddress)))
    }
  }
}
/* *
* Releases ( checking in ) all actors with a specific UUID on all nodes in the cluster where the actor is in 'use' .
*/
2011-05-18 08:37:58 +02:00
def releaseActorOnAllNodes(uuid: UUID): Unit = {
  isConnected ifOn {
    EventHandler.debug(this,
      "Releasing (checking in) all actors with UUID [%s] on all nodes in cluster".format(uuid))

    // One RELEASE message is built up front and sent (fire-and-forget) to every node listed
    // as using the UUID, provided a replica connection for that node is present.
    val releaseMessage = RemoteDaemonMessageProtocol.newBuilder
      .setMessageType(RELEASE)
      .setActorUuid(uuidToUuidProtocol(uuid))
      .build

    nodesForActorsInUseWithUuid(uuid) foreach { node ⇒
      replicaConnections.get(node) foreach { entry ⇒
        entry._2 ! releaseMessage
      }
    }
  }
}
/* *
* Creates an ActorRef with a Router to a set of clustered actors .
*/
2011-05-16 09:47:23 +02:00
def ref(actorAddress: String, router: RouterType): ActorRef = {
  // Fails fast when the node is not connected.
  if (!isConnected.isOn) throw new ClusterException("Not connected to cluster")

  val homeAddresses = addressesForActor(actorAddress)
  EventHandler.debug(this,
    "Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
      .format(actorAddress, router, remoteServerAddress, homeAddresses.map(_._2).mkString("\n\t")))

  val routedRef = Router.newRouter(router, homeAddresses, actorAddress, Actor.TIMEOUT)
  // Register the new ref under every socket address it routes to.
  homeAddresses foreach { case (_, address) ⇒ clusterActorRefs.put(address, routedRef) }
  routedRef.start()
}
/* *
* Migrate the actor from 'this' node to node 'to' .
*/
2011-05-18 08:37:58 +02:00
def migrate(to: NodeAddress, actorAddress: String): Unit = {
  // The source of the migration is this node's own 'nodeAddress'.
  val from = nodeAddress
  migrate(from, to, actorAddress)
}
2011-04-27 01:10:00 +02:00
/* *
* Migrate the actor from node 'from' to node 'to' .
*/
def migrate(from: NodeAddress, to: NodeAddress, actorAddress: String): Unit = {
  // Nothing happens (and nothing is validated) unless the node is connected.
  isConnected ifOn {
    // guard clauses: both endpoints are required and the actor must be in use on 'from'
    if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
    if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
    if (!isInUseOnNode(actorAddress, from))
      throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")

    migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
  }
}
/* *
* Returns the UUIDs of all actors checked out on this node .
*/
def uuidsForActorsInUse: Array[UUID] = {
  // Same lookup as for any other node, applied to this node's own name.
  val thisNodeName = nodeAddress.nodeName
  uuidsForActorsInUseOnNode(thisNodeName)
}
/* *
2011-04-29 15:47:56 +02:00
* Returns the addresses of all actors checked out on this node .
2011-04-27 01:10:00 +02:00
*/
2011-04-29 15:47:56 +02:00
def addressesForActorsInUse: Array[String] = {
  // Resolve the UUIDs checked out on this node to their actor addresses.
  val checkedOutUuids = uuidsForActorsInUse
  actorAddressForUuids(checkedOutUuids)
}
2011-04-27 01:10:00 +02:00
/* *
* Returns the UUIDs of all actors registered in this cluster .
*/
def uuidsForClusteredActors: Array[UUID] = {
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    // Each child name of the actor registry node is parsed as a UUID.
    val registeredNames = zkClient.getChildren(ACTOR_REGISTRY_PATH).toList
    registeredNames.map(name ⇒ new UUID(name)).toArray
  }
}
/* *
2011-04-29 15:47:56 +02:00
* Returns the addresses of all actors registered in this cluster .
2011-04-27 01:10:00 +02:00
*/
2011-04-29 15:47:56 +02:00
def addressesForClusteredActors: Array[String] = {
  // Resolve every UUID in the actor registry to its actor address.
  val registeredUuids = uuidsForClusteredActors
  actorAddressForUuids(registeredUuids)
}
2011-04-27 01:10:00 +02:00
/* *
* Returns the actor id for the actor with a specific UUID .
*/
2011-06-17 10:25:02 +02:00
def actorAddressForUuid(uuid: UUID): Option[String] = {
  // None when not connected or when no address node is stored for the UUID.
  if (!isConnected.isOn) None
  else {
    try {
      val storedAddress = zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String]
      Some(storedAddress)
    } catch {
      case e: ZkNoNodeException ⇒ None
    }
  }
}
2011-04-27 01:10:00 +02:00
/* *
* Returns the actor ids for all the actors with a specific UUID .
*/
2011-06-17 10:25:02 +02:00
def actorAddressForUuids(uuids: Array[UUID]): Array[String] = {
  // UUIDs without a stored address contribute nothing to the result.
  uuids flatMap { uuid ⇒ actorAddressForUuid(uuid).toList }
}
2011-04-27 01:10:00 +02:00
/* *
* Returns the actor UUIDs for actor ID .
*/
2011-04-28 20:12:37 +02:00
def uuidsForActorAddress(actorAddress: String): Array[UUID] = {
  // Empty when not connected or when nothing is registered under the address.
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    try {
      // each child name under the address node is parsed as a UUID
      val childNames = zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toArray
      childNames map { case name: CharSequence ⇒ new UUID(name) }
    } catch {
      case e: ZkNoNodeException ⇒ Array[UUID]()
    }
  }
}
/* *
* Returns the node names of all actors in use with UUID .
*/
def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = {
  // Empty when not connected or when no locations node exists for the UUID.
  if (!isConnected.isOn) Array.empty[String]
  else {
    try {
      // java.util.List.toArray() is specified to return Object[], so casting its result to
      // Array[String] is not safe. Going through a Scala List builds a real Array[String].
      zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray
    } catch {
      case e: ZkNoNodeException ⇒ Array.empty[String]
    }
  }
}
/* *
2011-04-29 15:47:56 +02:00
* Returns the node names of all actors in use with address .
2011-04-27 01:10:00 +02:00
*/
2011-04-29 15:47:56 +02:00
def nodesForActorsInUseWithAddress(address: String): Array[String] = {
  // Empty when not connected. UUIDs without a locations node contribute nothing.
  if (!isConnected.isOn) Array.empty[String]
  else {
    actorUuidsForActorAddress(address) flatMap { uuid ⇒
      try {
        // java.util.List.toArray() is specified to return Object[], so casting its result to
        // Array[String] is not safe. The children are collected as a Scala List instead and
        // flattened into the resulting Array[String].
        zkClient.getChildren(actorLocationsPathFor(uuid)).toList
      } catch {
        case e: ZkNoNodeException ⇒ List.empty[String]
      }
    }
  }
}
/* *
* Returns the UUIDs of all actors in use registered on a specific node .
*/
def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = {
  // Empty when not connected or when the node has no "actors at node" entry.
  if (!isConnected.isOn) Array.empty[UUID]
  else {
    try {
      // each child name under the node's path is parsed as a UUID
      val childNames = zkClient.getChildren(actorsAtNodePathFor(nodeName)).toArray
      childNames map { case name: CharSequence ⇒ new UUID(name) }
    } catch {
      case e: ZkNoNodeException ⇒ Array[UUID]()
    }
  }
}
/* *
2011-04-29 15:47:56 +02:00
* Returns the addresses of all actors in use registered on a specific node .
2011-04-27 01:10:00 +02:00
*/
2011-04-29 15:47:56 +02:00
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = {
  if (isConnected.isOn) {
    // The UUID lookup for the node is the same one 'uuidsForActorsInUseOnNode' performs;
    // the UUIDs are then resolved to actor addresses.
    val uuidsOnNode = uuidsForActorsInUseOnNode(nodeName)
    actorAddressForUuids(uuidsOnNode)
  } else Array.empty[String]
}
/* *
2011-06-22 09:59:00 +02:00
* Returns Serializer for actor with specific address .
2011-04-27 01:10:00 +02:00
*/
2011-06-22 09:59:00 +02:00
def serializerForActor(actorAddress: String): Serializer = {
  // FIXME should only be 1 single class name per actor address - FIX IT

  // Collect the serializer class name stored for each UUID of the address;
  // UUIDs without a serializer node contribute nothing.
  val classNames = actorUuidsForActorAddress(actorAddress) flatMap { uuid ⇒
    try {
      List(zkClient.readData(actorRegistrySerializerPathFor(uuid), new Stat).asInstanceOf[String])
    } catch {
      case e: ZkNoNodeException ⇒ Nil
    }
  }

  if (classNames.isEmpty)
    throw new IllegalStateException("No serializer found for actor with address [%s]".format(actorAddress))

  // all UUIDs of one address must agree on a single serializer class
  val serializerClassName = classNames.head
  if (classNames exists (_ != serializerClassName))
    throw new IllegalStateException("Multiple serializers found for actor with address [%s]".format(actorAddress))

  // FIXME need to pass in a user provide class loader? Now using default in ReflectiveAccess.
  ReflectiveAccess.getClassFor(serializerClassName) match {
    case Left(error) ⇒
      EventHandler.error(error, this, "Could not load serializer class [%s] due to: %s".format(serializerClassName, error.toString))
      throw error
    case Right(clazz) ⇒ clazz.newInstance.asInstanceOf[Serializer]
  }
}
/* *
* Returns the home addresses ( UUID and socket address pairs ) for the actor with the given address .
*/
2011-04-29 15:47:56 +02:00
def addressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)] = {
  try {
    actorUuidsForActorAddress(actorAddress) flatMap { uuid ⇒
      // every child of the UUID's "node" registry entry is named "hostname:port"
      zkClient.getChildren(actorRegistryNodePathFor(uuid)).toList map { hostAndPort ⇒
        val parts = new java.util.StringTokenizer(hostAndPort, ":")
        val hostname = parts.nextToken // hostname
        val port = parts.nextToken.toInt // port
        (uuid, new InetSocketAddress(hostname, port))
      }
    }
  } catch {
    // a missing registry node yields an empty result
    case e: ZkNoNodeException ⇒ Array[(UUID, InetSocketAddress)]()
  }
}
// =======================================
// Compute Grid
// =======================================
/* *
* Send a function 'Function0[ Unit ] ' to be invoked on a random number of nodes ( defined by 'replicationFactor' argument ) .
*/
2011-05-18 08:37:58 +02:00
def send(f: Function0[Unit], replicationFactor: Int): Unit = {
  // A serialization failure is rethrown; otherwise the serialized function is sent
  // fire-and-forget to each selected replica connection.
  Serialization.serialize(f) match {
    case Right(bytes) ⇒
      val payload = ByteString.copyFrom(bytes)
      val message = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(FUNCTION_FUN0_UNIT)
        .setPayload(payload)
        .build
      for (replica ← replicaConnectionsForReplicationFactor(replicationFactor)) replica ! message
    case Left(error) ⇒ throw error
  }
}
/* *
* Send a function 'Function0[ Any ] ' to be invoked on a random number of nodes ( defined by 'replicationFactor' argument ) .
* Returns a 'List' with all the 'Future's from the computation .
*/
def send(f: Function0[Any], replicationFactor: Int): List[Future[Any]] = {
  // A serialization failure is rethrown; otherwise the serialized function is sent with '?'
  // to each selected replica connection and the resulting futures are returned.
  Serialization.serialize(f) match {
    case Right(bytes) ⇒
      val payload = ByteString.copyFrom(bytes)
      val message = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(FUNCTION_FUN0_ANY)
        .setPayload(payload)
        .build
      val futures = replicaConnectionsForReplicationFactor(replicationFactor) map { replica ⇒ replica ? message }
      futures.toList.asInstanceOf[List[Future[Any]]]
    case Left(error) ⇒ throw error
  }
}
/* *
* Send a function 'Function1[ Any , Unit ] ' to be invoked on a random number of nodes ( defined by 'replicationFactor' argument )
* with the argument specified .
*/
2011-05-18 08:37:58 +02:00
def send(f: Function1[Any, Unit], arg: Any, replicationFactor: Int): Unit = {
  // The function and its argument are serialized together as one pair. A serialization
  // failure is rethrown; otherwise the message is sent fire-and-forget to each selected replica.
  Serialization.serialize((f, arg)) match {
    case Right(bytes) ⇒
      val payload = ByteString.copyFrom(bytes)
      val message = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(FUNCTION_FUN1_ARG_UNIT)
        .setPayload(payload)
        .build
      for (replica ← replicaConnectionsForReplicationFactor(replicationFactor)) replica ! message
    case Left(error) ⇒ throw error
  }
}
/* *
* Send a function 'Function1[ Any , Any ] ' to be invoked on a random number of nodes ( defined by 'replicationFactor' argument )
* with the argument specified .
* Returns a 'List' with all the 'Future's from the computation .
*/
def send(f: Function1[Any, Any], arg: Any, replicationFactor: Int): List[Future[Any]] = {
  // The function and its argument are serialized together as one pair. A serialization
  // failure is rethrown; otherwise the message is sent with '?' to each selected replica
  // and the resulting futures are returned.
  Serialization.serialize((f, arg)) match {
    case Right(bytes) ⇒
      val payload = ByteString.copyFrom(bytes)
      val message = RemoteDaemonMessageProtocol.newBuilder
        .setMessageType(FUNCTION_FUN1_ARG_ANY)
        .setPayload(payload)
        .build
      val futures = replicaConnectionsForReplicationFactor(replicationFactor) map { replica ⇒ replica ? message }
      futures.toList.asInstanceOf[List[Future[Any]]]
    case Left(error) ⇒ throw error
  }
}
// =======================================
// Config
// =======================================
def setConfigElement(key: String, bytes: Array[Byte]): Unit = {
  // The value is stored compressed when 'shouldCompressData' is set.
  val compressedBytes = if (shouldCompressData) LZF.compress(bytes) else bytes

  // The format arguments follow the placeholder order: value first, then key
  // (they used to be passed the other way round).
  EventHandler.debug(this,
    "Adding config value [%s] under key [%s] in cluster registry".format(compressedBytes, key))

  // Left signals success, Right carries the failure to rethrow.
  zkClient.retryUntilConnected(new Callable[Either[Unit, Exception]]() {
    def call: Either[Unit, Exception] = {
      try {
        Left(zkClient.connection.create(configurationPathFor(key), compressedBytes, CreateMode.PERSISTENT))
      } catch {
        // the key already exists: overwrite its data instead of creating the node
        case e: KeeperException.NodeExistsException ⇒
          try {
            Left(zkClient.connection.writeData(configurationPathFor(key), compressedBytes))
          } catch {
            case e: Exception ⇒ Right(e)
          }
      }
    }
  }) match {
    case Left(_)          ⇒ /* do nothing */
    case Right(exception) ⇒ throw exception
  }
}
/* *
* Returns the config element for the key , or None if no element exists under the key .
*/
2011-06-17 10:25:02 +02:00
def getConfigElement(key: String): Option[Array[Byte]] = try {
  val stored = zkClient.connection.readData(configurationPathFor(key), new Stat, true)
  // 'setConfigElement' stores the value LZF-compressed when 'shouldCompressData' is set,
  // so the read side has to undo that to hand back the bytes that were originally stored.
  Some(if (shouldCompressData) LZF.uncompress(stored) else stored)
} catch {
  // no element stored under the key
  case e: KeeperException.NoNodeException ⇒ None
}
2011-05-18 08:37:58 +02:00
def removeConfigElement(key: String): Unit = {
  // A key that does not exist is silently ignored.
  ignore[ZkNoNodeException] {
    val logMessage = "Removing config element with key [%s] from cluster registry".format(key)
    EventHandler.debug(this, logMessage)
    zkClient.deleteRecursive(configurationPathFor(key))
  }
}
2011-06-22 09:59:00 +02:00
def getConfigElementKeys: Array[String] = {
  // The keys are the child names of the configuration root node.
  val keyNames = zkClient.getChildren(CONFIGURATION_PATH).toList
  keyNames.toArray
}
2011-04-27 01:10:00 +02:00
// =======================================
// Private
// =======================================
2011-06-22 09:59:00 +02:00
// ZooKeeper path builders: each one joins a root path constant (or another builder's result)
// with the given name segments, separated by '/'.

private[cluster] def membershipPathFor(node: String): String = MEMBERSHIP_PATH + "/" + node

private[cluster] def configurationPathFor(key: String): String = CONFIGURATION_PATH + "/" + key

// dots in the actor address are replaced by underscores in the path segment
private[cluster] def actorAddressToUuidsPathFor(actorAddress: String): String =
  ACTOR_ADDRESS_TO_UUIDS_PATH + "/" + actorAddress.replace('.', '_')

private[cluster] def actorLocationsPathFor(uuid: UUID): String = ACTOR_LOCATIONS_PATH + "/" + uuid

private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress): String =
  ACTOR_LOCATIONS_PATH + "/" + uuid + "/" + node.nodeName

private[cluster] def actorsAtNodePathFor(node: String): String = ACTORS_AT_PATH_PATH + "/" + node

private[cluster] def actorAtNodePathFor(node: String, uuid: UUID): String =
  ACTORS_AT_PATH_PATH + "/" + node + "/" + uuid

private[cluster] def actorRegistryPathFor(uuid: UUID): String = ACTOR_REGISTRY_PATH + "/" + uuid

private[cluster] def actorRegistrySerializerPathFor(uuid: UUID): String =
  actorRegistryPathFor(uuid) + "/" + "serializer"

private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID): String =
  actorRegistryPathFor(uuid) + "/" + "address"

private[cluster] def actorRegistryNodePathFor(uuid: UUID): String =
  actorRegistryPathFor(uuid) + "/" + "node"

// the last segment has the form "hostname:port"
private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String =
  actorRegistryNodePathFor(uuid) + "/" + address.getHostName + ":" + address.getPort
2011-04-27 01:10:00 +02:00
2011-05-18 08:37:58 +02:00
private[cluster] def initializeNode(): Unit = {
  val startupBanner =
    ("\nCreating cluster node with" +
      "\n\tcluster name = [%s]" +
      "\n\tnode name = [%s]" +
      "\n\tport = [%s]" +
      "\n\tzookeeper server addresses = [%s]" +
      "\n\tserializer = [%s]")
      .format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer)
  EventHandler.info(this, startupBanner)
  EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))

  // The steps below run in this fixed order.
  createRootClusterNode()
  // only the node that wins the leader election creates the node structure
  if (joinLeaderElection()) createNodeStructureIfNeeded()
  registerListeners()
  joinMembershipPath()
  joinActorsAtAddressPath()
  fetchMembershipNodes()

  EventHandler.info(this, "Cluster node [%s] started successfully".format(nodeAddress))
}
2011-06-17 10:25:02 +02:00
private[cluster] def addressForNode(node: String): Option[InetSocketAddress] = {
  try {
    // The membership data is read as four ':'-separated fields:
    // cluster name, node name, hostname and port.
    val memberData = zkClient.readData(membershipPathFor(node)).asInstanceOf[String]
    val fields = new java.util.StringTokenizer(memberData, ":")
    fields.nextToken // skip cluster name
    fields.nextToken // skip node name
    val host = fields.nextToken
    val portNumber = fields.nextToken.toInt
    Some(new InetSocketAddress(host, portNumber))
  } catch {
    // no membership entry for the node
    case e: ZkNoNodeException ⇒ None
  }
}
2011-05-25 16:18:35 +02:00
private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = {
  // Same as 'uuidsForActorAddress', with null entries dropped.
  uuidsForActorAddress(actorAddress) filterNot (_ eq null)
}
2011-04-27 01:10:00 +02:00
/* *
* Returns a random set with replica connections of size 'replicationFactor' .
* Default replicationFactor is 0 , which returns the empty set .
*/
private def replicaConnectionsForReplicationFactor(replicationFactor: Int = 0): Set[ActorRef] = {
  if (replicationFactor < 1) HashSet.empty[ActorRef] // nothing requested, nothing to connect to
  else {
    connectToAllMembershipNodesInCluster()

    // Take ONE snapshot of the connections and derive the count from it. Reading the size and
    // the element list separately could disagree if the map changed in between, which would
    // let the random index run past the end of the list.
    val candidates = replicaConnections.toList map {
      case (node, (address, actorRef)) ⇒ actorRef
    } // the ActorRefs
    val numberOfReplicas = candidates.size

    if (numberOfReplicas < replicationFactor) {
      throw new IllegalArgumentException(
        "Replication factor [" + replicationFactor +
          "] is greater than the number of available nodes [" + numberOfReplicas + "]")
    } else if (numberOfReplicas == replicationFactor) {
      HashSet.empty[ActorRef] ++ candidates
    } else {
      // Shuffle once and take a prefix: a uniformly random selection without a retry loop
      // and without indexed access into a List.
      HashSet.empty[ActorRef] ++ scala.util.Random.shuffle(candidates.distinct).take(replicationFactor)
    }
  }
}
/* *
* Connect to all available replicas ( unless already connected ) .
*/
2011-05-25 16:18:35 +02:00
private def connectToAllMembershipNodesInCluster(): Unit = {
  membershipNodes foreach { node ⇒
    val isOtherNode = node != Config.nodename // no replica on the "home" node of the ref
    // only connect to each replica once
    if (isOtherNode && !replicaConnections.contains(node)) {
      // nodes whose address cannot be resolved are skipped
      addressForNode(node) foreach { address ⇒
        EventHandler.debug(this,
          "Connecting to replica with nodename [%s] and address [%s]".format(node, address))
        val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
        replicaConnections.put(node, (address, clusterDaemon))
      }
    }
  }
}
2011-06-22 09:59:00 +02:00
private[cluster] def joinMembershipPath(): Unit = {
  // The local name -> address mapping is recorded before the membership node is created.
  nodeNameToAddress += (nodeAddress.nodeName -> remoteServerAddress)
  try {
    val joinMessage = "Joining cluster as membership node [%s] on [%s]".format(nodeAddress, membershipNodePath)
    EventHandler.info(this, joinMessage)
    zkClient.createEphemeral(membershipNodePath, nodeAddress.toString)
  } catch {
    // The membership node already exists: log and fail with a ClusterException.
    case e: ZkNodeExistsException ⇒
      val failure = new ClusterException("Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in by another node")
      EventHandler.error(failure, this, failure.toString)
      throw failure
  }
}
2011-06-22 09:59:00 +02:00
/**
 * Creates the persistent ZooKeeper node returned by `actorsAtNodePathFor` for this
 * node's name; a `ZkNodeExistsException` is passed to `ignore`.
 */
private[cluster] def joinActorsAtAddressPath() {
  ignore[ZkNodeExistsException] {
    zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName))
  }
}
2011-04-27 01:10:00 +02:00
2011-05-30 10:53:25 +02:00
/**
 * Logs that this node joins the leader election and then calls `leaderLock.lock`.
 *
 * @return the value returned by `leaderLock.lock`, or `false` when the call fails with
 *         a `KeeperException.NodeExistsException`
 */
private[cluster] def joinLeaderElection(): Boolean = {
  EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
  try leaderLock.lock()
  catch {
    case e: KeeperException.NodeExistsException ⇒ false
  }
}
/**
 * Calls `failOver(from, to)` on every entry that `clusterActorRefs.values(from)` yields.
 *
 * @param from address the connections currently point at
 * @param to   address the connections should be switched to
 */
private[cluster] def failOverConnections(from: InetSocketAddress, to: InetSocketAddress) {
  for (ref ← clusterActorRefs.values(from)) ref.failOver(from, to)
}
/**
 * For every node reported by `findFailedNodes(currentSetOfClusterNodes)`, decides whether
 * this node is the one that takes over, and if so:
 *  1. migrates each actor listed under the failed node's `actorsAtNodePathFor` path,
 *     checks it out locally via `use` and registers it with `remoteService`;
 *  2. sends a `FAIL_OVER_CONNECTIONS` command to every membership node that has an entry
 *     in `replicaConnections`.
 *
 * @param currentSetOfClusterNodes node names currently considered alive
 */
private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) = {
  for (failedNodeName ← findFailedNodes(currentSetOfClusterNodes)) {
    val cachedNodes = locallyCachedMembershipNodes.toList
    val ownPosition = cachedNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
    val failedPosition = cachedNodes.indexOf(failedNodeName)

    // Migrate to the successor of the failed node (using a sorted circular list of the node names)
    // No leftmost successor exists, check the tail
    val wrapsAround = failedPosition == 0 && ownPosition == locallyCachedMembershipNodes.size - 1
    // Am I the leftmost successor?
    val isDirectNeighbour = failedPosition == ownPosition + 1

    if (wrapsAround || isDirectNeighbour) {
      // Yes I am the node to migrate the actor to (can only be one in the cluster)
      val uuidsOnFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName))
      EventHandler.debug(this,
        "Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]"
          .format(failedNodeName, nodeAddress.nodeName, uuidsOnFailedNode))

      for (uuid ← uuidsOnFailedNode) {
        EventHandler.debug(this,
          "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
            .format(failedNodeName, uuid, nodeAddress.nodeName))

        // UUIDs without a known actor address are skipped
        actorAddressForUuid(uuidFrom(uuid)) foreach { actorAddress ⇒
          migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
            NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)

          val serializer: Serializer = serializerForActor(actorAddress)
          use(actorAddress, serializer) foreach { actor ⇒
            // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
            //actor.homeAddress = remoteServerAddress
            val homeAddressField = classOf[LocalActorRef].getDeclaredField("homeAddress")
            homeAddressField.setAccessible(true)
            homeAddressField.set(actor, Some(remoteServerAddress))
            remoteService.register(actorAddress, actor)
          }
        }
      }

      // notify all available nodes that they should fail-over all connections from 'from' to 'to'
      val from = nodeNameToAddress(failedNodeName)
      val to = remoteServerAddress
      Serialization.serialize((from, to)) match {
        case Left(error) ⇒ throw error
        case Right(bytes) ⇒
          val command = RemoteDaemonMessageProtocol.newBuilder
            .setMessageType(FAIL_OVER_CONNECTIONS)
            .setPayload(ByteString.copyFrom(bytes))
            .build
          for (node ← membershipNodes) {
            replicaConnections.get(node) foreach {
              case (_, connection) ⇒ connection ! command
            }
          }
      }
    }
  }
}
/**
 * Used when the ephemeral "home" node is already gone, so we can't check.
 *
 * For each UUID registered for `actorAddress` that resolves to an actor address not yet
 * in use on `to`: releases the actor locally, creates the registry/location/at-node
 * ZooKeeper entries for `to`, deletes the corresponding entries for `from`, and asks the
 * `to` node to check the actor out via `useActorOnNode`.
 *
 * @param from         node the actor is migrated away from
 * @param to           node the actor is migrated to
 * @param actorAddress address of the actor whose UUIDs are migrated
 */
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
  from: NodeAddress, to: NodeAddress, actorAddress: String) {

  // 'foreach' rather than 'map': the traversal is done purely for its side effects
  actorUuidsForActorAddress(actorAddress) foreach { uuid ⇒
    // distinct name so the looked-up address no longer shadows the 'actorAddress' parameter
    actorAddressForUuid(uuid) foreach { resolvedAddress ⇒
      if (!isInUseOnNode(resolvedAddress, to)) {
        release(resolvedAddress)

        val newAddress = new InetSocketAddress(to.hostname, to.port)
        ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
        ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
        ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
        // register under the target node; this used 'nodeAddress.nodeName', which is only
        // right when the migration target happens to be the local node
        ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(to.nodeName, uuid)))

        ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from)))
        ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid)))

        // 'use' (check out) actor on the remote 'to' node
        useActorOnNode(to.nodeName, uuid)
      }
    }
  }
}
/** Returns the entries of `locallyCachedMembershipNodes` that do not occur in `nodes`. */
private[cluster] def findFailedNodes(nodes: List[String]): List[String] = {
  val reported = Set(nodes: _*)
  (locallyCachedMembershipNodes diff reported).toList
}
/** Returns the entries of `nodes` that are not yet in `locallyCachedMembershipNodes`. */
private[cluster] def findNewlyConnectedMembershipNodes(nodes: List[String]): List[String] = {
  val reported = Set(nodes: _*)
  (reported diff locallyCachedMembershipNodes).toList
}
/** Returns the entries of `locallyCachedMembershipNodes` that do not occur in `nodes`. */
private[cluster] def findNewlyDisconnectedMembershipNodes(nodes: List[String]): List[String] = {
  val reported = Set(nodes: _*)
  (locallyCachedMembershipNodes diff reported).toList
}
/** Returns the entries of `nodes` that are not yet in `locallyCachedMembershipNodes`. */
private[cluster] def findNewlyConnectedAvailableNodes(nodes: List[String]): List[String] = {
  val reported = Set(nodes: _*)
  (reported diff locallyCachedMembershipNodes).toList
}
/** Returns the entries of `locallyCachedMembershipNodes` that do not occur in `nodes`. */
private[cluster] def findNewlyDisconnectedAvailableNodes(nodes: List[String]): List[String] = {
  val reported = Set(nodes: _*)
  (locallyCachedMembershipNodes diff reported).toList
}
2011-05-18 08:37:58 +02:00
/**
 * Creates the persistent ZooKeeper node at `CLUSTER_PATH` and logs the creation.
 * A `ZkNodeExistsException` is passed to `ignore`; in that case nothing is logged.
 */
private def createRootClusterNode() {
  ignore[ZkNodeExistsException] {
    zkClient.create(CLUSTER_PATH, null, CreateMode.PERSISTENT)
    // only reached when the create call did not throw
    EventHandler.info(this, "Created node [%s]".format(CLUSTER_PATH))
  }
}
2011-05-18 08:37:58 +02:00
/**
 * Creates a persistent ZooKeeper node for every entry of `basePaths` that does not exist yet.
 *
 * Nodes that already exist are skipped without logging, mirroring `createRootClusterNode`;
 * previously "Created node" was logged even when the create had been ignored.
 *
 * @throws ClusterException wrapping the string form of any other failure
 */
private def createNodeStructureIfNeeded() {
  basePaths.foreach { path ⇒
    try {
      ignore[ZkNodeExistsException] {
        zkClient.create(path, null, CreateMode.PERSISTENT)
        // only reached when the node was actually created
        EventHandler.debug(this, "Created node [%s]".format(path))
      }
    } catch {
      case e ⇒
        val error = new ClusterException(e.toString)
        EventHandler.error(error, this)
        throw error
    }
  }
}
2011-05-30 10:53:25 +02:00
/**
 * Subscribes `stateListener` to ZooKeeper state changes and `membershipListener` to
 * child changes of `MEMBERSHIP_PATH`.
 *
 * @return whatever `zkClient.subscribeChildChanges` returns
 */
private def registerListeners() = {
  zkClient.subscribeStateChanges(stateListener)
  val subscriptionResult = zkClient.subscribeChildChanges(MEMBERSHIP_PATH, membershipListener)
  subscriptionResult
}
2011-06-17 10:25:02 +02:00
/**
 * Counterpart of `registerListeners`: removes `stateListener` from the state-change
 * subscriptions and `membershipListener` from the child-change subscriptions of
 * `MEMBERSHIP_PATH`.
 */
private def unregisterListeners() = {
  // same order as the subscriptions were made
  zkClient.unsubscribeStateChanges(stateListener)
  zkClient.unsubscribeChildChanges(MEMBERSHIP_PATH, membershipListener)
}
2011-06-22 09:59:00 +02:00
/**
 * Replaces the content of `locallyCachedMembershipNodes` with the children of
 * `MEMBERSHIP_PATH` as returned by `zkClient`.
 */
private def fetchMembershipNodes() {
  // fetch before clearing, so a failing lookup leaves the cache untouched
  val currentMembers = zkClient.getChildren(MEMBERSHIP_PATH)
  locallyCachedMembershipNodes.clear()
  currentMembers.iterator foreach { member ⇒ locallyCachedMembershipNodes.add(member) }
}
/**
 * Builds a `StandardMBean` implementing `ClusterNodeMBean` whose operations and
 * attributes all forward to `self`, and registers it under `clusterJmxObjectName`.
 *
 * @return whatever `JMX.register` returns
 */
private def createMBean = {
  val mbean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {

    import Cluster._

    // ---- lifecycle operations ----

    override def start(): Unit = self.start()

    // note: the MBean's 'stop' maps onto the node's 'shutdown'
    override def stop(): Unit = self.shutdown()

    override def disconnect() = self.disconnect()

    override def reconnect(): Unit = self.reconnect()

    override def resign(): Unit = self.resign()

    // ---- node attributes ----

    override def isConnected = self.isConnected.isOn

    override def getRemoteServerHostname = self.nodeAddress.hostname

    override def getRemoteServerPort = self.nodeAddress.port

    override def getNodeName = self.nodeAddress.nodeName

    override def getClusterName = self.nodeAddress.clusterName

    override def getZooKeeperServerAddresses = self.zkServerAddresses

    override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray

    override def getLeader = self.leader.toString

    // ---- actor lookups (results are rendered as strings) ----

    override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray

    override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray

    override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray

    override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray

    override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))

    override def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id)

    override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray

    override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray

    // ---- configuration elements (values travel as UTF-8 bytes) ----

    override def setConfigElement(key: String, value: String): Unit = self.setConfigElement(key, value.getBytes("UTF-8"))

    // a missing element is rendered as the empty string
    override def getConfigElement(key: String) = new String(self.getConfigElement(key).getOrElse(Array[Byte]()), "UTF-8")

    override def removeConfigElement(key: String): Unit = self.removeConfigElement(key)

    override def getConfigElementKeys = self.getConfigElementKeys.toArray
  }

  JMX.register(clusterJmxObjectName, mbean)

  // FIXME need monitoring to lookup the cluster MBean dynamically
  // Monitoring.registerLocalMBean(clusterJmxObjectName, clusterMBean)
}
}
/**
 * ZooKeeper child listener that keeps a node's membership view up to date.
 *
 * On every child change it publishes `NodeConnected` / `NodeDisconnected` events for the
 * differences against the node's cached membership, maintains the node's
 * 'nodename-address' map, and finally replaces the cached membership with the new list.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class MembershipChildListener(self: ClusterNode) extends IZkChildListener with ErrorHandler {

  /**
   * @param parentPath    path whose children changed (not used here)
   * @param currentChilds current children of that path; a `null` list is ignored
   */
  def handleChildChange(parentPath: String, currentChilds: JList[String]) {
    withErrorHandler {
      if (currentChilds ne null) {
        val childList = currentChilds.toList
        if (!childList.isEmpty) EventHandler.debug(this,
          "MembershipChildListener at [%s] has children [%s]"
            .format(self.nodeAddress.nodeName, childList.mkString(" ")))

        self.findNewlyConnectedMembershipNodes(childList) foreach { name ⇒
          self.addressForNode(name) foreach (address ⇒ self.nodeNameToAddress += (name -> address)) // update 'nodename-address' map
          self.publish(NodeConnected(name))
        }

        self.findNewlyDisconnectedMembershipNodes(childList) foreach { name ⇒
          // '-=' actually removes the entry; the former '- name' only built a copy that
          // was thrown away, so addresses of departed nodes were never dropped
          self.nodeNameToAddress -= name // update 'nodename-address' map
          self.publish(NodeDisconnected(name))
        }

        // the diffs above are computed against the old cache, so refresh it last
        self.locallyCachedMembershipNodes.clear()
        childList.foreach(self.locallyCachedMembershipNodes.add)
      }
    }
  }
}
/**
 * ZooKeeper state listener that logs connection state transitions and republishes them
 * as cluster events through `self.publish`.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class StateListener(self: ClusterNode) extends IZkStateListener {

  /**
   * Publishes `ThisNode.Connected`, `ThisNode.Disconnected` or `ThisNode.Expired` for the
   * matching ZooKeeper state. Any other state is only logged.
   */
  def handleStateChanged(state: KeeperState) {
    state match {
      case KeeperState.SyncConnected ⇒
        EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
        self.publish(ThisNode.Connected)
      case KeeperState.Disconnected ⇒
        EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
        self.publish(ThisNode.Disconnected)
      case KeeperState.Expired ⇒
        EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
        self.publish(ThisNode.Expired)
      case other ⇒
        // KeeperState has further values; without this case they raised a MatchError
        EventHandler.debug(this, "Cluster node [%s] - Ignoring state [%s]".format(self.nodeAddress, other))
    }
  }

  /**
   * Re-initialize after the zookeeper session has expired and a new session has been created.
   */
  def handleNewSession() {
    EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress))
    self.initializeNode()
    self.publish(NewSession)
  }
}
/**
 * Mixin offering a logging wrapper for code blocks.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
trait ErrorHandler {

  /**
   * Runs `body` inside `ignore[ZkInterruptedException]`. Any throwable that still escapes
   * is reported through `EventHandler.error` and then rethrown unchanged.
   */
  def withErrorHandler[T](body: ⇒ T) =
    try ignore[ZkInterruptedException](body)
    catch {
      case failure: Throwable ⇒
        EventHandler.error(failure, this, failure.toString)
        throw failure
    }
}
/**
 * Constants shared by the `RemoteClusterDaemon` actor and the code that looks it up.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object RemoteClusterDaemon {

  /** Address under which the daemon actor is looked up (interned string). */
  val ADDRESS = "akka-cluster-daemon".intern()

  // FIXME configure computeGridDispatcher to what?
  /** Dispatcher assigned to the short-lived actors that run shipped functions. */
  val computeGridDispatcher =
    Dispatchers
      .newDispatcher("akka:cloud:cluster:compute-grid")
      .build
}
/**
 * Actor that executes `RemoteDaemonMessageProtocol` commands against the given cluster node:
 * checking actors out/in (`USE` / `RELEASE`), node lifecycle commands, connection fail-over
 * and execution of functions shipped in the message payload.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {

  import RemoteClusterDaemon._
  import Cluster._

  self.dispatcher = Dispatchers.newPinnedDispatcher(self)

  override def preRestart(reason: Throwable) {
    EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason))
  }

  def receive: Receive = {
    case message: RemoteDaemonMessageProtocol ⇒
      EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message))

      message.getMessageType match {

        case USE ⇒
          // replies Success once the branch completed, Failure(error) (and rethrows) otherwise
          try {
            if (message.hasActorUuid) {
              for {
                address ← cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
                serializer ← cluster.serializerForActor(address)
              } cluster.use(address, serializer)

            } else if (message.hasActorAddress) {
              val address = message.getActorAddress
              cluster.serializerForActor(address) foreach (serializer ⇒ cluster.use(address, serializer))

            } else {
              EventHandler.warning(this,
                "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]"
                  .format(message))
            }
            self.reply(Success)

          } catch {
            case error ⇒
              self.reply(Failure(error))
              throw error
          }

        case RELEASE ⇒
          if (message.hasActorUuid) {
            cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
              cluster.release(address)
            }
          } else if (message.hasActorAddress) {
            cluster release message.getActorAddress
          } else {
            EventHandler.warning(this,
              "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]"
                .format(message))
          }

        case START      ⇒ cluster.start()

        case STOP       ⇒ cluster.shutdown()

        case DISCONNECT ⇒ cluster.disconnect()

        case RECONNECT  ⇒ cluster.reconnect()

        case RESIGN     ⇒ cluster.resign()

        case FAIL_OVER_CONNECTIONS ⇒
          val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
          cluster.failOverConnections(from, to)

        // The FUNCTION_* commands each spawn a one-shot actor on 'computeGridDispatcher'
        // that runs the shipped function and then stops itself.

        case FUNCTION_FUN0_UNIT ⇒
          localActorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case f: Function0[Unit] ⇒ try {
                f()
              } finally {
                self.stop()
              }
            }
          }).start ! payloadFor(message, classOf[Function0[Unit]])

        case FUNCTION_FUN0_ANY ⇒
          localActorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case f: Function0[Any] ⇒ try {
                self.reply(f())
              } finally {
                self.stop()
              }
            }
          }).start forward payloadFor(message, classOf[Function0[Any]])

        case FUNCTION_FUN1_ARG_UNIT ⇒
          localActorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              case (fun: Function[Any, Unit], param: Any) ⇒ try {
                fun(param)
              } finally {
                self.stop()
              }
            }
          }).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])

        case FUNCTION_FUN1_ARG_ANY ⇒
          localActorOf(new Actor() {
            self.dispatcher = computeGridDispatcher

            def receive = {
              // The function must be applied as 'Any ⇒ Any': when it was statically typed
              // 'Function[Any, Unit]' the call result was typed Unit, so the reply was
              // always '()' instead of the computed value.
              case (fun: Function[_, _], param: Any) ⇒ try {
                self.reply(fun.asInstanceOf[Any ⇒ Any](param))
              } finally {
                self.stop()
              }
            }
          }).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
      }

    case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
  }

  /**
   * Deserializes the payload of `message` into an instance of `clazz`.
   *
   * The previous implementation called `Serialization.serialize` on the raw bytes and
   * cast the returned `Either` to `T`, which can never yield the expected instance.
   *
   * @param message command whose payload bytes are decoded
   * @param clazz   class handed to `Serialization.deserialize` to decode the payload
   * @return the decoded payload
   * @throws Exception the error reported by `Serialization.deserialize`
   */
  private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
    Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
      case Left(error)     ⇒ throw error
      case Right(instance) ⇒ instance.asInstanceOf[T]
    }
  }
}