Full clustering circle now works, remote communication.

Added test for cluster communication.
Refactored deployment parsing.
Added InetSocketAddress to remote protocol.

Signed-off-by: Jonas Bonér <jonasremove@jonasboner.com>
This commit is contained in:
Jonas Bonér 2011-05-24 19:04:25 +02:00
parent 5fd10978d8
commit f75dcdbd15
18 changed files with 360 additions and 246 deletions

View file

@ -389,23 +389,12 @@ object Actor extends ListenerManagement {
private def newClusterActorRef[T <: Actor](factory: () ActorRef, address: String, deploy: Deploy): ActorRef = {
deploy match {
case Deploy(_, router, serializerClassName, Clustered(home, replication: Replication, state: State))
ClusterModule.ensureEnabled()
ClusterModule.ensureEnabled()
if (!Actor.remote.isRunning) throw new IllegalStateException("Remote server is not running")
val isHomeNode = home match {
case Host(hostname) hostname == Config.hostname
case IP(address) address == "0.0.0.0" // FIXME checking if IP address is on home node is missing
case Node(nodename) nodename == Config.nodename
}
val replicas = replication match {
case Replicate(replicas) replicas
case AutoReplicate -1
case AutoReplicate() -1
case NoReplicas 0
case NoReplicas() 0
}
val isHomeNode = DeploymentConfig.isHomeNode(home)
val replicas = DeploymentConfig.replicaValueFor(replication)
if (isHomeNode) { // home node for clustered actor
@ -438,26 +427,14 @@ object Actor extends ListenerManagement {
if (!cluster.isClustered(address)) cluster.store(factory().start(), replicas, false, serializer) // add actor to cluster registry (if not already added)
// home node, check out as LocalActorRef
cluster
.use(address, serializer)
.getOrElse(throw new ConfigurationException("Could not check out actor [" + address + "] from cluster registry as a \"local\" actor"))
} else {
val routerType = router match {
case Direct RouterType.Direct
case Direct() RouterType.Direct
case RoundRobin RouterType.RoundRobin
case RoundRobin() RouterType.RoundRobin
case Random RouterType.Random
case Random() RouterType.Random
case LeastCPU RouterType.LeastCPU
case LeastCPU() RouterType.LeastCPU
case LeastRAM RouterType.LeastRAM
case LeastRAM() RouterType.LeastRAM
case LeastMessages RouterType.LeastMessages
case LeastMessages() RouterType.LeastMessages
}
cluster.ref(address, routerType)
// remote node (not home node), check out as ClusterActorRef
cluster.ref(address, DeploymentConfig.routerTypeFor(router))
}
/*

View file

@ -986,6 +986,7 @@ object RemoteActorSystemMessage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
val remoteAddress: InetSocketAddress,
val address: String,
_timeout: Long,
loader: Option[ClassLoader])
@ -996,7 +997,7 @@ private[akka] case class RemoteActorRef private[akka] (
timeout = _timeout
// FIXME BAD, we should not have different ActorRefs
/*
import DeploymentConfig._
val remoteAddress = Deployer.deploymentFor(address) match {
case Deploy(_, _, _, Clustered(home, _, _))
@ -1011,7 +1012,7 @@ private[akka] case class RemoteActorRef private[akka] (
//throw new IllegalStateException(
// "Actor with Address [" + address + "] is not bound to a Clustered Deployment")
}
*/
start()
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]) {

View file

@ -11,6 +11,7 @@ import java.util.concurrent.ConcurrentHashMap
import akka.event.EventHandler
import akka.actor.DeploymentConfig._
import akka.config.{ ConfigurationException, Config }
import akka.routing.RouterType
import akka.util.ReflectiveAccess._
import akka.serialization.Format
import akka.AkkaException
@ -106,6 +107,39 @@ object DeploymentConfig {
// For Scala API
case object Stateless extends State
case object Stateful extends State
// --------------------------------
// --- Helper methods for parsing
// --------------------------------
def isHomeNode(home: Home): Boolean = home match {
case Host(hostname) hostname == Config.hostname
case IP(address) address == "0.0.0.0" // FIXME checking if IP address is on home node is missing
case Node(nodename) nodename == Config.nodename
}
def replicaValueFor(replication: Replication): Int = replication match {
case Replicate(replicas) replicas
case AutoReplicate -1
case AutoReplicate() -1
case NoReplicas 0
case NoReplicas() 0
}
def routerTypeFor(routing: Routing): RouterType = routing match {
case Direct RouterType.Direct
case Direct() RouterType.Direct
case RoundRobin RouterType.RoundRobin
case RoundRobin() RouterType.RoundRobin
case Random RouterType.Random
case Random() RouterType.Random
case LeastCPU RouterType.LeastCPU
case LeastCPU() RouterType.LeastCPU
case LeastRAM RouterType.LeastRAM
case LeastRAM() RouterType.LeastRAM
case LeastMessages RouterType.LeastMessages
case LeastMessages() RouterType.LeastMessages
}
}
/**

View file

@ -3,22 +3,3 @@
*/
package akka.actor
import akka.AkkaException
class RoutingException(message: String) extends AkkaException(message)
sealed trait RouterType
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RouterType {
object Direct extends RouterType
object Random extends RouterType
object RoundRobin extends RouterType
object LeastCPU extends RouterType
object LeastRAM extends RouterType
object LeastMessages extends RouterType
}
// FIXME move all routing in cluster here when we can

View file

@ -10,6 +10,7 @@ import akka.actor._
import akka.dispatch.Future
import akka.config.Config
import akka.util._
import akka.routing.RouterType
import akka.AkkaException
import com.eaio.uuid.UUID

View file

@ -6,6 +6,7 @@ package akka.remoteinterface
import akka.japi.Creator
import akka.actor._
import DeploymentConfig._
import akka.util._
import akka.dispatch.Promise
import akka.serialization._
@ -24,21 +25,49 @@ trait RemoteModule {
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef]
private[akka] def actorsFactories: ConcurrentHashMap[String, () ActorRef]
private[akka] def actors: ConcurrentHashMap[String, ActorRef] // FIXME need to invalidate this cache on replication
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map?
private[akka] def actorsFactories: ConcurrentHashMap[String, () ActorRef] // FIXME what to do wit actorsFactories map?
/** Lookup methods **/
private[akka] def findActorByAddress(address: String): ActorRef = {
val cachedActorRef = actors.get(address)
if (cachedActorRef ne null) cachedActorRef
else {
val actorRef =
Deployer.lookupDeploymentFor(address) match {
case Some(Deploy(_, router, _, Clustered(home, _, _)))
private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address)
if (DeploymentConfig.isHomeNode(home)) { // on home node
Actor.registry.actorFor(address) match { // try to look up in actor registry
case Some(actorRef) // in registry -> DONE
actorRef
case None // not in registry -> check out as 'ref' from cluster (which puts it in actor registry for next time around)
Actor.cluster.ref(address, DeploymentConfig.routerTypeFor(router))
}
} else throw new IllegalActorStateException("Trying to look up remote actor on non-home node. FIXME: fix this behavior")
case Some(Deploy(_, _, _, Local))
Actor.registry.actorFor(address).getOrElse(throw new IllegalActorStateException("Could not lookup locally deployed actor in actor registry"))
case _
actors.get(address) // FIXME do we need to fall back to local here? If it is not clustered then it should not be a remote actor in the first place. Throw exception.
}
actors.put(address, actorRef) // cache it for next time around
actorRef
}
}
private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid)
private[akka] def findActorFactory(address: String): () ActorRef = actorsFactories.get(address)
private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = {
var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length))
// find by address
var actorRefOrNull =
if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length)) // FIXME remove lookup by UUID? probably
else findActorByAddress(address)
// find by uuid
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}

