Merged with master

This commit is contained in:
Debasish Ghosh 2011-06-13 13:35:39 +05:30
commit f27e23bf1d
64 changed files with 3087 additions and 2327 deletions

View file

@ -29,6 +29,7 @@ import Helpers._
import akka.actor._
import Actor._
import Status._
import DeploymentConfig.{ ReplicationScheme, ReplicationStrategy, Transient, WriteThrough, WriteBehind }
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
@ -463,7 +464,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, false, format)
store(Actor.actorOf(actorClass, address).start, 0, Transient, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -471,7 +480,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, false, format)
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -479,7 +496,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox, format)
store(Actor.actorOf(actorClass, address).start, 0, Transient, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, replicationScheme, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -487,7 +512,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, serializeMailbox, format)
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationScheme, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -495,7 +528,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, format: Serializer): ClusterNode =
store(actorRef, 0, false, format)
store(actorRef, 0, Transient, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
store(actorRef, 0, replicationScheme, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -503,7 +544,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, false, format)
store(actorRef, replicationFactor, Transient, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, replicationScheme, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -511,20 +560,47 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, 0, serializeMailbox, format)
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, serializeMailbox, format.asInstanceOf[Serializer])
store(actorRef, 0, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean): ClusterNode = if (isConnected.isOn) {
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, 0, replicationScheme, serializeMailbox, format)
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, replicationScheme: ReplicationScheme, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, replicationScheme, serializeMailbox, format.asInstanceOf[Serializer])
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(
actorRef: ActorRef,
replicationFactor: Int,
replicationScheme: ReplicationScheme,
serializeMailbox: Boolean,
format: Serializer): ClusterNode = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@ -535,12 +611,14 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox))
else toBinary(actorRef)
val actorBytes =
if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationScheme)(format))
else toBinary(actorRef, serializeMailbox, replicationScheme)(format)
val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME check for size and warn if too big
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store actor bytes in Data Grid not ZooKeeper
else {
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
def call: Either[String, Exception] = {
@ -590,9 +668,7 @@ class DefaultClusterNode private[akka] (
(connection !! (command, remoteDaemonAckTimeout)) match {
case Some(Success)
EventHandler.debug(this,
"Replica for [%s] successfully created on [%s]"
.format(actorRef.address, connection))
EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
@ -616,8 +692,9 @@ class DefaultClusterNode private[akka] (
releaseActorOnAllNodes(uuid)
locallyCheckedOutActors.remove(uuid)
// warning: ordering matters here
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // remove ADDRESS to UUID mapping
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // FIXME remove ADDRESS to UUID mapping?
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid)))
@ -662,20 +739,17 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, formatForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = if (isConnected.isOn) {
def use[T <: Actor](actorAddress: String, format: Serializer): Option[ActorRef] = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
actorUuidsForActorAddress(actorAddress) map { uuid
EventHandler.debug(this,
"Checking out actor with UUID [%s] to be used on node [%s] as local actor"
.format(uuid, nodeAddress.nodeName))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))
@ -697,12 +771,12 @@ class DefaultClusterNode private[akka] (
}) match {
case Left(bytes)
locallyCheckedOutActors += (uuid -> bytes)
// FIXME switch to ReplicatedActorRef here
// val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
val actor = fromBinary[T](bytes, remoteServerAddress)
// remoteService.register(UUID_PREFIX + uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
val actor = fromBinary[T](bytes, remoteServerAddress)(format)
EventHandler.debug(this,
"Checking out actor [%s] to be used on node [%s] as local actor"
.format(actor, nodeAddress.nodeName))
actor.start()
actor.asInstanceOf[LocalActorRef]
actor
case Right(exception) throw exception
}
} headOption // FIXME should not be an array at all coming here but an Option[ActorRef]
@ -715,14 +789,15 @@ class DefaultClusterNode private[akka] (
isConnected ifOn {
EventHandler.debug(this,
"Using (checking out) all actors with UUID [%s] on all nodes in cluster".format(uuid))
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
membershipNodes foreach { node
replicaConnections.get(node) foreach {
case (_, connection)
connection ! command
case (_, connection) connection ! command
}
}
}
@ -786,8 +861,8 @@ class DefaultClusterNode private[akka] (
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] connected to [\n\t%s]"
.format(actorAddress, router, addresses.mkString("\n\t")))
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach { case (_, address) clusterActorRefs.put(address, actorRef) }
@ -1230,7 +1305,7 @@ class DefaultClusterNode private[akka] (
homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress))
remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
remoteService.register(actorAddress, actor)
}
}
@ -1473,8 +1548,6 @@ object RemoteClusterDaemon {
val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
}
// FIXME supervise RemoteClusterDaemon
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/