Got API in place now and RemoteServer/Client/Node etc purged. Need to get test-compile to work so I can start testing the new stuff...

This commit is contained in:
Viktor Klang 2010-12-15 17:52:31 +01:00
parent 74f5445708
commit 5f651c73ba
16 changed files with 837 additions and 844 deletions

View file

@ -126,7 +126,7 @@ object Actor extends Logging {
private[actor] val actorRefInCreation = new scala.util.DynamicVariable[Option[ActorRef]](None)
/**
/**
* Creates an ActorRef out of the Actor with type T.
* <pre>
* import Actor._
@ -140,7 +140,8 @@ object Actor extends Logging {
* val actor = actorOf[MyActor].start
* </pre>
*/
def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
def actorOf[T <: Actor : Manifest]: ActorRef =
ActorRegistry.actorOf[T]
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
@ -158,7 +159,7 @@ object Actor extends Logging {
* </pre>
*/
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port)
ActorRegistry.actorOf[T](host,port)
/**
* Creates an ActorRef out of the Actor of the specified Class.
@ -174,15 +175,8 @@ object Actor extends Logging {
* val actor = actorOf(classOf[MyActor]).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(() => {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
})
def actorOf(clazz: Class[_ <: Actor]): ActorRef =
ActorRegistry.actorOf(clazz)
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
@ -199,23 +193,8 @@ object Actor extends Logging {
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = TIMEOUT): ActorRef = {
import ReflectiveAccess._
import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
ensureRemotingEnabled
(host,port) match {
case null => throw new IllegalArgumentException("No location specified")
case (HOSTNAME, PORT) => actorOf(clazz) //Local
case _ => new RemoteActorRef(clazz.getName,
clazz.getName,
host,
port,
timeout,
true, //Client managed
None)
}
}
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef =
ActorRegistry.actorOf(clazz, host, port, timeout)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
@ -235,7 +214,7 @@ object Actor extends Logging {
* val actor = actorOf(new MyActor).start
* </pre>
*/
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(() => factory)
def actorOf(factory: => Actor): ActorRef = ActorRegistry.actorOf(factory)
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
@ -252,15 +231,9 @@ object Actor extends Logging {
* }
* </pre>
*/
def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
case object Spawn
actorOf(new Actor() {
self.dispatcher = dispatcher
def receive = {
case Spawn => try { body } finally { self.stop }
}
}).start ! Spawn
}
def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit =
ActorRegistry.spawn(body)(dispatcher)
/**
* Implicitly converts the given Option[Any] to a AnyOptionAsTypedOption which offers the method <code>as[T]</code>

View file

@ -80,6 +80,8 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None
protected[akka] val guard = new ReentrantGuard
private[akka] def registry: ActorRegistryInstance
/**
* User overridable callback/setting.
* <p/>
@ -541,6 +543,7 @@ trait ActorRef extends ActorRefShared with java.lang.Comparable[ActorRef] { scal
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class LocalActorRef private[akka] (
private[akka] val registry: ActorRegistryInstance,
private[this] val actorFactory: () => Actor)
extends ActorRef with ScalaActorRef {
@ -563,7 +566,9 @@ class LocalActorRef private[akka] (
if (isRunning) initializeActorInstance
// used only for deserialization
private[akka] def this(__uuid: Uuid,
private[akka] def this(
__registry: ActorRegistryInstance,
__uuid: Uuid,
__id: String,
__hostname: String,
__port: Int,
@ -573,7 +578,7 @@ class LocalActorRef private[akka] (
__supervisor: Option[ActorRef],
__hotswap: Stack[PartialFunction[Any, Unit]],
__factory: () => Actor) = {
this(__factory)
this(__registry, __factory)
_uuid = __uuid
id = __id
timeout = __timeout
@ -583,7 +588,7 @@ class LocalActorRef private[akka] (
hotswap = __hotswap
setActorSelfFields(actor,this)
start
ActorRegistry.register(this)
__registry.register(this)
}
// ========= PUBLIC FUNCTIONS =========
@ -644,7 +649,7 @@ class LocalActorRef private[akka] (
dispatcher.detach(this)
_status = ActorRefInternals.SHUTDOWN
actor.postStop
ActorRegistry.unregister(this)
registry.unregister(this)
setActorSelfFields(actorInstance.get,null)
} //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
}
@ -942,8 +947,8 @@ class LocalActorRef private[akka] (
}
//TODO: REVISIT: REMOVE
/*
protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
/*protected[akka] def registerSupervisorAsRemoteActor: Option[Uuid] = guard.withGuard {
ensureRemotingEnabled
if (_supervisor.isDefined) {
remoteAddress.foreach(address => RemoteClientModule.registerSupervisorForActor(address, this))
@ -1053,34 +1058,8 @@ class LocalActorRef private[akka] (
private def initializeActorInstance = {
actor.preStart // run actor preStart
Actor.log.slf4j.trace("[{}] has started", toString)
ActorRegistry.register(this)
registry.register(this)
}
/*
private def serializeMessage(message: AnyRef): AnyRef = if (Actor.SERIALIZE_MESSAGES) {
if (!message.isInstanceOf[String] &&
!message.isInstanceOf[Byte] &&
!message.isInstanceOf[Int] &&
!message.isInstanceOf[Long] &&
!message.isInstanceOf[Float] &&
!message.isInstanceOf[Double] &&
!message.isInstanceOf[Boolean] &&
!message.isInstanceOf[Char] &&
!message.isInstanceOf[Tuple2[_, _]] &&
!message.isInstanceOf[Tuple3[_, _, _]] &&
!message.isInstanceOf[Tuple4[_, _, _, _]] &&
!message.isInstanceOf[Tuple5[_, _, _, _, _]] &&
!message.isInstanceOf[Tuple6[_, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple7[_, _, _, _, _, _, _]] &&
!message.isInstanceOf[Tuple8[_, _, _, _, _, _, _, _]] &&
!message.getClass.isArray &&
!message.isInstanceOf[List[_]] &&
!message.isInstanceOf[scala.collection.immutable.Map[_, _]] &&
!message.isInstanceOf[scala.collection.immutable.Set[_]]) {
Serializer.Java.deepClone(message)
} else message
} else message
*/
}
/**
@ -1099,12 +1078,13 @@ object RemoteActorSystemMessage {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[akka] case class RemoteActorRef private[akka] (
registry: ActorRegistryInstance,
classOrServiceName: String,
val actorClassName: String,
val hostname: String,
val port: Int,
_timeout: Long,
clientManaged: Boolean,
clientManaged: Boolean, //TODO: REVISIT: ENCODE CLIENT_MANAGED INTO REMOTE PROTOCOL
loader: Option[ClassLoader],
val actorType: ActorType = ActorType.ScalaActor)
extends ActorRef with ScalaActorRef {
@ -1119,16 +1099,14 @@ private[akka] case class RemoteActorRef private[akka] (
start
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
RemoteClientModule.send[Any](
message, senderOption, None, homeAddress, timeout, true, this, None, actorType)
registry.remote.send[Any](message, senderOption, None, homeAddress, timeout, true, this, None, actorType)
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
message: Any,
timeout: Long,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
val future = RemoteClientModule.send[T](
message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType)
val future = registry.remote.send[T](message, senderOption, senderFuture, homeAddress, timeout, false, this, None, actorType)
if (future.isDefined) future.get
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
}
@ -1136,7 +1114,7 @@ private[akka] case class RemoteActorRef private[akka] (
def start: ActorRef = synchronized {
_status = ActorRefInternals.RUNNING
if (clientManaged) {
RemoteClientModule.register(homeAddress, uuid)
registry.remote.registerClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
}
this
}
@ -1146,8 +1124,8 @@ private[akka] case class RemoteActorRef private[akka] (
_status = ActorRefInternals.SHUTDOWN
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
if (clientManaged) {
RemoteClientModule.unregister(homeAddress, uuid)
ActorRegistry.remote.unregister(this) //TODO: Why does this need to be deregistered from the server?
registry.remote.unregisterClientManagedActor(homeAddress.getHostName,homeAddress.getPort, uuid)
registry.remote.unregister(this) //TODO: REVISIT: Why does this need to be deregistered from the server?
}
}
}

View file

@ -13,8 +13,9 @@ import java.util.{Set => JSet}
import annotation.tailrec
import akka.util.ReflectiveAccess._
import java.net.InetSocketAddress
import akka.util. {ReadWriteGuard, Address, ListenerManagement}
import akka.remoteinterface.RemoteServerModule
import akka.util. {ReflectiveAccess, ReadWriteGuard, Address, ListenerManagement}
import akka.dispatch. {MessageDispatcher, Dispatchers}
import akka.remoteinterface. {RemoteSupport, RemoteServerModule}
/**
* Base trait for ActorRegistry events, allows listen to when an actor is added and removed from the ActorRegistry.
@ -37,7 +38,9 @@ case class ActorUnregistered(actor: ActorRef) extends ActorRegistryEvent
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends ListenerManagement {
object ActorRegistry extends ActorRegistryInstance(ReflectiveAccess.Remote.defaultRemoteSupport)
class ActorRegistryInstance(remoteBootstrap: Option[(ActorRegistryInstance) => RemoteSupport]) extends ListenerManagement {
private val actorsByUUID = new ConcurrentHashMap[Uuid, ActorRef]
private val actorsById = new Index[String,ActorRef]
private val remoteActorSets = Map[Address, RemoteActorSet]()
@ -227,11 +230,127 @@ object ActorRegistry extends ListenerManagement {
/**
* Handy access to the RemoteServer module
*/
lazy val remote: RemoteServerModule = getObjectFor("akka.remote.RemoteNode$") match {
case Some(module) => module
case None =>
log.slf4j.error("Wanted remote module but didn't exist on classpath")
null
lazy val remote: RemoteSupport = remoteBootstrap.map(_(this)).getOrElse(throw new UnsupportedOperationException("You need to have akka-remote on classpath"))
/**
* Creates an ActorRef out of the Actor with type T.
* <pre>
* import Actor._
* val actor = actorOf[MyActor]
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor].start
* </pre>
*/
def actorOf[T <: Actor : Manifest]: ActorRef = actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]])
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf[MyActor]("www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf[MyActor]("www.akka.io",2552).start
* </pre>
*/
def actorOf[T <: Actor : Manifest](host: String, port: Int): ActorRef =
actorOf(manifest[T].erasure.asInstanceOf[Class[_ <: Actor]], host, port)
/**
* Creates an ActorRef out of the Actor of the specified Class.
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor])
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor]).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor]): ActorRef = new LocalActorRef(this, () => {
import ReflectiveAccess.{ createInstance, noParams, noArgs }
createInstance[Actor](clazz.asInstanceOf[Class[_]], noParams, noArgs).getOrElse(
throw new ActorInitializationException(
"Could not instantiate Actor" +
"\nMake sure Actor is NOT defined inside a class/trait," +
"\nif so put it outside the class/trait, f.e. in a companion object," +
"\nOR try to change: 'actorOf[MyActor]' to 'actorOf(new MyActor)'."))
})
/**
* Creates a Client-managed ActorRef out of the Actor of the specified Class.
* If the supplied host and port is identical of the configured local node, it will be a local actor
* <pre>
* import Actor._
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(classOf[MyActor],"www.akka.io",2552).start
* </pre>
*/
def actorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long = Actor.TIMEOUT): ActorRef =
remote.clientManagedActorOf(clazz, host, port, timeout)
/**
* Creates an ActorRef out of the Actor. Allows you to pass in a factory function
* that creates the Actor. Please note that this function can be invoked multiple
* times if for example the Actor is supervised and needs to be restarted.
* <p/>
* This function should <b>NOT</b> be used for remote actors.
* <pre>
* import Actor._
* val actor = actorOf(new MyActor)
* actor.start
* actor ! message
* actor.stop
* </pre>
* You can create and start the actor in one statement like this:
* <pre>
* val actor = actorOf(new MyActor).start
* </pre>
*/
def actorOf(factory: => Actor): ActorRef = new LocalActorRef(this,() => factory)
/**
* Use to spawn out a block of code in an event-driven actor. Will shut actor down when
* the block has been executed.
* <p/>
* NOTE: If used from within an Actor then has to be qualified with 'ActorRegistry.spawn' since
* there is a method 'spawn[ActorType]' in the Actor trait already.
* Example:
* <pre>
* import ActorRegistry.{spawn}
*
* spawn {
* ... // do stuff
* }
* </pre>
*/
def spawn(body: => Unit)(implicit dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher): Unit = {
case object Spawn
actorOf(new Actor() {
self.dispatcher = dispatcher
def receive = {
case Spawn => try { body } finally { self.stop }
}
}).start ! Spawn
}
@ -303,12 +422,12 @@ object ActorRegistry extends ListenerManagement {
private[akka] def typedActorsFactories(address: Address) = actorsFor(address).typedActorsFactories
private[akka] class RemoteActorSet {
private[ActorRegistry] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistry] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[ActorRegistry] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistry] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
private[ActorRegistryInstance] val actors = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistryInstance] val actorsByUuid = new ConcurrentHashMap[String, ActorRef]
private[ActorRegistryInstance] val actorsFactories = new ConcurrentHashMap[String, () => ActorRef]
private[ActorRegistryInstance] val typedActors = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistryInstance] val typedActorsByUuid = new ConcurrentHashMap[String, AnyRef]
private[ActorRegistryInstance] val typedActorsFactories = new ConcurrentHashMap[String, () => AnyRef]
}
}

