This commit is contained in:
Peter Veentjer 2011-08-06 23:08:01 +03:00
parent 02aeec6b57
commit aaec3aef77
13 changed files with 439 additions and 302 deletions


@ -1758,6 +1758,8 @@ object RemoteClusterDaemon {
/**
* Internal "daemon" actor for cluster internal communication.
*
* It acts as the brain of the cluster, responding to cluster events (messages) and taking the appropriate action.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
@ -1774,185 +1776,207 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
def receive: Receive = {
case message: RemoteDaemonMessageProtocol ⇒
EventHandler.debug(this,
"Received command [\n%s] to RemoteClusterDaemon on node [%s]"
.format(message, cluster.nodeAddress.nodeName))
"Received command [\n%s] to RemoteClusterDaemon on node [%s]".format(message, cluster.nodeAddress.nodeName))
message.getMessageType match {
case USE ⇒
try {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer ⇒
cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
cluster.remoteService.register(actorAddress, newActorRef)
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
import akka.remote.protocol.RemoteProtocol._
import akka.remote.MessageSerializer
val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
val deployment = Deployer.deploymentFor(actorAddress)
val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
throw new IllegalStateException(
"Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
try {
// get the transaction log for the actor UUID
val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
// get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
// deserialize and restore actor snapshot
val actorRefToUseForReplay =
snapshotAsBytes match {
// we have a new actor ref - the snapshot
case Some(bytes) ⇒
// stop the new actor ref and use the snapshot instead
cluster.remoteService.unregister(actorAddress)
// deserialize the snapshot actor ref and register it as remote actor
val uncompressedBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
cluster.remoteService.register(actorAddress, snapshotActorRef)
// FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
//newActorRef.stop()
snapshotActorRef
// we have no snapshot - use the new actor ref
case None ⇒
newActorRef
}
// deserialize the messages
val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach { message ⇒
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
// FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
actorRefToUseForReplay ! message
}
} catch {
case e: Throwable ⇒
EventHandler.error(e, this, e.toString)
throw e
}
}
}
}
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success)
} catch {
case error ⇒
self.reply(Failure(error))
throw error
}
case RELEASE ⇒
if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
cluster.release(address)
}
} else if (message.hasActorAddress) {
cluster release message.getActorAddress
} else {
EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]"
.format(message))
}
case START ⇒ cluster.start()
case STOP ⇒ cluster.shutdown()
case DISCONNECT ⇒ cluster.disconnect()
case RECONNECT ⇒ cluster.reconnect()
case RESIGN ⇒ cluster.resign()
case FAIL_OVER_CONNECTIONS ⇒
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
cluster.failOverClusterActorRefConnections(from, to)
case FUNCTION_FUN0_UNIT ⇒
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[_] ⇒ try {
f()
} finally {
self.stop()
}
}
}).start ! payloadFor(message, classOf[Function0[Unit]])
case FUNCTION_FUN0_ANY ⇒
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[_] ⇒ try {
self.reply(f())
} finally {
self.stop()
}
}
}).start forward payloadFor(message, classOf[Function0[Any]])
case FUNCTION_FUN1_ARG_UNIT ⇒
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
fun.asInstanceOf[Any ⇒ Unit].apply(param)
} finally {
self.stop()
}
}
}).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
case FUNCTION_FUN1_ARG_ANY ⇒
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
self.reply(fun.asInstanceOf[Any ⇒ Any](param))
} finally {
self.stop()
}
}
}).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
case USE ⇒ handleUse(message)
case RELEASE ⇒ handleRelease(message)
case START ⇒ cluster.start()
case STOP ⇒ cluster.shutdown()
case DISCONNECT ⇒ cluster.disconnect()
case RECONNECT ⇒ cluster.reconnect()
case RESIGN ⇒ cluster.resign()
case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message)
case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message)
//TODO: shouldn't we handle unrecognized message types?
}
case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
}
def handleRelease(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
cluster.release(address)
}
} else if (message.hasActorAddress) {
cluster release message.getActorAddress
} else {
EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message))
}
}
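// handleUse checks the actor out of the cluster: it deserializes the actor with its configured
// serializer, registers it with the remote service and, when a 'replicateActorFromUuid' is present,
// restores the latest snapshot from the transaction log and replays all subsequent messages.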
def handleUse(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
def deserializeMessages(entriesAsBytes: Vector[Array[Byte]]): Vector[AnyRef] = {
import akka.remote.protocol.RemoteProtocol._
import akka.remote.MessageSerializer
entriesAsBytes map { bytes ⇒
val messageBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
}
}
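// Note: deserializeMessages decompresses each entry with LZF when Cluster.shouldCompressData is on,
// mirroring the compression applied by the TransactionLog when entries and snapshots are recorded.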
def createActorRefToUseForReplay(snapshotAsBytes: Option[Array[Byte]], actorAddress: String, newActorRef: LocalActorRef): ActorRef = {
snapshotAsBytes match {
// we have a new actor ref - the snapshot
case Some(bytes) ⇒
// stop the new actor ref and use the snapshot instead
//TODO: What if that actor has already been retrieved and is in use?
//Do we have a race here?
cluster.remoteService.unregister(actorAddress)
// deserialize the snapshot actor ref and register it as remote actor
val uncompressedBytes =
if (Cluster.shouldCompressData) LZF.uncompress(bytes)
else bytes
val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
cluster.remoteService.register(actorAddress, snapshotActorRef)
// FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently
//shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef
//have the same UUID (which they should)
//newActorRef.stop()
snapshotActorRef
// we have no snapshot - use the new actor ref
case None ⇒
newActorRef
}
}
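// Main flow: register the freshly checked-out actor ref and, if the actor is configured for
// replication, open its transaction log, restore the latest snapshot (if any) and replay every
// message recorded after that snapshot.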
try {
if (message.hasActorAddress) {
val actorAddress = message.getActorAddress
cluster.serializerForActor(actorAddress) foreach { serializer ⇒
cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
cluster.remoteService.register(actorAddress, newActorRef)
if (message.hasReplicateActorFromUuid) {
// replication is used - fetch the messages and replay them
val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
val deployment = Deployer.deploymentFor(actorAddress)
val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
throw new IllegalStateException(
"Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
try {
// get the transaction log for the actor UUID
val readonlyTxLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
// get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
val (snapshotAsBytes, entriesAsBytes) = readonlyTxLog.latestSnapshotAndSubsequentEntries
// deserialize and restore actor snapshot. This call will automatically recreate a transaction log.
val actorRef = createActorRefToUseForReplay(snapshotAsBytes, actorAddress, newActorRef)
// deserialize the messages
val messages: Vector[AnyRef] = deserializeMessages(entriesAsBytes)
EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
// replay all messages
messages foreach { message ⇒
EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
// FIXME how to handle '?' messages?
// We can *not* replay them with the correct semantics. Should we:
// 1. Ignore/drop them and log warning?
// 2. Throw exception when about to log them?
// 3. Other?
actorRef ! message
}
} catch {
case e: Throwable ⇒
EventHandler.error(e, this, e.toString)
throw e
}
}
}
}
} else {
EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
}
self.reply(Success)
} catch {
case error ⇒
self.reply(Failure(error))
throw error
}
}
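// The FUNCTION_* handlers below share one pattern: spawn a one-shot local actor on the
// compute-grid dispatcher, apply the deserialized function to the payload (replying or
// forwarding the result where one is expected) and stop the actor afterwards.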
def handle_fun0_unit(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[_] ⇒ try {
f()
} finally {
self.stop()
}
}
}).start ! payloadFor(message, classOf[Function0[Unit]])
}
def handle_fun0_any(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[_] ⇒ try {
self.reply(f())
} finally {
self.stop()
}
}
}).start forward payloadFor(message, classOf[Function0[Any]])
}
def handle_fun1_arg_unit(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
fun.asInstanceOf[Any ⇒ Unit].apply(param)
} finally {
self.stop()
}
}
}).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
}
def handle_fun1_arg_any(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
localActorOf(new Actor() {
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[_, _], param: Any) ⇒ try {
self.reply(fun.asInstanceOf[Any ⇒ Any](param))
} finally {
self.stop()
}
}
}).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
def handleFailover(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
cluster.failOverClusterActorRefConnections(from, to)
}
private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
case Left(error) ⇒ throw error


@ -14,26 +14,25 @@ import akka.config._
import Config._
import akka.util._
import akka.actor._
import DeploymentConfig.{ ReplicationScheme }
import DeploymentConfig.ReplicationScheme
import akka.event.EventHandler
import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation }
import akka.remote.MessageSerializer
import akka.cluster.zookeeper._
import akka.serialization.Compression
import Compression.LZF
import akka.serialization.ActorSerialization._
import akka.serialization.Compression.LZF
import java.util.Enumeration
// FIXME allow user to choose dynamically between 'async' and 'sync' tx logging (asyncAddEntry(byte[] data, AddCallback cb, Object ctx))
// FIXME clean up old entries in log after doing a snapshot
// FIXME clean up all meta-data in ZK for a specific UUID when the corresponding actor is shut down
// FIXME delete tx log after migration of actor has been made and create a new one
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReplicationException(message: String) extends AkkaException(message)
class ReplicationException(message: String, cause: Throwable = null) extends AkkaException(message) {
def this(msg: String) = this(msg, null)
}
/**
* TODO: Document the thread-safety guarantees.
@ -58,39 +57,50 @@ class TransactionLog private (
/**
* Record an Actor message invocation.
*
* @param invocation the MessageInvocation to record
* @param actorRef the LocalActorRef that received the message.
* @throws ReplicationException if the TransactionLog is already closed.
*/
def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) {
def recordEntry(invocation: MessageInvocation, actorRef: LocalActorRef) {
val entryId = ledger.getLastAddPushed + 1
if (entryId != 0 && (entryId % snapshotFrequency) == 0) {
recordSnapshot(toBinary(actorRef, false, replicationScheme))
} else recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray)
val needsSnapshot = entryId != 0 && (entryId % snapshotFrequency) == 0
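// e.g. with a snapshotFrequency of 1000 (cf. 'akka.cluster.replication.snapshot-frequency' in the
// test configs) entry ids 1000, 2000, ... fall on a snapshot boundary, so a snapshot of the actor
// is written instead of a regular message entry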
if (needsSnapshot) {
//TODO: could it be that the message is never persisted when a snapshot is written instead?
val bytes = toBinary(actorRef, false, replicationScheme)
recordSnapshot(bytes)
} else {
val bytes = MessageSerializer.serialize(invocation.message.asInstanceOf[AnyRef]).toByteArray
recordEntry(bytes)
}
}
/**
* Record an entry.
*
* @param entry the entry in byte form to record.
* @throws ReplicationException if the TransactionLog is already closed.
*/
def recordEntry(entry: Array[Byte]) {
if (isOpen.isOn) {
val bytes =
val entryBytes =
if (Cluster.shouldCompressData) LZF.compress(entry)
else entry
try {
if (isAsync) {
ledger.asyncAddEntry(
bytes,
entryBytes,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
entryId: Long,
ctx: AnyRef) {
def addComplete(returnCode: Int, ledgerHandle: LedgerHandle, entryId: Long, ctx: AnyRef) {
handleReturnCode(returnCode)
EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
}
},
null)
} else {
handleReturnCode(ledger.addEntry(bytes))
handleReturnCode(ledger.addEntry(entryBytes))
val entryId = ledger.getLastAddPushed
EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
}
@ -102,22 +112,22 @@ class TransactionLog private (
/**
* Record a snapshot.
*
* @param snapshot the snapshot in byte form to record.
* @throws ReplicationException if the TransactionLog is already closed.
*/
def recordSnapshot(snapshot: Array[Byte]) {
if (isOpen.isOn) {
val bytes =
val snapshotBytes =
if (Cluster.shouldCompressData) LZF.compress(snapshot)
else snapshot
try {
if (isAsync) {
ledger.asyncAddEntry(
bytes,
snapshotBytes,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
snapshotId: Long,
ctx: AnyRef) {
def addComplete(returnCode: Int, ledgerHandle: LedgerHandle, snapshotId: Long, ctx: AnyRef) {
handleReturnCode(returnCode)
EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId))
storeSnapshotMetaDataInZooKeeper(snapshotId)
@ -125,10 +135,18 @@ class TransactionLog private (
},
null)
} else {
handleReturnCode(ledger.addEntry(bytes))
//TODO: could this be racy, since writing the snapshot itself and storing the snapshot id
//is not an atomic operation?
//first store the snapshot.
handleReturnCode(ledger.addEntry(snapshotBytes))
val snapshotId = ledger.getLastAddPushed
//this is the point at which all previous entries could be removed.
//TODO: how to remove data?
EventHandler.debug(this, "Writing snapshot to log [%s]".format(snapshotId))
//and now store the snapshot metadata.
storeSnapshotMetaDataInZooKeeper(snapshotId)
}
} catch {
@ -139,6 +157,8 @@ class TransactionLog private (
/**
* Get all the entries for this transaction log.
*
* @throws ReplicationException if the TransactionLog is already closed.
*/
def entries: Vector[Array[Byte]] = entriesInRange(0, ledger.getLastAddConfirmed)
@ -168,26 +188,26 @@ class TransactionLog private (
/**
* Get a range of entries from 'from' to 'to' for this transaction log.
*
* @param from the first index of the range (inclusive).
* @param to the last index of the range (inclusive).
* @return a Vector containing byte arrays. Each element in the vector is a record.
* @throws IllegalArgumentException if 'from' or 'to' is negative, or if 'from' is greater than 'to'.
* @throws ReplicationException if the TransactionLog is already closed.
*/
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] = if (isOpen.isOn) {
try {
if (from < 0) throw new IllegalArgumentException("'from' index can't be negative [" + from + "]")
if (to < 0) throw new IllegalArgumentException("'to' index can't be negative [" + to + "]")
if (to < from) throw new IllegalArgumentException("'to' index can't be smaller than 'from' index [" + from + "," + to + "]")
EventHandler.debug(this,
"Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
EventHandler.debug(this, "Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
if (isAsync) {
val future = new DefaultPromise[Vector[Array[Byte]]](timeout)
ledger.asyncReadEntries(
from, to,
new AsyncCallback.ReadCallback {
def readComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
enumeration: Enumeration[LedgerEntry],
ctx: AnyRef) {
def readComplete(returnCode: Int, ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) {
val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]]
val entries = toByteArrays(enumeration)
@ -207,6 +227,8 @@ class TransactionLog private (
/**
* Get the last entry written to this transaction log.
*
* Returns -1 if there has never been an entry.
*/
def latestEntryId: Long = ledger.getLastAddConfirmed
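// a log that has never been written to therefore reports latestEntryId == -1, which the
// TransactionLogSpec overwrite tests assert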
@ -216,8 +238,7 @@ class TransactionLog private (
def latestSnapshotId: Option[Long] = {
try {
val snapshotId = zkClient.readData(snapshotPath).asInstanceOf[Long]
EventHandler.debug(this,
"Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId))
EventHandler.debug(this, "Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId))
Some(snapshotId)
} catch {
case e: ZkNoNodeException ⇒ None
@ -226,7 +247,10 @@ class TransactionLog private (
}
/**
* Delete all entries for this transaction log.
* Deletes this transaction log: all entries as well as all metadata will be removed.
*
* TODO: Behavior is unclear when the log has already been deleted (what happens to the ledger?).
* TODO: Behavior is unclear when the log has already been closed.
*/
def delete() {
if (isOpen.isOn) {
@ -244,6 +268,10 @@ class TransactionLog private (
} else {
bookieClient.deleteLedger(logId)
}
//also remove everything else that belongs to this TransactionLog.
zkClient.delete(snapshotPath)
zkClient.delete(txLogPath)
} catch {
case e ⇒ handleError(e)
}
@ -252,6 +280,8 @@ class TransactionLog private (
/**
* Close this transaction log.
*
* If already closed, the call is ignored.
*/
def close() {
if (isOpen.switchOff) {
@ -303,8 +333,7 @@ class TransactionLog private (
} catch {
case e ⇒
handleError(new ReplicationException(
"Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
id + "]"))
"Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" + id + "]"))
}
EventHandler.debug(this, "Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
} else transactionClosedError
@ -398,18 +427,32 @@ object TransactionLog {
}
/**
* Creates a new transaction log for the 'id' specified.
* Checks if a TransactionLog for the given id already exists.
*/
def newLogFor(
id: String,
isAsync: Boolean,
replicationScheme: ReplicationScheme): TransactionLog = {
def exists(id: String): Boolean = {
val txLogPath = transactionLogNode + "/" + id
zkClient.exists(txLogPath)
}
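// A minimal usage sketch (hypothetical UUID, assumes a running ZooKeeper/BookKeeper ensemble):
// decide between opening an existing log and creating a fresh one.
//
//   val uuid = "some-actor-uuid" // hypothetical id
//   val txLog =
//     if (TransactionLog.exists(uuid)) TransactionLog.logFor(uuid, false, null)
//     else TransactionLog.newLogFor(uuid, false, null)
//   txLog.recordEntry("hello".getBytes("UTF-8"))
//   txLog.close()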
/**
* Creates a new transaction log for the 'id' specified. If a TransactionLog already exists for the id,
* it will be overwritten.
*/
def newLogFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
val ledger = try {
if (zkClient.exists(txLogPath)) throw new ReplicationException(
"Transaction log for UUID [" + id + "] already exists")
if (exists(id)) {
//if it exists, we need to delete it first. This gives it the overwrite semantics we are looking for.
try {
val ledger = bookieClient.createLedger(ensembleSize, quorumSize, digestType, password)
val txLog = TransactionLog(ledger, id, false, null)
txLog.delete()
txLog.close()
} catch {
case e ⇒ handleError(e)
}
}
val future = new DefaultPromise[LedgerHandle](timeout)
if (isAsync) {
@ -438,13 +481,13 @@ object TransactionLog {
try {
zkClient.create(txLogPath, null, CreateMode.PERSISTENT)
zkClient.writeData(txLogPath, logId)
logId
logId //TODO: does this have any effect?
} catch {
case e ⇒
bookieClient.deleteLedger(logId) // clean up
handleError(new ReplicationException(
"Could not store transaction log [" + logId +
"] meta-data in ZooKeeper for UUID [" + id + "]"))
"] meta-data in ZooKeeper for UUID [" + id + "]", e))
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
@ -453,12 +496,10 @@ object TransactionLog {
/**
* Fetches an existing transaction log for the 'id' specified.
*
* @throws ReplicationException if the log with the given id doesn't exist.
*/
def logFor(
id: String,
isAsync: Boolean,
replicationScheme: ReplicationScheme): TransactionLog = {
def logFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id
val logId = try {
@ -479,10 +520,7 @@ object TransactionLog {
bookieClient.asyncOpenLedger(
logId, digestType, password,
new AsyncCallback.OpenCallback {
def openComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
def openComplete(returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) {
val future = ctx.asInstanceOf[Promise[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
@ -514,20 +552,29 @@ object TransactionLog {
}
/**
* TODO: Documentation.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object LocalBookKeeperEnsemble {
private val isRunning = new Switch(false)
//TODO: should probably come from the config file.
private val port = 5555
@volatile
private var localBookKeeper: LocalBookKeeper = _
/**
* TODO document method
* Starts the LocalBookKeeperEnsemble.
*
* The call can safely be made when already started.
*
* This call will block until it is started.
*/
def start() {
isRunning switchOn {
EventHandler.info(this, "Starting LocalBookKeeperEnsemble")
localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
localBookKeeper.runZookeeper(port)
localBookKeeper.initializeZookeper()
@ -537,7 +584,11 @@ object LocalBookKeeperEnsemble {
}
/**
* TODO document method
* Shuts down the LocalBookKeeperEnsemble.
*
* The call can safely be made when already shut down.
*
* This call will block until the shutdown completes.
*/
def shutdown() {
isRunning switchOff {


@ -13,28 +13,6 @@ import akka.util.Duration
import System.{ currentTimeMillis ⇒ now }
import java.io.File
import akka.actor.Deployer
trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
def testNodes: Int
override def beforeAll() = {
Cluster.startLocalCluster()
onReady()
ClusterTestNode.ready(getClass.getName)
}
def onReady() = {}
override def afterAll() = {
ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
ClusterTestNode.cleanUp(getClass.getName)
onShutdown()
Cluster.shutdownLocalCluster()
}
def onShutdown() = {}
}
trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {


@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.cluster
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.BeforeAndAfterAll
trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
def testNodes: Int
override def beforeAll() = {
Cluster.startLocalCluster()
onReady()
ClusterTestNode.ready(getClass.getName)
}
def onReady() = {}
override def afterAll() = {
ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
ClusterTestNode.cleanUp(getClass.getName)
onShutdown()
Cluster.shutdownLocalCluster()
}
def onShutdown() = {}
}


@ -47,8 +47,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends Cluste
for (i ← 0 until 10) (actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node2", NrOfNodes).await()
node.shutdown()
}
@ -64,11 +63,9 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends Master
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
}
barrier("start-node1", NrOfNodes).await()
barrier("create-actor-on-node1", NrOfNodes) {
}
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()


@ -68,8 +68,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterT
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node2", NrOfNodes).await()
node.shutdown()
}
@ -85,11 +84,9 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterCl
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
}
barrier("start-node1", NrOfNodes).await()
barrier("create-actor-on-node1", NrOfNodes) {
}
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()


@ -1,8 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -1,7 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -1,7 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -1,7 +1,6 @@
akka.enabled-modules = ["cluster"]
akka.event-handler-level = "WARNING"
akka.actor.deployment.hello-world.router = "direct"
akka.actor.deployment.hello-world.clustered.replication-factor = 1
akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
akka.cluster.replication.snapshot-frequency = 1000


@ -20,9 +20,11 @@ object ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec {
var log = ""
def receive = {
case Count(nr) ⇒
println("Received number: " + nr)
log += nr.toString
self.reply("World from node [" + Config.nodename + "]")
case GetLog ⇒
println("Received getLog")
self.reply(Log(log))
}
}
@ -47,8 +49,7 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends Clust
(actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node2", NrOfNodes).await()
node.shutdown()
}
@ -64,11 +65,9 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends Maste
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
}
barrier("start-node1", NrOfNodes).await()
barrier("create-actor-on-node1", NrOfNodes) {
}
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()


@ -68,8 +68,7 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends Cluster
(actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
}
barrier("start-node2", NrOfNodes) {
}
barrier("start-node2", NrOfNodes).await()
node.shutdown()
}
@ -85,11 +84,9 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterC
"be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {
barrier("start-node1", NrOfNodes) {
}
barrier("start-node1", NrOfNodes).await()
barrier("create-actor-on-node1", NrOfNodes) {
}
barrier("create-actor-on-node1", NrOfNodes).await()
barrier("start-node2", NrOfNodes) {
node.start()


@ -3,33 +3,47 @@
*/
package akka.cluster
import org.apache.bookkeeper.client.{ BookKeeper, BKException }
import BKException._
import org.apache.zookeeper.server.ZooKeeperServer
import org.apache.bookkeeper.client.BookKeeper
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Spec }
import akka.serialization._
import akka.actor._
import ActorSerialization._
import Actor._
import java.util.concurrent.{ CyclicBarrier, TimeUnit }
import java.io.File
import java.nio.ByteBuffer
import org.scalatest.BeforeAndAfterAll
import com.eaio.uuid.UUID
import scala.collection.JavaConversions._
class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
private var bookKeeper: BookKeeper = _
private var localBookKeeper: LocalBookKeeper = _
// synchronous API
"A Transaction Log" should {
"A synchronous Transaction Log" should {
"be able to be deleted - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"fail to be opened if non existing - synchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to be checked for existence - synchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, false, null)
TransactionLog.exists(uuid) must be(true)
}
"be able to record entries - synchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, false, null)
@ -37,6 +51,19 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
txlog.recordEntry(entry)
}
"be able to overweite an existing txlog if one already exists - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, false, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - synchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, false, null)
@ -45,7 +72,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
txlog1.recordEntry(entry)
txlog1.delete
txlog1.close
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null))
intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
}
"be able to record entries and read entries with 'entriesInRange' - synchronous" in {
@ -146,7 +173,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
}
}
"A Transaction Log" should {
"An asynchronous Transaction Log" should {
"be able to record entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, true, null)
@ -156,6 +183,46 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
txlog.close
}
"be able to be deleted - asynchronous" in {
val uuid = (new UUID).toString
val txlog = TransactionLog.newLogFor(uuid, true, null)
val entry = "hello".getBytes("UTF-8")
txlog.recordEntry(entry)
txlog.delete()
txlog.close()
val zkClient = TransactionLog.zkClient
assert(zkClient.readData(txlog.snapshotPath, true) == null)
assert(zkClient.readData(txlog.txLogPath, true) == null)
}
"be able to be checked for existence - asynchronous" in {
val uuid = (new UUID).toString
TransactionLog.exists(uuid) must be(false)
TransactionLog.newLogFor(uuid, true, null)
TransactionLog.exists(uuid) must be(true)
}
"fail to be opened if non existing - asynchronous" in {
val uuid = (new UUID).toString
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
}
"be able to overweite an existing txlog if one already exists - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
val entry = "hello".getBytes("UTF-8")
txlog1.recordEntry(entry)
txlog1.recordEntry(entry)
txlog1.close
val txLog2 = TransactionLog.newLogFor(uuid, true, null)
txLog2.latestSnapshotId.isDefined must be(false)
txLog2.latestEntryId must be(-1)
}
"be able to record and delete entries - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true, null)
@ -167,8 +234,9 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
Thread.sleep(200)
txlog1.delete
Thread.sleep(200)
intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null))
intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
}
"be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
val uuid = (new UUID).toString
val txlog1 = TransactionLog.newLogFor(uuid, true, null)