1. Made LocalActorRef aware of replication

2. Added configuration for transaction log replication
3. Added replication schemes WriteThrough and WriteBehind
4. Refactored serializer creation and lookup in Actor.scala
5. Extended network protocol with replication strategy
6. Added BookKeeper management to tests
7. Improved logging and error messages
8. Removed ReplicatedActorRef
9. Added snapshot management to TransactionLog

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-06-07 11:10:29 -07:00
parent 3d7a717b06
commit 04efc440d6
22 changed files with 6540 additions and 2183 deletions

View file

@ -29,6 +29,7 @@ import Helpers._
import akka.actor._
import Actor._
import Status._
import DeploymentConfig.{ ReplicationStrategy, Transient, WriteThrough, WriteBehind }
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
@ -463,7 +464,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, false, format)
store(Actor.actorOf(actorClass, address).start, 0, Transient, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, replicationStrategy, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -471,7 +480,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, false, format)
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationStrategy, false, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -479,7 +496,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox, format)
store(Actor.actorOf(actorClass, address).start, 0, Transient, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, 0, replicationStrategy, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@ -487,7 +512,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, serializeMailbox, format)
store(Actor.actorOf(actorClass, address).start, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store[T <: Actor](address: String, actorClass: Class[T], replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, replicationStrategy, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -495,7 +528,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, format: Serializer): ClusterNode =
store(actorRef, 0, false, format)
store(actorRef, 0, Transient, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode =
store(actorRef, 0, replicationStrategy, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -503,7 +544,15 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, false, format)
store(actorRef, replicationFactor, Transient, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, replicationStrategy, false, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@ -511,20 +560,47 @@ class DefaultClusterNode private[akka] (
* available durable store.
*/
def store(actorRef: ActorRef, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, 0, serializeMailbox, format)
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, serializeMailbox, format.asInstanceOf[Serializer])
store(actorRef, 0, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode = if (isConnected.isOn) {
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(actorRef: ActorRef, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: Serializer): ClusterNode =
store(actorRef, 0, replicationStrategy, serializeMailbox, format)
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, replicationStrategy: ReplicationStrategy, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, replicationStrategy, serializeMailbox, format.asInstanceOf[Serializer])
/**
* Needed to have reflection through structural typing work.
*/
def store(actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean, format: AnyRef): ClusterNode =
store(actorRef, replicationFactor, Transient, serializeMailbox, format)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
* with the actor passed in as argument. You can use this to save off snapshots of the actor to a highly
* available durable store.
*/
def store(
actorRef: ActorRef,
replicationFactor: Int,
replicationStrategy: ReplicationStrategy,
serializeMailbox: Boolean,
format: Serializer): ClusterNode = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@ -535,12 +611,14 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid))
val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)(format))
else toBinary(actorRef)(format)
val actorBytes =
if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox, replicationStrategy)(format))
else toBinary(actorRef, serializeMailbox, replicationStrategy)(format)
val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME check for size and warn if too big
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME Store in Data Grid not ZooKeeper
else {
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
def call: Either[String, Exception] = {
@ -590,9 +668,7 @@ class DefaultClusterNode private[akka] (
(connection !! (command, remoteDaemonAckTimeout)) match {
case Some(Success)
EventHandler.debug(this,
"Replica for [%s] successfully created on [%s]"
.format(actorRef.address, connection))
EventHandler.debug(this, "Replica for [%s] successfully created".format(actorRef.address))
case Some(Failure(cause))
EventHandler.error(cause, this, cause.toString)
@ -616,8 +692,9 @@ class DefaultClusterNode private[akka] (
releaseActorOnAllNodes(uuid)
locallyCheckedOutActors.remove(uuid)
// warning: ordering matters here
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // remove ADDRESS to UUID mapping
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // FIXME remove ADDRESS to UUID mapping?
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid)))
@ -662,20 +739,17 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, formatForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, format: Serializer): Option[LocalActorRef] = if (isConnected.isOn) {
def use[T <: Actor](actorAddress: String, format: Serializer): Option[ActorRef] = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
actorUuidsForActorAddress(actorAddress) map { uuid
EventHandler.debug(this,
"Checking out actor with UUID [%s] to be used on node [%s] as local actor"
.format(uuid, nodeAddress.nodeName))
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))
@ -697,12 +771,12 @@ class DefaultClusterNode private[akka] (
}) match {
case Left(bytes)
locallyCheckedOutActors += (uuid -> bytes)
// FIXME switch to ReplicatedActorRef here
// val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
val actor = fromBinary[T](bytes, remoteServerAddress)(format)
// remoteService.register(UUID_PREFIX + uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
EventHandler.debug(this,
"Checking out actor [%s] to be used on node [%s] as local actor"
.format(actor, nodeAddress.nodeName))
actor.start()
actor.asInstanceOf[LocalActorRef]
actor
case Right(exception) throw exception
}
} headOption // FIXME should not be an array at all coming here but an Option[ActorRef]
@ -786,8 +860,8 @@ class DefaultClusterNode private[akka] (
def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) {
val addresses = addressesForActor(actorAddress)
EventHandler.debug(this,
"Checking out cluster actor ref with address [%s] and router [%s] connected to [\n\t%s]"
.format(actorAddress, router, addresses.mkString("\n\t")))
"Checking out cluster actor ref with address [%s] and router [%s] on [%s] connected to [\n\t%s]"
.format(actorAddress, router, remoteServerAddress, addresses.map(_._2).mkString("\n\t")))
val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT)
addresses foreach { case (_, address) clusterActorRefs.put(address, actorRef) }