From 0ed6a67e080a15558e187df9b065b993896d1f8a Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 29 Dec 2011 16:27:32 +0100 Subject: [PATCH] Remote DeathWatch (2): make it work MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - introduce EmptyLocalActorRef, which is returned for unsuccessful look-ups of local scope - this fixes the problem that actors—after their death—can still be looked up without losing their identity; otherwise behave like DeadLetterActorRef - adapt tests accordingly - make DeathWatchSpec reusable and build remote test from it - remove several unused imports of LocalActorRef - use LocalRef/RemoteRef in pattern matches where applicable: these are marker traits for a ref’s scope; InternalActorRef mandates a scope as per its self-type --- .../scala/akka/actor/ActorLookupSpec.scala | 59 ++++++++++++------- .../test/scala/akka/actor/ActorRefSpec.scala | 4 +- .../scala/akka/actor/DeathWatchSpec.scala | 18 ++++-- .../src/main/scala/akka/actor/ActorRef.scala | 51 ++++++++++++---- .../scala/akka/actor/ActorRefProvider.scala | 2 +- .../main/scala/akka/actor/ActorSystem.scala | 2 +- .../src/main/scala/akka/actor/Locker.scala | 4 +- .../scala/akka/dispatch/Dispatchers.scala | 1 - .../actor/mailbox/BeanstalkBasedMailbox.scala | 1 - .../actor/mailbox/MongoDurableMessage.scala | 1 - .../actor/mailbox/RedisBasedMailbox.scala | 1 - .../actor/mailbox/ZooKeeperBasedMailbox.scala | 1 - .../mailbox/ZooKeeperBasedMailboxSpec.scala | 2 +- .../src/main/scala/akka/remote/Remote.scala | 13 ++-- .../akka/remote/RemoteActorRefProvider.scala | 13 ++-- .../akka/remote/RemoteDeathWatchSpec.scala | 33 +++++++++++ 16 files changed, 147 insertions(+), 59 deletions(-) create mode 100644 akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 6ef64df1b2..c4e8c73aa0 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -43,6 +43,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian val root = system.asInstanceOf[ActorSystemImpl].lookupRoot + def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match { + case RelativeActorPath(elems) ⇒ system.actorFor("/").path / elems + }) + "An ActorSystem" must { "find actors by looking up their path" in { @@ -101,14 +105,18 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { system.actorFor("system/") must be === syst } - "return deadLetters for non-existing paths" in { - system.actorFor("a/b/c") must be === system.deadLetters - system.actorFor("") must be === system.deadLetters - system.actorFor("akka://all-systems/Nobody") must be === system.deadLetters - system.actorFor("akka://all-systems/user") must be === system.deadLetters - system.actorFor(system / "hallo") must be === system.deadLetters - system.actorFor(Seq()) must be === system.deadLetters - system.actorFor(Seq("a")) must be === system.deadLetters + "return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in { + def check(lookup: ActorRef, result: ActorRef) = { + lookup.getClass must be === result.getClass + lookup must be === result + } + check(system.actorFor("a/b/c"), empty("a/b/c")) + check(system.actorFor(""), system.deadLetters) + check(system.actorFor("akka://all-systems/Nobody"), system.deadLetters) + check(system.actorFor("akka://all-systems/user"), system.deadLetters) + check(system.actorFor(system / "hallo"), empty("user/hallo")) + check(system.actorFor(Seq()), system.deadLetters) + check(system.actorFor(Seq("a")), empty("a")) } "find temporary actors" in { @@ -119,13 +127,14 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { system.actorFor(a.path.toString) must be === a system.actorFor(a.path.elements) must be === a system.actorFor(a.path.toString + "/") must be === a - system.actorFor(a.path.toString + "/hallo") must be === system.deadLetters + system.actorFor(a.path.toString + "/hallo").isTerminated must be === true f.isCompleted must be === false + a.isTerminated must be === false a ! 42 f.isCompleted must be === true Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second) + awaitCond(system.actorFor(a.path).isTerminated, 1 second) } } @@ -195,21 +204,26 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { for (target ← Seq(root, syst, user, system.deadLetters)) check(target) } - "return deadLetters for non-existing paths" in { + "return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in { import scala.collection.JavaConverters._ - def checkOne(looker: ActorRef, query: Query) { - Await.result(looker ? query, timeout.duration) must be === system.deadLetters + def checkOne(looker: ActorRef, query: Query, result: ActorRef) { + val lookup = Await.result(looker ? query, timeout.duration) + lookup.getClass must be === result.getClass + lookup must be === result } def check(looker: ActorRef) { - Seq(LookupString("a/b/c"), - LookupString(""), - LookupString("akka://all-systems/Nobody"), - LookupPath(system / "hallo"), - LookupPath(looker.path child "hallo"), // test Java API - LookupPath(looker.path descendant Seq("a", "b").asJava), // test Java API - LookupElems(Seq()), - LookupElems(Seq("a"))) foreach (checkOne(looker, _)) + val lookname = looker.path.elements.mkString("", "/", "/") + for ( + (l, r) ← Seq(LookupString("a/b/c") -> empty(lookname + "a/b/c"), + LookupString("") -> system.deadLetters, + LookupString("akka://all-systems/Nobody") -> system.deadLetters, + LookupPath(system / "hallo") -> empty("user/hallo"), + LookupPath(looker.path child "hallo") -> empty(lookname + "hallo"), // test Java API + LookupPath(looker.path descendant Seq("a", "b").asJava) -> empty(lookname + "a/b"), // test Java API + LookupElems(Seq()) -> system.deadLetters, + LookupElems(Seq("a")) -> empty(lookname + "a")) + ) checkOne(looker, l, r) } for (looker ← all) check(looker) } @@ -228,11 +242,12 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a f.isCompleted must be === false + a.isTerminated must be === false a ! 42 f.isCompleted must be === true Await.result(f, timeout.duration) must be === 42 // clean-up is run as onComplete callback, i.e. dispatched on another thread - awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second) + awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration).asInstanceOf[ActorRef].isTerminated, 1 second) } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala index 83c82e443b..ceea20b2c7 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorRefSpec.scala @@ -281,7 +281,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { " Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'" } - "must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { + "must return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in { import java.io._ val baos = new ByteArrayOutputStream(8192 * 32) @@ -297,7 +297,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout { Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) { val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray)) - in.readObject must be === system.deadLetters + in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing") } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 5abf768c22..30828c1014 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -4,18 +4,28 @@ package akka.actor -import org.scalatest.BeforeAndAfterEach import akka.testkit._ import akka.util.duration._ import java.util.concurrent.atomic._ import akka.dispatch.Await @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout { - def startWatching(target: ActorRef) = system.actorOf(Props(new Actor { +class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec + +object DeathWatchSpec { + def props(target: ActorRef, testActor: ActorRef) = Props(new Actor { context.watch(target) def receive = { case x ⇒ testActor forward x } - })) + }) +} + +trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout ⇒ + + import DeathWatchSpec._ + + lazy val supervisor = system.actorOf(Props[Supervisor], "watchers") + + def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds) "The Death Watch" must { def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index c9da4d5ae7..5d59ef8ba5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -203,13 +203,26 @@ trait ScalaActorRef { ref: ActorRef ⇒ def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout) } +/** + * All ActorRefs have a scope which describes where they live. Since it is + * often necessary to distinguish between local and non-local references, this + * is the only method provided on the scope. + */ +trait ActorRefScope { + def isLocal: Boolean +} + +trait LocalRef extends ActorRefScope { + final def isLocal = true +} + /** * Internal trait for assembling all the functionality needed internally on * ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE! * * DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA! */ -private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { +private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope ⇒ def resume(): Unit def suspend(): Unit def restart(cause: Throwable): Unit @@ -225,6 +238,11 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe * exist, return Nobody. */ def getChild(name: Iterator[String]): InternalActorRef + /** + * Scope: if this ref points to an actor which resides within the same JVM, + * i.e. whose mailbox is directly reachable etc. + */ + def isLocal: Boolean } private[akka] case object Nobody extends MinimalActorRef { @@ -242,7 +260,7 @@ private[akka] class LocalActorRef private[akka] ( val systemService: Boolean = false, _receiveTimeout: Option[Duration] = None, _hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap) - extends InternalActorRef { + extends InternalActorRef with LocalRef { /* * actorCell.start() publishes actorCell & this to the dispatcher, which @@ -354,7 +372,7 @@ private[akka] class LocalActorRef private[akka] ( def restart(cause: Throwable): Unit = actorCell.restart(cause) @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path.toString) + protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) } /** @@ -377,7 +395,7 @@ case class SerializedActorRef(path: String) { /** * Trait for ActorRef implementations where all methods contain default stubs. */ -trait MinimalActorRef extends InternalActorRef { +trait MinimalActorRef extends InternalActorRef with LocalRef { def getParent: InternalActorRef = Nobody def getChild(names: Iterator[String]): InternalActorRef = { @@ -400,6 +418,9 @@ trait MinimalActorRef extends InternalActorRef { def sendSystemMessage(message: SystemMessage): Unit = () def restart(cause: Throwable): Unit = () + + @throws(classOf[java.io.ObjectStreamException]) + protected def writeReplace(): AnyRef = SerializedActorRef(path.toString) } object MinimalActorRef { @@ -431,8 +452,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { _path } - private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) { - _path = rootPath / "deadLetters" + private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) { + _path = path brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher) } @@ -451,7 +472,20 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef { } @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = DeadLetterActorRef.serialized + override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized +} + +/** + * This special dead letter reference has a name: it is that which is returned + * by a local look-up which is unsuccessful. + */ +class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath) + extends DeadLetterActorRef(_eventStream) { + init(_dispatcher, _path) + override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match { + case d: DeadLetter ⇒ // do NOT form endless loops + case _ ⇒ eventStream.publish(DeadLetter(message, sender, this)) + } } class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef { @@ -517,7 +551,4 @@ class AskActorRef( override def stop(): Unit = if (running.getAndSet(false)) { deathWatch.publish(Terminated(this)) } - - @throws(classOf[java.io.ObjectStreamException]) - private def writeReplace(): AnyRef = SerializedActorRef(path.toString) } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 115fca87d5..48f8f627c5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -467,7 +467,7 @@ class LocalActorRefProvider( def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = if (path.isEmpty) deadLetters else ref.getChild(path.iterator) match { - case Nobody ⇒ deadLetters + case Nobody ⇒ new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path.filterNot(_.isEmpty)) case x ⇒ x } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 8c9a3bd5b9..721286e41a 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -423,7 +423,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor private lazy val _start: this.type = { // the provider is expected to start default loggers, LocalActorRefProvider does this provider.init(this) - deadLetters.init(dispatcher, provider.rootPath) + deadLetters.init(dispatcher, lookupRoot.path / "deadLetters") // this starts the reaper actor and the user-configured logging subscribers, which are also actors registerOnTermination(stopScheduler()) _locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch) diff --git a/akka-actor/src/main/scala/akka/actor/Locker.scala b/akka-actor/src/main/scala/akka/actor/Locker.scala index 8bbcdd15e6..d4fd1badd5 100644 --- a/akka-actor/src/main/scala/akka/actor/Locker.scala +++ b/akka-actor/src/main/scala/akka/actor/Locker.scala @@ -18,8 +18,8 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de val soul = iter.next() deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere soul.getKey match { - case _: LocalActorRef ⇒ // nothing to do, they know what they signed up for - case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure + case _: LocalRef ⇒ // nothing to do, they know what they signed up for + case nonlocal ⇒ nonlocal.stop() // try again in case it was due to a communications failure } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 5abdd32438..db200d09d6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -6,7 +6,6 @@ package akka.dispatch import java.util.concurrent.TimeUnit import java.util.concurrent.ConcurrentHashMap -import akka.actor.LocalActorRef import akka.actor.newUuid import akka.util.{ Duration, ReflectiveAccess } import akka.actor.ActorSystem diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index cd9186388e..42bece7d6a 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -6,7 +6,6 @@ package akka.actor.mailbox import com.surftools.BeanstalkClient._ import com.surftools.BeanstalkClientImpl._ import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import akka.actor.ActorContext diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala index af82322276..264e1904b6 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoDurableMessage.scala @@ -10,7 +10,6 @@ import org.bson.io.OutputBuffer import org.bson.types.ObjectId import java.io.InputStream import org.bson.collection._ -import akka.actor.LocalActorRef import akka.actor.ActorRef import akka.dispatch.Envelope diff --git a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala index 6d0f173bbf..8c7587ec00 100644 --- a/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-redis-mailbox/src/main/scala/akka/actor/mailbox/RedisBasedMailbox.scala @@ -4,7 +4,6 @@ package akka.actor.mailbox import com.redis._ -import akka.actor.LocalActorRef import akka.AkkaException import akka.actor.ActorContext import akka.dispatch.Envelope diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 4309da402a..117acac383 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -4,7 +4,6 @@ package akka.actor.mailbox import java.util.concurrent.TimeUnit.MILLISECONDS -import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException import org.I0Itec.zkclient.serialize._ diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala index e5af760f40..4febbafe6f 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/test/scala/akka/actor/mailbox/ZooKeeperBasedMailboxSpec.scala @@ -1,6 +1,6 @@ package akka.actor.mailbox -import akka.actor.{ Actor, LocalActorRef } +import akka.actor.Actor import akka.cluster.zookeeper._ import org.I0Itec.zkclient._ import akka.dispatch.MessageDispatcher diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 91a4ceb03f..2a1b7b999c 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -117,7 +117,6 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti sealed trait DaemonMsg case class DaemonMsgCreate(factory: () ⇒ Actor, path: String, supervisor: ActorRef) extends DaemonMsg case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg -case class DaemonMsgTerminated(deceased: ActorRef) extends DaemonMsg /** * Internal system "daemon" actor for remote internal communication. @@ -177,13 +176,13 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa case DaemonMsgWatch(watcher, watched) ⇒ val other = system.actorFor(watcher.path.root / "remote") system.deathWatch.subscribe(other, watched) - case DaemonMsgTerminated(deceased) ⇒ - system.deathWatch.publish(Terminated(deceased)) } - case Terminated(child) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) + case Terminated(child: LocalActorRef) ⇒ removeChild(child.path.elements.drop(1).mkString("/")) - case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) + case t: Terminated ⇒ system.deathWatch.publish(t) + + case unknown ⇒ log.warning("Unknown message {} received by {}", unknown, this) } } @@ -253,13 +252,13 @@ trait RemoteMarshallingOps { remoteMessage.recipient match { case `remoteDaemon` ⇒ remoteMessage.payload match { - case m: DaemonMsg ⇒ + case m @ (_: DaemonMsg | _: Terminated) ⇒ try remoteDaemon ! m catch { case e: Exception ⇒ log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender) } case x ⇒ log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender) } - case l @ (_: LocalActorRef | _: MinimalActorRef) ⇒ + case l: LocalRef ⇒ remoteMessage.payload match { case msg: SystemMessage ⇒ if (useUntrustedMode) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 949bdf8436..89ae932cbf 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -31,7 +31,6 @@ class RemoteActorRefProvider( val remoteSettings = new RemoteSettings(settings.config, systemName) - def deathWatch = local.deathWatch def rootGuardian = local.rootGuardian def guardian = local.guardian def systemGuardian = local.systemGuardian @@ -49,6 +48,8 @@ class RemoteActorRefProvider( private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer) + val deathWatch = new RemoteDeathWatch(local.deathWatch, this) + def init(system: ActorSystemImpl) { local.init(system) remote.init(system, this) @@ -150,6 +151,10 @@ class RemoteActorRefProvider( } } +trait RemoteRef extends ActorRefScope { + final def isLocal = false +} + /** * Remote ActorRef that is used when referencing the Actor on a different node than its "home" node. * This reference is network-aware (remembers its origin) and immutable. @@ -160,7 +165,7 @@ private[akka] class RemoteActorRef private[akka] ( val path: ActorPath, val getParent: InternalActorRef, loader: Option[ClassLoader]) - extends InternalActorRef { + extends InternalActorRef with RemoteRef { def getChild(name: Iterator[String]): InternalActorRef = { val s = name.toStream @@ -206,11 +211,11 @@ private[akka] class RemoteActorRef private[akka] ( class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match { - case r: RemoteActorRef ⇒ + case r: RemoteRef ⇒ val ret = local.subscribe(watcher, watched) provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched) ret - case l: LocalActorRef ⇒ + case l: LocalRef ⇒ local.subscribe(watcher, watched) case _ ⇒ provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass) diff --git a/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala new file mode 100644 index 0000000000..b51720aa01 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/RemoteDeathWatchSpec.scala @@ -0,0 +1,33 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ +package akka.remote + +import akka.testkit._ +import akka.actor.{ ActorSystem, DeathWatchSpec } +import com.typesafe.config.ConfigFactory + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString(""" +akka { + actor { + provider = "akka.remote.RemoteActorRefProvider" + deployment { + /watchers.remote = "akka://other@127.0.0.1:2666" + } + } + cluster.nodename = buh + remote.server { + hostname = "127.0.0.1" + port = 2665 + } +} +""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec { + + val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.server.port=2666").withFallback(system.settings.config)) + + override def atTermination() { + other.shutdown() + } + +}