From de151617f24069e2ddb7f50248962eaced09868c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 16 Jan 2012 14:11:29 +0100 Subject: [PATCH] Creating TypedProps and implementing support for wrapping an arbitrary ActorRef as a TypedActor --- .../scala/akka/actor/ActorTimeoutSpec.scala | 12 +- .../scala/akka/actor/SupervisorSpec.scala | 2 +- .../scala/akka/actor/TypedActorSpec.scala | 25 +- .../scala/akka/actor/ActorRefProvider.scala | 16 +- .../src/main/scala/akka/actor/Props.scala | 13 - .../main/scala/akka/actor/TypedActor.scala | 306 ++++++++++++------ .../scala/akka/camel/ConsumerScalaTest.scala | 2 +- .../docs/actor/TypedActorDocTestBase.java | 8 +- .../docs/actor/UntypedActorDocTestBase.java | 1 - .../code/akka/docs/actor/ActorDocSpec.scala | 4 +- .../akka/docs/actor/TypedActorDocSpec.scala | 11 +- 11 files changed, 239 insertions(+), 161 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala index 038e3fc9f1..ceb7bd0783 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorTimeoutSpec.scala @@ -14,12 +14,6 @@ import akka.util.Timeout @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeout { - def actorWithTimeout(t: Timeout): ActorRef = system.actorOf(Props(creator = () ⇒ new Actor { - def receive = { - case x ⇒ - } - }, timeout = t)) - val defaultTimeout = system.settings.ActorTimeout.duration val testTimeout = if (system.settings.ActorTimeout.duration < 400.millis) 500 millis else 100 millis @@ -27,7 +21,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use the global default timeout if no implicit in scope" in { within(defaultTimeout - 100.millis, defaultTimeout + 400.millis) { - val echo = actorWithTimeout(Timeout(12)) + val echo = system.actorOf(Props.empty) try { val d = system.settings.ActorTimeout.duration val f = echo ? "hallo" @@ -39,7 +33,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use implicitly supplied timeout" in { implicit val timeout = Timeout(testTimeout) within(testTimeout - 100.millis, testTimeout + 300.millis) { - val echo = actorWithTimeout(Props.defaultTimeout) + val echo = system.actorOf(Props.empty) try { val f = (echo ? "hallo").mapTo[String] intercept[AskTimeoutException] { Await.result(f, testTimeout + testTimeout) } @@ -49,7 +43,7 @@ class ActorTimeoutSpec extends AkkaSpec with BeforeAndAfterAll with DefaultTimeo "use explicitly supplied timeout" in { within(testTimeout - 100.millis, testTimeout + 300.millis) { - val echo = actorWithTimeout(Props.defaultTimeout) + val echo = system.actorOf(Props.empty) val f = echo.?("hallo", testTimeout) try { intercept[AskTimeoutException] { Await.result(f, testTimeout + 300.millis) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index e68e6f3906..9fe8ffb63a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -72,7 +72,7 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende // Creating actors and supervisors // ===================================================== - private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], props.timeout.duration) + private def child(supervisor: ActorRef, props: Props): ActorRef = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration) def temporaryActorAllForOne = { val supervisor = system.actorOf(Props[Supervisor].withFaultHandler(AllForOneStrategy(List(classOf[Exception]), Some(0)))) diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 6a6500b131..2d99a4925f 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -179,13 +179,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) def newFooBar: Foo = newFooBar(Duration(2, "s")) def newFooBar(d: Duration): Foo = - newFooBar(Props().withTimeout(Timeout(d))) + TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d))) - def newFooBar(props: Props): Foo = - TypedActor(system).typedActorOf(classOf[Foo], classOf[Bar], props) + def newFooBar(dispatcher: String, d: Duration): Foo = + TypedActor(system).typedActorOf(TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(Timeout(d)).withDispatcher(dispatcher)) - def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked = - TypedActor(system).typedActorOf(classOf[Stacked], classOf[StackedImpl], props) + def newStacked(): Stacked = + TypedActor(system).typedActorOf( + TypedProps[StackedImpl](classOf[Stacked], classOf[StackedImpl]).withTimeout(Timeout(2000))) def mustStop(typedActor: AnyRef) = TypedActor(system).stop(typedActor) must be(true) @@ -298,11 +299,11 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { val boss = system.actorOf(Props(context ⇒ { - case p: Props ⇒ context.sender ! TypedActor(context).typedActorOf(classOf[Foo], classOf[Bar], p) + case p: TypedProps[_] ⇒ context.sender ! TypedActor(context).typedActorOf(p) }).withFaultHandler(OneForOneStrategy { case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume })) - val t = Await.result((boss ? Props().withTimeout(2 seconds)).mapTo[Foo], timeout.duration) + val t = Await.result((boss ? TypedProps[Bar](classOf[Foo], classOf[Bar]).withTimeout(2 seconds)).mapTo[Foo], timeout.duration) t.incr() t.failingPigdog() @@ -330,7 +331,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) } "be able to support implementation only typed actors" in { - val t = TypedActor(system).typedActorOf[Foo, Bar](Props()) + val t: Foo = TypedActor(system).typedActorOf(TypedProps[Bar]()) val f = t.futurePigdog(200) val f2 = t.futurePigdog(0) f2.isCompleted must be(false) @@ -340,16 +341,14 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) } "be able to support implementation only typed actors with complex interfaces" in { - val t = TypedActor(system).typedActorOf[Stackable1 with Stackable2, StackedImpl]() + val t: Stackable1 with Stackable2 = TypedActor(system).typedActorOf(TypedProps[StackedImpl]()) t.stackable1 must be("foo") t.stackable2 must be("bar") mustStop(t) } "be able to use balancing dispatcher" in { - val props = Props(timeout = Timeout(6600), dispatcher = "pooled-dispatcher") - - val thais = for (i ← 1 to 60) yield newFooBar(props) + val thais = for (i ← 1 to 60) yield newFooBar("pooled-dispatcher", 6 seconds) val iterator = new CyclicIterator(thais) val results = for (i ← 1 to 120) yield (i, iterator.next.futurePigdog(200L, i)) @@ -405,7 +404,7 @@ class TypedActorSpec extends AkkaSpec(TypedActorSpec.config) "be able to override lifecycle callbacks" in { val latch = new CountDownLatch(16) val ta = TypedActor(system) - val t: LifeCycles = ta.typedActorOf(classOf[LifeCycles], new Creator[LifeCyclesImpl] { def create = new LifeCyclesImpl(latch) }, Props()) + val t: LifeCycles = ta.typedActorOf(TypedProps[LifeCyclesImpl](classOf[LifeCycles], new LifeCyclesImpl(latch))) EventFilter[IllegalStateException]("Crash!", occurrences = 1) intercept { t.crash() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index d940aa2c20..a4b3db0686 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -580,9 +580,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, private def createContinuousTask(delay: Duration, receiver: ActorRef, message: Any): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { - // Check if the receiver is still alive and kicking before sending it a message and reschedule the task + receiver ! message + // Check if the receiver is still alive and kicking before rescheduling the task if (!receiver.isTerminated) { - receiver ! message try timeout.getTimer.newTimeout(this, delay) catch { case _: IllegalStateException ⇒ // stop recurring if timer is stopped } @@ -593,16 +593,8 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter, } } - private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = { - new TimerTask { - def run(timeout: org.jboss.netty.akka.util.Timeout) { - dispatcher.execute(new Runnable { def run = f }) - try timeout.getTimer.newTimeout(this, delay) catch { - case _: IllegalStateException ⇒ // stop recurring if timer is stopped - } - } - } - } + private def createContinuousTask(delay: Duration, f: ⇒ Unit): TimerTask = + createContinuousTask(delay, new Runnable { def run = f }) private def createContinuousTask(delay: Duration, runnable: Runnable): TimerTask = { new TimerTask { diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 0a032408a2..dccfd0bd3c 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -21,7 +21,6 @@ object Props { import FaultHandlingStrategy._ final val defaultCreator: () ⇒ Actor = () ⇒ throw new UnsupportedOperationException("No actor creator specified!") - final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop case _: ActorKilledException ⇒ Stop @@ -95,12 +94,10 @@ object Props { * val props = Props( * creator = .., * dispatcher = .., - * timeout = .., * faultHandler = .., * routerConfig = .. * ) * val props = Props().withCreator(new MyActor) - * val props = Props[MyActor].withTimeout(timeout) * val props = Props[MyActor].withRouter(RoundRobinRouter(..)) * val props = Props[MyActor].withFaultHandler(OneForOneStrategy { * case e: IllegalStateException ⇒ Resume @@ -117,7 +114,6 @@ object Props { * } * }); * Props props = new Props().withCreator(new UntypedActorFactory() { ... }); - * Props props = new Props(MyActor.class).withTimeout(timeout); * Props props = new Props(MyActor.class).withFaultHandler(new OneForOneStrategy(...)); * Props props = new Props(MyActor.class).withRouter(new RoundRobinRouter(..)); * }}} @@ -125,7 +121,6 @@ object Props { case class Props( creator: () ⇒ Actor = Props.defaultCreator, dispatcher: String = Dispatchers.DefaultDispatcherId, - timeout: Timeout = Props.defaultTimeout, faultHandler: FaultHandlingStrategy = Props.defaultFaultHandler, routerConfig: RouterConfig = Props.defaultRoutedProps) { @@ -135,7 +130,6 @@ case class Props( def this() = this( creator = Props.defaultCreator, dispatcher = Dispatchers.DefaultDispatcherId, - timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler) /** @@ -144,7 +138,6 @@ case class Props( def this(factory: UntypedActorFactory) = this( creator = () ⇒ factory.create(), dispatcher = Dispatchers.DefaultDispatcherId, - timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler) /** @@ -153,7 +146,6 @@ case class Props( def this(actorClass: Class[_ <: Actor]) = this( creator = () ⇒ actorClass.newInstance, dispatcher = Dispatchers.DefaultDispatcherId, - timeout = Props.defaultTimeout, faultHandler = Props.defaultFaultHandler, routerConfig = Props.defaultRoutedProps) @@ -183,11 +175,6 @@ case class Props( */ def withDispatcher(d: String) = copy(dispatcher = d) - /** - * Returns a new Props with the specified timeout set. - */ - def withTimeout(t: Timeout) = copy(timeout = t) - /** * Returns a new Props with the specified faulthandler set. */ diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index c1cefd8153..f143db0f8a 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -12,6 +12,7 @@ import akka.serialization.{ Serializer, Serialization } import akka.dispatch._ import akka.serialization.SerializationExtension import java.util.concurrent.TimeoutException +import java.lang.IllegalStateException trait TypedActorFactory { @@ -48,100 +49,31 @@ trait TypedActorFactory { def getActorRefFor(proxy: AnyRef): ActorRef /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the suppli ed interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - * - * Java API + * Creates a new TypedActor with the specified properies */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = - typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, None, interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - * - * Java API - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(actorFactory, interface, impl.newInstance, props, Some(name), interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - * - * Java API - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = - typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, None, interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - * - * Java API - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, name: String): R = - typedActor.createProxyAndTypedActor(actorFactory, interface, impl.create, props, Some(name), interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - * - * Scala API - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: ⇒ T, props: Props, name: String): R = - typedActor.createProxyAndTypedActor(actorFactory, interface, impl, props, Some(name), interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - * - * Scala API - */ - def typedActorOf[R <: AnyRef, T <: R: ClassManifest](props: Props = Props(), name: String = null): R = { - val clazz = implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]] - typedActor.createProxyAndTypedActor(actorFactory, clazz, clazz.newInstance, props, Option(name), clazz.getClassLoader) + def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T]): R = { + val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver + val c = props.creator //Cache this to avoid closing over the Props + val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c())) + typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap)) } /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf + * Creates a new TypedActor with the specified properies */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), name: String = null, loader: ClassLoader = null)(implicit m: Manifest[R]): R = - typedActor.createProxy[R](actorFactory, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, Option(name), if (loader eq null) m.erasure.getClassLoader else loader) + def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], name: String): R = { + val proxyVar = new AtomVar[R] //Chicken'n'egg-resolver + val c = props.creator //Cache this to avoid closing over the Props + val ap = props.actorProps.withCreator(new akka.actor.TypedActor.TypedActor[R, T](proxyVar, c())) + typedActor.createActorRefProxy(props, proxyVar, actorFactory.actorOf(ap, name)) + } /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf + * Creates a TypedActor that intercepts the calls and forwards them as [[akka.actor.TypedActor.MethodCall]] + * to the provided ActorRef. */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = - typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, None, loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy(actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, Some(name), loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = - typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, None, loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, name: String, loader: ClassLoader): R = - typedActor.createProxy[R](actorFactory, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, Some(name), loader) + def typedActorOf[R <: AnyRef, T <: R](props: TypedProps[T], actorRef: ActorRef): R = + typedActor.createActorRefProxy(props, null: AtomVar[R], actorRef) } @@ -412,6 +344,173 @@ object TypedActor extends ExtensionId[TypedActorExtension] with ExtensionIdProvi } } +/** + * TypedProps is a TypedActor configuration object, that is thread safe and fully sharable. + * It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance. + */ +object TypedProps { + + val defaultDispatcherId: String = Dispatchers.DefaultDispatcherId + val defaultFaultHandler: FaultHandlingStrategy = akka.actor.Props.defaultFaultHandler + val defaultTimeout: Option[Timeout] = None + val defaultLoader: Option[ClassLoader] = None + + /** + * @returns a sequence of interfaces that the speicified class implements, + * or a sequence containing only itself, if itself is an interface. + */ + def extractInterfaces(clazz: Class[_]): Seq[Class[_]] = + if (clazz.isInterface) Seq[Class[_]](clazz) else clazz.getInterfaces.toList + + /** + * Uses the supplied class as the factory for the TypedActor implementation, + * proxying all the interfaces it implements. + * + * Scala API + */ + def apply[T <: AnyRef](implementation: Class[T]): TypedProps[T] = + new TypedProps[T](implementation) + + /** + * Uses the supplied class as the factory for the TypedActor implementation, + * and that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + * + * Scala API + */ + def apply[T <: AnyRef](interface: Class[_ >: T], implementation: Class[T]): TypedProps[T] = + new TypedProps[T](extractInterfaces(interface), () ⇒ implementation.newInstance()) + + /** + * Uses the supplied thunk as the factory for the TypedActor implementation, + * and that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + * + * Scala API + */ + def apply[T <: AnyRef](interface: Class[_ >: T], creator: ⇒ T): TypedProps[T] = + new TypedProps[T](extractInterfaces(interface), () ⇒ creator) + + /** + * Uses the supplied class as the factory for the TypedActor implementation, + * proxying all the interfaces it implements. + * + * Scala API + */ + def apply[T <: AnyRef: ClassManifest](): TypedProps[T] = + new TypedProps[T](implicitly[ClassManifest[T]].erasure.asInstanceOf[Class[T]]) +} + +/** + * TypedProps is a TypedActor configuration object, that is thread safe and fully sharable. + * It's used in TypedActorFactory.typedActorOf to configure a TypedActor instance. + */ +case class TypedProps[T <: AnyRef] protected[akka] (interfaces: Seq[Class[_]], + creator: () ⇒ T, + dispatcher: String = TypedProps.defaultDispatcherId, + faultHandler: FaultHandlingStrategy = TypedProps.defaultFaultHandler, + timeout: Option[Timeout] = TypedProps.defaultTimeout, + loader: Option[ClassLoader] = TypedProps.defaultLoader) { + + /** + * Uses the supplied class as the factory for the TypedActor implementation, + * and that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + */ + def this(implementation: Class[T]) = + this(interfaces = TypedProps.extractInterfaces(implementation), + creator = () ⇒ implementation.newInstance()) + + /** + * Uses the supplied Creator as the factory for the TypedActor implementation, + * and that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + * + * Java API. + */ + def this(interface: Class[_ >: T], implementation: Creator[T]) = + this(interfaces = TypedProps.extractInterfaces(interface), + creator = () ⇒ implementation.create()) + + /** + * Uses the supplied class as the factory for the TypedActor implementation, + * and that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + * + * Java API. + */ + def this(interface: Class[_ >: T], implementation: Class[T]) = + this(interfaces = TypedProps.extractInterfaces(interface), + creator = () ⇒ implementation.newInstance()) + + /** + * Returns a new Props with the specified dispatcher set. + */ + def withDispatcher(d: String) = copy(dispatcher = d) + + /** + * Returns a new Props with the specified faulthandler set. + */ + def withFaultHandler(f: FaultHandlingStrategy) = copy(faultHandler = f) + + /** + * @returns a new Props that will use the specified ClassLoader to create its proxy class in + * If loader is null, it will use the bootstrap classloader. + * + * Java API + */ + def withLoader(loader: ClassLoader): TypedProps[T] = withLoader(Option(loader)) + + /** + * @returns a new Props that will use the specified ClassLoader to create its proxy class in + * If loader is null, it will use the bootstrap classloader. + * + * Scala API + */ + def withLoader(loader: Option[ClassLoader]): TypedProps[T] = this.copy(loader = loader) + + /** + * @returns a new Props that will use the specified Timeout for its non-void-returning methods, + * if null is specified, it will use the default ActorTimeout as specified in the configuration. + * + * Java API + */ + def withTimeout(timeout: Timeout): TypedProps[T] = this.copy(timeout = Option(timeout)) + + /** + * @returns a new Props that will use the specified Timeout for its non-void-returning methods, + * if None is specified, it will use the default ActorTimeout as specified in the configuration. + * + * Scala API + */ + def withTimeout(timeout: Option[Timeout]): TypedProps[T] = this.copy(timeout = timeout) + + /** + * Returns a new Props that has the specified interface, + * or if the interface class is not an interface, all the interfaces it implements, + * appended in the sequence of interfaces. + */ + def withInterface(interface: Class[_ >: T]): TypedProps[T] = + this.copy(interfaces = interfaces ++ TypedProps.extractInterfaces(interface)) + + /** + * Returns a new Props without the specified interface, + * or if the interface class is not an interface, all the interfaces it implements. + */ + def withoutInterface(interface: Class[_ >: T]): TypedProps[T] = + this.copy(interfaces = interfaces diff TypedProps.extractInterfaces(interface)) + + import akka.actor.{ Props ⇒ ActorProps } + def actorProps(): ActorProps = + if (dispatcher == ActorProps().dispatcher && faultHandler == ActorProps().faultHandler) ActorProps() + else ActorProps(dispatcher = dispatcher, faultHandler = faultHandler) +} + case class ContextualTypedActorFactory(typedActor: TypedActorExtension, actorFactory: ActorContext) extends TypedActorFactory { override def getActorRefFor(proxy: AnyRef): ActorRef = typedActor.getActorRefFor(proxy) override def isTypedActor(proxyOrNot: AnyRef): Boolean = typedActor.isTypedActor(proxyOrNot) @@ -440,21 +539,16 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit // Private API - private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, name: Option[String], loader: ClassLoader): R = { - val proxyVar = new AtomVar[R] - configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), name, loader) - } - - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, name: Option[String], loader: ClassLoader): R = - createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, name, loader) - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, name: Option[String], loader: ClassLoader): T = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) - val timeout = props.timeout match { + + //FIXME + val timeout = settings.ActorTimeout + /*val timeout = props.timeout match { case Props.`defaultTimeout` ⇒ settings.ActorTimeout case x ⇒ x - } + }*/ val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(this, actorVar, timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive val ref = if (name.isDefined) supervisor.actorOf(props, name.get) else supervisor.actorOf(props) @@ -462,7 +556,25 @@ class TypedActorExtension(system: ActorSystemImpl) extends TypedActorFactory wit proxyVar.get } - private[akka] def extractInterfaces(clazz: Class[_]): Array[Class[_]] = if (clazz.isInterface) Array[Class[_]](clazz) else clazz.getInterfaces + private[akka] def createActorRefProxy[R <: AnyRef, T <: R](props: TypedProps[T], proxyVar: AtomVar[R], actorRef: ⇒ ActorRef): R = { + //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling + val actorVar = new AtomVar[ActorRef](null) + val classLoader: ClassLoader = if (props.loader.nonEmpty) props.loader.get else props.interfaces.headOption.map(_.getClassLoader).orNull + val proxy = Proxy.newProxyInstance( + classLoader, + props.interfaces.toArray, + new TypedActorInvocationHandler(this, actorVar, props.timeout.getOrElse(this.settings.ActorTimeout))).asInstanceOf[R] + + proxyVar match { + case null ⇒ + actorVar.set(actorRef) + proxy + case _ ⇒ + proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive + actorVar.set(actorRef) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet + proxyVar.get + } + } private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = if ((typedActor_? ne null) && Proxy.isProxyClass(typedActor_?.getClass)) typedActor_? match { diff --git a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala index 47dbdbba54..4382f9a2a6 100644 --- a/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala +++ b/akka-camel/src/test/scala/akka/camel/ConsumerScalaTest.scala @@ -100,7 +100,7 @@ class ConsumerScalaTest extends WordSpec with BeforeAndAfterAll with MustMatcher "receiving an in-out message exchange" must { "lead to a TimeoutException" in { service.awaitEndpointActivation(1) { - actorOf(Props(creator = () ⇒ new TestBlocker("direct:publish-test-5"), timeout = Timeout(1000))) + actorOf(Props(creator = () ⇒ new TestBlocker("direct:publish-test-5"))) } must be(true) try { diff --git a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java index 922502f1c9..6726c3e6f0 100644 --- a/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/TypedActorDocTestBase.java @@ -4,6 +4,7 @@ package akka.docs.actor; //#imports + import akka.dispatch.*; import akka.actor.*; import akka.japi.*; @@ -103,15 +104,14 @@ public class TypedActorDocTestBase { try { //#typed-actor-create1 Squarer mySquarer = - TypedActor.get(system).typedActorOf(Squarer.class, SquarerImpl.class, new Props()); + TypedActor.get(system).typedActorOf(new TypedProps(Squarer.class, SquarerImpl.class)); //#typed-actor-create1 //#typed-actor-create2 Squarer otherSquarer = - TypedActor.get(system).typedActorOf(Squarer.class, + TypedActor.get(system).typedActorOf(new TypedProps(Squarer.class, new Creator() { public SquarerImpl create() { return new SquarerImpl("foo"); } - }, - new Props(), + }), "name"); //#typed-actor-create2 diff --git a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java index b1d84a5841..a6cbc21338 100644 --- a/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java +++ b/akka-docs/java/code/akka/docs/actor/UntypedActorDocTestBase.java @@ -65,7 +65,6 @@ public class UntypedActorDocTestBase { return new MyUntypedActor(); } }); - Props props5 = props4.withTimeout(new Timeout(1000)); //#creating-props-config } diff --git a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala index 20ac33480b..21f3492370 100644 --- a/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/ActorDocSpec.scala @@ -194,11 +194,9 @@ class ActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { val props3 = Props(new MyActor) val props4 = Props( creator = { () ⇒ new MyActor }, - dispatcher = "my-dispatcher", - timeout = Timeout(100)) + dispatcher = "my-dispatcher") val props5 = props1.withCreator(new MyActor) val props6 = props5.withDispatcher("my-dispatcher") - val props7 = props6.withTimeout(Timeout(100)) //#creating-props-config } diff --git a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala index a10ce60a36..0d6ad1e648 100644 --- a/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala +++ b/akka-docs/scala/code/akka/docs/actor/TypedActorDocSpec.scala @@ -6,7 +6,7 @@ package akka.docs.actor //#imports import akka.dispatch.{ Promise, Future, Await } import akka.util.duration._ -import akka.actor.{ ActorContext, TypedActor, Props } +import akka.actor.{ ActorContext, TypedActor, TypedProps } //#imports @@ -100,14 +100,11 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "create a typed actor" in { //#typed-actor-create1 val mySquarer: Squarer = - TypedActor(system).typedActorOf[Squarer, SquarerImpl]() + TypedActor(system).typedActorOf(TypedProps[SquarerImpl]()) //#typed-actor-create1 //#typed-actor-create2 val otherSquarer: Squarer = - TypedActor(system).typedActorOf(classOf[Squarer], - new SquarerImpl("foo"), - Props(), - "name") + TypedActor(system).typedActorOf(TypedProps(classOf[Squarer], new SquarerImpl("foo")), "name") //#typed-actor-create2 //#typed-actor-calls @@ -145,7 +142,7 @@ class TypedActorDocSpec extends AkkaSpec(Map("akka.loglevel" -> "INFO")) { "supercharge" in { //#typed-actor-supercharge-usage - val awesomeFooBar = TypedActor(system).typedActorOf[Foo with Bar, FooBar]() + val awesomeFooBar: Foo with Bar = TypedActor(system).typedActorOf(TypedProps[FooBar]()) awesomeFooBar.doFoo(10) val f = awesomeFooBar.doBar("yes")