diff --git a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala index fe490fdd0a..0fa5b8e017 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/RoutingSpec.scala @@ -7,11 +7,25 @@ import akka.testing._ import akka.testing.Testing.{ sleepFor, testMillis } import akka.util.duration._ -import akka.actor.Actor import akka.actor.Actor._ import akka.routing._ import java.util.concurrent.atomic.AtomicInteger +import akka.dispatch.{ KeptPromise, Future } +import akka.actor.{ TypedActor, Actor } + +object RoutingSpec { + trait Foo { + def sq(x: Int, sleep: Long): Future[Int] + } + + class FooImpl extends Foo { + def sq(x: Int, sleep: Long): Future[Int] = { + if (sleep > 0) Thread.sleep(sleep) + new KeptPromise(Right(x * x)) + } + } +} class RoutingSpec extends WordSpec with MustMatchers { import Routing._ @@ -491,6 +505,29 @@ class RoutingSpec extends WordSpec with MustMatchers { pool.stop() } + + "support typed actors" in { + import RoutingSpec._ + import TypedActor._ + def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { + def lowerBound = 1 + def upperBound = 5 + def pressureThreshold = 1 + def partialFill = true + def selectionCount = 1 + def rampupRate = 0.1 + def backoffRate = 0.50 + def backoffThreshold = 0.50 + def instance = getActorRefFor(typedActorOf[Foo, FooImpl]()) + def receive = _route + } + + val pool = createProxy[Foo](createPool) + + val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) + + for ((i, r) ← results) r.get must equal(i * i) + } } } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index 52a6cf7622..e3c6c8e9e1 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -13,26 +13,24 @@ import java.util.concurrent.atomic.AtomicReference object TypedActor { private val selfReference = new ThreadLocal[AnyRef] - def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] - trait TypedActor[Iface <: AnyRef, Impl <: Iface] { self: Actor ⇒ - val proxyRef: AtomicReference[Iface] - def callMethod(methodCall: MethodCall): Unit - def receive: Receive = { - case m: MethodCall ⇒ - selfReference set proxyRef.get - try { callMethod(m) } finally { selfReference set null } - } + def self[T <: AnyRef] = selfReference.get.asInstanceOf[T] match { + case null ⇒ throw new IllegalStateException("Calling TypedActor.self outside of a TypedActor implementation method!") + case some ⇒ some } - class DefaultTypedActor[Iface <: AnyRef, Impl <: Iface]( - val proxyRef: AtomicReference[Iface], createInstance: ⇒ Impl) extends TypedActor[Iface, Impl] with Actor { + class TypedActor[R <: AnyRef, T <: R](val proxyRef: AtomicReference[R], createInstance: ⇒ T) extends Actor { val me = createInstance def callMethod(methodCall: MethodCall): Unit = methodCall match { case m if m.isOneWay ⇒ m(me) case m if m.returnsFuture_? ⇒ self.senderFuture.get completeWith m(me).asInstanceOf[Future[Any]] case m ⇒ self reply m(me) } + def receive = { + case m: MethodCall ⇒ + selfReference set proxyRef.get + try { callMethod(m) } finally { selfReference set null } + } } case class TypedActorInvocationHandler(actor: ActorRef) extends InvocationHandler { @@ -81,42 +79,24 @@ object TypedActor { private def readResolve(): AnyRef = MethodCall(ownerType.getDeclaredMethod(methodName, parameterTypes: _*), parameterValues) } - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.newInstance, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, interface.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration): R = + createProxyAndTypedActor(interface, impl.create, config, interface.getClassLoader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Class[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.newInstance, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.newInstance, config, loader) - def typedActorOf[T <: AnyRef, TI <: T](interface: Class[T], impl: Creator[TI], config: Configuration, loader: ClassLoader): T = - newTypedActor(Array[Class[_]](interface), impl.create, config, loader) + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], config: Configuration, loader: ClassLoader): R = + createProxyAndTypedActor(interface, impl.create, config, loader) def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], config: Configuration, loader: ClassLoader): R = - newTypedActor(impl.getInterfaces, impl.newInstance, config, loader) + createProxyAndTypedActor(impl, impl.newInstance, config, loader) def typedActorOf[R <: AnyRef, T <: R](config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { val clazz = m.erasure.asInstanceOf[Class[T]] - newTypedActor(clazz.getInterfaces, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) - } - - private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = - newTypedActor[R, T](interfaces, (ref: AtomicReference[R]) ⇒ new DefaultTypedActor[R, T](ref, constructor), config, loader) - - private[akka] def newTypedActor[R <: AnyRef, T <: R](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ TypedActor[R, T], config: Configuration, loader: ClassLoader): R = { - val proxyRef = new AtomicReference[R] - configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef).asInstanceOf[Actor]), config, loader) - } - - protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { - actor.timeout = config.timeout.toMillis - actor.dispatcher = config.dispatcher - - val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] - proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop - proxy + createProxyAndTypedActor(clazz, clazz.newInstance, config, if (loader eq null) clazz.getClassLoader else loader) } def stop(typedActor: AnyRef): Boolean = getActorRefFor(typedActor) match { @@ -134,4 +114,32 @@ object TypedActor { } def isTypedActor(typedActor_? : AnyRef): Boolean = getActorRefFor(typedActor_?) ne null + + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, config: Configuration, loader: ClassLoader): R = + createProxy[R](extractInterfaces(interface), (ref: AtomicReference[R]) ⇒ new TypedActor[R, T](ref, constructor), config, loader) + + def createProxy[R <: AnyRef](constructor: ⇒ Actor, config: Configuration = Configuration(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = + createProxy[R](extractInterfaces(m.erasure), (ref: AtomicReference[R]) ⇒ constructor, config, if (loader eq null) m.erasure.getClassLoader else loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, config: Configuration, loader: ClassLoader): R = + createProxy[R](interfaces, (ref: AtomicReference[R]) ⇒ constructor, config, loader) + + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomicReference[R]) ⇒ Actor, config: Configuration, loader: ClassLoader): R = { + val proxyRef = new AtomicReference[R] + configureAndProxyLocalActorRef[R](interfaces, proxyRef, actorOf(constructor(proxyRef)), config, loader) + } + + protected def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyRef: AtomicReference[T], actor: ActorRef, config: Configuration, loader: ClassLoader): T = { + actor.timeout = config.timeout.toMillis + actor.dispatcher = config.dispatcher + + val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actor)).asInstanceOf[T] + proxyRef.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + Actor.registry.registerTypedActor(actor.start, proxy) //We only have access to the proxy from the outside, so register it with the ActorRegistry, will be removed on actor.stop + proxy + } + + private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = + if (clazz.isInterface) Array[Class[_]](clazz) + else clazz.getInterfaces } \ No newline at end of file