diff --git a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala index 031fb1ccb3..2a935d14cc 100644 --- a/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/event/EventStreamSpec.scala @@ -51,7 +51,6 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { "manage subscriptions" in { val bus = new EventStream(true) - bus.start(impl) bus.subscribe(testActor, classOf[M]) bus.publish(M(42)) within(1 second) { @@ -64,7 +63,6 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { "manage log levels" in { val bus = new EventStream(false) - bus.start(impl) bus.startDefaultLoggers(impl) bus.publish(SetTarget(testActor)) expectMsg("OK") @@ -86,7 +84,6 @@ class EventStreamSpec extends AkkaSpec(EventStreamSpec.config) { val b2 = new B2 val c = new C val bus = new EventStream(false) - bus.start(impl) within(2 seconds) { bus.subscribe(testActor, classOf[B2]) === true bus.publish(c) diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index bfb8f4c9cf..01500a0301 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -168,14 +168,6 @@ private[akka] class ActorCell( final def children: Iterable[ActorRef] = childrenRefs.values.view.map(_.child) - final def getChild(name: String): ActorRef = - if (isTerminated) null - else { - val c = childrenRefs - if (c contains name) c(name).child - else null - } - final def tell(message: Any, sender: ActorRef): Unit = dispatcher.dispatch(this, Envelope(message, if (sender eq null) system.deadLetters else sender)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 8db16d83d3..1f7b725062 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -365,6 +365,7 @@ class LocalActorRefProvider( def receive = { case Terminated(_) ⇒ context.self.stop() case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } @@ -374,6 +375,7 @@ class LocalActorRefProvider( eventStream.stopDefaultLoggers() context.self.stop() case CreateChild(child, name) ⇒ sender ! (try context.actorOf(child, name) catch { case e: Exception ⇒ e }) + case m ⇒ deadLetters ! DeadLetter(m, sender, self) } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index c7c29d0bdc..f1195d19c8 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -385,7 +385,6 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor provider.init(this) deadLetters.init(dispatcher, provider.rootPath) // this starts the reaper actor and the user-configured logging subscribers, which are also actors - eventStream.start(this) eventStream.startDefaultLoggers(this) registerOnTermination(stopScheduler()) loadExtensions() diff --git a/akka-actor/src/main/scala/akka/event/EventStream.scala b/akka-actor/src/main/scala/akka/event/EventStream.scala index 647fe2336c..6799fc7ae8 100644 --- a/akka-actor/src/main/scala/akka/event/EventStream.scala +++ b/akka-actor/src/main/scala/akka/event/EventStream.scala @@ -25,41 +25,27 @@ class EventStream(debug: Boolean = false) extends LoggingBus with SubchannelClas def isSubclass(x: Class[_], y: Class[_]) = y isAssignableFrom x } - @volatile - private var reaper: ActorRef = _ - protected def classify(event: AnyRef): Class[_] = event.getClass - protected def publish(event: AnyRef, subscriber: ActorRef) = subscriber ! event + protected def publish(event: AnyRef, subscriber: ActorRef) = { + if (subscriber.isTerminated) unsubscribe(subscriber) + else subscriber ! event + } override def subscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { if (debug) publish(Logging.Debug(simpleName(this), "subscribing " + subscriber + " to channel " + channel)) - if (reaper ne null) reaper ! subscriber super.subscribe(subscriber, channel) } override def unsubscribe(subscriber: ActorRef, channel: Class[_]): Boolean = { + val ret = super.unsubscribe(subscriber, channel) if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from channel " + channel)) - super.unsubscribe(subscriber, channel) + ret } override def unsubscribe(subscriber: ActorRef) { - if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) super.unsubscribe(subscriber) - } - - def start(system: ActorSystemImpl) { - reaper = system.systemActorOf(Props(new Actor { - def receive = { - case ref: ActorRef ⇒ watch(ref) - case Terminated(ref) ⇒ unsubscribe(ref) - } - }), "MainBusReaper-" + EventStream.generation.incrementAndGet()) - subscribers foreach (reaper ! _) - } - - def stop() { - reaper.stop() + if (debug) publish(Logging.Debug(simpleName(this), "unsubscribing " + subscriber + " from all channels")) } } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 40b1c09864..0e141cccc4 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -176,7 +176,7 @@ class RemoteActorRefProvider( * Copied from LocalActorRefProvider... */ // FIXME: implement supervision, ticket #1408 - def actorOf(system: ActorSystem, props: RoutedProps, supervisor: ActorRef, name: String): InternalActorRef = { + def actorOf(system: ActorSystem, props: RoutedProps, supervisor: InternalActorRef, name: String): InternalActorRef = { if (props.connectionManager.isEmpty) throw new ConfigurationException("RoutedProps used for creating actor [" + name + "] has zero connections configured; can't create a router") new RoutedActorRef(system, props, supervisor, name) } diff --git a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala index 8e94bf10b2..be5320ee13 100644 --- a/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/AkkaSpec.scala @@ -13,6 +13,7 @@ import akka.util.duration._ import akka.dispatch.FutureTimeoutException import com.typesafe.config.Config import com.typesafe.config.ConfigFactory +import akka.actor.PoisonPill object TimingTest extends Tag("timing") @@ -89,7 +90,9 @@ abstract class AkkaSpec(_system: ActorSystem) @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class AkkaSpecSpec extends WordSpec with MustMatchers { + "An AkkaSpec" must { + "terminate all actors" in { import scala.collection.JavaConverters._ val conf = Map( @@ -103,6 +106,22 @@ class AkkaSpecSpec extends WordSpec with MustMatchers { system.stop() spec.awaitCond(spec.ref forall (_.isTerminated), 2 seconds) } + + "must stop correctly when sending PoisonPill to rootGuardian" in { + import scala.collection.JavaConverters._ + val conf = Map( + "akka.actor.debug.lifecycle" -> true, "akka.actor.debug.event-stream" -> true, + "akka.loglevel" -> "DEBUG", "akka.stdout-loglevel" -> "DEBUG") + val system = ActorSystem("test", ConfigFactory.parseMap(conf.asJava).withFallback(AkkaSpec.testConf)) + val spec = new AkkaSpec(system) {} + val latch = new TestLatch(1)(system) + system.registerOnTermination(latch.countDown()) + + system.actorFor("/") ! PoisonPill + + latch.await(2 seconds) + } + } } diff --git a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java index 5a80699ade..fb8457855d 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java +++ b/akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java @@ -6,6 +6,7 @@ package akka.tutorial.first.java; import akka.actor.ActorRef; import akka.actor.ActorSystem; +import akka.actor.InternalActorRef; import akka.actor.UntypedActor; import akka.actor.UntypedActorFactory; import akka.japi.Creator; @@ -115,7 +116,8 @@ public class Pi { } }; RoutedProps props = new RoutedProps(routerCreator, new LocalConnectionManager(actors), new akka.actor.Timeout(-1), true); - router = new RoutedActorRef(system(), props, getSelf(), "pi"); + // FIXME REALLY this NEEDS to use getContext()! + router = new RoutedActorRef(system(), props, (InternalActorRef) getSelf(), "pi"); } // message handler diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 3283a591f4..7199d42e02 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -6,6 +6,7 @@ package akka.tutorial.first.scala import java.util.concurrent.CountDownLatch import akka.routing.{ RoutedActorRef, LocalConnectionManager, RoundRobinRouter, RoutedProps } import akka.actor.{ ActorSystemImpl, Actor, ActorSystem } +import akka.actor.InternalActorRef object Pi extends App { @@ -55,8 +56,9 @@ object Pi extends App { val workers = Vector.fill(nrOfWorkers)(system.actorOf[Worker]) // wrap them with a load-balancing router + // FIXME REALLY this needs to use context to create the child! val props = RoutedProps(routerFactory = () ⇒ new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers)) - val router = new RoutedActorRef(system, props, self, "pi") + val router = new RoutedActorRef(system, props, self.asInstanceOf[InternalActorRef], "pi") // message handler def receive = {