View file

@ -142,7 +142,7 @@ sealed class Supervisor(handler: FaultHandlingStrategy) {
actorRef.lifeCycle = lifeCycle
supervisor.link(actorRef)
if (registerAsRemoteService)
ActorRegistry.remote.register(actorRef)
ActorRegistry.remote.register(actorRef) //TODO: REVISIT: Is this the most sensible approach? other way of obtaining ActorRegistry?
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor)

View file

@ -132,7 +132,7 @@ trait MessageDispatcher extends MailboxFactory with Logging {
val i = uuids.iterator
while(i.hasNext()) {
val uuid = i.next()
ActorRegistry.actorFor(uuid) match {
ActorRegistry.actorFor(uuid) match { //TODO: REVISIT: How to keep track of which registry?
case Some(actor) => actor.stop
case None =>
log.slf4j.error("stopAllLinkedActors couldn't find linked actor: " + uuid)

View file

@ -5,13 +5,33 @@
package akka.remoteinterface
import akka.japi.Creator
import akka.actor.ActorRef
import akka.util.{ReentrantGuard, Logging, ListenerManagement}
import java.net.InetSocketAddress
import akka.actor._
import akka.util._
import akka.dispatch.CompletableFuture
import akka.actor. {ActorRegistryInstance, ActorType, RemoteActorRef, ActorRef}
import akka.config.Config.{config, TIME_UNIT}
trait RemoteModule extends Logging {
def registry: ActorRegistryInstance
def optimizeLocalScoped_?(): Boolean //Apply optimizations for remote operations in local scope
protected[akka] def notifyListeners(message: => Any): Unit
}
abstract class RemoteSupport extends ListenerManagement with RemoteServerModule with RemoteClientModule {
def shutdown {
this.shutdownServerModule
this.shutdownClientModule
}
protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
}
/**
* This is the interface for the RemoteServer functionality, it's used in ActorRegistry.remote
*/
trait RemoteServerModule extends ListenerManagement with Logging {
trait RemoteServerModule extends RemoteModule {
protected val guard = new ReentrantGuard
/**
@ -37,12 +57,12 @@ trait RemoteServerModule extends ListenerManagement with Logging {
/**
* Starts the server up
*/
def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule //TODO possibly hidden
def start(host: String, port: Int, loader: Option[ClassLoader] = None): RemoteServerModule
/**
* Shuts the server down
*/
def shutdown: Unit //TODO possibly hidden
def shutdownServerModule: Unit
/**
* Register typed actor by interface name.
@ -147,3 +167,71 @@ trait RemoteServerModule extends ListenerManagement with Logging {
*/
def unregisterTypedPerSessionActor(id: String): Unit
}
trait RemoteClientModule extends RemoteModule { self: RemoteModule =>
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None)
def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader))
def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef =
actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None)
def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader))
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None)
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader))
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(serviceId, className, timeout, hostname, port, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T =
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T =
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T =
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef
/** Methods that needs to be implemented by a transport **/
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, host: String, port: Int, loader: Option[ClassLoader]): T
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]]
//TODO: REVISIT: IMPLEMENT OR REMOVE
//private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef
//private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef
/**
* Clean-up all open connections.
*/
def shutdownClientModule: Unit
private[akka] def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid): Unit
}

