From 6f6d9193f4ece1f039bd018b565fa669bf7b55c1 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 May 2011 12:13:59 +0200 Subject: [PATCH 1/5] Fixing ticket #888, adding startLink to Supervisor --- akka-actor/src/main/scala/akka/actor/Supervisor.scala | 2 ++ 1 file changed, 2 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index d20b8f31f4..2d7531a33f 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -116,6 +116,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act def link(child: ActorRef) = supervisor.link(child) + def startLink(child: ActorRef) = supervisor.startLink(child) + def unlink(child: ActorRef) = supervisor.unlink(child) def children: List[ActorRef] = From 2642c9a9cc484cc5a09b1de514c454679ea05046 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 May 2011 12:14:33 +0200 Subject: [PATCH 2/5] Adding support for retrieving interfaces proxied and the current implementation behind the proxy, intended for annotation processing etc --- .../test/java/akka/actor/TestAnnotation.java | 10 +++ .../main/scala/akka/actor/TypedActor.scala | 87 ++++++++++++------- 2 files changed, 64 insertions(+), 33 deletions(-) create mode 100644 akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java diff --git a/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java b/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java new file mode 100644 index 0000000000..05b4c22c37 --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java @@ -0,0 +1,10 @@ +package akka.actor; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@Inherited +public @interface TestAnnotation { + String someString() default "pigdog"; +} diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index b7c6b45422..97f08d8273 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -9,7 +9,8 @@ import akka.actor.Actor.{ actorOf, futureToAnyOptionAsTypedOption } import akka.dispatch.{ MessageDispatcher, Dispatchers, Future } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Duration } -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } +import collection.immutable object TypedActor { private val selfReference = new ThreadLocal[AnyRef] @@ -19,24 +20,26 @@ object TypedActor { case some ⇒ some } - class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor { + private class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: ⇒ T) extends Actor { val me = createInstance - def callMethod(methodCall: MethodCall): Unit = methodCall match { - case m if m.isOneWay ⇒ m(me) - case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - case m ⇒ self reply m(me) - } def receive = { case m: MethodCall ⇒ selfReference set proxyRef.get - try { callMethod(m) } finally { selfReference set null } + try { + m match { + case m if m.isOneWay ⇒ m(me) + case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] + case m ⇒ self reply m(me) + } + } finally { selfReference set null } } } - case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { + class TypedActorInvocationHandler(val interfaces: immutable.Seq[Class[_]], implementationVar: AtomVar[Class[_]], val actor: ActorRef) extends InvocationHandler { + def implementation: Class[_] = implementationVar.get def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString - case "equals" ⇒ ((proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean + case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ MethodCall(method, args) match { @@ -109,42 +112,60 @@ object TypedActor { case ref ⇒ ref.stop; true } - def getActorRefFor(typedActor: AnyRef): ActorRef = typedActor match { - case null ⇒ null - case other ⇒ Proxy.getInvocationHandler(other) match { - case null ⇒ null - case handler: TypedActorInvocationHandler ⇒ handler.actor - case _ ⇒ null - } + def getActorRefFor(typedActor: AnyRef): ActorRef = invocationHandlerFor(typedActor) match { + case null ⇒ null + case handler ⇒ handler.actor } - def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null + def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = + if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { + case null ⇒ null + case other ⇒ Proxy.getInvocationHandler(other) match { + case null ⇒ null + case handler: TypedActorInvocationHandler ⇒ handler + case _ ⇒ null + } + } + else null - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = - createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + def isTypedActor(typedActor_? : AnyRef): Boolean = invocationHandlerFor(typedActor_?) ne null def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = - createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = - createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader) + createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, config, loader) - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[R] - configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomVar[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader) } - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { - actor.timeout = config.timeout.toMillis - actor.dispatcher = config.dispatcher + /* Internal API */ - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { + val implementationVar = new AtomVar[Class[_]] + val ref = actorOf({ + val instance = actor + implementationVar.set(instance match { + case null ⇒ null + case some: TypedActor[_, _] ⇒ some.me.getClass + case some ⇒ some.getClass + }) + instance + }) + + ref.timeout = config.timeout.toMillis + ref.dispatcher = config.dispatcher + + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(interfaces.toList, implementationVar, ref)).asInstanceOf[T] proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop proxy } - private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = - if (clazz.isInterface) Array[Class[_]](clazz) - else clazz.getInterfaces + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces } \ No newline at end of file From 724cbf43767d3a999d27cededdb0827f9fa00792 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 May 2011 13:52:22 +0200 Subject: [PATCH 3/5] Removing hte slick interfaces/implementation classes because it`s just not good enough --- .../src/main/scala/akka/actor/TypedActor.scala | 16 +++------------- 1 file changed, 3 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 97f08d8273..0789ef263f 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -35,8 +35,7 @@ object TypedActor { } } - class TypedActorInvocationHandler(val interfaces: immutable.Seq[Class[_]], implementationVar: AtomVar[Class[_]], val actor: ActorRef) extends InvocationHandler { - def implementation: Class[_] = implementationVar.get + case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean @@ -147,21 +146,12 @@ object TypedActor { createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { - val implementationVar = new AtomVar[Class[_]] - val ref = actorOf({ - val instance = actor - implementationVar.set(instance match { - case null ⇒ null - case some: TypedActor[_, _] ⇒ some.me.getClass - case some ⇒ some.getClass - }) - instance - }) + val ref = actorOf(actor) ref.timeout = config.timeout.toMillis ref.dispatcher = config.dispatcher - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(interfaces.toList, implementationVar, ref)).asInstanceOf[T] + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T] proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop proxy From 53107891414be0340ac6b1039b5775a638cc81ac Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 May 2011 13:52:22 +0200 Subject: [PATCH 4/5] Removing hte slick interfaces/implementation classes because it`s just not good enough --- .../main/scala/akka/actor/TypedActor.scala | 24 ++++++------------- 1 file changed, 7 insertions(+), 17 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 97f08d8273..ce29597c76 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -35,8 +35,7 @@ object TypedActor { } } - class TypedActorInvocationHandler(val interfaces: immutable.Seq[Class[_]], implementationVar: AtomVar[Class[_]], val actor: ActorRef) extends InvocationHandler { - def implementation: Class[_] = implementationVar.get + case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean @@ -136,32 +135,23 @@ object TypedActor { def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, config, loader) - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + /* Internal API */ + + private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { val proxyRef = new AtomVar[R] configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader) } - /* Internal API */ - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { - val implementationVar = new AtomVar[Class[_]] - val ref = actorOf({ - val instance = actor - implementationVar.set(instance match { - case null ⇒ null - case some: TypedActor[_, _] ⇒ some.me.getClass - case some ⇒ some.getClass - }) - instance - }) + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { + val ref = actorOf(actor) ref.timeout = config.timeout.toMillis ref.dispatcher = config.dispatcher - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(interfaces.toList, implementationVar, ref)).asInstanceOf[T] + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T] proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop proxy From 3ccfdf822c82d6cccc8edefebc74b628930108c2 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Tue, 24 May 2011 14:57:11 +0200 Subject: [PATCH 5/5] Adding a Java API method for generic proxying --- akka-actor/src/main/scala/akka/actor/TypedActor.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index f840db29ad..78f234c91a 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -132,6 +132,9 @@ object TypedActor { def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R = + createProxy(interfaces, (ref: AtomVar[R]) ⇒ constructor.create, config, loader) + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, config, loader)