implemented server managed typed actor
This commit is contained in:
parent
60d4010421
commit
ec61c29f21
8 changed files with 155 additions and 202 deletions
|
|
@ -1363,7 +1363,7 @@ private[akka] case class RemoteActorRef private[akka] (
|
|||
val port: Int,
|
||||
_timeout: Long,
|
||||
loader: Option[ClassLoader],
|
||||
val actorType: ActorType = ActorType.ScalaActor )
|
||||
val actorType: ActorType = ActorType.ScalaActor)
|
||||
extends ActorRef with ScalaActorRef {
|
||||
|
||||
ensureRemotingEnabled
|
||||
|
|
|
|||
|
|
@ -80,16 +80,6 @@ object RemoteClient extends Logging {
|
|||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, None)
|
||||
|
||||
// FIXME:
|
||||
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int) : T = {
|
||||
|
||||
println("### create RemoteActorRef")
|
||||
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, None, ActorType.TypedActor)
|
||||
val proxy = TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
|
||||
proxy
|
||||
|
||||
}
|
||||
|
||||
def actorFor(classNameOrServiceId: String, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
actorFor(classNameOrServiceId, classNameOrServiceId, 5000L, hostname, port, Some(loader))
|
||||
|
||||
|
|
@ -108,8 +98,26 @@ object RemoteClient extends Logging {
|
|||
def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, None)
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, hostname: String, port: Int) : T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, 5000L, hostname, port, None)
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int) : T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, None)
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceIdOrClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||
typedActorFor(intfClass, serviceIdOrClassName, serviceIdOrClassName, timeout, hostname, port, Some(loader))
|
||||
}
|
||||
|
||||
def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader) : T = {
|
||||
typedActorFor(intfClass, serviceId, implClassName, timeout, hostname, port, Some(loader))
|
||||
}
|
||||
|
||||
private[akka] def typedActorFor[T](intfClass: Class[T], serviceId: String, implClassName: String, timeout: Long, hostname: String, port: Int, loader: Option[ClassLoader]) : T = {
|
||||
val actorRef = RemoteActorRef(serviceId, implClassName, hostname, port, timeout, loader, ActorType.TypedActor)
|
||||
TypedActor.createProxyForRemoteActorRef(intfClass, actorRef)
|
||||
}
|
||||
|
||||
private[akka] def actorFor(serviceId: String, className: String, timeout: Long, hostname: String, port: Int, loader: ClassLoader): ActorRef =
|
||||
RemoteActorRef(serviceId, className, hostname, port, timeout, Some(loader))
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ import java.util.concurrent.{ConcurrentHashMap, Executors}
|
|||
import java.util.{Map => JMap}
|
||||
|
||||
import se.scalablesolutions.akka.actor.{
|
||||
Actor, TypedActor, ActorRef, LocalActorRef, RemoteActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||
Actor, TypedActor, ActorRef, IllegalActorStateException, RemoteActorSystemMessage}
|
||||
import se.scalablesolutions.akka.actor.Actor._
|
||||
import se.scalablesolutions.akka.util._
|
||||
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
|
||||
|
|
@ -120,12 +120,9 @@ object RemoteServer {
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME private
|
||||
class RemoteActorSet {
|
||||
//private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
//private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
private class RemoteActorSet {
|
||||
private[RemoteServer] val actors = new ConcurrentHashMap[String, ActorRef]
|
||||
private[RemoteServer] val typedActors = new ConcurrentHashMap[String, AnyRef]
|
||||
}
|
||||
|
||||
private val guard = new ReadWriteGuard
|
||||
|
|
@ -133,13 +130,10 @@ object RemoteServer {
|
|||
private val remoteServers = Map[Address, RemoteServer]()
|
||||
|
||||
private[akka] def registerActor(address: InetSocketAddress, uuid: String, actor: ActorRef) = guard.withWriteGuard {
|
||||
// FIXME
|
||||
//actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
val actors = actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors
|
||||
actors.put(uuid, actor)
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).actors.put(uuid, actor)
|
||||
}
|
||||
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: TypedActor) = guard.withWriteGuard {
|
||||
private[akka] def registerTypedActor(address: InetSocketAddress, uuid: String, typedActor: AnyRef) = guard.withWriteGuard {
|
||||
actorsFor(RemoteServer.Address(address.getHostName, address.getPort)).typedActors.put(uuid, typedActor)
|
||||
}
|
||||
|
||||
|
|
@ -165,9 +159,7 @@ object RemoteServer {
|
|||
remoteServers.remove(Address(hostname, port))
|
||||
}
|
||||
|
||||
// FIXME
|
||||
def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
println("##### actorsFor SIZE=" + remoteActorSets.size)
|
||||
private def actorsFor(remoteServerAddress: RemoteServer.Address): RemoteActorSet = {
|
||||
remoteActorSets.getOrElseUpdate(remoteServerAddress,new RemoteActorSet)
|
||||
}
|
||||
}
|
||||
|
|
@ -278,23 +270,19 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
// FIXME: register typed actor in RemoteServer as well
|
||||
def registerTypedActor(id: String, actorRef: AnyRef): Unit = synchronized {
|
||||
/**
|
||||
* Register remote typed actor by a specific id.
|
||||
* @param id custom actor id
|
||||
* @param typedActor typed actor to register
|
||||
*/
|
||||
def registerTypedActor(id: String, typedActor: AnyRef): Unit = synchronized {
|
||||
val typedActors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
|
||||
if (!typedActors.contains(id)) {
|
||||
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", actorRef.getClass.getName, id, hostname, port)
|
||||
typedActors.put(id, actorRef)
|
||||
log.debug("Registering server side remote actor [%s] with id [%s] on [%s:%d]", typedActor.getClass.getName, id, hostname, port)
|
||||
typedActors.put(id, typedActor)
|
||||
}
|
||||
}
|
||||
|
||||
private def actors() : ConcurrentHashMap[String, ActorRef] = {
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
|
||||
}
|
||||
|
||||
private def typedActors() : ConcurrentHashMap[String, AnyRef] = {
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
|
||||
}
|
||||
|
||||
/**
|
||||
* Register Remote Actor by the Actor's 'id' field. It starts the Actor if it is not started already.
|
||||
*/
|
||||
|
|
@ -308,7 +296,6 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
def register(id: String, actorRef: ActorRef): Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
val actors = RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
|
||||
println("___ register ___ " + actors.hashCode + " hostname=" + hostname + " port="+ port)
|
||||
if (!actors.contains(id)) {
|
||||
if (!actorRef.isRunning) actorRef.start
|
||||
log.debug("Registering server side remote actor [%s] with id [%s]", actorRef.actorClass.getName, id)
|
||||
|
|
@ -344,16 +331,27 @@ class RemoteServer extends Logging with ListenerManagement {
|
|||
}
|
||||
}
|
||||
|
||||
//FIXME: unregister typed Actor
|
||||
/**
|
||||
* Unregister Remote Typed Actor by specific 'id'.
|
||||
* <p/>
|
||||
* NOTE: You need to call this method if you have registered an actor by a custom ID.
|
||||
*/
|
||||
def unregisterTypedActor(id: String):Unit = synchronized {
|
||||
if (_isRunning) {
|
||||
log.info("Unregistering server side remote typed actor with id [%s]", id)
|
||||
val registeredTypedActors = typedActors()
|
||||
registeredTypedActors.remove(id)
|
||||
}
|
||||
}
|
||||
|
||||
protected override def manageLifeCycleOfListeners = false
|
||||
|
||||
protected[akka] override def foreachListener(f: (ActorRef) => Unit): Unit = super.foreachListener(f)
|
||||
|
||||
private def actors() : ConcurrentHashMap[String, ActorRef] = {
|
||||
private[akka] def actors() : ConcurrentHashMap[String, ActorRef] = {
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).actors
|
||||
}
|
||||
private def typedActors() : ConcurrentHashMap[String, AnyRef] = {
|
||||
private[akka] def typedActors() : ConcurrentHashMap[String, AnyRef] = {
|
||||
RemoteServer.actorsFor(RemoteServer.Address(hostname, port)).typedActors
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,12 +18,9 @@ public class RemoteTypedActorOneImpl extends TypedActor implements RemoteTypedAc
|
|||
}
|
||||
|
||||
public void oneWay() throws Exception {
|
||||
System.out.println("--------> got it!!!!!!");
|
||||
RemoteTypedActorLog.oneWayLog().put("oneway");
|
||||
}
|
||||
|
||||
|
||||
|
||||
@Override
|
||||
public void preRestart(Throwable e) {
|
||||
try { RemoteTypedActorLog.messageLog().put(e.getMessage()); } catch(Exception ex) {}
|
||||
|
|
|
|||
|
|
@ -144,5 +144,6 @@ class ServerInitiatedRemoteActorSpec extends JUnitSuite {
|
|||
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
||||
actor.stop
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -4,42 +4,46 @@
|
|||
|
||||
package se.scalablesolutions.akka.actor.remote
|
||||
|
||||
import java.util.concurrent.{CountDownLatch, TimeUnit}
|
||||
import org.scalatest.junit.JUnitSuite
|
||||
import org.junit.{Test, Before, After}
|
||||
import org.scalatest.Spec
|
||||
import org.scalatest.matchers.ShouldMatchers
|
||||
import org.scalatest.BeforeAndAfterAll
|
||||
import org.scalatest.junit.JUnitRunner
|
||||
import org.junit.runner.RunWith
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
import se.scalablesolutions.akka.remote.{RemoteServer, RemoteClient}
|
||||
import se.scalablesolutions.akka.actor._
|
||||
import RemoteTypedActorLog._
|
||||
|
||||
object ServerInitiatedRemoteTypedActorSpec {
|
||||
val HOSTNAME = "localhost"
|
||||
val PORT = 9990
|
||||
var server: RemoteServer = null
|
||||
|
||||
class SimpleActor extends Actor {
|
||||
def receive = {
|
||||
case _ => println("received message")
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite {
|
||||
@RunWith(classOf[JUnitRunner])
|
||||
class ServerInitiatedRemoteTypedActorSpec extends
|
||||
Spec with
|
||||
ShouldMatchers with
|
||||
BeforeAndAfterAll {
|
||||
import ServerInitiatedRemoteTypedActorSpec._
|
||||
|
||||
private val unit = TimeUnit.MILLISECONDS
|
||||
|
||||
|
||||
@Before
|
||||
def init {
|
||||
override def beforeAll = {
|
||||
server = new RemoteServer()
|
||||
server.start(HOSTNAME, PORT)
|
||||
|
||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||
server.registerTypedActor("typed-actor-service", typedActor)
|
||||
|
||||
Thread.sleep(1000)
|
||||
}
|
||||
|
||||
// make sure the servers shutdown cleanly after the test has finished
|
||||
@After
|
||||
def finished {
|
||||
override def afterAll = {
|
||||
try {
|
||||
server.shutdown
|
||||
RemoteClient.shutdownAll
|
||||
|
|
@ -49,38 +53,60 @@ class ServerInitiatedRemoteTypedActorSpec extends JUnitSuite {
|
|||
}
|
||||
}
|
||||
|
||||
describe("Server managed remote typed Actor ") {
|
||||
|
||||
@Test
|
||||
def shouldSendWithBang {
|
||||
it("should receive one-way message") {
|
||||
clearMessageLogs
|
||||
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||
expect("oneway") {
|
||||
actor.oneWay
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
val clientManangedTypedActor = TypedActor.newRemoteInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000, HOSTNAME, PORT)
|
||||
clientManangedTypedActor.requestReply("test-string")
|
||||
Thread.sleep(2000)
|
||||
println("###########")
|
||||
*/
|
||||
/*
|
||||
trace()
|
||||
val actor = Actor.actorOf[SimpleActor].start
|
||||
server.register("simple-actor", actor)
|
||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||
server.registerTypedActor("typed-actor-service", typedActor)
|
||||
println("### registered actor")
|
||||
trace()
|
||||
it("should respond to request-reply message") {
|
||||
clearMessageLogs
|
||||
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||
expect("pong") {
|
||||
actor.requestReply("ping")
|
||||
}
|
||||
}
|
||||
|
||||
//val actorRef = RemoteActorRef("typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, HOSTNAME, PORT, 5000L, None)
|
||||
//val myActor = TypedActor.createProxyForRemoteActorRef(classOf[RemoteTypedActorOne], actorRef)
|
||||
val myActor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", classOf[RemoteTypedActorOneImpl].getName, 5000L, HOSTNAME, PORT)
|
||||
println("### call one way")
|
||||
myActor.oneWay()
|
||||
Thread.sleep(3000)
|
||||
println("### call one way - done")
|
||||
*/
|
||||
//assert(RemoteActorSpecActorUnidirectional.latch.await(1, TimeUnit.SECONDS))
|
||||
//actor.stop
|
||||
/* */
|
||||
it("should not recreate registered actors") {
|
||||
val actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||
val numberOfActorsInRegistry = ActorRegistry.actors.length
|
||||
expect("oneway") {
|
||||
actor.oneWay
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
assert(numberOfActorsInRegistry === ActorRegistry.actors.length)
|
||||
}
|
||||
|
||||
it("should support multiple variants to get the actor from client side") {
|
||||
var actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT)
|
||||
expect("oneway") {
|
||||
actor.oneWay
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", HOSTNAME, PORT)
|
||||
expect("oneway") {
|
||||
actor.oneWay
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
actor = RemoteClient.typedActorFor(classOf[RemoteTypedActorOne], "typed-actor-service", 5000L, HOSTNAME, PORT, this.getClass().getClassLoader)
|
||||
expect("oneway") {
|
||||
actor.oneWay
|
||||
oneWayLog.poll(5, TimeUnit.SECONDS)
|
||||
}
|
||||
}
|
||||
|
||||
it("should register and unregister typed actors") {
|
||||
val typedActor = TypedActor.newInstance(classOf[RemoteTypedActorOne], classOf[RemoteTypedActorOneImpl], 1000)
|
||||
server.registerTypedActor("my-test-service", typedActor)
|
||||
assert(server.typedActors().get("my-test-service") != null)
|
||||
server.unregisterTypedActor("my-test-service")
|
||||
assert(server.typedActors().get("my-test-service") == null)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -182,7 +182,6 @@ abstract class TypedActor extends Actor with Proxyable {
|
|||
|
||||
case Link(proxy) => self.link(proxy)
|
||||
case Unlink(proxy) => self.unlink(proxy)
|
||||
case method: String => println("### got method")
|
||||
case unexpected => throw new IllegalActorStateException(
|
||||
"Unexpected message [" + unexpected + "] sent to [" + this + "]")
|
||||
}
|
||||
|
|
@ -408,8 +407,11 @@ object TypedActor extends Logging {
|
|||
proxy.asInstanceOf[T]
|
||||
}
|
||||
|
||||
// FIXME
|
||||
def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = {
|
||||
/**
|
||||
* Create a proxy for a RemoteActorRef representing a server managed remote typed actor.
|
||||
*
|
||||
*/
|
||||
private[akka] def createProxyForRemoteActorRef[T](intfClass: Class[T], actorRef: ActorRef): T = {
|
||||
|
||||
class MyInvocationHandler extends InvocationHandler {
|
||||
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = {
|
||||
|
|
@ -423,9 +425,6 @@ object TypedActor extends Logging {
|
|||
val jProxy = JProxy.newProxyInstance(intfClass.getClassLoader(), interfaces, handler)
|
||||
val awProxy = Proxy.newInstance(interfaces, Array(jProxy, jProxy), true, false)
|
||||
|
||||
// TODO: needed?
|
||||
//typedActor.initialize(proxy)
|
||||
// TODO: timeout??
|
||||
AspectInitRegistry.register(awProxy, AspectInit(intfClass, null, actorRef, None, 5000L))
|
||||
awProxy.asInstanceOf[T]
|
||||
}
|
||||
|
|
@ -569,6 +568,7 @@ object TypedActor extends Logging {
|
|||
private[akka] def isJoinPoint(message: Any): Boolean = message.isInstanceOf[JoinPoint]
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* AspectWerkz Aspect that is turning POJO into proxy to a server managed remote TypedActor.
|
||||
* <p/>
|
||||
|
|
@ -578,103 +578,18 @@ object TypedActor extends Logging {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@Aspect("perInstance")
|
||||
private[akka] sealed class ServerManagedTypedActorAspect {
|
||||
@volatile private var isInitialized = false
|
||||
@volatile private var isStopped = false
|
||||
private var interfaceClass: Class[_] = _
|
||||
private var actorRef: ActorRef = _
|
||||
private var timeout: Long = _
|
||||
private var uuid: String = _
|
||||
private var remoteAddress: Option[InetSocketAddress] = _
|
||||
|
||||
//FIXME
|
||||
|
||||
private[akka] sealed class ServerManagedTypedActorAspect extends ActorAspect {
|
||||
|
||||
@Around("execution(* *.*(..)) && this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
|
||||
def invoke(joinPoint: JoinPoint): AnyRef = {
|
||||
println("### MyAspect intercepted " + joinPoint.getSignature)
|
||||
if (!isInitialized) initialize(joinPoint)
|
||||
remoteDispatch(joinPoint)
|
||||
}
|
||||
|
||||
|
||||
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
|
||||
val (message: Array[AnyRef], isEscaped) = escapeArguments(methodRtti.getParameterValues)
|
||||
|
||||
println("### remote dispatch...")
|
||||
|
||||
val future = RemoteClientModule.send[AnyRef](
|
||||
message, None, None, remoteAddress.get,
|
||||
timeout, isOneWay, actorRef,
|
||||
Some((interfaceClass.getName, methodRtti.getMethod.getName)),
|
||||
ActorType.TypedActor)
|
||||
|
||||
if (isOneWay) null // for void methods
|
||||
else {
|
||||
if (future.isDefined) {
|
||||
future.get.await
|
||||
val result = getResultOrThrowException(future.get)
|
||||
if (result.isDefined) result.get
|
||||
else throw new IllegalActorStateException("No result returned from call to [" + joinPoint + "]")
|
||||
} else throw new IllegalActorStateException("No future returned from call to [" + joinPoint + "]")
|
||||
}
|
||||
}
|
||||
|
||||
/*
|
||||
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
|
||||
|
||||
|
||||
if (!actorRef.isRunning && !isStopped) {
|
||||
isStopped = true
|
||||
// FIXME - what to do?
|
||||
// joinPoint.proceed
|
||||
null
|
||||
} else if (isOneWay) {
|
||||
actorRef.!("joinPoint")
|
||||
//actorRef.!(joinPoint)(senderActorRef)
|
||||
null.asInstanceOf[AnyRef]
|
||||
} else if (TypedActor.returnsFuture_?(methodRtti)) {
|
||||
actorRef.!!!(joinPoint, timeout)(senderActorRef)
|
||||
} else {
|
||||
val result = (actorRef.!!(joinPoint, timeout)(senderActorRef)).as[AnyRef]
|
||||
if (result.isDefined) result.get
|
||||
else throw new ActorTimeoutException("Invocation to [" + joinPoint + "] timed out.")
|
||||
}
|
||||
}
|
||||
*/
|
||||
private def getResultOrThrowException[T](future: Future[T]): Option[T] =
|
||||
if (future.exception.isDefined) throw future.exception.get
|
||||
else future.result
|
||||
|
||||
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
|
||||
var isEscaped = false
|
||||
val escapedArgs = for (arg <- args) yield {
|
||||
val clazz = arg.getClass
|
||||
if (clazz.getName.contains(TypedActor.AW_PROXY_PREFIX)) {
|
||||
isEscaped = true
|
||||
TypedActor.AW_PROXY_PREFIX + clazz.getSuperclass.getName
|
||||
} else arg
|
||||
}
|
||||
(escapedArgs, isEscaped)
|
||||
}
|
||||
|
||||
|
||||
private def initialize(joinPoint: JoinPoint): Unit = {
|
||||
val init = AspectInitRegistry.initFor(joinPoint.getThis)
|
||||
interfaceClass = init.interfaceClass
|
||||
actorRef = init.actorRef
|
||||
uuid = actorRef.uuid
|
||||
override def initialize(joinPoint: JoinPoint): Unit = {
|
||||
super.initialize(joinPoint)
|
||||
remoteAddress = actorRef.remoteAddress
|
||||
println("### address= " + remoteAddress.get)
|
||||
timeout = init.timeout
|
||||
isInitialized = true
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -686,16 +601,8 @@ private[akka] sealed class ServerManagedTypedActorAspect {
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
@Aspect("perInstance")
|
||||
private[akka] sealed class TypedActorAspect {
|
||||
@volatile private var isInitialized = false
|
||||
@volatile private var isStopped = false
|
||||
private var interfaceClass: Class[_] = _
|
||||
private var typedActor: TypedActor = _
|
||||
private var actorRef: ActorRef = _
|
||||
private var remoteAddress: Option[InetSocketAddress] = _
|
||||
private var timeout: Long = _
|
||||
private var uuid: String = _
|
||||
|
||||
private[akka] sealed class TypedActorAspect extends ActorAspect {
|
||||
|
||||
@Around("execution(* *.*(..)) && !this(se.scalablesolutions.akka.actor.ServerManagedTypedActor)")
|
||||
def invoke(joinPoint: JoinPoint): AnyRef = {
|
||||
if (!isInitialized) initialize(joinPoint)
|
||||
|
|
@ -706,12 +613,26 @@ private[akka] sealed class TypedActorAspect {
|
|||
if (remoteAddress.isDefined) remoteDispatch(joinPoint)
|
||||
else localDispatch(joinPoint)
|
||||
}
|
||||
}
|
||||
|
||||
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
/**
|
||||
* Base class for TypedActorAspect and ServerManagedTypedActorAspect to reduce code duplication.
|
||||
*/
|
||||
private[akka] abstract class ActorAspect {
|
||||
@volatile protected var isInitialized = false
|
||||
@volatile protected var isStopped = false
|
||||
protected var interfaceClass: Class[_] = _
|
||||
protected var typedActor: TypedActor = _
|
||||
protected var actorRef: ActorRef = _
|
||||
protected var timeout: Long = _
|
||||
protected var uuid: String = _
|
||||
protected var remoteAddress: Option[InetSocketAddress] = _
|
||||
|
||||
protected def localDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
val senderActorRef = Some(SenderContextInfo.senderActorRef.value)
|
||||
val senderProxy = Some(SenderContextInfo.senderProxy.value)
|
||||
val senderProxy = Some(SenderContextInfo.senderProxy.value)
|
||||
|
||||
typedActor.context._sender = senderProxy
|
||||
if (!actorRef.isRunning && !isStopped) {
|
||||
|
|
@ -732,7 +653,7 @@ private[akka] sealed class TypedActorAspect {
|
|||
}
|
||||
}
|
||||
|
||||
private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
protected def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
|
||||
val methodRtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val isOneWay = TypedActor.isOneWay(methodRtti)
|
||||
|
||||
|
|
@ -771,7 +692,7 @@ private[akka] sealed class TypedActorAspect {
|
|||
(escapedArgs, isEscaped)
|
||||
}
|
||||
|
||||
private def initialize(joinPoint: JoinPoint): Unit = {
|
||||
protected def initialize(joinPoint: JoinPoint): Unit = {
|
||||
val init = AspectInitRegistry.initFor(joinPoint.getThis)
|
||||
interfaceClass = init.interfaceClass
|
||||
typedActor = init.targetInstance
|
||||
|
|
@ -839,4 +760,7 @@ private[akka] sealed case class AspectInit(
|
|||
}
|
||||
|
||||
|
||||
/**
|
||||
* Marker interface for server manager typed actors.
|
||||
*/
|
||||
private[akka] sealed trait ServerManagedTypedActor extends TypedActor
|
||||
|
|
@ -45,7 +45,6 @@ class TypedActorLifecycleSpec extends Spec with ShouldMatchers with BeforeAndAft
|
|||
fail("expected exception not thrown")
|
||||
} catch {
|
||||
case e: RuntimeException => {
|
||||
println("#failed")
|
||||
cdl.await
|
||||
assert(SamplePojoImpl._pre)
|
||||
assert(SamplePojoImpl._post)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue