diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index 348b11195e..2b8a14a41b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -1758,6 +1758,8 @@ object RemoteClusterDaemon {
 /**
  * Internal "daemon" actor for cluster internal communication.
  *
+ * It acts as the brain of the cluster: it responds to cluster events (messages) and takes the appropriate action.
+ *
  * @author Jonas Bonér
  */
 class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
@@ -1774,185 +1776,207 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
   def receive: Receive = {
     case message: RemoteDaemonMessageProtocol ⇒
       EventHandler.debug(this,
-        "Received command [\n%s] to RemoteClusterDaemon on node [%s]"
-          .format(message, cluster.nodeAddress.nodeName))
+        "Received command [\n%s] to RemoteClusterDaemon on node [%s]".format(message, cluster.nodeAddress.nodeName))

       message.getMessageType match {
-
-        case USE ⇒
-          try {
-            if (message.hasActorAddress) {
-              val actorAddress = message.getActorAddress
-              cluster.serializerForActor(actorAddress) foreach { serializer ⇒
-                cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
-                  cluster.remoteService.register(actorAddress, newActorRef)
-
-                  if (message.hasReplicateActorFromUuid) {
-                    // replication is used - fetch the messages and replay them
-                    import akka.remote.protocol.RemoteProtocol._
-                    import akka.remote.MessageSerializer
-
-                    val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
-                    val deployment = Deployer.deploymentFor(actorAddress)
-                    val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
-                      throw new IllegalStateException(
-                        "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
-                    val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
-
-                    try {
-                      // get the transaction log for the actor UUID
-                      val txLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
-
-                      // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
-                      val (snapshotAsBytes, entriesAsBytes) = txLog.latestSnapshotAndSubsequentEntries
-
-                      // deserialize and restore actor snapshot
-                      val actorRefToUseForReplay =
-                        snapshotAsBytes match {
-
-                          // we have a new actor ref - the snapshot
-                          case Some(bytes) ⇒
-                            // stop the new actor ref and use the snapshot instead
-                            cluster.remoteService.unregister(actorAddress)
-
-                            // deserialize the snapshot actor ref and register it as remote actor
-                            val uncompressedBytes =
-                              if (Cluster.shouldCompressData) LZF.uncompress(bytes)
-                              else bytes
-
-                            val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
-                            cluster.remoteService.register(actorAddress, snapshotActorRef)
-
-                            // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef have the same UUID (which they should)
-                            //newActorRef.stop()
-
-                            snapshotActorRef
-
-                          // we have no snapshot - use the new actor ref
-                          case None ⇒
-                            newActorRef
-                        }
-
-                      // deserialize the messages
-                      val messages: Vector[AnyRef] = entriesAsBytes map { bytes ⇒
-                        val messageBytes =
-                          if (Cluster.shouldCompressData) LZF.uncompress(bytes)
-                          else bytes
-                        MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
-                      }
-
-                      EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
-
-                      // replay all messages
-                      messages foreach { message ⇒
-                        EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
-
-                        // FIXME how to handle '?' messages? We can *not* replay them with the correct semantics. Should we: 1. Ignore/drop them and log warning? 2. Throw exception when about to log them? 3. Other?
-                        actorRefToUseForReplay ! message
-                      }
-
-                    } catch {
-                      case e: Throwable ⇒
-                        EventHandler.error(e, this, e.toString)
-                        throw e
-                    }
-                  }
-                }
-              }
-            } else {
-              EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
-            }
-            self.reply(Success)
-
-          } catch {
-            case error ⇒
-              self.reply(Failure(error))
-              throw error
-          }
-
-        case RELEASE ⇒
-          if (message.hasActorUuid) {
-            cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
-              cluster.release(address)
-            }
-          } else if (message.hasActorAddress) {
-            cluster release message.getActorAddress
-          } else {
-            EventHandler.warning(this,
-              "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]"
-                .format(message))
-          }
-
-        case START ⇒ cluster.start()
-
-        case STOP ⇒ cluster.shutdown()
-
-        case DISCONNECT ⇒ cluster.disconnect()
-
-        case RECONNECT ⇒ cluster.reconnect()
-
-        case RESIGN ⇒ cluster.resign()
-
-        case FAIL_OVER_CONNECTIONS ⇒
-          val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
-          cluster.failOverClusterActorRefConnections(from, to)
-
-        case FUNCTION_FUN0_UNIT ⇒
-          localActorOf(new Actor() {
-            self.dispatcher = computeGridDispatcher
-
-            def receive = {
-              case f: Function0[_] ⇒ try {
-                f()
-              } finally {
-                self.stop()
-              }
-            }
-          }).start ! payloadFor(message, classOf[Function0[Unit]])
-
-        case FUNCTION_FUN0_ANY ⇒
-          localActorOf(new Actor() {
-            self.dispatcher = computeGridDispatcher
-
-            def receive = {
-              case f: Function0[_] ⇒ try {
-                self.reply(f())
-              } finally {
-                self.stop()
-              }
-            }
-          }).start forward payloadFor(message, classOf[Function0[Any]])
-
-        case FUNCTION_FUN1_ARG_UNIT ⇒
-          localActorOf(new Actor() {
-            self.dispatcher = computeGridDispatcher
-
-            def receive = {
-              case (fun: Function[_, _], param: Any) ⇒ try {
-                fun.asInstanceOf[Any ⇒ Unit].apply(param)
-              } finally {
-                self.stop()
-              }
-            }
-          }).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
-
-        case FUNCTION_FUN1_ARG_ANY ⇒
-          localActorOf(new Actor() {
-            self.dispatcher = computeGridDispatcher
-
-            def receive = {
-              case (fun: Function[_, _], param: Any) ⇒ try {
-                self.reply(fun.asInstanceOf[Any ⇒ Any](param))
-              } finally {
-                self.stop()
-              }
-            }
-          }).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
+        case USE ⇒ handleUse(message)
+        case RELEASE ⇒ handleRelease(message)
+        case START ⇒ cluster.start()
+        case STOP ⇒ cluster.shutdown()
+        case DISCONNECT ⇒ cluster.disconnect()
+        case RECONNECT ⇒ cluster.reconnect()
+        case RESIGN ⇒ cluster.resign()
+        case FAIL_OVER_CONNECTIONS ⇒ handleFailover(message)
+        case FUNCTION_FUN0_UNIT ⇒ handle_fun0_unit(message)
+        case FUNCTION_FUN0_ANY ⇒ handle_fun0_any(message)
+        case FUNCTION_FUN1_ARG_UNIT ⇒ handle_fun1_arg_unit(message)
+        case FUNCTION_FUN1_ARG_ANY ⇒ handle_fun1_arg_any(message)
+        //TODO: should we not deal with unrecognized message types?
       }

     case unknown ⇒ EventHandler.warning(this, "Unknown message [%s]".format(unknown))
   }

+  def handleRelease(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    if (message.hasActorUuid) {
+      cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address ⇒
+        cluster.release(address)
+      }
+    } else if (message.hasActorAddress) {
+      cluster release message.getActorAddress
+    } else {
+      EventHandler.warning(this,
+        "Neither 'uuid' nor 'actorAddress' is specified, ignoring remote cluster daemon command [%s]".format(message))
+    }
+  }
+
+  def handleUse(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    def deserializeMessages(entriesAsBytes: Vector[Array[Byte]]): Vector[AnyRef] = {
+      import akka.remote.protocol.RemoteProtocol._
+      import akka.remote.MessageSerializer
+
+      entriesAsBytes map { bytes ⇒
+        val messageBytes =
+          if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+          else bytes
+        MessageSerializer.deserialize(MessageProtocol.parseFrom(messageBytes), None)
+      }
+    }
+
+    def createActorRefToUseForReplay(snapshotAsBytes: Option[Array[Byte]], actorAddress: String, newActorRef: LocalActorRef): ActorRef = {
+      snapshotAsBytes match {
+
+        // we have a new actor ref - the snapshot
+        case Some(bytes) ⇒
+          // stop the new actor ref and use the snapshot instead
+          //TODO: What if that actor already has been retrieved and is being used?
+          //So do we have a race here?
+          cluster.remoteService.unregister(actorAddress)
+
+          // deserialize the snapshot actor ref and register it as remote actor
+          val uncompressedBytes =
+            if (Cluster.shouldCompressData) LZF.uncompress(bytes)
+            else bytes
+
+          val snapshotActorRef = fromBinary(uncompressedBytes, newActorRef.uuid).start()
+          cluster.remoteService.register(actorAddress, snapshotActorRef)
+
+          // FIXME we should call 'stop()' here (to GC the actor), but can't since that will currently
+          //shut down the TransactionLog for this UUID - since both this actor and the new snapshotActorRef
+          //have the same UUID (which they should)
+          //newActorRef.stop()
+
+          snapshotActorRef
+
+        // we have no snapshot - use the new actor ref
+        case None ⇒
+          newActorRef
+      }
+    }
+
+    try {
+      if (message.hasActorAddress) {
+        val actorAddress = message.getActorAddress
+        cluster.serializerForActor(actorAddress) foreach { serializer ⇒
+          cluster.use(actorAddress, serializer) foreach { newActorRef ⇒
+            cluster.remoteService.register(actorAddress, newActorRef)
+
+            if (message.hasReplicateActorFromUuid) {
+              // replication is used - fetch the messages and replay them
+              val replicateFromUuid = uuidProtocolToUuid(message.getReplicateActorFromUuid)
+              val deployment = Deployer.deploymentFor(actorAddress)
+              val replicationScheme = DeploymentConfig.replicationSchemeFor(deployment).getOrElse(
+                throw new IllegalStateException(
+                  "Actor [" + actorAddress + "] should have been configured as a replicated actor but could not find its ReplicationScheme"))
+              val isWriteBehind = DeploymentConfig.isWriteBehindReplication(replicationScheme)
+
+              try {
+                // get the transaction log for the actor UUID
+                val readonlyTxLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)
+
+                // get the latest snapshot (Option[Array[Byte]]) and all the subsequent messages (Array[Byte])
+                val (snapshotAsBytes, entriesAsBytes) = readonlyTxLog.latestSnapshotAndSubsequentEntries
+
+                // deserialize and restore actor snapshot. This call will automatically recreate a transaction log.
+                val actorRef = createActorRefToUseForReplay(snapshotAsBytes, actorAddress, newActorRef)
+
+                // deserialize the messages
+                val messages: Vector[AnyRef] = deserializeMessages(entriesAsBytes)
+
+                EventHandler.info(this, "Replaying [%s] messages to actor [%s]".format(messages.size, actorAddress))
+
+                // replay all messages
+                messages foreach { message ⇒
+                  EventHandler.debug(this, "Replaying message [%s] to actor [%s]".format(message, actorAddress))
+
+                  // FIXME how to handle '?' messages?
+                  // We can *not* replay them with the correct semantics. Should we:
+                  // 1. Ignore/drop them and log warning?
+                  // 2. Throw exception when about to log them?
+                  // 3. Other?
+                  actorRef ! message
+                }
+
+              } catch {
+                case e: Throwable ⇒
+                  EventHandler.error(e, this, e.toString)
+                  throw e
+              }
+            }
+          }
+        }
+      } else {
+        EventHandler.error(this, "Actor 'address' is not defined, ignoring remote cluster daemon command [%s]".format(message))
+      }
+      self.reply(Success)
+    } catch {
+      case error ⇒
+        self.reply(Failure(error))
+        throw error
+    }
+  }
+
+  def handle_fun0_unit(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    localActorOf(new Actor() {
+      self.dispatcher = computeGridDispatcher
+
+      def receive = {
+        case f: Function0[_] ⇒ try {
+          f()
+        } finally {
+          self.stop()
+        }
+      }
+    }).start ! payloadFor(message, classOf[Function0[Unit]])
+  }
+
+  def handle_fun0_any(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    localActorOf(new Actor() {
+      self.dispatcher = computeGridDispatcher
+
+      def receive = {
+        case f: Function0[_] ⇒ try {
+          self.reply(f())
+        } finally {
+          self.stop()
+        }
+      }
+    }).start forward payloadFor(message, classOf[Function0[Any]])
+  }
+
+  def handle_fun1_arg_unit(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    localActorOf(new Actor() {
+      self.dispatcher = computeGridDispatcher
+
+      def receive = {
+        case (fun: Function[_, _], param: Any) ⇒ try {
+          fun.asInstanceOf[Any ⇒ Unit].apply(param)
+        } finally {
+          self.stop()
+        }
+      }
+    }).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
+  }
+
+  def handle_fun1_arg_any(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    localActorOf(new Actor() {
+      self.dispatcher = computeGridDispatcher
+
+      def receive = {
+        case (fun: Function[_, _], param: Any) ⇒ try {
+          self.reply(fun.asInstanceOf[Any ⇒ Any](param))
+        } finally {
+          self.stop()
+        }
+      }
+    }).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
+  }
+
+  def handleFailover(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
+    val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
+    cluster.failOverClusterActorRefConnections(from, to)
+  }
+
   private def payloadFor[T](message: RemoteDaemonMessageProtocol, clazz: Class[T]): T = {
     Serialization.deserialize(message.getPayload.toByteArray, clazz, None) match {
       case Left(error) ⇒ throw error
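For reviewers: after this refactoring, the replicated-actor replay path triggered by USE reduces to a short sequence. The sketch below is illustrative only; it inlines the helpers defined in handleUse above and elides the error handling, so it is not additional code in this patch:

```scala
// Condensed view of handleUse's replay branch (illustrative; no error handling).
val readonlyTxLog = TransactionLog.logFor(replicateFromUuid.toString, isWriteBehind, replicationScheme)

// Latest snapshot (if any) plus every entry recorded after it.
val (snapshotAsBytes, entriesAsBytes) = readonlyTxLog.latestSnapshotAndSubsequentEntries

// Prefer the deserialized snapshot over the freshly created actor ref.
val actorRef = createActorRefToUseForReplay(snapshotAsBytes, actorAddress, newActorRef)

// Replay the journal in order; '?' (ask) messages remain an open FIXME.
deserializeMessages(entriesAsBytes) foreach { message ⇒ actorRef ! message }
```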
diff --git a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index d12820c130..0b33bfe6f3 100644
--- a/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -14,26 +14,25 @@ import akka.config._
 import Config._
 import akka.util._
 import akka.actor._
-import DeploymentConfig.{ ReplicationScheme }
+import DeploymentConfig.ReplicationScheme
 import akka.event.EventHandler
 import akka.dispatch.{ DefaultPromise, Promise, MessageInvocation }
 import akka.remote.MessageSerializer
 import akka.cluster.zookeeper._
-import akka.serialization.Compression
-import Compression.LZF
 import akka.serialization.ActorSerialization._
+import akka.serialization.Compression.LZF
 import java.util.Enumeration

 // FIXME allow user to choose dynamically between 'async' and 'sync' tx logging (asyncAddEntry(byte[] data, AddCallback cb, Object ctx))
 // FIXME clean up old entries in log after doing a snapshot
-// FIXME clean up all meta-data in ZK for a specific UUID when the corresponding actor is shut down
-// FIXME delete tx log after migration of actor has been made and create a new one

 /**
  * @author Jonas Bonér
 */
-class ReplicationException(message: String) extends AkkaException(message)
+class ReplicationException(message: String, cause: Throwable = null) extends AkkaException(message) {
+  def this(msg: String) = this(msg, null)
+}

 /**
  * TODO: Explain something about threadsafety.
@@ -58,39 +57,50 @@ class TransactionLog private (

   /**
    * Record an Actor message invocation.
+   *
+   * @param invocation the MessageInvocation to record
+   * @param actorRef the LocalActorRef that received the message.
+   * @throws ReplicationException if the TransactionLog is already closed.
    */
-  def recordEntry(messageHandle: MessageInvocation, actorRef: LocalActorRef) {
+  def recordEntry(invocation: MessageInvocation, actorRef: LocalActorRef) {
     val entryId = ledger.getLastAddPushed + 1
-    if (entryId != 0 && (entryId % snapshotFrequency) == 0) {
-      recordSnapshot(toBinary(actorRef, false, replicationScheme))
-    } else recordEntry(MessageSerializer.serialize(messageHandle.message.asInstanceOf[AnyRef]).toByteArray)
+    val needsSnapshot = entryId != 0 && (entryId % snapshotFrequency) == 0
+
+    if (needsSnapshot) {
+      //todo: could it be that the message is never persisted when a snapshot is added?
+      val bytes = toBinary(actorRef, false, replicationScheme)
+      recordSnapshot(bytes)
+    } else {
+      val bytes = MessageSerializer.serialize(invocation.message.asInstanceOf[AnyRef]).toByteArray
+      recordEntry(bytes)
+    }
   }

   /**
    * Record an entry.
+   *
+   * @param entry the entry in byte form to record.
+   * @throws ReplicationException if the TransactionLog is already closed.
    */
   def recordEntry(entry: Array[Byte]) {
     if (isOpen.isOn) {
-      val bytes =
+      val entryBytes =
         if (Cluster.shouldCompressData) LZF.compress(entry)
         else entry
+
       try {
         if (isAsync) {
           ledger.asyncAddEntry(
-            bytes,
+            entryBytes,
             new AsyncCallback.AddCallback {
-              def addComplete(
-                returnCode: Int,
-                ledgerHandle: LedgerHandle,
-                entryId: Long,
-                ctx: AnyRef) {
+              def addComplete(returnCode: Int, ledgerHandle: LedgerHandle, entryId: Long, ctx: AnyRef) {
                 handleReturnCode(returnCode)
                 EventHandler.debug(this,
                   "Writing entry [%s] to log [%s]".format(entryId, logId))
               }
             }, null)
         } else {
-          handleReturnCode(ledger.addEntry(bytes))
+          handleReturnCode(ledger.addEntry(entryBytes))
           val entryId = ledger.getLastAddPushed
           EventHandler.debug(this,
             "Writing entry [%s] to log [%s]".format(entryId, logId))
         }
@@ -102,22 +112,22 @@ class TransactionLog private (

   /**
    * Record a snapshot.
+   *
+   * @param snapshot the snapshot in byte form to record.
+   * @throws ReplicationException if the TransactionLog is already closed.
    */
   def recordSnapshot(snapshot: Array[Byte]) {
     if (isOpen.isOn) {
-      val bytes =
+      val snapshotBytes =
         if (Cluster.shouldCompressData) LZF.compress(snapshot)
         else snapshot
+
       try {
         if (isAsync) {
           ledger.asyncAddEntry(
-            bytes,
+            snapshotBytes,
             new AsyncCallback.AddCallback {
-              def addComplete(
-                returnCode: Int,
-                ledgerHandle: LedgerHandle,
-                snapshotId: Long,
-                ctx: AnyRef) {
+              def addComplete(returnCode: Int, ledgerHandle: LedgerHandle, snapshotId: Long, ctx: AnyRef) {
                 handleReturnCode(returnCode)
                 EventHandler.debug(this,
                   "Writing snapshot to log [%s]".format(snapshotId))
                 storeSnapshotMetaDataInZooKeeper(snapshotId)
@@ -125,10 +135,18 @@ class TransactionLog private (
             }, null)
         } else {
-          handleReturnCode(ledger.addEntry(bytes))
+          //todo: could this be racy, since writing the snapshot itself and storing the snapshot id is not
+          //an atomic operation?
+
+          //first store the snapshot.
+          handleReturnCode(ledger.addEntry(snapshotBytes))
           val snapshotId = ledger.getLastAddPushed
+
+          //this is the location where all previous entries can be removed.
+          //TODO: how to remove data?
+
           EventHandler.debug(this,
             "Writing snapshot to log [%s]".format(snapshotId))
+          //and now store the snapshot metadata.
           storeSnapshotMetaDataInZooKeeper(snapshotId)
         }
       } catch {
@@ -139,6 +157,8 @@ class TransactionLog private (

   /**
    * Get all the entries for this transaction log.
+   *
+   * @throws ReplicationException if the TransactionLog is already closed.
    */
   def entries: Vector[Array[Byte]] = entriesInRange(0, ledger.getLastAddConfirmed)

@@ -168,26 +188,26 @@ class TransactionLog private (

   /**
    * Get a range of entries from 'from' to 'to' for this transaction log.
+   *
+   * @param from the first index of the range.
+   * @param to the last index of the range (inclusive).
+   * @return a Vector containing Byte Arrays. Each element in the vector is a record.
+   * @throws IllegalArgumentException if 'from' or 'to' is negative, or if 'from' is bigger than 'to'.
+   * @throws ReplicationException if the TransactionLog is already closed.
    */
   def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] = if (isOpen.isOn) {
     try {
       if (from < 0) throw new IllegalArgumentException("'from' index can't be negative [" + from + "]")
       if (to < 0) throw new IllegalArgumentException("'to' index can't be negative [" + from + "]")
       if (to < from) throw new IllegalArgumentException("'to' index can't be smaller than 'from' index [" + from + "," + to + "]")
-      EventHandler.debug(this,
-        "Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
+      EventHandler.debug(this, "Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
       if (isAsync) {
         val future = new DefaultPromise[Vector[Array[Byte]]](timeout)
         ledger.asyncReadEntries(
           from, to,
           new AsyncCallback.ReadCallback {
-            def readComplete(
-              returnCode: Int,
-              ledgerHandle: LedgerHandle,
-              enumeration: Enumeration[LedgerEntry],
-              ctx: AnyRef) {
-
+            def readComplete(returnCode: Int, ledgerHandle: LedgerHandle, enumeration: Enumeration[LedgerEntry], ctx: AnyRef) {
               val future = ctx.asInstanceOf[Promise[Vector[Array[Byte]]]]
               val entries = toByteArrays(enumeration)
@@ -207,6 +227,8 @@ class TransactionLog private (

   /**
    * Get the last entry written to this transaction log.
+   *
+   * Returns -1 if there has never been an entry.
    */
   def latestEntryId: Long = ledger.getLastAddConfirmed

@@ -216,8 +238,7 @@ class TransactionLog private (
   def latestSnapshotId: Option[Long] = {
     try {
       val snapshotId = zkClient.readData(snapshotPath).asInstanceOf[Long]
-      EventHandler.debug(this,
-        "Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId))
+      EventHandler.debug(this, "Retrieved latest snapshot id [%s] from transaction log [%s]".format(snapshotId, logId))
       Some(snapshotId)
     } catch {
       case e: ZkNoNodeException ⇒ None
@@ -226,7 +247,10 @@ class TransactionLog private (
   }

   /**
-   * Delete all entries for this transaction log.
+   * Deletes this transaction log: all entries as well as all metadata will be removed.
+   *
+   * TODO: Behavior unclear what happens when already deleted (what happens to the ledger).
+   * TODO: Behavior unclear what happens when already closed.
    */
   def delete() {
     if (isOpen.isOn) {
@@ -244,6 +268,10 @@ class TransactionLog private (
       } else {
         bookieClient.deleteLedger(logId)
       }
+
+      //also remove everything else that belongs to this TransactionLog.
+      zkClient.delete(snapshotPath)
+      zkClient.delete(txLogPath)
     } catch {
       case e ⇒ handleError(e)
     }
@@ -252,6 +280,8 @@ class TransactionLog private (

   /**
    * Close this transaction log.
+   *
+   * If already closed, the call is ignored.
    */
   def close() {
     if (isOpen.switchOff) {
@@ -303,8 +333,7 @@ class TransactionLog private (
       } catch {
         case e ⇒
           handleError(new ReplicationException(
-            "Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
-            id + "]"))
+            "Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" + id + "]"))
       }
       EventHandler.debug(this,
         "Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
     } else transactionClosedError
@@ -398,18 +427,32 @@ object TransactionLog {
   }

   /**
-   * Creates a new transaction log for the 'id' specified.
+   * Checks if a TransactionLog for the given id already exists.
    */
-  def newLogFor(
-    id: String,
-    isAsync: Boolean,
-    replicationScheme: ReplicationScheme): TransactionLog = {
+  def exists(id: String): Boolean = {
+    val txLogPath = transactionLogNode + "/" + id
+    zkClient.exists(txLogPath)
+  }

+  /**
+   * Creates a new transaction log for the 'id' specified. If a TransactionLog already exists for the id,
+   * it will be overwritten.
+   */
+  def newLogFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
     val txLogPath = transactionLogNode + "/" + id

     val ledger =
       try {
-        if (zkClient.exists(txLogPath)) throw new ReplicationException(
-          "Transaction log for UUID [" + id + "] already exists")
+        if (exists(id)) {
+          //if it exists, we need to delete it first. This gives it the overwrite semantics we are looking for.
+          try {
+            val ledger = bookieClient.createLedger(ensembleSize, quorumSize, digestType, password)
+            val txLog = TransactionLog(ledger, id, false, null)
+            txLog.delete()
+            txLog.close()
+          } catch {
+            case e ⇒ handleError(e)
+          }
+        }
         val future = new DefaultPromise[LedgerHandle](timeout)
         if (isAsync) {
@@ -438,13 +481,13 @@ object TransactionLog {
     try {
       zkClient.create(txLogPath, null, CreateMode.PERSISTENT)
       zkClient.writeData(txLogPath, logId)
-      logId
+      logId //TODO: does this have any effect?
     } catch {
       case e ⇒
         bookieClient.deleteLedger(logId) // clean up
         handleError(new ReplicationException(
           "Could not store transaction log [" + logId +
-          "] meta-data in ZooKeeper for UUID [" + id + "]"))
+          "] meta-data in ZooKeeper for UUID [" + id + "]", e))
     }

     EventHandler.info(this,
       "Created new transaction log [%s] for UUID [%s]".format(logId, id))
@@ -453,12 +496,10 @@ object TransactionLog {

   /**
    * Fetches an existing transaction log for the 'id' specified.
+   *
+   * @throws ReplicationException if the log with the given id doesn't exist.
    */
-  def logFor(
-    id: String,
-    isAsync: Boolean,
-    replicationScheme: ReplicationScheme): TransactionLog = {
-
+  def logFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
     val txLogPath = transactionLogNode + "/" + id

     val logId =
       try {
@@ -479,10 +520,7 @@ object TransactionLog {
         bookieClient.asyncOpenLedger(
           logId, digestType, password,
           new AsyncCallback.OpenCallback {
-            def openComplete(
-              returnCode: Int,
-              ledgerHandle: LedgerHandle,
-              ctx: AnyRef) {
+            def openComplete(returnCode: Int, ledgerHandle: LedgerHandle, ctx: AnyRef) {
               val future = ctx.asInstanceOf[Promise[LedgerHandle]]
               if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
               else future.completeWithException(BKException.create(returnCode))
@@ -514,20 +552,29 @@ object TransactionLog {
 }

 /**
+ * TODO: Documentation.
+ *
  * @author Jonas Bonér
 */
 object LocalBookKeeperEnsemble {
   private val isRunning = new Switch(false)
+
+  //TODO: should probably come from the config file.
   private val port = 5555

   @volatile
   private var localBookKeeper: LocalBookKeeper = _

   /**
-   * TODO document method
+   * Starts the LocalBookKeeperEnsemble.
+   *
+   * The call can safely be made when the ensemble is already started.
+   *
+   * This call will block until it is started.
    */
   def start() {
     isRunning switchOn {
+      EventHandler.info(this, "Starting LocalBookKeeperEnsemble")
       localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
       localBookKeeper.runZookeeper(port)
       localBookKeeper.initializeZookeper()
@@ -537,7 +584,11 @@ object LocalBookKeeperEnsemble {
   }

   /**
-   * TODO document method
+   * Shuts down the LocalBookKeeperEnsemble.
+   *
+   * The call can safely be made when the ensemble is already shut down.
+   *
+   * This call will block until the shutdown completes.
    */
   def shutdown() {
     isRunning switchOff {
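Taken together, the revised TransactionLog API now has create/overwrite, existence-check, and delete semantics. A minimal usage sketch, based on the synchronous specs further down (the `false` async flag and `null` replication scheme mirror those tests):

```scala
import com.eaio.uuid.UUID

val id = (new UUID).toString
TransactionLog.exists(id) // false: nothing has been created for this id yet

// Creates the log; if one already existed for 'id' it is deleted first (overwrite semantics).
val txLog = TransactionLog.newLogFor(id, isAsync = false, replicationScheme = null)

txLog.recordEntry("hello".getBytes("UTF-8")) // LZF-compressed when Cluster.shouldCompressData is on

txLog.latestEntryId    // last confirmed entry id, or -1 if nothing was ever written
txLog.latestSnapshotId // None until recordSnapshot has stored snapshot metadata in ZooKeeper

txLog.delete() // removes the ledger plus the snapshotPath/txLogPath metadata in ZooKeeper
txLog.close()  // idempotent: the call is ignored when the log is already closed
```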
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
index bdc430ee6d..a3d8c44fe7 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/ClusterTestNode.scala
@@ -13,28 +13,6 @@ import akka.util.Duration
 import System.{ currentTimeMillis ⇒ now }

 import java.io.File
-import akka.actor.Deployer
-
-trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
-  def testNodes: Int
-
-  override def beforeAll() = {
-    Cluster.startLocalCluster()
-    onReady()
-    ClusterTestNode.ready(getClass.getName)
-  }
-
-  def onReady() = {}
-
-  override def afterAll() = {
-    ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
-    ClusterTestNode.cleanUp(getClass.getName)
-    onShutdown()
-    Cluster.shutdownLocalCluster()
-  }
-
-  def onShutdown() = {}
-}

 trait ClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala
new file mode 100644
index 0000000000..0d2b078d11
--- /dev/null
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MasterClusterTestNode.scala
@@ -0,0 +1,31 @@
+/**
+ * Copyright (C) 2009-2011 Typesafe Inc.
+ */
+
+package akka.cluster
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.BeforeAndAfterAll
+
+trait MasterClusterTestNode extends WordSpec with MustMatchers with BeforeAndAfterAll {
+  def testNodes: Int
+
+  override def beforeAll() = {
+    Cluster.startLocalCluster()
+    onReady()
+    ClusterTestNode.ready(getClass.getName)
+  }
+
+  def onReady() = {}
+
+  override def afterAll() = {
+    ClusterTestNode.waitForExits(getClass.getName, testNodes - 1)
+    ClusterTestNode.cleanUp(getClass.getName)
+    onShutdown()
+    Cluster.shutdownLocalCluster()
+  }
+
+  def onShutdown() = {}
+}
+
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
index c40a06b404..1138cb7f46 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmSpec.scala
@@ -47,8 +47,7 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1 extends Cluste
       for (i ← 0 until 10) (actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
     }

-    barrier("start-node2", NrOfNodes) {
-    }
+    barrier("start-node2", NrOfNodes).await()

     node.shutdown()
   }
@@ -64,11 +63,9 @@ class ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2 extends Master

   "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {

-      barrier("start-node1", NrOfNodes) {
-      }
+      barrier("start-node1", NrOfNodes).await()

-      barrier("create-actor-on-node1", NrOfNodes) {
-      }
+      barrier("create-actor-on-node1", NrOfNodes).await()

       barrier("start-node2", NrOfNodes) {
         node.start()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
index 80ad04a4df..bdb920451e 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writebehind/snapshot/ReplicationTransactionLogWriteBehindSnapshotMultiJvmSpec.scala
@@ -68,8 +68,7 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode1 extends ClusterT
       (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
     }

-    barrier("start-node2", NrOfNodes) {
-    }
+    barrier("start-node2", NrOfNodes).await()

     node.shutdown()
   }
@@ -85,11 +84,9 @@ class ReplicationTransactionLogWriteBehindSnapshotMultiJvmNode2 extends MasterCl

   "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {

-      barrier("start-node1", NrOfNodes) {
-      }
+      barrier("start-node1", NrOfNodes).await()

-      barrier("create-actor-on-node1", NrOfNodes) {
-      }
+      barrier("create-actor-on-node1", NrOfNodes).await()

       barrier("start-node2", NrOfNodes) {
         node.start()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
index 42e57847b5..74957902ed 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode1.conf
@@ -1,8 +1,6 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "WARNING"
 akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication-factor = 1
-
 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
 akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
 akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
index cc2fb1ef3b..74957902ed 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteBehindNoSnapshotMultiJvmNode2.conf
@@ -1,7 +1,6 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "WARNING"
 akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication-factor = 1
 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
 akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
 akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
index cc2fb1ef3b..74957902ed 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1.conf
@@ -1,7 +1,6 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "WARNING"
 akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication-factor = 1
 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
 akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
 akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf
index cc2fb1ef3b..74957902ed 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2.conf
@@ -1,7 +1,6 @@
 akka.enabled-modules = ["cluster"]
 akka.event-handler-level = "WARNING"
 akka.actor.deployment.hello-world.router = "direct"
-akka.actor.deployment.hello-world.clustered.replication-factor = 1
 akka.actor.deployment.hello-world.clustered.replication.storage = "transaction-log"
 akka.actor.deployment.hello-world.clustered.replication.strategy = "write-through"
 akka.cluster.replication.snapshot-frequency = 1000
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
index 97fbb1c79b..f1f5a4d71b 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/nosnapshot/ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec.scala
@@ -20,9 +20,11 @@ object ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmSpec {
     var log = ""
     def receive = {
       case Count(nr) ⇒
+        println("Received number: " + nr)
        log += nr.toString
        self.reply("World from node [" + Config.nodename + "]")
       case GetLog ⇒
+        println("Received getLog")
        self.reply(Log(log))
     }
   }
@@ -47,8 +49,7 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode1 extends Clust
       (actorRef ? Count(i)).as[String] must be(Some("World from node [node1]"))
     }

-    barrier("start-node2", NrOfNodes) {
-    }
+    barrier("start-node2", NrOfNodes).await()

     node.shutdown()
   }
@@ -64,11 +65,9 @@ class ReplicationTransactionLogWriteThroughNoSnapshotMultiJvmNode2 extends Maste

   "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {

-      barrier("start-node1", NrOfNodes) {
-      }
+      barrier("start-node1", NrOfNodes).await()

-      barrier("create-actor-on-node1", NrOfNodes) {
-      }
+      barrier("create-actor-on-node1", NrOfNodes).await()

       barrier("start-node2", NrOfNodes) {
         node.start()
diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala
index b1136a5490..96ca99f074 100644
--- a/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala
+++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/replication/transactionlog/writethrough/snapshot/ReplicationTransactionLogWriteThroughSnapshotMultiJvmSpec.scala
@@ -68,8 +68,7 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode1 extends Cluster
       (actorRef ? Count(counter)).as[String].get must be("World from node [node1]")
     }

-    barrier("start-node2", NrOfNodes) {
-    }
+    barrier("start-node2", NrOfNodes).await()

     node.shutdown()
   }
@@ -85,11 +84,9 @@ class ReplicationTransactionLogWriteThroughSnapshotMultiJvmNode2 extends MasterC

   "be able to replicate an actor with a transaction log and replay transaction log after actor migration" in {

-      barrier("start-node1", NrOfNodes) {
-      }
+      barrier("start-node1", NrOfNodes).await()

-      barrier("create-actor-on-node1", NrOfNodes) {
-      }
+      barrier("create-actor-on-node1", NrOfNodes).await()

       barrier("start-node2", NrOfNodes) {
         node.start()
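One behavioral change the updated TransactionLogSpec below pins down: opening a transaction log that was never created (or was deleted) now fails with Akka's own ReplicationException instead of leaking BookKeeper's BKNoSuchLedgerExistsException. An illustrative sketch mirroring the intercept tests that follow:

```scala
import com.eaio.uuid.UUID

val missingId = (new UUID).toString

try {
  // No log was ever created for this id, so logFor fails.
  TransactionLog.logFor(missingId, isAsync = false, replicationScheme = null)
} catch {
  case e: ReplicationException ⇒
    // The new (message, cause) constructor can carry the underlying error; note,
    // however, that the cause is currently not passed on to AkkaException, so it
    // may not surface via getCause.
    println(e.getMessage)
}
```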
diff --git a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
index 259c5179e1..b35d0b3d49 100644
--- a/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
+++ b/akka-cluster/src/test/scala/akka/cluster/TransactionLogSpec.scala
@@ -3,33 +3,47 @@
  */
 package akka.cluster

-import org.apache.bookkeeper.client.{ BookKeeper, BKException }
-import BKException._
-import org.apache.zookeeper.server.ZooKeeperServer
-
+import org.apache.bookkeeper.client.BookKeeper
 import org.scalatest.WordSpec
 import org.scalatest.matchers.MustMatchers
-import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach, Spec }
-
-import akka.serialization._
-import akka.actor._
-import ActorSerialization._
-import Actor._
-
-import java.util.concurrent.{ CyclicBarrier, TimeUnit }
-import java.io.File
-import java.nio.ByteBuffer
+import org.scalatest.BeforeAndAfterAll

 import com.eaio.uuid.UUID

-import scala.collection.JavaConversions._
-
 class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterAll {
   private var bookKeeper: BookKeeper = _
   private var localBookKeeper: LocalBookKeeper = _

   // synchronous API
-  "A Transaction Log" should {
+  "A synchronous Transaction Log" should {
+
+    "be able to be deleted - synchronous" in {
+      val uuid = (new UUID).toString
+      val txlog = TransactionLog.newLogFor(uuid, false, null)
+      val entry = "hello".getBytes("UTF-8")
+      txlog.recordEntry(entry)
+
+      txlog.delete()
+      txlog.close()
+
+      val zkClient = TransactionLog.zkClient
+      assert(zkClient.readData(txlog.snapshotPath, true) == null)
+      assert(zkClient.readData(txlog.txLogPath, true) == null)
+    }
+
+    "fail to be opened if non existing - synchronous" in {
+      val uuid = (new UUID).toString
+      intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
+    }
+
+    "be able to be checked for existence - synchronous" in {
+      val uuid = (new UUID).toString
+      TransactionLog.exists(uuid) must be(false)
+
+      TransactionLog.newLogFor(uuid, false, null)
+      TransactionLog.exists(uuid) must be(true)
+    }
+
     "be able to record entries - synchronous" in {
       val uuid = (new UUID).toString
       val txlog = TransactionLog.newLogFor(uuid, false, null)
@@ -37,6 +51,19 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
       txlog.recordEntry(entry)
     }

+    "be able to overwrite an existing txlog if one already exists - synchronous" in {
+      val uuid = (new UUID).toString
+      val txlog1 = TransactionLog.newLogFor(uuid, false, null)
+      val entry = "hello".getBytes("UTF-8")
+      txlog1.recordEntry(entry)
+      txlog1.recordEntry(entry)
+      txlog1.close
+
+      val txLog2 = TransactionLog.newLogFor(uuid, false, null)
+      txLog2.latestSnapshotId.isDefined must be(false)
+      txLog2.latestEntryId must be(-1)
+    }
+
     "be able to record and delete entries - synchronous" in {
       val uuid = (new UUID).toString
       val txlog1 = TransactionLog.newLogFor(uuid, false, null)
@@ -45,7 +72,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
       txlog1.recordEntry(entry)
       txlog1.delete
       txlog1.close
-      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, false, null))
+      intercept[ReplicationException](TransactionLog.logFor(uuid, false, null))
     }

     "be able to record entries and read entries with 'entriesInRange' - synchronous" in {
@@ -146,7 +173,7 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
     }
   }

-  "A Transaction Log" should {
+  "An asynchronous Transaction Log" should {
     "be able to record entries - asynchronous" in {
       val uuid = (new UUID).toString
       val txlog = TransactionLog.newLogFor(uuid, true, null)
@@ -156,6 +183,46 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
       txlog.close
     }

+    "be able to be deleted - asynchronous" in {
+      val uuid = (new UUID).toString
+      val txlog = TransactionLog.newLogFor(uuid, true, null)
+      val entry = "hello".getBytes("UTF-8")
+      txlog.recordEntry(entry)
+
+      txlog.delete()
+      txlog.close()
+
+      val zkClient = TransactionLog.zkClient
+      assert(zkClient.readData(txlog.snapshotPath, true) == null)
+      assert(zkClient.readData(txlog.txLogPath, true) == null)
+    }
+
+    "be able to be checked for existence - asynchronous" in {
+      val uuid = (new UUID).toString
+      TransactionLog.exists(uuid) must be(false)
+
+      TransactionLog.newLogFor(uuid, true, null)
+      TransactionLog.exists(uuid) must be(true)
+    }
+
+    "fail to be opened if non existing - asynchronous" in {
+      val uuid = (new UUID).toString
+      intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
+    }
+
+    "be able to overwrite an existing txlog if one already exists - asynchronous" in {
+      val uuid = (new UUID).toString
+      val txlog1 = TransactionLog.newLogFor(uuid, true, null)
+      val entry = "hello".getBytes("UTF-8")
+      txlog1.recordEntry(entry)
+      txlog1.recordEntry(entry)
+      txlog1.close
+
+      val txLog2 = TransactionLog.newLogFor(uuid, true, null)
+      txLog2.latestSnapshotId.isDefined must be(false)
+      txLog2.latestEntryId must be(-1)
+    }
+
     "be able to record and delete entries - asynchronous" in {
       val uuid = (new UUID).toString
       val txlog1 = TransactionLog.newLogFor(uuid, true, null)
@@ -167,8 +234,9 @@ class TransactionLogSpec extends WordSpec with MustMatchers with BeforeAndAfterA
       Thread.sleep(200)
       txlog1.delete
       Thread.sleep(200)
-      intercept[BKNoSuchLedgerExistsException](TransactionLog.logFor(uuid, true, null))
+      intercept[ReplicationException](TransactionLog.logFor(uuid, true, null))
     }
+
     "be able to record entries and read entries with 'entriesInRange' - asynchronous" in {
       val uuid = (new UUID).toString
       val txlog1 = TransactionLog.newLogFor(uuid, true, null)