Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Viktor Klang 2011-05-18 13:43:37 +02:00
commit cd7538eace
11 changed files with 681 additions and 461 deletions

View file

@ -15,7 +15,9 @@ import akka.util.ReflectiveAccess
import akka.AkkaException
/**
* Programatic deployment configuration classes. Most values have defaults and can be left out.
* Programmatic deployment configuration classes. Most values have defaults and can be left out.
*
* todo: what does the concept Deploy
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -327,6 +329,8 @@ object Deployer {
}
/**
* TODO: Improved documentation
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object LocalDeployer {
@ -365,6 +369,8 @@ object LocalDeployer {
}
/**
* TODO: Improved documentation
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Address {

View file

@ -8,10 +8,17 @@ import org.apache.bookkeeper.proto.BookieServer
import java.io.File
/*
A simple use of BooKeeper is to implement a write-ahead transaction log. A server maintains an in-memory data structure (with periodic snapshots for example) and logs changes to that structure before it applies the change. The application server creates a ledger at startup and store the ledger id and password in a well known place (ZooKeeper maybe). When it needs to make a change, the server adds an entry with the change information to a ledger and apply the change when BookKeeper adds the entry successfully. The server can even use asyncAddEntry to queue up many changes for high change throughput. BooKeeper meticulously logs the changes in order and call the completion functions in order.
When the application server dies, a backup server will come online, get the last snapshot and then it will open the ledger of the old server and read all the entries from the time the snapshot was taken. (Since it doesn't know the last entry number it will use MAX_INTEGER). Once all the entries have been processed, it will close the ledger and start a new one for its use.
A simple use of BookKeeper is to implement a write-ahead transaction log. A server maintains an in-memory data structure
(with periodic snapshots for example) and logs changes to that structure before it applies the change. The application
server creates a ledger at startup and store the ledger id and password in a well known place (ZooKeeper maybe). When
it needs to make a change, the server adds an entry with the change information to a ledger and apply the change when
BookKeeper adds the entry successfully. The server can even use asyncAddEntry to queue up many changes for high change
throughput. BooKeeper meticulously logs the changes in order and call the completion functions in order.
When the application server dies, a backup server will come online, get the last snapshot and then it will open the
ledger of the old server and read all the entries from the time the snapshot was taken. (Since it doesn't know the last
entry number it will use MAX_INTEGER). Once all the entries have been processed, it will close the ledger and start a
new one for its use.
*/
/**
@ -24,8 +31,8 @@ object BookKeeperServer {
val ledgers = Array(new File("./bk/ledger"))
val bookie = new BookieServer(port, zkServers, journal, ledgers)
def start = {
bookie.start
bookie.join
def start() {
bookie.start()
bookie.join()
}
}

File diff suppressed because it is too large Load diff

View file

@ -34,8 +34,9 @@ class ClusterActorRef private[akka] (
def connections: Map[InetSocketAddress, ActorRef] = addresses.get.toMap
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
route(message)(senderOption)
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@ -48,7 +49,7 @@ class ClusterActorRef private[akka] (
addresses set (
addresses.get map { case (address, actorRef) =>
if (address == from) {
actorRef.stop
actorRef.stop()
(to, createRemoteActorRef(actorRef.uuid, to))
} else (address, actorRef)
}

View file

@ -23,18 +23,22 @@ import com.eaio.uuid.UUID
import java.util.concurrent.CountDownLatch
/**
* A ClusterDeployer is responsible for deploying a Deploy.
*
* big question is: what does Deploy mean?
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ClusterDeployer {
val clusterName = Cluster.name
val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID
val clusterPath = "/%s" format clusterName
val clusterDeploymentLockPath = clusterPath + "/deployment-lock"
val deploymentPath = clusterPath + "/deployment"
val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath)
val deploymentAddressPath = deploymentPath + "/%s"
val clusterName = Cluster.name
val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID
val clusterPath = "/%s" format clusterName
val clusterDeploymentLockPath = clusterPath + "/deployment-lock"
val deploymentPath = clusterPath + "/deployment"
val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath)
val deploymentAddressPath = deploymentPath + "/%s"
private val isConnected = new Switch(false)
private val isConnected = new Switch(false)
private val deploymentCompleted = new CountDownLatch(1)
private lazy val zkClient = {
@ -62,6 +66,7 @@ object ClusterDeployer {
zkClient.connection.getZookeeper, clusterDeploymentLockPath, null, clusterDeploymentLockListener) {
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
ownerIdField.setAccessible(true)
def leader: String = ownerIdField.get(this).asInstanceOf[String]
}
@ -69,31 +74,33 @@ object ClusterDeployer {
Deploy(
address = RemoteClusterDaemon.ADDRESS,
routing = Direct,
scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
)
private[akka] def init(deployments: List[Deploy]) {
isConnected.switchOn {
baseNodes.foreach { path =>
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e =>
val error = new DeploymentException(e.toString)
EventHandler.error(error, this)
throw error
}
baseNodes.foreach {
path =>
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
EventHandler.debug(this, "Created node [%s]".format(path))
} catch {
case e =>
val error = new DeploymentException(e.toString)
EventHandler.error(error, this)
throw error
}
}
val allDeployments = deployments ::: systemDeployments
EventHandler.info(this, "Initializing cluster deployer")
if (deploymentLock.lock()) { // try to be the one doing the clustered deployment
if (deploymentLock.lock()) {
// try to be the one doing the clustered deployment
EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]")
allDeployments foreach (deploy(_)) // deploy
deploymentLock.unlock() // signal deployment complete
deploymentLock.unlock() // signal deployment complete
} else {
deploymentCompleted.await() // wait until deployment is completed
deploymentCompleted.await() // wait until deployment is completed
}
}
}
@ -106,7 +113,7 @@ object ClusterDeployer {
}
private[akka] def deploy(deployment: Deploy) {
val path = deploymentAddressPath.format(deployment.address)
val path = deploymentAddressPath.format(deployment.address)
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
zkClient.writeData(path, deployment)
@ -134,7 +141,7 @@ object ClusterDeployer {
private[akka] def undeployAll() {
try {
for {
child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
deployment <- lookupDeploymentFor(child)
} undeploy(deployment)
} catch {

View file

@ -1,8 +1,8 @@
package akka.cluster
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster
import Cluster._
import akka.actor._
@ -14,6 +14,7 @@ import akka.dispatch._
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map => JMap }
import akka.cluster.TransactionLog
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -25,6 +26,7 @@ trait Replicable { this: Actor =>
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
sealed trait ReplicationStrategy
object ReplicationStrategy {
case object Transient extends ReplicationStrategy
case object WriteThrough extends ReplicationStrategy
@ -49,7 +51,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
def start(): ActorRef = {
EventHandler.debug(this, "Starting ReplicatedActorRef for Actor [%s] with transaction log [%s]"
.format(address, txLog.logId))
actorRef.start
actorRef.start()
}
def stop() {
@ -57,28 +59,49 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
actorRef.stop()
}
override def setFaultHandler(handler: FaultHandlingStrategy) = actorRef.setFaultHandler(handler)
override def getFaultHandler(): FaultHandlingStrategy = actorRef.getFaultHandler()
override def setLifeCycle(lifeCycle: LifeCycle): Unit = actorRef.setLifeCycle(lifeCycle)
override def getLifeCycle(): LifeCycle = actorRef.getLifeCycle
def dispatcher_=(md: MessageDispatcher): Unit = actorRef.dispatcher_=(md)
override def setFaultHandler(handler: FaultHandlingStrategy) {
actorRef.setFaultHandler(handler)
}
override def getFaultHandler: FaultHandlingStrategy = actorRef.getFaultHandler()
override def setLifeCycle(lifeCycle: LifeCycle) {
actorRef.setLifeCycle(lifeCycle)
}
override def getLifeCycle: LifeCycle = actorRef.getLifeCycle
def dispatcher_=(md: MessageDispatcher) {
actorRef.dispatcher_=(md)
}
def dispatcher: MessageDispatcher = actorRef.dispatcher
def link(actorRef: ActorRef): Unit = actorRef.link(actorRef)
def unlink(actorRef: ActorRef): Unit = actorRef.unlink(actorRef)
def link(actorRef: ActorRef) {
actorRef.link(actorRef)
}
def unlink(actorRef: ActorRef) {
actorRef.unlink(actorRef)
}
def startLink(actorRef: ActorRef): ActorRef = actorRef.startLink(actorRef)
def supervisor: Option[ActorRef] = actorRef.supervisor
def linkedActors: JMap[Uuid, ActorRef] = actorRef.linkedActors
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = actorRef.postMessageToMailbox(message, senderOption)
protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
actorRef.postMessageToMailbox(message, senderOption)
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T]
= actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance
protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actorRef.supervisor_=(sup)
protected[akka] def supervisor_=(sup: Option[ActorRef]) {
actorRef.supervisor_=(sup)
}
protected[akka] def mailbox: AnyRef = actorRef.mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = actorRef.mailbox_=(value)
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = actorRef.handleTrapExit(dead, reason)
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = actorRef.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
actorRef.handleTrapExit(dead, reason)
}
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
}
protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
actorRef.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
}
}

View file

@ -54,7 +54,7 @@ object Router {
trait Router {
def connections: Map[InetSocketAddress, ActorRef]
def route(message: Any)(implicit sender: Option[ActorRef]): Unit
def route(message: Any)(implicit sender: Option[ActorRef])
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
@ -68,9 +68,10 @@ object Router {
connections.toList.map({ case (address, actor) => actor }).headOption
}
def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
def route(message: Any)(implicit sender: Option[ActorRef]) {
if (connection.isDefined) connection.get.!(message)(sender)
else throw new RoutingException("No node connections for router")
else throw new RoutingException("No node connections for router")
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (connection.isDefined) connection.get.!!!(message, timeout)(sender)
@ -83,9 +84,10 @@ object Router {
trait Random extends Router {
private val random = new java.util.Random(System.currentTimeMillis)
def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
def route(message: Any)(implicit sender: Option[ActorRef]) {
if (next.isDefined) next.get.!(message)(sender)
else throw new RoutingException("No node connections for router")
else throw new RoutingException("No node connections for router")
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (next.isDefined) next.get.!!!(message, timeout)(sender)
@ -107,9 +109,10 @@ object Router {
@volatile
private var current = items
def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
def route(message: Any)(implicit sender: Option[ActorRef]) {
if (next.isDefined) next.get.!(message)(sender)
else throw new RoutingException("No node connections for router")
else throw new RoutingException("No node connections for router")
}
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (next.isDefined) next.get.!!!(message, timeout)(sender)

View file

@ -1,7 +1,8 @@
package akka.cluster
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.cluster
import org.apache.bookkeeper.client.{BookKeeper, LedgerHandle, LedgerEntry, BKException, AsyncCallback}
import org.apache.zookeeper.CreateMode
@ -19,27 +20,33 @@ import akka.cluster.zookeeper._
import java.util.Enumeration
import scala.collection.JavaConversions._
// FIXME allow user to choose dynamically between 'async' and 'sync' tx logging (asyncAddEntry(byte[] data, AddCallback cb, Object ctx))
// FIXME clean up old entries in log after doing a snapshot
// FIXME clean up all meta-data in ZK for a specific UUID when the corresponding actor is shut down
// FIXME delete tx log after migration of actor has been made and create a new one
/**
* TODO: Improved documentation,
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ReplicationException(message: String) extends AkkaException(message)
/**
* TODO: Improved documentation.
*
* TODO: Explain something about threadsafety.
*
* A TransactionLog makes chunks of data durable.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionLog private (
ledger: LedgerHandle, val id: String, val isAsync: Boolean) {
class TransactionLog private(ledger: LedgerHandle, val id: String, val isAsync: Boolean) {
import TransactionLog._
val logId = ledger.getId
val txLogPath = transactionLogNode + "/" + id
val logId = ledger.getId
val txLogPath = transactionLogNode + "/" + id
val snapshotPath = txLogPath + "/snapshot"
private val isOpen = new Switch(true)
@ -47,60 +54,64 @@ class TransactionLog private (
/**
* TODO document method
*/
def recordEntry(entry: Array[Byte]): Unit = if (isOpen.isOn) {
try {
if (isAsync) {
ledger.asyncAddEntry(
entry,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
entryId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
EventHandler.debug(this,
"Writing entry [%s] to log [%s]".format(entryId, logId))
}
},
null)
} else {
handleReturnCode(ledger.addEntry(entry))
val entryId = ledger.getLastAddPushed
EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
def recordEntry(entry: Array[Byte]) {
if (isOpen.isOn) {
try {
if (isAsync) {
ledger.asyncAddEntry(
entry,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
entryId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
EventHandler.debug(this,
"Writing entry [%s] to log [%s]".format(entryId, logId))
}
},
null)
} else {
handleReturnCode(ledger.addEntry(entry))
val entryId = ledger.getLastAddPushed
EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
}
} catch {
case e => handleError(e)
}
} catch {
case e => handleError(e)
}
} else transactionClosedError
} else transactionClosedError
}
/**
* TODO document method
*/
def recordSnapshot(snapshot: Array[Byte]): Unit = if (isOpen.isOn) {
try {
if (isAsync) {
ledger.asyncAddEntry(
snapshot,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
entryId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
storeSnapshotMetaDataInZooKeeper(entryId)
}
},
null)
} else {
handleReturnCode(ledger.addEntry(snapshot))
storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed)
def recordSnapshot(snapshot: Array[Byte]) {
if (isOpen.isOn) {
try {
if (isAsync) {
ledger.asyncAddEntry(
snapshot,
new AsyncCallback.AddCallback {
def addComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
entryId: Long,
ctx: AnyRef) {
handleReturnCode(returnCode)
storeSnapshotMetaDataInZooKeeper(entryId)
}
},
null)
} else {
handleReturnCode(ledger.addEntry(snapshot))
storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed)
}
} catch {
case e => handleError(e)
}
} catch {
case e => handleError(e)
}
} else transactionClosedError
} else transactionClosedError
}
/**
* TODO document method
@ -122,21 +133,22 @@ class TransactionLog private (
*/
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] = if (isOpen.isOn) {
try {
if (from < 0) throw new IllegalArgumentException("'from' can't be negative [" + from + "]")
if (to < 0) throw new IllegalArgumentException("'to' can't be negative [" + from + "]")
if (from < 0) throw new IllegalArgumentException("'from' can't be negative [" + from + "]")
if (to < 0) throw new IllegalArgumentException("'to' can't be negative [" + from + "]")
if (to < from) throw new IllegalArgumentException("'to' can't be smaller than 'from' [" + from + "," + to + "]")
EventHandler.debug(this,
"Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
if (isAsync) {
val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout)
ledger.asyncReadEntries(
from, to,
new AsyncCallback.ReadCallback {
def readComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
enumeration: Enumeration[LedgerEntry],
ctx: AnyRef) {
returnCode: Int,
ledgerHandle: LedgerHandle,
enumeration: Enumeration[LedgerEntry],
ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]]
var entries = Vector[Array[Byte]]()
while (enumeration.hasMoreElements) {
@ -179,7 +191,7 @@ class TransactionLog private (
case e: ZkNoNodeException =>
handleError(new ReplicationException(
"Transaction log for UUID [" + id +
"] does not have a snapshot recorded in ZooKeeper"))
"] does not have a snapshot recorded in ZooKeeper"))
case e => handleError(e)
}
}
@ -187,70 +199,76 @@ class TransactionLog private (
/**
* TODO document method
*/
def delete(): Unit = if (isOpen.isOn) {
EventHandler.debug(this, "Deleting transaction log [%s]".format(logId))
try {
if (isAsync) {
bookieClient.asyncDeleteLedger(
logId,
new AsyncCallback.DeleteCallback {
def deleteComplete(returnCode: Int, ctx: AnyRef) {
handleReturnCode(returnCode)
}
},
null)
} else {
bookieClient.deleteLedger(logId)
def delete() {
if (isOpen.isOn) {
EventHandler.debug(this, "Deleting transaction log [%s]".format(logId))
try {
if (isAsync) {
bookieClient.asyncDeleteLedger(
logId,
new AsyncCallback.DeleteCallback {
def deleteComplete(returnCode: Int, ctx: AnyRef) {
handleReturnCode(returnCode)
}
},
null)
} else {
bookieClient.deleteLedger(logId)
}
} catch {
case e => handleError(e)
}
} catch {
case e => handleError(e)
}
}
/**
* TODO document method
*/
def close(): Unit = if (isOpen.switchOff) {
EventHandler.debug(this, "Closing transaction log [%s]".format(logId))
try {
if (isAsync) {
ledger.asyncClose(
new AsyncCallback.CloseCallback {
def closeComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
handleReturnCode(returnCode)
}
},
null)
} else {
ledger.close
def close() {
if (isOpen.switchOff) {
EventHandler.debug(this, "Closing transaction log [%s]".format(logId))
try {
if (isAsync) {
ledger.asyncClose(
new AsyncCallback.CloseCallback {
def closeComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
handleReturnCode(returnCode)
}
},
null)
} else {
ledger.close()
}
} catch {
case e => handleError(e)
}
} catch {
case e => handleError(e)
}
}
private def storeSnapshotMetaDataInZooKeeper(snapshotId: Long): Unit = if (isOpen.isOn) {
try {
zkClient.create(snapshotPath, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException => {} // do nothing
case e => handleError(e)
}
private def storeSnapshotMetaDataInZooKeeper(snapshotId: Long) {
if (isOpen.isOn) {
try {
zkClient.create(snapshotPath, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException => {} // do nothing
case e => handleError(e)
}
try {
zkClient.writeData(snapshotPath, snapshotId)
} catch {
case e =>
handleError(new ReplicationException(
"Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
id +"]"))
}
EventHandler.debug(this,
"Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
} else transactionClosedError
try {
zkClient.writeData(snapshotPath, snapshotId)
} catch {
case e =>
handleError(new ReplicationException(
"Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
id + "]"))
}
EventHandler.debug(this,
"Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
} else transactionClosedError
}
private def handleReturnCode(block: => Long) {
val code = block.toInt
@ -261,7 +279,7 @@ class TransactionLog private (
private def transactionClosedError: Nothing = {
handleError(new ReplicationException(
"Transaction log [" + logId +
"] is closed. You need to open up new a new one with 'TransactionLog.logFor(id)'"))
"] is closed. You need to open up new a new one with 'TransactionLog.logFor(id)'"))
}
}
@ -272,14 +290,14 @@ object TransactionLog {
val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match {
case "CRC32" => BookKeeper.DigestType.CRC32
case "MAC" => BookKeeper.DigestType.MAC
case "MAC" => BookKeeper.DigestType.MAC
case unknown => throw new ConfigurationException(
"akka.cluster.replication.digest-type is invalid [" + unknown + "]")
"akka.cluster.replication.digest-type is invalid [" + unknown + "]")
}
val password = config.getString("akka.cluster.replication.password", "secret").getBytes("UTF-8")
val password = config.getString("akka.cluster.replication.password", "secret").getBytes("UTF-8")
val ensembleSize = config.getInt("akka.cluster.replication.ensemble-size", 3)
val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
val timeout = 5000 // FIXME make configurable
val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
val timeout = 5000 // FIXME make configurable
private[akka] val transactionLogNode = "/transaction-log-ids"
@ -298,15 +316,15 @@ object TransactionLog {
zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException => {} // do nothing
case e => handleError(e)
case e => handleError(e)
}
EventHandler.info(this,
("Transaction log service started with" +
"\n\tdigest type [%s]" +
"\n\tensemble size [%s]" +
"\n\tquorum size [%s]" +
"\n\tlogging time out [%s]").format(
"\n\tdigest type [%s]" +
"\n\tensemble size [%s]" +
"\n\tquorum size [%s]" +
"\n\tlogging time out [%s]").format(
digestType,
ensembleSize,
quorumSize,
@ -342,7 +360,7 @@ object TransactionLog {
val ledger = try {
if (zkClient.exists(txLogPath)) throw new ReplicationException(
"Transaction log for UUID [" + id +"] already exists")
"Transaction log for UUID [" + id + "] already exists")
val future = new DefaultCompletableFuture[LedgerHandle](timeout)
if (isAsync) {
@ -350,9 +368,9 @@ object TransactionLog {
ensembleSize, quorumSize, digestType, password,
new AsyncCallback.CreateCallback {
def createComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
@ -377,7 +395,7 @@ object TransactionLog {
bookieClient.deleteLedger(logId) // clean up
handleError(new ReplicationException(
"Could not store transaction log [" + logId +
"] meta-data in ZooKeeper for UUID [" + id +"]"))
"] meta-data in ZooKeeper for UUID [" + id + "]"))
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
@ -398,7 +416,7 @@ object TransactionLog {
} catch {
case e: ZkNoNodeException =>
handleError(new ReplicationException(
"Transaction log for UUID [" + id +"] does not exist in ZooKeeper"))
"Transaction log for UUID [" + id + "] does not exist in ZooKeeper"))
case e => handleError(e)
}
@ -409,9 +427,9 @@ object TransactionLog {
logId, digestType, password,
new AsyncCallback.OpenCallback {
def openComplete(
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
returnCode: Int,
ledgerHandle: LedgerHandle,
ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
@ -431,7 +449,7 @@ object TransactionLog {
private[akka] def await[T](future: CompletableFuture[T]): T = {
future.await
if (future.result.isDefined) future.result.get
if (future.result.isDefined) future.result.get
else if (future.exception.isDefined) handleError(future.exception.get)
else handleError(new ReplicationException("No result from async read of entries for transaction log"))
}
@ -458,8 +476,8 @@ object LocalBookKeeperEnsemble {
isRunning switchOn {
localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
localBookKeeper.runZookeeper(port)
localBookKeeper.initializeZookeper
localBookKeeper.runBookies
localBookKeeper.initializeZookeper()
localBookKeeper.runBookies()
EventHandler.info(this, "LocalBookKeeperEnsemble started successfully")
}
}
@ -473,9 +491,9 @@ object LocalBookKeeperEnsemble {
println("***************************** 1")
localBookKeeper.bs.foreach(_.shutdown()) // stop bookies
println("***************************** 2")
localBookKeeper.zkc.close() // stop zk client
localBookKeeper.zkc.close() // stop zk client
println("***************************** 3")
localBookKeeper.zks.shutdown() // stop zk server
localBookKeeper.zks.shutdown() // stop zk server
println("***************************** 4")
localBookKeeper.serverFactory.shutdown() // stop zk NIOServer
println("***************************** 5")

View file

@ -7,23 +7,28 @@ import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
/**
* todo: what is the purpose of this class?
*/
class AkkaZkClient(zkServers: String,
sessionTimeout: Int,
connectionTimeout: Int,
zkSerializer: ZkSerializer = new SerializableSerializer)
sessionTimeout: Int,
connectionTimeout: Int,
zkSerializer: ZkSerializer = new SerializableSerializer)
extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
def reconnect() {
getEventLock.lock
val zkLock = getEventLock
zkLock.lock()
try {
_connection.close
_connection.close()
_connection.connect(this)
} catch {
case e: InterruptedException => throw new ZkInterruptedException(e)
} finally {
getEventLock.unlock
zkLock.unlock()
}
}
}

View file

@ -23,10 +23,10 @@ object AkkaZooKeeper {
val zkServer = new ZkServer(
dataPath, logPath,
new IDefaultNameSpace() {
def createDefaultNameSpace(zkClient: ZkClient) = {}
def createDefaultNameSpace(zkClient: ZkClient) {}
},
port, tickTime)
zkServer.start
zkServer.start()
zkServer
}
}

View file

@ -34,18 +34,21 @@ object ZooKeeperBarrier {
def apply(zkClient: ZkClient, cluster: String, name: String, node: String, count: Int, timeout: Duration) =
new ZooKeeperBarrier(zkClient, cluster + "-" + name, node, count, timeout)
def ignore[E : Manifest](body: => Unit): Unit =
def ignore[E: Manifest](body: => Unit) {
try {
body
} catch {
case e if manifest[E].erasure.isAssignableFrom(e.getClass) => ()
}
}
}
/**
* Barrier based on Zookeeper barrier tutorial.
*/
class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: Int, timeout: Duration) extends IZkChildListener {
class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: Int, timeout: Duration)
extends IZkChildListener {
import ZooKeeperBarrier.{BarriersNode, ignore}
val barrier = BarriersNode + "/" + name
@ -57,10 +60,10 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
ignore[ZkNodeExistsException](zkClient.createPersistent(BarriersNode))
ignore[ZkNodeExistsException](zkClient.createPersistent(barrier))
def apply(body: => Unit) = {
def apply(body: => Unit) {
enter
body
leave
leave()
}
def enter = {
@ -75,7 +78,7 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
zkClient.subscribeChildChanges(barrier, this)
}
def leave = {
def leave() {
zkClient.delete(entry)
exitBarrier.await(timeout.length, timeout.unit)
if (zkClient.countChildren(barrier) > 0) {
@ -85,10 +88,10 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
zkClient.unsubscribeChildChanges(barrier, this)
}
def handleChildChange(path: String, children: JList[String]) = {
def handleChildChange(path: String, children: JList[String]) {
if (children.size <= 1) {
ignore[ZkNoNodeException](zkClient.delete(ready))
exitBarrier.countDown
exitBarrier.countDown()
}
}
}