From 3b698b9470155c6ad4f90103142a6ffc351f4bf5 Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 20 Oct 2011 23:37:54 +0200 Subject: [PATCH] nearly done, only two known test failures MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - moved typed actor factories to app/context (like actor factories) - fixed a few misplaced supervision changes, all such tests green now - actually test akka-reference.conf in ConfigSpec - made DispatcherActorSpec more deterministic (failed intermittently here, was due to race towards thread pool) - wrapped all actor initialization failures into ActorInitializationException and made sure that this leads to Stop - default to Stop on ActorKilledException - fixed ActorModelSpec to separately supervise the “waves of actors” because otherwise the app.guardian is way too busy processing all those ChildTerminated messages - change ActorCell._children from Vector[Stats] to TreeMap[ActorRef, Stats] for performance reasons, have not measured memory impact, yet - ensured that InterrupedException does not leave current thread via Failed message to supervisor (wrapped in ActorInterruptedException) - set core-size=1 and max-size=4 for default dispatcher during test --- .../scala/akka/actor/SupervisorSpec.scala | 13 +- .../scala/akka/actor/TypedActorSpec.scala | 16 +- .../actor/dispatch/DispatcherActorSpec.scala | 8 +- .../test/scala/akka/config/ConfigSpec.scala | 3 +- .../scala/akka/routing/ActorPoolSpec.scala | 35 +---- .../src/main/scala/akka/AkkaApplication.scala | 7 +- .../src/main/scala/akka/actor/Actor.scala | 6 +- .../src/main/scala/akka/actor/ActorCell.scala | 23 +-- .../src/main/scala/akka/actor/FSM.scala | 8 +- akka-actor/src/main/scala/akka/actor/IO.scala | 8 +- .../src/main/scala/akka/actor/Props.scala | 1 + .../main/scala/akka/actor/TypedActor.scala | 147 +++++++++--------- .../src/main/scala/akka/routing/Routing.scala | 2 +- .../scala/akka/util/ListenerManagement.scala | 5 +- .../akka/remote/RemoteActorRefProvider.scala | 1 + .../scala/akka/remote/MultiJvmSync.scala | 11 +- .../scala/akka/testkit/TestActorRef.scala | 2 +- config/akka.test.conf | 6 + project/AkkaBuild.scala | 2 +- 19 files changed, 149 insertions(+), 155 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 24c104debe..7cf0220f26 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -55,14 +55,17 @@ object SupervisorSpec { class Master extends Actor { val temp = context.actorOf(Props[PingPongActor]) + self startsMonitoring temp + var s: UntypedChannel = _ def receive = { - case Die ⇒ (temp.?(Die, TimeoutMillis)).get - case _: Terminated ⇒ + case Die ⇒ temp ! Die; s = context.channel + case Terminated(`temp`, cause) ⇒ s ! cause } } } +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach { import SupervisorSpec._ @@ -147,12 +150,12 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach { "not restart programmatically linked temporary actor" in { val master = actorOf(Props[Master].withFaultHandler(OneForOneStrategy(List(classOf[Exception]), Some(0)))) - intercept[RuntimeException] { - (master.?(Die, TimeoutMillis)).get + (master.?(Die, TimeoutMillis)).get match { + case r: RuntimeException ⇒ r === ExceptionMessage } sleepFor(1 second) - messageLog.size must be(0) + messageLogPoll must be(null) } "not restart temporary actor" in { diff --git a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala index 7df3a592ca..ebae2d148a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/TypedActorSpec.scala @@ -7,6 +7,7 @@ package akka.actor import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } import akka.japi.{ Option ⇒ JOption } import akka.util.Duration +import akka.util.duration._ import akka.dispatch.{ Dispatchers, Future, KeptPromise } import akka.serialization.Serialization import java.util.concurrent.atomic.AtomicReference @@ -145,10 +146,10 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte newFooBar(Props().withTimeout(Timeout(d))) def newFooBar(props: Props): Foo = - app.typedActor.typedActorOf(classOf[Foo], classOf[Bar], props) + app.typedActorOf(classOf[Foo], classOf[Bar], props) def newStacked(props: Props = Props().withTimeout(Timeout(2000))): Stacked = - app.typedActor.typedActorOf(classOf[Stacked], classOf[StackedImpl], props) + app.typedActorOf(classOf[Stacked], classOf[StackedImpl], props) def mustStop(typedActor: AnyRef) = app.typedActor.stop(typedActor) must be(true) @@ -260,7 +261,12 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte "be able to handle exceptions when calling methods" in { filterEvents(EventFilter[IllegalStateException]("expected")) { - val t = newFooBar + val boss = actorOf(Props(context ⇒ { + case p: Props ⇒ context.channel ! context.typedActorOf(classOf[Foo], classOf[Bar], p) + }).withFaultHandler(OneForOneStrategy { + case e: IllegalStateException if e.getMessage == "expected" ⇒ FaultHandlingStrategy.Resume + })) + val t = (boss ? Props().withTimeout(2 seconds)).as[Foo].get t.incr() t.failingPigdog() @@ -292,7 +298,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } "be able to support implementation only typed actors" in { - val t = app.typedActor.typedActorOf[Foo, Bar](Props()) + val t = app.typedActorOf[Foo, Bar](Props()) val f = t.futurePigdog(200) val f2 = t.futurePigdog(0) f2.isCompleted must be(false) @@ -302,7 +308,7 @@ class TypedActorSpec extends AkkaSpec with BeforeAndAfterEach with BeforeAndAfte } "be able to support implementation only typed actors with complex interfaces" in { - val t = app.typedActor.typedActorOf[Stackable1 with Stackable2, StackedImpl]() + val t = app.typedActorOf[Stackable1 with Stackable2, StackedImpl]() t.stackable1 must be("foo") t.stackable2 must be("bar") mustStop(t) diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index 59255bd473..d677e23fc1 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -67,24 +67,26 @@ class DispatcherActorSpec extends AkkaSpec { val works = new AtomicBoolean(true) val latch = new CountDownLatch(100) + val thereWeAre = new CountDownLatch(1) val start = new CountDownLatch(1) val fastOne = actorOf( Props(context ⇒ { case "sabotage" ⇒ works.set(false) }).withDispatcher(throughputDispatcher)) val slowOne = actorOf( Props(context ⇒ { - case "hogexecutor" ⇒ start.await + case "hogexecutor" ⇒ thereWeAre.countDown(); start.await case "ping" ⇒ if (works.get) latch.countDown() }).withDispatcher(throughputDispatcher)) slowOne ! "hogexecutor" (1 to 100) foreach { _ ⇒ slowOne ! "ping" } + assert(thereWeAre.await(2, TimeUnit.SECONDS)) fastOne ! "sabotage" start.countDown() - val result = latch.await(10, TimeUnit.SECONDS) + latch.await(10, TimeUnit.SECONDS) fastOne.stop() slowOne.stop() - assert(result === true) + assert(latch.getCount() === 0) } "respect throughput deadline" in { diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 24abbc8ad0..531ad1e0e4 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -5,8 +5,9 @@ package akka.config import akka.testkit.AkkaSpec +import akka.AkkaApplication -class ConfigSpec extends AkkaSpec { +class ConfigSpec extends AkkaSpec(AkkaApplication("ConfigSpec", Configuration.fromFile("config/akka-reference.conf"))) { "The default configuration file (i.e. akka-reference.conf)" must { "contain all configuration properties for akka-actor that are used in code with their correct defaults" in { diff --git a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala index 0c1f381b1b..15b23dec0f 100644 --- a/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/routing/ActorPoolSpec.scala @@ -330,7 +330,7 @@ class ActorPoolSpec extends AkkaSpec { "support typed actors" in { import RoutingSpec._ - import app.typedActor._ + def createPool = new Actor with DefaultActorPool with BoundedCapacityStrategy with MailboxPressureCapacitor with SmallestMailboxSelector with Filter with RunningMeanBackoff with BasicRampup { def lowerBound = 1 def upperBound = 5 @@ -340,11 +340,11 @@ class ActorPoolSpec extends AkkaSpec { def rampupRate = 0.1 def backoffRate = 0.50 def backoffThreshold = 0.50 - def instance(p: Props) = getActorRefFor(typedActorOf[Foo, FooImpl](p)) + def instance(p: Props) = app.typedActor.getActorRefFor(context.typedActorOf[Foo, FooImpl](p)) def receive = _route } - val pool = createProxy[Foo](createPool, Props().withFaultHandler(faultHandler)) + val pool = app.createProxy[Foo](createPool, Props().withFaultHandler(faultHandler)) val results = for (i ← 1 to 100) yield (i, pool.sq(i, 100)) @@ -357,7 +357,7 @@ class ActorPoolSpec extends AkkaSpec { val deathCount = new AtomicInteger(0) val keepDying = new AtomicBoolean(false) - val pool1 = actorOf( + val pool1, pool2 = actorOf( Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { def lowerBound = 2 def upperBound = 5 @@ -368,30 +368,7 @@ class ActorPoolSpec extends AkkaSpec { def selectionCount = 1 def receive = _route def pressureThreshold = 1 - def instance(p: Props) = actorOf(p.withCreator(new Actor { - if (deathCount.get > 5) deathCount.set(0) - if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } - def receive = { - case akka.Die ⇒ - if (keepDying.get) deathCount.incrementAndGet - throw new RuntimeException - case _ ⇒ pingCount.incrementAndGet - } - })) - }).withFaultHandler(faultHandler)) - - val pool2 = actorOf( - Props(new Actor with DefaultActorPool with BoundedCapacityStrategy with ActiveFuturesPressureCapacitor with SmallestMailboxSelector with BasicFilter { - def lowerBound = 2 - def upperBound = 5 - def rampupRate = 0.1 - def backoffRate = 0.1 - def backoffThreshold = 0.5 - def partialFill = true - def selectionCount = 1 - def receive = _route - def pressureThreshold = 1 - def instance(p: Props) = actorOf(p.withCreator(new Actor { + def instance(p: Props) = context.actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } def receive = { @@ -414,7 +391,7 @@ class ActorPoolSpec extends AkkaSpec { def selectionCount = 1 def receive = _route def pressureThreshold = 1 - def instance(p: Props) = actorOf(p.withCreator(new Actor { + def instance(p: Props) = context.actorOf(p.withCreator(new Actor { if (deathCount.get > 5) deathCount.set(0) if (deathCount.get > 0) { deathCount.incrementAndGet; throw new IllegalStateException("keep dying") } diff --git a/akka-actor/src/main/scala/akka/AkkaApplication.scala b/akka-actor/src/main/scala/akka/AkkaApplication.scala index 129e9a8aac..cedfc56c32 100644 --- a/akka-actor/src/main/scala/akka/AkkaApplication.scala +++ b/akka-actor/src/main/scala/akka/AkkaApplication.scala @@ -77,7 +77,7 @@ object AkkaApplication { } -class AkkaApplication(val name: String, val config: Configuration) extends ActorRefFactory { +class AkkaApplication(val name: String, val config: Configuration) extends ActorRefFactory with TypedActorFactory { def this(name: String) = this(name, AkkaApplication.defaultConfig) def this() = this("default") @@ -177,8 +177,9 @@ class AkkaApplication(val name: String, val config: Configuration) extends Actor import akka.actor.FaultHandlingStrategy._ new LocalActorRef(this, Props(context ⇒ { case _ ⇒ }).withFaultHandler(OneForOneStrategy { - case _: ActorKilledException ⇒ Stop - case _: Exception ⇒ Restart + case _: ActorKilledException ⇒ Stop + case _: ActorInitializationException ⇒ Stop + case _: Exception ⇒ Restart }).withDispatcher(dispatcher), provider.theOneWhoWalksTheBubblesOfSpaceTime, "ApplicationSupervisor", diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index d3ef4a04c9..4413a05e29 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -79,9 +79,9 @@ class ActorKilledException private[akka] (message: String, cause: Throwable) def this(msg: String) = this(msg, null); } -class ActorInitializationException private[akka] (message: String, cause: Throwable = null) - extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null); +case class ActorInitializationException private[akka] (actor: ActorRef, message: String, cause: Throwable = null) + extends AkkaException(message, cause) with NoStackTrace { + def this(msg: String) = this(null, msg, null); } class ActorTimeoutException private[akka] (message: String, cause: Throwable = null) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 586f2c9d81..2be9eef251 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -18,7 +18,7 @@ import akka.AkkaApplication * Exposes contextual information for the actor and the current message. * TODO: everything here for current compatibility - could be limited more */ -private[akka] trait ActorContext extends ActorRefFactory { +private[akka] trait ActorContext extends ActorRefFactory with TypedActorFactory { def self: ActorRef with ScalaActorRef @@ -72,6 +72,8 @@ private[akka] class ActorCell( protected def guardian = self + protected def typedActor = app.typedActor + def provider = app.provider var futureTimeout: Option[ScheduledFuture[AnyRef]] = None //FIXME TODO Doesn't need to be volatile either, since it will only ever be accessed when a message is processed @@ -163,7 +165,7 @@ private[akka] class ActorCell( val instance = props.creator() if (instance eq null) - throw new ActorInitializationException("Actor instance passed to actorOf can't be 'null'") + throw ActorInitializationException(self, "Actor instance passed to actorOf can't be 'null'") instance } finally { @@ -182,13 +184,14 @@ private[akka] class ActorCell( checkReceiveTimeout if (app.AkkaConfig.DebugLifecycle) app.eventHandler.debug(self, "started") } catch { - case e ⇒ try { - app.eventHandler.error(e, self, "error while creating actor") - // prevent any further messages to be processed until the actor has been restarted - dispatcher.suspend(this) - } finally { - supervisor ! Failed(self, e) - } + case e ⇒ + try { + app.eventHandler.error(e, self, "error while creating actor") + // prevent any further messages to be processed until the actor has been restarted + dispatcher.suspend(this) + } finally { + supervisor ! Failed(self, ActorInitializationException(self, "exception during creation", e)) + } } def recreate(cause: Throwable): Unit = try { @@ -218,7 +221,7 @@ private[akka] class ActorCell( // prevent any further messages to be processed until the actor has been restarted dispatcher.suspend(this) } finally { - supervisor ! Failed(self, e) + supervisor ! Failed(self, ActorInitializationException(self, "exception during re-creation", e)) } } diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index 4beff7229d..f1dbd61a16 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -466,14 +466,10 @@ trait FSM[S, D] extends ListenerManagement { } } case SubscribeTransitionCallBack(actorRef) ⇒ + // TODO use DeathWatch to clean up list addListener(actorRef) // send current state back as reference point - try { - actorRef ! CurrentState(self, currentState.stateName) - } catch { - case e: ActorInitializationException ⇒ - app.eventHandler.warning(context.self, "trying to register not running listener") - } + actorRef ! CurrentState(self, currentState.stateName) case UnsubscribeTransitionCallBack(actorRef) ⇒ removeListener(actorRef) case value ⇒ { diff --git a/akka-actor/src/main/scala/akka/actor/IO.scala b/akka-actor/src/main/scala/akka/actor/IO.scala index a47e287bca..600d3a334a 100644 --- a/akka-actor/src/main/scala/akka/actor/IO.scala +++ b/akka-actor/src/main/scala/akka/actor/IO.scala @@ -394,12 +394,8 @@ private[akka] class IOWorker(app: AkkaApplication, ioManager: ActorRef, val buff case Some(channel) ⇒ channel.close channels -= handle - try { - handle.owner ! IO.Closed(handle, cause) - } catch { - case e: ActorInitializationException ⇒ - app.eventHandler debug (ioManager, "IO.Handle's owner not running") - } + // TODO: what if handle.owner is no longer running? + handle.owner ! IO.Closed(handle, cause) case None ⇒ } } diff --git a/akka-actor/src/main/scala/akka/actor/Props.scala b/akka-actor/src/main/scala/akka/actor/Props.scala index 26fee852bb..b3ae7f27ef 100644 --- a/akka-actor/src/main/scala/akka/actor/Props.scala +++ b/akka-actor/src/main/scala/akka/actor/Props.scala @@ -23,6 +23,7 @@ object Props { final val defaultTimeout: Timeout = Timeout(Duration.MinusInf) final val defaultDecider: Decider = { case _: ActorInitializationException ⇒ Stop + case _: ActorKilledException ⇒ Stop case _: Exception ⇒ Restart case _ ⇒ Escalate } diff --git a/akka-actor/src/main/scala/akka/actor/TypedActor.scala b/akka-actor/src/main/scala/akka/actor/TypedActor.scala index ea23e77bd7..1345f436d3 100644 --- a/akka-actor/src/main/scala/akka/actor/TypedActor.scala +++ b/akka-actor/src/main/scala/akka/actor/TypedActor.scala @@ -120,6 +120,76 @@ object TypedActor { implicit def timeout = app.AkkaConfig.ActorTimeout } +trait TypedActorFactory { this: ActorRefFactory ⇒ + + protected def typedActor: TypedActor + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, interface.getClassLoader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, interface.getClassLoader) + + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, interface, impl.newInstance, props, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or + * all interfaces (Class.getInterfaces) if it's not an interface class + */ + def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, interface, impl.create, props, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) + */ + def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = + typedActor.createProxyAndTypedActor(this, impl, impl.newInstance, props, loader) + + /** + * Creates a new TypedActor proxy using the supplied Props, + * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) + */ + def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { + val clazz = m.erasure.asInstanceOf[Class[T]] + typedActor.createProxyAndTypedActor(this, clazz, clazz.newInstance, props, if (loader eq null) clazz.getClassLoader else loader) + } + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = + typedActor.createProxy[R](this, typedActor.extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, if (loader eq null) m.erasure.getClassLoader else loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = + typedActor.createProxy(this, interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, loader) + + /** + * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, + * to create TypedActor proxies, use typedActorOf + */ + def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = + typedActor.createProxy[R](this, interfaces, (ref: AtomVar[R]) ⇒ constructor, props, loader) + +} + //TODO Document this class, not only in Scaladoc, but also in a dedicated typed-actor.rst, for both java and scala /** * A TypedActor in Akka is an implementation of the Active Objects Pattern, i.e. an object with asynchronous method dispatch @@ -140,50 +210,6 @@ object TypedActor { class TypedActor(val app: AkkaApplication) { import TypedActor.MethodCall - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props): R = - createProxyAndTypedActor(interface, impl.newInstance, props, interface.getClassLoader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props): R = - createProxyAndTypedActor(interface, impl.create, props, interface.getClassLoader) - - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Class[T], props: Props, loader: ClassLoader): R = - createProxyAndTypedActor(interface, impl.newInstance, props, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied interface class (if the class represents an interface) or - * all interfaces (Class.getInterfaces) if it's not an interface class - */ - def typedActorOf[R <: AnyRef, T <: R](interface: Class[R], impl: Creator[T], props: Props, loader: ClassLoader): R = - createProxyAndTypedActor(interface, impl.create, props, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - */ - def typedActorOf[R <: AnyRef, T <: R](impl: Class[T], props: Props, loader: ClassLoader): R = - createProxyAndTypedActor(impl, impl.newInstance, props, loader) - - /** - * Creates a new TypedActor proxy using the supplied Props, - * the interfaces usable by the returned proxy is the supplied implementation class' interfaces (Class.getInterfaces) - */ - def typedActorOf[R <: AnyRef, T <: R](props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[T]): R = { - val clazz = m.erasure.asInstanceOf[Class[T]] - createProxyAndTypedActor(clazz, clazz.newInstance, props, if (loader eq null) clazz.getClassLoader else loader) - } - /** * Stops the underlying ActorRef for the supplied TypedActor proxy, if any, returns whether it could stop it or not */ @@ -205,27 +231,6 @@ class TypedActor(val app: AkkaApplication) { */ def isTypedActor(proxyOrNot: AnyRef): Boolean = invocationHandlerFor(proxyOrNot) ne null - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](constructor: ⇒ Actor, props: Props = Props(), loader: ClassLoader = null)(implicit m: Manifest[R]): R = - createProxy[R](extractInterfaces(m.erasure), (ref: AtomVar[R]) ⇒ constructor, props, if (loader eq null) m.erasure.getClassLoader else loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: Creator[Actor], props: Props, loader: ClassLoader): R = - createProxy(interfaces, (ref: AtomVar[R]) ⇒ constructor.create, props, loader) - - /** - * Creates a proxy given the supplied Props, this is not a TypedActor, so you'll need to implement the MethodCall handling yourself, - * to create TypedActor proxies, use typedActorOf - */ - def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: ⇒ Actor, props: Props, loader: ClassLoader): R = - createProxy[R](interfaces, (ref: AtomVar[R]) ⇒ constructor, props, loader) - /* Internal API */ private[akka] def invocationHandlerFor(typedActor_? : AnyRef): TypedActorInvocationHandler = @@ -239,15 +244,15 @@ class TypedActor(val app: AkkaApplication) { } else null - private[akka] def createProxy[R <: AnyRef](interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, loader: ClassLoader): R = { + private[akka] def createProxy[R <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], constructor: (AtomVar[R]) ⇒ Actor, props: Props, loader: ClassLoader): R = { val proxyVar = new AtomVar[R] - configureAndProxyLocalActorRef[R](interfaces, proxyVar, props.withCreator(constructor(proxyVar)), loader) + configureAndProxyLocalActorRef[R](supervisor, interfaces, proxyVar, props.withCreator(constructor(proxyVar)), loader) } - private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](interface: Class[_], constructor: ⇒ T, props: Props, loader: ClassLoader): R = - createProxy[R](extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, loader) + private[akka] def createProxyAndTypedActor[R <: AnyRef, T <: R](supervisor: ActorRefFactory, interface: Class[_], constructor: ⇒ T, props: Props, loader: ClassLoader): R = + createProxy[R](supervisor, extractInterfaces(interface), (ref: AtomVar[R]) ⇒ new TypedActor[R, T](ref, constructor), props, loader) - private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = { + private[akka] def configureAndProxyLocalActorRef[T <: AnyRef](supervisor: ActorRefFactory, interfaces: Array[Class[_]], proxyVar: AtomVar[T], props: Props, loader: ClassLoader): T = { //Warning, do not change order of the following statements, it's some elaborate chicken-n-egg handling val actorVar = new AtomVar[ActorRef](null) val timeout = props.timeout match { @@ -256,7 +261,7 @@ class TypedActor(val app: AkkaApplication) { } val proxy: T = Proxy.newProxyInstance(loader, interfaces, new TypedActorInvocationHandler(actorVar)(timeout)).asInstanceOf[T] proxyVar.set(proxy) // Chicken and egg situation we needed to solve, set the proxy so that we can set the self-reference inside each receive - val ref = app.actorOf(props) + val ref = supervisor.actorOf(props) actorVar.set(ref) //Make sure the InvocationHandler gets ahold of the actor reference, this is not a problem since the proxy hasn't escaped this method yet proxyVar.get } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index 28d253d172..77ce3de642 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -362,7 +362,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable { private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = { val responses = connectionManager.connections.iterable.flatMap { actor ⇒ try { - if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first") + if (actor.isShutdown) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]]) } catch { case e: Exception ⇒ diff --git a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala index 0607ccb6a9..775f5f674e 100644 --- a/akka-actor/src/main/scala/akka/util/ListenerManagement.scala +++ b/akka-actor/src/main/scala/akka/util/ListenerManagement.scala @@ -44,8 +44,7 @@ trait ListenerManagement { def hasListeners: Boolean = !listeners.isEmpty /** - * Checks if a specific listener is registered. ActorInitializationException leads to removal of listener if that - * one isShutdown. + * Checks if a specific listener is registered. Pruned eventually when isShutdown==true in notify. */ def hasListener(listener: ActorRef): Boolean = listeners.contains(listener) @@ -62,7 +61,7 @@ trait ListenerManagement { } /** - * Execute f with each listener as argument. ActorInitializationException is not handled. + * Execute f with each listener as argument. */ protected[akka] def foreachListener(f: (ActorRef) ⇒ Unit) { val iterator = listeners.iterator diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 6d6c2d14c2..67b33d036f 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -32,6 +32,7 @@ class RemoteActorRefProvider(val app: AkkaApplication) extends ActorRefProvider import akka.dispatch.Promise private[akka] val theOneWhoWalksTheBubblesOfSpaceTime: ActorRef = new UnsupportedActorRef {} + private[akka] def terminationFuture = new DefaultPromise[AkkaApplication.ExitStatus](Timeout.never)(app.dispatcher) val local = new LocalActorRefProvider(app) val remote = new Remote(app) diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala index 26c10b4ea4..4a35d8c03f 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/MultiJvmSync.scala @@ -4,23 +4,20 @@ package akka.remote -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers -import org.scalatest.BeforeAndAfterAll - +import akka.testkit.AkkaSpec import akka.util.Duration -trait MultiJvmSync extends WordSpec with MustMatchers with BeforeAndAfterAll { +trait MultiJvmSync extends AkkaSpec { def nodes: Int - override def beforeAll() = { + override def atStartup() = { onStart() MultiJvmSync.start(getClass.getName, nodes) } def onStart() {} - override def afterAll() = { + override def atTermination() = { MultiJvmSync.end(getClass.getName, nodes) onEnd() } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index b612a7a7f8..3402429ddd 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -59,7 +59,7 @@ object TestActorRef { import ReflectiveAccess.{ createInstance, noParams, noArgs } createInstance[T](m.erasure, noParams, noArgs) match { case Right(value) ⇒ value - case Left(exception) ⇒ throw new ActorInitializationException( + case Left(exception) ⇒ throw new ActorInitializationException(null, "Could not instantiate Actor" + "\nMake sure Actor is NOT defined inside a class/trait," + "\nif so put it outside the class/trait, f.e. in a companion object," + diff --git a/config/akka.test.conf b/config/akka.test.conf index 59f9e520cf..25161e06c4 100644 --- a/config/akka.test.conf +++ b/config/akka.test.conf @@ -7,4 +7,10 @@ include "akka-reference.conf" akka { event-handlers = ["akka.testkit.TestEventListener"] event-handler-level = "ERROR" + actor { + default-dispatcher { + core-pool-size-factor = 1 + max-pool-size-factor = 4 + } + } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 6a529657cb..98d0624abb 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -394,7 +394,7 @@ object AkkaBuild extends Build { object Dependencies { import Dependency._ - val testkit = Seq(Test.scalatest) + val testkit = Seq(Test.scalatest, Test.junit) val actorTests = Seq( Test.junit, Test.scalatest, Test.multiverse, Test.commonsMath, Test.mockito,