1. Implemented replication through transaction log, e.g. logging all messages and replaying them after actor migration

2. Added first replication test (out of many)
3. Improved ScalaDoc
4. Enhanced the remote protocol with replication info

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-07-08 08:28:13 +02:00
parent 6117e599d6
commit 0b1ee758f5
36 changed files with 893 additions and 550 deletions

View file

@ -113,7 +113,7 @@ trait ClusterNodeMBean {
}
/**
* Module for the ClusterNode. Also holds global state such as configuration data etc.
* Module for the Cluster. Also holds global state such as configuration data etc.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -131,6 +131,10 @@ object Cluster {
val enableJMX = config.getBool("akka.enable-jmx", true)
val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt
val includeRefNodeInReplicaSet = config.getBool("akka.cluster.include-ref-node-in-replica-set", true)
val clusterDirectory = config.getString("akka.cluster.log-directory", "_akka_cluster")
val clusterDataDirectory = clusterDirectory + "/data"
val clusterLogDirectory = clusterDirectory + "/log"
@volatile
private var properties = Map.empty[String, String]
@ -189,19 +193,19 @@ object Cluster {
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(): ZkServer =
startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, 5000)
startLocalCluster(clusterDataDirectory, clusterLogDirectory, 2181, 5000)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(port: Int, tickTime: Int): ZkServer =
startLocalCluster("_akka_cluster/data", "_akka_cluster/log", port, tickTime)
startLocalCluster(clusterDataDirectory, clusterLogDirectory, port, tickTime)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
*/
def startLocalCluster(tickTime: Int): ZkServer =
startLocalCluster("_akka_cluster/data", "_akka_cluster/log", 2181, tickTime)
startLocalCluster(clusterDataDirectory, clusterLogDirectory, 2181, tickTime)
/**
* Starts up a local ZooKeeper server. Should only be used for testing purposes.
@ -322,7 +326,7 @@ class DefaultClusterNode private[akka] (
}
}, "akka.cluster.RemoteClientLifeCycleListener").start()
private[cluster] lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
private[cluster] lazy val remoteDaemon = localActorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.Address).start()
private[cluster] lazy val remoteDaemonSupervisor = Supervisor(
SupervisorConfig(
@ -335,7 +339,7 @@ class DefaultClusterNode private[akka] (
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(hostname, port)
remote.register(RemoteClusterDaemon.ADDRESS, remoteDaemon)
remote.register(RemoteClusterDaemon.Address, remoteDaemon)
remote.addListener(remoteClientLifeCycleListener)
remote
}
@ -676,21 +680,21 @@ class DefaultClusterNode private[akka] (
case Left(path) path
case Right(exception) actorAddressRegistryPath
}
// create ADDRESS -> SERIALIZER CLASS NAME mapping
try {
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
} catch {
case e: ZkNodeExistsException zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
}
// create ADDRESS -> NODE mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress)))
// create ADDRESS -> UUIDs mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
}
// create ADDRESS -> SERIALIZER CLASS NAME mapping
try {
zkClient.createPersistent(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
} catch {
case e: ZkNodeExistsException zkClient.writeData(actorAddressRegistrySerializerPathFor(actorAddress), serializerClassName)
}
// create ADDRESS -> NODE mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToNodesPathFor(actorAddress)))
// create ADDRESS -> UUIDs mapping
ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorAddress)))
useActorOnNodes(nodesForReplicationFactor(replicationFactor, Some(actorAddress)).toArray, actorAddress)
this
@ -825,16 +829,20 @@ class DefaultClusterNode private[akka] (
/**
* Using (checking out) actor on a specific set of nodes.
*/
def useActorOnNodes(nodes: Array[String], actorAddress: String) {
def useActorOnNodes(nodes: Array[String], actorAddress: String, replicateFromUuid: Option[UUID] = None) {
EventHandler.debug(this,
"Sending command to nodes [%s] for checking out actor [%s]".format(nodes.mkString(", "), actorAddress))
if (isConnected.get) {
val command = RemoteDaemonMessageProtocol.newBuilder
val builder = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorAddress(actorAddress)
.build
// set the UUID to replicated from - if available
replicateFromUuid foreach (uuid builder.setReplicateActorFromUuid(uuidToUuidProtocol(uuid)))
val command = builder.build
nodes foreach { node
nodeConnections.get(node) foreach {
@ -848,15 +856,15 @@ class DefaultClusterNode private[akka] (
/**
* Using (checking out) actor on all nodes in the cluster.
*/
def useActorOnAllNodes(actorAddress: String) {
useActorOnNodes(membershipNodes, actorAddress)
def useActorOnAllNodes(actorAddress: String, replicateFromUuid: Option[UUID] = None) {
useActorOnNodes(membershipNodes, actorAddress, replicateFromUuid)
}
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(node: String, actorAddress: String) {
useActorOnNodes(Array(node), actorAddress)
def useActorOnNode(node: String, actorAddress: String, replicateFromUuid: Option[UUID] = None) {
useActorOnNodes(Array(node), actorAddress, replicateFromUuid)
}
/**
@ -922,29 +930,6 @@ class DefaultClusterNode private[akka] (
} else throw new ClusterException("Not connected to cluster")
/**
* Migrate the actor from 'this' node to node 'to'.
*/
def migrate(to: NodeAddress, actorAddress: String) {
migrate(nodeAddress, to, actorAddress)
}
/**
* Migrate the actor from node 'from' to node 'to'.
*/
def migrate(
from: NodeAddress, to: NodeAddress, actorAddress: String) {
if (isConnected.get) {
if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
if (isInUseOnNode(actorAddress, from)) {
migrateWithoutCheckingThatActorResidesOnItsHomeNode(from, to, actorAddress)
} else {
throw new ClusterException("Can't move actor from node [" + from + "] since it does not exist on this node")
}
}
}
/**
* Returns the UUIDs of all actors checked out on this node.
*/
@ -1285,7 +1270,7 @@ class DefaultClusterNode private[akka] (
val preferredNodes =
if (actorAddress.isDefined) { // use 'preferred-nodes' in deployment config for the actor
Deployer.deploymentFor(actorAddress.get) match {
case Deploy(_, _, _, Clustered(nodes, _, _))
case Deploy(_, _, Clustered(nodes, _, _))
nodes map (node DeploymentConfig.nodeNameFor(node)) take replicationFactor
case _
throw new ClusterException("Actor [" + actorAddress.get + "] is not configured as clustered")
@ -1360,7 +1345,7 @@ class DefaultClusterNode private[akka] (
EventHandler.debug(this,
"Setting up connection to node with nodename [%s] and address [%s]".format(node, address))
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort).start()
val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.Address, address.getHostName, address.getPort).start()
nodeConnections.put(node, (address, clusterDaemon))
}
}
@ -1457,7 +1442,16 @@ class DefaultClusterNode private[akka] (
nodeAddress
}
migrateWithoutCheckingThatActorResidesOnItsHomeNode(failedNodeAddress, migrateToNodeAddress, actorAddress) // since the ephemeral node is already gone, so can't check
// if actor is replicated => pass along the UUID for the actor to replicate from (replay transaction log etc.)
val replicateFromUuid =
if (isReplicated(actorAddress)) Some(uuid)
else None
migrateWithoutCheckingThatActorResidesOnItsHomeNode(
failedNodeAddress,
migrateToNodeAddress,
actorAddress,
replicateFromUuid)
}
// notify all available nodes that they should fail-over all connections from 'from' to 'to'
@ -1486,7 +1480,7 @@ class DefaultClusterNode private[akka] (
* Used when the ephemeral "home" node is already gone, so we can't check if it is available.
*/
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
from: NodeAddress, to: NodeAddress, actorAddress: String) {
from: NodeAddress, to: NodeAddress, actorAddress: String, replicateFromUuid: Option[UUID]) {
EventHandler.debug(this, "Migrating actor [%s] from node [%s] to node [%s]".format(actorAddress, from, to))
if (!isInUseOnNode(actorAddress, to)) {
@ -1502,7 +1496,7 @@ class DefaultClusterNode private[akka] (
//ignore[ZkNoNodeException](zkClient.delete(nodeToUuidsPathFor(from.nodeName, uuid)))
// 'use' (check out) actor on the remote 'to' node
useActorOnNode(to.nodeName, actorAddress)
useActorOnNode(to.nodeName, actorAddress, replicateFromUuid)
}
}
@ -1542,6 +1536,8 @@ class DefaultClusterNode private[akka] (
connectToAllNewlyArrivedMembershipNodesInCluster(membershipNodes, Nil)
}
private def isReplicated(actorAddress: String): Boolean = DeploymentConfig.isReplicated(Deployer.deploymentFor(actorAddress))
private def createMBean = {
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
@ -1672,7 +1668,7 @@ class StateListener(self: ClusterNode) extends IZkStateListener {
trait ErrorHandler {
def withErrorHandler[T](body: T) = {
try {
ignore[ZkInterruptedException](body)
ignore[ZkInterruptedException](body) // FIXME Is it good to ignore ZkInterruptedException? If not, how should we handle it?
} catch {
case e: Throwable
EventHandler.error(e, this, e.toString)
@ -1685,13 +1681,15 @@ trait ErrorHandler {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClusterDaemon {
val ADDRESS = "akka-cluster-daemon".intern
val Address = "akka-cluster-daemon".intern
// FIXME configure computeGridDispatcher to what?
val computeGridDispatcher = Dispatchers.newDispatcher("akka:compute-grid").build
}
/**
* Internal "daemon" actor for cluster internal communication.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
@ -1720,12 +1718,51 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
cluster.serializerForActor(actorAddress) foreach { serializer
cluster.use(actorAddress, serializer) foreach { actor
cluster.remoteService.register(actorAddress, actor)
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
import akka.remote.protocol.RemoteProtocol._
import akka.remote.MessageSerializer
val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
val deployment = Deployer.deploymentFor(actorAddress)
val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
throw new IllegalStateException(
"Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
try {
// get the transaction log for the actor UUID
val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
// deserialize all messages
val entriesAsBytes = txLog.entries
// val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries // FIXME should work equally good if not a snapshot has been taken yet. => return all entries
val messages: Vector[AnyRef] = entriesAsBytes map { bytes
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
// replay all messages
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
messages foreach { message
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
actor ! message // FIXME how to handle '?' messages???
}
} catch {
case e: Throwable
EventHandler.error(e, this, e.toString)
throw e
}
}
}
}
} else {
EventHandler.error(this,
"Actor 'address' is not defined, ignoring remote cluster daemon command [%s]"
.format(message))
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success)