Profit! Removing Uuids from ActorCells and ActorRefs and essentially replacing the remoting with a new implementation.

This commit is contained in:
Viktor Klang 2011-11-03 14:53:38 +01:00
parent 2f52f43fa8
commit a040a0c54d
21 changed files with 1393 additions and 5859 deletions

View file

@ -292,11 +292,7 @@ class ActorRefSpec extends AkkaSpec {
val inetAddress = app.defaultAddress val inetAddress = app.defaultAddress
val expectedSerializedRepresentation = SerializedActorRef( val expectedSerializedRepresentation = new SerializedActorRef(a.address, inetAddress)
a.uuid,
a.address,
inetAddress.getAddress.getHostAddress,
inetAddress.getPort)
import java.io._ import java.io._

View file

@ -6,65 +6,26 @@ package akka.actor
import akka.testkit._ import akka.testkit._
import akka.util.duration._ import akka.util.duration._
import akka.testkit.Testing.sleepFor import akka.dispatch.Future
import java.util.concurrent.{ TimeUnit, CountDownLatch }
object LocalActorRefProviderSpec {
class NewActor extends Actor {
def receive = {
case _ {}
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalActorRefProviderSpec extends AkkaSpec { class LocalActorRefProviderSpec extends AkkaSpec {
import akka.actor.LocalActorRefProviderSpec._
"An LocalActorRefProvider" must { "An LocalActorRefProvider" must {
"only create one instance of an actor with a specific address in a concurrent environment" in { "only create one instance of an actor with a specific address in a concurrent environment" in {
val provider = app.provider val provider = app.provider
for (i 0 until 100) { // 100 concurrent runs provider.isInstanceOf[LocalActorRefProvider] must be(true)
val latch = new CountDownLatch(4)
var a1: Option[ActorRef] = None implicit val timeout = Timeout(30 seconds)
var a2: Option[ActorRef] = None
var a3: Option[ActorRef] = None
var a4: Option[ActorRef] = None
val address = "new-actor" + i val actors: Seq[Future[ActorRef]] =
(0 until 100) flatMap { i // 100 concurrent runs
spawn { val address = "new-actor" + i
a1 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address)) (1 to 4) map { _ Future { provider.actorOf(Props(c { case _ }), app.guardian, address) } }
latch.countDown()
}
spawn {
a2 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
spawn {
a3 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
spawn {
a4 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
} }
latch.await(5, TimeUnit.SECONDS) must be === true actors.map(_.get).distinct.size must be(100)
a1.isDefined must be(true)
a2.isDefined must be(true)
a3.isDefined must be(true)
a4.isDefined must be(true)
(a1 == a2) must be(true)
(a1 == a3) must be(true)
(a1 == a4) must be(true)
}
} }
} }
} }

View file

@ -234,7 +234,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props): ActorRef = actorOf(p.withCreator(new Actor { def instance(p: Props): ActorRef = actorOf(p.withCreator(new Actor {
def receive = { def receive = {
case _ case _
delegates put (self.uuid.toString, "") delegates put (self.address, "")
latch1.countDown() latch1.countDown()
} }
})) }))
@ -262,7 +262,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor { def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = { def receive = {
case _ case _
delegates put (self.uuid.toString, "") delegates put (self.address, "")
latch2.countDown() latch2.countDown()
} }
})) }))

View file

@ -146,17 +146,17 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
case value value case value value
} }
val hostname: String = System.getProperty("akka.remote.hostname") match { val defaultAddress = new InetSocketAddress(System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostName case null | "" InetAddress.getLocalHost.getHostAddress
case value value case value value
} }, System.getProperty("akka.remote.port") match {
val port: Int = System.getProperty("akka.remote.port") match {
case null | "" AkkaConfig.RemoteServerPort case null | "" AkkaConfig.RemoteServerPort
case value value.toInt case value value.toInt
} })
val defaultAddress = new InetSocketAddress(hostname, port) def hostname: String = defaultAddress.getAddress.getHostAddress
def port: Int = defaultAddress.getPort
// TODO correctly pull its config from the config // TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(this) val dispatcherFactory = new Dispatchers(this)

View file

