Added some API to be able to wrap interfaces on top of Actors, solving the ActorPool for TypedActor dilemma, closing ticket #724

This commit is contained in:
Viktor Klang 2011-05-23 17:08:45 +02:00
parent 1f5a04c678
commit e320825137
2 changed files with 85 additions and 40 deletions

View file

@ -7,11 +7,25 @@ import akka.testing._
import akka.testing.Testing.{ sleepFor, testMillis }
import akka.util.duration._
import akka.actor.Actor
import akka.actor.Actor._
import akka.routing._
import java.util.concurrent.atomic.AtomicInteger
import akka.dispatch.{ KeptPromise, Future }
import akka.actor.{ TypedActor, Actor }
object RoutingSpec {
trait Foo {
def sq(x: Int, sleep: Long): Future[Int]
}
class FooImpl extends Foo {
def sq(x: Int, sleep: Long): Future[Int] = {
if (sleep > 0) Thread.sleep(sleep)
new KeptPromise(Right(x * x))
}
}
}
class RoutingSpec extends WordSpec with MustMatchers {
import Routing._
@ -491,6 +505,29 @@ class RoutingSpec extends WordSpec with MustMatchers {
pool.stop()
}
"support typed actors" in {
import RoutingSpec._
import TypedActor._
def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup {
def lowerBound = 1
def upperBound = 5
def pressureThreshold = 1
def partialFill = true
def selectionCount = 1
def rampupRate = 0.1
def backoffRate = 0.50
def backoffThreshold = 0.50
def instance = getActorRefFor(typedActorOf[Foo, FooImpl]())
def receive = _route
}
val pool = createProxy[Foo](createPool)
val results = for (i 1 to 100) yield (i, pool.sq(i, 100))
for ((i, r) results) r.get must equal(i * i)
}
}
}

View file

@ -13,26 +13,24 @@ import java.util.concurrent.atomic.AtomicReference
object TypedActor {
private val selfReference = new ThreadLocal[AnyRef]
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T]
trait TypedActor[Iface <: AnyRef, Impl <: Iface] { self: Actor
val proxyRef: AtomicReference[Iface]
def callMethod(methodCall: MethodCall): Unit
def receive: Receive = {
case m: MethodCall
selfReference set proxyRef.get
try { callMethod(m) } finally { selfReference set null }
}
def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match {
case null throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!")
case some some
}
class DefaultTypedActor[Iface <: AnyRef, Impl <: Iface](
val proxyRef: AtomicReference[Iface], createInstance: Impl) extends TypedActor[Iface, Impl] with Actor {
class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: T) extends Actor {
val me = createInstance
def callMethod(methodCall: MethodCall): Unit = methodCall match {
case m if m.isOneWay m(me)
case m if m.returnsFuture_? self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]]
case m self reply m(me)
}
def receive = {
case m: MethodCall
selfReference set proxyRef.get
try { callMethod(m) } finally { selfReference set null }
}
}
case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler {
@ -81,42 +79,24 @@ object TypedActor {
private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues)
}
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration): T =
newTypedActor(Array[Class[_]](interface), impl.newInstance, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader)
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration): T =
newTypedActor(Array[Class[_]](interface), impl.create, config, interface.getClassLoader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R =
createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader)
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T =
newTypedActor(Array[Class[_]](interface), impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.newInstance, config, loader)
def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T =
newTypedActor(Array[Class[_]](interface), impl.create, config, loader)
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R =
createProxyAndTypedActor(interface, impl.create, config, loader)
def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R =
newTypedActor(impl.getInterfaces, impl.newInstance, config, loader)
createProxyAndTypedActor(impl, impl.newInstance, config, loader)
def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = {
val clazz = m.erasure.asInstanceOf[Class[T]]
newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
}
private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: T, config: Configuration, loader: ClassLoader): R =
newTypedActor[R, T](interfaces, (ref: AtomicReference[R]) new DefaultTypedActor[R, T](ref, constructor), config, loader)
private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) TypedActor[R, T], config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomicReference[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef).asInstanceOf[Actor]), config, loader)
}
protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
actor.timeout = config.timeout.toMillis
actor.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader)
}
def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match {
@ -134,4 +114,32 @@ object TypedActor {
}
def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: T, config: Configuration, loader: ClassLoader): R =
createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) new TypedActor[R, T](ref, constructor), config, loader)
def createProxy[R <: AnyRef](constructor: Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R =
createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) constructor, config, if (loader eq null) m.erasure.getClassLoader else loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, config: Configuration, loader: ClassLoader): R =
createProxy[R](interfaces, (ref: AtomicReference[R]) constructor, config, loader)
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) Actor, config: Configuration, loader: ClassLoader): R = {
val proxyRef = new AtomicReference[R]
configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader)
}
protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = {
actor.timeout = config.timeout.toMillis
actor.dispatcher = config.dispatcher
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T]
proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop
proxy
}
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] =
if (clazz.isInterface) Array[Class[_]](clazz)
else clazz.getInterfaces
}