View file

@ -4,12 +4,13 @@
package akka.util
import akka.actor.{ActorRef, IllegalActorStateException, ActorType, Uuid}
import akka.dispatch.{Future, CompletableFuture, MessageInvocation}
import akka.config.{Config, ModuleNotAvailableException}
import akka.AkkaException
import java.net.InetSocketAddress
import akka.remoteinterface.RemoteSupport
import akka.actor._
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
@ -20,10 +21,10 @@ object ReflectiveAccess extends Logging {
val loader = getClass.getClassLoader
lazy val isRemotingEnabled = RemoteClientModule.isEnabled
lazy val isRemotingEnabled = Remote.isEnabled
lazy val isTypedActorEnabled = TypedActorModule.isEnabled
def ensureRemotingEnabled = RemoteClientModule.ensureEnabled
def ensureRemotingEnabled = Remote.ensureEnabled
def ensureTypedActorEnabled = TypedActorModule.ensureEnabled
/**
@ -31,82 +32,26 @@ object ReflectiveAccess extends Logging {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClientModule {
object Remote {
val TRANSPORT = Config.config.getString("akka.remote.transport","akka.remote.NettyRemoteSupport")
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 2552)
type RemoteClient = {
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[_]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]]
def registerSupervisorForActor(actorRef: ActorRef)
}
type RemoteClientObject = {
def register(hostname: String, port: Int, uuid: Uuid): Unit
def unregister(hostname: String, port: Int, uuid: Uuid): Unit
def clientFor(address: InetSocketAddress): RemoteClient
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient
}
lazy val isEnabled = remoteClientObjectInstance.isDefined
lazy val isEnabled = remoteSupportClass.isDefined
def ensureEnabled = if (!isEnabled) throw new ModuleNotAvailableException(
"Can't load the remoting module, make sure that akka-remote.jar is on the classpath")
val remoteClientObjectInstance: Option[RemoteClientObject] =
getObjectFor("akka.remote.RemoteClient$")
//TODO: REVISIT: Make class configurable
val remoteSupportClass: Option[Class[_ <: RemoteSupport]] = getClassFor(TRANSPORT)
def register(address: InetSocketAddress, uuid: Uuid) = {
ensureEnabled
remoteClientObjectInstance.get.register(address.getHostName, address.getPort, uuid)
protected[akka] val defaultRemoteSupport: Option[ActorRegistryInstance => RemoteSupport] = remoteSupportClass map {
remoteClass => (registry: ActorRegistryInstance) =>
createInstance[RemoteSupport](remoteClass,Array[Class[_]](classOf[ActorRegistryInstance]),Array[AnyRef](registry)).
getOrElse(throw new ModuleNotAvailableException("Can't instantiate "+
remoteClass.getName+
", make sure that akka-remote.jar is on the classpath"))
}
def unregister(address: InetSocketAddress, uuid: Uuid) = {
ensureEnabled
remoteClientObjectInstance.get.unregister(address.getHostName, address.getPort, uuid)
}
def registerSupervisorForActor(remoteAddress: InetSocketAddress, actorRef: ActorRef) = {
ensureEnabled
val remoteClient = remoteClientObjectInstance.get.clientFor(remoteAddress)
remoteClient.registerSupervisorForActor(actorRef)
}
def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient = {
ensureEnabled
remoteClientObjectInstance.get.clientFor(hostname, port, loader)
}
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[_]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = {
ensureEnabled
clientFor(remoteAddress.getHostName, remoteAddress.getPort, None).send[T](
message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
}
}
/**
* Reflective access to the RemoteServer module.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServerModule {
val HOSTNAME = Config.config.getString("akka.remote.server.hostname", "localhost")
val PORT = Config.config.getInt("akka.remote.server.port", 2552)
}
/**

View file

@ -13,7 +13,7 @@ public class SupervisionConfig {
public SupervisorConfig createSupervisorConfig(List<ActorRef> toSupervise) {
ArrayList<Server> targets = new ArrayList<Server>(toSupervise.size());
for(ActorRef ref : toSupervise) {
targets.add(new Supervise(ref, permanent(), new RemoteAddress("localhost",2552)));
targets.add(new Supervise(ref, permanent(), true));
}
return new SupervisorConfig(new AllForOneStrategy(new Class[] { Exception.class },50,1000), targets.toArray(new Server[0]));

View file

@ -17,7 +17,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
self: BootableActorLoaderService =>
protected lazy val remoteServerThread = new Thread(new Runnable() {
import ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
import ReflectiveAccess.Remote.{HOSTNAME,PORT}
def run = ActorRegistry.remote.start(HOSTNAME,PORT,loader = self.applicationLoader)
}, "Akka Remote Service")
@ -34,7 +34,7 @@ trait BootableRemoteActorService extends Bootable with Logging {
abstract override def onUnload = {
log.slf4j.info("Shutting down Remote Actors Service")
RemoteNode.shutdown
ActorRegistry.remote.shutdown
if (remoteServerThread.isAlive) remoteServerThread.join(1000)
log.slf4j.info("Remote Actors Service has been shut down")
super.onUnload

View file

@ -4,63 +4,470 @@
package akka.remote
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import java.util.{Map => JMap}
import akka.actor.Actor._
import akka.actor.{Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage, uuidFrom, Uuid, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType}
import akka.util._
import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture, Future}
import akka.remote.protocol.RemoteProtocol._
import akka.remote.protocol.RemoteProtocol.ActorType._
import akka.config.Config._
import akka.config.ConfigurationException
import akka.serialization.RemoteActorSerialization
import akka.japi.Creator
import akka.actor.{ActorRegistryInstance, Actor, RemoteActorRef, TypedActor, ActorRef, IllegalActorStateException,
RemoteActorSystemMessage, uuidFrom, Uuid, Exit, ActorRegistry, LifeCycleMessage, ActorType => AkkaActorType}
import akka.remoteinterface. {RemoteSupport, RemoteModule, RemoteServerModule, RemoteClientModule}
import akka.config.Config._
import akka.serialization.RemoteActorSerialization._
import akka.AkkaException
import akka.actor.Actor._
import akka.util._
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.group.{DefaultChannelGroup, ChannelGroup}
import org.jboss.netty.channel.group.{DefaultChannelGroup,ChannelGroup}
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import org.jboss.netty.handler.codec.compression.{ZlibEncoder, ZlibDecoder}
import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import scala.collection.mutable.Map
import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import scala.collection.mutable.{ HashSet, HashMap }
import scala.reflect.BeanProperty
import akka.dispatch. {Future, DefaultCompletableFuture, CompletableFuture}
import akka.japi.Creator
import akka.remoteinterface.RemoteServerModule
import java.lang.reflect.InvocationTargetException
/**
* Use this object if you need a single remote server on a specific node.
*
* <pre>
* // takes hostname and port from 'akka.conf'
* RemoteNode.start
* </pre>
*
* <pre>
* RemoteNode.start(hostname, port)
* </pre>
*
* You can specify the class loader to use to load the remote actors.
* <pre>
* RemoteNode.start(hostname, port, classLoader)
* </pre>
*
* If you need to create more than one, then you can use the RemoteServer:
*
* <pre>
* val server = new RemoteServer
* server.start(hostname, port)
* </pre>
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
/**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteNode extends RemoteServer
trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagement with Logging =>
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
protected[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T =
TypedActor.createProxyForRemoteActorRef(intfClass, RemoteActorRef(registry, serviceId, implClassName, hostname, port, timeout, false, loader, AkkaActorType.TypedActor))
protected[akka] def send[T](message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] =
clientFor(remoteAddress, None).send[T](message, senderOption, senderFuture, remoteAddress, timeout, isOneWay, actorRef, typedActorInfo, actorType)
private[akka] def clientFor(
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized { //TODO: REVIST: synchronized here seems bottlenecky
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
if (remoteClients.contains(hash)) remoteClients(hash)
else {
val client = new RemoteClient(hostname, port, loader, self.notifyListeners _)
client.connect
remoteClients += hash -> client
client
}
}
def shutdownClientFor(address: InetSocketAddress) = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
if (remoteClients.contains(hash)) {
val client = remoteClients(hash)
client.shutdown
remoteClients -= hash
}
}
//TODO: REVISIT IMPLEMENT OR REMOVE
/*private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor().registerSupervisorForActor(actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
clientFor().deregisterSupervisorForActor(actorRef)*/
/**
* Clean-up all open connections.
*/
def shutdownClientModule = synchronized {
remoteClients.foreach({ case (addr, client) => client.shutdown })
remoteClients.clear
}
def registerClientManagedActor(hostname: String, port: Int, uuid: Uuid) = synchronized {
actorsFor(Address(hostname, port)) += uuid
}
private[akka] def unregisterClientManagedActor(hostname: String, port: Int, uuid: Uuid) = synchronized {
val set = actorsFor(Address(hostname, port))
set -= uuid
if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
}
private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = {
val set = remoteActors.get(remoteServerAddress)
if (set.isDefined && (set.get ne null)) set.get
else {
val remoteActorSet = new HashSet[Uuid]
remoteActors.put(remoteServerAddress, remoteActorSet)
remoteActorSet
}
}
}
object RemoteClient {
val SECURE_COOKIE: Option[String] = {
val cookie = config.getString("akka.remote.secure-cookie", "")
if (cookie == "") None else Some(cookie)
}
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
}
/**
* RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient private[akka] (
val hostname: String,
val port: Int,
val loader: Option[ClassLoader] = None,
val notifyListeners: (=> Any) => Unit) extends Logging {
val name = "RemoteClient@" + hostname + "::" + port
//FIXME Should these be clear:ed on postStop?
private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private val remoteAddress = new InetSocketAddress(hostname, port)
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
private val reconnectionTimeWindow = Duration(config.getInt(
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
@volatile
private var reconnectionTimeWindowStart = 0L
def connect = runSwitch switchOn {
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port)
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, this))
log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port)
log.slf4j.debug("Remote client connection failed", connection.getCause)
}
notifyListeners(RemoteClientStarted(this))
}
def shutdown = runSwitch switchOff {
log.slf4j.info("Shutting down {}", name)
notifyListeners(RemoteClientShutdown(this))
timer.stop
timer = null
openChannels.close.awaitUninterruptibly
openChannels = null
bootstrap.releaseExternalResources
bootstrap = null
connection = null
log.slf4j.info("{} has been shut down", name)
}
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: AkkaActorType): Option[CompletableFuture[T]] = {
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
else None
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
cookie
).build, senderFuture)
}
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
connection.getChannel.write(request)
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult)
connection.getChannel.write(request)
Some(futureResult)
}
} else {
val exception = new RemoteClientException(
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
notifyListeners(RemoteClientError(exception, this))
throw exception
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
if (reconnectionTimeWindowStart == 0L) {
reconnectionTimeWindowStart = System.currentTimeMillis
true
} else {
val timeLeft = reconnectionTimeWindow - (System.currentTimeMillis - reconnectionTimeWindowStart)
if (timeLeft > 0) {
log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
true
} else false
}
}
private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClientPipelineFactory(
name: String,
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
supervisors: ConcurrentMap[Uuid, ActorRef],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
lazy val engine = {
val e = RemoteServerSslContext.client.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(true)
e
}
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
}
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
new StaticChannelPipeline(stages: _*)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelHandler.Sharable
class RemoteClientHandler(
val name: String,
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
val supervisors: ConcurrentMap[Uuid, ActorRef],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer,
val client: RemoteClient)
extends SimpleChannelUpstreamHandler with Logging {
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.slf4j.debug(event.toString)
}
super.handleUpstream(ctx, event)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
val result = event.getMessage
if (result.isInstanceOf[RemoteMessageProtocol]) {
val reply = result.asInstanceOf[RemoteMessageProtocol]
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
}
val exception = parseException(reply, client.loader)
future.completeWithException(exception)
}
futures remove replyUuid
} else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
client.notifyListeners(RemoteClientError(exception, client))
throw exception
}
} catch {
case e: Exception =>
client.notifyListeners(RemoteClientError(e, client))
log.slf4j.error("Unexpected exception in remote client handler: {}", e)
throw e
}
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn {
if (client.isWithinReconnectionTimeWindow) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
client.openChannels.remove(event.getChannel)
client.isAuthenticated.set(false)
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.notifyListeners(RemoteClientError(client.connection.getCause, client))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", client.connection.getCause)
}
}
}, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.shutdown }
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.notifyListeners(RemoteClientConnected(client))
log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow
}
if (RemoteServer.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) connect
else throw new RemoteClientException("Could not establish SSL handshake", client)
}
})
} else connect
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client))
log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.notifyListeners(RemoteClientError(event.getCause, client))
if (event.getCause ne null)
log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause)
else
log.slf4j.error("Unexpected exception from downstream in remote client: {}", event)
event.getChannel.close
}
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
}
}
/**
* For internal use only. Holds configuration variables, remote actors, remote typed actors and remote servers.
@ -118,41 +525,62 @@ object RemoteServer {
/**
* Life-cycle events for RemoteServer.
*/
sealed trait RemoteServerLifeCycleEvent
sealed trait RemoteServerLifeCycleEvent //TODO: REVISIT: Document change from RemoteServer to RemoteServerModule
case class RemoteServerStarted(
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerShutdown(
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerError(
@BeanProperty val cause: Throwable,
@BeanProperty val server: RemoteServer) extends RemoteServerLifeCycleEvent
@BeanProperty val server: RemoteServerModule) extends RemoteServerLifeCycleEvent
case class RemoteServerClientConnected(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientDisconnected(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
case class RemoteServerClientClosed(
@BeanProperty val server: RemoteServer,
@BeanProperty val server: RemoteServerModule,
@BeanProperty val clientAddress: Option[InetSocketAddress]) extends RemoteServerLifeCycleEvent
/**
* Use this class if you need a more than one remote server on a specific node.
*
* <pre>
* val server = new RemoteServer
* server.start
* </pre>
*
* If you need to create more than one, then you can use the RemoteServer:
*
* <pre>
* RemoteNode.start
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* Provides the implementation of the Netty remote support
*/
class RemoteServer extends RemoteServerModule {
class NettyRemoteSupport(val registry: ActorRegistryInstance)
extends RemoteSupport with NettyRemoteServerModule with NettyRemoteClientModule {
protected[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef = {
//TODO: REVISIT: Possible to optimize server-managed actors in local scope?
//val Host = this.hostname
//val Port = this.port
//(host,port) match {
// case (Host, Port) if optimizeLocalScoped_? =>
//if actor with that servicename or uuid is present locally, return a LocalActorRef to that one
//else return RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
// case _ =>
// RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
//}
RemoteActorRef(registry, serviceId, className, hostname, port, timeout, false, loader)
}
def clientManagedActorOf(clazz: Class[_ <: Actor], host: String, port: Int, timeout: Long): ActorRef = {
val Host = this.hostname
val Port = this.port
(host,port) match {
case (Host, Port) if optimizeLocalScoped_? =>
registry.actorOf(clazz) //Local
case _ =>
new RemoteActorRef(registry,clazz.getName,clazz.getName,host,port,timeout,true /*Client managed*/, None)
}
}
val optimizeLocalScoped_? = true
}
trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServer._
@volatile private[akka] var address = Address(RemoteServer.HOSTNAME,RemoteServer.PORT)
@ -172,7 +600,7 @@ class RemoteServer extends RemoteServerModule {
def isRunning = _isRunning.isOn
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServer = guard withGuard {
def start(_hostname: String, _port: Int, loader: Option[ClassLoader] = None): RemoteServerModule = guard withGuard {
try {
_isRunning switchOn {
address = Address(_hostname,_port)
@ -196,7 +624,7 @@ class RemoteServer extends RemoteServerModule {
this
}
def shutdown = guard withGuard {
def shutdownServerModule = guard withGuard {
_isRunning switchOff {
try {
openChannels.disconnect
@ -336,22 +764,15 @@ class RemoteServer extends RemoteServerModule {
* <p/>
* NOTE: You need to call this method if you have registered an actor by a custom ID.
*/
def unregisterTypedPerSessionActor(id: String): Unit = {
if (_isRunning.isOn) {
typedActorsFactories.remove(id)
}
}
def unregisterTypedPerSessionActor(id: String): Unit =
if (_isRunning.isOn) typedActorsFactories.remove(id)
protected override def manageLifeCycleOfListeners = false
protected[akka] override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
private[akka] def actors = ActorRegistry.actors(address)
private[akka] def actorsByUuid = ActorRegistry.actorsByUuid(address)
private[akka] def actorsFactories = ActorRegistry.actorsFactories(address)
private[akka] def typedActors = ActorRegistry.typedActors(address)
private[akka] def typedActorsByUuid = ActorRegistry.typedActorsByUuid(address)
private[akka] def typedActorsFactories = ActorRegistry.typedActorsFactories(address)
private[akka] def actors = registry.actors(address)
private[akka] def actorsByUuid = registry.actorsByUuid(address)
private[akka] def actorsFactories = registry.actorsFactories(address)
private[akka] def typedActors = registry.typedActors(address)
private[akka] def typedActorsByUuid = registry.typedActorsByUuid(address)
private[akka] def typedActorsFactories = registry.typedActorsFactories(address)
}
object RemoteServerSslContext {
@ -376,7 +797,7 @@ class RemoteServerPipelineFactory(
val name: String,
val openChannels: ChannelGroup,
val loader: Option[ClassLoader],
val server: RemoteServer) extends ChannelPipelineFactory {
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
import RemoteServer._
def getPipeline: ChannelPipeline = {
@ -413,7 +834,7 @@ class RemoteServerHandler(
val name: String,
val openChannels: ChannelGroup,
val applicationLoader: Option[ClassLoader],
val server: RemoteServer) extends SimpleChannelUpstreamHandler with Logging {
val server: NettyRemoteServerModule) extends SimpleChannelUpstreamHandler with Logging {
import RemoteServer._
val AW_PROXY_PREFIX = "$$ProxiedByAW".intern
@ -422,7 +843,7 @@ class RemoteServerHandler(
val sessionActors = new ChannelLocal[ConcurrentHashMap[String, ActorRef]]()
val typedSessionActors = new ChannelLocal[ConcurrentHashMap[String, AnyRef]]()
applicationLoader.foreach(MessageSerializer.setClassLoader(_))
applicationLoader.foreach(MessageSerializer.setClassLoader(_)) //TODO: REVISIT: THIS FEELS A BIT DODGY
/**
* ChannelOpen overridden to store open channels for a clean postStop of a RemoteServer.

View file

@ -1,515 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.remote.protocol.RemoteProtocol.{ActorType => ActorTypeProtocol, _}
import akka.actor.{Exit, Actor, ActorRef, ActorType, RemoteActorRef, IllegalActorStateException}
import akka.dispatch.{DefaultCompletableFuture, CompletableFuture}
import akka.actor.{Uuid,newUuid,uuidFrom}
import akka.config.Config._
import akka.serialization.RemoteActorSerialization._
import akka.AkkaException
import Actor._
import org.jboss.netty.channel._
import group.DefaultChannelGroup
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
import java.net.{ SocketAddress, InetSocketAddress }
import java.util.concurrent.{ TimeUnit, Executors, ConcurrentMap, ConcurrentHashMap, ConcurrentSkipListSet }
import java.util.concurrent.atomic.{ AtomicLong, AtomicBoolean }
import scala.collection.mutable.{ HashSet, HashMap }
import scala.reflect.BeanProperty
import akka.actor._
import akka.util._
/**
* Life-cycle events for RemoteClient.
*/
sealed trait RemoteClientLifeCycleEvent
case class RemoteClientError(
@BeanProperty val cause: Throwable,
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientDisconnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientConnected(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientStarted(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
case class RemoteClientShutdown(
@BeanProperty val client: RemoteClient) extends RemoteClientLifeCycleEvent
/**
* Thrown for example when trying to send a message using a RemoteClient that is either not started or shut down.
*/
class RemoteClientException private[akka] (message: String, @BeanProperty val client: RemoteClient) extends AkkaException(message)
/**
* The RemoteClient object manages RemoteClient instances and gives you an API to lookup remote actor handles.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteClient extends Logging {
val SECURE_COOKIE: Option[String] = {
val cookie = config.getString("akka.remote.secure-cookie", "")
if (cookie == "") None
else Some(cookie)
}
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout", 1), TIME_UNIT)
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay", 5), TIME_UNIT)
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size", 1048576)
private val remoteClients = new HashMap[String, RemoteClient]
private val remoteActors = new HashMap[Address, HashSet[Uuid]]
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, None)
def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, Actor.TIMEOUT, hostname, port, Some(loader))
def actorFor(serviceId: String, className: String, hostname: String, port: Int): ActorRef =
actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, None)
def actorFor(serviceId: String, className: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(serviceId, className, Actor.TIMEOUT, hostname, port, Some(loader))
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, None)
def actorFor(classNameOrServiceId: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
actorFor(classNameOrServiceId, classNameOrServiceId, timeout, hostname, port, Some(loader))
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, false, None)
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int): T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, Actor.TIMEOUT, hostname, port, None)
}
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int): T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
}
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = {
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
}
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): T = {
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
}
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): T = {
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, false, loader, ActorType.TypedActor)
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
}
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, false, Some(loader))
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]): ActorRef =
RemoteActorRef(serviceId, className, hostname, port, timeout, false, loader)
def clientFor(hostname: String, port: Int): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), None)
def clientFor(hostname: String, port: Int, loader: ClassLoader): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), Some(loader))
def clientFor(address: InetSocketAddress): RemoteClient =
clientFor(address, None)
def clientFor(address: InetSocketAddress, loader: ClassLoader): RemoteClient =
clientFor(address, Some(loader))
private[akka] def clientFor(hostname: String, port: Int, loader: Option[ClassLoader]): RemoteClient =
clientFor(new InetSocketAddress(hostname, port), loader)
private[akka] def clientFor(
address: InetSocketAddress, loader: Option[ClassLoader]): RemoteClient = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
loader.foreach(MessageSerializer.setClassLoader(_))
if (remoteClients.contains(hash)) remoteClients(hash)
else {
val client = new RemoteClient(hostname, port, loader)
client.connect
remoteClients += hash -> client
client
}
}
def shutdownClientFor(address: InetSocketAddress) = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ':' + port
if (remoteClients.contains(hash)) {
val client = remoteClients(hash)
client.shutdown
remoteClients -= hash
}
}
/**
* Clean-up all open connections.
*/
def shutdownAll = synchronized {
remoteClients.foreach({ case (addr, client) => client.shutdown })
remoteClients.clear
}
def register(hostname: String, port: Int, uuid: Uuid) = synchronized {
actorsFor(Address(hostname, port)) += uuid
}
private[akka] def unregister(hostname: String, port: Int, uuid: Uuid) = synchronized {
val set = actorsFor(Address(hostname, port))
set -= uuid
if (set.isEmpty) shutdownClientFor(new InetSocketAddress(hostname, port))
}
private[akka] def actorsFor(remoteServerAddress: Address): HashSet[Uuid] = {
val set = remoteActors.get(remoteServerAddress)
if (set.isDefined && (set.get ne null)) set.get
else {
val remoteActorSet = new HashSet[Uuid]
remoteActors.put(remoteServerAddress, remoteActorSet)
remoteActorSet
}
}
}
/**
* RemoteClient represents a connection to a RemoteServer. Is used to send messages to remote actors on the RemoteServer.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient private[akka] (
val hostname: String, val port: Int, val loader: Option[ClassLoader] = None)
extends Logging with ListenerManagement {
val name = "RemoteClient@" + hostname + "::" + port
//FIXME Should these be clear:ed on postStop?
private val futures = new ConcurrentHashMap[Uuid, CompletableFuture[_]]
private val supervisors = new ConcurrentHashMap[Uuid, ActorRef]
private val remoteAddress = new InetSocketAddress(hostname, port)
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
private[remote] val runSwitch = new Switch()
private[remote] val isAuthenticated = new AtomicBoolean(false)
private[remote] def isRunning = runSwitch.isOn
private val reconnectionTimeWindow = Duration(config.getInt(
"akka.remote.client.reconnection-time-window", 600), TIME_UNIT).toMillis
@volatile
private var reconnectionTimeWindowStart = 0L
def connect = runSwitch switchOn {
openChannels = new DefaultChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors, bootstrap, remoteAddress, timer, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
log.slf4j.info("Starting remote client connection to [{}:{}]", hostname, port)
// Wait until the connection attempt succeeds or fails.
connection = bootstrap.connect(remoteAddress)
val channel = connection.awaitUninterruptibly.getChannel
openChannels.add(channel)
if (!connection.isSuccess) {
notifyListeners(RemoteClientError(connection.getCause, this))
log.slf4j.error("Remote client connection to [{}:{}] has failed", hostname, port)
log.slf4j.debug("Remote client connection failed", connection.getCause)
}
notifyListeners(RemoteClientStarted(this))
}
def shutdown = runSwitch switchOff {
log.slf4j.info("Shutting down {}", name)
notifyListeners(RemoteClientShutdown(this))
timer.stop
timer = null
openChannels.close.awaitUninterruptibly
openChannels = null
bootstrap.releaseExternalResources
bootstrap = null
connection = null
log.slf4j.info("{} has been shut down", name)
}
@deprecated("Use addListener instead")
def registerListener(actorRef: ActorRef) = addListener(actorRef)
@deprecated("Use removeListener instead")
def deregisterListener(actorRef: ActorRef) = removeListener(actorRef)
override def notifyListeners(message: => Any): Unit = super.notifyListeners(message)
protected override def manageLifeCycleOfListeners = false
def send[T](
message: Any,
senderOption: Option[ActorRef],
senderFuture: Option[CompletableFuture[T]],
remoteAddress: InetSocketAddress,
timeout: Long,
isOneWay: Boolean,
actorRef: ActorRef,
typedActorInfo: Option[Tuple2[String, String]],
actorType: ActorType): Option[CompletableFuture[T]] = {
val cookie = if (isAuthenticated.compareAndSet(false, true)) RemoteClient.SECURE_COOKIE
else None
send(createRemoteMessageProtocolBuilder(
Some(actorRef),
Left(actorRef.uuid),
actorRef.id,
actorRef.actorClassName,
actorRef.timeout,
Left(message),
isOneWay,
senderOption,
typedActorInfo,
actorType,
cookie
).build, senderFuture)
}
def send[T](
request: RemoteMessageProtocol,
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
if (isRunning) {
if (request.getOneWay) {
connection.getChannel.write(request)
None
} else {
val futureResult = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFuture[T](request.getActorInfo.getTimeout)
futures.put(uuidFrom(request.getUuid.getHigh, request.getUuid.getLow), futureResult)
connection.getChannel.write(request)
Some(futureResult)
}
} else {
val exception = new RemoteClientException(
"Remote client is not running, make sure you have invoked 'RemoteClient.connect' before using it.", this)
notifyListeners(RemoteClientError(exception, this))
throw exception
}
}
private[akka] def registerSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't register supervisor for " + actorRef + " since it is not under supervision")
else supervisors.putIfAbsent(actorRef.supervisor.get.uuid, actorRef)
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef) =
if (!actorRef.supervisor.isDefined) throw new IllegalActorStateException(
"Can't unregister supervisor for " + actorRef + " since it is not under supervision")
else supervisors.remove(actorRef.supervisor.get.uuid)
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
if (reconnectionTimeWindowStart == 0L) {
reconnectionTimeWindowStart = System.currentTimeMillis
true
} else {
val timeLeft = reconnectionTimeWindow - (System.currentTimeMillis - reconnectionTimeWindowStart)
if (timeLeft > 0) {
log.slf4j.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
true
} else false
}
}
private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClientPipelineFactory(
name: String,
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
supervisors: ConcurrentMap[Uuid, ActorRef],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
timer: HashedWheelTimer,
client: RemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
def join(ch: ChannelHandler*) = Array[ChannelHandler](ch: _*)
lazy val engine = {
val e = RemoteServerSslContext.client.createSSLEngine()
e.setEnabledCipherSuites(e.getSupportedCipherSuites) //TODO is this sensible?
e.setUseClientMode(true)
e
}
val ssl = if (RemoteServer.SECURE) join(new SslHandler(engine)) else join()
val timeout = new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT.toMillis.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClient.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(RemoteMessageProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val (enc, dec) = RemoteServer.COMPRESSION_SCHEME match {
case "zlib" => (join(new ZlibEncoder(RemoteServer.ZLIB_COMPRESSION_LEVEL)), join(new ZlibDecoder))
case _ => (join(), join())
}
val remoteClient = new RemoteClientHandler(name, futures, supervisors, bootstrap, remoteAddress, timer, client)
val stages = ssl ++ join(timeout) ++ dec ++ join(lenDec, protobufDec) ++ enc ++ join(lenPrep, protobufEnc, remoteClient)
new StaticChannelPipeline(stages: _*)
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelHandler.Sharable
class RemoteClientHandler(
val name: String,
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
val supervisors: ConcurrentMap[Uuid, ActorRef],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
val timer: HashedWheelTimer,
val client: RemoteClient)
extends SimpleChannelUpstreamHandler with Logging {
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.slf4j.debug(event.toString)
}
super.handleUpstream(ctx, event)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
val result = event.getMessage
if (result.isInstanceOf[RemoteMessageProtocol]) {
val reply = result.asInstanceOf[RemoteMessageProtocol]
val replyUuid = uuidFrom(reply.getUuid.getHigh, reply.getUuid.getLow)
log.debug("Remote client received RemoteMessageProtocol[\n{}]",reply)
val future = futures.get(replyUuid).asInstanceOf[CompletableFuture[Any]]
if (reply.hasMessage) {
if (future eq null) throw new IllegalActorStateException("Future mapped to UUID " + replyUuid + " does not exist")
val message = MessageSerializer.deserialize(reply.getMessage)
future.completeWithResult(message)
} else {
if (reply.hasSupervisorUuid()) {
val supervisorUuid = uuidFrom(reply.getSupervisorUuid.getHigh, reply.getSupervisorUuid.getLow)
if (!supervisors.containsKey(supervisorUuid)) throw new IllegalActorStateException(
"Expected a registered supervisor for UUID [" + supervisorUuid + "] but none was found")
val supervisedActor = supervisors.get(supervisorUuid)
if (!supervisedActor.supervisor.isDefined) throw new IllegalActorStateException(
"Can't handle restart for remote actor " + supervisedActor + " since its supervisor has been removed")
else supervisedActor.supervisor.get ! Exit(supervisedActor, parseException(reply, client.loader))
}
val exception = parseException(reply, client.loader)
future.completeWithException(exception)
}
futures remove replyUuid
} else {
val exception = new RemoteClientException("Unknown message received in remote client handler: " + result, client)
client.notifyListeners(RemoteClientError(exception, client))
throw exception
}
} catch {
case e: Exception =>
client.notifyListeners(RemoteClientError(e, client))
log.slf4j.error("Unexpected exception in remote client handler: {}", e)
throw e
}
}
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn {
if (client.isWithinReconnectionTimeWindow) {
timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = {
client.openChannels.remove(event.getChannel)
client.isAuthenticated.set(false)
log.slf4j.debug("Remote client reconnecting to [{}]", remoteAddress)
client.connection = bootstrap.connect(remoteAddress)
client.connection.awaitUninterruptibly // Wait until the connection attempt succeeds or fails.
if (!client.connection.isSuccess) {
client.notifyListeners(RemoteClientError(client.connection.getCause, client))
log.slf4j.error("Reconnection to [{}] has failed", remoteAddress)
log.slf4j.debug("Reconnection failed", client.connection.getCause)
}
}
}, RemoteClient.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.shutdown }
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
def connect = {
client.notifyListeners(RemoteClientConnected(client))
log.slf4j.debug("Remote client connected to [{}]", ctx.getChannel.getRemoteAddress)
client.resetReconnectionTimeWindow
}
if (RemoteServer.SECURE) {
val sslHandler: SslHandler = ctx.getPipeline.get(classOf[SslHandler])
sslHandler.handshake.addListener(new ChannelFutureListener {
def operationComplete(future: ChannelFuture): Unit = {
if (future.isSuccess) connect
else throw new RemoteClientException("Could not establish SSL handshake", client)
}
})
} else connect
}
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
client.notifyListeners(RemoteClientDisconnected(client))
log.slf4j.debug("Remote client disconnected from [{}]", ctx.getChannel.getRemoteAddress)
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.notifyListeners(RemoteClientError(event.getCause, client))
if (event.getCause ne null)
log.slf4j.error("Unexpected exception from downstream in remote client", event.getCause)
else
log.slf4j.error("Unexpected exception from downstream in remote client: {}", event)
event.getChannel.close
}
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
val exception = reply.getException
val classname = exception.getClassname
val exceptionClass = if (loader.isDefined) loader.get.loadClass(classname)
else Class.forName(classname)
exceptionClass
.getConstructor(Array[Class[_]](classOf[String]): _*)
.newInstance(exception.getMessage).asInstanceOf[Throwable]
}
}

View file

@ -17,7 +17,7 @@ import scala.collection.immutable.Stack
import com.google.protobuf.ByteString
import akka.util.ReflectiveAccess
import akka.util.ReflectiveAccess.RemoteServerModule.{HOSTNAME,PORT}
import akka.util.ReflectiveAccess.Remote.{HOSTNAME,PORT}
/**
* Type class definition for Actor Serialization
@ -191,6 +191,7 @@ object ActorSerialization {
}
val ar = new LocalActorRef(
ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance?
uuidFrom(protocol.getUuid.getHigh, protocol.getUuid.getLow),
protocol.getId,
protocol.getOriginalAddress.getHostname,
@ -230,6 +231,7 @@ object RemoteActorSerialization {
private[akka] def fromProtobufToRemoteActorRef(protocol: RemoteActorRefProtocol, loader: Option[ClassLoader]): ActorRef = {
Actor.log.slf4j.debug("Deserializing RemoteActorRefProtocol to RemoteActorRef:\n {}", protocol)
RemoteActorRef(
ActorRegistry,//TODO: REVISIST: Change to an implicit ActorRegistryInstance?
protocol.getClassOrServiceName,
protocol.getActorClassname,
protocol.getHomeAddress.getHostname,
@ -245,8 +247,10 @@ object RemoteActorSerialization {
def toRemoteActorRefProtocol(ar: ActorRef): RemoteActorRefProtocol = {
import ar._
Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",Array[AnyRef](actorClassName, HOSTNAME, PORT.asInstanceOf[AnyRef]))
ActorRegistry.remote.registerByUuid(ar)
Actor.log.slf4j.debug("Register serialized Actor [{}] as remote @ [{}:{}]",
Array[AnyRef](actorClassName, registry.remote.hostname, registry.remote.port.asInstanceOf[AnyRef]))
registry.remote.registerByUuid(ar)
RemoteActorRefProtocol.newBuilder
.setClassOrServiceName(uuid.toString)

View file

@ -4,10 +4,11 @@ import java.util.concurrent.{CountDownLatch, TimeUnit}
import org.scalatest.junit.JUnitSuite
import org.junit.{Test, Before, After}
import akka.remote.{RemoteServer, RemoteClient}
import akka.dispatch.Dispatchers
import akka.actor.{ActorRef, Actor}
import Actor._
import akka.remote. {NettyRemoteSupport, RemoteServer, RemoteClient}
import akka.actor. {RemoteActorRef, ActorRegistryInstance, ActorRef, Actor}
class ExpectedRemoteProblem extends RuntimeException
object ClientInitiatedRemoteActorSpec {
case class Send(actor: Actor)
@ -28,8 +29,7 @@ object ClientInitiatedRemoteActorSpec {
def receive = {
case "Hello" =>
self.reply("World")
case "Failure" =>
throw new RuntimeException("Expected exception; to test fault-tolerance")
case "Failure" => throw new ExpectedRemoteProblem
}
}
@ -40,6 +40,12 @@ object ClientInitiatedRemoteActorSpec {
}
}
class CountDownActor(latch: CountDownLatch) extends Actor {
def receive = {
case "World" => latch.countDown
}
}
object SendOneWayAndReplySenderActor {
val latch = new CountDownLatch(1)
}
@ -74,59 +80,54 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
val HOSTNAME = "localhost"
val PORT1 = 9990
val PORT2 = 9991
var s1: RemoteServer = null
var s1,s2: ActorRegistryInstance = null
private val unit = TimeUnit.MILLISECONDS
@Before
def init() {
s1 = new RemoteServer()
s1.start(HOSTNAME, PORT1)
Thread.sleep(1000)
s1 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_)))
s2 = new ActorRegistryInstance(Some(new NettyRemoteSupport(_)))
s1.remote.start(HOSTNAME, PORT1)
s2.remote.start(HOSTNAME, PORT2)
Thread.sleep(2000)
}
@After
def finished() {
s1.shutdown
val s2 = RemoteServer.serverFor(HOSTNAME, PORT2)
if (s2.isDefined) s2.get.shutdown
RemoteClient.shutdownAll
s1.remote.shutdown
s2.remote.shutdown
s1.shutdownAll
s2.shutdownAll
Thread.sleep(1000)
}
@Test
def shouldSendOneWay = {
val actor = actorOf[RemoteActorSpecActorUnidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
actor ! "OneWay"
val clientManaged = s1.actorOf[RemoteActorSpecActorUnidirectional](HOSTNAME,PORT2).start
//implicit val self = Some(s2.actorOf[RemoteActorSpecActorUnidirectional].start)
assert(clientManaged ne null)
assert(clientManaged.getClass.equals(classOf[RemoteActorRef]))
clientManaged ! "OneWay"
assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
actor.stop
clientManaged.stop
}
@Test
def shouldSendOneWayAndReceiveReply = {
val actor = actorOf[SendOneWayAndReplyReceiverActor]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val sender = actorOf[SendOneWayAndReplySenderActor]
sender.homeAddress = (HOSTNAME, PORT2)
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendTo = actor
sender.start
sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].sendOff
assert(SendOneWayAndReplySenderActor.latch.await(3, TimeUnit.SECONDS))
assert(sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.isDefined === true)
assert("World" === sender.actor.asInstanceOf[SendOneWayAndReplySenderActor].state.get.asInstanceOf[String])
actor.stop
sender.stop
val latch = new CountDownLatch(1)
val actor = s2.actorOf[SendOneWayAndReplyReceiverActor](HOSTNAME, PORT1).start
implicit val sender = Some(s1.actorOf(new CountDownActor(latch)).start)
actor ! "OneWay"
assert(latch.await(3,TimeUnit.SECONDS))
}
@Test
def shouldSendBangBangMessageAndReceiveReply = {
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start
val result = actor !! "Hello"
assert("World" === result.get.asInstanceOf[String])
actor.stop
@ -134,29 +135,20 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
@Test
def shouldSendBangBangMessageAndReceiveReplyConcurrently = {
val actors = (1 to 10).
map(num => {
val a = actorOf[RemoteActorSpecActorBidirectional]
a.makeRemote(HOSTNAME, PORT1)
a.start
}).toList
val actors = (1 to 10).map(num => { s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start }).toList
actors.map(_ !!! "Hello").foreach(future => assert("World" === future.await.result.asInstanceOf[Option[String]].get))
actors.foreach(_.stop)
}
@Test
def shouldRegisterActorByUuid {
val actor1 = actorOf[MyActorCustomConstructor]
actor1.makeRemote(HOSTNAME, PORT1)
actor1.start
val actor1 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start
actor1 ! "incrPrefix"
assert((actor1 !! "test").get === "1-test")
actor1 ! "incrPrefix"
assert((actor1 !! "test").get === "2-test")
val actor2 = actorOf[MyActorCustomConstructor]
actor2.makeRemote(HOSTNAME, PORT1)
actor2.start
val actor2 = s2.actorOf[MyActorCustomConstructor](HOSTNAME, PORT1).start
assert((actor2 !! "test").get === "default-test")
@ -164,19 +156,11 @@ class ClientInitiatedRemoteActorSpec extends JUnitSuite {
actor2.stop
}
@Test
@Test(expected=classOf[ExpectedRemoteProblem])
def shouldSendAndReceiveRemoteException {
implicit val timeout = 500000000L
val actor = actorOf[RemoteActorSpecActorBidirectional]
actor.makeRemote(HOSTNAME, PORT1)
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assert("Expected exception; to test fault-tolerance" === e.getMessage())
}
val actor = s2.actorOf[RemoteActorSpecActorBidirectional](HOSTNAME, PORT1).start
actor !! "Failure"
actor.stop
}
}

View file

@ -76,9 +76,6 @@ object RemoteSupervisorSpec {
var server: RemoteServer = null
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteSupervisorSpec extends JUnitSuite {
import RemoteSupervisorSpec._

View file

@ -4,10 +4,9 @@
package sample.remote
import akka.actor.Actor
import akka.actor.Actor._
import akka.remote.{RemoteClient, RemoteNode}
import akka.util.Logging
import akka.actor. {ActorRegistry, Actor}
class HelloWorldActor extends Actor {
def receive = {
@ -20,9 +19,9 @@ class HelloWorldActor extends Actor {
object ServerManagedRemoteActorServer extends Logging {
def run = {
RemoteNode.start("localhost", 2552)
ActorRegistry.remote.start("localhost", 2552)
log.slf4j.info("Remote node started")
RemoteNode.register("hello-service", actorOf[HelloWorldActor])
ActorRegistry.remote.register("hello-service", actorOf[HelloWorldActor])
log.slf4j.info("Remote actor registered and started")
}
@ -32,7 +31,7 @@ object ServerManagedRemoteActorServer extends Logging {
object ServerManagedRemoteActorClient extends Logging {
def run = {
val actor = RemoteClient.actorFor("hello-service", "localhost", 2552)
val actor = ActorRegistry.remote.actorFor("hello-service", "localhost", 2552)
log.slf4j.info("Remote client created")
log.slf4j.info("Sending 'Hello' to remote actor")
val result = actor !! "Hello"

View file

@ -841,8 +841,8 @@ private[akka] abstract class ActorAspect {
val isOneWay = TypedActor.isOneWay(methodRtti)
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
val future = RemoteClientModule.send[AnyRef](
//TODO: REVISIT: MAKE REGISTRY COME FROM ACTORREF
val future = ActorRegistry.remote.send[AnyRef](
message, None, None, remoteAddress.get,
timeout, isOneWay, actorRef,
Some((interfaceClass.getName, methodRtti.getMethod.getName)),