refactoring and minor edits

This commit is contained in:
Jonas Bonér 2011-05-30 10:53:25 +02:00
parent 6f1ff4efdb
commit 112ddefd7d
7 changed files with 122 additions and 92 deletions

View file

@ -159,9 +159,6 @@ object Actor extends ListenerManagement {
*/
private[akka] lazy val remote: RemoteSupport = cluster.remoteService
// start up a cluster node to join the ZooKeeper cluster
//if (ClusterModule.isEnabled) cluster.start()
/**
* Creates an ActorRef out of the Actor with type T.
* <pre>
@ -382,10 +379,13 @@ object Actor extends ListenerManagement {
private def newClusterActorRef(factory: () ActorRef, address: String, deploy: Deploy): ActorRef = {
deploy match {
case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State))
case Deploy(configAdress, router, serializerClassName, Clustered(home, replication: Replication, state: State))
ClusterModule.ensureEnabled()
if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
if (configAdress != address) throw new IllegalStateException(
"Deployment config for [" + address + "] is wrong [" + deploy + "]")
if (!Actor.remote.isRunning) throw new IllegalStateException(
"Remote server is not running")
val isHomeNode = DeploymentConfig.isHomeNode(home)
val replicas = DeploymentConfig.replicaValueFor(replication)
@ -417,15 +417,20 @@ object Actor extends ListenerManagement {
}
}
if (isHomeNode) { // home node for clustered actor
val isStateful = state match {
case Stateless false
case Stateful true
}
if (isStateful && isHomeNode) { // stateful actor's home node
cluster
.use(address, serializer)
.getOrElse(throw new ConfigurationException(
"Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
if (!cluster.isClustered(address)) {
cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
if (!cluster.isClustered(address)) { // add actor to cluster registry (if not already added)
cluster.store(factory().start(), replicas, false, serializer)
}
// remote node (not home node), check out as ClusterActorRef
@ -651,14 +656,21 @@ trait Actor {
private[akka] final def apply(msg: Any) = {
if (msg.isInstanceOf[AnyRef] && (msg.asInstanceOf[AnyRef] eq null))
throw new InvalidMessageException("Message from [" + self.sender + "] to [" + self.toString + "] is null")
val behaviorStack = self.hotswap
msg match {
case l: AutoReceivedMessage autoReceiveMessage(l)
case msg if behaviorStack.nonEmpty &&
behaviorStack.head.isDefinedAt(msg) behaviorStack.head.apply(msg)
case msg if behaviorStack.isEmpty &&
processingBehavior.isDefinedAt(msg) processingBehavior.apply(msg)
case unknown unhandled(unknown) //This is the only line that differs from processingbehavior
case l: AutoReceivedMessage
autoReceiveMessage(l)
case msg if behaviorStack.nonEmpty && behaviorStack.head.isDefinedAt(msg)
behaviorStack.head.apply(msg)
case msg if behaviorStack.isEmpty && processingBehavior.isDefinedAt(msg)
processingBehavior.apply(msg)
case unknown
unhandled(unknown) //This is the only line that differs from processingbehavior
}
}

View file

@ -45,7 +45,6 @@ private[actor] final class ActorRegistry private[actor] () extends ListenerManag
*/
def actorFor(address: String): Option[ActorRef] = {
if (actorsByAddress.containsKey(address)) Some(actorsByAddress.get(address))
// else if (isClusterEnabled) ClusterModule.node.use(address) // FIXME uncomment and fix
else None
}
@ -135,9 +134,9 @@ class LocalActorRegistry(
def shutdownAll() {
foreach(_.stop)
if (ClusterModule.isEnabled) Actor.remote.clear //FIXME: Should this be here?
actorsByAddress.clear
actorsByUuid.clear
typedActorsByUuid.clear
actorsByAddress.clear()
actorsByUuid.clear()
typedActorsByUuid.clear()
}
//============== ACTORS ==============

View file

@ -356,7 +356,7 @@ object Deployer {
}
}
private def throwDeploymentBoundException(deployment: Deploy): Nothing = {
private[akka] def throwDeploymentBoundException(deployment: Deploy): Nothing = {
val e = new DeploymentAlreadyBoundException(
"Address [" + deployment.address +
"] already bound to [" + deployment +
@ -365,7 +365,7 @@ object Deployer {
throw e
}
private def thrownNoDeploymentBoundException(address: String): Nothing = {
private[akka] def thrownNoDeploymentBoundException(address: String): Nothing = {
val e = new NoDeploymentBoundException("Address [" + address + "] is not bound to a deployment")
EventHandler.error(e, this, e.getMessage)
throw e
@ -392,8 +392,7 @@ object LocalDeployer {
private[akka] def deploy(deployment: Deploy) {
if (deployments.putIfAbsent(deployment.address, deployment) != deployment) {
// FIXME do automatic 'undeploy' and redeploy (perhaps have it configurable if redeploy should be done or exception thrown)
// throwDeploymentBoundException(deployment)
//Deployer.throwDeploymentBoundException(deployment) // FIXME uncomment this and fix the issue with multiple deployments
}
}

View file

@ -44,7 +44,7 @@ class SupervisorException private[akka] (message: String, cause: Throwable = nul
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Supervisor {
def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start
def apply(config: SupervisorConfig): Supervisor = SupervisorFactory(config).newInstance.start()
}
/**
@ -108,7 +108,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
def uuid = supervisor.uuid
def start: Supervisor = {
def start(): Supervisor = {
this
}

View file

@ -166,7 +166,7 @@ object ReflectiveAccess {
classloader: ClassLoader = loader): Either[Exception, T] = try {
assert(params ne null)
assert(args ne null)
getClassFor(fqn) match {
getClassFor(fqn, classloader) match {
case Right(value)
val ctor = value.getDeclaredConstructor(params: _*)
ctor.setAccessible(true)
@ -180,7 +180,7 @@ object ReflectiveAccess {
//Obtains a reference to fqn.MODULE$
def getObjectFor[T](fqn: String, classloader: ClassLoader = loader): Either[Exception, T] = try {
getClassFor(fqn) match {
getClassFor(fqn, classloader) match {
case Right(value)
val instance = value.getDeclaredField("MODULE$")
instance.setAccessible(true)

View file

@ -33,7 +33,8 @@ import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
import akka.routing.RouterType
import akka.config.Config
import akka.config.{ Config, Supervision }
import Supervision._
import Config._
import akka.serialization.{ Format, Serializers, Serializer, Compression }
import Compression.LZF
@ -225,7 +226,7 @@ object Cluster {
/**
* Creates a new AkkaZkClient.
*/
def newZkClient: AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
def newZkClient(): AkkaZkClient = new AkkaZkClient(zooKeeperServers, sessionTimeout, connectionTimeout, defaultSerializer)
def createQueue(rootPath: String, blocking: Boolean = true) = new ZooKeeperQueue(node.zkClient, rootPath, blocking)
@ -282,8 +283,18 @@ class DefaultClusterNode private[akka] (
case RemoteClientDisconnected(client, address) client.shutdownClientModule()
case _ //ignore other
}
}, "akka.cluster.remoteClientLifeCycleListener").start()
}, "akka.cluster.RemoteClientLifeCycleListener").start()
lazy val remoteDaemon = actorOf(new RemoteClusterDaemon(this), RemoteClusterDaemon.ADDRESS).start()
lazy val remoteDaemonSupervisor = Supervisor(
SupervisorConfig(
OneForOneStrategy(List(classOf[Exception]), Int.MaxValue, Int.MaxValue), // is infinite restart what we want?
Supervise(
remoteDaemon,
Permanent)
:: Nil))
lazy val remoteService: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport
remote.start(nodeAddress.hostname, nodeAddress.port)
@ -291,6 +302,7 @@ class DefaultClusterNode private[akka] (
remote.addListener(remoteClientLifeCycleListener)
remote
}
lazy val remoteServerAddress: InetSocketAddress = remoteService.address
// static nodes
@ -688,12 +700,12 @@ class DefaultClusterNode private[akka] (
// FIXME switch to ReplicatedActorRef here
// val actor = new ReplicatedActorRef(fromBinary[T](bytes, remoteServerAddress)(format))
val actor = fromBinary[T](bytes, remoteServerAddress)(format)
remoteService.register(UUID_PREFIX + uuid, actor) // clustered refs are always registered and looked up by UUID
// remoteService.register(UUID_PREFIX + uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
actor.start()
actor.asInstanceOf[LocalActorRef]
case Right(exception) throw exception
}
} headOption // FIXME should not be an array at all coming here
} headOption // FIXME should not be an array at all coming here but an Option[ActorRef]
} else None
/**
@ -1086,9 +1098,9 @@ class DefaultClusterNode private[akka] (
.format(nodeAddress.clusterName, nodeAddress.nodeName, nodeAddress.port, zkServerAddresses, serializer))
EventHandler.info(this, "Starting up remote server [%s]".format(remoteServerAddress.toString))
createRootClusterNode()
val isLeader = joinLeaderElection
val isLeader = joinLeaderElection()
if (isLeader) createNodeStructureIfNeeded()
registerListeners
registerListeners()
joinMembershipNode()
joinActorsAtAddressNode()
fetchMembershipChildrenNodes()
@ -1125,7 +1137,8 @@ class DefaultClusterNode private[akka] (
if (numberOfReplicas < replicationFactor) {
throw new IllegalArgumentException(
"Replication factor [" + replicationFactor + "] is greater than the number of available nodes [" + numberOfReplicas + "]")
"Replication factor [" + replicationFactor +
"] is greater than the number of available nodes [" + numberOfReplicas + "]")
} else if (numberOfReplicas == replicationFactor) {
replicas = replicas ++ replicaConnectionsAsArray
} else {
@ -1164,7 +1177,7 @@ class DefaultClusterNode private[akka] (
} catch {
case e: ZkNodeExistsException
val error = new ClusterException("Can't join the cluster. The node name [" + nodeAddress.nodeName + "] is already in by another node")
EventHandler.error(error, this, "")
EventHandler.error(error, this, error.toString)
throw error
}
}
@ -1173,7 +1186,7 @@ class DefaultClusterNode private[akka] (
ignore[ZkNodeExistsException](zkClient.createPersistent(actorsAtNodePathFor(nodeAddress.nodeName)))
}
private[cluster] def joinLeaderElection: Boolean = {
private[cluster] def joinLeaderElection(): Boolean = {
EventHandler.info(this, "Node [%s] is joining leader election".format(nodeAddress.nodeName))
leaderLock.lock
}
@ -1217,7 +1230,7 @@ class DefaultClusterNode private[akka] (
homeAddress.setAccessible(true)
homeAddress.set(actor, Some(remoteServerAddress))
remoteService.register(uuid, actor)
remoteService.register(uuid, actor) // FIXME is Actor.remote.register(UUID, ..) correct here?
}
}
@ -1302,7 +1315,7 @@ class DefaultClusterNode private[akka] (
}
}
private def registerListeners = {
private def registerListeners() = {
zkClient.subscribeStateChanges(stateListener)
zkClient.subscribeChildChanges(MEMBERSHIP_NODE, membershipListener)
}
@ -1456,8 +1469,8 @@ trait ErrorHandler {
object RemoteClusterDaemon {
val ADDRESS = "akka-cluster-daemon".intern
// FIXME configure functionServerDispatcher to what?
val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build
// FIXME configure computeGridDispatcher to what?
val computeGridDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:compute-grid").build
}
// FIXME supervise RemoteClusterDaemon
@ -1472,6 +1485,10 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
self.dispatcher = Dispatchers.newPinnedDispatcher(self)
override def preRestart(reason: Throwable) {
EventHandler.debug(this, "RemoteClusterDaemon failed due to [%s] restarting...".format(reason))
}
def receive: Receive = {
case message: RemoteDaemonMessageProtocol
EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message))
@ -1528,7 +1545,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case FUNCTION_FUN0_UNIT
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Unit] try {
@ -1541,7 +1558,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case FUNCTION_FUN0_ANY
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
self.dispatcher = computeGridDispatcher
def receive = {
case f: Function0[Any] try {
@ -1554,7 +1571,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case FUNCTION_FUN1_ARG_UNIT
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) try {
@ -1567,7 +1584,7 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor {
case FUNCTION_FUN1_ARG_ANY
actorOf(new Actor() {
self.dispatcher = functionServerDispatcher
self.dispatcher = computeGridDispatcher
def receive = {
case (fun: Function[Any, Unit], param: Any) try {

View file

@ -1,9 +1,11 @@
####################
# Akka Config File #
####################
##############################
# Akka Reference Config File #
##############################
# This file has all the default settings, so all these could be removed with no visible effect.
# This the reference config file has all the default settings.
# All these could be removed with no visible effect.
# Modify as needed.
# This file is imported in the 'akka.conf' file. Make your edits/overrides there.
akka {
version = "2.0-SNAPSHOT" # Akka version, checked against the runtime version of Akka.
@ -132,19 +134,58 @@ akka {
session-timeout = 60
connection-timeout = 60
use-compression = off
remote-daemon-ack-timeout = 30
exclude-ref-node-in-replica-set = on # should a replica be instantiated on the same node as the
remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc.
exclude-ref-node-in-replica-set = on # Should a replica be instantiated on the same node as the
# cluster reference to the actor
# default: on
# Default: on
replication {
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
password = "secret"
digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)
password = "secret" # FIXME: store open in file?
ensemble-size = 3
quorum-size = 2
}
}
remote {
# secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
secure-cookie = ""
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
layer = "akka.remote.netty.NettyRemoteSupport"
server {
# FIXME remove hostname/port
hostname = "localhost" # The hostname or IP that clients should connect to
port = 2552 # The port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
connection-timeout = 1
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog
execution-pool-keepalive = 60 # Length in akka.time-unit how long core threads will be kept alive if idling
execution-pool-size = 16 # Size of the core pool of the remote execution unit
max-channel-memory-size = 0 # Maximum channel size, 0 for off
max-total-memory-size = 0 # Maximum total size of all channels, 0 for off
}
client {
buffering {
retry-message-send-on-failure = on
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
reap-futures-delay = 5
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
}
}
stm {
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
max-retries = 1000
@ -199,42 +240,4 @@ akka {
expired-header-name = "Async-Timeout" # the name of the response header to use when an async request expires
expired-header-value = "expired" # the value of the response header to use when an async request expires
}
remote {
# secure-cookie = "050E0A0D0D06010A00000900040D060F0C09060B" # generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' or using 'Crypt.generateSecureCookie'
secure-cookie = ""
compression-scheme = "zlib" # Options: "zlib" (lzf to come), leave out for no compression
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
layer = "akka.remote.netty.NettyRemoteSupport"
server {
hostname = "localhost" # The hostname or IP that clients should connect to
port = 2552 # The port clients should connect to. Default is 2552 (AKKA)
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
connection-timeout = 1
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
backlog = 4096 # Sets the size of the connection backlog
execution-pool-keepalive = 60 # Length in akka.time-unit how long core threads will be kept alive if idling
execution-pool-size = 16 # Size of the core pool of the remote execution unit
max-channel-memory-size = 0 # Maximum channel size, 0 for off
max-total-memory-size = 0 # Maximum total size of all channels, 0 for off
}
client {
buffering {
retry-message-send-on-failure = on
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
# If positive then a bounded mailbox is used and the capacity is set using the property
}
reconnect-delay = 5
read-timeout = 10
message-frame-size = 1048576
reap-futures-delay = 5
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
}
}
}