Remote DeathWatch (2): make it work

- introduce EmptyLocalActorRef, which is returned for unsuccessful
  look-ups of local scope
- this fixes the problem that actors—after their death—can still be
  looked up without losing their identity; otherwise behave like
  DeadLetterActorRef
- adapt tests accordingly
- make DeathWatchSpec reusable and build remote test from it
- remove several unused imports of LocalActorRef
- use LocalRef/RemoteRef in pattern matches where applicable: these are
  marker traits for a ref’s scope; InternalActorRef mandates a scope as
  per its self-type
This commit is contained in:
Roland 2011-12-29 16:27:32 +01:00
parent b15f4a2f1a
commit 0ed6a67e08
16 changed files with 147 additions and 59 deletions

View file

@ -43,6 +43,10 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
val syst = system.asInstanceOf[ActorSystemImpl].systemGuardian
val root = system.asInstanceOf[ActorSystemImpl].lookupRoot
def empty(path: String) = new EmptyLocalActorRef(system.eventStream, system.dispatcher, path match {
case RelativeActorPath(elems) system.actorFor("/").path / elems
})
"An ActorSystem" must {
"find actors by looking up their path" in {
@ -101,14 +105,18 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
system.actorFor("system/") must be === syst
}
"return deadLetters for non-existing paths" in {
system.actorFor("a/b/c") must be === system.deadLetters
system.actorFor("") must be === system.deadLetters
system.actorFor("akka://all-systems/Nobody") must be === system.deadLetters
system.actorFor("akka://all-systems/user") must be === system.deadLetters
system.actorFor(system / "hallo") must be === system.deadLetters
system.actorFor(Seq()) must be === system.deadLetters
system.actorFor(Seq("a")) must be === system.deadLetters
"return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in {
def check(lookup: ActorRef, result: ActorRef) = {
lookup.getClass must be === result.getClass
lookup must be === result
}
check(system.actorFor("a/b/c"), empty("a/b/c"))
check(system.actorFor(""), system.deadLetters)
check(system.actorFor("akka://all-systems/Nobody"), system.deadLetters)
check(system.actorFor("akka://all-systems/user"), system.deadLetters)
check(system.actorFor(system / "hallo"), empty("user/hallo"))
check(system.actorFor(Seq()), system.deadLetters)
check(system.actorFor(Seq("a")), empty("a"))
}
"find temporary actors" in {
@ -119,13 +127,14 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
system.actorFor(a.path.toString) must be === a
system.actorFor(a.path.elements) must be === a
system.actorFor(a.path.toString + "/") must be === a
system.actorFor(a.path.toString + "/hallo") must be === system.deadLetters
system.actorFor(a.path.toString + "/hallo").isTerminated must be === true
f.isCompleted must be === false
a.isTerminated must be === false
a ! 42
f.isCompleted must be === true
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(system.actorFor(a.path) == system.deadLetters, 1 second)
awaitCond(system.actorFor(a.path).isTerminated, 1 second)
}
}
@ -195,21 +204,26 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
for (target Seq(root, syst, user, system.deadLetters)) check(target)
}
"return deadLetters for non-existing paths" in {
"return deadLetters or EmptyLocalActorRef, respectively, for non-existing paths" in {
import scala.collection.JavaConverters._
def checkOne(looker: ActorRef, query: Query) {
Await.result(looker ? query, timeout.duration) must be === system.deadLetters
def checkOne(looker: ActorRef, query: Query, result: ActorRef) {
val lookup = Await.result(looker ? query, timeout.duration)
lookup.getClass must be === result.getClass
lookup must be === result
}
def check(looker: ActorRef) {
Seq(LookupString("a/b/c"),
LookupString(""),
LookupString("akka://all-systems/Nobody"),
LookupPath(system / "hallo"),
LookupPath(looker.path child "hallo"), // test Java API
LookupPath(looker.path descendant Seq("a", "b").asJava), // test Java API
LookupElems(Seq()),
LookupElems(Seq("a"))) foreach (checkOne(looker, _))
val lookname = looker.path.elements.mkString("", "/", "/")
for (
(l, r) Seq(LookupString("a/b/c") -> empty(lookname + "a/b/c"),
LookupString("") -> system.deadLetters,
LookupString("akka://all-systems/Nobody") -> system.deadLetters,
LookupPath(system / "hallo") -> empty("user/hallo"),
LookupPath(looker.path child "hallo") -> empty(lookname + "hallo"), // test Java API
LookupPath(looker.path descendant Seq("a", "b").asJava) -> empty(lookname + "a/b"), // test Java API
LookupElems(Seq()) -> system.deadLetters,
LookupElems(Seq("a")) -> empty(lookname + "a"))
) checkOne(looker, l, r)
}
for (looker all) check(looker)
}
@ -228,11 +242,12 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements), timeout.duration) must be === a
Await.result(c2 ? LookupElems(Seq("..", "..") ++ a.path.elements :+ ""), timeout.duration) must be === a
f.isCompleted must be === false
a.isTerminated must be === false
a ! 42
f.isCompleted must be === true
Await.result(f, timeout.duration) must be === 42
// clean-up is run as onComplete callback, i.e. dispatched on another thread
awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration) == system.deadLetters, 1 second)
awaitCond(Await.result(c2 ? LookupPath(a.path), timeout.duration).asInstanceOf[ActorRef].isTerminated, 1 second)
}
}

