Clustered deployment in ZooKeeper implemented. Read and write deployments from cluster test passing.

Jonas Bonér 2011-05-03 21:04:45 +02:00
parent 13abf0592f
commit bd5cc53235
11 changed files with 335 additions and 158 deletions
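In outline, the flow this commit introduces looks roughly like the following; this is a sketch for orientation (mirroring the ClusterDeployerSpec added below), not code taken from the diff itself:

    // read the akka.actor.deployment.* sections from akka.conf
    val deployments: List[Deploy] = Deployer.deploymentsInConfig
    // write them into ZooKeeper; one node wins the deployment write-lock, the others wait
    ClusterDeployer.init(deployments)
    // any node can then read a deployment back by actor address
    val deployed: Option[Deploy] = ClusterDeployer.lookupDeploymentFor("service-pi")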


@@ -14,7 +14,7 @@ class DeployerSpec extends WordSpec with MustMatchers {
     "be able to parse 'akka.actor.deployment._' config elements" in {
       val deployment = Deployer.lookupInConfig("service-pi")
       deployment must be ('defined)
-      deployment must equal (Some(Deploy("service-pi", RoundRobin, Clustered(Home("darkstar", 8888), Replicate(3), Stateless))))
+      deployment must equal (Some(Deploy("service-pi", RoundRobin(), Clustered(Home("darkstar", 8888), Replicate(3), Stateless()))))
     }
   }
 }


@@ -15,6 +15,7 @@ import akka.AkkaException
 import scala.reflect.BeanProperty
 import com.eaio.uuid.UUID
+import akka.event.EventHandler

 /**
  * Life-cycle messages for the Actors
@@ -200,52 +201,58 @@ object Actor extends ListenerManagement {
     import DeploymentConfig._
     Address.validate(address)

-    Deployer.deploymentFor(address) match {
-      case Deploy(_, router, Local) =>
-        // FIXME handle 'router' in 'Local' actors
-        newLocalActorRef(clazz, address)
+    try {
+      Deployer.deploymentFor(address) match {
+        case Deploy(_, router, Local) =>
+          // FIXME handle 'router' in 'Local' actors
+          newLocalActorRef(clazz, address)

         case Deploy(_, router, Clustered(Home(hostname, port), replication , state)) =>
           sys.error("Clustered deployment not yet supported")
           /*
           if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
           val remoteAddress = Actor.remote.address
           if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) {
             // home node for actor
             if (!node.isClustered(address)) node.store(clazz, address)
             node.use(address).head
           } else {
             val router =
             node.ref(address, router)
           }
           */

           /*
           2. Check Home(..)
              a) If home is same as Actor.remote.address then:
                 - check if actor is stored in ZK, if not; node.store(..)
                 - checkout actor using node.use(..)
              b) If not the same
                 - check out actor using node.ref(..)

           Misc stuff:
             - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
             - Deployer should:
                 1. Check if deployment exists in ZK
                 2. If not, upload it
             - ClusterNode API and Actor.remote API should be made private[akka]
             - Rewrite ClusterSpec or remove it
             - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor
             - Should we allow configuring of session-scoped remote actors? How?
           */
           RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor)

         case invalid => throw new IllegalActorStateException(
           "Could not create actor [" + clazz.getName +
           "] with address [" + address +
           "], not bound to a valid deployment scheme [" + invalid + "]")
+      }
+    } catch {
+      case e: DeploymentException =>
+        EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address)
+        newLocalActorRef(clazz, address) // if deployment fails, fall back to local actors
     }
   }


@@ -106,18 +106,17 @@ object DeploymentConfig {
   // --- Replication
   // --------------------------------
   sealed trait Replication
-  class ReplicationBase(factor: Int) extends Replication {
+  case class Replicate(factor: Int) extends Replication {
     if (factor < 1) throw new IllegalArgumentException("Replication factor can not be negative or zero")
   }
-  case class Replicate(factor: Int) extends ReplicationBase(factor)

   // For Java API
   case class AutoReplicate() extends Replication
-  case class NoReplicas() extends ReplicationBase(1)
+  case class NoReplicas() extends Replication

   // For Scala API
   case object AutoReplicate extends Replication
-  case object NoReplicas extends ReplicationBase(1)
+  case object NoReplicas extends Replication

   // --------------------------------
   // --- State
@@ -139,71 +138,66 @@
  * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
  */
 object Deployer {
-  lazy val useClusterDeployer = ReflectiveAccess.ClusterModule.isEnabled
-  lazy val cluster = ReflectiveAccess.ClusterModule.clusterDeployer
-  lazy val local = new LocalDeployer
+  val defaultAddress = Home("localhost", 2552) // FIXME allow configuring node-local default hostname and port
+
+  lazy val instance: ReflectiveAccess.ClusterModule.ClusterDeployer = {
+    val deployer =
+      if (ReflectiveAccess.ClusterModule.isEnabled) ReflectiveAccess.ClusterModule.clusterDeployer
+      else LocalDeployer
+    deployer.init(deploymentsInConfig)
+    deployer
+  }
+
+  def shutdown() {
+    instance.shutdown()
+  }

   def deploy(deployment: Deploy) {
     if (deployment eq null) throw new IllegalArgumentException("Deploy can not be null")
     val address = deployment.address
     Address.validate(address)
-    if (useClusterDeployer) cluster.deploy(deployment)
-    else local.deploy(deployment)
+    instance.deploy(deployment)
   }

   def deploy(deployment: Seq[Deploy]) {
     deployment foreach (deploy(_))
   }

-  private def deployLocally(deployment: Deploy) {
-    deployment match {
-      case Deploy(address, Direct, Clustered(Home(hostname, port), _, _)) =>
-        val currentRemoteServerAddress = Actor.remote.address
-        if (currentRemoteServerAddress.getHostName == hostname) { // are we on the right server?
-          if (currentRemoteServerAddress.getPort != port) throw new ConfigurationException(
-            "Remote server started on [" + hostname +
-            "] is started on port [" + currentRemoteServerAddress.getPort +
-            "] can not use deployment configuration [" + deployment +
-            "] due to invalid port [" + port + "]")
-          // FIXME how to handle registerPerSession
-          // Actor.remote.register(Actor.newLocalActorRef(address))
-        }
-      case Deploy(_, routing, Clustered(Home(hostname, port), replicas, state)) =>
-        // FIXME clustered actor deployment
-      case _ => // local deployment do nothing
-    }
-  }
-
   /**
    * Undeploy is idemponent. E.g. safe to invoke multiple times.
    */
   def undeploy(deployment: Deploy) {
-    if (useClusterDeployer) cluster.undeploy(deployment)
-    else local.undeploy(deployment)
+    instance.undeploy(deployment)
   }

   def undeployAll() {
-    if (useClusterDeployer) cluster.undeployAll()
-    else local.undeployAll()
+    instance.undeployAll()
   }

+  def isLocal(deployment: Deploy): Boolean = deployment match {
+    case Deploy(_, _, Local) => true
+    case _ => false
+  }
+
+  def isClustered(deployment: Deploy): Boolean = isLocal(deployment)
+
+  def isLocal(address: String): Boolean = isLocal(deploymentFor(address))
+
+  def isClustered(address: String): Boolean = !isLocal(address)
+
   /**
    * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound.
    */
-  def deploymentFor(address: String): Deploy = {
+  private[akka] def deploymentFor(address: String): Deploy = {
     lookupDeploymentFor(address) match {
       case Some(deployment) => deployment
       case None => thrownNoDeploymentBoundException(address)
     }
   }

-  def lookupDeploymentFor(address: String): Option[Deploy] = {
-    val deployment_? =
-      if (useClusterDeployer) cluster.lookupDeploymentFor(address)
-      else local.lookupDeploymentFor(address)
+  private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
+    val deployment_? = instance.lookupDeploymentFor(address)

     if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_?
     else {
       val newDeployment =
@@ -226,10 +220,28 @@ object Deployer {
     }
   }

+  private[akka] def deploymentsInConfig: List[Deploy] = {
+    for {
+      address <- addressesInConfig
+      deployment <- lookupInConfig(address)
+    } yield deployment
+  }
+
+  private[akka] def addressesInConfig: List[String] = {
+    val deploymentPath = "akka.actor.deployment"
+    Config.config.getSection(deploymentPath) match {
+      case None => Nil
+      case Some(addressConfig) =>
+        addressConfig.map.keySet
+          .map(path => path.substring(0, path.indexOf(".")))
+          .toSet.toList // toSet to force uniqueness
+    }
+  }
+
   /**
    * Lookup deployment in 'akka.conf' configuration file.
    */
-  def lookupInConfig(address: String): Option[Deploy] = {
+  private[akka] def lookupInConfig(address: String): Option[Deploy] = {

     // --------------------------------
     // akka.actor.deployment.<address>
@@ -273,6 +285,7 @@ object Deployer {
     // akka.actor.deployment.<address>.clustered.home
     // --------------------------------
     val home = clusteredConfig.getListAny("home") match {
+      case Nil => defaultAddress
       case List(hostname: String, port: String) =>
         try {
           Home(hostname, port.toInt)
@@ -285,14 +298,14 @@ object Deployer {
         }
       case invalid => throw new ConfigurationException(
         "Config option [" + addressPath +
-        ".clustered.home] needs to be an arrayon format [\"hostname\", port] - was [" +
+        ".clustered.home] needs to be an array on format [\"hostname\", port] - was [" +
         invalid + "]")
     }

     // --------------------------------
     // akka.actor.deployment.<address>.clustered.replicas
     // --------------------------------
-    val replicas = clusteredConfig.getAny("replicas", 1) match {
+    val replicas = clusteredConfig.getAny("replicas", "1") match {
       case "auto" => AutoReplicate
       case "1" => NoReplicas
       case nrOfReplicas: String =>
@@ -319,17 +332,6 @@ object Deployer {
     }
   }

-  def isLocal(deployment: Deploy): Boolean = deployment match {
-    case Deploy(_, _, Local) => true
-    case _ => false
-  }
-
-  def isClustered(deployment: Deploy): Boolean = isLocal(deployment)
-
-  def isLocal(address: String): Boolean = isLocal(deploymentFor(address))
-
-  def isClustered(address: String): Boolean = !isLocal(address)
-
   private def throwDeploymentBoundException(deployment: Deploy): Nothing = {
     val e = new DeploymentAlreadyBoundException(
       "Address [" + deployment.address +
@@ -344,31 +346,63 @@ object Deployer {
     EventHandler.error(e, this, e.getMessage)
     throw e
   }
+
+  private def deployLocally(deployment: Deploy) {
+    deployment match {
+      case Deploy(address, Direct, Clustered(Home(hostname, port), _, _)) =>
+        val currentRemoteServerAddress = Actor.remote.address
+        if (currentRemoteServerAddress.getHostName == hostname) { // are we on the right server?
+          if (currentRemoteServerAddress.getPort != port) throw new ConfigurationException(
+            "Remote server started on [" + hostname +
+            "] is started on port [" + currentRemoteServerAddress.getPort +
+            "] can not use deployment configuration [" + deployment +
+            "] due to invalid port [" + port + "]")
+          // FIXME how to handle registerPerSession
+          // Actor.remote.register(Actor.newLocalActorRef(address))
+        }
+      case Deploy(_, routing, Clustered(Home(hostname, port), replicas, state)) =>
+        // FIXME clustered actor deployment
+      case _ => // local deployment do nothing
+    }
+  }
 }

 /**
  * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
  */
-class LocalDeployer {
+object LocalDeployer {
   private val deployments = new ConcurrentHashMap[String, Deploy]

-  def deploy(deployment: Deploy) {
+  private[akka] def init(deployments: List[Deploy]) {
+    EventHandler.info(this, "Initializing local deployer")
+    EventHandler.info(this, "Deploying locally [\n" + deployments.mkString("\n\t") + "\n]")
+    deployments foreach (deploy(_)) // deploy
+  }
+
+  private[akka] def shutdown() {
+    undeployAll()
+    deployments.clear()
+  }
+
+  private[akka] def deploy(deployment: Deploy) {
     if (deployments.putIfAbsent(deployment.address, deployment) != deployment) {
+      println("----- DEPLOYING " + deployment)
       // FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown)
       // throwDeploymentBoundException(deployment)
     }
   }

-  def undeploy(deployment: Deploy) {
+  private[akka] def undeploy(deployment: Deploy) {
     deployments.remove(deployment.address)
   }

-  def undeployAll() {
+  private[akka] def undeployAll() {
     deployments.clear()
   }

-  def lookupDeploymentFor(address: String): Option[Deploy] = {
+  private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
     val deployment = deployments.get(address)
     if (deployment eq null) None
     else Some(deployment)


@@ -99,7 +99,31 @@ object EventHandler extends ListenerManagement {
   }

   def start() {
-    info(this, "Starting up EventHandler")
+    try {
+      val defaultListeners = config.getList("akka.event-handlers") match {
+        case Nil       => "akka.event.EventHandler$DefaultListener" :: Nil
+        case listeners => listeners
+      }
+      defaultListeners foreach { listenerName =>
+        try {
+          ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz =>
+            val listener = Actor.actorOf(clazz, listenerName).start()
+            addListener(listener)
+          }
+        } catch {
+          case e: akka.actor.DeploymentAlreadyBoundException => // do nothing
+          case e: Exception =>
+            throw new ConfigurationException(
+              "Event Handler specified in config can't be loaded [" + listenerName +
+              "] due to [" + e.toString + "]")
+        }
+      }
+      info(this, "Starting up EventHandler")
+    } catch {
+      case e: Exception =>
+        e.printStackTrace()
+        throw new ConfigurationException("Could not start Event Handler due to [" + e.toString + "]")
+    }
   }

   /**
@@ -216,23 +240,5 @@ object EventHandler extends ListenerManagement {
     }
   }

-  val defaultListeners = config.getList("akka.event-handlers") match {
-    case Nil       => "akka.event.EventHandler$DefaultListener" :: Nil
-    case listeners => listeners
-  }
-  defaultListeners foreach { listenerName =>
-    try {
-      ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz =>
-        addListener(Actor.actorOf(clazz, listenerName).start)
-      }
-    } catch {
-      case e: akka.actor.DeploymentAlreadyBoundException => // do nothing
-      case e: Exception =>
-        throw new ConfigurationException(
-          "Event Handler specified in config can't be loaded [" + listenerName +
-          "] due to [" + e.toString + "]")
-    }
-  }

   start()
 }


@@ -136,8 +136,8 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept
 abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {

-  lazy val eventHandler: ActorRef = {
-    val handler = Actor.actorOf[RemoteEventHandler].start()
+  val eventHandler: ActorRef = {
+    val handler = Actor.actorOf[RemoteEventHandler](classOf[RemoteEventHandler].getName).start()
     // add the remote client and server listener that pipes the events to the event handler system
     addListener(handler)
     handler
@@ -146,8 +146,8 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule
   def shutdown {
     eventHandler.stop()
     removeListener(eventHandler)
-    this.shutdownClientModule
-    this.shutdownServerModule
+    this.shutdownClientModule()
+    this.shutdownServerModule()
     clear
   }


@@ -31,7 +31,7 @@ object Helpers {
     case elem: T => Array(elem)
   }

-  def ignore[E : Manifest](body: => Unit): Unit = {
+  def ignore[E : Manifest](body: => Unit) {
     try {
       body
     }
@@ -40,12 +40,12 @@ object Helpers {
     }
   }

-  def withPrintStackTraceOnError(body: => Unit) = {
+  def withPrintStackTraceOnError(body: => Unit) {
     try {
       body
     } catch {
       case e: Throwable =>
-        EventHandler.error(e, this, "")
+        EventHandler.error(e, this, e.toString)
         throw e
     }
   }
@@ -106,7 +106,7 @@ object Helpers {
 class ResultOrError[R](result: R){
   private[this] var contents: Either[R, Throwable] = Left(result)

-  def update(value: => R) = {
+  def update(value: => R) {
     contents = try {
       Left(value)
     } catch {


@@ -72,6 +72,8 @@ object ReflectiveAccess {
   }

   type ClusterDeployer = {
+    def init(deployments: List[Deploy])
+    def shutdown()
     def deploy(deployment: Deploy)
     def undeploy(deployment: Deploy)
     def undeployAll()
@@ -184,6 +186,9 @@ object ReflectiveAccess {
       ctor.setAccessible(true)
       Some(ctor.newInstance(args: _*).asInstanceOf[T])
     } catch {
+      case e: java.lang.reflect.InvocationTargetException =>
+        EventHandler.debug(this, e.getCause.toString)
+        None
       case e: Exception =>
         EventHandler.debug(this, e.toString)
         None


@@ -117,6 +117,7 @@ object Cluster {
   val UUID_PREFIX = "uuid:".intern

   // config options
+  val name = config.getString("akka.cluster.name", "default")
   val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181")
   val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552)
   val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt
@@ -287,9 +288,6 @@ object Cluster {
   */
  def startLocalCluster(dataPath: String, logPath: String, port: Int, tickTime: Int): ZkServer = {
    try {
-      EventHandler.info(this,
-        "Starting local ZooKeeper server on\n\tport [%s]\n\tdata path [%s]\n\tlog path [%s]\n\ttick time [%s]"
-          .format(port, dataPath, logPath, tickTime))
      val zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath, port, tickTime)
      _zkServer.set(Some(zkServer))
      zkServer
@@ -395,10 +393,6 @@ class ClusterNode private[akka] (
   }, "akka.cluster.remoteClientLifeCycleListener").start

   val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start

-  import DeploymentConfig._
-  Deployer.deploy(Deploy(
-    RemoteClusterDaemon.ADDRESS, Direct,
-    Clustered(Home(nodeAddress.hostname, nodeAddress.port), NoReplicas, Stateless)))
-
   val remoteService: RemoteSupport = {
     val remote = new akka.remote.netty.NettyRemoteSupport


@@ -4,24 +4,37 @@
 package akka.cluster

-import akka.actor.DeploymentConfig.Deploy
-import akka.actor.DeploymentException
+import akka.actor.{DeploymentConfig, Deployer, DeploymentException}
+import DeploymentConfig._
 import akka.event.EventHandler
 import akka.util.Switch
+import akka.util.Helpers._

 import akka.cluster.zookeeper.AkkaZkClient

 import org.apache.zookeeper.CreateMode
+import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener}

 import scala.collection.JavaConversions.collectionAsScalaIterable

+import com.eaio.uuid.UUID
+
+import java.util.concurrent.CountDownLatch
+
+import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException}
+
 /**
  * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
  */
 object ClusterDeployer {
-  val deploymentPath = "deployment"
-  val deploymentAddressPath = deploymentPath + "/%s"
+  val clusterName = Cluster.name
+  val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID
+  val clusterPath = "/%s" format clusterName
+  val clusterDeploymentLockPath = clusterPath + "/deployment-lock"
+  val deploymentPath = clusterPath + "/deployment"
+  val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath)
+  val deploymentAddressPath = deploymentPath + "/%s"

   private val isConnected = new Switch(false)
+  private val deploymentCompleted = new CountDownLatch(1)

   private lazy val zkClient = {
     val zk = new AkkaZkClient(
@@ -30,54 +43,115 @@ object ClusterDeployer {
       Cluster.connectionTimeout,
       Cluster.defaultSerializer)
     EventHandler.info(this, "ClusterDeployer started")
-    isConnected.switchOn
     zk
   }

+  // FIXME invert dependency; let Cluster have an instance of ClusterDeployer instead
+  lazy val cluster = Cluster(NodeAddress(clusterName, nodeName))
+
+  private val clusterDeploymentLockListener = new LockListener {
+    def lockAcquired() {
+      EventHandler.debug(this, "Clustered deployment started")
+    }
+
+    def lockReleased() {
+      EventHandler.debug(this, "Clustered deployment completed")
+      deploymentCompleted.countDown()
+    }
+  }
+
+  private lazy val deploymentLock = new WriteLock(
+    zkClient.connection.getZookeeper, clusterDeploymentLockPath, null, clusterDeploymentLockListener) {
+    private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
+    ownerIdField.setAccessible(true)
+    def leader: String = ownerIdField.get(this).asInstanceOf[String]
+  }
+
+  private val systemDeployments = List(
+    Deploy(
+      RemoteClusterDaemon.ADDRESS, Direct,
+      Clustered(Deployer.defaultAddress, NoReplicas, Stateless))
+  )
+
+  private[akka] def init(deployments: List[Deploy]) {
+    isConnected.switchOn {
+      baseNodes.foreach { path =>
+        try {
+          ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
+          EventHandler.debug(this, "Created node [%s]".format(path))
+        } catch {
+          case e =>
+            val error = new DeploymentException(e.toString)
+            EventHandler.error(error, this)
+            throw error
+        }
+      }
+
+      val allDeployments = deployments ::: systemDeployments
+      EventHandler.info(this, "Initializing cluster deployer")
+
+      if (deploymentLock.lock()) { // try to be the one doing the clustered deployment
+        EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]")
+        allDeployments foreach (deploy(_)) // deploy
+        deploymentLock.unlock() // signal deployment complete
+      } else {
+        deploymentCompleted.await() // wait until deployment is completed
+      }
+    }
+  }
+
   def shutdown() {
     isConnected switchOff {
+      undeployAll()
       zkClient.close()
     }
   }

-  def deploy(deployment: Deploy) {
+  private[akka] def deploy(deployment: Deploy) {
+    val path = deploymentAddressPath.format(deployment.address)
     try {
-      val path = deploymentAddressPath.format(deployment.address)
-      zkClient.create(path, null, CreateMode.PERSISTENT)
+      ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
       zkClient.writeData(path, deployment)
-      // FIXME trigger some deploy action?
+      // FIXME trigger cluster-wide deploy action
     } catch {
-      case e => handleError(new DeploymentException("Could store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
+      case e: NullPointerException =>
+        handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed"))
+      case e: Exception =>
+        handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e))
     }
   }

-  def undeploy(deployment: Deploy) {
+  private[akka] def undeploy(deployment: Deploy) {
     try {
       zkClient.delete(deploymentAddressPath.format(deployment.address))
-      // FIXME trigger some undeploy action?
+      // FIXME trigger cluster-wide undeployment action
     } catch {
-      case e => handleError(new DeploymentException("Could undeploy deployment [" + deployment + "] in ZooKeeper due to: " + e))
+      case e: Exception =>
+        handleError(new DeploymentException("Could not undeploy deployment [" + deployment + "] in ZooKeeper due to: " + e))
     }
   }

-  def undeployAll() {
+  private[akka] def undeployAll() {
     try {
       for {
         child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
         deployment <- lookupDeploymentFor(child)
       } undeploy(deployment)
     } catch {
-      case e => handleError(new DeploymentException("Could undeploy all deployment data in ZooKeeper due to: " + e))
+      case e: Exception =>
+        handleError(new DeploymentException("Could not undeploy all deployment data in ZooKeeper due to: " + e))
     }
   }

-  def lookupDeploymentFor(address: String): Option[Deploy] = {
+  private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
     try {
       Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy])
     } catch {
-      case e: Exception => None
+      case e: ZkNoNodeException => None
+      case e: Exception =>
+        EventHandler.warning(this, e.toString)
+        None
     }
   }
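For orientation (derived from the path definitions above, not part of the diff): with the cluster name "test-cluster" from the reference config at the bottom of this commit, ClusterDeployer.init creates these persistent znodes, and each deployment is then written under its actor address:

    /test-cluster                         // clusterPath
    /test-cluster/deployment-lock         // clusterDeploymentLockPath, coordinated via the WriteLock recipe
    /test-cluster/deployment              // deploymentPath
    /test-cluster/deployment/service-pi   // deploymentAddressPath for the 'service-pi' deployment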


@@ -0,0 +1,50 @@
+package akka.cluster
+
+import org.scalatest.WordSpec
+import org.scalatest.matchers.MustMatchers
+import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
+
+import org.I0Itec.zkclient._
+
+import akka.actor._
+
+class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach {
+  val dataPath = "_akka_cluster/data"
+  val logPath = "_akka_cluster/log"
+
+  var zkServer: ZkServer = _
+
+  "A ClusterDeployer" should {
+    "be able to deploy deployments in configuration file" in {
+      val deployments = Deployer.deploymentsInConfig
+      deployments must not equal(Nil)
+
+      ClusterDeployer.init(deployments)
+
+      deployments map { oldDeployment =>
+        val newDeployment = ClusterDeployer.lookupDeploymentFor(oldDeployment.address)
+        newDeployment must be('defined)
+        oldDeployment must equal(newDeployment.get)
+      }
+    }
+  }
+
+  override def beforeAll() {
+    try {
+      zkServer = Cluster.startLocalCluster(dataPath, logPath)
+      Thread.sleep(5000)
+    } catch {
+      case e => e.printStackTrace()
+    }
+  }
+
+  override def beforeEach() {
+    Cluster.reset()
+  }
+
+  override def afterAll() {
+    Deployer.shutdown()
+    Cluster.shutdownLocalCluster()
+    Actor.registry.local.shutdownAll()
+  }
+}


@@ -41,17 +41,23 @@ akka {
       # -------------------------------

       service-pi {                  # stateless actor with replication factor 3 and round-robin load-balancer
-        router = "round-robin"      # default is "direct";
+        router = "round-robin"      # routing (load-balance) scheme to use
                                     # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages"
                                     # or: fully qualified class name of the router class
+                                    # default is "direct";

         clustered {                 # makes the actor available in the cluster registry
-                                    # if omitted: actor is defined as local non-clustered actor
-          home = ["darkstar", 8888] # home address/node for clustered actor; if omitted then "localhost:2552" will be used
-          replicas = 3              # default is 1;
+                                    # default (if omitted) is local non-clustered actor
+          home = "node:test-1"      # defines the hostname, IP-address or node name of the "home" node for clustered actor
+                                    # available: "host:<hostname>", "ip:<ip address>" and "node:<node name>"
+                                    # default is "host:localhost"
+          replicas = 3              # number of actor replicas in the cluster
                                     # available: integer above 0 (1-N) or the string "auto" for auto-scaling
-          stateless = on            # default is 'off':
+                                    # if "auto" is used then 'home' has no meaning
+                                    # default is '1';
+          stateless = on            # is the actor stateless or stateful
                                     # if turned 'on': actor is defined as stateless and can be load-balanced accordingly
                                     # if turned 'off' (or omitted): actor is defined as stateful which means replicatable through transaction log
+                                    # default is 'off'
         }
       }
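For reference, the updated DeployerSpec at the top of this commit shows the parsed form such a 'service-pi' section is expected to produce; note that its expected Home("darkstar", 8888) suggests the spec's own test config still supplies 'home' in the older ["hostname", port] form:

    Deploy("service-pi", RoundRobin(), Clustered(Home("darkstar", 8888), Replicate(3), Stateless()))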
@@ -110,6 +116,7 @@ akka {
   }

   cluster {
+    name = "test-cluster"
     zookeeper-server-addresses = "localhost:2181"
     remote-server-port = 2552
     max-time-to-wait-until-connected = 30