diff --git a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala index 9cd00f976a..d92b1a5a67 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/actor/DeployerSpec.scala @@ -12,12 +12,12 @@ class DeployerSpec extends WordSpec with MustMatchers { "A Deployer" must { "be able to parse 'akka.actor.deployment._' config elements" in { - val deployment = Deployer.lookupInConfig("service-pi") + val deployment = Deployer.lookupInConfig("service-ping") deployment must be('defined) deployment must equal(Some( Deploy( - "service-pi", - RoundRobin, + "service-ping", + LeastCPU, "akka.serialization.Format$Default$", Clustered( Node("test-1"), diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 7686af1479..8bf98a9bcd 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -386,7 +386,7 @@ object Actor extends ListenerManagement { case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒ ClusterModule.ensureEnabled() - if (Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") + if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") val hostname = home match { case Host(hostname) ⇒ hostname @@ -402,8 +402,6 @@ object Actor extends ListenerManagement { case NoReplicas() ⇒ 0 } - import ClusterModule.node - if (hostname == Config.hostname) { // home node for clustered actor def serializerErrorDueTo(reason: String) = @@ -429,12 +427,13 @@ object Actor extends ListenerManagement { } val f = clazz.newInstance.asInstanceOf[AnyRef] if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] - else serializerErrorDueTo("class must be of type [akka.serialization.Serializer") + else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") } } - if (!node.isClustered(address)) node.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) - node.use(address) + if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) + + cluster.use(address, serializer) } else { val routerType = router match { @@ -451,7 +450,7 @@ object Actor extends ListenerManagement { case LeastMessages ⇒ RouterType.LeastMessages case LeastMessages() ⇒ RouterType.LeastMessages } - node.ref(address, routerType) + cluster.ref(address, routerType) } /* @@ -459,7 +458,7 @@ object Actor extends ListenerManagement { - How to define a single ClusterNode to use? Where should it be booted up? How should it be configured? - ClusterNode API and Actor.remote API should be made private[akka] - Rewrite ClusterSpec or remove it - - Actor.stop on home node (actor checked out with node.use(..)) should do node.remove(..) of actor + - Actor.stop on home node (actor checked out with cluster.use(..)) should do cluster.remove(..) of actor - Should we allow configuring of session-scoped remote actors? How? diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 62f34082da..79a768f85e 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -615,8 +615,7 @@ class LocalActorRef private[akka] (private[this] val actorFactory: () ⇒ Actor, currentMessage = null Actor.registry.unregister(this) - if (ClusterModule.isEnabled && isRemotingEnabled) - Actor.remote.unregister(this) + if (ClusterModule.isEnabled) Actor.remote.unregister(this) setActorSelfFields(actorInstance.get, null) } @@ -992,7 +991,8 @@ private[akka] case class RemoteActorRef private[akka] ( loader: Option[ClassLoader]) extends ActorRef with ScalaActorRef { - ensureRemotingEnabled() + ClusterModule.ensureEnabled() + timeout = _timeout // FIXME BAD, we should not have different ActorRefs diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index b808d19c59..d9e937487a 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -11,14 +11,14 @@ import java.util.concurrent.ConcurrentHashMap import akka.event.EventHandler import akka.actor.DeploymentConfig._ import akka.config.{ ConfigurationException, Config } -import akka.util.ReflectiveAccess +import akka.util.ReflectiveAccess._ import akka.serialization.Format import akka.AkkaException /** - * Programmatic deployment configuration classes. Most values have defaults and can be left out. - * - * todo: what does the concept Deploy + * Module holding the programmatic deployment configuration classes. + * Defines the deployment specification. + * Most values have defaults and can be left out. * * @author Jonas Bonér */ @@ -117,9 +117,9 @@ object Deployer { val defaultAddress = Host(Config.hostname) - lazy val instance: ReflectiveAccess.ClusterModule.ClusterDeployer = { + lazy val instance: ClusterModule.ClusterDeployer = { val deployer = - if (ReflectiveAccess.ClusterModule.isEnabled) ReflectiveAccess.ClusterModule.clusterDeployer + if (ClusterModule.isEnabled) ClusterModule.clusterDeployer else LocalDeployer deployer.init(deploymentsInConfig) deployer diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala index 4b1d7731fb..74200894ba 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteEventHandler.scala @@ -1,9 +1,9 @@ -package akka.remoteinterface - /** * Copyright (C) 2009-2011 Scalable Solutions AB */ +package akka.remoteinterface + import akka.actor.Actor import akka.event.EventHandler @@ -21,7 +21,7 @@ class RemoteEventHandler extends Actor { // client case RemoteClientError(cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientError - Address[%s]" format address.toString) - case RemoteClientWriteFailed(request, cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(address.toString)) + case RemoteClientWriteFailed(request, cause, client, address) ⇒ EventHandler.error(cause, client, "RemoteClientWriteFailed - Request[%s] Address[%s]".format(request, address.toString)) case RemoteClientDisconnected(client, address) ⇒ EventHandler.info(client, "RemoteClientDisconnected - Address[%s]" format address.toString) case RemoteClientConnected(client, address) ⇒ EventHandler.info(client, "RemoteClientConnected - Address[%s]" format address.toString) case RemoteClientStarted(client, address) ⇒ EventHandler.info(client, "RemoteClientStarted - Address[%s]" format address.toString) diff --git a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala index b72d4f0239..3efdb67a63 100644 --- a/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/akka/util/ReflectiveAccess.scala @@ -23,12 +23,6 @@ object ReflectiveAccess { val loader = getClass.getClassLoader - lazy val isRemotingEnabled: Boolean = RemoteModule.isEnabled - lazy val isClusterEnabled: Boolean = ClusterModule.isEnabled - - def ensureClusterEnabled() { ClusterModule.ensureEnabled() } - def ensureRemotingEnabled() { RemoteModule.ensureEnabled() } - /** * Reflective access to the Cluster module. * @@ -88,7 +82,7 @@ object ReflectiveAccess { def remove(address: String) - def use(address: String): Array[ActorRef] + def use(address: String, format: Serializer): Array[ActorRef] def ref(address: String, router: RouterType): ActorRef def isClustered(address: String): Boolean diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 507af446bf..1eb0f624b3 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -741,8 +741,13 @@ class ClusterNode private[akka] ( * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available * for remote access through lookup by its UUID. */ - def use[T <: Actor](actorAddress: String)( - implicit format: Serializer = formatForActor(actorAddress)): Array[LocalActorRef] = if (isConnected.isOn) { + def use[T <: Actor](actorAddress: String): Array[LocalActorRef] = use(actorAddress, formatForActor(actorAddress)) + + /** + * Checks out an actor for use on this node, e.g. checked out as a 'LocalActorRef' but it makes it available + * for remote access through lookup by its UUID. + */ + def use[T <: Actor](actorAddress: String, format: Serializer): Array[LocalActorRef] = if (isConnected.isOn) { import akka.serialization.ActorSerialization._ @@ -1572,7 +1577,7 @@ trait ErrorHandler { * @author Jonas Bonér */ object RemoteClusterDaemon { - val ADDRESS = "akka-cluster-daemon" + val ADDRESS = "akka-cluster-daemon".intern // FIXME configure functionServerDispatcher to what? val functionServerDispatcher = Dispatchers.newExecutorBasedEventDrivenDispatcher("akka:cloud:cluster:function:server").build diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index 037e25627c..bc5c189a1c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -7,9 +7,10 @@ import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import org.I0Itec.zkclient._ import akka.actor._ +import Actor._ object ClusterDeployerSpec { - class Pi extends Actor { + class HelloWorld extends Actor with Serializable { def receive = { case "Hello" ⇒ self.reply("World") } @@ -24,8 +25,10 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter var zkServer: ZkServer = _ + // FIXME create multi-jvm test for ClusterDeployer to make sure that only one node can make the deployment and that all other nicely waits until he is done + "A ClusterDeployer" should { - "be able to deploy deployments in configuration file" in { + "be able to deploy deployments in akka.conf into ZooKeeper and then lookup the deployments by 'address'" in { val deployments = Deployer.deploymentsInConfig deployments must not equal (Nil) ClusterDeployer.init(deployments) @@ -37,12 +40,11 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter } } - /* - "be able to create an actor deployed using ClusterDeployer" in { - val pi = Actor.actorOf[Pi]("service-pi") - pi must not equal(null) + "be able to create an actor deployed using ClusterDeployer, add it to ZooKeeper and then check the actor out for use" in { + val pi = Actor.actorOf[HelloWorld]("service-hello") + pi must not equal (null) + pi.address must equal("service-hello") } -*/ } override def beforeAll() { diff --git a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala index 1faf71bffc..8e523f2c7f 100644 --- a/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala +++ b/akka-remote/src/main/scala/akka/remote/BootableRemoteActorService.scala @@ -8,9 +8,9 @@ import akka.actor.{ Actor, BootableActorLoaderService } import akka.util.{ ReflectiveAccess, Bootable } /** - * This bundle/service is responsible for booting up and shutting down the remote actors facility + * This bundle/service is responsible for booting up and shutting down the remote actors facility. *

- * It is used in Kernel + * It is used in Kernel. */ trait BootableRemoteActorService extends Bootable { self: BootableActorLoaderService ⇒ @@ -22,7 +22,7 @@ trait BootableRemoteActorService extends Bootable { def startRemoteService() { remoteServerThread.start() } abstract override def onLoad() { - if (ReflectiveAccess.isRemotingEnabled && RemoteServerSettings.isRemotingEnabled) { + if (ReflectiveAccess.ClusterModule.isEnabled && RemoteServerSettings.isRemotingEnabled) { startRemoteService() } super.onLoad() diff --git a/config/akka-reference.conf b/config/akka-reference.conf index e9741d78d0..efb7492e66 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -40,8 +40,8 @@ akka { # -- all configuration options -- # ------------------------------- - service-pi { # stateless actor with replication factor 3 and round-robin load-balancer - router = "round-robin" # routing (load-balance) scheme to use + service-ping { # stateless actor with replication factor 3 and round-robin load-balancer + router = "least-cpu" # routing (load-balance) scheme to use # available: "direct", "round-robin", "random", "least-cpu", "least-ram", "least-messages" # or: fully qualified class name of the router class # default is "direct"; @@ -62,34 +62,14 @@ akka { } } - # ---------------------------------- - # -- variations of using defaults -- - # ---------------------------------- + service-pong {} # local actor - service-ping-1 {} # local actor - - service-pong-2 { # stateful actor with replication factor 1 and default home address - clustered {} - } - - service-pong-2 { # stateless actor with replication factor 1 and routing 'direct' and default home address + service-hello { + router = "round-robin" clustered { - stateless = on - } - } - - service-pong-4 { # stateless autoscaled actor - router = "least-cpu" - clustered { - home = "ip:0.0.0.0" - replicas = "auto" - stateless = on - } - } - session-registry { # stateful, replicated actor with replication factor 3 - clustered { - home = "host:darkstar.lan" + home = "host:localhost" replicas = 3 + stateless = on } } }