Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Jonas Bonér 2011-05-24 19:04:35 +02:00
commit 71beab820c
3 changed files with 59 additions and 32 deletions

View file

@ -0,0 +1,10 @@
package akka.actor;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Inherited
public @interface TestAnnotation {
String someString() default "pigdog";
}

View file

@ -116,6 +116,8 @@ sealed class Supervisor(handler: FaultHandlingStrategy, maxRestartsHandler: (Act
def link(child: ActorRef) = supervisor.link(child)
def startLink(child: ActorRef) = supervisor.startLink(child)
def unlink(child: ActorRef) = supervisor.unlink(child)
def children: List[ActorRef] =

View file

@ -9,7 +9,8 @@ import akka.actor.Actor.{ actorOf, futureToAnyOptionAsTypedOption }
import akka.dispatch.{ MessageDispatcher, Dispatchers, Future }
import java.lang.reflect.{ InvocationTargetException, Method, InvocationHandler, Proxy }
import akka.util.{ Duration }
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.atomic.{ AtomicReference AtomVar }
import collection.immutable
object TypedActor {
private val selfReference = new ThreadLocal[AnyRef]
@ -19,24 +20,25 @@ object TypedActor {
case some some
}
class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: T) extends Actor {
private class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomVar[R], createInstance: T) extends Actor {
val me = createInstance
def callMethod(methodCall: MethodCall): Unit = methodCall match {
case m if m.isOneWay m(me)
case m if m.returnsFuture_? self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
case m self reply m(me)
}
def receive = {
case m: MethodCall
selfReference set proxyRef.get
try { callMethod(m) } finally { selfReference set null }
try {
m match {
case m if m.isOneWay m(me)
case m if m.returnsFuture_? self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
case m self reply m(me)
}
} finally { selfReference set null }
}
}
case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
def invoke(proxy: AnyRef, method: Method, args: Array[AnyRef]): AnyRef = method.getName match {
case "toString" actor.toString
case "equals" ((proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "equals" (args.length == 1 && (proxy eq args(0)) || actor == getActorRefFor(args(0))).asInstanceOf[AnyRef] //Force boxing of the boolean
case "hashCode" actor.hashCode.asInstanceOf[AnyRef]
case _
MethodCall(method, args) match {
@ -109,42 +111,55 @@ object TypedActor {
case ref ref.stop; true
}
def getActorRefFor(typedActor: AnyRef): ActorRef = typedActor match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler.actor
case _ null
}
def getActorRefFor(typedActor: AnyRef): ActorRef = invocationHandlerFor(typedActor) match {
case null null
case handler handler.actor
}
def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null
def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {
case null null
case other Proxy.getInvocationHandler(other) match {
case null null
case handler: TypedActorInvocationHandler handler
case _ null
}
}
else null
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) new TypedActor[R, T](ref, constructor), config, loader)
def isTypedActor(typedActor_? : AnyRef): Boolean = invocationHandlerFor(typedActor_?) ne null
def createProxy[R <: AnyRef](constructor: Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], config: Configuration, loader: ClassLoader): R =
createProxy(interfaces, (ref: AtomVar[R]) constructor.create, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, config: Configuration, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomicReference[R]) constructor, config, loader)
createProxy[R](interfaces, (ref: AtomVar[R]) constructor, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomicReference[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader)
/* Internal API */
private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomVar[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, constructor(proxyRef), config, loader)
}
protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
actor.timeout = config.timeout.toMillis
actor.dispatcher = config.dispatcher
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), config, loader)
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T]
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomVar[T], actor: Actor, config: Configuration, loader: ClassLoader): T = {
val ref = actorOf(actor)
ref.timeout = config.timeout.toMillis
ref.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(ref)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
Actor.registry.registerTypedActor(ref.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] =
if (clazz.isInterface) Array[Class[_]](clazz)
else clazz.getInterfaces
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces
}