Added ActiveObjectContext with sender reference

This commit is contained in:
Jonas Bonér 2010-05-24 12:20:41 +02:00
parent 7406d23e4e
commit 39a9bef0d1

View file

@ -67,12 +67,58 @@ final class ActiveObjectConfiguration {
}
}
/**
* Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender'
* reference etc.
* <p/>
* In order to make use of this context you have to create a member field in your
* Active Object that has the type 'ActiveObjectContext', then an instance will
* be injected for you to use.
* <p/>
* This class does not contain static information but is updated by the runtime system
* at runtime.
* <p/>
* Here is an example of usage:
* <pre>
* class Ping {
* // This context will be injected, holds RTTI (runtime type information)
* // for the current message send
* private ActiveObjectContext context = null;
*
* public void hit(int count) {
* Pong pong = (Pong) context.getSender();
* pong.hit(count++)
* }
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
final class ActiveObjectContext {
private[akka] var _sender: AnyRef = _
/**
* Returns the current sender Active Object reference.
* Scala style getter.
*/
def sender = _sender
/**
* Returns the current sender Active Object reference.
* Java style getter.
*/
def getSender = _sender
}
private[akka] object ActiveObjectContext {
private[actor] val sender = new scala.util.DynamicVariable[AnyRef](null)
}
/**
* Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObject {
object ActiveObject extends Logging {
import Actor.actorOf
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
@ -283,7 +329,8 @@ object ActiveObject {
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy)
val context = injectActiveObjectContext(proxy)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@ -291,9 +338,11 @@ object ActiveObject {
proxy.asInstanceOf[T]
}
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target)
actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@ -374,8 +423,31 @@ object ActiveObject {
this
}
private def injectActiveObjectContext(activeObject: AnyRef): Option[ActiveObjectContext] = {
def injectActiveObjectContext0(activeObject: AnyRef, clazz: Class[_]): Option[ActiveObjectContext] = {
val contextField = clazz.getDeclaredFields.toList.find(_.getType == classOf[ActiveObjectContext])
if (contextField.isDefined) {
contextField.get.setAccessible(true)
val context = new ActiveObjectContext
contextField.get.set(activeObject, context)
Some(context)
} else {
val parent = clazz.getSuperclass
if (parent != null) injectActiveObjectContext0(activeObject, parent)
else {
log.warning(
"Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
activeObject.getClass.getName)
None
}
}
}
injectActiveObjectContext0(activeObject, activeObject.getClass)
}
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
Supervisor(SupervisorConfig(restartStrategy, components))
}
private[akka] object AspectInitRegistry {
@ -412,16 +484,20 @@ private[akka] sealed class ActiveObjectAspect {
private var actorRef: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _
@volatile private var instance: AnyRef = _
@Around("execution(* *.*(..))")
def invoke(joinPoint: JoinPoint): AnyRef = {
instance = joinPoint.getThis
ActiveObjectContext.sender.value = instance
if (!isInitialized) {
val init = AspectInitRegistry.initFor(joinPoint.getThis)
val init = AspectInitRegistry.initFor(instance)
target = init.target
actorRef = init.actorRef
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
}
dispatch(joinPoint)
}
@ -434,9 +510,9 @@ private[akka] sealed class ActiveObjectAspect {
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
if (isOneWay(rtti)) {
(actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
}
else {
actorRef ! Invocation(joinPoint, true, true)
null.asInstanceOf[AnyRef]
} else {
val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
@ -540,13 +616,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
private var context: Option[ActiveObjectContext] = None
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
context = ctx
val methods = targetInstance.getClass.getDeclaredMethods.toList
// See if we have any config define restart callbacks
@ -590,6 +668,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def receive = {
case Invocation(joinPoint, isOneWay, _) =>
context.foreach { ctx =>
val sender = ActiveObjectContext.sender.value
if (sender ne null) ctx._sender = sender
}
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else self.reply(joinPoint.proceed)