Merge branch 'wip-new-serialization'

This commit is contained in:
Jonas Bonér 2011-05-20 09:31:57 +02:00
commit b95382c3e2
19 changed files with 272 additions and 310 deletions

View file

@ -27,13 +27,14 @@ import RemoteDaemonMessageType._
import akka.util._
import Helpers._
import akka.actor._
import akka.actor.Actor._
import Actor._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
import akka.config.Config._
import akka.serialization.{ Format, Serializers }
import akka.serialization.Compression.LZF
import akka.config.Config
import Config._
import akka.serialization.{ Format, Serializers, Serializer, Compression }
import Compression.LZF
import akka.AkkaException
import akka.cluster.zookeeper._
@ -48,6 +49,43 @@ import java.util.{ List ⇒ JList }
class ClusterException(message: String) extends AkkaException(message)
/**
* Node address holds the node name and the cluster name and can be used as a hash lookup key for a Node instance.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class NodeAddress(
val clusterName: String,
val nodeName: String,
val hostname: String,
val port: Int) {
if ((hostname eq null) || hostname == "") throw new NullPointerException("Host name must not be null or empty string")
if ((nodeName eq null) || nodeName == "") throw new NullPointerException("Node name must not be null or empty string")
if ((clusterName eq null) || clusterName == "") throw new NullPointerException("Cluster name must not be null or empty string")
if (port < 1) throw new NullPointerException("Port can not be negative")
override def toString = "%s:%s:%s:%s".format(clusterName, nodeName, hostname, port)
override def hashCode = 0 + clusterName.## + nodeName.## + hostname.## + port.##
override def equals(other: Any) = NodeAddress.unapply(this) == NodeAddress.unapply(other)
}
object NodeAddress {
def apply(
clusterName: String = Cluster.name,
nodeName: String = Config.nodename,
hostname: String = Config.hostname,
port: Int = Config.remoteServerPort): NodeAddress =
new NodeAddress(clusterName, nodeName, hostname, port)
def unapply(other: Any) = other match {
case address: NodeAddress Some((address.clusterName, address.nodeName, address.hostname, address.port))
case _ None
}
}
/**
* JMX MBean for the cluster service.
*
@ -106,7 +144,7 @@ trait ClusterNodeMBean {
}
/**
* Factory object for ClusterNode. Also holds global state such as configuration data etc.
* Module for the ClusterNode. Also holds global state such as configuration data etc.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -182,16 +220,46 @@ object Cluster {
}
type Nodes = HashMap[NodeAddress, ClusterNode]
@volatile
private var properties = Map.empty[String, String]
def setProperty(property: (String, String)) {
properties = properties + property
}
private def nodename: String = {
val overridden = properties.get("akka.cluster.nodename")
if (overridden.isDefined) overridden.get
else Config.nodename
}
private def hostname: String = {
val overridden = properties.get("akka.cluster.hostname")
if (overridden.isDefined) overridden.get
else Config.hostname
}
private def port: Int = {
val overridden = properties.get("akka.cluster.port")
if (overridden.isDefined) overridden.get.toInt
else Config.remoteServerPort
}
val defaultSerializer = new SerializableSerializer
private val _zkServer = new AtomicReference[Option[ZkServer]](None)
private val _nodes = new AtomicReference[Nodes](new Nodes)
private val _clusterNames = new ConcurrentSkipListSet[String]
private[cluster] def updateNodes(f: Nodes Nodes) {
while (Some(_nodes.get).map(node _nodes.compareAndSet(node, f(node)) == false).get) {}
/**
* The node address.
*/
lazy val nodeAddress = NodeAddress(name, nodename, hostname, port)
/**
* The reference to the running ClusterNode.
*/
lazy val node: ClusterNode = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
new ClusterNode(nodeAddress, zooKeeperServers, defaultSerializer)
}
/**
@ -199,83 +267,6 @@ object Cluster {
*/
def lookupLocalhostName = NetworkUtil.getLocalhostName
/**
* Returns all the nodes created by this Cluster object, e.g. created in this class loader hierarchy in this JVM.
*/
def nodes = _nodes.get
/**
* Returns an Array with NodeAddress for all the nodes in a specific cluster.
*/
def nodesInCluster(clusterName: String): Array[NodeAddress] = _nodes.get.filter(_._1 == clusterName).map(_._1).toArray
/**
* Returns the NodeAddress for a random node in a specific cluster.
*/
def randomNodeInCluster(clusterName: String): NodeAddress = {
val nodes = nodesInCluster(clusterName)
val random = new java.util.Random
nodes(random.nextInt(nodes.length))
}
/**
* Returns the names of all clusters that this JVM is connected to.
*/
def clusters: Array[String] = _clusterNames.toList.toArray
/**
* Returns the node for a specific NodeAddress.
*/
def nodeFor(nodeAddress: NodeAddress) = _nodes.get()(nodeAddress)
/**
* Creates a new cluster node; ClusterNode.
*/
def apply(
nodeAddress: NodeAddress,
zkServerAddresses: String = Cluster.zooKeeperServers,
serializer: ZkSerializer = Cluster.defaultSerializer): ClusterNode =
newNode(nodeAddress, zkServerAddresses, serializer)
/**
* Creates a new cluster node; ClusterNode.
*/
def newNode(nodeAddress: NodeAddress): ClusterNode =
newNode(nodeAddress, Cluster.zooKeeperServers, Cluster.defaultSerializer)
/**
* Creates a new cluster node; ClusterNode.
*/
def newNode(nodeAddress: NodeAddress, zkServerAddresses: String): ClusterNode =
newNode(nodeAddress, zkServerAddresses, Cluster.defaultSerializer)
/**
* Creates a new cluster node; ClusterNode.
*/
def newNode(nodeAddress: NodeAddress, serializer: ZkSerializer): ClusterNode =
newNode(nodeAddress, Cluster.zooKeeperServers, serializer)
/**
* Creates a new cluster node; ClusterNode.
*/
def newNode(
nodeAddress: NodeAddress,
zkServerAddresses: String,
serializer: ZkSerializer): ClusterNode = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
val node = new ClusterNode(
nodeAddress,
if ((zkServerAddresses eq null) || zkServerAddresses == "") Cluster.zooKeeperServers else zkServerAddresses,
if (serializer eq null) Cluster.defaultSerializer else serializer)
// FIXME Cluster nodes are never removed?
updateNodes(_ + (nodeAddress -> node))
_clusterNames add nodeAddress.clusterName
node
}
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
@ -315,44 +306,21 @@ object Cluster {
}
}
/**
* Resets all clusters managed connected to in this JVM.
* <p/>
* <b>WARNING: Use with care</b>
*/
def reset() {
withPrintStackTraceOnError {
EventHandler.info(this, "Resetting all clusters connected to in this JVM")
if (!clusters.isEmpty) {
nodes foreach { tp
val (_, node) = tp
node.disconnect()
node.remoteService.shutdown()
}
implicit val zkClient = newZkClient
clusters foreach (resetNodesInCluster(_))
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
zkClient.close()
}
}
}
/**
* Resets all nodes in a specific cluster.
*/
def resetNodesInCluster(clusterName: String)(implicit zkClient: AkkaZkClient = newZkClient) = withPrintStackTraceOnError {
EventHandler.info(this, "Resetting nodes in cluster [%s]".format(clusterName))
ignore[ZkNoNodeException](zkClient.deleteRecursive("/" + clusterName))
}
/**
* Shut down the local ZooKeeper server.
*/
def shutdownLocalCluster() {
withPrintStackTraceOnError {
EventHandler.info(this, "Shuts down local cluster")
reset()
node.disconnect()
node.remoteService.shutdown()
implicit val zkClient = newZkClient
ignore[ZkNoNodeException](zkClient.deleteRecursive("/" + name))
ignore[ZkNoNodeException](zkClient.deleteRecursive(ZooKeeperBarrier.BarriersNode))
zkClient.close()
_zkServer.get.foreach(_.shutdown())
_zkServer.set(None)
}
@ -404,11 +372,12 @@ class ClusterNode private[akka] (
EventHandler.info(this,
("\nCreating cluster node with" +
"\n\tnode name = [%s]" +
"\n\tcluster name = [%s]" +
"\n\tnode name = [%s]" +
"\n\tport = [%s]" +
"\n\tzookeeper server addresses = [%s]" +
"\n\tserializer = [%s]")
.format(nodeAddress.nodeName, nodeAddress.clusterName, zkServerAddresses, serializer))
.format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
val remoteClientLifeCycleListener = actorOf(new Actor {
def receive = {
@ -541,7 +510,6 @@ class ClusterNode private[akka] (
registry.local.actors.filter(remoteService.hasListener).foreach(_.stop())
replicaConnections.clear()
updateNodes(_ - nodeAddress)
disconnect()
EventHandler.info(this, "Cluster node shut down [%s]".format(nodeAddress))
@ -600,57 +568,69 @@ class ClusterNode private[akka] (
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorClass: Class[T], address: String)(implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, false)
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorClass: Class[T], address: String, replicationFactor: Int)(implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, replicationFactor, false)
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorClass: Class[T], address: String, serializeMailbox: Boolean)(implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox)
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorClass: Class[T], address: String, replicationFactor: Int, serializeMailbox: Boolean)(implicit format: Format[T]): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, serializeMailbox)
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorRef: ActorRef)(implicit format: Format[T]): ClusterNode = store(actorRef, 0, false)
def store(actorRef: ActorRef, format: Serializer): ClusterNode =
store(actorRef, 0, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorRef: ActorRef, replicationFactor: Int)(implicit format: Format[T]): ClusterNode = store(actorRef, replicationFactor, false)
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorRef: ActorRef, serializeMailbox: Boolean)(implicit format: Format[T]): ClusterNode = store(actorRef, 0, serializeMailbox)
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, 0, serializeMailbox, format)
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, serializeMailbox, format.asInstanceOf[Serializer])
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean)(implicit format: Format[T]): ClusterNode = if (isConnected.isOn) {
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@ -771,7 +751,7 @@ class ClusterNode private[akka] (
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String)(
implicit format: Format[T] = formatForActor(actorAddress)): Array[LocalActorRef] = if (isConnected.isOn) {
implicit format: Serializer = formatForActor(actorAddress)): Array[LocalActorRef] = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@ -1046,16 +1026,16 @@ class ClusterNode private[akka] (
/**
* Returns Format for actor with UUID.
*/
def formatForActor[T <: Actor](actorAddress: String): Format[T] = {
def formatForActor(actorAddress: String): Serializer = {
val formats = actorUuidsForActorAddress(actorAddress) map { uuid
zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Format[T]]
zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Serializer]
}
val format = formats.head
if (formats.isEmpty) throw new IllegalStateException("No Format found for [%s]".format(actorAddress))
if (formats.isEmpty) throw new IllegalStateException("No Serializer found for [%s]".format(actorAddress))
if (formats map (_ == format) exists (_ == false)) throw new IllegalStateException(
"Multiple Format classes found for [%s]".format(actorAddress))
"Multiple Serializer classes found for [%s]".format(actorAddress))
format
}
@ -1347,7 +1327,7 @@ class ClusterNode private[akka] (
migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
implicit val format: Format[T] = formatForActor(actorAddress)
implicit val format: Serializer = formatForActor(actorAddress)
use(actorAddress) foreach { actor
// FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
//actor.homeAddress = remoteServerAddress
@ -1434,7 +1414,7 @@ class ClusterNode private[akka] (
case e: ZkNodeExistsException {} // do nothing
case e
val error = new ClusterException(e.toString)
EventHandler.error(error, this, "")
EventHandler.error(error, this)
throw error
}
}
@ -1629,11 +1609,11 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
if (message.hasActorUuid) {
val uuid = uuidProtocolToUuid(message.getActorUuid)
val address = cluster.actorAddressForUuid(uuid)
implicit val format: Format[Actor] = cluster formatForActor address
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else if (message.hasActorAddress) {
val address = message.getActorAddress
implicit val format: Format[Actor] = cluster formatForActor address
implicit val format: Serializer = cluster formatForActor address
val actors = cluster use address
} else EventHandler.warning(this,
"None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]".format(message))