Merge pull request #103 from jboner/no-uuid

Profit! Removing Uuids from ActorCells and ActorRefs and essentially repl
This commit is contained in:
viktorklang 2011-11-03 11:53:29 -07:00
commit 5efe091ebc
22 changed files with 1682 additions and 6039 deletions

View file

@ -292,11 +292,7 @@ class ActorRefSpec extends AkkaSpec {
val inetAddress = app.defaultAddress
val expectedSerializedRepresentation = SerializedActorRef(
a.uuid,
a.address,
inetAddress.getAddress.getHostAddress,
inetAddress.getPort)
val expectedSerializedRepresentation = new SerializedActorRef(a.address, inetAddress)
import java.io._

View file

@ -6,64 +6,21 @@ package akka.actor
import akka.testkit._
import akka.util.duration._
import akka.testkit.Testing.sleepFor
import java.util.concurrent.{ TimeUnit, CountDownLatch }
object LocalActorRefProviderSpec {
class NewActor extends Actor {
def receive = {
case _ {}
}
}
}
import akka.dispatch.Future
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalActorRefProviderSpec extends AkkaSpec {
import akka.actor.LocalActorRefProviderSpec._
"An LocalActorRefProvider" must {
"only create one instance of an actor with a specific address in a concurrent environment" in {
val provider = app.provider
for (i 0 until 100) { // 100 concurrent runs
val latch = new CountDownLatch(4)
var a1: Option[ActorRef] = None
var a2: Option[ActorRef] = None
var a3: Option[ActorRef] = None
var a4: Option[ActorRef] = None
provider.isInstanceOf[LocalActorRefProvider] must be(true)
(0 until 100) foreach { i // 100 concurrent runs
val address = "new-actor" + i
spawn {
a1 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
spawn {
a2 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
spawn {
a3 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
spawn {
a4 = Some(provider.actorOf(Props(creator = () new NewActor), app.guardian, address))
latch.countDown()
}
latch.await(5, TimeUnit.SECONDS) must be === true
a1.isDefined must be(true)
a2.isDefined must be(true)
a3.isDefined must be(true)
a4.isDefined must be(true)
(a1 == a2) must be(true)
(a1 == a3) must be(true)
(a1 == a4) must be(true)
implicit val timeout = Timeout(30 seconds)
((1 to 4) map { _ Future { provider.actorOf(Props(c { case _ }), app.guardian, address, true) } }).map(_.get).distinct.size must be(1)
}
}
}

View file

@ -234,7 +234,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props): ActorRef = actorOf(p.withCreator(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
delegates put (self.address, "")
latch1.countDown()
}
}))
@ -262,7 +262,7 @@ class ActorPoolSpec extends AkkaSpec {
def instance(p: Props) = actorOf(p.withCreator(new Actor {
def receive = {
case _
delegates put (self.uuid.toString, "")
delegates put (self.address, "")
latch2.countDown()
}
}))

View file

@ -146,17 +146,17 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor
case value value
}
val hostname: String = System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostName
val defaultAddress = new InetSocketAddress(System.getProperty("akka.remote.hostname") match {
case null | "" InetAddress.getLocalHost.getHostAddress
case value value
}
val port: Int = System.getProperty("akka.remote.port") match {
}, System.getProperty("akka.remote.port") match {
case null | "" AkkaConfig.RemoteServerPort
case value value.toInt
}
})
val defaultAddress = new InetSocketAddress(hostname, port)
def hostname: String = defaultAddress.getAddress.getHostAddress
def port: Int = defaultAddress.getPort
// TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(this)

View file

@ -39,7 +39,7 @@ class AkkaException(message: String = "", cause: Throwable = null) extends Runti
object AkkaException {
val hostname = try {
InetAddress.getLocalHost.getHostName
InetAddress.getLocalHost.getHostAddress
} catch {
case e: UnknownHostException "unknown"
}

View file

@ -10,7 +10,6 @@ import scala.annotation.tailrec
import scala.collection.immutable.{ Stack, TreeMap }
import scala.collection.JavaConverters
import java.util.concurrent.{ ScheduledFuture, TimeUnit }
import java.util.{ Collection JCollection, Collections JCollections }
import akka.AkkaApplication
/**
@ -82,8 +81,6 @@ private[akka] class ActorCell(
var actor: Actor = _
def uuid: Uuid = self.uuid
@inline
final def dispatcher: MessageDispatcher = if (props.dispatcher == Props.defaultDispatcher) app.dispatcher else props.dispatcher
@ -363,12 +360,4 @@ private[akka] class ActorCell(
if (a ne null)
lookupAndSetSelfFields(a.getClass, a, newContext)
}
override final def hashCode: Int = HashCode.hash(HashCode.SEED, uuid)
override final def equals(that: Any): Boolean = {
that.isInstanceOf[ActorCell] && that.asInstanceOf[ActorCell].uuid == uuid
}
override final def toString = "ActorCell[%s]".format(uuid)
}

View file

@ -12,6 +12,7 @@ import akka.AkkaApplication
import akka.event.ActorEventBus
import akka.serialization.Serialization
import akka.actor.DeadLetterActorRef.SerializedDeadLetterActorRef
import java.net.InetSocketAddress
/**
* ActorRef is an immutable and serializable handle to an Actor.
@ -276,18 +277,17 @@ trait ScalaActorRef { ref: ActorRef ⇒
protected[akka] def postMessageToMailbox(message: Any, sender: ActorRef): Unit
protected[akka] def restart(cause: Throwable): Unit
private[akka] def uuid: Uuid //TODO FIXME REMOVE THIS
}
/**
* Memento pattern for serializing ActorRefs transparently
*/
case class SerializedActorRef(uuid: Uuid, address: String, hostname: String, port: Int) {
case class SerializedActorRef(address: String, hostname: String, port: Int) {
import akka.serialization.Serialization.app
def this(address: String, inet: InetSocketAddress) = this(address, inet.getAddress.getHostAddress, inet.getPort)
@throws(classOf[java.io.ObjectStreamException])
def readResolve(): AnyRef = {
if (app.value eq null) throw new IllegalStateException(
@ -366,7 +366,8 @@ class DeadLetterActorRef(val app: AkkaApplication) extends MinimalActorRef {
override def isShutdown(): Boolean = true
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit = app.eventHandler.notify(DeadLetter(message, sender))
protected[akka] override def postMessageToMailbox(message: Any, sender: ActorRef): Unit =
app.eventHandler.notify(DeadLetter(message, sender))
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
app.eventHandler.notify(DeadLetter(message, this))

View file

@ -31,7 +31,7 @@ trait ActorRefProvider {
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef]
private[akka] def serialize(actor: ActorRef): AnyRef
private[akka] def serialize(actor: ActorRef): SerializedActorRef
private[akka] def createDeathWatch(): DeathWatch
@ -160,6 +160,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
// create a routed actor ref
case deploy @ Some(DeploymentConfig.Deploy(_, _, routerType, nrOfInstances, _, DeploymentConfig.LocalScope))
val routerFactory: () Router = DeploymentConfig.routerTypeFor(routerType) match {
case RouterType.Direct () new DirectRouter
case RouterType.Random () new RandomRouter
@ -177,7 +178,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
actorOf(RoutedProps(routerFactory = routerFactory, connectionManager = new LocalConnectionManager(connections)), supervisor, address)
case _ throw new Exception("Don't know how to create this actor ref! Why?")
case unknown throw new Exception("Don't know how to create this actor ref! Why? Got: " + unknown)
}
} catch {
case e: Exception
@ -218,8 +219,7 @@ class LocalActorRefProvider(val app: AkkaApplication) extends ActorRefProvider {
}
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = actorFor(actor.address)
private[akka] def serialize(actor: ActorRef): AnyRef =
SerializedActorRef(actor.uuid, actor.address, app.hostname, app.port)
private[akka] def serialize(actor: ActorRef): SerializedActorRef = new SerializedActorRef(actor.address, app.defaultAddress)
private[akka] def createDeathWatch(): DeathWatch = new LocalDeathWatch

View file

@ -13,6 +13,7 @@ import akka.actor.DeploymentConfig._
import akka.{ AkkaException, AkkaApplication }
import akka.config.{ Configuration, ConfigurationException }
import akka.util.Duration
import java.net.InetSocketAddress
trait ActorDeployer {
private[akka] def init(deployments: Seq[Deploy]): Unit
@ -35,8 +36,6 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
val deploymentConfig = new DeploymentConfig(app)
// val defaultAddress = Node(Config.nodename)
lazy val instance: ActorDeployer = {
val deployer = if (app.reflective.ClusterModule.isEnabled) app.reflective.ClusterModule.clusterDeployer else LocalDeployer
deployer.init(deploymentsInConfig)
@ -73,22 +72,13 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
}
private[akka] def lookupDeploymentFor(address: String): Option[Deploy] = {
val deployment_? = instance.lookupDeploymentFor(address)
if (deployment_?.isDefined && (deployment_?.get ne null)) deployment_?
else {
val newDeployment = try {
lookupInConfig(address)
} catch {
case e: ConfigurationException
app.eventHandler.error(e, this, e.getMessage) //TODO FIXME I do not condone log AND rethrow
throw e
}
newDeployment match {
case None | Some(null) None
case Some(d) deploy(d); newDeployment // deploy and cache it
}
instance.lookupDeploymentFor(address) match {
case s @ Some(d) if d ne null s
case _
lookupInConfig(address) match {
case None | Some(null) None
case s @ Some(d) deploy(d); s // deploy and cache it
}
}
}
@ -248,7 +238,8 @@ class Deployer(val app: AkkaApplication) extends ActorDeployer {
case e: Exception raiseRemoteNodeParsingError()
}
if (port == 0) raiseRemoteNodeParsingError()
RemoteAddress(hostname, port)
val inet = new InetSocketAddress(hostname, port) //FIXME switch to non-ip-tied
RemoteAddress(Option(inet.getAddress).map(_.getHostAddress).getOrElse(hostname), inet.getPort)
}
}

View file

@ -37,7 +37,7 @@ class BalancingDispatcher(
_timeoutMs: Long)
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = a.uuid.compareTo(b.uuid) }) //new ConcurrentLinkedQueue[ActorCell]()
private val buddies = new ConcurrentSkipListSet[ActorCell](new Comparator[ActorCell] { def compare(a: ActorCell, b: ActorCell) = System.identityHashCode(a) - System.identityHashCode(b) }) //new ConcurrentLinkedQueue[ActorCell]()
protected val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {

View file

@ -61,7 +61,7 @@ class Dispatchers(val app: AkkaApplication) {
*/
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.address, MailboxType, DispatcherShutdownMillis)
}
/**
@ -72,7 +72,7 @@ class Dispatchers(val app: AkkaApplication) {
*/
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.underlying.uuid.toString, mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.address, mailboxType, DispatcherShutdownMillis)
}
/**

View file

@ -21,33 +21,8 @@ import java.lang.reflect.InvocationTargetException
class RemoteException(message: String) extends AkkaException(message)
trait RemoteService {
def server: RemoteSupport
def address: InetSocketAddress
}
trait RemoteModule {
val UUID_PREFIX = "uuid:".intern
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: Any): Unit
private[akka] def actors: ConcurrentHashMap[String, ActorRef] // FIXME need to invalidate this cache on replication
private[akka] def actorsByUuid: ConcurrentHashMap[String, ActorRef] // FIXME remove actorsByUuid map?
private[akka] def actorsFactories: ConcurrentHashMap[String, () ActorRef] // FIXME what to do wit actorsFactories map?
private[akka] def findActorByAddress(address: String): ActorRef = actors.get(address)
private[akka] def findActorByUuid(uuid: String): ActorRef = actorsByUuid.get(uuid)
private[akka] def findActorFactory(address: String): () ActorRef = actorsFactories.get(address)
private[akka] def findActorByAddressOrUuid(address: String, uuid: String): ActorRef = {
var actorRefOrNull = if (address.startsWith(UUID_PREFIX)) findActorByUuid(address.substring(UUID_PREFIX.length))
else findActorByAddress(address)
if (actorRefOrNull eq null) actorRefOrNull = findActorByUuid(uuid)
actorRefOrNull
}
}
/**
@ -145,20 +120,9 @@ abstract class RemoteSupport(val app: AkkaApplication) extends RemoteServerModul
def shutdown() {
this.shutdownClientModule()
this.shutdownServerModule()
clear
}
protected[akka] override def notifyListeners(message: Any): Unit = app.eventHandler.notify(message)
private[akka] val actors = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[akka] val actorsFactories = new ConcurrentHashMap[String, () ActorRef]
def clear {
actors.clear
actorsByUuid.clear
actorsFactories.clear
}
}
/**
@ -177,39 +141,6 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
*/
def name: String
/**
* Gets the address of the server instance
*/
def address: InetSocketAddress
/**
* Starts the server up
*/
def start(): RemoteServerModule =
start(app.defaultAddress.getAddress.getHostAddress,
app.defaultAddress.getPort,
None)
/**
* Starts the server up
*/
def start(loader: ClassLoader): RemoteServerModule =
start(app.defaultAddress.getAddress.getHostAddress,
app.defaultAddress.getPort,
Option(loader))
/**
* Starts the server up
*/
def start(host: String, port: Int): RemoteServerModule =
start(host, port, None)
/**
* Starts the server up
*/
def start(host: String, port: Int, loader: ClassLoader): RemoteServerModule =
start(host, port, Option(loader))
/**
* Starts the server up
*/
@ -219,68 +150,9 @@ trait RemoteServerModule extends RemoteModule { this: RemoteSupport ⇒
* Shuts the server down
*/
def shutdownServerModule(): Unit
/**
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
*/
def register(actorRef: ActorRef): Unit = register(actorRef.address, actorRef)
/**
* Register Remote Actor by the Actor's uuid field. It starts the Actor if it is not started already.
*/
def registerByUuid(actorRef: ActorRef): Unit
/**
* Register Remote Actor by a specific 'id' passed as argument. The actor is registered by UUID rather than ID
* when prefixing the handle with the uuid: protocol.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(address: String, actorRef: ActorRef): Unit
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(address: String, factory: ActorRef): Unit
/**
* Register Remote Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
* Java API
*/
def registerPerSession(address: String, factory: Creator[ActorRef]): Unit = registerPerSession(address, factory.create)
/**
* Unregister Remote Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(address: String): Unit
/**
* Unregister Remote Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(address: String): Unit
}
trait RemoteClientModule extends RemoteModule { self: RemoteSupport
def actorFor(address: String, hostname: String, port: Int): ActorRef =
actorFor(address, hostname, port, None)
def actorFor(address: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(address, hostname, port, Some(loader))
/**
* Clean-up all open connections.
*/
@ -298,11 +170,9 @@ trait RemoteClientModule extends RemoteModule { self: RemoteSupport ⇒
/** Methods that needs to be implemented by a transport **/
protected[akka] def actorFor(address: String, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress,
actorRef: ActorRef,
loader: Option[ClassLoader]): Unit
}
protected[akka] def send(message: Any,
senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit
}

View file

@ -4,15 +4,11 @@
package akka.util
import akka.dispatch.Envelope
import akka.config.ModuleNotAvailableException
import akka.actor._
import DeploymentConfig.ReplicationScheme
import akka.config.ModuleNotAvailableException
import akka.event.EventHandler
import akka.cluster.ClusterNode
import akka.remote.{ RemoteSupport, RemoteService }
import akka.routing.{ RoutedProps, Router }
import java.net.InetSocketAddress
import akka.AkkaApplication
object ReflectiveAccess {

View file

@ -88,7 +88,6 @@ private[camel] trait ConsumerMethodEvent extends ConsumerEvent {
val typedActor: AnyRef
val method: Method
val uuid = actorRef.uuid.toString
val methodName = method.getName
val methodUuid = "%s_%s" format (uuid, methodName)

File diff suppressed because it is too large Load diff

View file

@ -20,22 +20,20 @@ message AkkaRemoteProtocol {
* Defines a remote message.
*/
message RemoteMessageProtocol {
required UuidProtocol uuid = 1;
required ActorInfoProtocol actorInfo = 2;
required bool oneWay = 3;
optional MessageProtocol message = 4;
optional ExceptionProtocol exception = 5;
optional UuidProtocol supervisorUuid = 6;
optional RemoteActorRefProtocol sender = 7;
repeated MetadataEntryProtocol metadata = 8;
required ActorRefProtocol recipient = 1;
optional MessageProtocol message = 2;
optional ExceptionProtocol exception = 3;
optional ActorRefProtocol sender = 4;
repeated MetadataEntryProtocol metadata = 5;
}
/**
* Defines some control messages for the remoting
*/
message RemoteControlProtocol {
optional string cookie = 1;
required CommandType commandType = 2;
required CommandType commandType = 1;
optional string cookie = 2;
optional AddressProtocol origin = 3;
}
/**
@ -67,41 +65,10 @@ enum ReplicationStrategyType {
* Defines a remote ActorRef that "remembers" and uses its original Actor instance
* on the original node.
*/
message RemoteActorRefProtocol {
message ActorRefProtocol {
required string address = 1;
required bytes inetSocketAddress = 2;
optional uint64 timeout = 3;
}
/**
* Defines a fully serialized remote ActorRef (with serialized Actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedActorRefProtocol {
required UuidProtocol uuid = 1;
required string address = 2;
required string actorClassname = 3;
optional bytes actorInstance = 4;
optional string serializerClassname = 5;
optional uint64 timeout = 6;
optional uint64 receiveTimeout = 7;
optional LifeCycleProtocol lifeCycle = 8;
optional RemoteActorRefProtocol supervisor = 9;
optional bytes hotswapStack = 10;
optional ReplicationStorageType replicationStorage = 11;
optional ReplicationStrategyType replicationStrategy = 12;
repeated RemoteMessageProtocol messages = 13;
}
/**
* Defines a fully serialized remote ActorRef (with serialized typed actor instance)
* that is about to be instantiated on the remote node. It is fully disconnected
* from its original host.
*/
message SerializedTypedActorRefProtocol {
required SerializedActorRefProtocol actorRef = 1;
required string interfaceName = 2;
required string host = 2;
required uint32 port = 3;
}
/**
@ -112,15 +79,6 @@ message MessageProtocol {
optional bytes messageManifest = 2;
}
/**
* Defines the actor info.
*/
message ActorInfoProtocol {
required UuidProtocol uuid = 1;
required uint64 timeout = 2;
optional string address = 3;
}
/**
* Defines a UUID.
*/
@ -137,32 +95,6 @@ message MetadataEntryProtocol {
required bytes value = 2;
}
/**
* Defines the serialization scheme used to serialize the message and/or Actor instance.
*/
enum SerializationSchemeType {
JAVA = 1;
SBINARY = 2;
SCALA_JSON = 3;
JAVA_JSON = 4;
PROTOBUF = 5;
}
/**
* Defines the type of the life-cycle of a supervised Actor.
*/
enum LifeCycleType {
PERMANENT = 1;
TEMPORARY = 2;
}
/**
* Defines the life-cycle of a supervised Actor.
*/
message LifeCycleProtocol {
required LifeCycleType lifeCycle = 1;
}
/**
* Defines a remote address.
*/
@ -184,10 +116,9 @@ message ExceptionProtocol {
*/
message RemoteSystemDaemonMessageProtocol {
required RemoteSystemDaemonMessageType messageType = 1;
optional UuidProtocol actorUuid = 2;
optional string actorAddress = 3;
optional bytes payload = 5;
optional UuidProtocol replicateActorFromUuid = 6;
optional string actorAddress = 2;
optional bytes payload = 3;
optional UuidProtocol replicateActorFromUuid = 4;
}
/**
@ -214,8 +145,7 @@ enum RemoteSystemDaemonMessageType {
* Defines the durable mailbox message.
*/
message DurableMailboxMessageProtocol {
required string ownerActorAddress= 1;
optional string senderActorAddress = 2;
optional UuidProtocol futureUuid = 3;
required bytes message = 4;
required ActorRefProtocol recipient= 1;
optional ActorRefProtocol sender = 2;
required bytes message = 3;
}

View file

@ -109,7 +109,7 @@ class Gossiper(remote: Remote) {
private val seeds = Set(address) // FIXME read in list of seeds from config
private val scheduler = new DefaultScheduler
private val address = new InetSocketAddress(app.hostname, app.port)
private val address = app.defaultAddress
private val nodeFingerprint = address.##
private val random = SecureRandom.getInstance("SHA1PRNG")

View file

@ -14,12 +14,12 @@ object MessageSerializer {
def deserialize(app: AkkaApplication, messageProtocol: MessageProtocol, classLoader: Option[ClassLoader] = None): AnyRef = {
val clazz = loadManifest(classLoader, messageProtocol)
app.serialization.deserialize(messageProtocol.getMessage.toByteArray,
clazz, classLoader).fold(x throw x, o o)
clazz, classLoader).fold(x throw x, identity)
}
def serialize(app: AkkaApplication, message: AnyRef): MessageProtocol = {
val builder = MessageProtocol.newBuilder
val bytes = app.serialization.serialize(message).fold(x throw x, b b)
val bytes = app.serialization.serialize(message).fold(x throw x, identity)
builder.setMessage(ByteString.copyFrom(bytes))
builder.setMessageManifest(ByteString.copyFromUtf8(message.getClass.getName))
builder.build

View file

@ -13,7 +13,6 @@ import akka.util._
import akka.util.duration._
import akka.util.Helpers._
import akka.actor.DeploymentConfig._
import akka.serialization.{ Serialization, Serializer, Compression }
import akka.serialization.Compression.LZF
import akka.remote.RemoteProtocol._
import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
@ -21,13 +20,14 @@ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._
import java.net.InetSocketAddress
import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
/**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Remote(val app: AkkaApplication) extends RemoteService {
class Remote(val app: AkkaApplication) {
import app._
import app.config
@ -37,9 +37,6 @@ class Remote(val app: AkkaApplication) extends RemoteService {
val shouldCompressData = config.getBool("akka.remote.use-compression", false)
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout", 30), DefaultTimeUnit).toMillis.toInt
val hostname = app.hostname
val port = app.port
val failureDetector = new AccrualFailureDetector(FailureDetectorThreshold, FailureDetectorMaxSampleSize)
// val gossiper = new Gossiper(this)
@ -53,8 +50,7 @@ class Remote(val app: AkkaApplication) extends RemoteService {
OneForOneStrategy(List(classOf[Exception]), None, None))) // is infinite restart what we want?
private[remote] lazy val remoteDaemon =
new LocalActorRef(
app,
app.provider.actorOf(
Props(new RemoteSystemDaemon(this)).withDispatcher(dispatcherFactory.newPinnedDispatcher(remoteDaemonServiceName)),
remoteDaemonSupervisor,
remoteDaemonServiceName,
@ -73,7 +69,6 @@ class Remote(val app: AkkaApplication) extends RemoteService {
lazy val server: RemoteSupport = {
val remote = new akka.remote.netty.NettyRemoteSupport(app)
remote.start(hostname, port)
remote.register(remoteDaemonServiceName, remoteDaemon)
app.eventHandler.addListener(eventStream.sender)
app.eventHandler.addListener(remoteClientLifeCycleHandler)
@ -83,20 +78,11 @@ class Remote(val app: AkkaApplication) extends RemoteService {
remote
}
lazy val address = server.address
def start() {
val triggerLazyServerVal = address.toString
eventHandler.info(this, "Starting remote server on [%s]".format(triggerLazyServerVal))
def start(): Unit = {
val serverAddress = server.app.defaultAddress //Force init of server
val daemonAddress = remoteDaemon.address //Force init of daemon
eventHandler.info(this, "Starting remote server on [%s] and starting remoteDaemon with address [%s]".format(serverAddress, daemonAddress))
}
def uuidProtocolToUuid(uuid: UuidProtocol): UUID = new UUID(uuid.getHigh, uuid.getLow)
def uuidToUuidProtocol(uuid: UUID): UuidProtocol =
UuidProtocol.newBuilder
.setHigh(uuid.getTime)
.setLow(uuid.getClockSeqAndNode)
.build
}
/**
@ -117,7 +103,7 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
def receive: Actor.Receive = {
case message: RemoteSystemDaemonMessageProtocol
eventHandler.debug(this, "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message, nodename))
eventHandler.debug(this, "Received command [\n%s] to RemoteSystemDaemon on [%s]".format(message.getMessageType, nodename))
message.getMessageType match {
case USE handleUse(message)
@ -151,16 +137,12 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
case Right(instance) instance.asInstanceOf[() Actor]
}
val actorAddress = message.getActorAddress
val newActorRef = app.actorOf(Props(creator = actorFactory), actorAddress)
server.register(actorAddress, newActorRef)
app.actorOf(Props(creator = actorFactory), message.getActorAddress)
} else {
eventHandler.error(this, "Actor 'address' for actor to instantiate is not defined, ignoring remote system daemon command [%s]".format(message))
}
sender ! Success(address.toString)
sender ! Success(app.defaultAddress)
} catch {
case error: Throwable //FIXME doesn't seem sensible
sender ! Failure(error)
@ -237,3 +219,88 @@ class RemoteSystemDaemon(remote: Remote) extends Actor {
}
}
}
class RemoteMessage(input: RemoteMessageProtocol, remote: RemoteSupport, classLoader: Option[ClassLoader] = None) {
lazy val sender: ActorRef =
if (input.hasSender)
remote.app.provider.deserialize(
SerializedActorRef(input.getSender.getAddress, input.getSender.getHost, input.getSender.getPort)).getOrElse(throw new IllegalStateException("OHNOES"))
else
remote.app.deadLetters
lazy val recipient: ActorRef = remote.app.findActor(input.getRecipient.getAddress) match {
case None remote.app.deadLetters
case Some(target) target
}
lazy val payload: Either[Throwable, AnyRef] =
if (input.hasException) Left(parseException())
else Right(MessageSerializer.deserialize(remote.app, input.getMessage, classLoader))
protected def parseException(): Throwable = {
val exception = input.getException
val classname = exception.getClassname
try {
val exceptionClass =
if (classLoader.isDefined) classLoader.get.loadClass(classname) else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception
remote.app.eventHandler.error(problem, remote, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
override def toString = "RemoteMessage: " + recipient + "(" + input.getRecipient.getAddress + ") from " + sender
}
trait RemoteMarshallingOps {
def app: AkkaApplication
def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setMessage(rmp)
arp.build
}
def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setInstruction(rcp)
arp.build
}
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = {
val rep = app.provider.serialize(actor)
ActorRefProtocol.newBuilder.setAddress(rep.address).setHost(rep.hostname).setPort(rep.port).build
}
def createRemoteMessageProtocolBuilder(
recipient: Either[ActorRef, ActorRefProtocol],
message: Either[Throwable, Any],
senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(recipient.fold(toRemoteActorRefProtocol _, identity))
message match {
case Right(message)
messageBuilder.setMessage(MessageSerializer.serialize(app, message.asInstanceOf[AnyRef]))
case Left(exception)
messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName)
.setMessage(Option(exception.getMessage).getOrElse(""))
.build)
}
if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
messageBuilder
}
def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol =
createMessageSendEnvelope(createRemoteMessageProtocolBuilder(Right(request.getSender), Left(exception), None).build)
}

View file

@ -22,6 +22,7 @@ import java.net.InetSocketAddress
import java.util.concurrent.ConcurrentHashMap
import com.google.protobuf.ByteString
import java.util.concurrent.atomic.AtomicBoolean
/**
* Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it.
@ -65,14 +66,13 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
// case FailureDetectorType.Custom(implClass) FailureDetector.createCustomFailureDetector(implClass)
// }
val thisHostname = remote.address.getHostName
val thisPort = remote.address.getPort
def isReplicaNode: Boolean = remoteAddresses exists { some some.port == app.port && some.hostname == app.hostname }
def isReplicaNode: Boolean = remoteAddresses exists { some some.hostname == thisHostname && some.port == thisPort }
//app.eventHandler.debug(this, "%s: Deploy Remote Actor with address [%s] connected to [%s]: isReplica(%s)".format(app.defaultAddress, address, remoteAddresses.mkString, isReplicaNode))
if (isReplicaNode) {
// we are on one of the replica node for this remote actor
new LocalActorRef(app, props, supervisor, address, false)
local.actorOf(props, supervisor, address, true) //FIXME systemService = true here to bypass Deploy, should be fixed when create-or-get is replaced by get-or-create
} else {
// we are on the single "reference" node uses the remote actors on the replica nodes
@ -147,28 +147,38 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
}
def actorFor(address: String): Option[ActorRef] = actors.get(address) match {
case null None
case null local.actorFor(address)
case actor: ActorRef Some(actor)
case future: Future[_] Some(future.get.asInstanceOf[ActorRef])
}
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
/**
* Returns true if the actor was in the provider's cache and evicted successfully, else false.
*/
private[akka] def evict(address: String): Boolean = actors.remove(address) ne null
private[akka] def serialize(actor: ActorRef): AnyRef = actor match {
case r: RemoteActorRef SerializedActorRef(actor.uuid, actor.address, r.remoteAddress.getAddress.getHostAddress, r.remoteAddress.getPort)
private[akka] def serialize(actor: ActorRef): SerializedActorRef = actor match {
case r: RemoteActorRef new SerializedActorRef(actor.address, r.remoteAddress)
case other local.serialize(actor)
}
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] =
local.actorFor(actor.address) orElse Some(RemoteActorRef(remote.server, new InetSocketAddress(actor.hostname, actor.port), actor.address, None))
private[akka] def deserialize(actor: SerializedActorRef): Option[ActorRef] = {
if (optimizeLocalScoped_? && (actor.hostname == app.hostname || actor.hostname == app.defaultAddress.getHostName) && actor.port == app.port) {
local.actorFor(actor.address)
} else {
val remoteInetSocketAddress = new InetSocketAddress(actor.hostname, actor.port) //FIXME Drop the InetSocketAddresses and use RemoteAddress
app.eventHandler.debug(this, "%s: Creating RemoteActorRef with address [%s] connected to [%s]".format(app.defaultAddress, actor.address, remoteInetSocketAddress))
Some(RemoteActorRef(remote.server, remoteInetSocketAddress, actor.address, None)) //Should it be None here
}
}
/**
* Using (checking out) actor on a specific node.
*/
def useActorOnNode(remoteAddress: InetSocketAddress, actorAddress: String, actorFactory: () Actor) {
app.eventHandler.debug(this, "Instantiating Actor [%s] on node [%s]".format(actorAddress, remoteAddress))
app.eventHandler.debug(this, "[%s] Instantiating Actor [%s] on node [%s]".format(app.defaultAddress, actorAddress, remoteAddress))
val actorFactoryBytes =
app.serialization.serialize(actorFactory) match {
@ -182,7 +192,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider
.setPayload(ByteString.copyFrom(actorFactoryBytes))
.build()
val connectionFactory = () remote.server.actorFor(remote.remoteDaemonServiceName, remoteAddress.getAddress.getHostAddress, remoteAddress.getPort)
val connectionFactory = () deserialize(new SerializedActorRef(remote.remoteDaemonServiceName, remoteAddress)).get
// try to get the connection for the remote address, if not already there then create it
val connection = remoteDaemonConnectionManager.putIfAbsent(remoteAddress, connectionFactory)
@ -234,9 +244,6 @@ private[akka] case class RemoteActorRef private[akka] (
address: String,
loader: Option[ClassLoader])
extends ActorRef with ScalaActorRef {
private[akka] val uuid: Uuid = newUuid
@volatile
private var running: Boolean = true
@ -244,21 +251,19 @@ private[akka] case class RemoteActorRef private[akka] (
protected[akka] def sendSystemMessage(message: SystemMessage): Unit = unsupported
def postMessageToMailbox(message: Any, sender: ActorRef) {
remote.send[Any](message, Option(sender), remoteAddress, this, loader)
}
def postMessageToMailbox(message: Any, sender: ActorRef): Unit = remote.send(message, Option(sender), remoteAddress, this, loader)
def ?(message: Any)(implicit timeout: Timeout): Future[Any] = remote.app.provider.ask(message, this, timeout)
def suspend(): Unit = unsupported
def suspend(): Unit = ()
def resume(): Unit = unsupported
def resume(): Unit = ()
def stop() { //FIXME send the cause as well!
synchronized {
if (running) {
running = false
remote.send[Any](new Terminate(), None, remoteAddress, this, loader)
remote.send(new Terminate(), None, remoteAddress, this, loader)
}
}
}
@ -266,11 +271,11 @@ private[akka] case class RemoteActorRef private[akka] (
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = remote.app.provider.serialize(this)
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def startsMonitoring(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported
def stopsMonitoring(actorRef: ActorRef): ActorRef = unsupported //FIXME Implement
protected[akka] def restart(cause: Throwable): Unit = unsupported
protected[akka] def restart(cause: Throwable): Unit = ()
private def unsupported = throw new UnsupportedOperationException("Not supported for RemoteActorRef")
}

View file

@ -4,7 +4,7 @@
package akka.remote.netty
import akka.actor.{ ActorRef, Uuid, newUuid, uuidFrom, IllegalActorStateException, PoisonPill, AutoReceivedMessage, simpleName }
import akka.actor.{ ActorRef, IllegalActorStateException, AutoReceivedMessage, simpleName }
import akka.remote._
import RemoteProtocol._
import akka.util._
@ -26,27 +26,12 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import akka.AkkaException
import akka.AkkaApplication
import akka.serialization.RemoteActorSerialization
import akka.dispatch.{ Terminate, DefaultPromise, Promise }
import akka.dispatch.{ Terminate }
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
}
object RemoteEncoder {
def encode(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setMessage(rmp)
arp.build
}
def encode(rcp: RemoteControlProtocol): AkkaRemoteProtocol = {
val arp = AkkaRemoteProtocol.newBuilder
arp.setInstruction(rcp)
arp.build
}
}
trait NettyRemoteClientModule extends RemoteClientModule {
self: RemoteSupport
@ -55,16 +40,15 @@ trait NettyRemoteClientModule extends RemoteClientModule {
def app: AkkaApplication
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
remoteAddress: InetSocketAddress,
actorRef: ActorRef,
loader: Option[ClassLoader]): Unit =
withClientFor(remoteAddress, loader) { _.send[T](message, senderOption, remoteAddress, actorRef) }
protected[akka] def send(message: Any,
senderOption: Option[ActorRef],
recipientAddress: InetSocketAddress,
recipient: ActorRef,
loader: Option[ClassLoader]): Unit =
withClientFor(recipientAddress, loader) { _.send(message, senderOption, recipient) }
private[akka] def withClientFor[T](
address: InetSocketAddress, loader: Option[ClassLoader])(body: RemoteClient T): T = {
// loader.foreach(MessageSerializer.setClassLoader(_))
val key = RemoteAddress(address)
lock.readLock.lock
try {
@ -135,14 +119,12 @@ abstract class RemoteClient private[akka] (
val app: AkkaApplication,
val remoteSupport: RemoteSupport,
val module: NettyRemoteClientModule,
val remoteAddress: InetSocketAddress) {
val remoteAddress: InetSocketAddress) extends RemoteMarshallingOps {
val name = simpleName(this) + "@" +
remoteAddress.getAddress.getHostAddress + "::" +
remoteAddress.getPort
val serialization = new RemoteActorSerialization(remoteSupport)
private[remote] val runSwitch = new Switch()
private[remote] def isRunning = runSwitch.isOn
@ -158,21 +140,20 @@ abstract class RemoteClient private[akka] (
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send[T](message: Any, senderOption: Option[ActorRef], remoteAddress: InetSocketAddress, actorRef: ActorRef) {
val messageProtocol = serialization.createRemoteMessageProtocolBuilder(Option(actorRef), Left(actorRef.uuid), actorRef.address, app.AkkaConfig.ActorTimeoutMillis, Right(message), senderOption).build
send(messageProtocol)
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef) {
send(createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
}
/**
* Sends the message across the wire
*/
def send[T](request: RemoteMessageProtocol) {
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
app.eventHandler.debug(this, "Sending to connection [%s] message [\n%s]".format(remoteAddress, request))
app.eventHandler.debug(this, "Sending to connection [%s] message [%s]".format(remoteAddress, new RemoteMessage(request, remoteSupport)))
// tell
try {
val future = currentChannel.write(RemoteEncoder.encode(request))
val future = currentChannel.write(createMessageSendEnvelope(request))
future.awaitUninterruptibly() //TODO FIXME SWITCH TO NONBLOCKING WRITE
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
@ -229,7 +210,8 @@ class ActiveRemoteClient private[akka] (
def sendSecureCookie(connection: ChannelFuture) {
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
if (SECURE_COOKIE.nonEmpty) handshake.setCookie(SECURE_COOKIE.get)
connection.getChannel.write(RemoteEncoder.encode(handshake.build))
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder().setHostname(app.hostname).setPort(app.port).build)
connection.getChannel.write(createControlEnvelope(handshake.build))
}
def closeChannel(connection: ChannelFuture) = {
@ -372,16 +354,10 @@ class ActiveRemoteClientHandler(
case arp: AkkaRemoteProtocol if arp.hasInstruction
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN akka.dispatch.Future {
client.module.shutdownClientConnection(remoteAddress)
}
case CommandType.SHUTDOWN akka.dispatch.Future { client.module.shutdownClientConnection(remoteAddress) }
}
case arp: AkkaRemoteProtocol if arp.hasMessage
val reply = arp.getMessage
val replyUuid = uuidFrom(reply.getActorInfo.getUuid.getHigh, reply.getActorInfo.getUuid.getLow)
app.eventHandler.debug(this, "Remote client received RemoteMessageProtocol[\n%s]\nTrying to map back to future [%s]".format(reply, replyUuid))
//TODO FIXME DOESN'T DO ANYTHING ANYMORE
case other
@ -443,63 +419,20 @@ class ActiveRemoteClientHandler(
} else app.eventHandler.error(this, "Unexpected exception from downstream in remote client [%s]".format(event))
}
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
try {
val exceptionClass =
if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
} catch {
case problem: Exception
app.eventHandler.error(problem, this, problem.getMessage)
CannotInstantiateRemoteExceptionDueToRemoteProtocolParsingErrorException(problem, classname, exception.getMessage)
}
}
}
/**
* Provides the implementation of the Netty remote support
*/
class NettyRemoteSupport(_app: AkkaApplication) extends RemoteSupport(_app) with NettyRemoteServerModule with NettyRemoteClientModule {
// Needed for remote testing and switching on/off under run
val optimizeLocal = new AtomicBoolean(true)
def optimizeLocalScoped_?() = optimizeLocal.get
protected[akka] def actorFor(actorAddress: String, host: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
val homeInetSocketAddress = this.address
if (optimizeLocalScoped_?) {
if ((host == homeInetSocketAddress.getAddress.getHostAddress ||
host == homeInetSocketAddress.getHostName) &&
port == homeInetSocketAddress.getPort) {
//TODO: switch to InetSocketAddress.equals?
val localRef = findActorByAddressOrUuid(actorAddress, actorAddress)
if (localRef ne null) return localRef //Code significantly simpler with the return statement
}
}
val remoteInetSocketAddress = new InetSocketAddress(host, port)
app.eventHandler.debug(this,
"Creating RemoteActorRef with address [%s] connected to [%s]"
.format(actorAddress, remoteInetSocketAddress))
RemoteActorRef(this, remoteInetSocketAddress, actorAddress, loader)
}
override def toString = name
}
class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) {
class NettyRemoteServer(val app: AkkaApplication, serverModule: NettyRemoteServerModule, val host: String, val port: Int, val loader: Option[ClassLoader]) extends RemoteMarshallingOps {
val settings = new RemoteServerSettings(app)
import settings._
val serialization = new RemoteActorSerialization(serverModule.remoteSupport)
val name = "NettyRemoteServer@" + host + ":" + port
val address = new InetSocketAddress(host, port)
@ -517,7 +450,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(settings, serialization, name, openChannels, executor, loader, serverModule)
val pipelineFactory = new RemoteServerPipelineFactory(settings, name, openChannels, executor, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", BACKLOG)
bootstrap.setOption("child.tcpNoDelay", true)
@ -537,7 +470,7 @@ class NettyRemoteServer(app: AkkaApplication, serverModule: NettyRemoteServerMod
b.setCookie(SECURE_COOKIE.get)
b.build
}
openChannels.write(RemoteEncoder.encode(shutdownSignal)).awaitUninterruptibly
openChannels.write(createControlEnvelope(shutdownSignal)).awaitUninterruptibly
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources()
@ -577,7 +510,7 @@ trait NettyRemoteServerModule extends RemoteServerModule {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
app.eventHandler.debug(this, "Starting up remote server on %s:s".format(_hostname, _port))
app.eventHandler.debug(this, "Starting up remote server on [%s:%s]".format(_hostname, _port))
currentServer.set(Some(new NettyRemoteServer(app, this, _hostname, _port, loader)))
}
@ -597,85 +530,6 @@ trait NettyRemoteServerModule extends RemoteServerModule {
}
}
}
/**
* Register RemoteModule Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def register(id: String, actorRef: ActorRef): Unit = guard withGuard {
if (id.startsWith(UUID_PREFIX)) register(id.substring(UUID_PREFIX.length), actorRef, actorsByUuid)
else register(id, actorRef, actors)
}
def registerByUuid(actorRef: ActorRef): Unit = guard withGuard {
register(actorRef.uuid.toString, actorRef, actorsByUuid)
}
private def register[Key](id: Key, actorRef: ActorRef, registry: ConcurrentHashMap[Key, ActorRef]) {
if (_isRunning.isOn)
registry.put(id, actorRef) //TODO change to putIfAbsent
}
/**
* Register RemoteModule Session Actor by a specific 'id' passed as argument.
* <p/>
* NOTE: If you use this method to register your remote actor then you must unregister the actor by this ID yourself.
*/
def registerPerSession(id: String, factory: ActorRef): Unit = synchronized {
registerPerSession(id, () factory, actorsFactories)
}
private def registerPerSession[Key](id: Key, factory: () ActorRef, registry: ConcurrentHashMap[Key, () ActorRef]) {
if (_isRunning.isOn)
registry.put(id, factory) //TODO change to putIfAbsent
}
/**
* Unregister RemoteModule Actor that is registered using its 'id' field (not custom ID).
*/
def unregister(actorRef: ActorRef): Unit = guard withGuard {
if (_isRunning.isOn) {
app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(actorRef.uuid))
actors.remove(actorRef.address, actorRef)
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
}
}
/**
* Unregister RemoteModule Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregister(id: String): Unit = guard withGuard {
if (_isRunning.isOn) {
app.eventHandler.debug(this, "Unregister server side remote actor with id [%s]".format(id))
if (id.startsWith(UUID_PREFIX)) actorsByUuid.remove(id.substring(UUID_PREFIX.length))
else {
val actorRef = actors get id
actorsByUuid.remove(actorRef.uuid.toString, actorRef)
actors.remove(id, actorRef)
}
}
}
/**
* Unregister RemoteModule Actor by specific 'id'.
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterPerSession(id: String) {
if (_isRunning.isOn) {
app.eventHandler.info(this, "Unregistering server side remote actor with id [%s]".format(id))
actorsFactories.remove(id)
}
}
}
/**
@ -683,7 +537,6 @@ trait NettyRemoteServerModule extends RemoteServerModule {
*/
class RemoteServerPipelineFactory(
val settings: RemoteServerSettings,
val serialization: RemoteActorSerialization,
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
@ -699,7 +552,7 @@ class RemoteServerPipelineFactory(
val protobufEnc = new ProtobufEncoder
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(settings, serialization, name, openChannels, loader, server)
val remoteServer = new RemoteServerHandler(settings, name, openChannels, loader, server)
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
@ -716,7 +569,8 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
case `authenticated` ctx.sendUpstream(event)
case null event.getMessage match {
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction
remoteProtocol.getInstruction.getCookie match {
val instruction = remoteProtocol.getInstruction
instruction.getCookie match {
case `cookie`
ctx.setAttachment(authenticated)
ctx.sendUpstream(event)
@ -737,20 +591,15 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si
@ChannelHandler.Sharable
class RemoteServerHandler(
val settings: RemoteServerSettings,
val serialization: RemoteActorSerialization,
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler {
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with RemoteMarshallingOps {
import settings._
implicit def app = server.app
// applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
//Writes the specified message to the specified channel and propagates write errors to listeners
private def write(channel: Channel, payload: AkkaRemoteProtocol) {
channel.write(payload).addListener(
@ -778,55 +627,32 @@ class RemoteServerHandler(
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
app.eventHandler.debug(this, "Remote client [%s] connected to [%s]".format(clientAddress, server.name))
sessionActors.set(event.getChannel(), new ConcurrentHashMap[String, ActorRef]())
server.notifyListeners(RemoteServerClientConnected(server, clientAddress))
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
app.eventHandler.debug(this, "Remote client [%s] disconnected from [%s]".format(clientAddress, server.name))
// stop all session actors
for (
map Option(sessionActors.remove(event.getChannel));
actor collectionAsScalaIterable(map.values)
) {
try {
actor ! PoisonPill
} catch {
case e: Exception app.eventHandler.error(e, this, "Couldn't stop %s".format(actor))
}
}
server.notifyListeners(RemoteServerClientDisconnected(server, clientAddress))
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
val clientAddress = getClientAddress(ctx)
app.eventHandler.debug("Remote client [%s] channel closed from [%s]".format(clientAddress, server.name), this)
server.notifyListeners(RemoteServerClientClosed(server, clientAddress))
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
event.getMessage match {
case null
throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]")
case remote: AkkaRemoteProtocol if remote.hasMessage
handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
//case remote: AkkaRemoteProtocol if remote.hasInstruction => RemoteServer cannot receive control messages (yet)
case null throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]")
case remote: AkkaRemoteProtocol if remote.hasMessage handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
case remote: AkkaRemoteProtocol if remote.hasInstruction //Doesn't handle instructions
case _ //ignore
}
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
app.eventHandler.error(event.getCause, this, "Unexpected exception from remote downstream")
event.getChannel.close
server.notifyListeners(RemoteServerError(event.getCause, server))
}
@ -838,101 +664,29 @@ class RemoteServerHandler(
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try {
app.eventHandler.debug(this, "Received remote message [%s]".format(request))
dispatchToActor(request, channel)
try {
val remoteMessage = new RemoteMessage(request, server.remoteSupport, applicationLoader)
val recipient = remoteMessage.recipient
remoteMessage.payload match {
case Left(t) throw t
case Right(r) r match {
case _: Terminate if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop()
case _: AutoReceivedMessage if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m recipient.!(m)(remoteMessage.sender)
}
}
} catch {
case e: SecurityException
app.eventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
server.notifyListeners(RemoteServerError(e, server))
}
} catch {
case e: Exception
server.notifyListeners(RemoteServerError(e, server))
app.eventHandler.error(e, this, e.getMessage)
}
private def dispatchToActor(request: RemoteMessageProtocol, channel: Channel) {
val actorInfo = request.getActorInfo
app.eventHandler.debug(this, "Dispatching to remote actor [%s]".format(actorInfo.getUuid))
val actorRef =
try {
actorOf(actorInfo, channel)
} catch {
case e: SecurityException
app.eventHandler.error(e, this, e.getMessage)
write(channel, createErrorReplyMessage(e, request))
server.notifyListeners(RemoteServerError(e, server))
return
}
val sender = if (request.hasSender) serialization.fromProtobufToRemoteActorRef(request.getSender, applicationLoader) else app.deadLetters
MessageSerializer.deserialize(app, request.getMessage) match {
case _: Terminate if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else actorRef.stop()
case _: AutoReceivedMessage if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m actorRef.!(m)(sender)
}
}
/**
* Creates a new instance of the actor with name, uuid and timeout specified as arguments.
*
* If actor already created then just return it from the registry.
*
* Does not start the actor.
*/
private def actorOf(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
app.eventHandler.debug(this,
"Looking up a remotely available actor for address [%s] on node [%s]"
.format(address, app.nodename))
val byAddress = server.actors.get(address) // try actor-by-address
if (byAddress eq null) {
val byUuid = server.actorsByUuid.get(uuid) // try actor-by-uuid
if (byUuid eq null) {
val bySession = createSessionActor(actorInfo, channel) // try actor-by-session
if (bySession eq null) {
throw new IllegalActorStateException(
"Could not find a remote actor with address [" + address + "] or uuid [" + uuid + "]")
} else bySession
} else byUuid
} else byAddress
}
/**
* gets the actor from the session, or creates one if there is a factory for it
*/
private def createSessionActor(actorInfo: ActorInfoProtocol, channel: Channel): ActorRef = {
val uuid = actorInfo.getUuid
val address = actorInfo.getAddress
findSessionActor(address, channel) match {
case null // we dont have it in the session either, see if we have a factory for it
server.findActorFactory(address) match {
case null null
case factory
val actorRef = factory()
sessionActors.get(channel).put(address, actorRef)
actorRef //Start it where's it's created
}
case sessionActor sessionActor
}
}
private def findSessionActor(id: String, channel: Channel): ActorRef =
sessionActors.get(channel) match {
case null null
case map map get id
}
private def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = {
val actorInfo = request.getActorInfo
val messageBuilder = serialization.createRemoteMessageProtocolBuilder(None, Right(request.getUuid), actorInfo.getAddress, actorInfo.getTimeout, Left(exception), None)
if (request.hasSupervisorUuid) messageBuilder.setSupervisorUuid(request.getSupervisorUuid)
RemoteEncoder.encode(messageBuilder.build)
}
protected def parseUuid(protocol: UuidProtocol): Uuid = uuidFrom(protocol.getHigh, protocol.getLow)
}
class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {

View file

@ -1,116 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.serialization
import akka.actor._
import akka.actor.DeploymentConfig._
import akka.dispatch.Envelope
import akka.util.{ ReflectiveAccess, Duration }
import akka.event.EventHandler
import akka.remote._
import RemoteProtocol._
import akka.AkkaApplication
import scala.collection.immutable.Stack
import java.net.InetSocketAddress
import java.util.{ LinkedList, Collections }
import com.google.protobuf.ByteString
import com.eaio.uuid.UUID
class RemoteActorSerialization(remote: RemoteSupport) {
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte]): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, None)
/**
* Deserializes a byte array (Array[Byte]) into an RemoteActorRef instance.
*/
def fromBinaryToRemoteActorRef(bytes: Array[Byte], loader: ClassLoader): ActorRef =
fromProtobufToRemoteActorRef(RemoteActorRefProtocol.newBuilder.mergeFrom(bytes).build, Some(loader))
/**
* Deserializes a RemoteActorRefProtocol Protocol Buffers (protobuf) Message into an RemoteActorRef instance.
*/
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
remote.app.eventHandler.debug(this, "Deserializing RemoteActorRefProtocol to RemoteActorRef:\n %s".format(protocol))
val ref = RemoteActorRef(
remote,
JavaSerializer.fromBinary(protocol.getInetSocketAddress.toByteArray, Some(classOf[InetSocketAddress]), loader).asInstanceOf[InetSocketAddress],
protocol.getAddress,
loader)
remote.app.eventHandler.debug(this, "Newly deserialized RemoteActorRef has uuid: %s".format(ref.uuid))
ref
}
/**
* Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message.
*/
def toRemoteActorRefProtocol(actor: ActorRef): RemoteActorRefProtocol = {
val remoteAddress = actor match {
case ar: RemoteActorRef
ar.remoteAddress
case ar: ActorRef
remote.register(ar) //FIXME stop doing this and delegate to provider.actorFor in the NettyRemoting
remote.app.defaultAddress //FIXME Shouldn't this be the _current_ address of the remoting?
}
remote.app.eventHandler.debug(this, "Register serialized Actor [%s] as remote @ [%s]".format(actor.uuid, remoteAddress))
RemoteActorRefProtocol.newBuilder
.setInetSocketAddress(ByteString.copyFrom(JavaSerializer.toBinary(remoteAddress)))
.setAddress(actor.address)
.setTimeout(remote.app.AkkaConfig.ActorTimeoutMillis)
.build
}
def createRemoteMessageProtocolBuilder(
actorRef: Option[ActorRef],
replyUuid: Either[Uuid, UuidProtocol],
actorAddress: String,
timeout: Long,
message: Either[Throwable, Any],
senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = {
val uuidProtocol = replyUuid match {
case Left(uid) UuidProtocol.newBuilder.setHigh(uid.getTime).setLow(uid.getClockSeqAndNode).build
case Right(protocol) protocol
}
val actorInfoBuilder = ActorInfoProtocol.newBuilder.setUuid(uuidProtocol).setAddress(actorAddress).setTimeout(timeout)
val actorInfo = actorInfoBuilder.build
val messageBuilder = RemoteMessageProtocol.newBuilder
.setUuid({
val messageUuid = newUuid
UuidProtocol.newBuilder.setHigh(messageUuid.getTime).setLow(messageUuid.getClockSeqAndNode).build
})
.setActorInfo(actorInfo)
.setOneWay(true)
message match {
case Right(message)
messageBuilder.setMessage(MessageSerializer.serialize(remote.app, message.asInstanceOf[AnyRef]))
case Left(exception)
messageBuilder.setException(ExceptionProtocol.newBuilder
.setClassname(exception.getClass.getName)
.setMessage(Option(exception.getMessage).getOrElse(""))
.build)
}
if (senderOption.isDefined)
messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get))
messageBuilder
}
}