ticket #992: misc fixes for transaction log, processed review comments

This commit is contained in:
Peter Veentjer 2011-08-08 20:40:18 +03:00
parent 560701ab20
commit bd049718be
2 changed files with 24 additions and 24 deletions

View file

@ -1800,9 +1800,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
def handleRelease(message: ClusterProtocol.RemoteDaemonMessageProtocol) { def handleRelease(message: ClusterProtocol.RemoteDaemonMessageProtocol) {
if (message.hasActorUuid) { if (message.hasActorUuid) {
cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach { address cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) foreach cluster.release(_)
cluster.release(address)
}
} else if (message.hasActorAddress) { } else if (message.hasActorAddress) {
cluster release message.getActorAddress cluster release message.getActorAddress
} else { } else {
@ -1911,7 +1909,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
} }
self.reply(Success) self.reply(Success)
} catch { } catch {
case error case error:Throwable
self.reply(Failure(error)) self.reply(Failure(error))
throw error throw error
} }

View file

@ -31,7 +31,7 @@ import java.util.Enumeration
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class ReplicationException(message: String, cause: Throwable = null) extends AkkaException(message) { class ReplicationException(message: String, cause: Throwable = null) extends AkkaException(message) {
def this(msg: String) = this(msg, null); def this(msg: String) = this(msg, null)
} }
/** /**
@ -50,7 +50,7 @@ class TransactionLog private (
import TransactionLog._ import TransactionLog._
val logId = ledger.getId val logId = ledger.getId
val txLogPath = transactionLogNode + "/" + id val txLogPath = transactionLogPath(id)
val snapshotPath = txLogPath + "/snapshot" val snapshotPath = txLogPath + "/snapshot"
private val isOpen = new Switch(true) private val isOpen = new Switch(true)
@ -105,7 +105,7 @@ class TransactionLog private (
EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId)) EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} else transactionClosedError } else transactionClosedError
} }
@ -150,7 +150,7 @@ class TransactionLog private (
storeSnapshotMetaDataInZooKeeper(snapshotId) storeSnapshotMetaDataInZooKeeper(snapshotId)
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} else transactionClosedError } else transactionClosedError
} }
@ -221,7 +221,7 @@ class TransactionLog private (
toByteArrays(ledger.readEntries(from, to)) toByteArrays(ledger.readEntries(from, to))
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} else transactionClosedError } else transactionClosedError
@ -242,7 +242,7 @@ class TransactionLog private (
Some(snapshotId) Some(snapshotId)
} catch { } catch {
case e: ZkNoNodeException None case e: ZkNoNodeException None
case e handleError(e) case e: Throwable handleError(e)
} }
} }
@ -273,7 +273,7 @@ class TransactionLog private (
zkClient.delete(snapshotPath) zkClient.delete(snapshotPath)
zkClient.delete(txLogPath) zkClient.delete(txLogPath)
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} }
} }
@ -302,7 +302,7 @@ class TransactionLog private (
ledger.close() ledger.close()
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} }
} }
@ -325,13 +325,13 @@ class TransactionLog private (
zkClient.create(snapshotPath, null, CreateMode.PERSISTENT) zkClient.create(snapshotPath, null, CreateMode.PERSISTENT)
} catch { } catch {
case e: ZkNodeExistsException {} // do nothing case e: ZkNodeExistsException {} // do nothing
case e handleError(e) case e: Throwable handleError(e)
} }
try { try {
zkClient.writeData(snapshotPath, snapshotId) zkClient.writeData(snapshotPath, snapshotId)
} catch { } catch {
case e case e: Throwable
handleError(new ReplicationException( handleError(new ReplicationException(
"Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" + id + "]")) "Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" + id + "]"))
} }
@ -386,7 +386,7 @@ object TransactionLog {
zk.create(transactionLogNode, null, CreateMode.PERSISTENT) zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
} catch { } catch {
case e: ZkNodeExistsException {} // do nothing case e: ZkNodeExistsException {} // do nothing
case e handleError(e) case e: Throwable handleError(e)
} }
EventHandler.info(this, EventHandler.info(this,
@ -421,16 +421,18 @@ object TransactionLog {
bookieClient.halt() bookieClient.halt()
EventHandler.info(this, "Transaction log shut down successfully") EventHandler.info(this, "Transaction log shut down successfully")
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} }
} }
def transactionLogPath(id: String): String = transactionLogNode + "/" + id
/** /**
* Checks if a TransactionLog for the given id already exists. * Checks if a TransactionLog for the given id already exists.
*/ */
def exists(id: String): Boolean = { def exists(id: String): Boolean = {
val txLogPath = transactionLogNode + "/" + id val txLogPath = transactionLogPath(id)
zkClient.exists(txLogPath) zkClient.exists(txLogPath)
} }
@ -439,7 +441,7 @@ object TransactionLog {
* it will be overwritten. * it will be overwritten.
*/ */
def newLogFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = { def newLogFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id val txLogPath = transactionLogPath(id)
val ledger = try { val ledger = try {
if (exists(id)) { if (exists(id)) {
@ -450,7 +452,7 @@ object TransactionLog {
txLog.delete() txLog.delete()
txLog.close() txLog.close()
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
} }
@ -474,7 +476,7 @@ object TransactionLog {
bookieClient.createLedger(ensembleSize, quorumSize, digestType, password) bookieClient.createLedger(ensembleSize, quorumSize, digestType, password)
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
val logId = ledger.getId val logId = ledger.getId
@ -483,7 +485,7 @@ object TransactionLog {
zkClient.writeData(txLogPath, logId) zkClient.writeData(txLogPath, logId)
logId //TODO: does this have any effect? logId //TODO: does this have any effect?
} catch { } catch {
case e case e: Throwable
bookieClient.deleteLedger(logId) // clean up bookieClient.deleteLedger(logId) // clean up
handleError(new ReplicationException( handleError(new ReplicationException(
"Could not store transaction log [" + logId + "Could not store transaction log [" + logId +
@ -500,7 +502,7 @@ object TransactionLog {
* @throws ReplicationException if the log with the given id doesn't exist. * @throws ReplicationException if the log with the given id doesn't exist.
*/ */
def logFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = { def logFor(id: String, isAsync: Boolean, replicationScheme: ReplicationScheme): TransactionLog = {
val txLogPath = transactionLogNode + "/" + id val txLogPath = transactionLogPath(id)
val logId = try { val logId = try {
val logId = zkClient.readData(txLogPath).asInstanceOf[Long] val logId = zkClient.readData(txLogPath).asInstanceOf[Long]
@ -511,7 +513,7 @@ object TransactionLog {
case e: ZkNoNodeException case e: ZkNoNodeException
handleError(new ReplicationException( handleError(new ReplicationException(
"Transaction log for UUID [" + id + "] does not exist in ZooKeeper")) "Transaction log for UUID [" + id + "] does not exist in ZooKeeper"))
case e handleError(e) case e: Throwable handleError(e)
} }
val ledger = try { val ledger = try {
@ -532,7 +534,7 @@ object TransactionLog {
bookieClient.openLedger(logId, digestType, password) bookieClient.openLedger(logId, digestType, password)
} }
} catch { } catch {
case e handleError(e) case e: Throwable handleError(e)
} }
TransactionLog(ledger, id, isAsync, replicationScheme) TransactionLog(ledger, id, isAsync, replicationScheme)