View file

@ -13,6 +13,103 @@ import scala.collection.immutable.Seq
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.AkkaException
class RoutingException(message: String) extends AkkaException(message)
sealed trait RouterType
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RouterType {
object Direct extends RouterType
object Random extends RouterType
object RoundRobin extends RouterType
object LeastCPU extends RouterType
object LeastRAM extends RouterType
object LeastMessages extends RouterType
}
/**
* A Router is a trait whose purpose is to route incoming messages to actors.
*/
trait Router { this: Actor
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorRef]
protected def broadcast(message: Any) {}
protected def dispatch: Receive = {
case Routing.Broadcast(message)
broadcast(message)
case a if routes.isDefinedAt(a)
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
}
def receive = dispatch
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
* An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors.
*/
abstract class UntypedRouter extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any): ActorRef
protected def broadcast(message: Any) {}
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = msg match {
case m: Routing.Broadcast broadcast(m.message)
case _
val r = route(msg)
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
/**
* A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
trait LoadBalancer extends Router { self: Actor
protected def seq: InfiniteIterator[ActorRef]
protected def routes = {
case x if seq.hasNext seq.next
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedRouter {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) =
if (seq.hasNext) seq.next
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
object Routing {
sealed trait RoutingMessage
@ -118,82 +215,3 @@ case class SmallestMailboxFirstIterator(val items: Seq[ActorRef]) extends Infini
override def exists(f: ActorRef Boolean): Boolean = items.exists(f)
}
/**
* A Router is a trait whose purpose is to route incoming messages to actors.
*/
trait Router { this: Actor
protected def transform(msg: Any): Any = msg
protected def routes: PartialFunction[Any, ActorRef]
protected def broadcast(message: Any) {}
protected def dispatch: Receive = {
case Routing.Broadcast(message)
broadcast(message)
case a if routes.isDefinedAt(a)
if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
else routes(a).!(transform(a))(None)
}
def receive = dispatch
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
* An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors.
*/
abstract class UntypedRouter extends UntypedActor {
protected def transform(msg: Any): Any = msg
protected def route(msg: Any): ActorRef
protected def broadcast(message: Any) {}
private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
@throws(classOf[Exception])
def onReceive(msg: Any): Unit = msg match {
case m: Routing.Broadcast broadcast(m.message)
case _
val r = route(msg)
if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
if (isSenderDefined) r.forward(transform(msg))(someSelf)
else r.!(transform(msg))(None)
}
}
/**
* A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
trait LoadBalancer extends Router { self: Actor
protected def seq: InfiniteIterator[ActorRef]
protected def routes = {
case x if seq.hasNext seq.next
}
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}
/**
* A UntypedLoadBalancer is a specialized kind of UntypedRouter, that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to.
*/
abstract class UntypedLoadBalancer extends UntypedRouter {
protected def seq: InfiniteIterator[ActorRef]
protected def route(msg: Any) =
if (seq.hasNext) seq.next
else null
override def broadcast(message: Any) = seq.items.foreach(_ ! message)
override def isDefinedAt(msg: Any) = seq.exists(_.isDefinedAt(msg))
}

View file

@ -31,6 +31,7 @@ import Actor._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future }
import akka.remoteinterface._
import akka.routing.RouterType
import akka.config.Config
import Config._
import akka.serialization.{ Format, Serializers, Serializer, Compression }
@ -776,7 +777,8 @@ class DefaultClusterNode private[akka] (
actor
}
refByAddress(actorAddress)
refByAddress(actorAddress).start()
} else throw new ClusterException("Not connected to cluster")
/**

View file

@ -19,21 +19,21 @@ import com.eaio.uuid.UUID
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ClusterActorRef private[akka] (
actorAddresses: Array[Tuple2[UUID, InetSocketAddress]],
address: String,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
actorAddress: String,
timeout: Long,
val replicationStrategy: ReplicationStrategy)
extends RemoteActorRef(address, timeout, None) {
extends RemoteActorRef(null, actorAddress, timeout, None) { // FIXME UGLY HACK - should not extend RemoteActorRef
this: ClusterActorRef with Router.Router
EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(address))
EventHandler.debug(this, "Creating a ClusterActorRef for actor with address [%s]".format(actorAddress))
private[akka] val addresses = new AtomicReference[Map[InetSocketAddress, ActorRef]](
(Map[InetSocketAddress, ActorRef]() /: actorAddresses) {
case (map, (uuid, address)) map + (address -> createRemoteActorRef(uuid, address))
private[akka] val inetSocketAddressToActorRefMap = new AtomicReference[Map[InetSocketAddress, ActorRef]](
(Map[InetSocketAddress, ActorRef]() /: inetSocketAddresses) {
case (map, (uuid, inetSocketAddress)) map + (inetSocketAddress -> createRemoteActorRef(actorAddress, inetSocketAddress))
})
def connections: Map[InetSocketAddress, ActorRef] = addresses.get
def connections: Map[InetSocketAddress, ActorRef] = inetSocketAddressToActorRefMap.get
override def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
route(message)(senderOption)
@ -42,19 +42,20 @@ class ClusterActorRef private[akka] (
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[Promise[T]]): Promise[T] =
senderFuture: Option[Promise[T]]): Promise[T] = {
route[T](message, timeout)(senderOption).asInstanceOf[Promise[T]]
}
private[akka] def failOver(from: InetSocketAddress, to: InetSocketAddress) {
addresses set (addresses.get map {
case (`from`, actorRef)
private[akka] def failOver(fromInetSocketAddress: InetSocketAddress, toInetSocketAddress: InetSocketAddress) {
inetSocketAddressToActorRefMap set (inetSocketAddressToActorRefMap.get map {
case (`fromInetSocketAddress`, actorRef)
actorRef.stop()
(to, createRemoteActorRef(actorRef.uuid, to))
(toInetSocketAddress, createRemoteActorRef(actorRef.address, toInetSocketAddress))
case other other
})
}
// clustered refs are always registered and looked up by UUID
private def createRemoteActorRef(uuid: UUID, address: InetSocketAddress) =
RemoteActorRef(uuidToString(uuid), Actor.TIMEOUT, None)
private def createRemoteActorRef(actorAddress: String, inetSocketAddress: InetSocketAddress) = {
RemoteActorRef(inetSocketAddress, actorAddress, Actor.TIMEOUT, None)
}
}

View file

@ -6,18 +6,17 @@ package akka.cluster
import Cluster._
import akka.actor._
import akka.actor.Actor._
import akka.actor.RouterType._
import Actor._
import akka.dispatch.Future
import akka.AkkaException
import java.net.InetSocketAddress
import akka.routing.{ RouterType, RoutingException }
import RouterType._
import com.eaio.uuid.UUID
import annotation.tailrec
import java.util.concurrent.atomic.AtomicReference
class RoutingException(message: String) extends AkkaException(message)
import annotation.tailrec
import java.net.InetSocketAddress
import java.util.concurrent.atomic.AtomicReference
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -25,24 +24,14 @@ class RoutingException(message: String) extends AkkaException(message)
object Router {
def newRouter(
routerType: RouterType,
addresses: Array[Tuple2[UUID, InetSocketAddress]],
serviceId: String,
inetSocketAddresses: Array[Tuple2[UUID, InetSocketAddress]],
actorAddress: String,
timeout: Long,
replicationStrategy: ReplicationStrategy = ReplicationStrategy.WriteThrough): ClusterActorRef = {
routerType match {
case Direct new ClusterActorRef(
addresses, serviceId, timeout,
replicationStrategy) with Direct
case Random new ClusterActorRef(
addresses, serviceId, timeout,
replicationStrategy) with Random
case RoundRobin new ClusterActorRef(
addresses, serviceId, timeout,
replicationStrategy) with RoundRobin
case Direct new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with Direct
case Random new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with Random
case RoundRobin new ClusterActorRef(inetSocketAddresses, actorAddress, timeout, replicationStrategy) with RoundRobin
case LeastCPU sys.error("Router LeastCPU not supported yet")
case LeastRAM sys.error("Router LeastRAM not supported yet")
case LeastMessages sys.error("Router LeastMessages not supported yet")

View file

@ -1,3 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -1,3 +1,4 @@
akka.event-handler-level = "DEBUG"
akka.actor.deployment.service-hello.router = "round-robin"
akka.actor.deployment.service-hello.clustered.home = "node:node1"
akka.actor.deployment.service-hello.clustered.replicas = 2

View file

@ -11,13 +11,16 @@ import org.scalatest.BeforeAndAfterAll
import akka.cluster._
import akka.actor._
import Actor._
import akka.config.Config
object StoreActorMultiJvmSpec {
val NrOfNodes = 2
class HelloWorld extends Actor with Serializable {
def receive = {
case "Hello" self.reply("World")
case "Hello"
println("GOT HELLO on NODE: " + Config.nodename)
self.reply("World from node [" + Config.nodename + "]")
}
}
}
@ -38,14 +41,16 @@ class StoreActorMultiJvmNode1 extends WordSpec with MustMatchers with BeforeAndA
Cluster.barrier("start-node2", NrOfNodes) {}
Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {
val pi = Actor.actorOf[HelloWorld]("service-hello")
pi must not equal (null)
pi.address must equal("service-hello")
pi.isInstanceOf[LocalActorRef] must be(true)
val hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[LocalActorRef] must be(true)
}
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {}
Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {}
Cluster.node.shutdown()
}
}
@ -76,11 +81,18 @@ class StoreActorMultiJvmNode2 extends WordSpec with MustMatchers {
Cluster.barrier("create-clustered-actor-node1", NrOfNodes) {}
var hello: ActorRef = null
Cluster.barrier("get-ref-to-actor-on-node2", NrOfNodes) {
val pi = Actor.actorOf[HelloWorld]("service-hello")
pi must not equal (null)
pi.address must equal("service-hello")
pi.isInstanceOf[ClusterActorRef] must be(true)
hello = Actor.actorOf[HelloWorld]("service-hello")
hello must not equal (null)
hello.address must equal("service-hello")
hello.isInstanceOf[ClusterActorRef] must be(true)
}
Cluster.barrier("send-message-from-node2-to-node1", NrOfNodes) {
hello must not equal (null)
val reply = (hello !! "Hello").as[String].getOrElse(fail("Should have recieved reply from node1"))
reply must equal("World from node [node1]")
}
Cluster.node.shutdown()

View file

@ -1739,8 +1739,15 @@ public final class RemoteProtocol {
public boolean hasAddress() { return hasAddress; }
public java.lang.String getAddress() { return address_; }
// optional uint64 timeout = 2;
public static final int TIMEOUT_FIELD_NUMBER = 2;
// required bytes inetSocketAddress = 2;
public static final int INETSOCKETADDRESS_FIELD_NUMBER = 2;
private boolean hasInetSocketAddress;
private com.google.protobuf.ByteString inetSocketAddress_ = com.google.protobuf.ByteString.EMPTY;
public boolean hasInetSocketAddress() { return hasInetSocketAddress; }
public com.google.protobuf.ByteString getInetSocketAddress() { return inetSocketAddress_; }
// optional uint64 timeout = 3;
public static final int TIMEOUT_FIELD_NUMBER = 3;
private boolean hasTimeout;
private long timeout_ = 0L;
public boolean hasTimeout() { return hasTimeout; }
@ -1750,6 +1757,7 @@ public final class RemoteProtocol {
}
public final boolean isInitialized() {
if (!hasAddress) return false;
if (!hasInetSocketAddress) return false;
return true;
}
@ -1759,8 +1767,11 @@ public final class RemoteProtocol {
if (hasAddress()) {
output.writeString(1, getAddress());
}
if (hasInetSocketAddress()) {
output.writeBytes(2, getInetSocketAddress());
}
if (hasTimeout()) {
output.writeUInt64(2, getTimeout());
output.writeUInt64(3, getTimeout());
}
getUnknownFields().writeTo(output);
}
@ -1775,9 +1786,13 @@ public final class RemoteProtocol {
size += com.google.protobuf.CodedOutputStream
.computeStringSize(1, getAddress());
}
if (hasInetSocketAddress()) {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(2, getInetSocketAddress());
}
if (hasTimeout()) {
size += com.google.protobuf.CodedOutputStream
.computeUInt64Size(2, getTimeout());
.computeUInt64Size(3, getTimeout());
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@ -1940,6 +1955,9 @@ public final class RemoteProtocol {
if (other.hasAddress()) {
setAddress(other.getAddress());
}
if (other.hasInetSocketAddress()) {
setInetSocketAddress(other.getInetSocketAddress());
}
if (other.hasTimeout()) {
setTimeout(other.getTimeout());
}
@ -1972,7 +1990,11 @@ public final class RemoteProtocol {
setAddress(input.readString());
break;
}
case 16: {
case 18: {
setInetSocketAddress(input.readBytes());
break;
}
case 24: {
setTimeout(input.readUInt64());
break;
}
@ -2002,7 +2024,28 @@ public final class RemoteProtocol {
return this;
}
// optional uint64 timeout = 2;
// required bytes inetSocketAddress = 2;
public boolean hasInetSocketAddress() {
return result.hasInetSocketAddress();
}
public com.google.protobuf.ByteString getInetSocketAddress() {
return result.getInetSocketAddress();
}
public Builder setInetSocketAddress(com.google.protobuf.ByteString value) {
if (value == null) {
throw new NullPointerException();
}
result.hasInetSocketAddress = true;
result.inetSocketAddress_ = value;
return this;
}
public Builder clearInetSocketAddress() {
result.hasInetSocketAddress = false;
result.inetSocketAddress_ = getDefaultInstance().getInetSocketAddress();
return this;
}
// optional uint64 timeout = 3;
public boolean hasTimeout() {
return result.hasTimeout();
}
@ -5679,37 +5722,37 @@ public final class RemoteProtocol {
"der\030\007 \001(\0132\027.RemoteActorRefProtocol\022(\n\010me",
"tadata\030\010 \003(\0132\026.MetadataEntryProtocol\"J\n\025" +
"RemoteControlProtocol\022\016\n\006cookie\030\001 \001(\t\022!\n" +
"\013commandType\030\002 \002(\0162\014.CommandType\":\n\026Remo" +
"teActorRefProtocol\022\017\n\007address\030\001 \002(\t\022\017\n\007t" +
"imeout\030\002 \001(\004\"\323\002\n\032SerializedActorRefProto" +
"col\022\033\n\004uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n\007addr" +
"ess\030\002 \002(\t\022\026\n\016actorClassname\030\003 \002(\t\022\025\n\ract" +
"orInstance\030\004 \001(\014\022\033\n\023serializerClassname\030" +
"\005 \001(\t\022\017\n\007timeout\030\006 \001(\004\022\026\n\016receiveTimeout" +
"\030\007 \001(\004\022%\n\tlifeCycle\030\010 \001(\0132\022.LifeCyclePro",
"tocol\022+\n\nsupervisor\030\t \001(\0132\027.RemoteActorR" +
"efProtocol\022\024\n\014hotswapStack\030\n \001(\014\022(\n\010mess" +
"ages\030\013 \003(\0132\026.RemoteMessageProtocol\"g\n\037Se" +
"rializedTypedActorRefProtocol\022-\n\010actorRe" +
"f\030\001 \002(\0132\033.SerializedActorRefProtocol\022\025\n\r" +
"interfaceName\030\002 \002(\t\"r\n\017MessageProtocol\0225" +
"\n\023serializationScheme\030\001 \002(\0162\030.Serializat" +
"ionSchemeType\022\017\n\007message\030\002 \002(\014\022\027\n\017messag" +
"eManifest\030\003 \001(\014\"R\n\021ActorInfoProtocol\022\033\n\004" +
"uuid\030\001 \002(\0132\r.UuidProtocol\022\017\n\007timeout\030\002 \002",
"(\004\022\017\n\007address\030\003 \001(\t\")\n\014UuidProtocol\022\014\n\004h" +
"igh\030\001 \002(\004\022\013\n\003low\030\002 \002(\004\"3\n\025MetadataEntryP" +
"rotocol\022\013\n\003key\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n\021L" +
"ifeCycleProtocol\022!\n\tlifeCycle\030\001 \002(\0162\016.Li" +
"feCycleType\"1\n\017AddressProtocol\022\020\n\010hostna" +
"me\030\001 \002(\t\022\014\n\004port\030\002 \002(\r\"7\n\021ExceptionProto" +
"col\022\021\n\tclassname\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*" +
"(\n\013CommandType\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020" +
"\002*]\n\027SerializationSchemeType\022\010\n\004JAVA\020\001\022\013" +
"\n\007SBINARY\020\002\022\016\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON",
"\020\004\022\014\n\010PROTOBUF\020\005*-\n\rLifeCycleType\022\r\n\tPER" +
"MANENT\020\001\022\r\n\tTEMPORARY\020\002B\030\n\024akka.remote.p" +
"rotocolH\001"
"\013commandType\030\002 \002(\0162\014.CommandType\"U\n\026Remo" +
"teActorRefProtocol\022\017\n\007address\030\001 \002(\t\022\031\n\021i" +
"netSocketAddress\030\002 \002(\014\022\017\n\007timeout\030\003 \001(\004\"" +
"\323\002\n\032SerializedActorRefProtocol\022\033\n\004uuid\030\001" +
" \002(\0132\r.UuidProtocol\022\017\n\007address\030\002 \002(\t\022\026\n\016" +
"actorClassname\030\003 \002(\t\022\025\n\ractorInstance\030\004 " +
"\001(\014\022\033\n\023serializerClassname\030\005 \001(\t\022\017\n\007time" +
"out\030\006 \001(\004\022\026\n\016receiveTimeout\030\007 \001(\004\022%\n\tlif",
"eCycle\030\010 \001(\0132\022.LifeCycleProtocol\022+\n\nsupe" +
"rvisor\030\t \001(\0132\027.RemoteActorRefProtocol\022\024\n" +
"\014hotswapStack\030\n \001(\014\022(\n\010messages\030\013 \003(\0132\026." +
"RemoteMessageProtocol\"g\n\037SerializedTyped" +
"ActorRefProtocol\022-\n\010actorRef\030\001 \002(\0132\033.Ser" +
"ializedActorRefProtocol\022\025\n\rinterfaceName" +
"\030\002 \002(\t\"r\n\017MessageProtocol\0225\n\023serializati" +
"onScheme\030\001 \002(\0162\030.SerializationSchemeType" +
"\022\017\n\007message\030\002 \002(\014\022\027\n\017messageManifest\030\003 \001" +
"(\014\"R\n\021ActorInfoProtocol\022\033\n\004uuid\030\001 \002(\0132\r.",
"UuidProtocol\022\017\n\007timeout\030\002 \002(\004\022\017\n\007address" +
"\030\003 \001(\t\")\n\014UuidProtocol\022\014\n\004high\030\001 \002(\004\022\013\n\003" +
"low\030\002 \002(\004\"3\n\025MetadataEntryProtocol\022\013\n\003ke" +
"y\030\001 \002(\t\022\r\n\005value\030\002 \002(\014\"6\n\021LifeCycleProto" +
"col\022!\n\tlifeCycle\030\001 \002(\0162\016.LifeCycleType\"1" +
"\n\017AddressProtocol\022\020\n\010hostname\030\001 \002(\t\022\014\n\004p" +
"ort\030\002 \002(\r\"7\n\021ExceptionProtocol\022\021\n\tclassn" +
"ame\030\001 \002(\t\022\017\n\007message\030\002 \002(\t*(\n\013CommandTyp" +
"e\022\013\n\007CONNECT\020\001\022\014\n\010SHUTDOWN\020\002*]\n\027Serializ" +
"ationSchemeType\022\010\n\004JAVA\020\001\022\013\n\007SBINARY\020\002\022\016",
"\n\nSCALA_JSON\020\003\022\r\n\tJAVA_JSON\020\004\022\014\n\010PROTOBU" +
"F\020\005*-\n\rLifeCycleType\022\r\n\tPERMANENT\020\001\022\r\n\tT" +
"EMPORARY\020\002B\030\n\024akka.remote.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@ -5745,7 +5788,7 @@ public final class RemoteProtocol {
internal_static_RemoteActorRefProtocol_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_RemoteActorRefProtocol_descriptor,
new java.lang.String[] { "Address", "Timeout", },
new java.lang.String[] { "Address", "InetSocketAddress", "Timeout", },
akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.class,
akka.remote.protocol.RemoteProtocol.RemoteActorRefProtocol.Builder.class);
internal_static_SerializedActorRefProtocol_descriptor =

View file

@ -52,7 +52,8 @@ enum CommandType {
*/
message RemoteActorRefProtocol {
required string address = 1;
optional uint64 timeout = 2;
required bytes inetSocketAddress = 2;
optional uint64 timeout = 3;
}
/**

View file

@ -25,7 +25,8 @@ import akka.actor.{
LifeCycleMessage
}
import akka.actor.Actor._
import akka.config.Config._
import akka.config.Config
import Config._
import akka.util._
import akka.event.EventHandler
@ -206,7 +207,10 @@ abstract class RemoteClient private[akka] (
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[Promise[T]]): Option[Promise[T]] = {
if (isRunning) {
EventHandler.debug(this, "Sending remote message [%s]".format(request))
if (request.getOneWay) {
try {
val future = currentChannel.write(RemoteEncoder.encode(request))
@ -225,6 +229,7 @@ abstract class RemoteClient private[akka] (
} else throw e
}
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultPromise[T](request.getActorInfo.getTimeout)
@ -254,6 +259,7 @@ abstract class RemoteClient private[akka] (
}
Some(futureResult)
}
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", module, remoteAddress)
notifyListeners(RemoteClientError(exception, module, remoteAddress))
@ -544,16 +550,16 @@ class NettyRemoteSupport extends RemoteSupport with NettyRemoteServerModule with
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(address: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
protected[akka] def actorFor(actorAddress: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
val inetSocketAddress = this.address
if (optimizeLocalScoped_?) {
val home = this.address
if ((host == home.getAddress.getHostAddress || host == home.getHostName) && port == home.getPort) { //TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(address, address)
if ((host == inetSocketAddress.getAddress.getHostAddress || host == inetSocketAddress.getHostName) && port == inetSocketAddress.getPort) { //TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
RemoteActorRef(address, timeout, loader)
RemoteActorRef(inetSocketAddress, actorAddress, timeout, loader)
}
}
@ -826,7 +832,7 @@ class RemoteServerHandler(
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
actor collectionAsScalaIterable(map.values)gddd
) {
try { actor ! PoisonPill } catch { case e: Exception }
}
@ -839,12 +845,16 @@ class RemoteServerHandler(
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = event.getMessage match {
case null throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
case remote: AkkaRemoteProtocol if remote.hasMessage handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
event.getMessage match {
case null
throw new IllegalActorStateException("Message in remote MessageEvent is null: " + event)
case remote: AkkaRemoteProtocol if remote.hasMessage
handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
//case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet)
case _ //ignore
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
event.getChannel.close
@ -857,12 +867,13 @@ class RemoteServerHandler(
case _ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) =
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
EventHandler.debug(this, "Received remote message [%s]".format(request))
dispatchToActor(request, channel)
}
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
val actorRef =
try { createActor(actorInfo, channel) } catch {
case e: SecurityException
@ -950,11 +961,16 @@ class RemoteServerHandler(
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match {
EventHandler.debug(this, "Creating an remotely available actor for address [%s] on node [%s]".format(address, Config.nodename))
val actorRef = server.findActorByAddressOrUuid(address, parseUuid(uuid).toString) match {
// the actor has not been registered globally. See if we have it in the session
case null createSessionActor(actorInfo, channel)
case null createSessionActor(actorInfo, channel) // FIXME now session scoped actors are disabled, how to introduce them?
case actorRef actorRef
}
if (actorRef eq null) throw new IllegalActorStateException("Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]")
actorRef
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {

View file

@ -156,6 +156,7 @@ object ActorSerialization {
}
object RemoteActorSerialization {
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
@ -172,23 +173,29 @@ object RemoteActorSerialization {
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
val ref = RemoteActorRef(
RemoteActorRef(
Serializers.Java.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress])).asInstanceOf[InetSocketAddress],
protocol.getAddress,
protocol.getTimeout,
loader)
ref
}
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
def toRemoteActorRefProtocol(actor: ActorRef): RemoteActorRefProtocol = {
actor match {
case ar: LocalActorRef Actor.remote.registerByUuid(ar)
case _ {}
val remoteAddress = actor match {
case ar: RemoteActorRef
ar.remoteAddress
case ar: LocalActorRef
Actor.remote.registerByUuid(ar)
ReflectiveAccess.RemoteModule.configDefaultAddress
case _
ReflectiveAccess.RemoteModule.configDefaultAddress
}
RemoteActorRefProtocol.newBuilder
.setAddress("uuid:" + actor.uuid.toString)
.setInetSocketAddress(ByteString.copyFrom(Serializers.Java.toBinary(remoteAddress)))
.setAddress(actor.address)
.setTimeout(actor.timeout)
.build
}