Merge with upstream
This commit is contained in:
commit
8886de10bb
7 changed files with 262 additions and 45 deletions
|
|
@ -1363,7 +1363,8 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
val hostname: String,
|
val hostname: String,
|
||||||
val port: Int,
|
val port: Int,
|
||||||
_timeout: Long,
|
_timeout: Long,
|
||||||
loader: Option[ClassLoader])
|
loader: Option[ClassLoader],
|
||||||
|
val actorType: ActorType = ActorType.ScalaActor)
|
||||||
extends ActorRef with ScalaActorRef {
|
extends ActorRef with ScalaActorRef {
|
||||||
|
|
||||||
ensureRemotingEnabled
|
ensureRemotingEnabled
|
||||||
|
|
@ -1376,7 +1377,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
|
|
||||||
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
|
def postMessageToMailbox(message: Any, senderOption: Option[ActorRef]): Unit =
|
||||||
RemoteClientModule.send[Any](
|
RemoteClientModule.send[Any](
|
||||||
message, senderOption, None, remoteAddress.get, timeout, true, this, None, ActorType.ScalaActor)
|
message, senderOption, None, remoteAddress.get, timeout, true, this, None, actorType)
|
||||||
|
|
||||||
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
|
def postMessageToMailboxAndCreateFutureResultWithTimeout[T](
|
||||||
message: Any,
|
message: Any,
|
||||||
|
|
@ -1384,7 +1385,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
||||||
senderOption: Option[ActorRef],
|
senderOption: Option[ActorRef],
|
||||||
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
senderFuture: Option[CompletableFuture[T]]): CompletableFuture[T] = {
|
||||||
val future = RemoteClientModule.send[T](
|
val future = RemoteClientModule.send[T](
|
||||||
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, ActorType.ScalaActor)
|
message, senderOption, senderFuture, remoteAddress.get, timeout, false, this, None, actorType)
|
||||||
if (future.isDefined) future.get
|
if (future.isDefined) future.get
|
||||||
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
|
else throw new IllegalActorStateException("Expected a future from remote call to actor " + toString)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -30,6 +30,7 @@ import java.util.concurrent.atomic.AtomicLong
|
||||||
|
|
||||||
import scala.collection.mutable.{HashSet, HashMap}
|
import scala.collection.mutable.{HashSet, HashMap}
|
||||||
import scala.reflect.BeanProperty
|
import scala.reflect.BeanProperty
|
||||||
|
import se.scalablesolutions.akka.actor._
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Atomic remote request/reply message id generator.
|
* Atomic remote request/reply message id generator.
|
||||||
|
|
@ -76,8 +77,6 @@ object RemoteClient extends Logging {
|
||||||
private val remoteClients = new HashMap[String, RemoteClient]
|
private val remoteClients = new HashMap[String, RemoteClient]
|
||||||
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
|
private val remoteActors = new HashMap[RemoteServer.Address, HashSet[String]]
|
||||||
|
|
||||||
// FIXME: simplify overloaded methods when we have Scala 2.8
|
|
||||||
|
|
||||||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
|
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
|
||||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
|
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
|
||||||
|
|
||||||
|
|
@ -99,6 +98,27 @@ object RemoteClient extends Logging {
|
||||||
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||||
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
|
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
|
||||||
|
|
||||||
|
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = {
|
||||||
|
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = {
|
||||||
|
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
|
||||||
|
}
|
||||||
|
|
||||||
|
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||||
|
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
|
||||||
|
}
|
||||||
|
|
||||||
|
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||||
|
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
|
||||||
|
}
|
||||||
|
|
||||||
|
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = {
|
||||||
|
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
|
||||||
|
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
|
||||||
|
}
|
||||||
|
|
||||||
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||||
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
|
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
|
||||||
import java.util.{Map => JMap}
|
import java.util.{Map => JMap}
|
||||||
|
|
||||||
import se.scalablesolutions.akka.actor.{
|
import se.scalablesolutions.akka.actor.{
|
||||||
Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||||
import se.scalablesolutions.akka.actor.Actor._
|
import se.scalablesolutions.akka.actor.Actor._
|
||||||
import se.scalablesolutions.akka.util._
|
import se.scalablesolutions.akka.util._
|
||||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||||
|
|
@ -133,8 +133,8 @@ object RemoteServer {
|
||||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def registerTypedActor(address: InetSocketAddress, name: String, typedActor: AnyRef) = guard.withWriteGuard {
|
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
|
||||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(name, typedActor)
|
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
private[akka] def getOrCreateServer(address: InetSocketAddress): RemoteServer = guard.withWriteGuard {
|
||||||
|
|
@ -271,7 +271,18 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: register typed actor in RemoteServer as well
|
/**
|
||||||
|
* Register remote typed actor by a specific id.
|
||||||
|
* @param id custom actor id
|
||||||
|
* @param typedActor typed actor to register
|
||||||
|
*/
|
||||||
|
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
||||||
|
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
|
||||||
|
if (!typedActors.contains(id)) {
|
||||||
|
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
|
||||||
|
typedActors.put(id, typedActor)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||||
|
|
@ -321,11 +332,24 @@ class RemoteServer extends Logging with ListenerManagement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unregister Remote Typed Actor by specific 'id'.
|
||||||
|
* <p/>
|
||||||
|
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||||
|
*/
|
||||||
|
def unregisterTypedActor(id: String):Unit = synchronized {
|
||||||
|
if (_isRunning) {
|
||||||
|
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
||||||
|
val registeredTypedActors = typedActors()
|
||||||
|
registeredTypedActors.remove(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
protected override def manageLifeCycleOfListeners = false
|
protected override def manageLifeCycleOfListeners = false
|
||||||
|
|
||||||
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
|
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
|
||||||
|
|
||||||
private[akka] def actors() = RemoteServer.actorsFor(address).actors
|
private[akka] def actors() = RemoteServer.actorsFor(address).actors
|
||||||
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
|
private[akka] def typedActors() = RemoteServer.actorsFor(address).typedActors
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@
|
||||||
<system id="akka">
|
<system id="akka">
|
||||||
<package name="se.scalablesolutions.akka.actor">
|
<package name="se.scalablesolutions.akka.actor">
|
||||||
<aspect class="TypedActorAspect" />
|
<aspect class="TypedActorAspect" />
|
||||||
|
<aspect class="ServerManagedTypedActorAspect" />
|
||||||
</package>
|
</package>
|
||||||
</system>
|
</system>
|
||||||
</aspectwerkz>
|
</aspectwerkz>
|
||||||
|
|
|
||||||
|
|
@ -144,5 +144,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
||||||
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
||||||
actor.stop
|
actor.stop
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,112 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.actor.remote
|
||||||
|
|
||||||
|
import org.scalatest.Spec
|
||||||
|
import org.scalatest.matchers.ShouldMatchers
|
||||||
|
import org.scalatest.BeforeAndAfterAll
|
||||||
|
import org.scalatest.junit.JUnitRunner
|
||||||
|
import org.junit.runner.RunWith
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
||||||
|
import se.scalablesolutions.akka.actor._
|
||||||
|
import RemoteTypedActorLog._
|
||||||
|
|
||||||
|
object ServerInitiatedRemoteTypedActorSpec {
|
||||||
|
val HOSTNAME = "localhost"
|
||||||
|
val PORT = 9990
|
||||||
|
var server: RemoteServer = null
|
||||||
|
}
|
||||||
|
|
||||||
|
@RunWith(classOf[JUnitRunner])
|
||||||
|
class ServerInitiatedRemoteTypedActorSpec extends
|
||||||
|
Spec with
|
||||||
|
ShouldMatchers with
|
||||||
|
BeforeAndAfterAll {
|
||||||
|
import ServerInitiatedRemoteTypedActorSpec._
|
||||||
|
|
||||||
|
private val unit = TimeUnit.MILLISECONDS
|
||||||
|
|
||||||
|
|
||||||
|
override def beforeAll = {
|
||||||
|
server = new RemoteServer()
|
||||||
|
server.start(HOSTNAME, PORT)
|
||||||
|
|
||||||
|
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||||
|
server.registerTypedActor("typed-actor-service", typedActor)
|
||||||
|
|
||||||
|
Thread.sleep(1000)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure the servers shutdown cleanly after the test has finished
|
||||||
|
override def afterAll = {
|
||||||
|
try {
|
||||||
|
server.shutdown
|
||||||
|
RemoteClient.shutdownAll
|
||||||
|
Thread.sleep(1000)
|
||||||
|
} catch {
|
||||||
|
case e => ()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
describe("Server managed remote typed Actor ") {
|
||||||
|
|
||||||
|
it("should receive one-way message") {
|
||||||
|
clearMessageLogs
|
||||||
|
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||||
|
expect("oneway") {
|
||||||
|
actor.oneWay
|
||||||
|
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should respond to request-reply message") {
|
||||||
|
clearMessageLogs
|
||||||
|
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||||
|
expect("pong") {
|
||||||
|
actor.requestReply("ping")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should not recreate registered actors") {
|
||||||
|
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||||
|
val numberOfActorsInRegistry = ActorRegistry.actors.length
|
||||||
|
expect("oneway") {
|
||||||
|
actor.oneWay
|
||||||
|
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should support multiple variants to get the actor from client side") {
|
||||||
|
var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||||
|
expect("oneway") {
|
||||||
|
actor.oneWay
|
||||||
|
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT)
|
||||||
|
expect("oneway") {
|
||||||
|
actor.oneWay
|
||||||
|
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader)
|
||||||
|
expect("oneway") {
|
||||||
|
actor.oneWay
|
||||||
|
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
it("should register and unregister typed actors") {
|
||||||
|
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||||
|
server.registerTypedActor("my-test-service", typedActor)
|
||||||
|
assert(server.typedActors().get("my-test-service") != null)
|
||||||
|
server.unregisterTypedActor("my-test-service")
|
||||||
|
assert(server.typedActors().get("my-test-service") == null)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
|
@ -16,9 +16,8 @@ import org.codehaus.aspectwerkz.proxy.Proxy
|
||||||
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
|
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
|
||||||
|
|
||||||
import java.net.InetSocketAddress
|
import java.net.InetSocketAddress
|
||||||
import java.lang.reflect.{InvocationTargetException, Method, Field}
|
|
||||||
|
|
||||||
import scala.reflect.BeanProperty
|
import scala.reflect.BeanProperty
|
||||||
|
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy => JProxy}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* TypedActor is a type-safe actor made out of a POJO with interface.
|
* TypedActor is a type-safe actor made out of a POJO with interface.
|
||||||
|
|
@ -408,24 +407,47 @@ object TypedActor extends Logging {
|
||||||
proxy.asInstanceOf[T]
|
proxy.asInstanceOf[T]
|
||||||
}
|
}
|
||||||
|
|
||||||
/*
|
/**
|
||||||
// NOTE: currently not used - but keep it around
|
* Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
|
||||||
private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
|
*
|
||||||
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
*/
|
||||||
val proxy = {
|
private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = {
|
||||||
val instance = Proxy.newInstance(targetClass, true, false)
|
|
||||||
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
|
class MyInvocationHandler extends InvocationHandler {
|
||||||
else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
|
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
|
||||||
|
// do nothing, this is just a dummy
|
||||||
|
null
|
||||||
|
}
|
||||||
}
|
}
|
||||||
val context = injectTypedActorContext(proxy)
|
val handler = new MyInvocationHandler()
|
||||||
actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
|
|
||||||
actorRef.timeout = timeout
|
val interfaces = Array(intfClass, classOf[ServerManagedTypedActor]).asInstanceOf[Array[java.lang.Class[_]]]
|
||||||
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
|
val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler)
|
||||||
AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
|
val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false)
|
||||||
actorRef.start
|
|
||||||
proxy.asInstanceOf[T]
|
AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L))
|
||||||
|
awProxy.asInstanceOf[T]
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
// NOTE: currently not used - but keep it around
|
||||||
|
private[akka] def newInstance[T <: TypedActor](targetClass: Class[T],
|
||||||
|
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
|
||||||
|
val proxy = {
|
||||||
|
val instance = Proxy.newInstance(targetClass, true, false)
|
||||||
|
if (instance.isInstanceOf[TypedActor]) instance.asInstanceOf[TypedActor]
|
||||||
|
else throw new IllegalActorStateException("Actor [" + targetClass.getName + "] is not a sub class of 'TypedActor'")
|
||||||
|
}
|
||||||
|
val context = injectTypedActorContext(proxy)
|
||||||
|
actorRef.actor.asInstanceOf[Dispatcher].initialize(targetClass, proxy, proxy, context)
|
||||||
|
actorRef.timeout = timeout
|
||||||
|
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
|
||||||
|
AspectInitRegistry.register(proxy, AspectInit(targetClass, proxy, actorRef, remoteAddress, timeout))
|
||||||
|
actorRef.start
|
||||||
|
proxy.asInstanceOf[T]
|
||||||
|
}
|
||||||
|
*/
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stops the current Typed Actor.
|
* Stops the current Typed Actor.
|
||||||
|
|
@ -546,6 +568,30 @@ object TypedActor extends Logging {
|
||||||
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor.
|
||||||
|
* <p/>
|
||||||
|
* Is deployed on a 'perInstance' basis with the pointcut 'execution(* *.*(..))',
|
||||||
|
* e.g. all methods on the instance.
|
||||||
|
*
|
||||||
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
|
*/
|
||||||
|
@Aspect("perInstance")
|
||||||
|
private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
|
||||||
|
|
||||||
|
@Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
|
||||||
|
def invoke(joinPoint: JoinPoint): AnyRef = {
|
||||||
|
if (!isInitialized) initialize(joinPoint)
|
||||||
|
remoteDispatch(joinPoint)
|
||||||
|
}
|
||||||
|
|
||||||
|
override def initialize(joinPoint: JoinPoint): Unit = {
|
||||||
|
super.initialize(joinPoint)
|
||||||
|
remoteAddress = actorRef.remoteAddress
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* AspectWerkz Aspect that is turning POJO into TypedActor.
|
* AspectWerkz Aspect that is turning POJO into TypedActor.
|
||||||
* <p/>
|
* <p/>
|
||||||
|
|
@ -555,18 +601,9 @@ object TypedActor extends Logging {
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
@Aspect("perInstance")
|
@Aspect("perInstance")
|
||||||
private[akka] sealed class TypedActorAspect {
|
private[akka] sealed class TypedActorAspect extends ActorAspect {
|
||||||
@volatile private var isInitialized = false
|
|
||||||
@volatile private var isStopped = false
|
|
||||||
private var interfaceClass: Class[_] = _
|
|
||||||
private var typedActor: TypedActor = _
|
|
||||||
private var actorRef: ActorRef = _
|
|
||||||
private var remoteAddress: Option[InetSocketAddress] = _
|
|
||||||
private var timeout: Long = _
|
|
||||||
private var uuid: String = _
|
|
||||||
@volatile private var instance: TypedActor = _
|
|
||||||
|
|
||||||
@Around("execution(* *.*(..))")
|
@Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
|
||||||
def invoke(joinPoint: JoinPoint): AnyRef = {
|
def invoke(joinPoint: JoinPoint): AnyRef = {
|
||||||
if (!isInitialized) initialize(joinPoint)
|
if (!isInitialized) initialize(joinPoint)
|
||||||
dispatch(joinPoint)
|
dispatch(joinPoint)
|
||||||
|
|
@ -576,12 +613,26 @@ private[akka] sealed class TypedActorAspect {
|
||||||
if (remoteAddress.isDefined) remoteDispatch(joinPoint)
|
if (remoteAddress.isDefined) remoteDispatch(joinPoint)
|
||||||
else localDispatch(joinPoint)
|
else localDispatch(joinPoint)
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
|
/**
|
||||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
* Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication.
|
||||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
*/
|
||||||
|
private[akka] abstract class ActorAspect {
|
||||||
|
@volatile protected var isInitialized = false
|
||||||
|
@volatile protected var isStopped = false
|
||||||
|
protected var interfaceClass: Class[_] = _
|
||||||
|
protected var typedActor: TypedActor = _
|
||||||
|
protected var actorRef: ActorRef = _
|
||||||
|
protected var timeout: Long = _
|
||||||
|
protected var uuid: String = _
|
||||||
|
protected var remoteAddress: Option[InetSocketAddress] = _
|
||||||
|
|
||||||
|
protected def localDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||||
|
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||||
|
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||||
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
|
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
|
||||||
val senderProxy = Some(SenderContextInfo.senderProxy.value)
|
val senderProxy = Some(SenderContextInfo.senderProxy.value)
|
||||||
|
|
||||||
typedActor.context._sender = senderProxy
|
typedActor.context._sender = senderProxy
|
||||||
if (!actorRef.isRunning && !isStopped) {
|
if (!actorRef.isRunning && !isStopped) {
|
||||||
|
|
@ -602,7 +653,7 @@ private[akka] sealed class TypedActorAspect {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||||
|
|
||||||
|
|
@ -641,7 +692,7 @@ private[akka] sealed class TypedActorAspect {
|
||||||
(escapedArgs, isEscaped)
|
(escapedArgs, isEscaped)
|
||||||
}
|
}
|
||||||
|
|
||||||
private def initialize(joinPoint: JoinPoint): Unit = {
|
protected def initialize(joinPoint: JoinPoint): Unit = {
|
||||||
val init = AspectInitRegistry.initFor(joinPoint.getThis)
|
val init = AspectInitRegistry.initFor(joinPoint.getThis)
|
||||||
interfaceClass = init.interfaceClass
|
interfaceClass = init.interfaceClass
|
||||||
typedActor = init.targetInstance
|
typedActor = init.targetInstance
|
||||||
|
|
@ -653,6 +704,7 @@ private[akka] sealed class TypedActorAspect {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal helper class to help pass the contextual information between threads.
|
* Internal helper class to help pass the contextual information between threads.
|
||||||
*
|
*
|
||||||
|
|
@ -704,5 +756,11 @@ private[akka] sealed case class AspectInit(
|
||||||
val timeout: Long) {
|
val timeout: Long) {
|
||||||
def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) =
|
def this(interfaceClass: Class[_], targetInstance: TypedActor, actorRef: ActorRef, timeout: Long) =
|
||||||
this(interfaceClass, targetInstance, actorRef, None, timeout)
|
this(interfaceClass, targetInstance, actorRef, None, timeout)
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marker interface for server manager typed actors.
|
||||||
|
*/
|
||||||
|
private[akka] sealed trait ServerManagedTypedActor extends TypedActor
|
||||||
Loading…
Add table
Add a link
Reference in a new issue