1. Completed replication over BookKeeper based transaction log with configurable actor snapshotting every X messages.

2. Completed replay of the transaction log on all replicated actors on migration after node crash.
3. Added end-to-end tests for write-behind and write-through replication and replay on fail-over.

Signed-off-by: Jonas Bonér <jonas@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-07-08 19:35:27 +02:00
parent 0b1ee758f5
commit 34c838d0f4
27 changed files with 702 additions and 61 deletions

View file

@ -22,9 +22,6 @@ import scala.collection.immutable.{ HashMap, HashSet }
import scala.collection.mutable.ConcurrentMap
import scala.collection.JavaConversions._
import ClusterProtocol._
import RemoteDaemonMessageType._
import akka.util._
import Helpers._
@ -42,12 +39,16 @@ import akka.config.{ Config, Supervision }
import Supervision._
import Config._
import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.{ Serialization, Serializer, Compression, ActorSerialization }
import ActorSerialization._
import Compression.LZF
import akka.AkkaException
import akka.cluster.zookeeper._
import akka.cluster.ChangeListener._
import ChangeListener._
import ClusterProtocol._
import RemoteDaemonMessageType._
import akka.AkkaException
import com.eaio.uuid.UUID
@ -742,20 +743,20 @@ class DefaultClusterNode private[akka] (
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String): Option[ActorRef] = use(actorAddress, serializerForActor(actorAddress))
def use[T <: Actor](actorAddress: String): Option[LocalActorRef] = use(actorAddress, serializerForActor(actorAddress))
/**
* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
* for remote access through lookup by its UUID.
*/
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[ActorRef] = if (isConnected.get) {
def use[T <: Actor](actorAddress: String, serializer: Serializer): Option[LocalActorRef] = if (isConnected.get) {
val nodeName = nodeAddress.nodeName
ignore[ZkNodeExistsException](zkClient.createEphemeral(actorAddressToNodesPathFor(actorAddress, nodeName)))
val actorFactoryPath = actorAddressRegistryPathFor(actorAddress)
zkClient.retryUntilConnected(new Callable[Either[Exception, () ActorRef]]() {
def call: Either[Exception, () ActorRef] = {
zkClient.retryUntilConnected(new Callable[Either[Exception, () LocalActorRef]]() {
def call: Either[Exception, () LocalActorRef] = {
try {
val actorFactoryBytes =
@ -763,9 +764,9 @@ class DefaultClusterNode private[akka] (
else zkClient.connection.readData(actorFactoryPath, new Stat, false)
val actorFactory =
Serialization.deserialize(actorFactoryBytes, classOf[() ActorRef], None) match {
Serialization.deserialize(actorFactoryBytes, classOf[() LocalActorRef], None) match {
case Left(error) throw error
case Right(instance) instance.asInstanceOf[() ActorRef]
case Right(instance) instance.asInstanceOf[() LocalActorRef]
}
Right(actorFactory)
@ -1716,8 +1717,8 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer
cluster.use(actorAddress, serializer) foreach { actor
cluster.remoteService.register(actorAddress, actor)
cluster.use(actorAddress, serializer) foreach { newActorRef
cluster.remoteService.register(actorAddress, newActorRef)
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
@ -1735,10 +1736,37 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
// get the transaction log for the actor UUID
val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
// deserialize all messages
val entriesAsBytes = txLog.entries
// val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries // FIXME should work equally well if no snapshot has been taken yet. => return all entries
// get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
// deserialize and restore actor snapshot
val actorRefToUseForReplay =
snapshotAsBytes match {
// we have a new actor ref - the snapshot
case Some(bytes)
// stop the new actor ref and use the snapshot instead
cluster.remoteService.unregister(actorAddress)
// deserialize the snapshot actor ref and register it as remote actor
val uncompressedBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
cluster.remoteService.register(actorAddress, snapshotActorRef)
// FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
//newActorRef.stop()
snapshotActorRef
// we have no snapshot - use the new actor ref
case None
newActorRef
}
// deserialize the messages
val messages: Vector[AnyRef] = entriesAsBytes map { bytes
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
@ -1746,13 +1774,16 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
// replay all messages
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach { message
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
actor ! message // FIXME how to handle '?' messages???
// FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
actorRefToUseForReplay ! message
}
} catch {
case e: Throwable
EventHandler.error(e, this, e.toString)