diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index 592bfc4bc4..f2e30bcd91 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -14,7 +14,7 @@ class DeployerSpec extends WordSpec with MustMatchers { "be able to parse 'akka.actor.deployment._' config elements" in { val deployment = Deployer.lookupInConfig("service-pi") deployment must be ('defined) - deployment must equal (Some(Deploy("service-pi", RoundRobin, Clustered(Home("darkstar", 8888), Replicate(3), Stateless)))) + deployment must equal (Some(Deploy("service-pi", RoundRobin(), Clustered(Home("darkstar", 8888), Replicate(3), Stateless())))) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 31abf5400a..f4c613ea91 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -15,6 +15,7 @@ import akka.AkkaException import scala.reflect.BeanProperty import com.eaio.uuid.UUID +import akka.event.EventHandler /** * Life-cycle messages for the Actors @@ -200,52 +201,58 @@ object Actor extends ListenerManagement { import DeploymentConfig._ Address.validate(address) - Deployer.deploymentFor(address) match { - case Deploy(_, router, Local) => - // FIXME handle 'router' in 'Local' actors - newLocalActorRef(clazz, address) + try { + Deployer.deploymentFor(address) match { + case Deploy(_, router, Local) => + // FIXME handle 'router' in 'Local' actors + newLocalActorRef(clazz, address) - case Deploy(_, router, Clustered(Home(hostname, port), replication , state)) => - sys.error("Clustered deployment not yet supported") - /* - if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") - val remoteAddress = Actor.remote.address - if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) { - // home node for actor - if (!node.isClustered(address)) node.store(clazz, address) - node.use(address).head - } else { - val router = - node.ref(address, router) - } - */ - /* - 2. Check Home(..) - a) If home is same as Actor.remote.address then: - - check if actor is stored in ZK, if not; node.store(..) - - checkout actor using node.use(..) - b) If not the same - - check out actor using node.ref(..) + case Deploy(_, router, Clustered(Home(hostname, port), replication , state)) => + sys.error("Clustered deployment not yet supported") + /* + if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") + val remoteAddress = Actor.remote.address + if (remoteAddress.getHostName == hostname && remoteAddress.getPort == port) { + // home node for actor + if (!node.isClustered(address)) node.store(clazz, address) + node.use(address).head + } else { + val router = + node.ref(address, router) + } + */ + /* + 2. Check Home(..) + a) If home is same as Actor.remote.address then: + - check if actor is stored in ZK, if not; node.store(..) + - checkout actor using node.use(..) + b) If not the same + - check out actor using node.ref(..) - Misc stuff: - - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - - Deployer should: - 1. Check if deployment exists in ZK - 2. If not, upload it - - ClusterNode API and Actor.remote API should be made private[akka] - - Rewrite ClusterSpec or remove it - - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor - - Should we allow configuring of session-scoped remote actors? How? + Misc stuff: + - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? + - Deployer should: + 1. Check if deployment exists in ZK + 2. If not, upload it + - ClusterNode API and Actor.remote API should be made private[akka] + - Rewrite ClusterSpec or remove it + - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor + - Should we allow configuring of session-scoped remote actors? How? - */ + */ - RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor) + RemoteActorRef(address, Actor.TIMEOUT, None, ActorType.ScalaActor) - case invalid => throw new IllegalActorStateException( - "Could not create actor [" + clazz.getName + - "] with address [" + address + - "], not bound to a valid deployment scheme [" + invalid + "]") + case invalid => throw new IllegalActorStateException( + "Could not create actor [" + clazz.getName + + "] with address [" + address + + "], not bound to a valid deployment scheme [" + invalid + "]") + } + } catch { + case e: DeploymentException => + EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address) + newLocalActorRef(clazz, address) // if deployment fails, fall back to local actors } } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 98ccfb51dd..3bde8ea4eb 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -106,18 +106,17 @@ object DeploymentConfig { // --- Replication // -------------------------------- sealed trait Replication - class ReplicationBase(factor: Int) extends Replication { + case class Replicate(factor: Int) extends Replication { if (factor < 1) throw new IllegalArgumentException("Replication factor can not be negative or zero") } - case class Replicate(factor: Int) extends ReplicationBase(factor) // For Java API case class AutoReplicate() extends Replication - case class NoReplicas() extends ReplicationBase(1) + case class NoReplicas() extends Replication // For Scala API case object AutoReplicate extends Replication - case object NoReplicas extends ReplicationBase(1) + case object NoReplicas extends Replication // -------------------------------- // --- State @@ -139,71 +138,66 @@ object DeploymentConfig { * @author Jonas Bonér */ object Deployer { - lazy val useClusterDeployer = ReflectiveAccess.ClusterModule.isEnabled - lazy val cluster = ReflectiveAccess.ClusterModule.clusterDeployer - lazy val local = new LocalDeployer + + val defaultAddress = Home("localhost", 2552) // FIXME allow configuring node-local default hostname and port + + lazy val instance: ReflectiveAccess.ClusterModule.ClusterDeployer = { + val deployer = + if (ReflectiveAccess.ClusterModule.isEnabled) ReflectiveAccess.ClusterModule.clusterDeployer + else LocalDeployer + deployer.init(deploymentsInConfig) + deployer + } + + def shutdown() { + instance.shutdown() + } def deploy(deployment: Deploy) { if (deployment eq null) throw new IllegalArgumentException("Deploy can not be null") val address = deployment.address Address.validate(address) - if (useClusterDeployer) cluster.deploy(deployment) - else local.deploy(deployment) + instance.deploy(deployment) } def deploy(deployment: Seq[Deploy]) { deployment foreach (deploy(_)) } - private def deployLocally(deployment: Deploy) { - deployment match { - case Deploy(address, Direct, Clustered(Home(hostname, port), _, _)) => - val currentRemoteServerAddress = Actor.remote.address - if (currentRemoteServerAddress.getHostName == hostname) { // are we on the right server? - if (currentRemoteServerAddress.getPort != port) throw new ConfigurationException( - "Remote server started on [" + hostname + - "] is started on port [" + currentRemoteServerAddress.getPort + - "] can not use deployment configuration [" + deployment + - "] due to invalid port [" + port + "]") - - // FIXME how to handle registerPerSession -// Actor.remote.register(Actor.newLocalActorRef(address)) - } - - case Deploy(_, routing, Clustered(Home(hostname, port), replicas, state)) => - // FIXME clustered actor deployment - - case _ => // local deployment do nothing - } - } - /** * Undeploy is idemponent. E.g. safe to invoke multiple times. */ def undeploy(deployment: Deploy) { - if (useClusterDeployer) cluster.undeploy(deployment) - else local.undeploy(deployment) + instance.undeploy(deployment) } def undeployAll() { - if (useClusterDeployer) cluster.undeployAll() - else local.undeployAll() + instance.undeployAll() } + def isLocal(deployment: Deploy): Boolean = deployment match { + case Deploy(_, _, Local) => true + case _ => false + } + + def isClustered(deployment: Deploy): Boolean = isLocal(deployment) + + def isLocal(address: String): Boolean = isLocal(deploymentFor(address)) + + def isClustered(address: String): Boolean = !isLocal(address) + /** * Same as 'lookupDeploymentFor' but throws an exception if no deployment is bound. */ - def deploymentFor(address: String): Deploy = { + private[akka] def deploymentFor(address: String): Deploy = { lookupDeploymentFor(address) match { case Some(deployment) => deployment case None => thrownNoDeploymentBoundException(address) } } - def lookupDeploymentFor(address: String): Option[Deploy] = { - val deployment_? = - if (useClusterDeployer) cluster.lookupDeploymentFor(address) - else local.lookupDeploymentFor(address) + private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { + val deployment_? = instance.lookupDeploymentFor(address) if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_? else { val newDeployment = @@ -226,10 +220,28 @@ object Deployer { } } + private[akka] def deploymentsInConfig: List[Deploy] = { + for { + address <- addressesInConfig + deployment <- lookupInConfig(address) + } yield deployment + } + + private[akka] def addressesInConfig: List[String] = { + val deploymentPath = "akka.actor.deployment" + Config.config.getSection(deploymentPath) match { + case None => Nil + case Some(addressConfig) => + addressConfig.map.keySet + .map(path => path.substring(0, path.indexOf("."))) + .toSet.toList // toSet to force uniqueness + } + } + /** * Lookup deployment in 'akka.conf' configuration file. */ - def lookupInConfig(address: String): Option[Deploy] = { + private[akka] def lookupInConfig(address: String): Option[Deploy] = { // -------------------------------- // akka.actor.deployment.
@@ -273,6 +285,7 @@ object Deployer { // akka.actor.deployment..clustered.home // -------------------------------- val home = clusteredConfig.getListAny("home") match { + case Nil => defaultAddress case List(hostname: String, port: String) => try { Home(hostname, port.toInt) @@ -285,14 +298,14 @@ object Deployer { } case invalid => throw new ConfigurationException( "Config option [" + addressPath + - ".clustered.home] needs to be an arrayon format [\"hostname\", port] - was [" + + ".clustered.home] needs to be an array on format [\"hostname\", port] - was [" + invalid + "]") } // -------------------------------- // akka.actor.deployment..clustered.replicas // -------------------------------- - val replicas = clusteredConfig.getAny("replicas", 1) match { + val replicas = clusteredConfig.getAny("replicas", "1") match { case "auto" => AutoReplicate case "1" => NoReplicas case nrOfReplicas: String => @@ -319,17 +332,6 @@ object Deployer { } } - def isLocal(deployment: Deploy): Boolean = deployment match { - case Deploy(_, _, Local) => true - case _ => false - } - - def isClustered(deployment: Deploy): Boolean = isLocal(deployment) - - def isLocal(address: String): Boolean = isLocal(deploymentFor(address)) - - def isClustered(address: String): Boolean = !isLocal(address) - private def throwDeploymentBoundException(deployment: Deploy): Nothing = { val e = new DeploymentAlreadyBoundException( "Address [" + deployment.address + @@ -344,31 +346,63 @@ object Deployer { EventHandler.error(e, this, e.getMessage) throw e } + + private def deployLocally(deployment: Deploy) { + deployment match { + case Deploy(address, Direct, Clustered(Home(hostname, port), _, _)) => + val currentRemoteServerAddress = Actor.remote.address + if (currentRemoteServerAddress.getHostName == hostname) { // are we on the right server? + if (currentRemoteServerAddress.getPort != port) throw new ConfigurationException( + "Remote server started on [" + hostname + + "] is started on port [" + currentRemoteServerAddress.getPort + + "] can not use deployment configuration [" + deployment + + "] due to invalid port [" + port + "]") + + // FIXME how to handle registerPerSession +// Actor.remote.register(Actor.newLocalActorRef(address)) + } + + case Deploy(_, routing, Clustered(Home(hostname, port), replicas, state)) => + // FIXME clustered actor deployment + + case _ => // local deployment do nothing + } + } } /** * @author Jonas Bonér */ -class LocalDeployer { +object LocalDeployer { private val deployments = new ConcurrentHashMap[String, Deploy] - def deploy(deployment: Deploy) { + private[akka] def init(deployments: List[Deploy]) { + EventHandler.info(this, "Initializing local deployer") + EventHandler.info(this, "Deploying locally [\n" + deployments.mkString("\n\t") + "\n]") + deployments foreach (deploy(_)) // deploy + } + + private[akka] def shutdown() { + undeployAll() + deployments.clear() + } + + private[akka] def deploy(deployment: Deploy) { if (deployments.putIfAbsent(deployment.address, deployment) != deployment) { - println("----- DEPLOYING " + deployment) // FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown) // throwDeploymentBoundException(deployment) } } - def undeploy(deployment: Deploy) { + private[akka] def undeploy(deployment: Deploy) { deployments.remove(deployment.address) } - def undeployAll() { + private[akka] def undeployAll() { deployments.clear() } - def lookupDeploymentFor(address: String): Option[Deploy] = { + private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { val deployment = deployments.get(address) if (deployment eq null) None else Some(deployment) diff --git a/akka-actor/src/main/scala/akka/event/EventHandler.scala b/akka-actor/src/main/scala/akka/event/EventHandler.scala index 12a88201c8..3fb44ec1aa 100644 --- a/akka-actor/src/main/scala/akka/event/EventHandler.scala +++ b/akka-actor/src/main/scala/akka/event/EventHandler.scala @@ -99,7 +99,31 @@ object EventHandler extends ListenerManagement { } def start() { - info(this, "Starting up EventHandler") + try { + val defaultListeners = config.getList("akka.event-handlers") match { + case Nil => "akka.event.EventHandler$DefaultListener" :: Nil + case listeners => listeners + } + defaultListeners foreach { listenerName => + try { + ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz => + val listener = Actor.actorOf(clazz, listenerName).start() + addListener(listener) + } + } catch { + case e: akka.actor.DeploymentAlreadyBoundException => // do nothing + case e: Exception => + throw new ConfigurationException( + "Event Handler specified in config can't be loaded [" + listenerName + + "] due to [" + e.toString + "]") + } + } + info(this, "Starting up EventHandler") + } catch { + case e: Exception => + e.printStackTrace() + throw new ConfigurationException("Could not start Event Handler due to [" + e.toString + "]") + } } /** @@ -216,23 +240,5 @@ object EventHandler extends ListenerManagement { } } - val defaultListeners = config.getList("akka.event-handlers") match { - case Nil => "akka.event.EventHandler$DefaultListener" :: Nil - case listeners => listeners - } - defaultListeners foreach { listenerName => - try { - ReflectiveAccess.getClassFor[Actor](listenerName) map { clazz => - addListener(Actor.actorOf(clazz, listenerName).start) - } - } catch { - case e: akka.actor.DeploymentAlreadyBoundException => // do nothing - case e: Exception => - throw new ConfigurationException( - "Event Handler specified in config can't be loaded [" + listenerName + - "] due to [" + e.toString + "]") - } - } - start() } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 21ebbe2213..20104fdcdb 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -136,8 +136,8 @@ case class CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorExcept abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule { - lazy val eventHandler: ActorRef = { - val handler = Actor.actorOf[RemoteEventHandler].start() + val eventHandler: ActorRef = { + val handler = Actor.actorOf[RemoteEventHandler](classOf[RemoteEventHandler].getName).start() // add the remote client and server listener that pipes the events to the event handler system addListener(handler) handler @@ -146,8 +146,8 @@ abstract class RemoteSupport extends ListenerManagement with RemoteServerModule def shutdown { eventHandler.stop() removeListener(eventHandler) - this.shutdownClientModule - this.shutdownServerModule + this.shutdownClientModule() + this.shutdownServerModule() clear } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index 998df26885..405df37aac 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -31,7 +31,7 @@ object Helpers { case elem: T => Array(elem) } - def ignore[E : Manifest](body: => Unit): Unit = { + def ignore[E : Manifest](body: => Unit) { try { body } @@ -40,12 +40,12 @@ object Helpers { } } - def withPrintStackTraceOnError(body: => Unit) = { + def withPrintStackTraceOnError(body: => Unit) { try { body } catch { case e: Throwable => - EventHandler.error(e, this, "") + EventHandler.error(e, this, e.toString) throw e } } @@ -106,7 +106,7 @@ object Helpers { class ResultOrError[R](result: R){ private[this] var contents: Either[R, Throwable] = Left(result) - def update(value: => R) = { + def update(value: => R) { contents = try { Left(value) } catch { diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index 5e3f582fee..2c1c4baed1 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -72,6 +72,8 @@ object ReflectiveAccess { } type ClusterDeployer = { + def init(deployments: List[Deploy]) + def shutdown() def deploy(deployment: Deploy) def undeploy(deployment: Deploy) def undeployAll() @@ -184,6 +186,9 @@ object ReflectiveAccess { ctor.setAccessible(true) Some(ctor.newInstance(args: _*).asInstanceOf[T]) } catch { + case e: java.lang.reflect.InvocationTargetException => + EventHandler.debug(this, e.getCause.toString) + None case e: Exception => EventHandler.debug(this, e.toString) None diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index bab0ae5fb9..5af3e65e1a 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -117,6 +117,7 @@ object Cluster { val UUID_PREFIX = "uuid:".intern // config options + val name = config.getString("akka.cluster.name", "default") val zooKeeperServers = config.getString("akka.cluster.zookeeper-server-addresses", "localhost:2181") val remoteServerPort = config.getInt("akka.cluster.remote-server-port", 2552) val sessionTimeout = Duration(config.getInt("akka.cluster.session-timeout", 60), TIME_UNIT).toMillis.toInt @@ -287,9 +288,6 @@ object Cluster { */ def startLocalCluster(dataPath: String, logPath: String, port: Int, tickTime: Int): ZkServer = { try { - EventHandler.info(this, - "Starting local ZooKeeper server on\n\tport [%s]\n\tdata path [%s]\n\tlog path [%s]\n\ttick time [%s]" - .format(port, dataPath, logPath, tickTime)) val zkServer = AkkaZooKeeper.startLocalServer(dataPath, logPath, port, tickTime) _zkServer.set(Some(zkServer)) zkServer @@ -395,10 +393,6 @@ class ClusterNode private[akka] ( }, "akka.cluster.remoteClientLifeCycleListener").start val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start - import DeploymentConfig._ - Deployer.deploy(Deploy( - RemoteClusterDaemon.ADDRESS, Direct, - Clustered(Home(nodeAddress.hostname, nodeAddress.port), NoReplicas, Stateless))) val remoteService: RemoteSupport = { val remote = new akka.remote.netty.NettyRemoteSupport diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index 279c05bc3f..6eb438d536 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -4,24 +4,37 @@ package akka.cluster -import akka.actor.DeploymentConfig.Deploy -import akka.actor.DeploymentException +import akka.actor.{DeploymentConfig, Deployer, DeploymentException} +import DeploymentConfig._ import akka.event.EventHandler import akka.util.Switch +import akka.util.Helpers._ import akka.cluster.zookeeper.AkkaZkClient import org.apache.zookeeper.CreateMode +import org.apache.zookeeper.recipes.lock.{WriteLock, LockListener} import scala.collection.JavaConversions.collectionAsScalaIterable +import com.eaio.uuid.UUID + +import java.util.concurrent.CountDownLatch +import org.I0Itec.zkclient.exception.{ZkNoNodeException, ZkNodeExistsException} + /** * @author Jonas Bonér */ object ClusterDeployer { - val deploymentPath = "deployment" - val deploymentAddressPath = deploymentPath + "/%s" + val clusterName = Cluster.name + val nodeName = new UUID().toString // FIXME how to configure node name? now using UUID + val clusterPath = "/%s" format clusterName + val clusterDeploymentLockPath = clusterPath + "/deployment-lock" + val deploymentPath = clusterPath + "/deployment" + val baseNodes = List(clusterPath, clusterDeploymentLockPath, deploymentPath) + val deploymentAddressPath = deploymentPath + "/%s" - private val isConnected = new Switch(false) + private val isConnected = new Switch(false) + private val deploymentCompleted = new CountDownLatch(1) private lazy val zkClient = { val zk = new AkkaZkClient( @@ -30,54 +43,115 @@ object ClusterDeployer { Cluster.connectionTimeout, Cluster.defaultSerializer) EventHandler.info(this, "ClusterDeployer started") - isConnected.switchOn zk } + // FIXME invert dependency; let Cluster have an instance of ClusterDeployer instead + lazy val cluster = Cluster(NodeAddress(clusterName, nodeName)) + + private val clusterDeploymentLockListener = new LockListener { + def lockAcquired() { + EventHandler.debug(this, "Clustered deployment started") + } + + def lockReleased() { + EventHandler.debug(this, "Clustered deployment completed") + deploymentCompleted.countDown() + } + } + + private lazy val deploymentLock = new WriteLock( + zkClient.connection.getZookeeper, clusterDeploymentLockPath, null, clusterDeploymentLockListener) { + private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId") + ownerIdField.setAccessible(true) + def leader: String = ownerIdField.get(this).asInstanceOf[String] + } + + private val systemDeployments = List( + Deploy( + RemoteClusterDaemon.ADDRESS, Direct, + Clustered(Deployer.defaultAddress, NoReplicas, Stateless)) + ) + + private[akka] def init(deployments: List[Deploy]) { + isConnected.switchOn { + baseNodes.foreach { path => + try { + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) + EventHandler.debug(this, "Created node [%s]".format(path)) + } catch { + case e => + val error = new DeploymentException(e.toString) + EventHandler.error(error, this) + throw error + } + } + + val allDeployments = deployments ::: systemDeployments + EventHandler.info(this, "Initializing cluster deployer") + if (deploymentLock.lock()) { // try to be the one doing the clustered deployment + EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]") + allDeployments foreach (deploy(_)) // deploy + deploymentLock.unlock() // signal deployment complete + } else { + deploymentCompleted.await() // wait until deployment is completed + } + } + } + def shutdown() { isConnected switchOff { + undeployAll() zkClient.close() } } - def deploy(deployment: Deploy) { + private[akka] def deploy(deployment: Deploy) { + val path = deploymentAddressPath.format(deployment.address) try { - val path = deploymentAddressPath.format(deployment.address) - zkClient.create(path, null, CreateMode.PERSISTENT) + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) zkClient.writeData(path, deployment) - // FIXME trigger some deploy action? + // FIXME trigger cluster-wide deploy action } catch { - case e => handleError(new DeploymentException("Could store deployment data [" + deployment + "] in ZooKeeper due to: " + e)) + case e: NullPointerException => + handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed")) + case e: Exception => + handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper due to: " + e)) } } - def undeploy(deployment: Deploy) { + private[akka] def undeploy(deployment: Deploy) { try { zkClient.delete(deploymentAddressPath.format(deployment.address)) - // FIXME trigger some undeploy action? + // FIXME trigger cluster-wide undeployment action } catch { - case e => handleError(new DeploymentException("Could undeploy deployment [" + deployment + "] in ZooKeeper due to: " + e)) + case e: Exception => + handleError(new DeploymentException("Could not undeploy deployment [" + deployment + "] in ZooKeeper due to: " + e)) } } - def undeployAll() { + private[akka] def undeployAll() { try { for { child <- collectionAsScalaIterable(zkClient.getChildren(deploymentPath)) deployment <- lookupDeploymentFor(child) } undeploy(deployment) } catch { - case e => handleError(new DeploymentException("Could undeploy all deployment data in ZooKeeper due to: " + e)) + case e: Exception => + handleError(new DeploymentException("Could not undeploy all deployment data in ZooKeeper due to: " + e)) } } - def lookupDeploymentFor(address: String): Option[Deploy] = { + private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = { try { Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy]) } catch { - case e: Exception => None + case e: ZkNoNodeException => None + case e: Exception => + EventHandler.warning(this, e.toString) + None } } diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala new file mode 100644 index 0000000000..5faf61630d --- /dev/null +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -0,0 +1,50 @@ +package akka.cluster + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } + +import org.I0Itec.zkclient._ + +import akka.actor._ + +class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfterAll with BeforeAndAfterEach { + + val dataPath = "_akka_cluster/data" + val logPath = "_akka_cluster/log" + + var zkServer: ZkServer = _ + + "A ClusterDeployer" should { + "be able to deploy deployments in configuration file" in { + val deployments = Deployer.deploymentsInConfig + deployments must not equal(Nil) + ClusterDeployer.init(deployments) + + deployments map { oldDeployment => + val newDeployment = ClusterDeployer.lookupDeploymentFor(oldDeployment.address) + newDeployment must be('defined) + oldDeployment must equal(newDeployment.get) + } + } + } + + override def beforeAll() { + try { + zkServer = Cluster.startLocalCluster(dataPath, logPath) + Thread.sleep(5000) + } catch { + case e => e.printStackTrace() + } + } + + override def beforeEach() { + Cluster.reset() + } + + override def afterAll() { + Deployer.shutdown() + Cluster.shutdownLocalCluster() + Actor.registry.local.shutdownAll() + } +} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index bb3cdaa8a3..4f0c0dc8c5 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -41,17 +41,23 @@ akka { # ------------------------------- service-pi { # stateless actor with replication factor 3 and round-robin load-balancer - router = "round-robin" # default is "direct"; + router = "round-robin" # routing (load-balance) scheme to use # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" # or: fully qualified class name of the router class + # default is "direct"; clustered { # makes the actor available in the cluster registry - # if omitted: actor is defined as local non-clustered actor - home = ["darkstar", 8888] # home address/node for clustered actor; if omitted then "localhost:2552" will be used - replicas = 3 # default is 1; + # default (if omitted) is local non-clustered actor + home = "node:test-1" # defines the hostname, IP-address or node name of the "home" node for clustered actor + # available: "host: