Creating TypedProps and implementing support for wrapping an arbitrary ActorRef as a TypedActor

This commit is contained in:
Viktor Klang 2012-01-16 14:11:29 +01:00
parent 0470f5f9fd
commit de151617f2
11 changed files with 239 additions and 161 deletions

View file

@ -14,12 +14,6 @@ import akka.util.Timeout
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout {
def actorWithTimeout(t: Timeout): ActorRef = system.actorOf(Props(creator = () new Actor {
def receive = {
case x
}
}, timeout = t))
val defaultTimeout = system.settings.ActorTimeout.duration val defaultTimeout = system.settings.ActorTimeout.duration
val testTimeout = if (system.settings.ActorTimeout.duration < 400.millis) 500 millis else 100 millis val testTimeout = if (system.settings.ActorTimeout.duration < 400.millis) 500 millis else 100 millis
@ -27,7 +21,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
"use the global default timeout if no implicit in scope" in { "use the global default timeout if no implicit in scope" in {
within(defaultTimeout - 100.millis, defaultTimeout + 400.millis) { within(defaultTimeout - 100.millis, defaultTimeout + 400.millis) {
val echo = actorWithTimeout(Timeout(12)) val echo = system.actorOf(Props.empty)
try { try {
val d = system.settings.ActorTimeout.duration val d = system.settings.ActorTimeout.duration
val f = echo ? "hallo" val f = echo ? "hallo"
@ -39,7 +33,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
"use implicitly supplied timeout" in { "use implicitly supplied timeout" in {
implicit val timeout = Timeout(testTimeout) implicit val timeout = Timeout(testTimeout)
within(testTimeout - 100.millis, testTimeout + 300.millis) { within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = actorWithTimeout(Props.defaultTimeout) val echo = system.actorOf(Props.empty)
try { try {
val f = (echo ? "hallo").mapTo[String] val f = (echo ? "hallo").mapTo[String]
intercept[AskTimeoutException] { Await.result(f, testTimeout + testTimeout) } intercept[AskTimeoutException] { Await.result(f, testTimeout + testTimeout) }
@ -49,7 +43,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo
"use explicitly supplied timeout" in { "use explicitly supplied timeout" in {
within(testTimeout - 100.millis, testTimeout + 300.millis) { within(testTimeout - 100.millis, testTimeout + 300.millis) {
val echo = actorWithTimeout(Props.defaultTimeout) val echo = system.actorOf(Props.empty)
val f = echo.?("hallo", testTimeout) val f = echo.?("hallo", testTimeout)
try { try {
intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) } intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) }

View file

@ -72,7 +72,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
// Creating actors and supervisors // Creating actors and supervisors
// ===================================================== // =====================================================
private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], props.timeout.duration) private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)
def temporaryActorAllForOne = { def temporaryActorAllForOne = {
val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0)))) val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0))))

View file

@ -179,13 +179,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
def newFooBar: Foo = newFooBar(Duration(2, "s")) def newFooBar: Foo = newFooBar(Duration(2, "s"))
def newFooBar(d: Duration): Foo = def newFooBar(d: Duration): Foo =
newFooBar(Props().withTimeout(Timeout(d))) TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)))
def newFooBar(props: Props): Foo = def newFooBar(dispatcher: String, d: Duration): Foo =
TypedActor(system).typedActorOf(classOf[Foo], classOf[Bar], props) TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)).withDispatcher(dispatcher))
def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked = def newStacked(): Stacked =
TypedActor(system).typedActorOf(classOf[Stacked], classOf[StackedImpl], props) TypedActor(system).typedActorOf(
TypedProps[StackedImpl](classOf[Stacked], classOf[StackedImpl]).withTimeout(Timeout(2000)))
def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true) def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true)
@ -298,11 +299,11 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
"be able to handle exceptions when calling methods" in { "be able to handle exceptions when calling methods" in {
filterEvents(EventFilter[IllegalStateException]("expected")) { filterEvents(EventFilter[IllegalStateException]("expected")) {
val boss = system.actorOf(Props(context { val boss = system.actorOf(Props(context {
case p: Props context.sender ! TypedActor(context).typedActorOf(classOf[Foo], classOf[Bar], p) case p: TypedProps[_] context.sender ! TypedActor(context).typedActorOf(p)
}).withFaultHandler(OneForOneStrategy { }).withFaultHandler(OneForOneStrategy {
case e: IllegalStateException if e.getMessage == "expected" FaultHandlingStrategy.Resume case e: IllegalStateException if e.getMessage == "expected" FaultHandlingStrategy.Resume
})) }))
val t = Await.result((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration) val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration)
t.incr() t.incr()
t.failingPigdog() t.failingPigdog()
@ -330,7 +331,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
} }
"be able to support implementation only typed actors" in { "be able to support implementation only typed actors" in {
val t = TypedActor(system).typedActorOf[Foo, Bar](Props()) val t: Foo = TypedActor(system).typedActorOf(TypedProps[Bar]())
val f = t.futurePigdog(200) val f = t.futurePigdog(200)
val f2 = t.futurePigdog(0) val f2 = t.futurePigdog(0)
f2.isCompleted must be(false) f2.isCompleted must be(false)
@ -340,16 +341,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
} }
"be able to support implementation only typed actors with complex interfaces" in { "be able to support implementation only typed actors with complex interfaces" in {
val t = TypedActor(system).typedActorOf[Stackable1 with Stackable2, StackedImpl]() val t: Stackable1 with Stackable2 = TypedActor(system).typedActorOf(TypedProps[StackedImpl]())
t.stackable1 must be("foo") t.stackable1 must be("foo")
t.stackable2 must be("bar") t.stackable2 must be("bar")
mustStop(t) mustStop(t)
} }
"be able to use balancing dispatcher" in { "be able to use balancing dispatcher" in {
val props = Props(timeout = Timeout(6600), dispatcher = "pooled-dispatcher") val thais = for (i 1 to 60) yield newFooBar("pooled-dispatcher", 6 seconds)
val thais = for (i 1 to 60) yield newFooBar(props)
val iterator = new CyclicIterator(thais) val iterator = new CyclicIterator(thais)
val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) val results = for (i 1 to 120) yield (i, iterator.next.futurePigdog(200L, i))
@ -405,7 +404,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config)
"be able to override lifecycle callbacks" in { "be able to override lifecycle callbacks" in {
val latch = new CountDownLatch(16) val latch = new CountDownLatch(16)
val ta = TypedActor(system) val ta = TypedActor(system)
val t: LifeCycles = ta.typedActorOf(classOf[LifeCycles], new Creator[LifeCyclesImpl] { def create = new LifeCyclesImpl(latch) }, Props()) val t: LifeCycles = ta.typedActorOf(TypedProps[LifeCyclesImpl](classOf[LifeCycles], new LifeCyclesImpl(latch)))
EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept { EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept {
t.crash() t.crash()
} }

View file

@ -580,9 +580,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = { private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = {
new TimerTask { new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) { def run(timeout: org.jboss.netty.akka.util.Timeout) {
// Check if the receiver is still alive and kicking before sending it a message and reschedule the task receiver ! message
// Check if the receiver is still alive and kicking before rescheduling the task
if (!receiver.isTerminated) { if (!receiver.isTerminated) {
receiver ! message
try timeout.getTimer.newTimeout(this, delay) catch { try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped case _: IllegalStateException // stop recurring if timer is stopped
} }
@ -593,16 +593,8 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter,
} }
} }
private def createContinuousTask(delay: Duration, f: Unit): TimerTask = { private def createContinuousTask(delay: Duration, f: Unit): TimerTask =
new TimerTask { createContinuousTask(delay, new Runnable { def run = f })
def run(timeout: org.jboss.netty.akka.util.Timeout) {
dispatcher.execute(new Runnable { def run = f })
try timeout.getTimer.newTimeout(this, delay) catch {
case _: IllegalStateException // stop recurring if timer is stopped
}
}
}
}
private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = {
new TimerTask { new TimerTask {

View file

@ -21,7 +21,6 @@ object Props {
import FaultHandlingStrategy._ import FaultHandlingStrategy._
final val defaultCreator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!") final val defaultCreator: () Actor = () throw new UnsupportedOperationException("No actor creator specified!")
final val defaultTimeout: Timeout = Timeout(Duration.MinusInf)
final val defaultDecider: Decider = { final val defaultDecider: Decider = {
case _: ActorInitializationException Stop case _: ActorInitializationException Stop
case _: ActorKilledException Stop case _: ActorKilledException Stop
@ -95,12 +94,10 @@ object Props {
* val props = Props( * val props = Props(
* creator = .., * creator = ..,
* dispatcher = .., * dispatcher = ..,
* timeout = ..,
* faultHandler = .., * faultHandler = ..,
* routerConfig = .. * routerConfig = ..
* ) * )
* val props = Props().withCreator(new MyActor) * val props = Props().withCreator(new MyActor)
* val props = Props[MyActor].withTimeout(timeout)
* val props = Props[MyActor].withRouter(RoundRobinRouter(..)) * val props = Props[MyActor].withRouter(RoundRobinRouter(..))
* val props = Props[MyActor].withFaultHandler(OneForOneStrategy { * val props = Props[MyActor].withFaultHandler(OneForOneStrategy {
* case e: IllegalStateException Resume * case e: IllegalStateException Resume
@ -117,7 +114,6 @@ object Props {
* } * }
* }); * });
* Props props = new Props().withCreator(new UntypedActorFactory() { ... }); * Props props = new Props().withCreator(new UntypedActorFactory() { ... });
* Props props = new Props(MyActor.class).withTimeout(timeout);
* Props props = new Props(MyActor.class).withFaultHandler(new OneForOneStrategy(...)); * Props props = new Props(MyActor.class).withFaultHandler(new OneForOneStrategy(...));
* Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..)); * Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..));
* }}} * }}}
@ -125,7 +121,6 @@ object Props {
case class Props( case class Props(
creator: () Actor = Props.defaultCreator, creator: () Actor = Props.defaultCreator,
dispatcher: String = Dispatchers.DefaultDispatcherId, dispatcher: String = Dispatchers.DefaultDispatcherId,
timeout: Timeout = Props.defaultTimeout,
faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler,
routerConfig: RouterConfig = Props.defaultRoutedProps) { routerConfig: RouterConfig = Props.defaultRoutedProps) {
@ -135,7 +130,6 @@ case class Props(
def this() = this( def this() = this(
creator = Props.defaultCreator, creator = Props.defaultCreator,
dispatcher = Dispatchers.DefaultDispatcherId, dispatcher = Dispatchers.DefaultDispatcherId,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler) faultHandler = Props.defaultFaultHandler)
/** /**
@ -144,7 +138,6 @@ case class Props(
def this(factory: UntypedActorFactory) = this( def this(factory: UntypedActorFactory) = this(
creator = () factory.create(), creator = () factory.create(),
dispatcher = Dispatchers.DefaultDispatcherId, dispatcher = Dispatchers.DefaultDispatcherId,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler) faultHandler = Props.defaultFaultHandler)
/** /**
@ -153,7 +146,6 @@ case class Props(
def this(actorClass: Class[_ <: Actor]) = this( def this(actorClass: Class[_ <: Actor]) = this(
creator = () actorClass.newInstance, creator = () actorClass.newInstance,
dispatcher = Dispatchers.DefaultDispatcherId, dispatcher = Dispatchers.DefaultDispatcherId,
timeout = Props.defaultTimeout,
faultHandler = Props.defaultFaultHandler, faultHandler = Props.defaultFaultHandler,
routerConfig = Props.defaultRoutedProps) routerConfig = Props.defaultRoutedProps)
@ -183,11 +175,6 @@ case class Props(
*/ */
def withDispatcher(d: String) = copy(dispatcher = d) def withDispatcher(d: String) = copy(dispatcher = d)
/**
* Returns a new Props with the specified timeout set.
*/
def withTimeout(t: Timeout) = copy(timeout = t)
/** /**
* Returns a new Props with the specified faulthandler set. * Returns a new Props with the specified faulthandler set.
*/ */

View file

@ -12,6 +12,7 @@ import akka.serialization.{ Serializer, Serialization }
import akka.dispatch._ import akka.dispatch._
import akka.serialization.SerializationExtension import akka.serialization.SerializationExtension
import java.util.concurrent.TimeoutException import java.util.concurrent.TimeoutException
import java.lang.IllegalStateException
trait TypedActorFactory { trait TypedActorFactory {
@ -48,100 +49,31 @@ trait TypedActorFactory {
def getActorRefFor(proxy: AnyRef): ActorRef def getActorRefFor(proxy: AnyRef): ActorRef
/** /**
* Creates a new TypedActor proxy using the supplied Props, * Creates a new TypedActor with the specified properies
* the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/ */
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T]): R = {
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader) val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
val c = props.creator //Cache this to avoid closing over the Props
/** val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c()))
* Creates a new TypedActor proxy using the supplied Props, typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap))
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*
* Java API
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or
* all interfaces (Class.getInterfaces) if it's not an interface class
*
* Scala API
*/
def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: T, props: Props, name: String): R =
typedActor.createProxyAndTypedActor(actorFactory, interface, impl, props, Some(name), interface.getClassLoader)
/**
* Creates a new TypedActor proxy using the supplied Props,
* the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces)
*
* Scala API
*/
def typedActorOf[R <: AnyRef, T <: R: ClassManifest](props: Props = Props(), name: String = null): R = {
val clazz = implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]]
typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), clazz.getClassLoader)
} }
/** /**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * Creates a new TypedActor with the specified properies
* to create TypedActor proxies, use typedActorOf
*/ */
def createProxy[R <: AnyRef](constructor: Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R = def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], name: String): R = {
typedActor.createProxy[R](actorFactory, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader) val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver
val c = props.creator //Cache this to avoid closing over the Props
val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c()))
typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap, name))
}
/** /**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, * Creates a TypedActor that intercepts the calls and forwards them as [[akka.actor.TypedActor.MethodCall]]
* to create TypedActor proxies, use typedActorOf * to the provided ActorRef.
*/ */
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], actorRef: ActorRef): R =
typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) constructor.create, props, None, loader) typedActor.createActorRefProxy(props, null: AtomVar[R], actorRef)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) constructor.create, props, Some(name), loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, loader: ClassLoader): R =
typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) constructor, props, None, loader)
/**
* Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself,
* to create TypedActor proxies, use typedActorOf
*/
def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Actor, props: Props, name: String, loader: ClassLoader): R =
typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) constructor, props, Some(name), loader)
} }
@ -412,6 +344,173 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi
} }
} }
/**
* TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
* It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
*/
object TypedProps {
val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId
val defaultFaultHandler: FaultHandlingStrategy = akka.actor.Props.defaultFaultHandler
val defaultTimeout: Option[Timeout] = None
val defaultLoader: Option[ClassLoader] = None
/**
* @returns a sequence of interfaces that the speicified class implements,
* or a sequence containing only itself, if itself is an interface.
*/
def extractInterfaces(clazz: Class[_]): Seq[Class[_]] =
if (clazz.isInterface) Seq[Class[_]](clazz) else clazz.getInterfaces.toList
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* proxying all the interfaces it implements.
*
* Scala API
*/
def apply[T <: AnyRef](implementation: Class[T]): TypedProps[T] =
new TypedProps[T](implementation)
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Scala API
*/
def apply[T <: AnyRef](interface: Class[_ >: T], implementation: Class[T]): TypedProps[T] =
new TypedProps[T](extractInterfaces(interface), () implementation.newInstance())
/**
* Uses the supplied thunk as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Scala API
*/
def apply[T <: AnyRef](interface: Class[_ >: T], creator: T): TypedProps[T] =
new TypedProps[T](extractInterfaces(interface), () creator)
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* proxying all the interfaces it implements.
*
* Scala API
*/
def apply[T <: AnyRef: ClassManifest](): TypedProps[T] =
new TypedProps[T](implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]])
}
/**
* TypedProps is a TypedActor configuration object, that is thread safe and fully sharable.
* It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance.
*/
case class TypedProps[T <: AnyRef] protected[akka] (interfaces: Seq[Class[_]],
creator: () T,
dispatcher: String = TypedProps.defaultDispatcherId,
faultHandler: FaultHandlingStrategy = TypedProps.defaultFaultHandler,
timeout: Option[Timeout] = TypedProps.defaultTimeout,
loader: Option[ClassLoader] = TypedProps.defaultLoader) {
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*/
def this(implementation: Class[T]) =
this(interfaces = TypedProps.extractInterfaces(implementation),
creator = () implementation.newInstance())
/**
* Uses the supplied Creator as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Java API.
*/
def this(interface: Class[_ >: T], implementation: Creator[T]) =
this(interfaces = TypedProps.extractInterfaces(interface),
creator = () implementation.create())
/**
* Uses the supplied class as the factory for the TypedActor implementation,
* and that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*
* Java API.
*/
def this(interface: Class[_ >: T], implementation: Class[T]) =
this(interfaces = TypedProps.extractInterfaces(interface),
creator = () implementation.newInstance())
/**
* Returns a new Props with the specified dispatcher set.
*/
def withDispatcher(d: String) = copy(dispatcher = d)
/**
* Returns a new Props with the specified faulthandler set.
*/
def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f)
/**
* @returns a new Props that will use the specified ClassLoader to create its proxy class in
* If loader is null, it will use the bootstrap classloader.
*
* Java API
*/
def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader))
/**
* @returns a new Props that will use the specified ClassLoader to create its proxy class in
* If loader is null, it will use the bootstrap classloader.
*
* Scala API
*/
def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader)
/**
* @returns a new Props that will use the specified Timeout for its non-void-returning methods,
* if null is specified, it will use the default ActorTimeout as specified in the configuration.
*
* Java API
*/
def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout))
/**
* @returns a new Props that will use the specified Timeout for its non-void-returning methods,
* if None is specified, it will use the default ActorTimeout as specified in the configuration.
*
* Scala API
*/
def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout)
/**
* Returns a new Props that has the specified interface,
* or if the interface class is not an interface, all the interfaces it implements,
* appended in the sequence of interfaces.
*/
def withInterface(interface: Class[_ >: T]): TypedProps[T] =
this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface))
/**
* Returns a new Props without the specified interface,
* or if the interface class is not an interface, all the interfaces it implements.
*/
def withoutInterface(interface: Class[_ >: T]): TypedProps[T] =
this.copy(interfaces = interfaces diff TypedProps.extractInterfaces(interface))
import akka.actor.{ Props ActorProps }
def actorProps(): ActorProps =
if (dispatcher == ActorProps().dispatcher && faultHandler == ActorProps().faultHandler) ActorProps()
else ActorProps(dispatcher = dispatcher, faultHandler = faultHandler)
}
case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory { case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory {
override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy) override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy)
override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot) override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot)
@ -440,21 +539,16 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit
// Private API // Private API
private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) Actor, props: Props, name: Option[String], loader: ClassLoader): R = {
val proxyVar = new AtomVar[R]
configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader)
}
private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: T, props: Props, name: Option[String], loader: ClassLoader): R =
createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) new TypedActor[R, T](ref, constructor), props, name, loader)
private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = { private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null) val actorVar = new AtomVar[ActorRef](null)
val timeout = props.timeout match {
//FIXME
val timeout = settings.ActorTimeout
/*val timeout = props.timeout match {
case Props.`defaultTimeout` settings.ActorTimeout case Props.`defaultTimeout` settings.ActorTimeout
case x x case x x
} }*/
val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T] val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T]
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props) val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props)
@ -462,7 +556,25 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit
proxyVar.get proxyVar.get
} }
private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ActorRef): R = {
//Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling
val actorVar = new AtomVar[ActorRef](null)
val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull
val proxy = Proxy.newProxyInstance(
classLoader,
props.interfaces.toArray,
new TypedActorInvocationHandler(this, actorVar, props.timeout.getOrElse(this.settings.ActorTimeout))).asInstanceOf[R]
proxyVar match {
case null
actorVar.set(actorRef)
proxy
case _
proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive
actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet
proxyVar.get
}
}
private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler =
if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match {

View file

@ -100,7 +100,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher
"receiving an in-out message exchange" must { "receiving an in-out message exchange" must {
"lead to a TimeoutException" in { "lead to a TimeoutException" in {
service.awaitEndpointActivation(1) { service.awaitEndpointActivation(1) {
actorOf(Props(creator = () new TestBlocker("direct:publish-test-5"), timeout = Timeout(1000))) actorOf(Props(creator = () new TestBlocker("direct:publish-test-5")))
} must be(true) } must be(true)
try { try {

View file

@ -4,6 +4,7 @@
package akka.docs.actor; package akka.docs.actor;
//#imports //#imports
import akka.dispatch.*; import akka.dispatch.*;
import akka.actor.*; import akka.actor.*;
import akka.japi.*; import akka.japi.*;
@ -103,15 +104,14 @@ public class TypedActorDocTestBase {
try { try {
//#typed-actor-create1 //#typed-actor-create1
Squarer mySquarer = Squarer mySquarer =
TypedActor.get(system).typedActorOf(Squarer.class, SquarerImpl.class, new Props()); TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class, SquarerImpl.class));
//#typed-actor-create1 //#typed-actor-create1
//#typed-actor-create2 //#typed-actor-create2
Squarer otherSquarer = Squarer otherSquarer =
TypedActor.get(system).typedActorOf(Squarer.class, TypedActor.get(system).typedActorOf(new TypedProps<SquarerImpl>(Squarer.class,
new Creator<SquarerImpl>() { new Creator<SquarerImpl>() {
public SquarerImpl create() { return new SquarerImpl("foo"); } public SquarerImpl create() { return new SquarerImpl("foo"); }
}, }),
new Props(),
"name"); "name");
//#typed-actor-create2 //#typed-actor-create2

View file

@ -65,7 +65,6 @@ public class UntypedActorDocTestBase {
return new MyUntypedActor(); return new MyUntypedActor();
} }
}); });
Props props5 = props4.withTimeout(new Timeout(1000));
//#creating-props-config //#creating-props-config
} }

View file

@ -194,11 +194,9 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
val props3 = Props(new MyActor) val props3 = Props(new MyActor)
val props4 = Props( val props4 = Props(
creator = { () new MyActor }, creator = { () new MyActor },
dispatcher = "my-dispatcher", dispatcher = "my-dispatcher")
timeout = Timeout(100))
val props5 = props1.withCreator(new MyActor) val props5 = props1.withCreator(new MyActor)
val props6 = props5.withDispatcher("my-dispatcher") val props6 = props5.withDispatcher("my-dispatcher")
val props7 = props6.withTimeout(Timeout(100))
//#creating-props-config //#creating-props-config
} }

View file

@ -6,7 +6,7 @@ package akka.docs.actor
//#imports //#imports
import akka.dispatch.{ Promise, Future, Await } import akka.dispatch.{ Promise, Future, Await }
import akka.util.duration._ import akka.util.duration._
import akka.actor.{ ActorContext, TypedActor, Props } import akka.actor.{ ActorContext, TypedActor, TypedProps }
//#imports //#imports
@ -100,14 +100,11 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"create a typed actor" in { "create a typed actor" in {
//#typed-actor-create1 //#typed-actor-create1
val mySquarer: Squarer = val mySquarer: Squarer =
TypedActor(system).typedActorOf[Squarer, SquarerImpl]() TypedActor(system).typedActorOf(TypedProps[SquarerImpl]())
//#typed-actor-create1 //#typed-actor-create1
//#typed-actor-create2 //#typed-actor-create2
val otherSquarer: Squarer = val otherSquarer: Squarer =
TypedActor(system).typedActorOf(classOf[Squarer], TypedActor(system).typedActorOf(TypedProps(classOf[Squarer], new SquarerImpl("foo")), "name")
new SquarerImpl("foo"),
Props(),
"name")
//#typed-actor-create2 //#typed-actor-create2
//#typed-actor-calls //#typed-actor-calls
@ -145,7 +142,7 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) {
"supercharge" in { "supercharge" in {
//#typed-actor-supercharge-usage //#typed-actor-supercharge-usage
val awesomeFooBar = TypedActor(system).typedActorOf[Foo with Bar, FooBar]() val awesomeFooBar: Foo with Bar = TypedActor(system).typedActorOf(TypedProps[FooBar]())
awesomeFooBar.doFoo(10) awesomeFooBar.doFoo(10)
val f = awesomeFooBar.doBar("yes") val f = awesomeFooBar.doBar("yes")