diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index 8560e951b2..a43b29158d 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -389,23 +389,12 @@ object Actor extends ListenerManagement { private def newClusterActorRef[T <: Actor](factory: () ⇒ ActorRef, address: String, deploy: Deploy): ActorRef = { deploy match { case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State)) ⇒ - ClusterModule.ensureEnabled() + ClusterModule.ensureEnabled() if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running") - val isHomeNode = home match { - case Host(hostname) ⇒ hostname == Config.hostname - case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing - case Node(nodename) ⇒ nodename == Config.nodename - } - - val replicas = replication match { - case Replicate(replicas) ⇒ replicas - case AutoReplicate ⇒ -1 - case AutoReplicate() ⇒ -1 - case NoReplicas ⇒ 0 - case NoReplicas() ⇒ 0 - } + val isHomeNode = DeploymentConfig.isHomeNode(home) + val replicas = DeploymentConfig.replicaValueFor(replication) if (isHomeNode) { // home node for clustered actor @@ -438,26 +427,14 @@ object Actor extends ListenerManagement { if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added) + // home node, check out as LocalActorRef cluster .use(address, serializer) .getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor")) } else { - val routerType = router match { - case Direct ⇒ RouterType.Direct - case Direct() ⇒ RouterType.Direct - case RoundRobin ⇒ RouterType.RoundRobin - case RoundRobin() ⇒ RouterType.RoundRobin - case Random ⇒ RouterType.Random - case Random() ⇒ RouterType.Random - case LeastCPU ⇒ RouterType.LeastCPU - case LeastCPU() ⇒ RouterType.LeastCPU - case LeastRAM ⇒ RouterType.LeastRAM - case LeastRAM() ⇒ RouterType.LeastRAM - case LeastMessages ⇒ RouterType.LeastMessages - case LeastMessages() ⇒ RouterType.LeastMessages - } - cluster.ref(address, routerType) + // remote node (not home node), check out as ClusterActorRef + cluster.ref(address, DeploymentConfig.routerTypeFor(router)) } /* diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 8b3e25fc2e..c9efbb439f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -986,6 +986,7 @@ object RemoteActorSystemMessage { * @author Jonas Bonér */ private[akka] case class RemoteActorRef private[akka] ( + val remoteAddress: InetSocketAddress, val address: String, _timeout: Long, loader: Option[ClassLoader]) @@ -996,7 +997,7 @@ private[akka] case class RemoteActorRef private[akka] ( timeout = _timeout // FIXME BAD, we should not have different ActorRefs - + /* import DeploymentConfig._ val remoteAddress = Deployer.deploymentFor(address) match { case Deploy(_, _, _, Clustered(home, _, _)) ⇒ @@ -1011,7 +1012,7 @@ private[akka] case class RemoteActorRef private[akka] ( //throw new IllegalStateException( // "Actor with Address [" + address + "] is not bound to a Clustered Deployment") } - +*/ start() def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) { diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index 5879656ca3..a0090c76de 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap import akka.event.EventHandler import akka.actor.DeploymentConfig._ import akka.config.{ ConfigurationException, Config } +import akka.routing.RouterType import akka.util.ReflectiveAccess._ import akka.serialization.Format import akka.AkkaException @@ -106,6 +107,39 @@ object DeploymentConfig { // For Scala API case object Stateless extends State case object Stateful extends State + + // -------------------------------- + // --- Helper methods for parsing + // -------------------------------- + + def isHomeNode(home: Home): Boolean = home match { + case Host(hostname) ⇒ hostname == Config.hostname + case IP(address) ⇒ address == "0.0.0.0" // FIXME checking if IP address is on home node is missing + case Node(nodename) ⇒ nodename == Config.nodename + } + + def replicaValueFor(replication: Replication): Int = replication match { + case Replicate(replicas) ⇒ replicas + case AutoReplicate ⇒ -1 + case AutoReplicate() ⇒ -1 + case NoReplicas ⇒ 0 + case NoReplicas() ⇒ 0 + } + + def routerTypeFor(routing: Routing): RouterType = routing match { + case Direct ⇒ RouterType.Direct + case Direct() ⇒ RouterType.Direct + case RoundRobin ⇒ RouterType.RoundRobin + case RoundRobin() ⇒ RouterType.RoundRobin + case Random ⇒ RouterType.Random + case Random() ⇒ RouterType.Random + case LeastCPU ⇒ RouterType.LeastCPU + case LeastCPU() ⇒ RouterType.LeastCPU + case LeastRAM ⇒ RouterType.LeastRAM + case LeastRAM() ⇒ RouterType.LeastRAM + case LeastMessages ⇒ RouterType.LeastMessages + case LeastMessages() ⇒ RouterType.LeastMessages + } } /** diff --git a/akka-actor/src/main/scala/akka/actor/Routing.scala b/akka-actor/src/main/scala/akka/actor/Routing.scala index 7da3087273..6bce48e677 100644 --- a/akka-actor/src/main/scala/akka/actor/Routing.scala +++ b/akka-actor/src/main/scala/akka/actor/Routing.scala @@ -3,22 +3,3 @@ */ package akka.actor -import akka.AkkaException - -class RoutingException(message: String) extends AkkaException(message) - -sealed trait RouterType - -/** - * @author Jonas Bonér - */ -object RouterType { - object Direct extends RouterType - object Random extends RouterType - object RoundRobin extends RouterType - object LeastCPU extends RouterType - object LeastRAM extends RouterType - object LeastMessages extends RouterType -} - -// FIXME move all routing in cluster here when we can \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala index f27ae58341..7f7da6d29c 100644 --- a/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala +++ b/akka-actor/src/main/scala/akka/cluster/ClusterInterface.scala @@ -10,6 +10,7 @@ import akka.actor._ import akka.dispatch.Future import akka.config.Config import akka.util._ +import akka.routing.RouterType import akka.AkkaException import com.eaio.uuid.UUID diff --git a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala index 91478c8eb2..2c95946f12 100644 --- a/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala +++ b/akka-actor/src/main/scala/akka/remoteinterface/RemoteInterface.scala @@ -6,6 +6,7 @@ package akka.remoteinterface import akka.japi.Creator import akka.actor._ +import DeploymentConfig._ import akka.util._ import akka.dispatch.Promise import akka.serialization._ @@ -24,21 +25,49 @@ trait RemoteModule { def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope protected[akka] def notifyListeners(message: ⇒ Any): Unit - private[akka] def actors: ConcurrentHashMap[String, ActorRef] - private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] - private[akka] def actorsFactories: ConcurrentHashMap[String, () ⇒ ActorRef] + private[akka] def actors: ConcurrentHashMap[String, ActorRef] // FIXME need to invalidate this cache on replication + private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map? + private[akka] def actorsFactories: ConcurrentHashMap[String, () ⇒ ActorRef] // FIXME what to do wit actorsFactories map? - /** Lookup methods **/ + private[akka] def findActorByAddress(address: String): ActorRef = { + val cachedActorRef = actors.get(address) + if (cachedActorRef ne null) cachedActorRef + else { + val actorRef = + Deployer.lookupDeploymentFor(address) match { + case Some(Deploy(_, router, _, Clustered(home, _, _))) ⇒ - private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address) + if (DeploymentConfig.isHomeNode(home)) { // on home node + Actor.registry.actorFor(address) match { // try to look up in actor registry + case Some(actorRef) ⇒ // in registry -> DONE + actorRef + case None ⇒ // not in registry -> check out as 'ref' from cluster (which puts it in actor registry for next time around) + Actor.cluster.ref(address, DeploymentConfig.routerTypeFor(router)) + } + } else throw new IllegalActorStateException("Trying to look up remote actor on non-home node. FIXME: fix this behavior") + + case Some(Deploy(_, _, _, Local)) ⇒ + Actor.registry.actorFor(address).getOrElse(throw new IllegalActorStateException("Could not lookup locally deployed actor in actor registry")) + + case _ ⇒ + actors.get(address) // FIXME do we need to fall back to local here? If it is not clustered then it should not be a remote actor in the first place. Throw exception. + } + + actors.put(address, actorRef) // cache it for next time around + actorRef + } + } private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid) private[akka] def findActorFactory(address: String): () ⇒ ActorRef = actorsFactories.get(address) private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = { - var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) - else findActorByAddress(address) + // find by address + var actorRefOrNull = + if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) // FIXME remove lookup by UUID? probably + else findActorByAddress(address) + // find by uuid if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid) actorRefOrNull } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 99b985053b..83fd45a5cb 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -13,6 +13,103 @@ import scala.collection.immutable.Seq import java.util.concurrent.atomic.AtomicReference import annotation.tailrec +import akka.AkkaException + +class RoutingException(message: String) extends AkkaException(message) + +sealed trait RouterType + +/** + * @author Jonas Bonér + */ +object RouterType { + object Direct extends RouterType + object Random extends RouterType + object RoundRobin extends RouterType + object LeastCPU extends RouterType + object LeastRAM extends RouterType + object LeastMessages extends RouterType +} + +/** + * A Router is a trait whose purpose is to route incoming messages to actors. + */ +trait Router { this: Actor ⇒ + + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, ActorRef] + + protected def broadcast(message: Any) {} + + protected def dispatch: Receive = { + case Routing.Broadcast(message) ⇒ + broadcast(message) + case a if routes.isDefinedAt(a) ⇒ + if (isSenderDefined) routes(a).forward(transform(a))(someSelf) + else routes(a).!(transform(a))(None) + } + + def receive = dispatch + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined +} + +/** + * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors. + */ +abstract class UntypedRouter extends UntypedActor { + protected def transform(msg: Any): Any = msg + + protected def route(msg: Any): ActorRef + + protected def broadcast(message: Any) {} + + private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined + + @throws(classOf[Exception]) + def onReceive(msg: Any): Unit = msg match { + case m: Routing.Broadcast ⇒ broadcast(m.message) + case _ ⇒ + val r = route(msg) + if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") + if (isSenderDefined) r.forward(transform(msg))(someSelf) + else r.!(transform(msg))(None) + } +} + +/** + * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. + */ +trait LoadBalancer extends Router { self: Actor ⇒ + protected def seq: InfiniteIterator[ActorRef] + + protected def routes = { + case x if seq.hasNext ⇒ seq.next + } + + override def broadcast(message: Any) = seq.items.foreach(_ ! message) + + override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) +} + +/** + * A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to. + */ +abstract class UntypedLoadBalancer extends UntypedRouter { + protected def seq: InfiniteIterator[ActorRef] + + protected def route(msg: Any) = + if (seq.hasNext) seq.next + else null + + override def broadcast(message: Any) = seq.items.foreach(_ ! message) + + override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) +} + object Routing { sealed trait RoutingMessage @@ -118,82 +215,3 @@ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends Infini override def exists(f: ActorRef ⇒ Boolean): Boolean = items.exists(f) } -/** - * A Router is a trait whose purpose is to route incoming messages to actors. - */ -trait Router { this: Actor ⇒ - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, ActorRef] - - protected def broadcast(message: Any) {} - - protected def dispatch: Receive = { - case Routing.Broadcast(message) ⇒ - broadcast(message) - case a if routes.isDefinedAt(a) ⇒ - if (isSenderDefined) routes(a).forward(transform(a))(someSelf) - else routes(a).!(transform(a))(None) - } - - def receive = dispatch - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined -} - -/** - * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors. - */ -abstract class UntypedRouter extends UntypedActor { - protected def transform(msg: Any): Any = msg - - protected def route(msg: Any): ActorRef - - protected def broadcast(message: Any) {} - - private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined - - @throws(classOf[Exception]) - def onReceive(msg: Any): Unit = msg match { - case m: Routing.Broadcast ⇒ broadcast(m.message) - case _ ⇒ - val r = route(msg) - if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!") - if (isSenderDefined) r.forward(transform(msg))(someSelf) - else r.!(transform(msg))(None) - } -} - -/** - * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -trait LoadBalancer extends Router { self: Actor ⇒ - protected def seq: InfiniteIterator[ActorRef] - - protected def routes = { - case x if seq.hasNext ⇒ seq.next - } - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) -} - -/** - * A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets - * to dispatch incoming messages to. - */ -abstract class UntypedLoadBalancer extends UntypedRouter { - protected def seq: InfiniteIterator[ActorRef] - - protected def route(msg: Any) = - if (seq.hasNext) seq.next - else null - - override def broadcast(message: Any) = seq.items.foreach(_ ! message) - - override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg)) -} - diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index 1d12c5dbdf..0588e1c07b 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -31,6 +31,7 @@ import Actor._ import akka.event.EventHandler import akka.dispatch.{ Dispatchers, Future } import akka.remoteinterface._ +import akka.routing.RouterType import akka.config.Config import Config._ import akka.serialization.{ Format, Serializers, Serializer, Compression } @@ -776,7 +777,8 @@ class DefaultClusterNode private[akka] ( actor } - refByAddress(actorAddress) + refByAddress(actorAddress).start() + } else throw new ClusterException("Not connected to cluster") /** diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala index 5fc58ca42a..eba563946e 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRef.scala @@ -19,21 +19,21 @@ import com.eaio.uuid.UUID * @author Jonas Bonér */ class ClusterActorRef private[akka] ( - actorAddresses: Array[Tuple2[UUID, InetSocketAddress]], - address: String, + inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], + actorAddress: String, timeout: Long, val replicationStrategy: ReplicationStrategy) - extends RemoteActorRef(address, timeout, None) { + extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef this: ClusterActorRef with Router.Router ⇒ - EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(address)) + EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(actorAddress)) - private[akka] val addresses = new AtomicReference[Map[InetSocketAddress, ActorRef]]( - (Map[InetSocketAddress, ActorRef]() /: actorAddresses) { - case (map, (uuid, address)) ⇒ map + (address -> createRemoteActorRef(uuid, address)) + private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]]( + (Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) { + case (map, (uuid, inetSocketAddress)) ⇒ map + (inetSocketAddress -> createRemoteActorRef(actorAddress, inetSocketAddress)) }) - def connections: Map[InetSocketAddress, ActorRef] = addresses.get + def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit = route(message)(senderOption) @@ -42,19 +42,20 @@ class ClusterActorRef private[akka] ( message: Any, timeout: Long, senderOption: Option[ActorRef], - senderFuture: Option[Promise[T]]): Promise[T] = + senderFuture: Option[Promise[T]]): Promise[T] = { route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]] + } - private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) { - addresses set (addresses.get map { - case (`from`, actorRef) ⇒ + private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) { + inetSocketAddressToActorRefMap set (inetSocketAddressToActorRefMap.get map { + case (`fromInetSocketAddress`, actorRef) ⇒ actorRef.stop() - (to, createRemoteActorRef(actorRef.uuid, to)) + (toInetSocketAddress, createRemoteActorRef(actorRef.address, toInetSocketAddress)) case other ⇒ other }) } - // clustered refs are always registered and looked up by UUID - private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) = - RemoteActorRef(uuidToString(uuid), Actor.TIMEOUT, None) + private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = { + RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None) + } } diff --git a/akka-cluster/src/main/scala/akka/cluster/Routing.scala b/akka-cluster/src/main/scala/akka/cluster/Routing.scala index d863299777..7291774d19 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Routing.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Routing.scala @@ -6,18 +6,17 @@ package akka.cluster import Cluster._ import akka.actor._ -import akka.actor.Actor._ -import akka.actor.RouterType._ +import Actor._ import akka.dispatch.Future -import akka.AkkaException - -import java.net.InetSocketAddress +import akka.routing.{ RouterType, RoutingException } +import RouterType._ import com.eaio.uuid.UUID -import annotation.tailrec -import java.util.concurrent.atomic.AtomicReference -class RoutingException(message: String) extends AkkaException(message) +import annotation.tailrec + +import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicReference /** * @author Jonas Bonér @@ -25,24 +24,14 @@ class RoutingException(message: String) extends AkkaException(message) object Router { def newRouter( routerType: RouterType, - addresses: Array[Tuple2[UUID, InetSocketAddress]], - serviceId: String, + inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]], + actorAddress: String, timeout: Long, replicationStrategy: ReplicationStrategy = ReplicationStrategy.WriteThrough): ClusterActorRef = { - routerType match { - case Direct ⇒ new ClusterActorRef( - addresses, serviceId, timeout, - replicationStrategy) with Direct - - case Random ⇒ new ClusterActorRef( - addresses, serviceId, timeout, - replicationStrategy) with Random - - case RoundRobin ⇒ new ClusterActorRef( - addresses, serviceId, timeout, - replicationStrategy) with RoundRobin - + case Direct ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with Direct + case Random ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with Random + case RoundRobin ⇒ new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with RoundRobin case LeastCPU ⇒ sys.error("Router LeastCPU not supported yet") case LeastRAM ⇒ sys.error("Router LeastRAM not supported yet") case LeastMessages ⇒ sys.error("Router LeastMessages not supported yet") diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf index deaf85b42c..b96297f0c4 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode1.conf @@ -1,3 +1,4 @@ +akka.event-handler-level = "DEBUG" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.home = "node:node1" akka.actor.deployment.service-hello.clustered.replicas = 2 diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf index 0c1a16ea15..36795796c2 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmNode2.conf @@ -1,3 +1,4 @@ +akka.event-handler-level = "DEBUG" akka.actor.deployment.service-hello.router = "round-robin" akka.actor.deployment.service-hello.clustered.home = "node:node1" akka.actor.deployment.service-hello.clustered.replicas = 2 diff --git a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala index 8e32f3960b..a4b3489b41 100644 --- a/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala +++ b/akka-cluster/src/test/scala/akka/cluster/store_actor/StoreActorMultiJvmSpec.scala @@ -11,13 +11,16 @@ import org.scalatest.BeforeAndAfterAll import akka.cluster._ import akka.actor._ import Actor._ +import akka.config.Config object StoreActorMultiJvmSpec { val NrOfNodes = 2 class HelloWorld extends Actor with Serializable { def receive = { - case "Hello" ⇒ self.reply("World") + case "Hello" ⇒ + println("GOT HELLO on NODE: " + Config.nodename) + self.reply("World from node [" + Config.nodename + "]") } } } @@ -38,14 +41,16 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA Cluster.barrier("start-node2", NrOfNodes) {} Cluster.barrier("create-clustered-actor-node1", NrOfNodes) { - val pi = Actor.actorOf[HelloWorld]("service-hello") - pi must not equal (null) - pi.address must equal("service-hello") - pi.isInstanceOf[LocalActorRef] must be(true) + val hello = Actor.actorOf[HelloWorld]("service-hello") + hello must not equal (null) + hello.address must equal("service-hello") + hello.isInstanceOf[LocalActorRef] must be(true) } Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {} + Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {} + Cluster.node.shutdown() } } @@ -76,11 +81,18 @@ class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers { Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {} + var hello: ActorRef = null Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) { - val pi = Actor.actorOf[HelloWorld]("service-hello") - pi must not equal (null) - pi.address must equal("service-hello") - pi.isInstanceOf[ClusterActorRef] must be(true) + hello = Actor.actorOf[HelloWorld]("service-hello") + hello must not equal (null) + hello.address must equal("service-hello") + hello.isInstanceOf[ClusterActorRef] must be(true) + } + + Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) { + hello must not equal (null) + val reply = (hello !! "Hello").as[String].getOrElse(fail("Should have recieved reply from node1")) + reply must equal("World from node [node1]") } Cluster.node.shutdown() diff --git a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java index f83430b478..5ebf1f58dd 100644 --- a/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java +++ b/akka-remote/src/main/java/akka/remote/protocol/RemoteProtocol.java @@ -1739,8 +1739,15 @@ public final class RemoteProtocol { public boolean hasAddress() { return hasAddress; } public java.lang.String getAddress() { return address_; } - // optional uint64 timeout = 2; - public static final int TIMEOUT_FIELD_NUMBER = 2; + // required bytes inetSocketAddress = 2; + public static final int INETSOCKETADDRESS_FIELD_NUMBER = 2; + private boolean hasInetSocketAddress; + private com.google.protobuf.ByteString inetSocketAddress_ = com.google.protobuf.ByteString.EMPTY; + public boolean hasInetSocketAddress() { return hasInetSocketAddress; } + public com.google.protobuf.ByteString getInetSocketAddress() { return inetSocketAddress_; } + + // optional uint64 timeout = 3; + public static final int TIMEOUT_FIELD_NUMBER = 3; private boolean hasTimeout; private long timeout_ = 0L; public boolean hasTimeout() { return hasTimeout; } @@ -1750,6 +1757,7 @@ public final class RemoteProtocol { } public final boolean isInitialized() { if (!hasAddress) return false; + if (!hasInetSocketAddress) return false; return true; } @@ -1759,8 +1767,11 @@ public final class RemoteProtocol { if (hasAddress()) { output.writeString(1, getAddress()); } + if (hasInetSocketAddress()) { + output.writeBytes(2, getInetSocketAddress()); + } if (hasTimeout()) { - output.writeUInt64(2, getTimeout()); + output.writeUInt64(3, getTimeout()); } getUnknownFields().writeTo(output); } @@ -1775,9 +1786,13 @@ public final class RemoteProtocol { size += com.google.protobuf.CodedOutputStream .computeStringSize(1, getAddress()); } + if (hasInetSocketAddress()) { + size += com.google.protobuf.CodedOutputStream + .computeBytesSize(2, getInetSocketAddress()); + } if (hasTimeout()) { size += com.google.protobuf.CodedOutputStream - .computeUInt64Size(2, getTimeout()); + .computeUInt64Size(3, getTimeout()); } size += getUnknownFields().getSerializedSize(); memoizedSerializedSize = size; @@ -1940,6 +1955,9 @@ public final class RemoteProtocol { if (other.hasAddress()) { setAddress(other.getAddress()); } + if (other.hasInetSocketAddress()) { + setInetSocketAddress(other.getInetSocketAddress()); + } if (other.hasTimeout()) { setTimeout(other.getTimeout()); } @@ -1972,7 +1990,11 @@ public final class RemoteProtocol { setAddress(input.readString()); break; } - case 16: { + case 18: { + setInetSocketAddress(input.readBytes()); + break; + } + case 24: { setTimeout(input.readUInt64()); break; } @@ -2002,7 +2024,28 @@ public final class RemoteProtocol { return this; } - // optional uint64 timeout = 2; + // required bytes inetSocketAddress = 2; + public boolean hasInetSocketAddress() { + return result.hasInetSocketAddress(); + } + public com.google.protobuf.ByteString getInetSocketAddress() { + return result.getInetSocketAddress(); + } + public Builder setInetSocketAddress(com.google.protobuf.ByteString value) { + if (value == null) { + throw new NullPointerException(); + } + result.hasInetSocketAddress = true; + result.inetSocketAddress_ = value; + return this; + } + public Builder clearInetSocketAddress() { + result.hasInetSocketAddress = false; + result.inetSocketAddress_ = getDefaultInstance().getInetSocketAddress(); + return this; + } + + // optional uint64 timeout = 3; public boolean hasTimeout() { return result.hasTimeout(); } @@ -5679,37 +5722,37 @@ public final class RemoteProtocol { "der\030\007 \001(\0132\027.RemoteActorRefProtocol\022(\n\010me", "tadata\030\010 \003(\0132\026.MetadataEntryProtocol\"J\n\025" + "RemoteControlProtocol\022\016\n\006cookie\030\001 \001(\t\022!\n" + - "\013commandType\030\002 \002(\0162\014.CommandType\":\n\026Remo" + - "teActorRefProtocol\022\017\n\007address\030\001 \002(\t\022\017\n\007t" + - "imeout\030\002 \001(\004\"\323\002\n\032SerializedActorRefProto" + - "col\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n\007addr" + - "ess\030\002 \002(\t\022\026\n\016actorClassname\030\003 \002(\t\022\025\n\ract" + - "orInstance\030\004 \001(\014\022\033\n\023serializerClassname\030" + - "\005 \001(\t\022\017\n\007timeout\030\006 \001(\004\022\026\n\016receiveTimeout" + - "\030\007 \001(\004\022%\n\tlifeCycle\030\010 \001(\0132\022.LifeCyclePro", - "tocol\022+\n\nsupervisor\030\t \001(\0132\027.RemoteActorR" + - "efProtocol\022\024\n\014hotswapStack\030\n \001(\014\022(\n\010mess" + - "ages\030\013 \003(\0132\026.RemoteMessageProtocol\"g\n\037Se" + - "rializedTypedActorRefProtocol\022-\n\010actorRe" + - "f\030\001 \002(\0132\033.SerializedActorRefProtocol\022\025\n\r" + - "interfaceName\030\002 \002(\t\"r\n\017MessageProtocol\0225" + - "\n\023serializationScheme\030\001 \002(\0162\030.Serializat" + - "ionSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017messag" + - "eManifest\030\003 \001(\014\"R\n\021ActorInfoProtocol\022\033\n\004" + - "uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n\007timeout\030\002 \002", - "(\004\022\017\n\007address\030\003 \001(\t\")\n\014UuidProtocol\022\014\n\004h" + - "igh\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryP" + - "rotocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n\021L" + - "ifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Li" + - "feCycleType\"1\n\017AddressProtocol\022\020\n\010hostna" + - "me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProto" + - "col\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*" + - "(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020" + - "\002*]\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013" + - "\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON", - "\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPER" + - "MANENT\020\001\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote.p" + - "rotocolH\001" + "\013commandType\030\002 \002(\0162\014.CommandType\"U\n\026Remo" + + "teActorRefProtocol\022\017\n\007address\030\001 \002(\t\022\031\n\021i" + + "netSocketAddress\030\002 \002(\014\022\017\n\007timeout\030\003 \001(\004\"" + + "\323\002\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001" + + " \002(\0132\r.UuidProtocol\022\017\n\007address\030\002 \002(\t\022\026\n\016" + + "actorClassname\030\003 \002(\t\022\025\n\ractorInstance\030\004 " + + "\001(\014\022\033\n\023serializerClassname\030\005 \001(\t\022\017\n\007time" + + "out\030\006 \001(\004\022\026\n\016receiveTimeout\030\007 \001(\004\022%\n\tlif", + "eCycle\030\010 \001(\0132\022.LifeCycleProtocol\022+\n\nsupe" + + "rvisor\030\t \001(\0132\027.RemoteActorRefProtocol\022\024\n" + + "\014hotswapStack\030\n \001(\014\022(\n\010messages\030\013 \003(\0132\026." + + "RemoteMessageProtocol\"g\n\037SerializedTyped" + + "ActorRefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Ser" + + "ializedActorRefProtocol\022\025\n\rinterfaceName" + + "\030\002 \002(\t\"r\n\017MessageProtocol\0225\n\023serializati" + + "onScheme\030\001 \002(\0162\030.SerializationSchemeType" + + "\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001" + + "(\014\"R\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.", + "UuidProtocol\022\017\n\007timeout\030\002 \002(\004\022\017\n\007address" + + "\030\003 \001(\t\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003" + + "low\030\002 \002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003ke" + + "y\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProto" + + "col\022!\n\tlifeCycle\030\001 \002(\0162\016.LifeCycleType\"1" + + "\n\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004p" + + "ort\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassn" + + "ame\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*(\n\013CommandTyp" + + "e\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*]\n\027Serializ" + + "ationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016", + "\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBU" + + "F\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tT" + + "EMPORARY\020\002B\030\n\024akka.remote.protocolH\001" }; com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner = new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() { @@ -5745,7 +5788,7 @@ public final class RemoteProtocol { internal_static_RemoteActorRefProtocol_fieldAccessorTable = new com.google.protobuf.GeneratedMessage.FieldAccessorTable( internal_static_RemoteActorRefProtocol_descriptor, - new java.lang.String[] { "Address", "Timeout", }, + new java.lang.String[] { "Address", "InetSocketAddress", "Timeout", }, akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class, akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class); internal_static_SerializedActorRefProtocol_descriptor = diff --git a/akka-remote/src/main/protocol/RemoteProtocol.proto b/akka-remote/src/main/protocol/RemoteProtocol.proto index 3b51a6e6d8..795a58814c 100644 --- a/akka-remote/src/main/protocol/RemoteProtocol.proto +++ b/akka-remote/src/main/protocol/RemoteProtocol.proto @@ -52,7 +52,8 @@ enum CommandType { */ message RemoteActorRefProtocol { required string address = 1; - optional uint64 timeout = 2; + required bytes inetSocketAddress = 2; + optional uint64 timeout = 3; } /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala similarity index 100% rename from akka-remote/src/main/scala/akka/remote/RemoteShared.scala rename to akka-remote/src/main/scala/akka/remote/RemoteConfig.scala diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 6411104295..5aff8408eb 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -25,7 +25,8 @@ import akka.actor.{ LifeCycleMessage } import akka.actor.Actor._ -import akka.config.Config._ +import akka.config.Config +import Config._ import akka.util._ import akka.event.EventHandler @@ -206,7 +207,10 @@ abstract class RemoteClient private[akka] ( def send[T]( request: RemoteMessageProtocol, senderFuture: Option[Promise[T]]): Option[Promise[T]] = { + if (isRunning) { + EventHandler.debug(this, "Sending remote message [%s]".format(request)) + if (request.getOneWay) { try { val future = currentChannel.write(RemoteEncoder.encode(request)) @@ -225,6 +229,7 @@ abstract class RemoteClient private[akka] ( } else throw e } None + } else { val futureResult = if (senderFuture.isDefined) senderFuture.get else new DefaultPromise[T](request.getActorInfo.getTimeout) @@ -254,6 +259,7 @@ abstract class RemoteClient private[akka] ( } Some(futureResult) } + } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress) notifyListeners(RemoteClientError(exception, module, remoteAddress)) @@ -544,16 +550,16 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with def optimizeLocalScoped_?() = optimizeLocal.get - protected[akka] def actorFor(address: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { + protected[akka] def actorFor(actorAddress: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = { + val inetSocketAddress = this.address if (optimizeLocalScoped_?) { - val home = this.address - if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) { //TODO: switch to InetSocketAddress.equals? - val localRef = findActorByAddressOrUuid(address, address) + if ((host == inetSocketAddress.getAddress.getHostAddress || host == inetSocketAddress.getHostName) && port == inetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals? + val localRef = findActorByAddressOrUuid(actorAddress, actorAddress) if (localRef ne null) return localRef //Code significantly simpler with the return statement } } - RemoteActorRef(address, timeout, loader) + RemoteActorRef(inetSocketAddress, actorAddress, timeout, loader) } } @@ -826,7 +832,7 @@ class RemoteServerHandler( // stop all session actors for ( map ← Option(sessionActors.remove(event.getChannel)); - actor ← collectionAsScalaIterable(map.values) + actor ← collectionAsScalaIterable(map.values)gddd ) { try { actor ! PoisonPill } catch { case e: Exception ⇒ } } @@ -839,11 +845,15 @@ class RemoteServerHandler( server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match { - case null ⇒ throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) - case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ handleRemoteMessageProtocol(remote.getMessage, event.getChannel) - //case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet) - case _ ⇒ //ignore + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { + event.getMessage match { + case null ⇒ + throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event) + case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ + handleRemoteMessageProtocol(remote.getMessage, event.getChannel) + //case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet) + case _ ⇒ //ignore + } } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { @@ -857,12 +867,13 @@ class RemoteServerHandler( case _ ⇒ None } - private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = + private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { + EventHandler.debug(this, "Received remote message [%s]".format(request)) dispatchToActor(request, channel) + } private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { val actorInfo = request.getActorInfo - val actorRef = try { createActor(actorInfo, channel) } catch { case e: SecurityException ⇒ @@ -950,11 +961,16 @@ class RemoteServerHandler( val uuid = actorInfo.getUuid val address = actorInfo.getAddress - server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match { + EventHandler.debug(this, "Creating an remotely available actor for address [%s] on node [%s]".format(address, Config.nodename)) + + val actorRef = server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match { // the actor has not been registered globally. See if we have it in the session - case null ⇒ createSessionActor(actorInfo, channel) + case null ⇒ createSessionActor(actorInfo, channel) // FIXME now session scoped actors are disabled, how to introduce them? case actorRef ⇒ actorRef } + + if (actorRef eq null) throw new IllegalActorStateException("Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]") + actorRef } private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = { diff --git a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala index a0705d7a52..8a34a0ec5e 100644 --- a/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala +++ b/akka-remote/src/main/scala/akka/serialization/SerializationProtocol.scala @@ -156,6 +156,7 @@ object ActorSerialization { } object RemoteActorSerialization { + /** * Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance. */ @@ -172,23 +173,29 @@ object RemoteActorSerialization { * Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance. */ private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = { - val ref = RemoteActorRef( + RemoteActorRef( + Serializers.Java.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress])).asInstanceOf[InetSocketAddress], protocol.getAddress, protocol.getTimeout, loader) - ref } /** * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ def toRemoteActorRefProtocol(actor: ActorRef): RemoteActorRefProtocol = { - actor match { - case ar: LocalActorRef ⇒ Actor.remote.registerByUuid(ar) - case _ ⇒ {} + val remoteAddress = actor match { + case ar: RemoteActorRef ⇒ + ar.remoteAddress + case ar: LocalActorRef ⇒ + Actor.remote.registerByUuid(ar) + ReflectiveAccess.RemoteModule.configDefaultAddress + case _ ⇒ + ReflectiveAccess.RemoteModule.configDefaultAddress } RemoteActorRefProtocol.newBuilder - .setAddress("uuid:" + actor.uuid.toString) + .setInetSocketAddress(ByteString.copyFrom(Serializers.Java.toBinary(remoteAddress))) + .setAddress(actor.address) .setTimeout(actor.timeout) .build }