@ -39,7 +39,7 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti
object AkkaException { object AkkaException {
val hostname = try { val hostname = try {
InetAddress.getLocalHost.getHostName InetAddress.getLocalHost.getHostAddress
} catch { } catch {
case e: UnknownHostException "unknown" case e: UnknownHostException "unknown"
} }

View file

@ -10,7 +10,6 @@ import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap } import scala.collection.immutable.{ Stack, TreeMap }
import scala.collection.JavaConverters import scala.collection.JavaConverters
import java.util.concurrent.{ ScheduledFuture, TimeUnit } import java.util.concurrent.{ ScheduledFuture, TimeUnit }
import java.util.{ Collection JCollection, Collections JCollections }
import akka.AkkaApplication import akka.AkkaApplication
/** /**
@ -82,8 +81,6 @@ private[akka] class ActorCell(
var actor: Actor = _ var actor: Actor = _
def uuid: Uuid = self.uuid
@inline @inline
final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher
@ -363,12 +360,4 @@ private[akka] class ActorCell(
if (a ne null) if (a ne null)
lookupAndSetSelfFields(a.getClass, a, newContext) lookupAndSetSelfFields(a.getClass, a, newContext)
} }
override final def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override final def equals(that: Any): Boolean = {
that.isInstanceOf[ActorCell] && that.asInstanceOf[ActorCell].uuid == uuid
}
override final def toString = "ActorCell[%s]".format(uuid)
} }

View file

@ -12,6 +12,7 @@ import akka.AkkaApplication
import akka.event.ActorEventBus import akka.event.ActorEventBus
import akka.serialization.Serialization import akka.serialization.Serialization
import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef
import java.net.InetSocketAddress
/** /**
* ActorRef is an immutable and serializable handle to an Actor. * ActorRef is an immutable and serializable handle to an Actor.
@ -276,18 +277,17 @@ trait ScalaActorRef { ref: ActorRef ⇒
protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit
protected[akka] def restart(cause: Throwable): Unit protected[akka] def restart(cause: Throwable): Unit
private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS
} }
/** /**
* Memento pattern for serializing ActorRefs transparently * Memento pattern for serializing ActorRefs transparently
*/ */
case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, port: Int) { case class SerializedActorRef(address: String, hostname: String, port: Int) {
import akka.serialization.Serialization.app import akka.serialization.Serialization.app
def this(address: String, inet: InetSocketAddress) = this(address, inet.getAddress.getHostAddress, inet.getPort)
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = { def readResolve(): AnyRef = {
if (app.value eq null) throw new IllegalStateException( if (app.value eq null) throw new IllegalStateException(
@ -366,7 +366,8 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef {
override def isShutdown(): Boolean = true override def isShutdown(): Boolean = true
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = app.eventHandler.notify(DeadLetter(message, sender)) protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit =
app.eventHandler.notify(DeadLetter(message, sender))
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
app.eventHandler.notify(DeadLetter(message, this)) app.eventHandler.notify(DeadLetter(message, this))

View file

@ -31,7 +31,7 @@ trait ActorRefProvider {
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
private[akka] def serialize(actor: ActorRef): AnyRef private[akka] def serialize(actor: ActorRef): SerializedActorRef
private[akka] def createDeathWatch(): DeathWatch private[akka] def createDeathWatch(): DeathWatch
@ -177,7 +177,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, address) actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, address)
case _ throw new Exception("Don't know how to create this actor ref! Why?") case unknown throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
} }
} catch { } catch {
case e: Exception case e: Exception
@ -218,8 +218,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
} }
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address) private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address)
private[akka] def serialize(actor: ActorRef): AnyRef = private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(actor.address, app.defaultAddress)
SerializedActorRef(actor.uuid, actor.address, app.hostname, app.port)
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch

View file

@ -13,6 +13,7 @@ import akka.actor.DeploymentConfig._
import akka.{ AkkaException, AkkaApplication } import akka.{ AkkaException, AkkaApplication }
import akka.config.{ Configuration, ConfigurationException } import akka.config.{ Configuration, ConfigurationException }
import akka.util.Duration import akka.util.Duration
import java.net.InetSocketAddress
trait ActorDeployer { trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit private[akka] def init(deployments: Seq[Deploy]): Unit
@ -248,7 +249,8 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
case e: Exception raiseRemoteNodeParsingError() case e: Exception raiseRemoteNodeParsingError()
} }
if (port == 0) raiseRemoteNodeParsingError() if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(hostname, port) val inet = new InetSocketAddress(hostname, port) //FIXME switch to non-ip-tied
RemoteAddress(Option(inet.getAddress).map(_.getHostAddress).getOrElse(hostname), inet.getPort)
} }
} }

View file

