diff --git a/akka-core/src/main/scala/actor/ActiveObject.scala b/akka-core/src/main/scala/actor/ActiveObject.scala index f6389ea1ba..337f785876 100644 --- a/akka-core/src/main/scala/actor/ActiveObject.scala +++ b/akka-core/src/main/scala/actor/ActiveObject.scala @@ -67,12 +67,58 @@ final class ActiveObjectConfiguration { } } +/** + * Holds RTTI (runtime type information) for the Active Object, f.e. current 'sender' + * reference etc. + *
+ * In order to make use of this context you have to create a member field in your + * Active Object that has the type 'ActiveObjectContext', then an instance will + * be injected for you to use. + * + * This class does not contain static information but is updated by the runtime system + * at runtime. + * + * Here is an example of usage: + *
+ * class Ping {
+ * // This context will be injected, holds RTTI (runtime type information)
+ * // for the current message send
+ * private ActiveObjectContext context = null;
+ *
+ * public void hit(int count) {
+ * Pong pong = (Pong) context.getSender();
+ * pong.hit(count++)
+ * }
+ * }
+ *
+ *
+ * @author Jonas Bonér
+ */
+final class ActiveObjectContext {
+ private[akka] var _sender: AnyRef = _
+ /**
+ * Returns the current sender Active Object reference.
+ * Scala style getter.
+ */
+ def sender = _sender
+
+ /**
+ * Returns the current sender Active Object reference.
+ * Java style getter.
+ */
+ def getSender = _sender
+}
+
+private[akka] object ActiveObjectContext {
+ private[actor] val sender = new scala.util.DynamicVariable[AnyRef](null)
+}
+
/**
* Factory class for creating Active Objects out of plain POJOs and/or POJOs with interfaces.
*
* @author Jonas Bonér
*/
-object ActiveObject {
+object ActiveObject extends Logging {
import Actor.actorOf
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
@@ -283,7 +329,8 @@ object ActiveObject {
private[akka] def newInstance[T](target: Class[T], actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
val proxy = Proxy.newInstance(target, false, false)
- actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy)
+ val context = injectActiveObjectContext(proxy)
+ actorRef.actor.asInstanceOf[Dispatcher].initialize(target, proxy, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(target, actorRef, remoteAddress, timeout))
@@ -291,9 +338,11 @@ object ActiveObject {
proxy.asInstanceOf[T]
}
- private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actorRef: ActorRef,
+ remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ val context = injectActiveObjectContext(target)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, false)
- actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target)
+ actorRef.actor.asInstanceOf[Dispatcher].initialize(target.getClass, target, context)
actorRef.timeout = timeout
if (remoteAddress.isDefined) actorRef.makeRemote(remoteAddress.get)
AspectInitRegistry.register(proxy, AspectInit(intf, actorRef, remoteAddress, timeout))
@@ -374,8 +423,31 @@ object ActiveObject {
this
}
+ private def injectActiveObjectContext(activeObject: AnyRef): Option[ActiveObjectContext] = {
+ def injectActiveObjectContext0(activeObject: AnyRef, clazz: Class[_]): Option[ActiveObjectContext] = {
+ val contextField = clazz.getDeclaredFields.toList.find(_.getType == classOf[ActiveObjectContext])
+ if (contextField.isDefined) {
+ contextField.get.setAccessible(true)
+ val context = new ActiveObjectContext
+ contextField.get.set(activeObject, context)
+ Some(context)
+ } else {
+ val parent = clazz.getSuperclass
+ if (parent != null) injectActiveObjectContext0(activeObject, parent)
+ else {
+ log.warning(
+ "Can't set 'ActiveObjectContext' for ActiveObject [%s] since no field of this type could be found.",
+ activeObject.getClass.getName)
+ None
+ }
+ }
+ }
+ injectActiveObjectContext0(activeObject, activeObject.getClass)
+ }
+
private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
Supervisor(SupervisorConfig(restartStrategy, components))
+
}
private[akka] object AspectInitRegistry {
@@ -412,16 +484,20 @@ private[akka] sealed class ActiveObjectAspect {
private var actorRef: ActorRef = _
private var remoteAddress: Option[InetSocketAddress] = _
private var timeout: Long = _
+ @volatile private var instance: AnyRef = _
@Around("execution(* *.*(..))")
def invoke(joinPoint: JoinPoint): AnyRef = {
+ instance = joinPoint.getThis
+ ActiveObjectContext.sender.value = instance
if (!isInitialized) {
- val init = AspectInitRegistry.initFor(joinPoint.getThis)
+ val init = AspectInitRegistry.initFor(instance)
target = init.target
actorRef = init.actorRef
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
+
}
dispatch(joinPoint)
}
@@ -434,9 +510,9 @@ private[akka] sealed class ActiveObjectAspect {
private def localDispatch(joinPoint: JoinPoint): AnyRef = {
val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
if (isOneWay(rtti)) {
- (actorRef ! Invocation(joinPoint, true, true) ).asInstanceOf[AnyRef]
- }
- else {
+ actorRef ! Invocation(joinPoint, true, true)
+ null.asInstanceOf[AnyRef]
+ } else {
val result = actorRef !! (Invocation(joinPoint, false, isVoid(rtti)), timeout)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
@@ -540,13 +616,15 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private var initTxState: Option[Method] = None
-
+ private var context: Option[ActiveObjectContext] = None
+
def this(transactionalRequired: Boolean) = this(transactionalRequired,None)
- private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
+ private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef, ctx: Option[ActiveObjectContext]) = {
if (transactionalRequired || targetClass.isAnnotationPresent(Annotations.transactionrequired)) self.makeTransactionRequired
self.id = targetClass.getName
target = Some(targetInstance)
+ context = ctx
val methods = targetInstance.getClass.getDeclaredMethods.toList
// See if we have any config define restart callbacks
@@ -590,6 +668,10 @@ private[akka] class Dispatcher(transactionalRequired: Boolean, val callbacks: Op
def receive = {
case Invocation(joinPoint, isOneWay, _) =>
+ context.foreach { ctx =>
+ val sender = ActiveObjectContext.sender.value
+ if (sender ne null) ctx._sender = sender
+ }
if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
if (isOneWay) joinPoint.proceed
else self.reply(joinPoint.proceed)