Moved ClusterNode interface, NodeAddress and ChangeListener into akka-actor as real Trait instead of using structural typing.

Refactored boot dependency in Cluster/Actor/Deployer.
Added multi-jvm test for testing clustered actor deployment, check out as LocalActorRef and ClusterActorRef.

Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-05-23 22:35:01 +02:00
parent 7778c93c1c
commit ddb2a69c19
23 changed files with 709 additions and 265 deletions

View file

@ -38,6 +38,7 @@ import Compression.LZF
import akka.AkkaException
import akka.cluster.zookeeper._
import akka.cluster.ChangeListener._
import com.eaio.uuid.UUID
@ -153,7 +154,7 @@ object Cluster {
val UUID_PREFIX = "uuid:".intern
// config options
val name = config.getString("akka.cluster.name", "default")
val name = Config.clusterName
val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
@ -162,64 +163,6 @@ object Cluster {
val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
val enableJMX = config.getBool("akka.enable-jmx", true)
/**
* Cluster membership change listener.
* For Scala API.
*/
trait ChangeListener {
def notify(event: ChangeNotification, client: ClusterNode) {
event match {
case NodeConnected(name) nodeConnected(name, client)
case NodeDisconnected(name) nodeDisconnected(name, client)
case NewLeader(name: String) newLeader(name, client)
case NewSession thisNodeNewSession(client)
case ThisNode.Connected thisNodeConnected(client)
case ThisNode.Disconnected thisNodeDisconnected(client)
case ThisNode.Expired thisNodeExpired(client)
}
}
def nodeConnected(node: String, client: ClusterNode) {}
def nodeDisconnected(node: String, client: ClusterNode) {}
def newLeader(name: String, client: ClusterNode) {}
def thisNodeNewSession(client: ClusterNode) {}
def thisNodeConnected(client: ClusterNode) {}
def thisNodeDisconnected(client: ClusterNode) {}
def thisNodeExpired(client: ClusterNode) {}
}
/**
* Cluster membership change listener.
* For Java API.
*/
abstract class ChangeListenerAdapter extends ChangeListener
sealed trait ChangeNotification
case class NodeConnected(node: String) extends ChangeNotification
case class NodeDisconnected(node: String) extends ChangeNotification
case class NewLeader(name: String) extends ChangeNotification
case object NewSession extends ChangeNotification
object ThisNode {
case object Connected extends ChangeNotification
case object Disconnected extends ChangeNotification
case object Expired extends ChangeNotification
}
@volatile
private var properties = Map.empty[String, String]
@ -249,14 +192,14 @@ object Cluster {
/**
* The node address.
*/
lazy val nodeAddress = NodeAddress(name, nodename, hostname, port)
val nodeAddress = NodeAddress(name, nodename, hostname, port)
/**
* The reference to the running ClusterNode.
*/
lazy val node: ClusterNode = {
val node = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
new ClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
new DefaultClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
}
/**
@ -319,6 +262,14 @@ object Cluster {
*/
def newZkClient: AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
def barrier(name: String, count: Int) =
ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count)
def barrier(name: String, count: Int, timeout: Duration) =
ZooKeeperBarrier(node.zkClient, node.nodeAddress.clusterName, name, node.nodeAddress.nodeName, count, timeout)
def uuidToString(uuid: UUID): String = uuid.toString
def stringToUuid(uuid: String): UUID = {
@ -348,35 +299,26 @@ object Cluster {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ClusterNode private[akka] (
class DefaultClusterNode private[akka] (
val nodeAddress: NodeAddress,
val zkServerAddresses: String,
val serializer: ZkSerializer) extends ErrorHandler {
val serializer: ZkSerializer) extends ErrorHandler with ClusterNode {
self
if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'")
val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
import Cluster._
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
val remoteClientLifeCycleListener = actorOf(new Actor {
lazy val remoteClientLifeCycleListener = actorOf(new Actor {
def receive = {
case RemoteClientError(cause, client, address) client.shutdownClientModule()
case RemoteClientDisconnected(client, address) client.shutdownClientModule()
case _ //ignore other
}
}, "akka.cluster.remoteClientLifeCycleListener").start()
lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(nodeAddress.hostname, nodeAddress.port)
@ -386,8 +328,6 @@ class ClusterNode private[akka] (
}
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
// static nodes
val CLUSTER_NODE = "/" + nodeAddress.clusterName
val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
@ -409,17 +349,8 @@ class ClusterNode private[akka] (
val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader" // should NOT be part of 'baseNodes' only used by 'leaderLock'
val isConnected = new Switch(false)
val isLeader = new AtomicBoolean(false)
val electionNumber = new AtomicInteger(Integer.MAX_VALUE)
private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
// local caches of ZK data
private[akka] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
private[akka] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress]
private[akka] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
private[akka] val replicaConnections: ConcurrentMap[String, Tuple2[InetSocketAddress, ActorRef]] =
@ -436,13 +367,13 @@ class ClusterNode private[akka] (
private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// resources
private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
lazy private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
private[cluster] val leaderElectionCallback = new LockListener {
lazy private[cluster] val leaderElectionCallback = new LockListener {
override def lockAcquired() {
EventHandler.info(this, "Node [%s] is the new leader".format(self.nodeAddress.nodeName))
self.isLeader.set(true)
self.publish(Cluster.NewLeader(self.nodeAddress.nodeName))
self.publish(NewLeader(self.nodeAddress.nodeName))
}
override def lockReleased() {
@ -453,7 +384,7 @@ class ClusterNode private[akka] (
}
}
private[cluster] val leaderLock = new WriteLock(
lazy private[cluster] val leaderLock = new WriteLock(
zkClient.connection.getZookeeper, LEADER_ELECTION_NODE, null, leaderElectionCallback) {
// ugly hack, but what do you do? <--- haha epic
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
@ -468,8 +399,6 @@ class ClusterNode private[akka] (
// Node
// =======================================
def isRunning: Boolean = isConnected.isOn
def start(): ClusterNode = {
isConnected switchOn {
initializeNode()
@ -738,13 +667,13 @@ class ClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Array[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, format: Serializer): Array[LocalActorRef] = if (isConnected.isOn) {
def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@ -780,8 +709,8 @@ class ClusterNode private[akka] (
actor.asInstanceOf[LocalActorRef]
case Right(exception) throw exception
}
}
} else Array.empty[LocalActorRef]
} headOption // FIXME should not be an array at all coming here
} else None
/**
* Using (checking out) all actors with a specific UUID on all nodes in the cluster.
@ -1151,22 +1080,6 @@ class ClusterNode private[akka] (
def getConfigElementKeys: Array[String] = zkClient.getChildren(CONFIGURATION_NODE).toList.toArray.asInstanceOf[Array[String]]
// =======================================
// Queue
// =======================================
def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(zkClient, rootPath, blocking)
// =======================================
// Barrier
// =======================================
def barrier(name: String, count: Int) =
ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count)
def barrier(name: String, count: Int, timeout: Duration) =
ZooKeeperBarrier(zkClient, nodeAddress.clusterName, name, nodeAddress.nodeName, count, timeout)
// =======================================
// Private
// =======================================
@ -1198,7 +1111,14 @@ class ClusterNode private[akka] (
"%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort)
private[cluster] def initializeNode() {
EventHandler.info(this, "Initializing cluster node [%s]".format(nodeAddress))
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createRootClusterNode()
val isLeader = joinLeaderElection
@ -1499,12 +1419,12 @@ class MembershipChildListener(self: ClusterNode) extends IZkChildListener with E
.format(self.nodeAddress.nodeName, childList.mkString(" ")))
self.findNewlyConnectedMembershipNodes(childList) foreach { name
self.nodeNameToAddress.put(name, self.addressForNode(name)) // update 'nodename-address' map
self.publish(Cluster.NodeConnected(name))
self.publish(NodeConnected(name))
}
self.findNewlyDisconnectedMembershipNodes(childList) foreach { name
self.nodeNameToAddress.remove(name) // update 'nodename-address' map
self.publish(Cluster.NodeDisconnected(name))
self.publish(NodeDisconnected(name))
}
self.locallyCachedMembershipNodes.clear()
@ -1522,13 +1442,13 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
state match {
case KeeperState.SyncConnected
EventHandler.debug(this, "Cluster node [%s] - Connected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Connected)
self.publish(ThisNode.Connected)
case KeeperState.Disconnected
EventHandler.debug(this, "Cluster node [%s] - Disconnected".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Disconnected)
self.publish(ThisNode.Disconnected)
case KeeperState.Expired
EventHandler.debug(this, "Cluster node [%s] - Expired".format(self.nodeAddress))
self.publish(Cluster.ThisNode.Expired)
self.publish(ThisNode.Expired)
}
}
@ -1538,7 +1458,7 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
def handleNewSession() {
EventHandler.debug(this, "Session expired re-initializing node [%s]".format(self.nodeAddress))
self.initializeNode()
self.publish(Cluster.NewSession)
self.publish(NewSession)
}
}