@ -37,7 +37,7 @@ class BalancingDispatcher(
_timeoutMs: Long) _timeoutMs: Long)
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = a.uuid.compareTo(b.uuid) }) //new ConcurrentLinkedQueue[ActorCell]() private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = System.identityHashCode(a) - System.identityHashCode(b) }) //new ConcurrentLinkedQueue[ActorCell]()
protected val messageQueue: MessageQueue = mailboxType match { protected val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics { case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {

View file

@ -61,7 +61,7 @@ class Dispatchers(val app: AkkaApplication) {
*/ */
def newPinnedDispatcher(actor: LocalActorRef) = actor match { def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis) case null new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, MailboxType, DispatcherShutdownMillis) case some new PinnedDispatcher(app, some.underlying, some.address, MailboxType, DispatcherShutdownMillis)
} }
/** /**
@ -72,7 +72,7 @@ class Dispatchers(val app: AkkaApplication) {
*/ */
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis) case null new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, mailboxType, DispatcherShutdownMillis) case some new PinnedDispatcher(app, some.underlying, some.address, mailboxType, DispatcherShutdownMillis)
} }
/** /**

View file

@ -23,31 +23,10 @@ class RemoteException(message: String) extends AkkaException(message)
trait RemoteService { trait RemoteService {
def server: RemoteSupport def server: RemoteSupport
def address: InetSocketAddress
} }
trait RemoteModule { trait RemoteModule {
val UUID_PREFIX = "uuid:".intern
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: Any): Unit protected[akka] def notifyListeners(message: Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef] // FIXME need to invalidate this cache on replication
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map?
private[akka] def actorsFactories: ConcurrentHashMap[String, () ActorRef] // FIXME what to do wit actorsFactories map?
private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address)
private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid)
private[akka] def findActorFactory(address: String): () ActorRef = actorsFactories.get(address)
private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = {
var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length))
else findActorByAddress(address)
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
} }
/** /**
@ -145,20 +124,9 @@ abstract class RemoteSupport(val app: AkkaApplication) extends RemoteServerModul
def shutdown() { def shutdown() {
this.shutdownClientModule() this.shutdownClientModule()
this.shutdownServerModule() this.shutdownServerModule()
clear
} }
protected[akka] override def notifyListeners(message: Any): Unit = app.eventHandler.notify(message) protected[akka] override def notifyListeners(message: Any): Unit = app.eventHandler.notify(message)
private[akka] val actors = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsFactories = new ConcurrentHashMap[String, () ActorRef]
def clear {
actors.clear
actorsByUuid.clear
actorsFactories.clear
}
} }
/** /**
@ -177,39 +145,6 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
*/ */
def name: String def name: String
/**
* Gets the address of the server instance
*/
def address: InetSocketAddress
/**
* Starts the server up
*/
def start(): RemoteServerModule =
start(app.defaultAddress.getAddress.getHostAddress,
app.defaultAddress.getPort,
None)
/**
* Starts the server up
*/
def start(loader: ClassLoader): RemoteServerModule =
start(app.defaultAddress.getAddress.getHostAddress,
app.defaultAddress.getPort,
Option(loader))
/**
* Starts the server up
*/
def start(host: String, port: Int): RemoteServerModule =
start(host, port, None)
/**
* Starts the server up
*/
def start(host: String, port: Int, loader: ClassLoader): RemoteServerModule =
start(host, port, Option(loader))
/** /**
* Starts the server up * Starts the server up
*/ */
@ -219,68 +154,9 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
* Shuts the server down * Shuts the server down
*/ */
def shutdownServerModule(): Unit def shutdownServerModule(): Unit
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
def register(actorRef: ActorRef): Unit = register(actorRef.address, actorRef)
/**
* Register Remote Actor by the Actor's uuid field. It starts the Actor if it is not started already.
*/
def registerByUuid(actorRef: ActorRef): Unit
/**
* Register Remote Actor by a specific 'id' passed as argument. The actor is registered by UUID rather than ID
* when prefixing the handle with the uuid: protocol.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(address: String, actorRef: ActorRef): Unit
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(address: String, factory: ActorRef): Unit
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
* Java API
*/
def registerPerSession(address: String, factory: Creator[ActorRef]): Unit = registerPerSession(address, factory.create)
/**
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(address: String): Unit
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(address: String): Unit
} }
trait RemoteClientModule extends RemoteModule { self: RemoteSupport trait RemoteClientModule extends RemoteModule { self: RemoteSupport
def actorFor(address: String, hostname: String, port: Int): ActorRef =
actorFor(address, hostname, port, None)
def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(address, hostname, port, Some(loader))
/** /**
* Clean-up all open connections. * Clean-up all open connections.
*/ */
@ -298,11 +174,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒
/** Methods that needs to be implemented by a transport **/ /** Methods that needs to be implemented by a transport **/
protected[akka] def actorFor(address: String, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef
protected[akka] def send[T](message: Any, protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress, remoteAddress: InetSocketAddress,
actorRef: ActorRef, recipient: ActorRef,
loader: Option[ClassLoader]): Unit loader: Option[ClassLoader]): Unit
} }

View file

@ -88,7 +88,6 @@ private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
val typedActor: AnyRef val typedActor: AnyRef
val method: Method val method: Method
val uuid = actorRef.uuid.toString
val methodName = method.getName val methodName = method.getName
val methodUuid = "%s_%s" format (uuid, methodName) val methodUuid = "%s_%s" format (uuid, methodName)

File diff suppressed because it is too large Load diff

View file

