diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala
index e6ca32e5d6..30816def5c 100644
--- a/akka-actor/src/main/scala/akka/actor/Deployer.scala
+++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala
@@ -15,7 +15,9 @@ import akka.util.ReflectiveAccess
import akka.AkkaException
/**
- * Programatic deployment configuration classes. Most values have defaults and can be left out.
+ * Programmatic deployment configuration classes. Most values have defaults and can be left out.
+ *
+ * TODO: document what the 'Deploy' concept means.
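+ *
+ * A hypothetical example of such a configuration, mirroring the system deployment
+ * built in ClusterDeployer:
+ *
+ *   Deploy(
+ *     address = RemoteClusterDaemon.ADDRESS,
+ *     routing = Direct,
+ *     scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))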
*
* @author Jonas Bonér
*/
@@ -327,6 +329,8 @@ object Deployer {
}
/**
+ * TODO: Improve the documentation.
+ *
* @author Jonas Bonér
*/
object LocalDeployer {
@@ -365,6 +369,8 @@ object LocalDeployer {
}
/**
+ * TODO: Improve the documentation.
+ *
* @author Jonas Bonér
*/
object Address {
diff --git a/akka-cluster/src/main/scala/akka/cluster/BookKeeperServer.scala b/akka-cluster/src/main/scala/akka/cluster/BookKeeperServer.scala
index 7db2d63a1a..24f9918fd8 100644
--- a/akka-cluster/src/main/scala/akka/cluster/BookKeeperServer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/BookKeeperServer.scala
@@ -8,10 +8,17 @@ import org.apache.bookkeeper.proto.BookieServer
import java.io.File
/*
-A simple use of BooKeeper is to implement a write-ahead transaction log. A server maintains an in-memory data structure (with periodic snapshots for example) and logs changes to that structure before it applies the change. The application server creates a ledger at startup and store the ledger id and password in a well known place (ZooKeeper maybe). When it needs to make a change, the server adds an entry with the change information to a ledger and apply the change when BookKeeper adds the entry successfully. The server can even use asyncAddEntry to queue up many changes for high change throughput. BooKeeper meticulously logs the changes in order and call the completion functions in order.
-
-When the application server dies, a backup server will come online, get the last snapshot and then it will open the ledger of the old server and read all the entries from the time the snapshot was taken. (Since it doesn't know the last entry number it will use MAX_INTEGER). Once all the entries have been processed, it will close the ledger and start a new one for its use.
+A simple use of BookKeeper is to implement a write-ahead transaction log. A server maintains an in-memory data structure
+(with periodic snapshots, for example) and logs changes to that structure before it applies them. The application
+server creates a ledger at startup and stores the ledger id and password in a well-known place (ZooKeeper, for example).
+When it needs to make a change, the server adds an entry with the change information to the ledger and applies the
+change once BookKeeper has added the entry successfully. The server can even use asyncAddEntry to queue up many changes
+for high change throughput. BookKeeper meticulously logs the changes in order and calls the completion functions in order.
+
+When the application server dies, a backup server comes online, gets the last snapshot, opens the ledger of the old
+server, and reads all the entries written since the snapshot was taken. (Since it doesn't know the last entry number,
+it uses MAX_INTEGER.) Once all the entries have been processed, it closes the ledger and starts a new one for its own use.
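+
+A minimal sketch of the pattern above against the plain BookKeeper client API (hypothetical, not code from this
+project; 'changeBytes' and 'ledgerId' are placeholders):
+
+  import org.apache.bookkeeper.client.BookKeeper
+
+  val bk     = new BookKeeper("localhost:2181")               // ZooKeeper ensemble
+  val ledger = bk.createLedger(BookKeeper.DigestType.MAC, "secret".getBytes)
+  // persist ledger.getId and the password in a well-known place, then for each change:
+  ledger.addEntry(changeBytes)                                // log first, apply the change after
+
+  // backup server taking over: open the old ledger and replay the entries since the snapshot
+  val old     = bk.openLedger(ledgerId, BookKeeper.DigestType.MAC, "secret".getBytes)
+  val entries = old.readEntries(0, old.getLastAddConfirmed)
+  old.close()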
*/
/**
@@ -24,8 +31,8 @@ object BookKeeperServer {
val ledgers = Array(new File("./bk/ledger"))
val bookie = new BookieServer(port, zkServers, journal, ledgers)
- def start = {
- bookie.start
- bookie.join
+ def start() {
+ bookie.start()
+ bookie.join()
}
}
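+
+// Usage sketch: BookKeeperServer.start() boots the embedded bookie and then blocks
+// in bookie.join() until the bookie thread terminates.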
diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
index f17b4c9997..2772652fb5 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala
@@ -14,7 +14,6 @@ import org.I0Itec.zkclient.exception._
import java.util.concurrent.atomic.{AtomicBoolean, AtomicReference, AtomicInteger}
import java.util.concurrent.{ConcurrentSkipListSet, CopyOnWriteArrayList, Callable, ConcurrentHashMap}
-import java.util.{List => JList}
import java.net.InetSocketAddress
import javax.management.StandardMBean
@@ -42,6 +41,7 @@ import akka.cluster.zookeeper._
import com.eaio.uuid.UUID
import com.google.protobuf.ByteString
+import java.util.{List => JList}
// FIXME add watch for each node that when the entry for the node is removed then the node shuts itself down
// FIXME Provisioning data in ZK (file names etc) and files in S3 and on disk
@@ -55,39 +55,53 @@ class ClusterException(message: String) extends AkkaException(message)
*/
trait ClusterNodeMBean {
def start()
+
def stop()
def disconnect()
+
def reconnect()
+
def resign()
def isConnected: Boolean
def getRemoteServerHostname: String
+
def getRemoteServerPort: Int
def getNodeName: String
+
def getClusterName: String
+
def getZooKeeperServerAddresses: String
def getMemberNodes: Array[String]
+
def getLeader: String
def getUuidsForClusteredActors: Array[String]
+
def getAddressesForClusteredActors: Array[String]
def getUuidsForActorsInUse: Array[String]
+
def getAddressesForActorsInUse: Array[String]
def getNodesForActorInUseWithUuid(uuid: String): Array[String]
+
def getNodesForActorInUseWithAddress(address: String): Array[String]
def getUuidsForActorsInUseOnNode(nodeName: String): Array[String]
+
def getAddressesForActorsInUseOnNode(nodeName: String): Array[String]
def setConfigElement(key: String, value: String)
+
def getConfigElement(key: String): AnyRef
+
def removeConfigElement(key: String)
+
def getConfigElementKeys: Array[String]
}
@@ -98,17 +112,17 @@ trait ClusterNodeMBean {
*/
object Cluster {
val EMPTY_STRING = "".intern
- val UUID_PREFIX = "uuid:".intern
+ val UUID_PREFIX = "uuid:".intern
// config options
- val name = config.getString("akka.cluster.name", "default")
- val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
- val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
- val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
- val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
+ val name = config.getString("akka.cluster.name", "default")
+ val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
+ val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
+ val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
+ val connectionTimeout = Duration(config.getInt("akka.cluster.connection-timeout", 60), TIME_UNIT).toMillis.toInt
val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt
- val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
- val enableJMX = config.getBool("akka.enable-jmx", true)
+ val shouldCompressData = config.getBool("akka.cluster.use-compression", false)
+ val enableJMX = config.getBool("akka.enable-jmx", true)
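+
+  // Sample configuration fragment for the options above (values shown are the defaults):
+  //
+  //   akka.cluster.name = "default"
+  //   akka.cluster.zookeeper-server-addresses = "localhost:2181"
+  //   akka.cluster.remote-server-port = 2552
+  //   akka.cluster.session-timeout = 60
+  //   akka.cluster.connection-timeout = 60
+  //   akka.cluster.max-time-to-wait-until-connected = 30
+  //   akka.cluster.use-compression = false
+  //   akka.enable-jmx = true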
/**
* Cluster membership change listener.
@@ -126,12 +140,19 @@ object Cluster {
case ThisNode.Expired => thisNodeExpired(client)
}
}
+
def nodeConnected(node: String, client: ClusterNode) {}
+
def nodeDisconnected(node: String, client: ClusterNode) {}
+
def newLeader(name: String, client: ClusterNode) {}
+
def thisNodeNewSession(client: ClusterNode) {}
+
def thisNodeConnected(client: ClusterNode) {}
+
def thisNodeDisconnected(client: ClusterNode) {}
+
def thisNodeExpired(client: ClusterNode) {}
}
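+
+  // Hypothetical listener sketch (how the listener gets registered with the node is
+  // not shown in this patch):
+  //
+  //   val listener = new ChangeListenerAdapter {
+  //     override def nodeConnected(node: String, client: ClusterNode) {
+  //       EventHandler.info(this, "Node connected: " + node)
+  //     }
+  //   }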
@@ -142,22 +163,31 @@ object Cluster {
abstract class ChangeListenerAdapter extends ChangeListener
sealed trait ChangeNotification
+
case class NodeConnected(node: String) extends ChangeNotification
+
case class NodeDisconnected(node: String) extends ChangeNotification
+
case class NewLeader(name: String) extends ChangeNotification
+
case object NewSession extends ChangeNotification
+
object ThisNode {
+
case object Connected extends ChangeNotification
+
case object Disconnected extends ChangeNotification
+
case object Expired extends ChangeNotification
+
}
type Nodes = HashMap[NodeAddress, ClusterNode]
val defaultSerializer = new SerializableSerializer
- private val _zkServer = new AtomicReference[Option[ZkServer]](None)
- private val _nodes = new AtomicReference[Nodes](new Nodes)
+ private val _zkServer = new AtomicReference[Option[ZkServer]](None)
+ private val _nodes = new AtomicReference[Nodes](new Nodes)
private val _clusterNames = new ConcurrentSkipListSet[String]
private[cluster] def updateNodes(f: Nodes => Nodes) {
@@ -202,9 +232,9 @@ object Cluster {
* Creates a new cluster node; ClusterNode.
*/
def apply(
- nodeAddress: NodeAddress,
- zkServerAddresses: String = Cluster.zooKeeperServers,
- serializer: ZkSerializer = Cluster.defaultSerializer): ClusterNode =
+ nodeAddress: NodeAddress,
+ zkServerAddresses: String = Cluster.zooKeeperServers,
+ serializer: ZkSerializer = Cluster.defaultSerializer): ClusterNode =
newNode(nodeAddress, zkServerAddresses, serializer)
/**
@@ -229,9 +259,9 @@ object Cluster {
* Creates a new cluster node; ClusterNode.
*/
def newNode(
- nodeAddress: NodeAddress,
- zkServerAddresses: String,
- serializer: ZkSerializer): ClusterNode = {
+ nodeAddress: NodeAddress,
+ zkServerAddresses: String,
+ serializer: ZkSerializer): ClusterNode = {
if (nodeAddress eq null) throw new IllegalArgumentException("NodeAddress can't be null")
@@ -338,8 +368,10 @@ object Cluster {
def stringToUuid(uuid: String): UUID = {
if (uuid eq null) throw new ClusterException("UUID is null")
- if (uuid == "") throw new ClusterException("UUID is an empty string")
- try { new UUID(uuid) }
+ if (uuid == "") throw new ClusterException("UUID is an empty string")
+ try {
+ new UUID(uuid)
+ }
catch {
case e: StringIndexOutOfBoundsException =>
val error = new ClusterException("UUID not valid [" + uuid + "]")
@@ -362,10 +394,11 @@ object Cluster {
*
* @author Jonas Bonér
*/
-class ClusterNode private[akka] (
- val nodeAddress: NodeAddress,
- val zkServerAddresses: String,
- val serializer: ZkSerializer) extends ErrorHandler { self =>
+class ClusterNode private[akka](
+ val nodeAddress: NodeAddress,
+ val zkServerAddresses: String,
+ val serializer: ZkSerializer) extends ErrorHandler {
+ self =>
if (nodeAddress eq null) throw new IllegalArgumentException("'nodeAddress' can not be 'null'")
@@ -373,11 +406,11 @@ class ClusterNode private[akka] (
EventHandler.info(this,
("\nCreating cluster node with" +
- "\n\tnode name = [%s]" +
- "\n\tcluster name = [%s]" +
- "\n\tzookeeper server addresses = [%s]" +
- "\n\tserializer = [%s]")
- .format(nodeAddress.nodeName, nodeAddress.clusterName, zkServerAddresses, serializer))
+ "\n\tnode name = [%s]" +
+ "\n\tcluster name = [%s]" +
+ "\n\tzookeeper server addresses = [%s]" +
+ "\n\tserializer = [%s]")
+ .format(nodeAddress.nodeName, nodeAddress.clusterName, zkServerAddresses, serializer))
val remoteClientLifeCycleListener = actorOf(new Actor {
def receive = {
@@ -401,14 +434,14 @@ class ClusterNode private[akka] (
val clusterJmxObjectName = JMX.nameFor(nodeAddress.hostname, "monitoring", "cluster")
// static nodes
- val CLUSTER_NODE = "/" + nodeAddress.clusterName
- val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
- val CONFIGURATION_NODE = CLUSTER_NODE + "/config"
- val PROVISIONING_NODE = CLUSTER_NODE + "/provisioning"
- val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry"
- val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations"
+ val CLUSTER_NODE = "/" + nodeAddress.clusterName
+ val MEMBERSHIP_NODE = CLUSTER_NODE + "/members"
+ val CONFIGURATION_NODE = CLUSTER_NODE + "/config"
+ val PROVISIONING_NODE = CLUSTER_NODE + "/provisioning"
+ val ACTOR_REGISTRY_NODE = CLUSTER_NODE + "/actor-registry"
+ val ACTOR_LOCATIONS_NODE = CLUSTER_NODE + "/actor-locations"
val ACTOR_ADDRESS_TO_UUIDS_NODE = CLUSTER_NODE + "/actor-address-to-uuids"
- val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address"
+ val ACTORS_AT_NODE_NODE = CLUSTER_NODE + "/actors-at-address"
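+
+  // Resulting ZooKeeper tree for a cluster named "default":
+  //
+  //   /default/members
+  //   /default/config
+  //   /default/provisioning
+  //   /default/actor-registry
+  //   /default/actor-locations
+  //   /default/actor-address-to-uuids
+  //   /default/actors-at-address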
val baseNodes = List(
CLUSTER_NODE,
MEMBERSHIP_NODE,
@@ -421,16 +454,16 @@ class ClusterNode private[akka] (
val LEADER_ELECTION_NODE = CLUSTER_NODE + "/leader" // should NOT be part of 'baseNodes' only used by 'leaderLock'
- val isConnected = new Switch(false)
- val isLeader = new AtomicBoolean(false)
+ val isConnected = new Switch(false)
+ val isLeader = new AtomicBoolean(false)
val electionNumber = new AtomicInteger(Integer.MAX_VALUE)
- private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
+ private val membershipNodePath = membershipPathFor(nodeAddress.nodeName)
// local caches of ZK data
- private[akka] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
+ private[akka] val locallyCachedMembershipNodes = new ConcurrentSkipListSet[String]()
private[akka] val nodeNameToAddress: ConcurrentMap[String, InetSocketAddress] = new ConcurrentHashMap[String, InetSocketAddress]
- private[akka] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
+ private[akka] val locallyCheckedOutActors: ConcurrentMap[UUID, Array[Byte]] = new ConcurrentHashMap[UUID, Array[Byte]]
def membershipNodes: Array[String] = locallyCachedMembershipNodes.toList.toArray.asInstanceOf[Array[String]]
@@ -438,14 +471,14 @@ class ClusterNode private[akka] (
new ConcurrentHashMap[String, Tuple2[InetSocketAddress, ActorRef]]
// zookeeper listeners
- private val stateListener = new StateListener(this)
- private val membershipListener = new MembershipChildListener(this)
+ private val stateListener = new StateListener(this)
+ private val membershipListener = new MembershipChildListener(this)
// cluster node listeners
- private val changeListeners = new CopyOnWriteArrayList[ChangeListener]()
+ private val changeListeners = new CopyOnWriteArrayList[ChangeListener]()
// Address -> ClusterActorRef
- private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
+ private val clusterActorRefs = new Index[InetSocketAddress, ClusterActorRef]
// resources
private[cluster] val zkClient = new AkkaZkClient(zkServerAddresses, sessionTimeout, connectionTimeout, serializer)
@@ -470,6 +503,7 @@ class ClusterNode private[akka] (
// ugly hack, but what do you do? <--- haha epic
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
ownerIdField.setAccessible(true)
+
def leader: String = ownerIdField.get(this).asInstanceOf[String]
}
@@ -555,7 +589,9 @@ class ClusterNode private[akka] (
/**
* Explicitly resign from being a leader. If this node is not a leader then this operation is a no-op.
*/
- def resign() { if (isLeader.get) leaderLock.unlock() }
+ def resign() {
+ if (isLeader.get) leaderLock.unlock()
+ }
// =======================================
// Actor
@@ -567,8 +603,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorClass: Class[T], address: String)
- (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, false)
+ (actorClass: Class[T], address: String)
+ (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, false)
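+
+  // Hypothetical usage (assumes a MyActor class and an implicit Format[MyActor] in scope):
+  //
+  //   node.store(classOf[MyActor], "my-actor")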
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@@ -576,8 +612,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorClass: Class[T], address: String, replicationFactor: Int)
- (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, replicationFactor, false)
+ (actorClass: Class[T], address: String, replicationFactor: Int)
+ (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, replicationFactor, false)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@@ -585,8 +621,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorClass: Class[T], address: String, serializeMailbox: Boolean)
- (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox)
+ (actorClass: Class[T], address: String, serializeMailbox: Boolean)
+ (implicit format: Format[T]): ClusterNode = store(Actor.actorOf(actorClass, address).start, 0, serializeMailbox)
/**
* Clusters an actor of a specific type. If the actor is already clustered then the clustered version will be updated
@@ -594,8 +630,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorClass: Class[T], address: String, replicationFactor: Int, serializeMailbox: Boolean)
- (implicit format: Format[T]): ClusterNode =
+ (actorClass: Class[T], address: String, replicationFactor: Int, serializeMailbox: Boolean)
+ (implicit format: Format[T]): ClusterNode =
store(Actor.actorOf(actorClass, address).start, replicationFactor, serializeMailbox)
/**
@@ -604,8 +640,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorRef: ActorRef)
- (implicit format: Format[T]): ClusterNode = store(actorRef, 0, false)
+ (actorRef: ActorRef)
+ (implicit format: Format[T]): ClusterNode = store(actorRef, 0, false)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@@ -613,8 +649,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorRef: ActorRef, replicationFactor: Int)
- (implicit format: Format[T]): ClusterNode = store(actorRef, replicationFactor, false)
+ (actorRef: ActorRef, replicationFactor: Int)
+ (implicit format: Format[T]): ClusterNode = store(actorRef, replicationFactor, false)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@@ -622,8 +658,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorRef: ActorRef, serializeMailbox: Boolean)
- (implicit format: Format[T]): ClusterNode = store(actorRef, 0, serializeMailbox)
+ (actorRef: ActorRef, serializeMailbox: Boolean)
+ (implicit format: Format[T]): ClusterNode = store(actorRef, 0, serializeMailbox)
/**
* Clusters an actor with UUID. If the actor is already clustered then the clustered version will be updated
@@ -631,8 +667,8 @@ class ClusterNode private[akka] (
* available durable store.
*/
def store[T <: Actor]
- (actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean)
- (implicit format: Format[T]): ClusterNode = if (isConnected.isOn) {
+ (actorRef: ActorRef, replicationFactor: Int, serializeMailbox: Boolean)
+ (implicit format: Format[T]): ClusterNode = if (isConnected.isOn) {
import akka.serialization.ActorSerialization._
@@ -644,20 +680,22 @@ class ClusterNode private[akka] (
"Clustering actor [%s] with UUID [%s]".format(actorRef.address, uuid))
val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)(format))
- else toBinary(actorRef)(format)
+ else toBinary(actorRef)(format)
val actorRegistryPath = actorRegistryPathFor(uuid)
// create UUID -> Array[Byte] for actor registry
if (zkClient.exists(actorRegistryPath)) zkClient.writeData(actorRegistryPath, actorBytes) // FIXME check for size and warn if too big
else {
zkClient.retryUntilConnected(new Callable[Either[String, Exception]]() {
- def call: Either[String, Exception] = {
- try {
- Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT))
- } catch { case e: KeeperException.NodeExistsException => Right(e) }
+ def call: Either[String, Exception] = {
+ try {
+ Left(zkClient.connection.create(actorRegistryPath, actorBytes, CreateMode.PERSISTENT))
+ } catch {
+ case e: KeeperException.NodeExistsException => Right(e)
}
+ }
}) match {
- case Left(path) => path
+ case Left(path) => path
case Right(exception) => actorRegistryPath
}
@@ -676,22 +714,23 @@ class ClusterNode private[akka] (
}
// create UUID -> Address registry
- ignore[ZkNodeExistsException]( zkClient.createPersistent(actorRegistryNodePathFor(uuid)) )
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
// create UUID -> Node registry
- ignore[ZkNodeExistsException]( zkClient.createPersistent(actorLocationsPathFor(uuid)) )
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorLocationsPathFor(uuid)))
// create ADDRESS -> UUIDs registry
- ignore[ZkNodeExistsException]( zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address)) )
- ignore[ZkNodeExistsException]( zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid)) )
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorAddressToUuidsPathFor(actorRef.address)))
+ ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid)))
}
val command = RemoteDaemonMessageProtocol.newBuilder
.setMessageType(USE)
.setActorUuid(uuidToUuidProtocol(uuid))
.build
- replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection =>
- connection ! command
+ replicaConnectionsForReplicationFactor(replicationFactor) foreach {
+ connection =>
+ connection ! command
}
this
@@ -705,7 +744,7 @@ class ClusterNode private[akka] (
locallyCheckedOutActors.remove(uuid)
// warning: ordering matters here
- ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // remove ADDRESS to UUID mapping
+ ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAddressToUuidsPathFor(actorAddressForUuid(uuid)))) // remove ADDRESS to UUID mapping
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorRegistryPathFor(uuid)))
ignore[ZkNoNodeException](zkClient.deleteRecursive(actorLocationsPathFor(uuid)))
@@ -727,8 +766,9 @@ class ClusterNode private[akka] (
* Is the actor with uuid clustered or not?
*/
def isClustered(actorAddress: String): Boolean = if (isConnected.isOn) {
- actorUuidsForActorAddress(actorAddress) map { uuid =>
- zkClient.exists(actorRegistryPathFor(uuid))
+ actorUuidsForActorAddress(actorAddress) map {
+ uuid =>
+ zkClient.exists(actorRegistryPathFor(uuid))
} exists (_ == true)
} else false
@@ -741,8 +781,9 @@ class ClusterNode private[akka] (
* Is the actor with uuid in use or not?
*/
def isInUseOnNode(actorAddress: String, node: NodeAddress): Boolean = if (isConnected.isOn) {
- actorUuidsForActorAddress(actorAddress) map { uuid =>
- zkClient.exists(actorLocationsPathFor(uuid, node))
+ actorUuidsForActorAddress(actorAddress) map {
+ uuid =>
+ zkClient.exists(actorLocationsPathFor(uuid, node))
} exists (_ == true)
} else false
@@ -755,36 +796,39 @@ class ClusterNode private[akka] (
import akka.serialization.ActorSerialization._
- actorUuidsForActorAddress(actorAddress) map { uuid =>
- EventHandler.debug(this,
- "Checking out actor with UUID [%s] to be used on node [%s]".format(uuid, nodeAddress.nodeName))
+ actorUuidsForActorAddress(actorAddress) map {
+ uuid =>
+ EventHandler.debug(this,
+ "Checking out actor with UUID [%s] to be used on node [%s]".format(uuid, nodeAddress.nodeName))
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
- ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true))
+ ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress)))
- // set home address
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
- ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteServerAddress)))
+ // set home address
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
+ ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, remoteServerAddress)))
- val actorPath = actorRegistryPathFor(uuid)
- zkClient.retryUntilConnected(new Callable[Either[Array[Byte], Exception]]() {
+ val actorPath = actorRegistryPathFor(uuid)
+ zkClient.retryUntilConnected(new Callable[Either[Array[Byte], Exception]]() {
def call: Either[Array[Byte], Exception] = {
try {
Left(if (shouldCompressData) LZF.uncompress(zkClient.connection.readData(actorPath, new Stat, false))
- else zkClient.connection.readData(actorPath, new Stat, false))
- } catch { case e: KeeperException.NodeExistsException => Right(e) }
+ else zkClient.connection.readData(actorPath, new Stat, false))
+ } catch {
+ case e: KeeperException.NodeExistsException => Right(e)
+ }
}
- }) match {
- case Left(bytes) =>
- locallyCheckedOutActors += (uuid -> bytes)
- // FIXME switch to ReplicatedActorRef here
- // val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
- val actor = fromBinary[T](bytes, remoteServerAddress)(format)
- remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID
- actor.start()
- actor.asInstanceOf[LocalActorRef]
- case Right(exception) => throw exception
- }
+ }) match {
+ case Left(bytes) =>
+ locallyCheckedOutActors += (uuid -> bytes)
+ // FIXME switch to ReplicatedActorRef here
+ // val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
+ val actor = fromBinary[T](bytes, remoteServerAddress)(format)
+ remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID
+ actor.start()
+ actor.asInstanceOf[LocalActorRef]
+ case Right(exception) => throw exception
+ }
}
} else Array.empty[LocalActorRef]
@@ -891,6 +935,7 @@ class ClusterNode private[akka] (
}
def refByAddress(actorAddress: String): ActorRef = {
+    // FIXME: 'uuids' below is computed but never used
val uuids = uuidsForActorAddress(actorAddress)
val actor = Router newRouter (
router, addresses,
@@ -914,7 +959,7 @@ class ClusterNode private[akka] (
* Migrate the actor from node 'from' to node 'to'.
*/
def migrate(
- from: NodeAddress, to: NodeAddress, actorAddress: String) {
+ from: NodeAddress, to: NodeAddress, actorAddress: String) {
isConnected ifOn {
if (from eq null) throw new IllegalArgumentException("NodeAddress 'from' can not be 'null'")
if (to eq null) throw new IllegalArgumentException("NodeAddress 'to' can not be 'null'")
@@ -952,8 +997,12 @@ class ClusterNode private[akka] (
* Returns the actor id for the actor with a specific UUID.
*/
def actorAddressForUuid(uuid: UUID): String = if (isConnected.isOn) {
- try { zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String] }
- catch { case e: ZkNoNodeException => "" }
+ try {
+ zkClient.readData(actorRegistryActorAddressPathFor(uuid)).asInstanceOf[String]
+ }
+ catch {
+ case e: ZkNoNodeException => ""
+ }
} else ""
/**
@@ -965,16 +1014,24 @@ class ClusterNode private[akka] (
* Returns the actor UUIDs for actor ID.
*/
def uuidsForActorAddress(actorAddress: String): Array[UUID] = if (isConnected.isOn) {
- try { zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] }
- catch { case e: ZkNoNodeException => Array[UUID]() }
+ try {
+ zkClient.getChildren(actorAddressToUuidsPathFor(actorAddress)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
+ }
+ catch {
+ case e: ZkNoNodeException => Array[UUID]()
+ }
} else Array.empty[UUID]
/**
* Returns the node names of all actors in use with UUID.
*/
def nodesForActorsInUseWithUuid(uuid: UUID): Array[String] = if (isConnected.isOn) {
- try { zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray.asInstanceOf[Array[String]] }
- catch { case e: ZkNoNodeException => Array[String]() }
+ try {
+ zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray.asInstanceOf[Array[String]]
+ }
+ catch {
+ case e: ZkNoNodeException => Array[String]()
+ }
} else Array.empty[String]
/**
@@ -982,9 +1039,14 @@ class ClusterNode private[akka] (
*/
def nodesForActorsInUseWithAddress(address: String): Array[String] = if (isConnected.isOn) {
flatten {
- actorUuidsForActorAddress(address) map { uuid =>
- try { zkClient.getChildren(actorLocationsPathFor(uuid)).toList.toArray.asInstanceOf[Array[String]] }
- catch { case e: ZkNoNodeException => Array[String]() }
+ actorUuidsForActorAddress(address) map {
+ uuid =>
+ try {
+ val list = zkClient.getChildren(actorLocationsPathFor(uuid))
+ list.toList.toArray.asInstanceOf[Array[String]]
+ } catch {
+ case e: ZkNoNodeException => Array[String]()
+ }
}
}
} else Array.empty[String]
@@ -993,8 +1055,12 @@ class ClusterNode private[akka] (
* Returns the UUIDs of all actors in use registered on a specific node.
*/
def uuidsForActorsInUseOnNode(nodeName: String): Array[UUID] = if (isConnected.isOn) {
- try { zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] }
- catch { case e: ZkNoNodeException => Array[UUID]() }
+ try {
+ zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
+ }
+ catch {
+ case e: ZkNoNodeException => Array[UUID]()
+ }
} else Array.empty[UUID]
/**
@@ -1002,8 +1068,12 @@ class ClusterNode private[akka] (
*/
def addressesForActorsInUseOnNode(nodeName: String): Array[String] = if (isConnected.isOn) {
val uuids =
- try { zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]] }
- catch { case e: ZkNoNodeException => Array[UUID]() }
+ try {
+ zkClient.getChildren(actorsAtNodePathFor(nodeName)).toList.map(new UUID(_)).toArray.asInstanceOf[Array[UUID]]
+ }
+ catch {
+ case e: ZkNoNodeException => Array[UUID]()
+ }
actorAddressForUuids(uuids)
} else Array.empty[String]
@@ -1012,8 +1082,9 @@ class ClusterNode private[akka] (
*/
def formatForActor[T <: Actor](actorAddress: String): Format[T] = {
- val formats = actorUuidsForActorAddress(actorAddress) map { uuid =>
- zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Format[T]]
+ val formats = actorUuidsForActorAddress(actorAddress) map {
+ uuid =>
+ zkClient.readData(actorRegistryFormatPathFor(uuid), new Stat).asInstanceOf[Format[T]]
}
val format = formats.head
@@ -1029,12 +1100,12 @@ class ClusterNode private[akka] (
def addressesForActor(actorAddress: String): Array[(UUID, InetSocketAddress)] = {
try {
for {
- uuid <- actorUuidsForActorAddress(actorAddress)
+ uuid <- actorUuidsForActorAddress(actorAddress)
address <- zkClient.getChildren(actorRegistryNodePathFor(uuid)).toList
} yield {
val tokenizer = new java.util.StringTokenizer(address, ":")
- val hostname = tokenizer.nextToken // hostname
- val port = tokenizer.nextToken.toInt // port
+ val hostname = tokenizer.nextToken // hostname
+ val port = tokenizer.nextToken.toInt // port
(uuid, new InetSocketAddress(hostname, port))
}
} catch {
@@ -1105,19 +1176,24 @@ class ClusterNode private[akka] (
EventHandler.debug(this,
"Adding config value [%s] under key [%s] in cluster registry".format(key, compressedBytes))
zkClient.retryUntilConnected(new Callable[Either[Unit, Exception]]() {
- def call: Either[Unit, Exception] = {
- try {
- Left(zkClient.connection.create(configurationPathFor(key), compressedBytes, CreateMode.PERSISTENT))
- } catch { case e: KeeperException.NodeExistsException =>
+ def call: Either[Unit, Exception] = {
+ try {
+ Left(zkClient.connection.create(configurationPathFor(key), compressedBytes, CreateMode.PERSISTENT))
+ } catch {
+ case e: KeeperException.NodeExistsException =>
try {
Left(zkClient.connection.writeData(configurationPathFor(key), compressedBytes))
- } catch { case e: Exception => Right(e) }
- }
+ } catch {
+ case e: Exception => Right(e)
+ }
}
- }) match {
- case Left(_) => { /* do nothing */ }
- case Right(exception) => throw exception
}
+ }) match {
+ case Left(_) => {
+ /* do nothing */
+ }
+ case Right(exception) => throw exception
+ }
}
/**
@@ -1159,23 +1235,29 @@ class ClusterNode private[akka] (
// Private
// =======================================
- private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_NODE, node)
+ private[cluster] def membershipPathFor(node: String) = "%s/%s".format(MEMBERSHIP_NODE, node)
- private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_NODE, key)
+ private[cluster] def configurationPathFor(key: String) = "%s/%s".format(CONFIGURATION_NODE, key)
- private[cluster] def actorAddressToUuidsPathFor(actorAddress: String) = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_NODE, actorAddress.replace('.', '_'))
+ private[cluster] def actorAddressToUuidsPathFor(actorAddress: String) = "%s/%s".format(ACTOR_ADDRESS_TO_UUIDS_NODE, actorAddress.replace('.', '_'))
+
+ private[cluster] def actorLocationsPathFor(uuid: UUID) = "%s/%s".format(ACTOR_LOCATIONS_NODE, uuid)
- private[cluster] def actorLocationsPathFor(uuid: UUID) = "%s/%s".format(ACTOR_LOCATIONS_NODE, uuid)
private[cluster] def actorLocationsPathFor(uuid: UUID, node: NodeAddress) =
"%s/%s/%s".format(ACTOR_LOCATIONS_NODE, uuid, node.nodeName)
- private[cluster] def actorsAtNodePathFor(node: String) = "%s/%s".format(ACTORS_AT_NODE_NODE, node)
- private[cluster] def actorAtNodePathFor(node: String, uuid: UUID) = "%s/%s/%s".format(ACTORS_AT_NODE_NODE, node, uuid)
+ private[cluster] def actorsAtNodePathFor(node: String) = "%s/%s".format(ACTORS_AT_NODE_NODE, node)
+
+ private[cluster] def actorAtNodePathFor(node: String, uuid: UUID) = "%s/%s/%s".format(ACTORS_AT_NODE_NODE, node, uuid)
+
+ private[cluster] def actorRegistryPathFor(uuid: UUID) = "%s/%s".format(ACTOR_REGISTRY_NODE, uuid)
+
+ private[cluster] def actorRegistryFormatPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "format")
+
+ private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "address")
+
+ private[cluster] def actorRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorRegistryPathFor(uuid), "node")
- private[cluster] def actorRegistryPathFor(uuid: UUID) = "%s/%s".format(ACTOR_REGISTRY_NODE, uuid)
- private[cluster] def actorRegistryFormatPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "format")
- private[cluster] def actorRegistryActorAddressPathFor(uuid: UUID) = "%s/%s".format(actorRegistryPathFor(uuid), "address")
- private[cluster] def actorRegistryNodePathFor(uuid: UUID): String = "%s/%s".format(actorRegistryPathFor(uuid), "node")
private[cluster] def actorRegistryNodePathFor(uuid: UUID, address: InetSocketAddress): String =
"%s/%s:%s".format(actorRegistryNodePathFor(uuid), address.getHostName, address.getPort)
@@ -1196,8 +1278,8 @@ class ClusterNode private[akka] (
val tokenizer = new java.util.StringTokenizer(address, ":")
tokenizer.nextToken // cluster name
tokenizer.nextToken // node name
- val hostname = tokenizer.nextToken // hostname
- val port = tokenizer.nextToken.toInt // port
+ val hostname = tokenizer.nextToken // hostname
+ val port = tokenizer.nextToken.toInt // port
new InetSocketAddress(hostname, port)
}
@@ -1214,7 +1296,9 @@ class ClusterNode private[akka] (
connectToAllReplicas()
val numberOfReplicas = replicaConnections.size
- val replicaConnectionsAsArray = replicaConnections.toList map { case (node, (address, actorRef)) => actorRef } // the ActorRefs
+ val replicaConnectionsAsArray = replicaConnections.toList map {
+ case (node, (address, actorRef)) => actorRef
+ } // the ActorRefs
if (numberOfReplicas < replicationFactor) {
throw new IllegalArgumentException(
@@ -1235,16 +1319,17 @@ class ClusterNode private[akka] (
* Connect to all available replicas unless already connected).
*/
private def connectToAllReplicas() {
- membershipNodes foreach { node =>
- if (!replicaConnections.contains(node)) {
- val address = addressForNode(node)
- val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
- replicaConnections.put(node, (address, clusterDaemon))
- }
+ membershipNodes foreach {
+ node =>
+ if (!replicaConnections.contains(node)) {
+ val address = addressForNode(node)
+ val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort)
+ replicaConnections.put(node, (address, clusterDaemon))
+ }
}
}
- private[cluster] def joinMembershipNode() {
+ private[cluster] def joinMembershipNode() {
nodeNameToAddress.put(nodeAddress.nodeName, remoteServerAddress)
try {
EventHandler.info(this,
@@ -1272,56 +1357,62 @@ class ClusterNode private[akka] (
}
private[cluster] def migrateFromFailedNodes[T <: Actor](currentSetOfClusterNodes: List[String]) = {
- findFailedNodes(currentSetOfClusterNodes).foreach { failedNodeName =>
+ findFailedNodes(currentSetOfClusterNodes).foreach {
+ failedNodeName =>
- val allNodes = locallyCachedMembershipNodes.toList
- val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
- val failedNodeIndex = allNodes.indexWhere(_ == failedNodeName)
+ val allNodes = locallyCachedMembershipNodes.toList
+ val myIndex = allNodes.indexWhere(_.endsWith(nodeAddress.nodeName))
+ val failedNodeIndex = allNodes.indexWhere(_ == failedNodeName)
- // Migrate to the successor of the failed node (using a sorted circular list of the node names)
- if ((failedNodeIndex == 0 && myIndex == locallyCachedMembershipNodes.size - 1) || // No leftmost successor exists, check the tail
- (failedNodeIndex == myIndex + 1)) { // Am I the leftmost successor?
+ // Migrate to the successor of the failed node (using a sorted circular list of the node names)
+ if ((failedNodeIndex == 0 && myIndex == locallyCachedMembershipNodes.size - 1) || // No leftmost successor exists, check the tail
+ (failedNodeIndex == myIndex + 1)) {
+ // Am I the leftmost successor?
- // Yes I am the node to migrate the actor to (can only be one in the cluster)
- val actorUuidsForFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName))
- EventHandler.debug(this,
- "Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]"
- .format(failedNodeName, nodeAddress.nodeName, actorUuidsForFailedNode))
-
- actorUuidsForFailedNode.foreach { uuid =>
+ // Yes I am the node to migrate the actor to (can only be one in the cluster)
+ val actorUuidsForFailedNode = zkClient.getChildren(actorsAtNodePathFor(failedNodeName))
EventHandler.debug(this,
- "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
- .format(failedNodeName, uuid, nodeAddress.nodeName))
+ "Migrating actors from failed node [%s] to node [%s]: Actor UUIDs [%s]"
+ .format(failedNodeName, nodeAddress.nodeName, actorUuidsForFailedNode))
- val actorAddress = actorAddressForUuid(uuidFrom(uuid))
- migrateWithoutCheckingThatActorResidesOnItsHomeNode( // since the ephemeral node is already gone, so can't check
- NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
+ actorUuidsForFailedNode.foreach {
+ uuid =>
+ EventHandler.debug(this,
+ "Cluster node [%s] has failed, migrating actor with UUID [%s] to [%s]"
+ .format(failedNodeName, uuid, nodeAddress.nodeName))
- implicit val format: Format[T] = formatForActor(actorAddress)
- use(actorAddress) foreach { actor =>
- // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
- //actor.homeAddress = remoteServerAddress
- val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
- homeAddress.setAccessible(true)
- homeAddress.set(actor, Some(remoteServerAddress))
+ val actorAddress = actorAddressForUuid(uuidFrom(uuid))
+ migrateWithoutCheckingThatActorResidesOnItsHomeNode(// since the ephemeral node is already gone, so can't check
+ NodeAddress(nodeAddress.clusterName, failedNodeName), nodeAddress, actorAddress)
- remoteService.register(uuid, actor)
+ implicit val format: Format[T] = formatForActor(actorAddress)
+ use(actorAddress) foreach {
+ actor =>
+ // FIXME remove ugly reflection when we have 1.0 final which has 'fromBinary(byte, homeAddress)(format)'
+ //actor.homeAddress = remoteServerAddress
+ val homeAddress = classOf[LocalActorRef].getDeclaredField("homeAddress")
+ homeAddress.setAccessible(true)
+ homeAddress.set(actor, Some(remoteServerAddress))
+
+ remoteService.register(uuid, actor)
+ }
+ }
+
+ // notify all available nodes that they should fail-over all connections from 'from' to 'to'
+ val from = nodeNameToAddress.get(failedNodeName)
+ val to = remoteServerAddress
+ val command = RemoteDaemonMessageProtocol.newBuilder
+ .setMessageType(FAIL_OVER_CONNECTIONS)
+ .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to))))
+ .build
+ membershipNodes foreach {
+ node =>
+ replicaConnections.get(node) foreach {
+ case (_, connection) =>
+ connection ! command
+ }
}
}
-
- // notify all available nodes that they should fail-over all connections from 'from' to 'to'
- val from = nodeNameToAddress.get(failedNodeName)
- val to = remoteServerAddress
- val command = RemoteDaemonMessageProtocol.newBuilder
- .setMessageType(FAIL_OVER_CONNECTIONS)
- .setPayload(ByteString.copyFrom(Serializers.Java.toBinary((from, to))))
- .build
- membershipNodes foreach { node =>
- replicaConnections.get(node) foreach { case (_, connection) =>
- connection ! command
- }
- }
- }
}
}
@@ -1329,26 +1420,27 @@ class ClusterNode private[akka] (
* Used when the ephemeral "home" node is already gone, so we can't check.
*/
private def migrateWithoutCheckingThatActorResidesOnItsHomeNode(
- from: NodeAddress, to: NodeAddress, actorAddress: String) {
+ from: NodeAddress, to: NodeAddress, actorAddress: String) {
- actorUuidsForActorAddress(actorAddress) map { uuid =>
- val actorAddress = actorAddressForUuid(uuid)
+ actorUuidsForActorAddress(actorAddress) map {
+ uuid =>
+ val actorAddress = actorAddressForUuid(uuid)
- if (!isInUseOnNode(actorAddress, to)) {
- release(actorAddress)
+ if (!isInUseOnNode(actorAddress, to)) {
+ release(actorAddress)
- val newAddress = new InetSocketAddress(to.hostname, to.port)
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
- ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
- ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
- ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
+ val newAddress = new InetSocketAddress(to.hostname, to.port)
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorRegistryNodePathFor(uuid)))
+ ignore[ZkNodeExistsException](zkClient.createEphemeral(actorRegistryNodePathFor(uuid, newAddress)))
+ ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, to)))
+ ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid)))
- ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from)))
- ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid)))
+ ignore[ZkNoNodeException](zkClient.delete(actorLocationsPathFor(uuid, from)))
+ ignore[ZkNoNodeException](zkClient.delete(actorAtNodePathFor(from.nodeName, uuid)))
- // 'use' (check out) actor on the remote 'to' node
- useActorOnNode(to.nodeName, uuid)
- }
+ // 'use' (check out) actor on the remote 'to' node
+ useActorOnNode(to.nodeName, uuid)
+ }
}
}
@@ -1375,17 +1467,18 @@ class ClusterNode private[akka] (
}
private def createNodeStructureIfNeeded() {
- baseNodes.foreach { path =>
- try {
- zkClient.create(path, null, CreateMode.PERSISTENT)
- EventHandler.debug(this, "Created node [%s]".format(path))
- } catch {
- case e: ZkNodeExistsException => {} // do nothing
- case e =>
- val error = new ClusterException(e.toString)
- EventHandler.error(error, this, "")
- throw error
- }
+ baseNodes.foreach {
+ path =>
+ try {
+ zkClient.create(path, null, CreateMode.PERSISTENT)
+ EventHandler.debug(this, "Created node [%s]".format(path))
+ } catch {
+ case e: ZkNodeExistsException => {} // do nothing
+ case e =>
+ val error = new ClusterException(e.toString)
+ EventHandler.error(error, this, "")
+ throw error
+ }
}
}
@@ -1402,43 +1495,70 @@ class ClusterNode private[akka] (
private def createMBean = {
val clusterMBean = new StandardMBean(classOf[ClusterNodeMBean]) with ClusterNodeMBean {
+
import Cluster._
- override def start() {self.start()}
- override def stop() {self.stop()}
+ override def start() {
+ self.start()
+ }
+
+ override def stop() {
+ self.stop()
+ }
override def disconnect() = self.disconnect()
- override def reconnect() {self.reconnect()}
- override def resign() {self.resign()}
- override def isConnected = self.isConnected.isOn
+ override def reconnect() {
+ self.reconnect()
+ }
- override def getRemoteServerHostname = self.nodeAddress.hostname
- override def getRemoteServerPort = self.nodeAddress.port
+ override def resign() {
+ self.resign()
+ }
- override def getNodeName = self.nodeAddress.nodeName
- override def getClusterName = self.nodeAddress.clusterName
- override def getZooKeeperServerAddresses = self.zkServerAddresses
+ override def isConnected = self.isConnected.isOn
- override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray
- override def getLeader = self.leader.toString
+ override def getRemoteServerHostname = self.nodeAddress.hostname
- override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray
- override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray
+ override def getRemoteServerPort = self.nodeAddress.port
- override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray
- override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray
+ override def getNodeName = self.nodeAddress.nodeName
+
+ override def getClusterName = self.nodeAddress.clusterName
+
+ override def getZooKeeperServerAddresses = self.zkServerAddresses
+
+ override def getMemberNodes = self.locallyCachedMembershipNodes.iterator.map(_.toString).toArray
+
+ override def getLeader = self.leader.toString
+
+ override def getUuidsForActorsInUse = self.uuidsForActorsInUse.map(_.toString).toArray
+
+ override def getAddressesForActorsInUse = self.addressesForActorsInUse.map(_.toString).toArray
+
+ override def getUuidsForClusteredActors = self.uuidsForClusteredActors.map(_.toString).toArray
+
+ override def getAddressesForClusteredActors = self.addressesForClusteredActors.map(_.toString).toArray
+
+ override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))
- override def getNodesForActorInUseWithUuid(uuid: String) = self.nodesForActorsInUseWithUuid(stringToUuid(uuid))
override def getNodesForActorInUseWithAddress(id: String) = self.nodesForActorsInUseWithAddress(id)
- override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray
- override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray
+ override def getUuidsForActorsInUseOnNode(nodeName: String) = self.uuidsForActorsInUseOnNode(nodeName).map(_.toString).toArray
- override def setConfigElement(key: String, value: String) {self.setConfigElement(key, value.getBytes("UTF-8"))}
- override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8")
- override def removeConfigElement(key: String) { self.removeConfigElement(key)}
- override def getConfigElementKeys =self.getConfigElementKeys.toArray
+ override def getAddressesForActorsInUseOnNode(nodeName: String) = self.addressesForActorsInUseOnNode(nodeName).map(_.toString).toArray
+
+ override def setConfigElement(key: String, value: String) {
+ self.setConfigElement(key, value.getBytes("UTF-8"))
+ }
+
+ override def getConfigElement(key: String) = new String(self.getConfigElement(key), "UTF-8")
+
+ override def removeConfigElement(key: String) {
+ self.removeConfigElement(key)
+ }
+
+ override def getConfigElementKeys = self.getConfigElementKeys.toArray
}
JMX.register(clusterJmxObjectName, clusterMBean)
@@ -1514,7 +1634,9 @@ trait ErrorHandler {
try {
body
} catch {
- case e: org.I0Itec.zkclient.exception.ZkInterruptedException => { /* ignore */ }
+ case e: org.I0Itec.zkclient.exception.ZkInterruptedException => {
+ /* ignore */
+ }
case e: Throwable =>
EventHandler.error(e, this, e.toString)
throw e
@@ -1536,6 +1658,7 @@ object RemoteClusterDaemon {
* @author Jonas Bonér
*/
class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
+
import RemoteClusterDaemon._
import Cluster._
@@ -1560,20 +1683,24 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
"None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]".format(message))
case RELEASE =>
- if (message.hasActorUuid) { cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) }
- else if (message.hasActorAddress) { cluster release message.getActorAddress }
+ if (message.hasActorUuid) {
+ cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid))
+ }
+ else if (message.hasActorAddress) {
+ cluster release message.getActorAddress
+ }
else EventHandler.warning(this,
"None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message))
- case START => cluster.start()
+ case START => cluster.start()
- case STOP => cluster.stop()
+ case STOP => cluster.stop()
- case DISCONNECT => cluster.disconnect()
+ case DISCONNECT => cluster.disconnect()
- case RECONNECT => cluster.reconnect()
+ case RECONNECT => cluster.reconnect()
- case RESIGN => cluster.resign()
+ case RESIGN => cluster.resign()
case FAIL_OVER_CONNECTIONS =>
val (from, to) = payloadFor(message, classOf[(InetSocketAddress, InetSocketAddress)])
@@ -1582,32 +1709,52 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case FUNCTION_FUN0_UNIT =>
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
+
def receive = {
- case f: Function0[Unit] => try { f() } finally { self.stop() }
+ case f: Function0[Unit] => try {
+ f()
+ } finally {
+ self.stop()
+ }
}
}).start ! payloadFor(message, classOf[Function0[Unit]])
case FUNCTION_FUN0_ANY =>
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
+
def receive = {
- case f: Function0[Any] => try { self.reply(f()) } finally { self.stop() }
+ case f: Function0[Any] => try {
+ self.reply(f())
+ } finally {
+ self.stop()
+ }
}
}).start forward payloadFor(message, classOf[Function0[Any]])
case FUNCTION_FUN1_ARG_UNIT =>
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
+
def receive = {
- case t: Tuple2[Function1[Any, Unit], Any] => try { t._1(t._2) } finally { self.stop() }
+ case t: Tuple2[Function1[Any, Unit], Any] => try {
+ t._1(t._2)
+ } finally {
+ self.stop()
+ }
}
}).start ! payloadFor(message, classOf[Tuple2[Function1[Any, Unit], Any]])
case FUNCTION_FUN1_ARG_ANY =>
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
+
def receive = {
- case t: Tuple2[Function1[Any, Any], Any] => try { self.reply(t._1(t._2)) } finally { self.stop() }
+ case t: Tuple2[Function1[Any, Any], Any] => try {
+ self.reply(t._1(t._2))
+ } finally {
+ self.stop()
+ }
}
}).start forward payloadFor(message, classOf[Tuple2[Function1[Any, Any], Any]])
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
index fd4e91b513..eb614a05fe 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala
@@ -34,8 +34,9 @@ class ClusterActorRef private[akka] (
def connections: Map[InetSocketAddress, ActorRef] = addresses.get.toMap
- override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
+ override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
route(message)(senderOption)
+ }
override def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
@@ -48,7 +49,7 @@ class ClusterActorRef private[akka] (
addresses set (
addresses.get map { case (address, actorRef) =>
if (address == from) {
- actorRef.stop
+ actorRef.stop()
(to, createRemoteActorRef(actorRef.uuid, to))
} else (address, actorRef)
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
index d86105f59f..e9661bf5f8 100644
--- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala
@@ -23,18 +23,22 @@ import com.eaio.uuid.UUID
import java.util.concurrent.CountDownLatch
/**
+ * A ClusterDeployer is responsible for deploying a Deploy.
+ *
+ * TODO: clarify what the 'Deploy' concept means.
+ *
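+ * Deployments are written to ZooKeeper under 'deploymentPath' and keyed by the
+ * deployment's address; see the systemDeployments value below for a concrete example.
+ *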
* @author Jonas Bonér
*/
object ClusterDeployer {
- val clusterName = Cluster.name
- val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID
- val clusterPath = "/%s" format clusterName
- val clusterDeploymentLockPath = clusterPath + "/deployment-lock"
- val deploymentPath = clusterPath + "/deployment"
- val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath)
- val deploymentAddressPath = deploymentPath + "/%s"
+ val clusterName = Cluster.name
+ val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID
+ val clusterPath = "/%s" format clusterName
+ val clusterDeploymentLockPath = clusterPath + "/deployment-lock"
+ val deploymentPath = clusterPath + "/deployment"
+ val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath)
+ val deploymentAddressPath = deploymentPath + "/%s"
- private val isConnected = new Switch(false)
+ private val isConnected = new Switch(false)
private val deploymentCompleted = new CountDownLatch(1)
private lazy val zkClient = {
@@ -62,6 +66,7 @@ object ClusterDeployer {
zkClient.connection.getZookeeper, clusterDeploymentLockPath, null, clusterDeploymentLockListener) {
private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
ownerIdField.setAccessible(true)
+
def leader: String = ownerIdField.get(this).asInstanceOf[String]
}
@@ -69,31 +74,33 @@ object ClusterDeployer {
Deploy(
address = RemoteClusterDaemon.ADDRESS,
routing = Direct,
- scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
+ scope = Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
)
private[akka] def init(deployments: List[Deploy]) {
isConnected.switchOn {
- baseNodes.foreach { path =>
- try {
- ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
- EventHandler.debug(this, "Created node [%s]".format(path))
- } catch {
- case e =>
- val error = new DeploymentException(e.toString)
- EventHandler.error(error, this)
- throw error
- }
+ baseNodes.foreach {
+ path =>
+ try {
+ ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
+ EventHandler.debug(this, "Created node [%s]".format(path))
+ } catch {
+ case e =>
+ val error = new DeploymentException(e.toString)
+ EventHandler.error(error, this)
+ throw error
+ }
}
val allDeployments = deployments ::: systemDeployments
EventHandler.info(this, "Initializing cluster deployer")
- if (deploymentLock.lock()) { // try to be the one doing the clustered deployment
+ if (deploymentLock.lock()) {
+ // try to be the one doing the clustered deployment
EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]")
allDeployments foreach (deploy(_)) // deploy
- deploymentLock.unlock() // signal deployment complete
+ deploymentLock.unlock() // signal deployment complete
} else {
- deploymentCompleted.await() // wait until deployment is completed
+ deploymentCompleted.await() // wait until deployment is completed
}
}
}
@@ -106,7 +113,7 @@ object ClusterDeployer {
}
private[akka] def deploy(deployment: Deploy) {
- val path = deploymentAddressPath.format(deployment.address)
+ val path = deploymentAddressPath.format(deployment.address)
try {
ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
zkClient.writeData(path, deployment)
@@ -134,7 +141,7 @@ object ClusterDeployer {
private[akka] def undeployAll() {
try {
for {
- child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
+ child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
deployment <- lookupDeploymentFor(child)
} undeploy(deployment)
} catch {
diff --git a/akka-cluster/src/main/scala/akka/cluster/replication/ReplicatedClusterRef.scala b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala
similarity index 67%
rename from akka-cluster/src/main/scala/akka/cluster/replication/ReplicatedClusterRef.scala
rename to akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala
index b28dea5a58..864870d2ec 100644
--- a/akka-cluster/src/main/scala/akka/cluster/replication/ReplicatedClusterRef.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/ReplicatedClusterRef.scala
@@ -1,8 +1,8 @@
+package akka.cluster
+
/**
* Copyright (C) 2009-2011 Scalable Solutions AB
*/
-package akka.cluster
-
import Cluster._
import akka.actor._
@@ -14,6 +14,7 @@ import akka.dispatch._
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
import java.util.{ Map => JMap }
+import akka.cluster.TransactionLog
/**
* @author Jonas Bonér
@@ -25,6 +26,7 @@ trait Replicable { this: Actor =>
* @author Jonas Bonér
*/
sealed trait ReplicationStrategy
+
object ReplicationStrategy {
case object Transient extends ReplicationStrategy
case object WriteThrough extends ReplicationStrategy
@@ -49,7 +51,7 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
def start(): ActorRef = {
EventHandler.debug(this, "Starting ReplicatedActorRef for Actor [%s] with transaction log [%s]"
.format(address, txLog.logId))
- actorRef.start
+ actorRef.start()
}
def stop() {
@@ -57,28 +59,49 @@ class ReplicatedActorRef private[akka] (actorRef: ActorRef, val address: String)
actorRef.stop()
}
- override def setFaultHandler(handler: FaultHandlingStrategy) = actorRef.setFaultHandler(handler)
- override def getFaultHandler(): FaultHandlingStrategy = actorRef.getFaultHandler()
- override def setLifeCycle(lifeCycle: LifeCycle): Unit = actorRef.setLifeCycle(lifeCycle)
- override def getLifeCycle(): LifeCycle = actorRef.getLifeCycle
- def dispatcher_=(md: MessageDispatcher): Unit = actorRef.dispatcher_=(md)
+ override def setFaultHandler(handler: FaultHandlingStrategy) {
+ actorRef.setFaultHandler(handler)
+ }
+ override def getFaultHandler: FaultHandlingStrategy = actorRef.getFaultHandler()
+ override def setLifeCycle(lifeCycle: LifeCycle) {
+ actorRef.setLifeCycle(lifeCycle)
+ }
+ override def getLifeCycle: LifeCycle = actorRef.getLifeCycle
+ def dispatcher_=(md: MessageDispatcher) {
+ actorRef.dispatcher_=(md)
+ }
def dispatcher: MessageDispatcher = actorRef.dispatcher
- def link(actorRef: ActorRef): Unit = actorRef.link(actorRef)
- def unlink(actorRef: ActorRef): Unit = actorRef.unlink(actorRef)
+  def link(actorRefToLink: ActorRef) {
+    actorRef.link(actorRefToLink) // delegate to the wrapped ref; parameter renamed to avoid shadowing it
+  }
+
+  def unlink(actorRefToUnlink: ActorRef) {
+    actorRef.unlink(actorRefToUnlink)
+  }
def startLink(actorRef: ActorRef): ActorRef = actorRef.startLink(actorRef)
def supervisor: Option[ActorRef] = actorRef.supervisor
def linkedActors: JMap[Uuid, ActorRef] = actorRef.linkedActors
- protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = actorRef.postMessageToMailbox(message, senderOption)
+ protected[akka] def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {
+ actorRef.postMessageToMailbox(message, senderOption)
+ }
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
- senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
+    senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] =
+    actorRef.postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout, senderOption, senderFuture)
protected[akka] def actorInstance: AtomicReference[Actor] = actorRef.actorInstance
- protected[akka] def supervisor_=(sup: Option[ActorRef]): Unit = actorRef.supervisor_=(sup)
+ protected[akka] def supervisor_=(sup: Option[ActorRef]) {
+ actorRef.supervisor_=(sup)
+ }
protected[akka] def mailbox: AnyRef = actorRef.mailbox
protected[akka] def mailbox_=(value: AnyRef): AnyRef = actorRef.mailbox_=(value)
- protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable): Unit = actorRef.handleTrapExit(dead, reason)
- protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
- protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]): Unit = actorRef.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
+ protected[akka] def handleTrapExit(dead: ActorRef, reason: Throwable) {
+ actorRef.handleTrapExit(dead, reason)
+ }
+ protected[akka] def restart(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ actorRef.restart(reason, maxNrOfRetries, withinTimeRange)
+ }
+ protected[akka] def restartLinkedActors(reason: Throwable, maxNrOfRetries: Option[Int], withinTimeRange: Option[Int]) {
+ actorRef.restartLinkedActors(reason, maxNrOfRetries, withinTimeRange)
+ }
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
index b4e5584ff3..0f7408be4b 100644
--- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala
@@ -54,7 +54,7 @@ object Router {
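+  /**
+   * A minimal sketch of the two routing styles (hypothetical `router` value,
+   * sender passed explicitly):
+   * {{{
+   * router.route("ping")(None)                                   // fire-and-forget
+   * val replyFuture = router.route[String]("ping", 5000L)(None)  // returns a Future[String]
+   * }}}
+   */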
trait Router {
def connections: Map[InetSocketAddress, ActorRef]
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit
+ def route(message: Any)(implicit sender: Option[ActorRef])
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T]
}
@@ -68,9 +68,10 @@ object Router {
connections.toList.map({ case (address, actor) => actor }).headOption
}
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
+ def route(message: Any)(implicit sender: Option[ActorRef]) {
if (connection.isDefined) connection.get.!(message)(sender)
- else throw new RoutingException("No node connections for router")
+ else throw new RoutingException("No node connections for router")
+ }
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (connection.isDefined) connection.get.!!!(message, timeout)(sender)
@@ -83,9 +84,10 @@ object Router {
trait Random extends Router {
private val random = new java.util.Random(System.currentTimeMillis)
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
+ def route(message: Any)(implicit sender: Option[ActorRef]) {
if (next.isDefined) next.get.!(message)(sender)
- else throw new RoutingException("No node connections for router")
+ else throw new RoutingException("No node connections for router")
+ }
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (next.isDefined) next.get.!!!(message, timeout)(sender)
@@ -107,9 +109,10 @@ object Router {
@volatile
private var current = items
- def route(message: Any)(implicit sender: Option[ActorRef]): Unit =
+ def route(message: Any)(implicit sender: Option[ActorRef]) {
if (next.isDefined) next.get.!(message)(sender)
- else throw new RoutingException("No node connections for router")
+ else throw new RoutingException("No node connections for router")
+ }
def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] =
if (next.isDefined) next.get.!!!(message, timeout)(sender)
diff --git a/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
similarity index 64%
rename from akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala
rename to akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
index 8254d86872..3d0c466d20 100644
--- a/akka-cluster/src/main/scala/akka/cluster/replication/TransactionLog.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/TransactionLog.scala
@@ -1,7 +1,8 @@
+package akka.cluster
+
/**
* Copyright (C) 2009-2011 Scalable Solutions AB
*/
-package akka.cluster
import org.apache.bookkeeper.client.{BookKeeper, LedgerHandle, LedgerEntry, BKException, AsyncCallback}
import org.apache.zookeeper.CreateMode
@@ -19,27 +20,33 @@ import akka.cluster.zookeeper._
import java.util.Enumeration
-import scala.collection.JavaConversions._
-
// FIXME allow user to choose dynamically between 'async' and 'sync' tx logging (asyncAddEntry(byte[] data, AddCallback cb, Object ctx))
// FIXME clean up old entries in log after doing a snapshot
// FIXME clean up all meta-data in ZK for a specific UUID when the corresponding actor is shut down
// FIXME delete tx log after migration of actor has been made and create a new one
/**
+ * TODO: Improved documentation.
+ *
* @author Jonas Bonér
*/
class ReplicationException(message: String) extends AkkaException(message)
/**
+ * A TransactionLog makes chunks of data durable by writing them, synchronously
+ * or asynchronously, to a replicated BookKeeper ledger.
+ *
+ * TODO: Improved documentation.
+ *
+ * TODO: Explain something about thread safety.
+ *
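+ * A minimal usage sketch (names are illustrative and exact signatures may
+ * differ; assumes the transaction log service has been started):
+ * {{{
+ * val txLog = TransactionLog.logFor(uuid)     // open the log for an actor's UUID
+ * txLog.recordEntry(changeBytes)              // append a state change
+ * txLog.recordSnapshot(snapshotBytes)         // record a snapshot
+ * val entries = txLog.entriesInRange(0L, 9L)  // replay the first ten entries
+ * txLog.close()
+ * }}}
+ *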
* @author Jonas Bonér
*/
-class TransactionLog private (
- ledger: LedgerHandle, val id: String, val isAsync: Boolean) {
+class TransactionLog private(ledger: LedgerHandle, val id: String, val isAsync: Boolean) {
+
import TransactionLog._
- val logId = ledger.getId
- val txLogPath = transactionLogNode + "/" + id
+ val logId = ledger.getId
+ val txLogPath = transactionLogNode + "/" + id
val snapshotPath = txLogPath + "/snapshot"
private val isOpen = new Switch(true)
@@ -47,60 +54,64 @@ class TransactionLog private (
/**
* TODO document method
*/
- def recordEntry(entry: Array[Byte]): Unit = if (isOpen.isOn) {
- try {
- if (isAsync) {
- ledger.asyncAddEntry(
- entry,
- new AsyncCallback.AddCallback {
- def addComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- entryId: Long,
- ctx: AnyRef) {
- handleReturnCode(returnCode)
- EventHandler.debug(this,
- "Writing entry [%s] to log [%s]".format(entryId, logId))
- }
- },
- null)
- } else {
- handleReturnCode(ledger.addEntry(entry))
- val entryId = ledger.getLastAddPushed
- EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
+ def recordEntry(entry: Array[Byte]) {
+ if (isOpen.isOn) {
+ try {
+ if (isAsync) {
+ ledger.asyncAddEntry(
+ entry,
+ new AsyncCallback.AddCallback {
+ def addComplete(
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ entryId: Long,
+ ctx: AnyRef) {
+ handleReturnCode(returnCode)
+ EventHandler.debug(this,
+ "Writing entry [%s] to log [%s]".format(entryId, logId))
+ }
+ },
+ null)
+ } else {
+ handleReturnCode(ledger.addEntry(entry))
+ val entryId = ledger.getLastAddPushed
+ EventHandler.debug(this, "Writing entry [%s] to log [%s]".format(entryId, logId))
+ }
+ } catch {
+ case e => handleError(e)
}
- } catch {
- case e => handleError(e)
- }
- } else transactionClosedError
+ } else transactionClosedError
+ }
/**
* TODO document method
*/
- def recordSnapshot(snapshot: Array[Byte]): Unit = if (isOpen.isOn) {
- try {
- if (isAsync) {
- ledger.asyncAddEntry(
- snapshot,
- new AsyncCallback.AddCallback {
- def addComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- entryId: Long,
- ctx: AnyRef) {
- handleReturnCode(returnCode)
- storeSnapshotMetaDataInZooKeeper(entryId)
- }
- },
- null)
- } else {
- handleReturnCode(ledger.addEntry(snapshot))
- storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed)
+ def recordSnapshot(snapshot: Array[Byte]) {
+ if (isOpen.isOn) {
+ try {
+ if (isAsync) {
+ ledger.asyncAddEntry(
+ snapshot,
+ new AsyncCallback.AddCallback {
+ def addComplete(
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ entryId: Long,
+ ctx: AnyRef) {
+ handleReturnCode(returnCode)
+ storeSnapshotMetaDataInZooKeeper(entryId)
+ }
+ },
+ null)
+ } else {
+ handleReturnCode(ledger.addEntry(snapshot))
+ storeSnapshotMetaDataInZooKeeper(ledger.getLastAddPushed)
+ }
+ } catch {
+ case e => handleError(e)
}
- } catch {
- case e => handleError(e)
- }
- } else transactionClosedError
+ } else transactionClosedError
+ }
/**
* TODO document method
@@ -122,21 +133,22 @@ class TransactionLog private (
*/
def entriesInRange(from: Long, to: Long): Vector[Array[Byte]] = if (isOpen.isOn) {
try {
- if (from < 0) throw new IllegalArgumentException("'from' can't be negative [" + from + "]")
- if (to < 0) throw new IllegalArgumentException("'to' can't be negative [" + from + "]")
+ if (from < 0) throw new IllegalArgumentException("'from' can't be negative [" + from + "]")
+      if (to < 0) throw new IllegalArgumentException("'to' can't be negative [" + to + "]")
if (to < from) throw new IllegalArgumentException("'to' can't be smaller than 'from' [" + from + "," + to + "]")
EventHandler.debug(this,
"Reading entries [%s -> %s] for log [%s]".format(from, to, logId))
+
if (isAsync) {
val future = new DefaultCompletableFuture[Vector[Array[Byte]]](timeout)
ledger.asyncReadEntries(
from, to,
new AsyncCallback.ReadCallback {
def readComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- enumeration: Enumeration[LedgerEntry],
- ctx: AnyRef) {
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ enumeration: Enumeration[LedgerEntry],
+ ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[Vector[Array[Byte]]]]
var entries = Vector[Array[Byte]]()
while (enumeration.hasMoreElements) {
@@ -179,7 +191,7 @@ class TransactionLog private (
case e: ZkNoNodeException =>
handleError(new ReplicationException(
"Transaction log for UUID [" + id +
- "] does not have a snapshot recorded in ZooKeeper"))
+ "] does not have a snapshot recorded in ZooKeeper"))
case e => handleError(e)
}
}
@@ -187,70 +199,76 @@ class TransactionLog private (
/**
* TODO document method
*/
- def delete(): Unit = if (isOpen.isOn) {
- EventHandler.debug(this, "Deleting transaction log [%s]".format(logId))
- try {
- if (isAsync) {
- bookieClient.asyncDeleteLedger(
- logId,
- new AsyncCallback.DeleteCallback {
- def deleteComplete(returnCode: Int, ctx: AnyRef) {
- handleReturnCode(returnCode)
- }
- },
- null)
- } else {
- bookieClient.deleteLedger(logId)
+ def delete() {
+ if (isOpen.isOn) {
+ EventHandler.debug(this, "Deleting transaction log [%s]".format(logId))
+ try {
+ if (isAsync) {
+ bookieClient.asyncDeleteLedger(
+ logId,
+ new AsyncCallback.DeleteCallback {
+ def deleteComplete(returnCode: Int, ctx: AnyRef) {
+ handleReturnCode(returnCode)
+ }
+ },
+ null)
+ } else {
+ bookieClient.deleteLedger(logId)
+ }
+ } catch {
+ case e => handleError(e)
}
- } catch {
- case e => handleError(e)
}
}
/**
* TODO document method
*/
- def close(): Unit = if (isOpen.switchOff) {
- EventHandler.debug(this, "Closing transaction log [%s]".format(logId))
- try {
- if (isAsync) {
- ledger.asyncClose(
- new AsyncCallback.CloseCallback {
- def closeComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- ctx: AnyRef) {
- handleReturnCode(returnCode)
- }
- },
- null)
- } else {
- ledger.close
+ def close() {
+ if (isOpen.switchOff) {
+ EventHandler.debug(this, "Closing transaction log [%s]".format(logId))
+ try {
+ if (isAsync) {
+ ledger.asyncClose(
+ new AsyncCallback.CloseCallback {
+ def closeComplete(
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ ctx: AnyRef) {
+ handleReturnCode(returnCode)
+ }
+ },
+ null)
+ } else {
+ ledger.close()
+ }
+ } catch {
+ case e => handleError(e)
}
- } catch {
- case e => handleError(e)
}
}
- private def storeSnapshotMetaDataInZooKeeper(snapshotId: Long): Unit = if (isOpen.isOn) {
- try {
- zkClient.create(snapshotPath, null, CreateMode.PERSISTENT)
- } catch {
- case e: ZkNodeExistsException => {} // do nothing
- case e => handleError(e)
- }
+ private def storeSnapshotMetaDataInZooKeeper(snapshotId: Long) {
+ if (isOpen.isOn) {
+ try {
+ zkClient.create(snapshotPath, null, CreateMode.PERSISTENT)
+ } catch {
+ case e: ZkNodeExistsException => {} // do nothing
+ case e => handleError(e)
+ }
- try {
- zkClient.writeData(snapshotPath, snapshotId)
- } catch {
- case e =>
- handleError(new ReplicationException(
- "Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
- id +"]"))
- }
- EventHandler.debug(this,
- "Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
- } else transactionClosedError
+ try {
+ zkClient.writeData(snapshotPath, snapshotId)
+ } catch {
+ case e =>
+ handleError(new ReplicationException(
+ "Could not store transaction log snapshot meta-data in ZooKeeper for UUID [" +
+ id + "]"))
+ }
+ EventHandler.debug(this,
+ "Writing snapshot [%s] to log [%s]".format(snapshotId, logId))
+ } else transactionClosedError
+ }
private def handleReturnCode(block: => Long) {
val code = block.toInt
@@ -261,7 +279,7 @@ class TransactionLog private (
private def transactionClosedError: Nothing = {
handleError(new ReplicationException(
"Transaction log [" + logId +
- "] is closed. You need to open up new a new one with 'TransactionLog.logFor(id)'"))
+      "] is closed. You need to open up a new one with 'TransactionLog.logFor(id)'"))
}
}
@@ -272,14 +290,14 @@ object TransactionLog {
val digestType = config.getString("akka.cluster.replication.digest-type", "CRC32") match {
case "CRC32" => BookKeeper.DigestType.CRC32
- case "MAC" => BookKeeper.DigestType.MAC
+ case "MAC" => BookKeeper.DigestType.MAC
case unknown => throw new ConfigurationException(
- "akka.cluster.replication.digest-type is invalid [" + unknown + "]")
+ "akka.cluster.replication.digest-type is invalid [" + unknown + "]")
}
- val password = config.getString("akka.cluster.replication.password", "secret").getBytes("UTF-8")
+ val password = config.getString("akka.cluster.replication.password", "secret").getBytes("UTF-8")
val ensembleSize = config.getInt("akka.cluster.replication.ensemble-size", 3)
- val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
- val timeout = 5000 // FIXME make configurable
+ val quorumSize = config.getInt("akka.cluster.replication.quorum-size", 2)
+ val timeout = 5000 // FIXME make configurable
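+
+  // A sketch of the corresponding akka.conf entries (defaults shown):
+  //
+  //   akka.cluster.replication.digest-type   = "CRC32"   # or "MAC"
+  //   akka.cluster.replication.password      = "secret"
+  //   akka.cluster.replication.ensemble-size = 3
+  //   akka.cluster.replication.quorum-size   = 2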
private[akka] val transactionLogNode = "/transaction-log-ids"
@@ -298,15 +316,15 @@ object TransactionLog {
zk.create(transactionLogNode, null, CreateMode.PERSISTENT)
} catch {
case e: ZkNodeExistsException => {} // do nothing
- case e => handleError(e)
+ case e => handleError(e)
}
EventHandler.info(this,
("Transaction log service started with" +
- "\n\tdigest type [%s]" +
- "\n\tensemble size [%s]" +
- "\n\tquorum size [%s]" +
- "\n\tlogging time out [%s]").format(
+ "\n\tdigest type [%s]" +
+ "\n\tensemble size [%s]" +
+ "\n\tquorum size [%s]" +
+      "\n\tlogging timeout [%s]").format(
digestType,
ensembleSize,
quorumSize,
@@ -342,7 +360,7 @@ object TransactionLog {
val ledger = try {
if (zkClient.exists(txLogPath)) throw new ReplicationException(
- "Transaction log for UUID [" + id +"] already exists")
+ "Transaction log for UUID [" + id + "] already exists")
val future = new DefaultCompletableFuture[LedgerHandle](timeout)
if (isAsync) {
@@ -350,9 +368,9 @@ object TransactionLog {
ensembleSize, quorumSize, digestType, password,
new AsyncCallback.CreateCallback {
def createComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- ctx: AnyRef) {
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
@@ -377,7 +395,7 @@ object TransactionLog {
bookieClient.deleteLedger(logId) // clean up
handleError(new ReplicationException(
"Could not store transaction log [" + logId +
- "] meta-data in ZooKeeper for UUID [" + id +"]"))
+ "] meta-data in ZooKeeper for UUID [" + id + "]"))
}
EventHandler.info(this, "Created new transaction log [%s] for UUID [%s]".format(logId, id))
@@ -398,7 +416,7 @@ object TransactionLog {
} catch {
case e: ZkNoNodeException =>
handleError(new ReplicationException(
- "Transaction log for UUID [" + id +"] does not exist in ZooKeeper"))
+ "Transaction log for UUID [" + id + "] does not exist in ZooKeeper"))
case e => handleError(e)
}
@@ -409,9 +427,9 @@ object TransactionLog {
logId, digestType, password,
new AsyncCallback.OpenCallback {
def openComplete(
- returnCode: Int,
- ledgerHandle: LedgerHandle,
- ctx: AnyRef) {
+ returnCode: Int,
+ ledgerHandle: LedgerHandle,
+ ctx: AnyRef) {
val future = ctx.asInstanceOf[CompletableFuture[LedgerHandle]]
if (returnCode == BKException.Code.OK) future.completeWithResult(ledgerHandle)
else future.completeWithException(BKException.create(returnCode))
@@ -431,7 +449,7 @@ object TransactionLog {
private[akka] def await[T](future: CompletableFuture[T]): T = {
future.await
- if (future.result.isDefined) future.result.get
+ if (future.result.isDefined) future.result.get
else if (future.exception.isDefined) handleError(future.exception.get)
else handleError(new ReplicationException("No result from async read of entries for transaction log"))
}
@@ -458,8 +476,8 @@ object LocalBookKeeperEnsemble {
isRunning switchOn {
localBookKeeper = new LocalBookKeeper(TransactionLog.ensembleSize)
localBookKeeper.runZookeeper(port)
- localBookKeeper.initializeZookeper
- localBookKeeper.runBookies
+ localBookKeeper.initializeZookeper()
+ localBookKeeper.runBookies()
EventHandler.info(this, "LocalBookKeeperEnsemble started successfully")
}
}
@@ -473,9 +491,9 @@ object LocalBookKeeperEnsemble {
println("***************************** 1")
localBookKeeper.bs.foreach(_.shutdown()) // stop bookies
println("***************************** 2")
- localBookKeeper.zkc.close() // stop zk client
+ localBookKeeper.zkc.close() // stop zk client
println("***************************** 3")
- localBookKeeper.zks.shutdown() // stop zk server
+ localBookKeeper.zks.shutdown() // stop zk server
println("***************************** 4")
localBookKeeper.serverFactory.shutdown() // stop zk NIOServer
println("***************************** 5")
diff --git a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
index 42037707ac..7b787a0d84 100644
--- a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala
@@ -7,23 +7,28 @@ import org.I0Itec.zkclient._
import org.I0Itec.zkclient.serialize._
import org.I0Itec.zkclient.exception._
+/**
+ * A ZkClient that exposes its underlying ZkConnection and can reconnect the
+ * ZooKeeper session in place, guarded by the client's event lock.
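+ *
+ * A minimal construction sketch (hypothetical connect string and timeouts):
+ * {{{
+ * val zkClient = new AkkaZkClient("localhost:2181", sessionTimeout = 30000, connectionTimeout = 30000)
+ * zkClient.reconnect() // drops and re-establishes the underlying session
+ * }}}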
+ */
class AkkaZkClient(zkServers: String,
- sessionTimeout: Int,
- connectionTimeout: Int,
- zkSerializer: ZkSerializer = new SerializableSerializer)
+ sessionTimeout: Int,
+ connectionTimeout: Int,
+ zkSerializer: ZkSerializer = new SerializableSerializer)
extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
def reconnect() {
- getEventLock.lock
+ val zkLock = getEventLock
+
+ zkLock.lock()
try {
- _connection.close
+ _connection.close()
_connection.connect(this)
} catch {
case e: InterruptedException => throw new ZkInterruptedException(e)
} finally {
- getEventLock.unlock
+ zkLock.unlock()
}
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZooKeeper.scala b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZooKeeper.scala
index 6095f72ec6..0c85ca505d 100644
--- a/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZooKeeper.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/zookeeper/AkkaZooKeeper.scala
@@ -23,10 +23,10 @@ object AkkaZooKeeper {
val zkServer = new ZkServer(
dataPath, logPath,
new IDefaultNameSpace() {
- def createDefaultNameSpace(zkClient: ZkClient) = {}
+ def createDefaultNameSpace(zkClient: ZkClient) {}
},
port, tickTime)
- zkServer.start
+ zkServer.start()
zkServer
}
}
diff --git a/akka-cluster/src/main/scala/akka/cluster/zookeeper/ZooKeeperBarrier.scala b/akka-cluster/src/main/scala/akka/cluster/zookeeper/ZooKeeperBarrier.scala
index 4f83cdd7b2..76878618c0 100644
--- a/akka-cluster/src/main/scala/akka/cluster/zookeeper/ZooKeeperBarrier.scala
+++ b/akka-cluster/src/main/scala/akka/cluster/zookeeper/ZooKeeperBarrier.scala
@@ -34,18 +34,21 @@ object ZooKeeperBarrier {
def apply(zkClient: ZkClient, cluster: String, name: String, node: String, count: Int, timeout: Duration) =
new ZooKeeperBarrier(zkClient, cluster + "-" + name, node, count, timeout)
- def ignore[E : Manifest](body: => Unit): Unit =
+ def ignore[E: Manifest](body: => Unit) {
try {
body
} catch {
case e if manifest[E].erasure.isAssignableFrom(e.getClass) => ()
}
+ }
}
/**
 * Barrier based on the ZooKeeper barrier tutorial.
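+ *
+ * A minimal usage sketch (illustrative names; the timeout is an akka.util.Duration):
+ * {{{
+ * val barrier = ZooKeeperBarrier(zkClient, "test-cluster", "startup", "node-1", 3, 30.seconds)
+ * barrier {
+ *   // runs once all three nodes have entered; all nodes leave together afterwards
+ * }
+ * }}}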
*/
-class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: Int, timeout: Duration) extends IZkChildListener {
+class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: Int, timeout: Duration)
+ extends IZkChildListener {
+
import ZooKeeperBarrier.{BarriersNode, ignore}
val barrier = BarriersNode + "/" + name
@@ -57,10 +60,10 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
ignore[ZkNodeExistsException](zkClient.createPersistent(BarriersNode))
ignore[ZkNodeExistsException](zkClient.createPersistent(barrier))
- def apply(body: => Unit) = {
+ def apply(body: => Unit) {
enter
body
- leave
+ leave()
}
def enter = {
@@ -75,7 +78,7 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
zkClient.subscribeChildChanges(barrier, this)
}
- def leave = {
+ def leave() {
zkClient.delete(entry)
exitBarrier.await(timeout.length, timeout.unit)
if (zkClient.countChildren(barrier) > 0) {
@@ -85,10 +88,10 @@ class ZooKeeperBarrier(zkClient: ZkClient, name: String, node: String, count: In
zkClient.unsubscribeChildChanges(barrier, this)
}
- def handleChildChange(path: String, children: JList[String]) = {
+ def handleChildChange(path: String, children: JList[String]) {
if (children.size <= 1) {
ignore[ZkNoNodeException](zkClient.delete(ready))
- exitBarrier.countDown
+ exitBarrier.countDown()
}
}
}