diff --git a/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java b/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java new file mode 100644 index 0000000000..05b4c22c37 --- /dev/null +++ b/akka-actor-tests/src/test/java/akka/actor/TestAnnotation.java @@ -0,0 +1,10 @@ +package akka.actor; + +import java.lang.annotation.*; + +@Retention(RetentionPolicy.RUNTIME) +@Target(ElementType.METHOD) +@Inherited +public @interface TestAnnotation { + String someString() default "pigdog"; +} diff --git a/akka-actor/src/main/scala/akka/actor/Supervisor.scala b/akka-actor/src/main/scala/akka/actor/Supervisor.scala index d20b8f31f4..2d7531a33f 100644 --- a/akka-actor/src/main/scala/akka/actor/Supervisor.scala +++ b/akka-actor/src/main/scala/akka/actor/Supervisor.scala @@ -116,6 +116,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act def link(child: ActorRef) = supervisor.link(child) + def startLink(child: ActorRef) = supervisor.startLink(child) + def unlink(child: ActorRef) = supervisor.unlink(child) def children: List[ActorRef] = diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index b7c6b45422..78f234c91a 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -9,7 +9,8 @@ import akka.actor.Actor.{ actorOf, futureToAnyOptionAsTypedOption } import akka.dispatch.{ MessageDispatcher, Dispatchers, Future } import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy } import akka.util.{ Duration } -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicReference ⇒ AtomVar } +import collection.immutable object TypedActor { private val selfReference = new ThreadLocal[AnyRef] @@ -19,24 +20,25 @@ object TypedActor { case some ⇒ some } - class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor { + private class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: ⇒ T) extends Actor { val me = createInstance - def callMethod(methodCall: MethodCall): Unit = methodCall match { - case m if m.isOneWay ⇒ m(me) - case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] - case m ⇒ self reply m(me) - } def receive = { case m: MethodCall ⇒ selfReference set proxyRef.get - try { callMethod(m) } finally { selfReference set null } + try { + m match { + case m if m.isOneWay ⇒ m(me) + case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] + case m ⇒ self reply m(me) + } + } finally { selfReference set null } } } case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match { case "toString" ⇒ actor.toString - case "equals" ⇒ ((proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean + case "equals" ⇒ (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean case "hashCode" ⇒ actor.hashCode.asInstanceOf[AnyRef] case _ ⇒ MethodCall(method, args) match { @@ -109,42 +111,55 @@ object TypedActor { case ref ⇒ ref.stop; true } - def getActorRefFor(typedActor: AnyRef): ActorRef = typedActor match { - case null ⇒ null - case other ⇒ Proxy.getInvocationHandler(other) match { - case null ⇒ null - case handler: TypedActorInvocationHandler ⇒ handler.actor - case _ ⇒ null - } + def getActorRefFor(typedActor: AnyRef): ActorRef = invocationHandlerFor(typedActor) match { + case null ⇒ null + case handler ⇒ handler.actor } - def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null + def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = + if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { + case null ⇒ null + case other ⇒ Proxy.getInvocationHandler(other) match { + case null ⇒ null + case handler: TypedActorInvocationHandler ⇒ handler + case _ ⇒ null + } + } + else null - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = - createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + def isTypedActor(typedActor_? : AnyRef): Boolean = invocationHandlerFor(typedActor_?) ne null def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = - createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R = + createProxy(interfaces, (ref: AtomVar[R]) ⇒ constructor.create, config, loader) def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = - createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader) + createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, config, loader) - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[R] - configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader) + /* Internal API */ + + private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomVar[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader) } - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { - actor.timeout = config.timeout.toMillis - actor.dispatcher = config.dispatcher + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: ⇒ Actor, config: Configuration, loader: ClassLoader): T = { + + val ref = actorOf(actor) + + ref.timeout = config.timeout.toMillis + ref.dispatcher = config.dispatcher + + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T] proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop proxy } - private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = - if (clazz.isInterface) Array[Class[_]](clazz) - else clazz.getInterfaces + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces } \ No newline at end of file