@ -20,14 +20,11 @@ message AkkaRemoteProtocol {
* Defines a remote message. * Defines a remote message.
*/ */
message RemoteMessageProtocol { message RemoteMessageProtocol {
required UuidProtocol uuid = 1; required ActorRefProtocol recipient = 1;
required ActorInfoProtocol actorInfo = 2; optional MessageProtocol message = 2;
required bool oneWay = 3; optional ExceptionProtocol exception = 3;
optional MessageProtocol message = 4; optional ActorRefProtocol sender = 4;
optional ExceptionProtocol exception = 5; repeated MetadataEntryProtocol metadata = 5;
optional UuidProtocol supervisorUuid = 6;
optional RemoteActorRefProtocol sender = 7;
repeated MetadataEntryProtocol metadata = 8;
} }
/** /**
@ -67,41 +64,10 @@ enum ReplicationStrategyType {
* Defines a remote ActorRef that "remembers" and uses its original Actor instance * Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node. * on the original node.
*/ */
message RemoteActorRefProtocol { message ActorRefProtocol {
required string address = 1; required string address = 1;
required bytes inetSocketAddress = 2; required string host = 2;
optional uint64 timeout = 3; required uint32 port = 3;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedActorRefProtocol {
required UuidProtocol uuid = 1;
required string address = 2;
required string actorClassname = 3;
optional bytes actorInstance = 4;
optional string serializerClassname = 5;
optional uint64 timeout = 6;
optional uint64 receiveTimeout = 7;
optional LifeCycleProtocol lifeCycle = 8;
optional RemoteActorRefProtocol supervisor = 9;
optional bytes hotswapStack = 10;
optional ReplicationStorageType replicationStorage = 11;
optional ReplicationStrategyType replicationStrategy = 12;
repeated RemoteMessageProtocol messages = 13;
}
/**
* Defines a fully serialized remote ActorRef (with serialized typed actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedTypedActorRefProtocol {
required SerializedActorRefProtocol actorRef = 1;
required string interfaceName = 2;
} }
/** /**
@ -112,15 +78,6 @@ message MessageProtocol {
optional bytes messageManifest = 2; optional bytes messageManifest = 2;
} }
/**
* Defines the actor info.
*/
message ActorInfoProtocol {
required UuidProtocol uuid = 1;
required uint64 timeout = 2;
optional string address = 3;
}
/** /**
* Defines a UUID. * Defines a UUID.
*/ */
@ -137,32 +94,6 @@ message MetadataEntryProtocol {
required bytes value = 2; required bytes value = 2;
} }
/**
* Defines the serialization scheme used to serialize the message and/or Actor instance.
*/
enum SerializationSchemeType {
JAVA = 1;
SBINARY = 2;
SCALA_JSON = 3;
JAVA_JSON = 4;
PROTOBUF = 5;
}
/**
* Defines the type of the life-cycle of a supervised Actor.
*/
enum LifeCycleType {
PERMANENT = 1;
TEMPORARY = 2;
}
/**
* Defines the life-cycle of a supervised Actor.
*/
message LifeCycleProtocol {
required LifeCycleType lifeCycle = 1;
}
/** /**
* Defines a remote address. * Defines a remote address.
*/ */
@ -184,10 +115,9 @@ message ExceptionProtocol {
*/ */
message RemoteSystemDaemonMessageProtocol { message RemoteSystemDaemonMessageProtocol {
required RemoteSystemDaemonMessageType messageType = 1; required RemoteSystemDaemonMessageType messageType = 1;
optional UuidProtocol actorUuid = 2; optional string actorAddress = 2;
optional string actorAddress = 3; optional bytes payload = 3;
optional bytes payload = 5; optional UuidProtocol replicateActorFromUuid = 4;
optional UuidProtocol replicateActorFromUuid = 6;
} }
/** /**
@ -214,8 +144,7 @@ enum RemoteSystemDaemonMessageType {
* Defines the durable mailbox message. * Defines the durable mailbox message.
*/ */
message DurableMailboxMessageProtocol { message DurableMailboxMessageProtocol {
required string ownerActorAddress= 1; required ActorRefProtocol recipient= 1;
optional string senderActorAddress = 2; optional ActorRefProtocol sender = 2;
optional UuidProtocol futureUuid = 3; required bytes message = 3;
required bytes message = 4;
} }

View file

@ -109,7 +109,7 @@ class Gossiper(remote: Remote) {
private val seeds = Set(address) // FIXME read in list of seeds from config private val seeds = Set(address) // FIXME read in list of seeds from config
private val scheduler = new DefaultScheduler private val scheduler = new DefaultScheduler
private val address = new InetSocketAddress(app.hostname, app.port) private val address = app.defaultAddress
private val nodeFingerprint = address.## private val nodeFingerprint = address.##
private val random = SecureRandom.getInstance("SHA1PRNG") private val random = SecureRandom.getInstance("SHA1PRNG")

View file

@ -14,12 +14,12 @@ object MessageSerializer {
def deserialize(app: AkkaApplication, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = { def deserialize(app: AkkaApplication, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol) val clazz = loadManifest(classLoader, messageProtocol)
app.serialization.deserialize(messageProtocol.getMessage.toByteArray, app.serialization.deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, o o) clazz, classLoader).fold(x throw x, identity)
} }
def serialize(app: AkkaApplication, message: AnyRef): MessageProtocol = { def serialize(app: AkkaApplication, message: AnyRef): MessageProtocol = {
val builder = MessageProtocol.newBuilder val builder = MessageProtocol.newBuilder
val bytes = app.serialization.serialize(message).fold(x throw x, b b) val bytes = app.serialization.serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes)) builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName)) builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build builder.build

View file

@ -13,7 +13,6 @@ import akka.util._
import akka.util.duration._ import akka.util.duration._
import akka.util.Helpers._ import akka.util.Helpers._
import akka.actor.DeploymentConfig._ import akka.actor.DeploymentConfig._
import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.Compression.LZF import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
@ -21,6 +20,7 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress import java.net.InetSocketAddress
import com.eaio.uuid.UUID import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
/** /**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
@ -37,9 +37,6 @@ class Remote(val app: AkkaApplication) extends RemoteService {
val shouldCompressData = config.getBool("akka.remote.use-compression", false) val shouldCompressData = config.getBool("akka.remote.use-compression", false)
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
val hostname = app.hostname
val port = app.port
val failureDetector = new AccrualFailureDetector(FailureDetectorThreshold, FailureDetectorMaxSampleSize) val failureDetector = new AccrualFailureDetector(FailureDetectorThreshold, FailureDetectorMaxSampleSize)
// val gossiper = new Gossiper(this) // val gossiper = new Gossiper(this)
@ -53,8 +50,7 @@ class Remote(val app: AkkaApplication) extends RemoteService {
OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want? OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want?
private[remote] lazy val remoteDaemon = private[remote] lazy val remoteDaemon =
new LocalActorRef( app.provider.actorOf(
app,
Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)), Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)),
remoteDaemonSupervisor, remoteDaemonSupervisor,
remoteDaemonServiceName, remoteDaemonServiceName,
@ -73,7 +69,6 @@ class Remote(val app: AkkaApplication) extends RemoteService {
lazy val server: RemoteSupport = { lazy val server: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport(app) val remote = new akka.remote.netty.NettyRemoteSupport(app)
remote.start(hostname, port) remote.start(hostname, port)
remote.register(remoteDaemonServiceName, remoteDaemon)
app.eventHandler.addListener(eventStream.sender) app.eventHandler.addListener(eventStream.sender)
app.eventHandler.addListener(remoteClientLifeCycleHandler) app.eventHandler.addListener(remoteClientLifeCycleHandler)
@ -83,20 +78,11 @@ class Remote(val app: AkkaApplication) extends RemoteService {
remote remote
} }
lazy val address = server.address def start(): Unit = {
val serverAddress = server.app.defaultAddress //Force init of server
def start() { val daemonAddress = remoteDaemon.address //Force init of daemon
val triggerLazyServerVal = address.toString eventHandler.info(this, "Starting remote server on [%s] and starting remoteDaemon with address [%s]".format(serverAddress, daemonAddress))
eventHandler.info(this, "Starting remote server on [%s]".format(triggerLazyServerVal))
} }
def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow)
def uuidToUuidProtocol(uuid: UUID): UuidProtocol =
UuidProtocol.newBuilder
.setHigh(uuid.getTime)
.setLow(uuid.getClockSeqAndNode)
.build
} }
/** /**
@ -117,7 +103,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
def receive: Actor.Receive = { def receive: Actor.Receive = {
case message: RemoteSystemDaemonMessageProtocol case message: RemoteSystemDaemonMessageProtocol
eventHandler.debug(this, "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message, nodename)) eventHandler.debug(this, "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message.getMessageType, nodename))
message.getMessageType match { message.getMessageType match {
case USE handleUse(message) case USE handleUse(message)
@ -151,16 +137,12 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
case Right(instance) instance.asInstanceOf[() Actor] case Right(instance) instance.asInstanceOf[() Actor]
} }
val actorAddress = message.getActorAddress app.actorOf(Props(creator = actorFactory), message.getActorAddress)
val newActorRef = app.actorOf(Props(creator = actorFactory), actorAddress)
server.register(actorAddress, newActorRef)
} else { } else {
eventHandler.error(this, "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [%s]".format(message)) eventHandler.error(this, "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [%s]".format(message))
} }
sender ! Success(address.toString) sender ! Success(app.defaultAddress)
} catch { } catch {
case error: Throwable //FIXME doesn't seem sensible case error: Throwable //FIXME doesn't seem sensible
sender ! Failure(error) sender ! Failure(error)
@ -237,3 +219,38 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
} }
} }
} }
class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) {
lazy val sender: ActorRef =
if (input.hasSender)
remote.app.provider.deserialize(
SerializedActorRef(input.getSender.getAddress, input.getSender.getHost, input.getSender.getPort)).getOrElse(throw new IllegalStateException("OHNOES"))
else
remote.app.deadLetters
lazy val recipient: ActorRef = remote.app.findActor(input.getRecipient.getAddress) match {
case None remote.app.deadLetters
case Some(target) target
}
lazy val payload: Either[Throwable, AnyRef] =
if (input.hasException) Left(parseException())
else Right(MessageSerializer.deserialize(remote.app, input.getMessage, classLoader))
protected def parseException(): Throwable = {
val exception = input.getException
val classname = exception.getClassname
try {
val exceptionClass =
if (classLoader.isDefined) classLoader.get.loadClass(classname) else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception
remote.app.eventHandler.error(problem, remote, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getAddress + ") from " + sender
}

View file

@ -22,6 +22,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean
/** /**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -65,14 +66,13 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
// case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass) // case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
// } // }
val thisHostname = remote.address.getHostName def isReplicaNode: Boolean = remoteAddresses exists { some some.port == app.port && some.hostname == app.hostname }
val thisPort = remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { some some.hostname == thisHostname && some.port == thisPort } //app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
if (isReplicaNode) { if (isReplicaNode) {
// we are on one of the replica node for this remote actor // we are on one of the replica node for this remote actor
new LocalActorRef(app, props, supervisor, address, false) local.actorOf(props, supervisor, address, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
} else { } else {
// we are on the single "reference" node uses the remote actors on the replica nodes // we are on the single "reference" node uses the remote actors on the replica nodes
@ -147,28 +147,38 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
} }
def actorFor(address: String): Option[ActorRef] = actors.get(address) match { def actorFor(address: String): Option[ActorRef] = actors.get(address) match {
case null None case null local.actorFor(address)
case actor: ActorRef Some(actor) case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef]) case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
} }
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
/** /**
* Returns true if the actor was in the provider's cache and evicted successfully, else false. * Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/ */
private[akka] def evict(address: String): Boolean = actors.remove(address) ne null private[akka] def evict(address: String): Boolean = actors.remove(address) ne null
private[akka] def serialize(actor: ActorRef): AnyRef = actor match { private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match {
case r: RemoteActorRef SerializedActorRef(actor.uuid, actor.address, r.remoteAddress.getAddress.getHostAddress, r.remoteAddress.getPort) case r: RemoteActorRef new SerializedActorRef(actor.address, r.remoteAddress)
case other local.serialize(actor) case other local.serialize(actor)
} }
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
local.actorFor(actor.address) orElse Some(RemoteActorRef(remote.server, new InetSocketAddress(actor.hostname, actor.port), actor.address, None)) if (optimizeLocalScoped_? && (actor.hostname == app.hostname || actor.hostname == app.defaultAddress.getHostName) && actor.port == app.port) {
local.actorFor(actor.address)
} else {
val remoteInetSocketAddress = new InetSocketAddress(actor.hostname, actor.port) //FIXME Drop the InetSocketAddresses and use RemoteAddress
app.eventHandler.debug(this, "%s: Creating RemoteActorRef with address [%s] connected to [%s]".format(app.defaultAddress, actor.address, remoteInetSocketAddress))
Some(RemoteActorRef(remote.server, remoteInetSocketAddress, actor.address, None)) //Should it be None here
}
}
/** /**
* Using (checking out) actor on a specific node. * Using (checking out) actor on a specific node.
*/ */
def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () Actor) { def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () Actor) {
app.eventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress)) app.eventHandler.debug(this, "[%s] Instantiating Actor [%s] on node [%s]".format(app.defaultAddress, actorAddress, remoteAddress))
val actorFactoryBytes = val actorFactoryBytes =
app.serialization.serialize(actorFactory) match { app.serialization.serialize(actorFactory) match {
@ -182,7 +192,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
.setPayload(ByteString.copyFrom(actorFactoryBytes)) .setPayload(ByteString.copyFrom(actorFactoryBytes))
.build() .build()
val connectionFactory = () remote.server.actorFor(remote.remoteDaemonServiceName, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort) val connectionFactory = () deserialize(new SerializedActorRef(remote.remoteDaemonServiceName, remoteAddress)).get
// try to get the connection for the remote address, if not already there then create it // try to get the connection for the remote address, if not already there then create it
val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory) val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory)
@ -234,9 +244,6 @@ private[akka] case class RemoteActorRef private[akka] (
address: String, address: String,
loader: Option[ClassLoader]) loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef { extends ActorRef with ScalaActorRef {
private[akka] val uuid: Uuid = newUuid
@volatile @volatile
private var running: Boolean = true private var running: Boolean = true
@ -250,9 +257,9 @@ private[akka] case class RemoteActorRef private[akka] (
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout) def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
def suspend(): Unit = unsupported def suspend(): Unit = ()
def resume(): Unit = unsupported def resume(): Unit = ()
def stop() { //FIXME send the cause as well! def stop() { //FIXME send the cause as well!
synchronized { synchronized {
@ -266,11 +273,11 @@ private[akka] case class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException]) @throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = remote.app.provider.serialize(this) private def writeReplace(): AnyRef = remote.app.provider.serialize(this)
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
protected[akka] def restart(cause: Throwable): Unit = unsupported protected[akka] def restart(cause: Throwable): Unit = ()
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef") private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
} }

View file

@ -57,14 +57,13 @@ trait NettyRemoteClientModule extends RemoteClientModule {
protected[akka] def send[T](message: Any, protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef], senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress, recipientAddress: InetSocketAddress,
actorRef: ActorRef, recipient: ActorRef,
loader: Option[ClassLoader]): Unit = loader: Option[ClassLoader]): Unit =
withClientFor(remoteAddress, loader) { _.send[T](message, senderOption, remoteAddress, actorRef) } withClientFor(recipientAddress, loader) { _.send[T](message, senderOption, recipient) }
private[akka] def withClientFor[T]( private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient T): T = { address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient T): T = {
// loader.foreach(MessageSerializer.setClassLoader(_))
val key = RemoteAddress(address) val key = RemoteAddress(address)
lock.readLock.lock lock.readLock.lock
try { try {
@ -158,9 +157,8 @@ abstract class RemoteClient private[akka] (
/** /**
* Converts the message to the wireprotocol and sends the message across the wire * Converts the message to the wireprotocol and sends the message across the wire
*/ */
def send[T](message: Any, senderOption: Option[ActorRef], remoteAddress: InetSocketAddress, actorRef: ActorRef) { def send[T](message: Any, senderOption: Option[ActorRef], recipient: ActorRef) {
val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Option(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build send(serialization.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
send(messageProtocol)
} }
/** /**
@ -168,7 +166,7 @@ abstract class RemoteClient private[akka] (
*/ */
def send[T](request: RemoteMessageProtocol) { def send[T](request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY if (isRunning) { //TODO FIXME RACY
app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request)) app.eventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, new RemoteMessage(request, remoteSupport)))
// tell // tell
try { try {
@ -378,10 +376,6 @@ class ActiveRemoteClientHandler(
} }
case arp: AkkaRemoteProtocol if arp.hasMessage case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid))
//TODO FIXME DOESN'T DO ANYTHING ANYMORE //TODO FIXME DOESN'T DO ANYTHING ANYMORE
case other case other
@ -443,54 +437,13 @@ class ActiveRemoteClientHandler(
} else app.eventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event)) } else app.eventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event))
} }
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
try {
val exceptionClass =
if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception
app.eventHandler.error(problem, this, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
} }
/** /**
* Provides the implementation of the Netty remote support * Provides the implementation of the Netty remote support
*/ */
class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with NettyRemoteServerModule with NettyRemoteClientModule { class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with NettyRemoteServerModule with NettyRemoteClientModule {
override def toString = name
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(actorAddress: String, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
val homeInetSocketAddress = this.address
if (optimizeLocalScoped_?) {
if ((host == homeInetSocketAddress.getAddress.getHostAddress ||
host == homeInetSocketAddress.getHostName) &&
port == homeInetSocketAddress.getPort) {
//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
val remoteInetSocketAddress = new InetSocketAddress(host, port)
app.eventHandler.debug(this,
"Creating RemoteActorRef with address [%s] connected to [%s]"
.format(actorAddress, remoteInetSocketAddress))
RemoteActorRef(this, remoteInetSocketAddress, actorAddress, loader)
}
} }
class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) { class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
@ -577,7 +530,7 @@ trait NettyRemoteServerModule extends RemoteServerModule {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard { def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try { try {
_isRunning switchOn { _isRunning switchOn {
app.eventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port)) app.eventHandler.debug(this, "Starting up remote server on [%s:%s]".format(_hostname, _port))
currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader))) currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader)))
} }
@ -597,85 +550,6 @@ trait NettyRemoteServerModule extends RemoteServerModule {
} }
} }
} }
/**
* Register RemoteModule Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
else register(id, actorRef, actors)
}
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef.uuid.toString, actorRef, actorsByUuid)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning.isOn)
registry.put(id, actorRef) //TODO change to putIfAbsent
}
/**
* Register RemoteModule Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(id: String, factory: ActorRef): Unit = synchronized {
registerPerSession(id, () factory, actorsFactories)
}
private def registerPerSession[Key](id: Key, factory: () ActorRef, registry: ConcurrentHashMap[Key, () ActorRef]) {
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
/**
* Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid))
actors.remove(actorRef.address, actorRef)
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
}
}
/**
* Unregister RemoteModule Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id))
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
val actorRef = actors get id
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
actors.remove(id, actorRef)
}
}
}
/**
* Unregister RemoteModule Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String) {
if (_isRunning.isOn) {
app.eventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id))
actorsFactories.remove(id)
}
}
} }
/** /**
@ -747,10 +621,6 @@ class RemoteServerHandler(
implicit def app = server.app implicit def app = server.app
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
//Writes the specified message to the specified channel and propagates write errors to listeners //Writes the specified message to the specified channel and propagates write errors to listeners
private def write(channel: Channel, payload: AkkaRemoteProtocol) { private def write(channel: Channel, payload: AkkaRemoteProtocol) {
channel.write(payload).addListener( channel.write(payload).addListener(
@ -778,48 +648,26 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
app.eventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name)) app.eventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name))
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress)) server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
} }
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
app.eventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name)) app.eventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name))
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try {
actor ! PoisonPill
} catch {
case e: Exception app.eventHandler.error(e, this, "Couldn't stop %s".format(actor))
}
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress)) server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
} }
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx) val clientAddress = getClientAddress(ctx)
app.eventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this) app.eventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress)) server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
} }
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
event.getMessage match { event.getMessage match {
case null case null throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]")
throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]") case remote: AkkaRemoteProtocol if remote.hasMessage handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
case remote: AkkaRemoteProtocol if remote.hasMessage
handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
//case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet) //case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet)
case _ //ignore case _ //ignore
} }
} }
@ -838,101 +686,32 @@ class RemoteServerHandler(
} }
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try {
app.eventHandler.debug(this, "Received remote message [%s]".format(request)) try {
dispatchToActor(request, channel) val remoteMessage = new RemoteMessage(request, server.remoteSupport, applicationLoader)
val recipient = remoteMessage.recipient
remoteMessage.payload match {
case Left(t) throw t
case Right(r) r match {
case _: Terminate if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop()
case _: AutoReceivedMessage if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m recipient.!(m)(remoteMessage.sender)
}
}
} catch {
case e: SecurityException
app.eventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
server.notifyListeners(RemoteServerError(e, server))
}
} catch { } catch {
case e: Exception case e: Exception
server.notifyListeners(RemoteServerError(e, server)) server.notifyListeners(RemoteServerError(e, server))
app.eventHandler.error(e, this, e.getMessage) app.eventHandler.error(e, this, e.getMessage)
} }
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) { private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol =
val actorInfo = request.getActorInfo RemoteEncoder.encode(serialization.createRemoteMessageProtocolBuilder(Right(request.getSender), Left(exception), None).build)
app.eventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid))
val actorRef =
try {
actorOf(actorInfo, channel)
} catch {
case e: SecurityException
app.eventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
server.notifyListeners(RemoteServerError(e, server))
return
}
val sender = if (request.hasSender) serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) else app.deadLetters
MessageSerializer.deserialize(app, request.getMessage) match {
case _: Terminate if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop()
case _: AutoReceivedMessage if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m actorRef.!(m)(sender)
}
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
* If actor already created then just return it from the registry.
*
* Does not start the actor.
*/
private def actorOf(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
app.eventHandler.debug(this,
"Looking up a remotely available actor for address [%s] on node [%s]"
.format(address, app.nodename))
val byAddress = server.actors.get(address) // try actor-by-address
if (byAddress eq null) {
val byUuid = server.actorsByUuid.get(uuid) // try actor-by-uuid
if (byUuid eq null) {
val bySession = createSessionActor(actorInfo, channel) // try actor-by-session
if (bySession eq null) {
throw new IllegalActorStateException(
"Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]")
} else bySession
} else byUuid
} else byAddress
}
/**
* gets the actor from the session, or creates one if there is a factory for it
*/
private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
findSessionActor(address, channel) match {
case null // we dont have it in the session either, see if we have a factory for it
server.findActorFactory(address) match {
case null null
case factory
val actorRef = factory()
sessionActors.get(channel).put(address, actorRef)
actorRef //Start it where's it's created
}
case sessionActor sessionActor
}
}
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(None, Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, Left(exception), None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
RemoteEncoder.encode(messageBuilder.build)
}
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh, protocol.getLow)
} }
class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) { class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {

View file

@ -6,17 +6,10 @@ package akka.serialization
import akka.actor._ import akka.actor._
import akka.actor.DeploymentConfig._ import akka.actor.DeploymentConfig._
import akka.dispatch.Envelope
import akka.util.{ ReflectiveAccess, Duration }
import akka.event.EventHandler
import akka.remote._ import akka.remote._
import RemoteProtocol._ import RemoteProtocol._
import akka.AkkaApplication
import scala.collection.immutable.Stack
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.{ LinkedList, Collections }
import com.google.protobuf.ByteString import com.google.protobuf.ByteString
@ -24,79 +17,20 @@ import com.eaio.uuid.UUID
class RemoteActorSerialization(remote: RemoteSupport) { class RemoteActorSerialization(remote: RemoteSupport) {
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
remote.app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol))
val ref = RemoteActorRef(
remote,
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress,
loader)
remote.app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid))
ref
}
/** /**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/ */
def toRemoteActorRefProtocol(actor: ActorRef): RemoteActorRefProtocol = { def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
val remoteAddress = actor match { val rep = remote.app.provider.serialize(actor)
case ar: RemoteActorRef ActorRefProtocol.newBuilder.setAddress(rep.address).setHost(rep.hostname).setPort(rep.port).build
ar.remoteAddress
case ar: ActorRef
remote.register(ar) //FIXME stop doing this and delegate to provider.actorFor in the NettyRemoting
remote.app.defaultAddress //FIXME Shouldn't this be the _current_ address of the remoting?
}
remote.app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress))
RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address)
.setTimeout(remote.app.AkkaConfig.ActorTimeoutMillis)
.build
} }
def createRemoteMessageProtocolBuilder( def createRemoteMessageProtocolBuilder(
actorRef: Option[ActorRef], recipient: Either[ActorRef, ActorRefProtocol],
replyUuid: Either[Uuid, UuidProtocol],
actorAddress: String,
timeout: Long,
message: Either[Throwable, Any], message: Either[Throwable, Any],
senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
val uuidProtocol = replyUuid match { val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(recipient.fold(toRemoteActorRefProtocol _, identity))
case Left(uid) UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build
case Right(protocol) protocol
}
val actorInfoBuilder = ActorInfoProtocol.newBuilder.setUuid(uuidProtocol).setAddress(actorAddress).setTimeout(timeout)
val actorInfo = actorInfoBuilder.build
val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid({
val messageUuid = newUuid
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
})
.setActorInfo(actorInfo)
.setOneWay(true)
message match { message match {
case Right(message) case Right(message)
@ -108,8 +42,7 @@ class RemoteActorSerialization(remote: RemoteSupport) {
.build) .build)
} }
if (senderOption.isDefined) if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
messageBuilder messageBuilder
} }