Merge branch 'master' of github.com:jboner/akka

Viktor Klang 2011-05-20 20:47:27 +02:00
commit 476334bad7
11 changed files with 61 additions and 90 deletions

View file

@@ -12,12 +12,12 @@ class DeployerSpec extends WordSpec with MustMatchers {
 "A Deployer" must {
 "be able to parse 'akka.actor.deployment._' config elements" in {
-val deployment = Deployer.lookupInConfig("service-pi")
+val deployment = Deployer.lookupInConfig("service-ping")
 deployment must be('defined)
 deployment must equal(Some(
 Deploy(
-"service-pi",
-RoundRobin,
+"service-ping",
+LeastCPU,
 "akka.serialization.Format$Default$",
 Clustered(
 Node("test-1"),

View file

@@ -386,7 +386,7 @@ object Actor extends ListenerManagement {
 case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒
 ClusterModule.ensureEnabled()
-if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
+if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
 val hostname = home match {
 case Host(hostname) ⇒ hostname
@@ -402,8 +402,6 @@ object Actor extends ListenerManagement {
 case NoReplicas() ⇒ 0
 }
-import ClusterModule.node
 if (hostname == Config.hostname) { // home node for clustered actor
 def serializerErrorDueTo(reason: String) =
@@ -429,12 +427,13 @@ object Actor extends ListenerManagement {
 }
 val f = clazz.newInstance.asInstanceOf[AnyRef]
 if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer]
-else serializerErrorDueTo("class must be of type [akka.serialization.Serializer")
+else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]")
 }
 }
-if (!node.isClustered(address)) node.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
-node.use(address)
+if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
+cluster.use(address, serializer)
 } else {
 val routerType = router match {
@@ -451,7 +450,7 @@ object Actor extends ListenerManagement {
-case LeastMessages ⇒ RouterType.LeastMessages
+case LeastMessages() ⇒ RouterType.LeastMessages
 }
-node.ref(address, routerType)
+cluster.ref(address, routerType)
 }
 /*
@@ -459,7 +458,7 @@ object Actor extends ListenerManagement {
 - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured?
 - ClusterNode API and Actor.remote API should be made private[akka]
 - Rewrite ClusterSpec or remove it
-- Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor
+- Actor.stop on home node (actor checked out with cluster.use(..)) should do cluster.remove(..) of actor
 - Should we allow configuring of session-scoped remote actors? How?

View file

@@ -615,8 +615,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor,
 currentMessage = null
 Actor.registry.unregister(this)
-if (ClusterModule.isEnabled && isRemotingEnabled)
-Actor.remote.unregister(this)
+if (ClusterModule.isEnabled) Actor.remote.unregister(this)
 setActorSelfFields(actorInstance.get, null)
 }
@@ -992,7 +991,8 @@ private[akka] case class RemoteActorRef private[akka] (
 loader: Option[ClassLoader])
 extends ActorRef with ScalaActorRef {
 ensureRemotingEnabled()
+ClusterModule.ensureEnabled()
 timeout = _timeout
 // FIXME BAD, we should not have different ActorRefs

View file

@@ -11,14 +11,14 @@ import java.util.concurrent.ConcurrentHashMap
 import akka.event.EventHandler
 import akka.actor.DeploymentConfig._
 import akka.config.{ ConfigurationException, Config }
-import akka.util.ReflectiveAccess
+import akka.util.ReflectiveAccess._
 import akka.serialization.Format
 import akka.AkkaException
 /**
-* Programmatic deployment configuration classes. Most values have defaults and can be left out.
-*
-* todo: what does the concept Deploy
+* Module holding the programmatic deployment configuration classes.
+* Defines the deployment specification.
+* Most values have defaults and can be left out.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
@@ -117,9 +117,9 @@ object Deployer {
 val defaultAddress = Host(Config.hostname)
-lazy val instance: ReflectiveAccess.ClusterModule.ClusterDeployer = {
+lazy val instance: ClusterModule.ClusterDeployer = {
 val deployer =
-if (ReflectiveAccess.ClusterModule.isEnabled) ReflectiveAccess.ClusterModule.clusterDeployer
+if (ClusterModule.isEnabled) ClusterModule.clusterDeployer
 else LocalDeployer
 deployer.init(deploymentsInConfig)
 deployer
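
For orientation, the Deployer/DeploymentConfig API touched above can be exercised roughly as below. This is a minimal sketch and not part of the commit: the akka.actor package location of Deployer and the availability of Deploy via the DeploymentConfig wildcard import are assumptions based on the imports visible in this diff.

// Sketch only: assumes the 'service-ping' deployment from akka.conf
// (see the last file in this commit) has been loaded.
import akka.actor.DeploymentConfig._
import akka.actor.Deployer // assumed package, mirrors DeployerSpec usage

object DeploymentLookupExample {
  def main(args: Array[String]): Unit = {
    // Parses the 'akka.actor.deployment.service-ping' config block, as
    // exercised by DeployerSpec above; None when nothing is configured.
    val deployment: Option[Deploy] = Deployer.lookupInConfig("service-ping")

    deployment match {
      case Some(d) ⇒ println("Found deployment for 'service-ping': " + d)
      case None    ⇒ println("No deployment configured for 'service-ping'")
    }
  }
}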

View file

@@ -1,9 +1,9 @@
-package akka.remoteinterface
 /**
 * Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
 */
+package akka.remoteinterface
 import akka.actor.Actor
 import akka.event.EventHandler
@@ -21,7 +21,7 @@ class RemoteEventHandler extends Actor {
 // client
 case RemoteClientError(cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString)
-case RemoteClientWriteFailed(request, cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString))
+case RemoteClientWriteFailed(request, cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString))
 case RemoteClientDisconnected(client, address) ⇒ EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString)
 case RemoteClientConnected(client, address) ⇒ EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString)
 case RemoteClientStarted(client, address) ⇒ EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString)

View file

@@ -23,12 +23,6 @@ object ReflectiveAccess {
 val loader = getClass.getClassLoader
 lazy val isRemotingEnabled: Boolean = RemoteModule.isEnabled
-lazy val isClusterEnabled: Boolean = ClusterModule.isEnabled
-def ensureClusterEnabled() { ClusterModule.ensureEnabled() }
-def ensureRemotingEnabled() { RemoteModule.ensureEnabled() }
 /**
 * Reflective access to the Cluster module.
 *
@@ -88,7 +82,7 @@ object ReflectiveAccess {
 def remove(address: String)
-def use(address: String): Array[ActorRef]
+def use(address: String, format: Serializer): Array[ActorRef]
 def ref(address: String, router: RouterType): ActorRef
 def isClustered(address: String): Boolean

View file

@@ -741,8 +741,13 @@ class ClusterNode private[akka] (
 * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
 * for remote access through lookup by its UUID.
 */
-def use[T <: Actor](actorAddress: String)(
-implicit format: Serializer = formatForActor(actorAddress)): Array[LocalActorRef] = if (isConnected.isOn) {
+def use[T <: Actor](actorAddress: String): Array[LocalActorRef] = use(actorAddress, formatForActor(actorAddress))
+/**
+* Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available
+* for remote access through lookup by its UUID.
+*/
+def use[T <: Actor](actorAddress: String, format: Serializer): Array[LocalActorRef] = if (isConnected.isOn) {
 import akka.serialization.ActorSerialization._
@@ -1572,7 +1577,7 @@ trait ErrorHandler {
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
 object RemoteClusterDaemon {
-val ADDRESS = "akka-cluster-daemon"
+val ADDRESS = "akka-cluster-daemon".intern
 // FIXME configure functionServerDispatcher to what?
 val functionServerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("akka:cloud:cluster:function:server").build
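
As a rough usage sketch (not part of the commit), the reworked use overloads on ClusterNode can be read like this. The akka.cluster package for ClusterNode and the akka.actor location of LocalActorRef are assumptions from context; how the caller obtains a ClusterNode instance is outside the scope of this diff.

// Sketch only: 'node' stands for whichever ClusterNode the caller already holds.
import akka.actor.LocalActorRef
import akka.cluster.ClusterNode      // assumed package
import akka.serialization.Serializer

object ClusterUseExample {
  def checkOut(node: ClusterNode, address: String, serializer: Serializer): Array[LocalActorRef] = {
    // Explicit-serializer overload, matching the new call site in Actor.scala:
    //   cluster.use(address, serializer)
    val explicit = node.use(address, serializer)

    // Convenience overload: resolves the serializer via formatForActor(address)
    // and delegates to the two-argument version above.
    val defaulted = node.use(address)

    explicit ++ defaulted
  }
}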

View file

@@ -43,13 +43,11 @@ object ClusterDeployer {
 private val isConnected = new Switch(false)
 private val deploymentCompleted = new CountDownLatch(1)
-private val _zkClient = new AtomicReference[AkkaZkClient](null)
-private def zkClient: AkkaZkClient = ensureRunning {
-val zk = _zkClient.get
-if (zk eq null) handleError(new IllegalStateException("No ZooKeeper client connection available"))
-else zk
-}
+private val zkClient = new AkkaZkClient(
+Cluster.zooKeeperServers,
+Cluster.sessionTimeout,
+Cluster.connectionTimeout,
+Cluster.defaultSerializer)
 private val clusterDeploymentLockListener = new LockListener {
 def lockAcquired() {
@@ -62,7 +60,7 @@ object ClusterDeployer {
 }
 }
-private lazy val deploymentLock = new WriteLock(
+private val deploymentLock = new WriteLock(
 zkClient.connection.getZookeeper, clusterDeploymentLockPath, null, clusterDeploymentLockListener) {
 private val ownerIdField = classOf[WriteLock].getDeclaredField("ownerId")
 ownerIdField.setAccessible(true)
@@ -74,13 +72,7 @@ object ClusterDeployer {
 private[akka] def init(deployments: List[Deploy]) {
 isConnected switchOn {
-_zkClient.compareAndSet(null, new AkkaZkClient(
-Cluster.zooKeeperServers,
-Cluster.sessionTimeout,
-Cluster.connectionTimeout,
-Cluster.defaultSerializer))
-baseNodes.foreach { path ⇒
+baseNodes foreach { path ⇒
 try {
 ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT))
 EventHandler.debug(this, "Created node [%s]".format(path))
@@ -106,14 +98,13 @@ object ClusterDeployer {
 }
 def shutdown() {
-val zk = zkClient
 isConnected switchOff {
 // undeploy all
 try {
 for {
-child ← collectionAsScalaIterable(zk.getChildren(deploymentPath))
-deployment ← zk.readData(deploymentAddressPath.format(child)).asInstanceOf[Deploy]
-} zk.delete(deploymentAddressPath.format(deployment.address))
+child ← collectionAsScalaIterable(zkClient.getChildren(deploymentPath))
+deployment ← zkClient.readData(deploymentAddressPath.format(child)).asInstanceOf[Deploy]
+} zkClient.delete(deploymentAddressPath.format(deployment.address))
 } catch {
 case e: Exception ⇒
@@ -121,7 +112,7 @@ object ClusterDeployer {
 }
 // shut down ZooKeeper client
-zk.close()
+zkClient.close()
 EventHandler.info(this, "ClusterDeployer shut down successfully")
 }
 }

View file

@@ -7,9 +7,10 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
 import org.I0Itec.zkclient._
 import akka.actor._
 import Actor._
 object ClusterDeployerSpec {
-class Pi extends Actor {
+class HelloWorld extends Actor with Serializable {
 def receive = {
 case "Hello" ⇒ self.reply("World")
 }
@@ -24,8 +25,10 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
 var zkServer: ZkServer = _
+// FIXME create multi-jvm test for ClusterDeployer to make sure that only one node can make the deployment and that all other nicely waits until he is done
 "A ClusterDeployer" should {
-"be able to deploy deployments in configuration file" in {
+"be able to deploy deployments in akka.conf into ZooKeeper and then lookup the deployments by 'address'" in {
 val deployments = Deployer.deploymentsInConfig
 deployments must not equal (Nil)
 ClusterDeployer.init(deployments)
@@ -37,12 +40,11 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter
 }
 }
 /*
-"be able to create an actor deployed using ClusterDeployer" in {
-val pi = Actor.actorOf[Pi]("service-pi")
-pi must not equal(null)
+"be able to create an actor deployed using ClusterDeployer, add it to ZooKeeper and then check the actor out for use" in {
+val pi = Actor.actorOf[HelloWorld]("service-hello")
+pi must not equal (null)
+pi.address must equal("service-hello")
 }
 */
 }
override def beforeAll() {
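
The workflow this spec describes, pushing the deployments from akka.conf into ZooKeeper and tearing them down again, looks roughly as follows. This sketch uses only the calls visible in this commit; the package paths are assumptions, the address-based lookup API itself is not shown in the diff, and a running ZooKeeper (the ZkServer started by the spec) is a precondition.

// Sketch of the ClusterDeployer lifecycle exercised by ClusterDeployerSpec.
import akka.actor.Deployer           // assumed package
import akka.cluster.ClusterDeployer  // assumed package

object ClusterDeployerLifecycleExample {
  def main(args: Array[String]): Unit = {
    // Read the 'akka.actor.deployment._' entries from akka.conf ...
    val deployments = Deployer.deploymentsInConfig

    // ... publish them into ZooKeeper (base nodes are created on first use) ...
    ClusterDeployer.init(deployments)

    // ... and later undeploy everything and close the ZooKeeper client.
    ClusterDeployer.shutdown()
  }
}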

View file

@@ -8,9 +8,9 @@ import akka.actor.{ Actor, BootableActorLoaderService }
 import akka.util.{ ReflectiveAccess, Bootable }
 /**
-* This bundle/service is responsible for booting up and shutting down the remote actors facility
+* This bundle/service is responsible for booting up and shutting down the remote actors facility.
 * <p/>
-* It is used in Kernel
+* It is used in Kernel.
 */
 trait BootableRemoteActorService extends Bootable {
 self: BootableActorLoaderService ⇒
@@ -22,7 +22,7 @@ trait BootableRemoteActorService extends Bootable {
 def startRemoteService() { remoteServerThread.start() }
 abstract override def onLoad() {
-if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) {
+if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) {
 startRemoteService()
 }
 super.onLoad()

View file

@@ -40,8 +40,8 @@ akka {
 # -- all configuration options --
 # -------------------------------
-service-pi { # stateless actor with replication factor 3 and round-robin load-balancer
-router = "round-robin" # routing (load-balance) scheme to use
+service-ping { # stateless actor with replication factor 3 and round-robin load-balancer
+router = "least-cpu" # routing (load-balance) scheme to use
 # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages"
 # or: fully qualified class name of the router class
 # default is "direct";
@@ -62,34 +62,14 @@ akka {
 }
 }
 # ----------------------------------
 # -- variations of using defaults --
 # ----------------------------------
-service-pong {} # local actor
+service-ping-1 {} # local actor
-service-pong-2 { # stateful actor with replication factor 1 and default home address
-clustered {}
-}
-service-pong-2 { # stateless actor with replication factor 1 and routing 'direct' and default home address
+service-hello {
+router = "round-robin"
 clustered {
 stateless = on
 }
 }
-service-pong-4 { # stateless autoscaled actor
-router = "least-cpu"
-clustered {
-home = "ip:0.0.0.0"
-replicas = "auto"
-stateless = on
-}
-}
 session-registry { # stateful, replicated actor with replication factor 3
 clustered {
-home = "host:darkstar.lan"
+home = "host:localhost"
 replicas = 3
 stateless = on
 }
 }
 }