View file

@ -281,7 +281,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
" Use 'akka.serialization.Serialization.currentSystem.withValue(system) { ... }'"
}
"must return deadLetters on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
"must return EmptyLocalActorRef on deserialize if not present in actor hierarchy (and remoting is not enabled)" in {
import java.io._
val baos = new ByteArrayOutputStream(8192 * 32)
@ -297,7 +297,7 @@ class ActorRefSpec extends AkkaSpec with DefaultTimeout {
Serialization.currentSystem.withValue(system.asInstanceOf[ActorSystemImpl]) {
val in = new ObjectInputStream(new ByteArrayInputStream(baos.toByteArray))
in.readObject must be === system.deadLetters
in.readObject must be === new EmptyLocalActorRef(system.eventStream, system.dispatcher, system.actorFor("/").path / "non-existing")
}
}

View file

@ -4,18 +4,28 @@
package akka.actor
import org.scalatest.BeforeAndAfterEach
import akka.testkit._
import akka.util.duration._
import java.util.concurrent.atomic._
import akka.dispatch.Await
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class DeathWatchSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSender with DefaultTimeout {
def startWatching(target: ActorRef) = system.actorOf(Props(new Actor {
class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec
object DeathWatchSpec {
def props(target: ActorRef, testActor: ActorRef) = Props(new Actor {
context.watch(target)
def receive = { case x testActor forward x }
}))
})
}
trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
import DeathWatchSpec._
lazy val supervisor = system.actorOf(Props[Supervisor], "watchers")
def startWatching(target: ActorRef) = Await.result((supervisor ? props(target, testActor)).mapTo[ActorRef], 3 seconds)
"The Death Watch" must {
def expectTerminationOf(actorRef: ActorRef) = expectMsgPF(5 seconds, actorRef + ": Stopped or Already terminated when linking") {

View file

@ -203,13 +203,26 @@ trait ScalaActorRef { ref: ActorRef ⇒
def ?(message: Any, timeout: Timeout)(implicit ignore: Int = 0): Future[Any] = ?(message)(timeout)
}
/**
* All ActorRefs have a scope which describes where they live. Since it is
* often necessary to distinguish between local and non-local references, this
* is the only method provided on the scope.
*/
trait ActorRefScope {
def isLocal: Boolean
}
trait LocalRef extends ActorRefScope {
final def isLocal = true
}
/**
* Internal trait for assembling all the functionality needed internally on
* ActorRefs. NOTE THAT THIS IS NOT A STABLE EXTERNAL INTERFACE!
*
* DO NOT USE THIS UNLESS INTERNALLY WITHIN AKKA!
*/
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef {
private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRef { this: ActorRefScope
def resume(): Unit
def suspend(): Unit
def restart(cause: Throwable): Unit
@ -225,6 +238,11 @@ private[akka] abstract class InternalActorRef extends ActorRef with ScalaActorRe
* exist, return Nobody.
*/
def getChild(name: Iterator[String]): InternalActorRef
/**
* Scope: if this ref points to an actor which resides within the same JVM,
* i.e. whose mailbox is directly reachable etc.
*/
def isLocal: Boolean
}
private[akka] case object Nobody extends MinimalActorRef {
@ -242,7 +260,7 @@ private[akka] class LocalActorRef private[akka] (
val systemService: Boolean = false,
_receiveTimeout: Option[Duration] = None,
_hotswap: Stack[PartialFunction[Any, Unit]] = Props.noHotSwap)
extends InternalActorRef {
extends InternalActorRef with LocalRef {
/*
* actorCell.start() publishes actorCell & this to the dispatcher, which
@ -354,7 +372,7 @@ private[akka] class LocalActorRef private[akka] (
def restart(cause: Throwable): Unit = actorCell.restart(cause)
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
/**
@ -377,7 +395,7 @@ case class SerializedActorRef(path: String) {
/**
* Trait for ActorRef implementations where all methods contain default stubs.
*/
trait MinimalActorRef extends InternalActorRef {
trait MinimalActorRef extends InternalActorRef with LocalRef {
def getParent: InternalActorRef = Nobody
def getChild(names: Iterator[String]): InternalActorRef = {
@ -400,6 +418,9 @@ trait MinimalActorRef extends InternalActorRef {
def sendSystemMessage(message: SystemMessage): Unit = ()
def restart(cause: Throwable): Unit = ()
@throws(classOf[java.io.ObjectStreamException])
protected def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}
object MinimalActorRef {
@ -431,8 +452,8 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
_path
}
private[akka] def init(dispatcher: MessageDispatcher, rootPath: ActorPath) {
_path = rootPath / "deadLetters"
private[akka] def init(dispatcher: MessageDispatcher, path: ActorPath) {
_path = path
brokenPromise = Promise.failed(new ActorKilledException("In DeadLetterActorRef - promises are always broken."))(dispatcher)
}
@ -451,7 +472,20 @@ class DeadLetterActorRef(val eventStream: EventStream) extends MinimalActorRef {
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = DeadLetterActorRef.serialized
override protected def writeReplace(): AnyRef = DeadLetterActorRef.serialized
}
/**
* This special dead letter reference has a name: it is that which is returned
* by a local look-up which is unsuccessful.
*/
class EmptyLocalActorRef(_eventStream: EventStream, _dispatcher: MessageDispatcher, _path: ActorPath)
extends DeadLetterActorRef(_eventStream) {
init(_dispatcher, _path)
override def !(message: Any)(implicit sender: ActorRef = null): Unit = message match {
case d: DeadLetter // do NOT form endless loops
case _ eventStream.publish(DeadLetter(message, sender, this))
}
}
class VirtualPathContainer(val path: ActorPath, override val getParent: InternalActorRef, val log: LoggingAdapter) extends MinimalActorRef {
@ -517,7 +551,4 @@ class AskActorRef(
override def stop(): Unit = if (running.getAndSet(false)) {
deathWatch.publish(Terminated(this))
}
@throws(classOf[java.io.ObjectStreamException])
private def writeReplace(): AnyRef = SerializedActorRef(path.toString)
}

View file

@ -467,7 +467,7 @@ class LocalActorRefProvider(
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
if (path.isEmpty) deadLetters
else ref.getChild(path.iterator) match {
case Nobody deadLetters
case Nobody new EmptyLocalActorRef(eventStream, dispatcher, ref.path / path.filterNot(_.isEmpty))
case x x
}

View file

@ -423,7 +423,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
private lazy val _start: this.type = {
// the provider is expected to start default loggers, LocalActorRefProvider does this
provider.init(this)
deadLetters.init(dispatcher, provider.rootPath)
deadLetters.init(dispatcher, lookupRoot.path / "deadLetters")
// this starts the reaper actor and the user-configured logging subscribers, which are also actors
registerOnTermination(stopScheduler())
_locker = new Locker(scheduler, ReaperInterval, lookupRoot.path / "locker", deathWatch)

View file

@ -18,8 +18,8 @@ class Locker(scheduler: Scheduler, period: Duration, val path: ActorPath, val de
val soul = iter.next()
deathWatch.subscribe(Locker.this, soul.getKey) // in case Terminated got lost somewhere
soul.getKey match {
case _: LocalActorRef // nothing to do, they know what they signed up for
case nonlocal nonlocal.stop() // try again in case it was due to a communications failure
case _: LocalRef // nothing to do, they know what they signed up for
case nonlocal nonlocal.stop() // try again in case it was due to a communications failure
}
}
}

View file

@ -6,7 +6,6 @@ package akka.dispatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.ConcurrentHashMap
import akka.actor.LocalActorRef
import akka.actor.newUuid
import akka.util.{ Duration, ReflectiveAccess }
import akka.actor.ActorSystem

View file

@ -6,7 +6,6 @@ package akka.actor.mailbox
import com.surftools.BeanstalkClient._
import com.surftools.BeanstalkClientImpl._
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import akka.actor.ActorContext

View file

@ -10,7 +10,6 @@ import org.bson.io.OutputBuffer
import org.bson.types.ObjectId
import java.io.InputStream
import org.bson.collection._
import akka.actor.LocalActorRef
import akka.actor.ActorRef
import akka.dispatch.Envelope

View file

@ -4,7 +4,6 @@
package akka.actor.mailbox
import com.redis._
import akka.actor.LocalActorRef
import akka.AkkaException
import akka.actor.ActorContext
import akka.dispatch.Envelope

View file

@ -4,7 +4,6 @@
package akka.actor.mailbox
import java.util.concurrent.TimeUnit.MILLISECONDS
import akka.actor.LocalActorRef
import akka.util.Duration
import akka.AkkaException
import org.I0Itec.zkclient.serialize._

View file

@ -1,6 +1,6 @@
package akka.actor.mailbox
import akka.actor.{ Actor, LocalActorRef }
import akka.actor.Actor
import akka.cluster.zookeeper._
import org.I0Itec.zkclient._
import akka.dispatch.MessageDispatcher

View file

@ -117,7 +117,6 @@ class Remote(val settings: ActorSystem.Settings, val remoteSettings: RemoteSetti
sealed trait DaemonMsg
case class DaemonMsgCreate(factory: () Actor, path: String, supervisor: ActorRef) extends DaemonMsg
case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMsg
case class DaemonMsgTerminated(deceased: ActorRef) extends DaemonMsg
/**
* Internal system "daemon" actor for remote internal communication.
@ -177,13 +176,13 @@ class RemoteSystemDaemon(system: ActorSystemImpl, remote: Remote, _path: ActorPa
case DaemonMsgWatch(watcher, watched)
val other = system.actorFor(watcher.path.root / "remote")
system.deathWatch.subscribe(other, watched)
case DaemonMsgTerminated(deceased)
system.deathWatch.publish(Terminated(deceased))
}
case Terminated(child) removeChild(child.path.elements.drop(1).mkString("/"))
case Terminated(child: LocalActorRef) removeChild(child.path.elements.drop(1).mkString("/"))
case unknown log.warning("Unknown message {} received by {}", unknown, this)
case t: Terminated system.deathWatch.publish(t)
case unknown log.warning("Unknown message {} received by {}", unknown, this)
}
}
@ -253,13 +252,13 @@ trait RemoteMarshallingOps {
remoteMessage.recipient match {
case `remoteDaemon`
remoteMessage.payload match {
case m: DaemonMsg
case m @ (_: DaemonMsg | _: Terminated)
try remoteDaemon ! m catch {
case e: Exception log.error(e, "exception while processing remote command {} from {}", m, remoteMessage.sender)
}
case x log.warning("remoteDaemon received illegal message {} from {}", x, remoteMessage.sender)
}
case l @ (_: LocalActorRef | _: MinimalActorRef)
case l: LocalRef
remoteMessage.payload match {
case msg: SystemMessage
if (useUntrustedMode)

View file

@ -31,7 +31,6 @@ class RemoteActorRefProvider(
val remoteSettings = new RemoteSettings(settings.config, systemName)
def deathWatch = local.deathWatch
def rootGuardian = local.rootGuardian
def guardian = local.guardian
def systemGuardian = local.systemGuardian
@ -49,6 +48,8 @@ class RemoteActorRefProvider(
private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, _deadLetters, rootPath, deployer)
val deathWatch = new RemoteDeathWatch(local.deathWatch, this)
def init(system: ActorSystemImpl) {
local.init(system)
remote.init(system, this)
@ -150,6 +151,10 @@ class RemoteActorRefProvider(
}
}
trait RemoteRef extends ActorRefScope {
final def isLocal = false
}
/**
* Remote ActorRef that is used when referencing the Actor on a different node than its "home" node.
* This reference is network-aware (remembers its origin) and immutable.
@ -160,7 +165,7 @@ private[akka] class RemoteActorRef private[akka] (
val path: ActorPath,
val getParent: InternalActorRef,
loader: Option[ClassLoader])
extends InternalActorRef {
extends InternalActorRef with RemoteRef {
def getChild(name: Iterator[String]): InternalActorRef = {
val s = name.toStream
@ -206,11 +211,11 @@ private[akka] class RemoteActorRef private[akka] (
class RemoteDeathWatch(val local: LocalDeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch {
def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match {
case r: RemoteActorRef
case r: RemoteRef
val ret = local.subscribe(watcher, watched)
provider.actorFor(r.path.root / "remote") ! DaemonMsgWatch(watcher, watched)
ret
case l: LocalActorRef
case l: LocalRef
local.subscribe(watcher, watched)
case _
provider.log.error("unknown ActorRef type {} as DeathWatch target", watched.getClass)

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.remote
import akka.testkit._
import akka.actor.{ ActorSystem, DeathWatchSpec }
import com.typesafe.config.ConfigFactory
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class RemoteDeathWatchSpec extends AkkaSpec(ConfigFactory.parseString("""
akka {
actor {
provider = "akka.remote.RemoteActorRefProvider"
deployment {
/watchers.remote = "akka://other@127.0.0.1:2666"
}
}
cluster.nodename = buh
remote.server {
hostname = "127.0.0.1"
port = 2665
}
}
""")) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
val other = ActorSystem("other", ConfigFactory.parseString("akka.remote.server.port=2666").withFallback(system.settings.config))
override def atTermination() {
other.shutdown()
}
}