From e6fa55b3a8f1c9de0b51b322972066c6b4714131 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Wed, 25 May 2011 16:18:35 +0200 Subject: [PATCH] - Changed implementation of Actor.actorOf to work in the the new world of cluster.ref, cluster.use and cluster.store. - Changed semantics of replica config. Default replicas is now 0. Replica 1 means one copy of the actor is instantiated on another node. - Actor.remote.actorFor/Actor.remote.register is now separated and orthogonal from cluster implementation. - cluster.ref now creates and instantiates its replicas automatically, e.g. it can be created first and will then set up what it needs. - Added logging everywhere, better warning messages etc. - Each node now fetches the whole deployment configuration from the cluster on boot. - Added some config options to cluster MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jonas Bonér --- .../src/main/scala/akka/actor/Actor.scala | 126 +++++++++--------- .../src/main/scala/akka/actor/Deployer.scala | 10 +- .../remoteinterface/RemoteInterface.scala | 15 +++ .../src/main/scala/akka/cluster/Cluster.scala | 115 +++++++++------- .../scala/akka/cluster/ClusterActorRef.scala | 4 +- .../scala/akka/cluster/ClusterDeployer.scala | 124 +++++++++++------ .../src/main/scala/akka/cluster/Routing.scala | 32 ++++- .../akka/cluster/ClusterDeployerSpec.scala | 10 ++ .../RoundRobin1ReplicaMultiJvmNode1.conf} | 2 +- .../RoundRobin1ReplicaMultiJvmNode1.opts} | 0 .../RoundRobin1ReplicaMultiJvmNode2.conf} | 2 +- .../RoundRobin1ReplicaMultiJvmNode2.opts} | 0 .../RoundRobin1ReplicaMultiJvmSpec.scala} | 22 +-- .../remote/netty/NettyRemoteSupport.scala | 75 ++++++----- config/akka-reference.conf | 8 +- 15 files changed, 328 insertions(+), 217 deletions(-) rename akka-cluster/src/test/scala/akka/cluster/{store_actor/StoreActorMultiJvmNode1.conf => routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf} (78%) rename akka-cluster/src/test/scala/akka/cluster/{store_actor/StoreActorMultiJvmNode1.opts => routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.opts} (100%) rename akka-cluster/src/test/scala/akka/cluster/{store_actor/StoreActorMultiJvmNode2.conf => routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf} (78%) rename akka-cluster/src/test/scala/akka/cluster/{store_actor/StoreActorMultiJvmNode2.opts => routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.opts} (100%) rename akka-cluster/src/test/scala/akka/cluster/{store_actor/StoreActorMultiJvmSpec.scala => routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala} (76%) diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index a43b29158d..e2048f22cb 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -93,6 +93,15 @@ case class UnhandledMessageException(msg: Any, ref: ActorRef) extends Exception override def fillInStackTrace() = this //Don't waste cycles generating stack trace } +/** + * Classes for passing status back to the sender. + */ +object Status { + sealed trait Status extends Serializable + case object Success extends Status + case class Failure(cause: Throwable) extends Status +} + /** * Actor factory module with factory methods for creating various kinds of Actors. * @@ -221,18 +230,7 @@ object Actor extends ListenerManagement { * */ def actorOf[T <: Actor](clazz: Class[T], address: String): ActorRef = { - Address.validate(address) - val actorRefFactory = () ⇒ newLocalActorRef(clazz, address) - try { - Deployer.deploymentFor(address) match { - case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors - case deploy ⇒ newClusterActorRef[T](actorRefFactory, address, deploy) - } - } catch { - case e: DeploymentException ⇒ - EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address) - actorRefFactory() // if deployment fails, fall back to local actors - } + createActor(address, () ⇒ newLocalActorRef(clazz, address)) } /** @@ -274,18 +272,7 @@ object Actor extends ListenerManagement { * */ def actorOf[T <: Actor](creator: ⇒ T, address: String): ActorRef = { - Address.validate(address) - val actorRefFactory = () ⇒ new LocalActorRef(() ⇒ creator, address) - try { - Deployer.deploymentFor(address) match { - case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors - case deploy ⇒ newClusterActorRef[T](actorRefFactory, address, deploy) - } - } catch { - case e: DeploymentException ⇒ - EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address) - actorRefFactory() // if deployment fails, fall back to local actors - } + createActor(address, () ⇒ new LocalActorRef(() ⇒ creator, address)) } /** @@ -308,18 +295,7 @@ object Actor extends ListenerManagement { * JAVA API */ def actorOf[T <: Actor](creator: Creator[T], address: String): ActorRef = { - Address.validate(address) - val actorRefFactory = () ⇒ new LocalActorRef(() ⇒ creator.create, address) - try { - Deployer.deploymentFor(address) match { - case Deploy(_, router, _, Local) ⇒ actorRefFactory() // FIXME handle 'router' in 'Local' actors - case deploy ⇒ newClusterActorRef[T](actorRefFactory, address, deploy) - } - } catch { - case e: DeploymentException ⇒ - EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address) - actorRefFactory() // if deployment fails, fall back to local actors - } + createActor(address, () ⇒ new LocalActorRef(() ⇒ creator.create, address)) } /** @@ -366,6 +342,24 @@ object Actor extends ListenerManagement { anyFuture.resultOrException }) + private[akka] def createActor(address: String, actorFactory: () ⇒ ActorRef): ActorRef = { + Address.validate(address) + registry.actorFor(address) match { // check if the actor for the address is already in the registry + case Some(actorRef) ⇒ actorRef // it is -> return it + case None ⇒ // it is not -> create it + try { + Deployer.deploymentFor(address) match { + case Deploy(_, router, _, Local) ⇒ actorFactory() // create a local actor + case deploy ⇒ newClusterActorRef(actorFactory, address, deploy) + } + } catch { + case e: DeploymentException ⇒ + EventHandler.error(e, this, "Look up deployment for address [%s] falling back to local actor." format address) + actorFactory() // if deployment fails, fall back to local actors + } + } + } + private[akka] def newLocalActorRef(clazz: Class[_ <: Actor], address: String): ActorRef = { new LocalActorRef(() ⇒ { import ReflectiveAccess.{ createInstance, noParams, noArgs } @@ -386,7 +380,7 @@ object Actor extends ListenerManagement { }, address) } - private def newClusterActorRef[T <: Actor](factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = { + private def newClusterActorRef(factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = { deploy match { case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒ @@ -396,43 +390,45 @@ object Actor extends ListenerManagement { val isHomeNode = DeploymentConfig.isHomeNode(home) val replicas = DeploymentConfig.replicaValueFor(replication) - if (isHomeNode) { // home node for clustered actor + def serializerErrorDueTo(reason: String) = + throw new akka.config.ConfigurationException( + "Could not create Serializer object [" + serializerClassName + + "] for serialization of actor [" + address + + "] since " + reason) - def serializerErrorDueTo(reason: String) = - throw new akka.config.ConfigurationException( - "Could not create Serializer object [" + serializerClassName + - "] for serialization of actor [" + address + - "] since " + reason) - - val serializer: Serializer = { - if ((serializerClassName eq null) || - (serializerClassName == "") || - (serializerClassName == Format.defaultSerializerName)) { - Format.Default - } else { - val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { - case Right(clazz) ⇒ clazz - case Left(exception) ⇒ - val cause = exception match { - case i: InvocationTargetException ⇒ i.getTargetException - case _ ⇒ exception - } - serializerErrorDueTo(cause.toString) - } - val f = clazz.newInstance.asInstanceOf[AnyRef] - if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] - else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") + val serializer: Serializer = { + if ((serializerClassName eq null) || + (serializerClassName == "") || + (serializerClassName == Format.defaultSerializerName)) { + Format.Default + } else { + val clazz: Class[_] = ReflectiveAccess.getClassFor(serializerClassName) match { + case Right(clazz) ⇒ clazz + case Left(exception) ⇒ + val cause = exception match { + case i: InvocationTargetException ⇒ i.getTargetException + case _ ⇒ exception + } + serializerErrorDueTo(cause.toString) } + val f = clazz.newInstance.asInstanceOf[AnyRef] + if (f.isInstanceOf[Serializer]) f.asInstanceOf[Serializer] + else serializerErrorDueTo("class must be of type [akka.serialization.Serializer]") } + } - if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) - + if (isHomeNode) { // home node for clustered actor // home node, check out as LocalActorRef cluster .use(address, serializer) - .getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) + .getOrElse(throw new ConfigurationException( + "Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) } else { + if (!cluster.isClustered(address)) { + cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) + } + // Thread.sleep(5000) // remote node (not home node), check out as ClusterActorRef cluster.ref(address, DeploymentConfig.routerTypeFor(router)) } diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index a0090c76de..706cf29b6f 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -159,6 +159,10 @@ object Deployer { deployer } + def start() { + instance.toString + } + def shutdown() { instance.shutdown() } @@ -325,9 +329,9 @@ object Deployer { // -------------------------------- // akka.actor.deployment.
.clustered.replicas // -------------------------------- - val replicas = clusteredConfig.getAny("replicas", "1") match { + val replicas = clusteredConfig.getAny("replicas", "0") match { case "auto" ⇒ AutoReplicate - case "1" ⇒ NoReplicas + case "0" ⇒ NoReplicas case nrOfReplicas: String ⇒ try { Replicate(nrOfReplicas.toInt) @@ -335,7 +339,7 @@ object Deployer { case e: NumberFormatException ⇒ throw new ConfigurationException( "Config option [" + addressPath + - ".clustered.replicas] needs to be either [\"auto\"] or [1-N] - was [" + + ".clustered.replicas] needs to be either [\"auto\"] or [0-N] - was [" + nrOfReplicas + "]") } } diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 2c95946f12..addef862de 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -29,6 +29,20 @@ trait RemoteModule { private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map? private[akka] def actorsFactories: ConcurrentHashMap[String, () ⇒ ActorRef] // FIXME what to do wit actorsFactories map? + private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address) + + private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid) + + private[akka] def findActorFactory(address: String): () ⇒ ActorRef = actorsFactories.get(address) + + private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = { + var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) + else findActorByAddress(address) + if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) + actorRefOrNull + } + + /* private[akka] def findActorByAddress(address: String): ActorRef = { val cachedActorRef = actors.get(address) if (cachedActorRef ne null) cachedActorRef @@ -71,6 +85,7 @@ trait RemoteModule { if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) actorRefOrNull } + */ } /** diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 0588e1c07b..5edf04b9fa 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -28,6 +28,7 @@ import akka.util._ import Helpers._ import akka.actor._ import Actor._ +import Status._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future } import akka.remoteinterface._ @@ -124,6 +125,8 @@ object Cluster { val maxTimeToWaitUntilConnected = Duration(config.getInt("akka.cluster.max-time-to-wait-until-connected", 30), TIME_UNIT).toMillis.toInt val shouldCompressData = config.getBool("akka.cluster.use-compression", false) val enableJMX = config.getBool("akka.enable-jmx", true) + val remoteDaemonAckTimeout = Duration(config.getInt("akka.cluster.remote-daemon-ack-timeout", 30), TIME_UNIT).toMillis.toInt + val excludeRefNodeInReplicaSet = config.getBool("akka.cluster.exclude-ref-node-in-replica-set", true) @volatile private var properties = Map.empty[String, String] @@ -518,7 +521,7 @@ class DefaultClusterNode private[akka] ( val uuid = actorRef.uuid EventHandler.debug(this, - "Clustering actor [%s] with UUID [%s]".format(actorRef.address, uuid)) + "Storing actor [%s] with UUID [%s] in cluster".format(actorRef.address, uuid)) val actorBytes = if (shouldCompressData) LZF.compress(toBinary(actorRef, serializeMailbox)(format)) else toBinary(actorRef)(format) @@ -565,12 +568,30 @@ class DefaultClusterNode private[akka] ( ignore[ZkNodeExistsException](zkClient.createPersistent("%s/%s".format(actorAddressToUuidsPathFor(actorRef.address), uuid))) } + import RemoteClusterDaemon._ val command = RemoteDaemonMessageProtocol.newBuilder .setMessageType(USE) .setActorUuid(uuidToUuidProtocol(uuid)) .build + replicaConnectionsForReplicationFactor(replicationFactor) foreach { connection ⇒ - connection ! command + (connection !! (command, remoteDaemonAckTimeout)) match { + + case Some(Success) ⇒ + EventHandler.debug(this, + "Replica for [%s] successfully created on [%s]" + .format(actorRef.address, connection)) + + case Some(Failure(cause)) ⇒ + EventHandler.error(cause, this, cause.toString) + throw cause + + case None ⇒ + val error = new ClusterException( + "Operation to instantiate replicas throughout the cluster timed out, cause of error unknow") + EventHandler.error(error, this, error.toString) + throw error + } } this @@ -596,7 +617,7 @@ class DefaultClusterNode private[akka] ( def remove(address: String): ClusterNode = { isConnected ifOn { EventHandler.debug(this, - "Removing actor(s) with ADDRESS [%s] from cluster".format(address)) + "Removing actor(s) with address [%s] from cluster".format(address)) uuidsForActorAddress(address) foreach (uuid ⇒ remove(uuid)) } this @@ -641,7 +662,8 @@ class DefaultClusterNode private[akka] ( actorUuidsForActorAddress(actorAddress) map { uuid ⇒ EventHandler.debug(this, - "Checking out actor with UUID [%s] to be used on node [%s]".format(uuid, nodeAddress.nodeName)) + "Checking out actor with UUID [%s] to be used on node [%s] as local actor" + .format(uuid, nodeAddress.nodeName)) ignore[ZkNodeExistsException](zkClient.createPersistent(actorAtNodePathFor(nodeAddress.nodeName, uuid), true)) ignore[ZkNodeExistsException](zkClient.createEphemeral(actorLocationsPathFor(uuid, nodeAddress))) @@ -750,34 +772,14 @@ class DefaultClusterNode private[akka] ( * Creates an ActorRef with a Router to a set of clustered actors. */ def ref(actorAddress: String, router: RouterType): ActorRef = if (isConnected.isOn) { - val addresses = addressesForActor(actorAddress) - EventHandler.debug(this, - "Creating cluster actor ref with router [%s] for actors [%s]".format(router, addresses.mkString(", "))) + "Checking out cluster actor ref with address [%s] and router [%s] connected to [\n\t%s]" + .format(actorAddress, router, addresses.mkString("\n\t"))) - def registerClusterActorRefForAddress(actorRef: ClusterActorRef, addresses: Array[(UUID, InetSocketAddress)]) { - addresses foreach { - case (_, address) ⇒ clusterActorRefs.put(address, actorRef) - } - } - - // FIXME remove? - def refByUuid(uuid: UUID): ActorRef = { - val actor = Router newRouter (router, addresses, uuidToString(uuid), Actor.TIMEOUT) - registerClusterActorRefForAddress(actor, addresses) - actor - } - - def refByAddress(actorAddress: String): ActorRef = { - //FIXME: unused uuids - val uuids = uuidsForActorAddress(actorAddress) - val actor = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT) - registerClusterActorRefForAddress(actor, addresses) - actor - } - - refByAddress(actorAddress).start() + val actorRef = Router newRouter (router, addresses, actorAddress, Actor.TIMEOUT) + addresses foreach { case (_, address) ⇒ clusterActorRefs.put(address, actorRef) } + actorRef.start() } else throw new ClusterException("Not connected to cluster") @@ -1103,7 +1105,8 @@ class DefaultClusterNode private[akka] ( new InetSocketAddress(hostname, port) } - private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = uuidsForActorAddress(actorAddress) filter (_ ne null) + private def actorUuidsForActorAddress(actorAddress: String): Array[UUID] = + uuidsForActorAddress(actorAddress) filter (_ ne null) /** * Returns a random set with replica connections of size 'replicationFactor'. @@ -1113,7 +1116,7 @@ class DefaultClusterNode private[akka] ( var replicas = HashSet.empty[ActorRef] if (replicationFactor < 1) return replicas - connectToAllReplicas() + connectToAllMembershipNodesInCluster() val numberOfReplicas = replicaConnections.size val replicaConnectionsAsArray = replicaConnections.toList map { @@ -1138,10 +1141,14 @@ class DefaultClusterNode private[akka] ( /** * Connect to all available replicas unless already connected). */ - private def connectToAllReplicas() { + private def connectToAllMembershipNodesInCluster() { + val runOnThisNode = false // (node: String) ⇒ !excludeRefNodeInReplicaSet && node != Config.nodename membershipNodes foreach { node ⇒ - if (!replicaConnections.contains(node)) { + // if (runOnThisNode(node) && !replicaConnections.contains(node)) { // only connect to each replica once + if (!replicaConnections.contains(node)) { // only connect to each replica once val address = addressForNode(node) + EventHandler.debug(this, + "Connecting to replica with nodename [%s] and address [%s]".format(node, address)) val clusterDaemon = Actor.remote.actorFor(RemoteClusterDaemon.ADDRESS, address.getHostName, address.getPort) replicaConnections.put(node, (address, clusterDaemon)) } @@ -1453,6 +1460,8 @@ object RemoteClusterDaemon { val functionServerDispatcher = Dispatchers.newDispatcher("akka:cloud:cluster:function:server").build } +// FIXME supervise RemoteClusterDaemon + /** * @author Jonas Bonér */ @@ -1466,28 +1475,42 @@ class RemoteClusterDaemon(cluster: ClusterNode) extends Actor { def receive: Receive = { case message: RemoteDaemonMessageProtocol ⇒ EventHandler.debug(this, "Received command to RemoteClusterDaemon [%s]".format(message)) + message.getMessageType match { case USE ⇒ - if (message.hasActorUuid) { - val uuid = uuidProtocolToUuid(message.getActorUuid) - val address = cluster.actorAddressForUuid(uuid) - implicit val format: Serializer = cluster formatForActor address - val actors = cluster use address - } else if (message.hasActorAddress) { - val address = message.getActorAddress - implicit val format: Serializer = cluster formatForActor address - val actors = cluster use address - } else EventHandler.warning(this, - "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]".format(message)) + try { + if (message.hasActorUuid) { + val uuid = uuidProtocolToUuid(message.getActorUuid) + val address = cluster.actorAddressForUuid(uuid) + implicit val format: Serializer = cluster formatForActor address + val actors = cluster use address + } else if (message.hasActorAddress) { + val address = message.getActorAddress + implicit val format: Serializer = cluster formatForActor address + val actors = cluster use address + } else { + EventHandler.warning(this, + "None of 'uuid', or 'address' is specified, ignoring remote cluster daemon command [%s]" + .format(message)) + } + self.reply(Success) + } catch { + case error ⇒ + self.reply(Failure(error)) + throw error + } case RELEASE ⇒ if (message.hasActorUuid) { cluster release cluster.actorAddressForUuid(uuidProtocolToUuid(message.getActorUuid)) } else if (message.hasActorAddress) { cluster release message.getActorAddress - } else EventHandler.warning(this, - "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]".format(message)) + } else { + EventHandler.warning(this, + "None of 'uuid' or 'actorAddress'' is specified, ignoring remote cluster daemon command [%s]" + .format(message)) + } case START ⇒ cluster.start() diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index eba563946e..f107904892 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -26,7 +26,9 @@ class ClusterActorRef private[akka] ( extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef this: ClusterActorRef with Router.Router ⇒ - EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(actorAddress)) + EventHandler.debug(this, + "Creating a ClusterActorRef for actor with address [%s] with connections [\n\t%s]" + .format(actorAddress, inetSocketAddresses.mkString("\n\t"))) private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]]( (Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) { diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala index ceadbd8585..0378245a14 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDeployer.scala @@ -70,33 +70,6 @@ object ClusterDeployer { private val systemDeployments: List[Deploy] = Nil - private[akka] def init(deployments: List[Deploy]) { - isConnected switchOn { - baseNodes foreach { path ⇒ - try { - ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) - EventHandler.debug(this, "Created node [%s]".format(path)) - } catch { - case e ⇒ - val error = new DeploymentException(e.toString) - EventHandler.error(error, this) - throw error - } - } - - val allDeployments = deployments ::: systemDeployments - EventHandler.info(this, "Initializing cluster deployer") - if (deploymentLock.lock()) { - // try to be the one doing the clustered deployment - EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]") - allDeployments foreach (deploy(_)) // deploy - deploymentLock.unlock() // signal deployment complete - } else { - deploymentCompleted.await() // wait until deployment is completed - } - } - } - def shutdown() { isConnected switchOff { // undeploy all @@ -117,19 +90,95 @@ object ClusterDeployer { } } + def lookupDeploymentFor(address: String): Option[Deploy] = ensureRunning { + LocalDeployer.lookupDeploymentFor(address) match { // try local cache + case Some(deployment) ⇒ // in local cache + deployment + case None ⇒ // not in cache, check cluster + val deployment = + try { + Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy]) + } catch { + case e: ZkNoNodeException ⇒ None + case e: Exception ⇒ + EventHandler.warning(this, e.toString) + None + } + deployment foreach (LocalDeployer.deploy(_)) // cache it in local cache + deployment + } + } + + def fetchDeploymentsFromCluster: List[Deploy] = ensureRunning { + val addresses = + try { + zkClient.getChildren(deploymentPath).toList + } catch { + case e: ZkNoNodeException ⇒ List[String]() + } + val deployments = addresses map { address ⇒ + zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy] + } + EventHandler.info(this, "Fetched clustered deployments [\n\t%s\n]" format deployments.mkString("\n\t")) + deployments + } + + private[akka] def init(deployments: List[Deploy]) { + println("===============================================================") + println("------------ INIT 1") + isConnected switchOn { + EventHandler.info(this, "Initializing cluster deployer") + + baseNodes foreach { path ⇒ + try { + ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) + EventHandler.debug(this, "Created node [%s]".format(path)) + } catch { + case e ⇒ + val error = new DeploymentException(e.toString) + EventHandler.error(error, this) + throw error + } + } + + println("------------ INIT 2") + val allDeployments = deployments ::: systemDeployments + + // FIXME need to wrap in if (!deploymentDone) { .. } + + if (deploymentLock.lock()) { + println("------------ INIT 3") + // try to be the one doing the clustered deployment + EventHandler.info(this, "Deploying to cluster [\n" + allDeployments.mkString("\n\t") + "\n]") + + println("------------ INIT 4") + allDeployments foreach (deploy(_)) // deploy + println("------------ INIT 5") + + // FIXME need to set deployment done flag + + deploymentLock.unlock() // signal deployment complete + } else { + println("------------ INIT WAITING") + deploymentCompleted.await() // wait until deployment is completed by other "master" node + } + + println("------------ INIT 6") + // fetch clustered deployments and deploy them locally + fetchDeploymentsFromCluster foreach (LocalDeployer.deploy(_)) + } + } + private[akka] def deploy(deployment: Deploy) { ensureRunning { + LocalDeployer.deploy(deployment) deployment match { - case Deploy(_, _, _, Local) ⇒ // local deployment - LocalDeployer.deploy(deployment) - + case Deploy(_, _, _, Local) ⇒ {} // local deployment, do nothing here case _ ⇒ // cluster deployment val path = deploymentAddressPath.format(deployment.address) try { ignore[ZkNodeExistsException](zkClient.create(path, null, CreateMode.PERSISTENT)) zkClient.writeData(path, deployment) - - // FIXME trigger cluster-wide deploy action } catch { case e: NullPointerException ⇒ handleError(new DeploymentException("Could not store deployment data [" + deployment + "] in ZooKeeper since client session is closed")) @@ -140,17 +189,6 @@ object ClusterDeployer { } } - private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = ensureRunning { - try { - Some(zkClient.readData(deploymentAddressPath.format(address)).asInstanceOf[Deploy]) - } catch { - case e: ZkNoNodeException ⇒ None - case e: Exception ⇒ - EventHandler.warning(this, e.toString) - None - } - } - private def ensureRunning[T](body: ⇒ T): T = { if (isConnected.isOn) body else throw new IllegalStateException("ClusterDeployer is not running") diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index 7291774d19..1bde759ca6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -8,6 +8,7 @@ import Cluster._ import akka.actor._ import Actor._ import akka.dispatch.Future +import akka.event.EventHandler import akka.routing.{ RouterType, RoutingException } import RouterType._ @@ -52,22 +53,32 @@ object Router { trait BasicRouter extends Router { def route(message: Any)(implicit sender: Option[ActorRef]): Unit = next match { case Some(actor) ⇒ actor.!(message)(sender) - case _ ⇒ throw new RoutingException("No node connections for router") + case _ ⇒ throwNoConnectionsError() } def route[T](message: Any, timeout: Long)(implicit sender: Option[ActorRef]): Future[T] = next match { case Some(actor) ⇒ actor.!!!(message, timeout)(sender) - case _ ⇒ throw new RoutingException("No node connections for router") + case _ ⇒ throwNoConnectionsError() } protected def next: Option[ActorRef] + + private def throwNoConnectionsError() = { + val error = new RoutingException("No replica connections for router") + EventHandler.error(error, this, error.toString) + throw error + } } /** * @author Jonas Bonér */ trait Direct extends BasicRouter { - lazy val next: Option[ActorRef] = connections.values.headOption + lazy val next: Option[ActorRef] = { + val connection = connections.values.headOption + if (connection.isEmpty) EventHandler.warning(this, "Router has no replica connection") + connection + } } /** @@ -77,8 +88,10 @@ object Router { private val random = new java.util.Random(System.currentTimeMillis) def next: Option[ActorRef] = - if (connections.isEmpty) None - else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next) + if (connections.isEmpty) { + EventHandler.warning(this, "Router has no replica connections") + None + } else Some(connections.valuesIterator.drop(random.nextInt(connections.size)).next) } /** @@ -100,8 +113,13 @@ object Router { case xs ⇒ xs } - if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption - else findNext + if (newItems.isEmpty) { + EventHandler.warning(this, "Router has no replica connections") + None + } else { + if (current.compareAndSet(currentItems, newItems.tail)) newItems.headOption + else findNext + } } findNext diff --git a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala index c5f9addc36..3ab9acdb8a 100644 --- a/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/ClusterDeployerSpec.scala @@ -39,6 +39,16 @@ class ClusterDeployerSpec extends WordSpec with MustMatchers with BeforeAndAfter oldDeployment must equal(newDeployment.get) } } + + "be able to fetch deployments from ZooKeeper" in { + val deployments1 = Deployer.deploymentsInConfig + deployments1 must not equal (Nil) + ClusterDeployer.init(deployments1) + + val deployments2 = ClusterDeployer.fetchDeploymentsFromCluster + deployments2.size must equal(1) + deployments2.first must equal(deployments1.first) + } } override def beforeAll() { diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf similarity index 78% rename from akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf rename to akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf index b96297f0c4..3c9999f42c 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.conf @@ -1,5 +1,5 @@ akka.event-handler-level = "DEBUG" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.home = "node:node1" -akka.actor.deployment.service-hello.clustered.replicas = 2 +akka.actor.deployment.service-hello.clustered.replicas = 1 akka.actor.deployment.service-hello.clustered.stateless = on diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.opts rename to akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode1.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf similarity index 78% rename from akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf rename to akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf index 36795796c2..59aa6fddac 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.conf @@ -1,5 +1,5 @@ akka.event-handler-level = "DEBUG" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.home = "node:node1" -akka.actor.deployment.service-hello.clustered.replicas = 2 +akka.actor.deployment.service-hello.clustered.replicas = 1 akka.actor.deployment.service-hello.clustered.stateless = on \ No newline at end of file diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.opts similarity index 100% rename from akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.opts rename to akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmNode2.opts diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala similarity index 76% rename from akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala rename to akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala index a4b3489b41..1f6d60efc3 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/routing/roundrobin_1_replica/RoundRobin1ReplicaMultiJvmSpec.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2011 Scalable Solutions AB */ -package akka.cluster.store_actor +package akka.cluster.routing.roundrobin_1_replica import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers @@ -13,20 +13,19 @@ import akka.actor._ import Actor._ import akka.config.Config -object StoreActorMultiJvmSpec { +object RoundRobin1ReplicaMultiJvmSpec { val NrOfNodes = 2 class HelloWorld extends Actor with Serializable { def receive = { case "Hello" ⇒ - println("GOT HELLO on NODE: " + Config.nodename) self.reply("World from node [" + Config.nodename + "]") } } } -class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { - import StoreActorMultiJvmSpec._ +class RoundRobin1ReplicaMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndAfterAll { + import RoundRobin1ReplicaMultiJvmSpec._ "A cluster" must { @@ -40,13 +39,6 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA Cluster.barrier("start-node2", NrOfNodes) {} - Cluster.barrier("create-clustered-actor-node1", NrOfNodes) { - val hello = Actor.actorOf[HelloWorld]("service-hello") - hello must not equal (null) - hello.address must equal("service-hello") - hello.isInstanceOf[LocalActorRef] must be(true) - } - Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {} Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {} @@ -64,8 +56,8 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA } } -class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers { - import StoreActorMultiJvmSpec._ +class RoundRobin1ReplicaMultiJvmNode2 extends WordSpec with MustMatchers { + import RoundRobin1ReplicaMultiJvmSpec._ "A cluster" must { @@ -79,8 +71,6 @@ class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers { Cluster.node.start() } - Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {} - var hello: ActorRef = null Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { hello = Actor.actorOf[HelloWorld]("service-hello") diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 5aff8408eb..5ffc4b16f3 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -209,7 +209,7 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[Promise[T]]): Option[Promise[T]] = { if (isRunning) { - EventHandler.debug(this, "Sending remote message [%s]".format(request)) + EventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, request)) if (request.getOneWay) { try { @@ -550,16 +550,28 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with def optimizeLocalScoped_?() = optimizeLocal.get - protected[akka] def actorFor(actorAddress: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { - val inetSocketAddress = this.address + protected[akka] def actorFor( + actorAddress: String, + timeout: Long, + host: String, + port: Int, + loader: Option[ClassLoader]): ActorRef = { + + val homeInetSocketAddress = this.address if (optimizeLocalScoped_?) { - if ((host == inetSocketAddress.getAddress.getHostAddress || host == inetSocketAddress.getHostName) && port == inetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals? + if ((host == homeInetSocketAddress.getAddress.getHostAddress || + host == homeInetSocketAddress.getHostName) && + port == homeInetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals? val localRef = findActorByAddressOrUuid(actorAddress, actorAddress) if (localRef ne null) return localRef //Code significantly simpler with the return statement } } - RemoteActorRef(inetSocketAddress, actorAddress, timeout, loader) + val remoteInetSocketAddress = new InetSocketAddress(host, port) + EventHandler.debug(this, + "Creating RemoteActorRef with address [%s] connected to [%s]" + .format(actorAddress, remoteInetSocketAddress)) + RemoteActorRef(remoteInetSocketAddress, actorAddress, timeout, loader) } } @@ -832,7 +844,7 @@ class RemoteServerHandler( // stop all session actors for ( map ← Option(sessionActors.remove(event.getChannel)); - actor ← collectionAsScalaIterable(map.values)gddd + actor ← collectionAsScalaIterable(map.values) ) { try { actor ! PoisonPill } catch { case e: Exception ⇒ } } @@ -923,11 +935,27 @@ class RemoteServerHandler( } } - private def findSessionActor(id: String, channel: Channel): ActorRef = - sessionActors.get(channel) match { - case null ⇒ null - case map ⇒ map get id - } + /** + * Creates a new instance of the actor with name, uuid and timeout specified as arguments. + * + * If actor already created then just return it from the registry. + * + * Does not start the actor. + */ + private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { + val uuid = actorInfo.getUuid + val address = actorInfo.getAddress + + EventHandler.debug(this, + "Creating an remotely available actor for address [%s] on node [%s]" + .format(address, Config.nodename)) + + val actorRef = Actor.createActor(address, () ⇒ createSessionActor(actorInfo, channel)) + + if (actorRef eq null) throw new IllegalActorStateException( + "Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]") + actorRef + } /** * gets the actor from the session, or creates one if there is a factory for it @@ -950,29 +978,12 @@ class RemoteServerHandler( } } - /** - * Creates a new instance of the actor with name, uuid and timeout specified as arguments. - * - * If actor already created then just return it from the registry. - * - * Does not start the actor. - */ - private def createActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = { - val uuid = actorInfo.getUuid - val address = actorInfo.getAddress - - EventHandler.debug(this, "Creating an remotely available actor for address [%s] on node [%s]".format(address, Config.nodename)) - - val actorRef = server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match { - // the actor has not been registered globally. See if we have it in the session - case null ⇒ createSessionActor(actorInfo, channel) // FIXME now session scoped actors are disabled, how to introduce them? - case actorRef ⇒ actorRef + private def findSessionActor(id: String, channel: Channel): ActorRef = + sessionActors.get(channel) match { + case null ⇒ null + case map ⇒ map get id } - if (actorRef eq null) throw new IllegalActorStateException("Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]") - actorRef - } - private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { val actorInfo = request.getActorInfo val messageBuilder = RemoteActorSerialization.createRemoteMessageProtocolBuilder( diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 40188f1dd8..f2385f19fe 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -52,9 +52,9 @@ akka { # available: "host:", "ip:" and "node:" # default is "host:localhost" replicas = 3 # number of actor replicas in the cluster - # available: integer above 0 (1-N) or the string "auto" for auto-scaling + # available: positivoe integer (0-N) or the string "auto" for auto-scaling # if "auto" is used then 'home' has no meaning - # default is '1'; + # default is '0', meaning no replicas; stateless = on # is the actor stateless or stateful # if turned 'on': actor is defined as stateless and can be load-balanced accordingly # if turned 'off' (or omitted): actor is defined as stateful which means replicatable through transaction log @@ -132,6 +132,10 @@ akka { session-timeout = 60 connection-timeout = 60 use-compression = off + remote-daemon-ack-timeout = 30 + exclude-ref-node-in-replica-set = on # should a replica be instantiated on the same node as the + # cluster reference to the actor + # default: on replication { digest-type = "MAC" # Options: CRC32 (cheap & unsafe), MAC (expensive & secure using password)