Merge pull request #1244 from akka/wip-3072-uid-ref-patriknw

Add UID to RemoteActorRef, see #3072
This commit is contained in:
Patrik Nordwall 2013-03-22 01:06:05 -07:00
commit bf813d8406
37 changed files with 607 additions and 186 deletions

View file

@ -208,14 +208,14 @@ class ActorDSLSpec extends AkkaSpec {
// here we pass in the ActorRefFactory explicitly as an example
val a = actor(system, "fred")(new Act {
val b = actor("barney")(new Act {
whenStarting { context.parent ! ("hello from " + self) }
whenStarting { context.parent ! ("hello from " + self.path) }
})
become {
case x testActor ! x
}
})
//#nested-actor
expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]")
expectMsg("hello from akka://ActorDSLSpec/user/fred/barney")
lastSender must be(a)
}

View file

@ -71,11 +71,33 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
}
"find actors by looking up their string representation" in {
// this is only true for local actor references
system.actorFor(c1.path.toString) must be === c1
system.actorFor(c2.path.toString) must be === c2
system.actorFor(c21.path.toString) must be === c21
}
"take actor incarnation into account when comparing actor references" in {
val name = "abcdefg"
val a1 = system.actorOf(p, name)
watch(a1)
a1 ! PoisonPill
expectMsgType[Terminated].actor must be === a1
// not equal because it's terminated
system.actorFor(a1.path.toString) must not be (a1)
val a2 = system.actorOf(p, name)
a2.path must be(a1.path)
a2.path.toString must be(a1.path.toString)
a2 must not be (a1)
a2.toString must not be (a1.toString)
watch(a2)
a2 ! PoisonPill
expectMsgType[Terminated].actor must be === a2
}
"find actors by looking up their root-anchored relative path" in {
system.actorFor(c1.path.elements.mkString("/", "/", "")) must be === c1
system.actorFor(c2.path.elements.mkString("/", "/", "")) must be === c2
@ -163,6 +185,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout {
"find actors by looking up their string representation" in {
def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) {
Await.result(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result
// with uid
Await.result(looker ? LookupString(pathOf.path.toSerializationFormat), timeout.duration) must be === result
// with trailing /
Await.result(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result
}
for {

View file

@ -0,0 +1,46 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
class ActorPathSpec extends WordSpec with MustMatchers {
"ActorPath" must {
"create correct toString" in {
val a = Address("akka.tcp", "mysys")
RootActorPath(a).toString must be("akka.tcp://mysys/")
(RootActorPath(a) / "user").toString must be("akka.tcp://mysys/user")
(RootActorPath(a) / "user" / "foo").toString must be("akka.tcp://mysys/user/foo")
(RootActorPath(a) / "user" / "foo" / "bar").toString must be("akka.tcp://mysys/user/foo/bar")
}
"create correct toStringWithAddress" in {
val local = Address("akka.tcp", "mysys")
val a = local.copy(host = Some("aaa"), port = Some(2552))
val b = a.copy(host = Some("bb"))
val c = a.copy(host = Some("cccc"))
val root = RootActorPath(local)
root.toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/")
(root / "user").toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/user")
(root / "user" / "foo").toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/user/foo")
// root.toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/")
(root / "user").toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/user")
(root / "user" / "foo").toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/user/foo")
root.toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/")
(root / "user").toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/user")
(root / "user" / "foo").toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/user/foo")
val rootA = RootActorPath(a)
rootA.toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/")
(rootA / "user").toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/user")
(rootA / "user" / "foo").toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/user/foo")
}
}
}

View file

@ -23,5 +23,8 @@ class RelativeActorPathSpec extends WordSpec with MustMatchers {
val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8")
elements(name) must be(List(name))
}
"match path with uid fragment" in {
elements("foo/bar/baz#1234") must be(List("foo", "bar", "baz#1234"))
}
}
}

View file

@ -54,7 +54,7 @@ object SupervisorHierarchySpec {
}
case class Ready(ref: ActorRef)
case class Died(ref: ActorRef)
case class Died(path: ActorPath)
case object Abort
case object PingOfDeath
case object PongOfDeath
@ -112,17 +112,17 @@ object SupervisorHierarchySpec {
* upon Restart or would have to be managed by the highest supervisor (which
* is undesirable).
*/
case class HierarchyState(log: Vector[Event], kids: Map[ActorRef, Int], failConstr: Failure)
val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]()
case class HierarchyState(log: Vector[Event], kids: Map[ActorPath, Int], failConstr: Failure)
val stateCache = new ConcurrentHashMap[ActorPath, HierarchyState]()
class Hierarchy(size: Int, breadth: Int, listener: ActorRef, myLevel: Int) extends Actor {
var log = Vector.empty[Event]
stateCache.get(self) match {
stateCache.get(self.path) match {
case hs @ HierarchyState(l: Vector[Event], _, f: Failure) if f.failConstr > 0
val log = l :+ Event("Failed in constructor", identityHashCode(this))
stateCache.put(self, hs.copy(log = log, failConstr = f.copy(failConstr = f.failConstr - 1)))
stateCache.put(self.path, hs.copy(log = log, failConstr = f.copy(failConstr = f.failConstr - 1)))
throw f
case _
}
@ -149,7 +149,7 @@ object SupervisorHierarchySpec {
log :+= Event("started", identityHashCode(this))
listener ! Ready(self)
val s = size - 1 // subtract myself
val kidInfo: Map[ActorRef, Int] =
val kidInfo: Map[ActorPath, Int] =
if (s > 0) {
val kids = Random.nextInt(Math.min(breadth, s)) + 1
val sizes = s / kids
@ -158,10 +158,10 @@ object SupervisorHierarchySpec {
(1 to kids).map { (id)
val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes
val props = propsTemplate.withCreator(new Hierarchy(kidSize, breadth, listener, myLevel + 1))
(context.watch(context.actorOf(props, id.toString)), kidSize)
(context.watch(context.actorOf(props, id.toString)).path, kidSize)
}(collection.breakOut)
} else Map()
stateCache.put(self, HierarchyState(log, kidInfo, null))
stateCache.put(self.path, HierarchyState(log, kidInfo, null))
}
var preRestartCalled = false
@ -178,12 +178,12 @@ object SupervisorHierarchySpec {
context.unwatch(child)
context.stop(child)
}
stateCache.put(self, stateCache.get(self).copy(log = log))
stateCache.put(self.path, stateCache.get(self.path).copy(log = log))
if (f.failPre > 0) {
f.failPre -= 1
throw f
}
case _ stateCache.put(self, stateCache.get(self).copy(log = log))
case _ stateCache.put(self.path, stateCache.get(self.path).copy(log = log))
}
}
}
@ -217,14 +217,14 @@ object SupervisorHierarchySpec {
})
override def postRestart(cause: Throwable) {
val state = stateCache.get(self)
val state = stateCache.get(self.path)
log = state.log
log :+= Event("restarted " + suspendCount + " " + cause, identityHashCode(this))
state.kids foreach {
case (child, kidSize)
val name = child.path.name
if (context.actorFor(name).isTerminated) {
listener ! Died(child)
case (childPath, kidSize)
val name = childPath.name
if (context.child(name).isEmpty) {
listener ! Died(childPath)
val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1)).withDispatcher("hierarchy")
context.watch(context.actorOf(props, name))
}
@ -243,7 +243,7 @@ object SupervisorHierarchySpec {
if (failed || suspended) {
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
} else {
stateCache.put(self, HierarchyState(log, Map(), null))
stateCache.put(self.path, HierarchyState(log, Map(), null))
}
}
@ -270,7 +270,7 @@ object SupervisorHierarchySpec {
val handler: Receive = {
case f: Failure
setFlags(f.directive)
stateCache.put(self, stateCache.get(self).copy(failConstr = f.copy()))
stateCache.put(self.path, stateCache.get(self.path).copy(failConstr = f.copy()))
throw f
case "ping" { Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" }
case Dump(0) abort("dump")
@ -281,9 +281,9 @@ object SupervisorHierarchySpec {
* (if the unwatch() came too late), so just ignore in this case.
*/
val name = ref.path.name
if (pongsToGo == 0 && context.actorFor(name).isTerminated) {
listener ! Died(ref)
val kids = stateCache.get(self).kids(ref)
if (pongsToGo == 0 && context.child(name).isEmpty) {
listener ! Died(ref.path)
val kids = stateCache.get(self.path).kids(ref.path)
val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy")
context.watch(context.actorOf(props, name))
} else {
@ -469,8 +469,8 @@ object SupervisorHierarchySpec {
case x if x > 0.03 1
case _ 2
}
private def bury(ref: ActorRef): Unit = {
val deadGuy = ref.path.elements
private def bury(path: ActorPath): Unit = {
val deadGuy = path.elements
val deadGuySize = deadGuy.size
val isChild = (other: ActorRef) other.path.elements.take(deadGuySize) == deadGuy
idleChildren = idleChildren filterNot isChild
@ -499,8 +499,8 @@ object SupervisorHierarchySpec {
else context.system.scheduler.scheduleOnce(workSchedule, self, Work)(context.dispatcher)
stay using (x - 1)
case Event(Work, _) if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing)
case Event(Died(ref), _)
bury(ref)
case Event(Died(path), _)
bury(path)
stay
case Event("pong", _)
pingChildren -= sender
@ -631,7 +631,7 @@ object SupervisorHierarchySpec {
case l: LocalActorRef
l.underlying.actor match {
case h: Hierarchy errors :+= target -> ErrorLog("forced", h.log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target.path).log)
}
if (depth > 0) {
l.underlying.children foreach (getErrors(_, depth - 1))
@ -644,7 +644,7 @@ object SupervisorHierarchySpec {
case l: LocalActorRef
l.underlying.actor match {
case h: Hierarchy errors :+= target -> ErrorLog("forced", h.log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log)
case _ errors :+= target -> ErrorLog("fetched", stateCache.get(target.path).log)
}
if (target != hierarchy) getErrorsUp(l.getParent)
}

View file

@ -393,10 +393,10 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende
override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted"
def receive = {
case t @ Terminated(`child`) testActor ! "child terminated"
case l: TestLatch child ! l
case "test" sender ! "green"
case "testchild" child forward "test"
case Terminated(a) if a.path == child.path testActor ! "child terminated" // FIXME case t @ Terminated(`child`) ticket #3156
case l: TestLatch child ! l
case "test" sender ! "green"
case "testchild" child forward "test"
}
}))

View file

@ -332,7 +332,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
String.valueOf(encodeHex(ser.serialize(obj, obj.getClass).get)) must be(asExpected)
"be preserved for the Create SystemMessage" in {
verify(Create(1234), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465bcdf9f7f2675038d0200014900037569647870000004d27671007e0003")
verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465000000000000000302000078707671007e0003")
}
"be preserved for the Recreate SystemMessage" in {
verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
@ -347,7 +347,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003")
}
"be preserved for the Supervise SystemMessage" in {
verify(Supervise(FakeActorRef("child"), true, 2468), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5375706572766973652d0b363f56ab5feb0200035a00056173796e634900037569644c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787001000009a47372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e53757065727669736500000000000000030200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
"be preserved for the ChildTerminated SystemMessage" in {
verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")

View file

@ -16,6 +16,7 @@ import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure
import akka.dispatch.NullMessage
import scala.concurrent.ExecutionContext
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
* The actor context - the view of the actor cell from the actor.
@ -304,8 +305,26 @@ private[akka] object ActorCell {
final val emptyBehaviorStack: List[Actor.Receive] = Nil
final val emptyActorRefSet: Set[ActorRef] = immutable.TreeSet.empty
final val emptyActorRefMap: Map[ActorPath, ActorRef] = immutable.TreeMap.empty
final val terminatedProps: Props = Props(() throw new IllegalActorStateException("This Actor has been terminated"))
final val undefinedUid = 0
@tailrec final def newUid(): Int = {
// Note that this uid is also used as hashCode in ActorRef, so be careful
// to not break hashing if you change the way uid is generated
val uid = ThreadLocalRandom.current.nextInt()
if (uid == undefinedUid) newUid
else uid
}
final def splitNameAndUid(name: String): (String, Int) = {
val i = name.indexOf('#')
if (i < 0) (name, undefinedUid)
else (name.substring(0, i), Integer.valueOf(name.substring(i + 1)))
}
}
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
@ -337,7 +356,7 @@ private[akka] class ActorCell(
protected final def lookupRoot = self
final def provider = system.provider
protected var uid: Int = 0
protected def uid: Int = self.path.uid
private[this] var _actor: Actor = _
def actor: Actor = _actor
protected def actor_=(a: Actor): Unit = _actor = a
@ -361,7 +380,7 @@ private[akka] class ActorCell(
var todo = message.next
try {
message match {
case Create(uid) create(uid)
case Create() create()
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Recreate(cause)
@ -379,10 +398,10 @@ private[akka] class ActorCell(
case null faultResume(inRespToFailure)
case w: WaitingForChildren w.enqueue(message)
}
case Terminate() terminate()
case Supervise(child, async, uid) supervise(child, async, uid)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
case Terminate() terminate()
case Supervise(child, async) supervise(child, async)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch handleNonFatalOrInterruptedException { e
handleInvokeFailure(Nil, e)
@ -473,7 +492,7 @@ private[akka] class ActorCell(
}
}
protected def create(uid: Int): Unit = {
protected def create(): Unit = {
def clearOutActorIfNonNull(): Unit = {
if (actor != null) {
clearActorFields(actor)
@ -481,7 +500,6 @@ private[akka] class ActorCell(
}
}
try {
this.uid = uid
val created = newActor()
actor = created
created.preStart()
@ -505,12 +523,11 @@ private[akka] class ActorCell(
}
}
private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit =
private def supervise(child: ActorRef, async: Boolean): Unit =
if (!isTerminating) {
// Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure()
initChild(child) match {
case Some(crs)
crs.uid = uid
handleSupervise(child, async)
if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child))
case None publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well"))

View file

@ -6,6 +6,7 @@ import scala.annotation.tailrec
import scala.collection.immutable
import akka.japi.Util.immutableSeq
import java.net.MalformedURLException
import java.lang.{ StringBuilder JStringBuilder }
object ActorPath {
/**
@ -35,6 +36,13 @@ object ActorPath {
* as possible, which owing to the bottom-up recursive nature of ActorPath
* is sorted by path elements FROM RIGHT TO LEFT, where RootActorPath >
* ChildActorPath in case the number of elements is different.
*
* Two actor paths are compared equal when they have the same name and parent
* elements, including the root address information. That does not necessarily
* mean that they point to the same incarnation of the actor if the actor is
* re-created with the same path. In other words, in contrast to how actor
* references are compared the unique id of the actor is not taken into account
* when comparing actor paths.
*/
@SerialVersionUID(1L)
sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
@ -96,6 +104,37 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable {
* information.
*/
def toStringWithAddress(address: Address): String
/**
* Generate full String representation including the
* uid for the actor cell instance as URI fragment.
* This representation should be used as serialized
* representation instead of `toString`.
*/
def toSerializationFormat: String
/**
* Generate full String representation including the uid for the actor cell
* instance as URI fragment, replacing the Address in the RootActor Path
* with the given one unless this paths address includes host and port
* information. This representation should be used as serialized
* representation instead of `toStringWithAddress`.
*/
def toSerializationFormatWithAddress(address: Address): String
/**
* INTERNAL API
* Unique identifier of the actor. Used for distinguishing
* different incarnations of actors with same path (name elements).
*/
private[akka] def uid: Int
/**
* INTERNAL API
* Creates a new ActorPath with same elements but with the specified `uid`.
*/
private[akka] def withUid(uid: Int): ActorPath
}
/**
@ -109,29 +148,55 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act
override def root: RootActorPath = this
override def /(child: String): ActorPath = new ChildActorPath(this, child)
override def /(child: String): ActorPath = {
val (childName, uid) = ActorCell.splitNameAndUid(child)
new ChildActorPath(this, childName, uid)
}
override def elements: immutable.Iterable[String] = ActorPath.emptyActorPath
override val toString: String = address + name
override val toSerializationFormat: String = toString
override def toStringWithAddress(addr: Address): String =
if (address.host.isDefined) address + name
else addr + name
override def toSerializationFormatWithAddress(addr: Address): String = toStringWithAddress(addr)
override def compareTo(other: ActorPath): Int = other match {
case r: RootActorPath toString compareTo r.toString // FIXME make this cheaper by comparing address and name in isolation
case c: ChildActorPath 1
}
/**
* INTERNAL API
*/
private[akka] def uid: Int = ActorCell.undefinedUid
/**
* INTERNAL API
*/
override private[akka] def withUid(uid: Int): ActorPath =
if (uid == ActorCell.undefinedUid) this
else throw new IllegalStateException("RootActorPath must not have uid")
}
@SerialVersionUID(1L)
final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath {
final class ChildActorPath private[akka] (val parent: ActorPath, val name: String, override private[akka] val uid: Int) extends ActorPath {
if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name)
if (name.indexOf('#') != -1) throw new IllegalArgumentException("# is a fragment separator and is not legal in ActorPath names: [%s]" format name)
def this(parent: ActorPath, name: String) = this(parent, name, ActorCell.undefinedUid)
override def address: Address = root.address
override def /(child: String): ActorPath = new ChildActorPath(this, child)
override def /(child: String): ActorPath = {
val (childName, uid) = ActorCell.splitNameAndUid(child)
new ChildActorPath(this, childName, uid)
}
override def elements: immutable.Iterable[String] = {
@tailrec
@ -151,28 +216,82 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto
rec(this)
}
// TODO research whether this should be cached somehow (might be fast enough, but creates GC pressure)
/*
* idea: add one field which holds the total length (because that is known)
* so that only one String needs to be allocated before traversal; this is
* cheaper than any cache
/**
* INTERNAL API
*/
override def toString = {
@tailrec
def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match {
case r: RootActorPath s.insert(0, r.toString)
case _ rec(p.parent, s.insert(0, '/').insert(0, p.name))
}
rec(parent, new StringBuilder(32).append(name)).toString
override private[akka] def withUid(uid: Int): ActorPath =
if (uid == this.uid) this
else new ChildActorPath(parent, name, uid)
override def toString: String = {
val length = toStringLength
buildToString(new JStringBuilder(length), length, 0, _.toString).toString
}
override def toStringWithAddress(addr: Address) = {
override def toSerializationFormat: String = {
val length = toStringLength
val sb = buildToString(new JStringBuilder(length + 12), length, 0, _.toString)
appendUidFragment(sb).toString
}
private def toStringLength: Int = toStringOffset + name.length
private val toStringOffset: Int = parent match {
case r: RootActorPath r.address.toString.length + r.name.length
case c: ChildActorPath c.toStringLength + 1
}
override def toStringWithAddress(addr: Address): String = {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
buildToString(new JStringBuilder(length), length, diff, _.toStringWithAddress(addr)).toString
}
override def toSerializationFormatWithAddress(addr: Address): String = {
val diff = addressStringLengthDiff(addr)
val length = toStringLength + diff
val sb = buildToString(new JStringBuilder(length + 12), length, diff, _.toStringWithAddress(addr))
appendUidFragment(sb).toString
}
private def addressStringLengthDiff(addr: Address): Int = {
val r = root
if (r.address.host.isDefined) 0
else (addr.toString.length - r.address.toString.length)
}
/**
* Optimized toString construction. Used by `toString`, `toSerializationFormat`,
* and friends `WithAddress`
* @param sb builder that will be modified (and same instance is returned)
* @param length pre-calculated length of the to be constructed String, not
* necessarily same as sb.capacity because more things may be appended to the
* sb afterwards
* @param diff difference in offset for each child element, due to different address
* @param rootString function to construct the root element string
*/
private def buildToString(sb: JStringBuilder, length: Int, diff: Int, rootString: RootActorPath String): JStringBuilder = {
@tailrec
def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match {
case r: RootActorPath s.insert(0, r.toStringWithAddress(addr))
case _ rec(p.parent, s.insert(0, '/').insert(0, p.name))
def rec(p: ActorPath): JStringBuilder = p match {
case r: RootActorPath
val rootStr = rootString(r)
sb.replace(0, rootStr.length, rootStr)
case c: ChildActorPath
val start = c.toStringOffset + diff
val end = start + c.name.length
sb.replace(start, end, c.name)
if (c ne this)
sb.replace(end, end + 1, "/")
rec(c.parent)
}
rec(parent, new StringBuilder(32).append(name)).toString
sb.setLength(length)
rec(this)
}
private def appendUidFragment(sb: JStringBuilder): JStringBuilder = {
if (uid == ActorCell.undefinedUid) sb
else sb.append("#").append(uid)
}
override def equals(other: Any): Boolean = {

View file

@ -12,7 +12,6 @@ import akka.event.EventStream
import scala.annotation.tailrec
import java.util.concurrent.ConcurrentHashMap
import akka.event.LoggingAdapter
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.collection.JavaConverters
/**
@ -73,6 +72,17 @@ import scala.collection.JavaConverters
*
* ActorRef does not have a method for terminating the actor it points to, use
* [[akka.actor.ActorRefFactory]]`.stop(child)` for this purpose.
*
* Two actor references are compared equal when they have the same path and point to
* the same actor incarnation. A reference pointing to a terminated actor doesn't compare
* equal to a reference pointing to another (re-created) actor with the same path.
* Actor references acquired with `actorFor` do not always include the full information
* about the underlying actor identity and therefore such references do not always compare
* equal to references acquired with `actorOf`, `sender`, or `context.self`.
*
* If you need to keep track of actor references in a collection and do not care
* about the exact actor incarnation you can use the ``ActorPath`` as key because
* the unique id of the actor is not taken into account when comparing actor paths.
*/
abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable {
scalaRef: InternalActorRef
@ -83,9 +93,13 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
def path: ActorPath
/**
* Comparison only takes address into account.
* Comparison takes path and the unique id of the actor cell into account.
*/
final def compareTo(other: ActorRef) = this.path compareTo other.path
final def compareTo(other: ActorRef) = {
val x = this.path compareTo other.path
if (x == 0) this.path.uid compareTo other.path.uid
else x
}
/**
* Sends the specified message to the sender, i.e. fire-and-forget semantics.
@ -122,15 +136,22 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable
*/
def isTerminated: Boolean
// FIXME RK check if we should scramble the bits or whether they can stay the same
final override def hashCode: Int = path.hashCode
final override def hashCode: Int = {
if (path.uid == ActorCell.undefinedUid) path.hashCode
else path.uid
}
/**
* Equals takes path and the unique id of the actor cell into account.
*/
final override def equals(that: Any): Boolean = that match {
case other: ActorRef path == other.path
case other: ActorRef path.uid == other.path.uid && path == other.path
case _ false
}
override def toString = "Actor[%s]".format(path)
override def toString: String =
if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]"
else s"Actor[${path}#${path.uid}]"
}
/**
@ -270,7 +291,7 @@ private[akka] class LocalActorRef private[akka] (
* object from another thread as soon as we run init.
*/
private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor)
actorCell.init(ThreadLocalRandom.current.nextInt(), sendSupervise = true)
actorCell.init(sendSupervise = true)
protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell =
new ActorCell(system, ref, props, supervisor)
@ -316,11 +337,14 @@ private[akka] class LocalActorRef private[akka] (
* Method for looking up a single child beneath this actor. Override in order
* to inject synthetic actor paths like /temp.
*/
protected def getSingleChild(name: String): InternalActorRef =
actorCell.getChildByName(name) match {
case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef]
case _ Nobody
protected def getSingleChild(name: String): InternalActorRef = {
val (childName, uid) = ActorCell.splitNameAndUid(name)
actorCell.getChildByName(childName) match {
case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid
crs.child.asInstanceOf[InternalActorRef]
case _ Nobody
}
}
override def getChild(names: Iterator[String]): InternalActorRef = {
/*
@ -384,8 +408,8 @@ private[akka] case class SerializedActorRef private (path: String) {
private[akka] object SerializedActorRef {
def apply(path: ActorPath): SerializedActorRef = {
Serialization.currentTransportAddress.value match {
case null new SerializedActorRef(path.toString)
case addr new SerializedActorRef(path.toStringWithAddress(addr))
case null new SerializedActorRef(path.toSerializationFormat)
case addr new SerializedActorRef(path.toSerializationFormatWithAddress(addr))
}
}
}

View file

@ -396,7 +396,7 @@ class LocalActorRefProvider private[akka] (
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(_, _, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case Supervise(_, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]")
}

View file

@ -47,7 +47,7 @@ final case class Address private (protocol: String, system: String, host: Option
*/
@transient
override lazy val toString: String = {
val sb = (new StringBuilder(protocol)).append("://").append(system)
val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system)
if (host.isDefined) sb.append('@').append(host.get)
if (port.isDefined) sb.append(':').append(port.get)
@ -76,12 +76,14 @@ object Address {
}
private[akka] trait PathUtils {
protected def split(s: String): List[String] = {
protected def split(s: String, fragment: String): List[String] = {
@tailrec
def rec(pos: Int, acc: List[String]): List[String] = {
val from = s.lastIndexOf('/', pos - 1)
val sub = s.substring(from + 1, pos)
val l = sub :: acc
val l =
if ((fragment ne null) && acc.isEmpty) sub + "#" + fragment :: acc
else sub :: acc
if (from == -1) l else rec(from, l)
}
rec(s.length, Nil)
@ -93,7 +95,7 @@ object RelativeActorPath extends PathUtils {
try {
val uri = new URI(addr)
if (uri.isAbsolute) None
else Some(split(uri.getRawPath))
else Some(split(uri.getRawPath, uri.getRawFragment))
} catch {
case _: URISyntaxException None
}
@ -142,7 +144,7 @@ object ActorPathExtractor extends PathUtils {
val uri = new URI(addr)
uri.getRawPath match {
case null None
case path AddressFromURIString.unapply(uri).map((_, split(path).drop(1)))
case path AddressFromURIString.unapply(uri).map((_, split(path, uri.getRawFragment).drop(1)))
}
} catch {
case _: URISyntaxException None

View file

@ -32,7 +32,7 @@ private[akka] case object ChildNameReserved extends ChildStats
case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L)
extends ChildStats {
var uid: Int = 0
def uid: Int = child.path.uid
//FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies?
def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean =

View file

@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit
import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec
import scala.concurrent.forkjoin.ThreadLocalRandom
import akka.actor.dungeon.ChildrenContainer
import akka.event.Logging.Warning
@ -73,10 +72,9 @@ private[akka] class RepointableActorRef(
def initialize(async: Boolean): this.type =
underlying match {
case null
val uid = ThreadLocalRandom.current.nextInt()
swapCell(new UnstartedCell(system, this, props, supervisor, uid))
swapCell(new UnstartedCell(system, this, props, supervisor))
swapLookup(underlying)
supervisor.sendSystemMessage(Supervise(this, async, uid))
supervisor.sendSystemMessage(Supervise(this, async))
if (!async) point()
this
case other throw new IllegalStateException("initialize called more than once!")
@ -112,7 +110,7 @@ private[akka] class RepointableActorRef(
* unstarted cell. The cell must be fully functional.
*/
def newCell(old: UnstartedCell): Cell =
new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false)
new ActorCell(system, this, props, supervisor).init(sendSupervise = false)
def start(): Unit = ()
@ -144,9 +142,11 @@ private[akka] class RepointableActorRef(
case ".." getParent.getChild(name)
case "" getChild(name)
case other
lookup.getChildByName(other) match {
case Some(crs: ChildRestartStats) crs.child.asInstanceOf[InternalActorRef].getChild(name)
case _ Nobody
val (childName, uid) = ActorCell.splitNameAndUid(other)
lookup.getChildByName(childName) match {
case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid
crs.child.asInstanceOf[InternalActorRef].getChild(name)
case _ Nobody
}
}
} else this
@ -162,8 +162,7 @@ private[akka] class RepointableActorRef(
private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl,
val self: RepointableActorRef,
val props: Props,
val supervisor: InternalActorRef,
val uid: Int) extends Cell {
val supervisor: InternalActorRef) extends Cell {
/*
* This lock protects all accesses to this cells queues. It also ensures

View file

@ -182,7 +182,8 @@ private[akka] trait Children { this: ActorCell ⇒
// this name will either be unreserved or overwritten with a real child below
val actor =
try {
cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name,
val childPath = (cell.self.path / name).withUid(ActorCell.newUid())
cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath,
systemService = systemService, deploy = None, lookupDeploy = true, async = async)
} catch {
case e: InterruptedException

View file

@ -8,6 +8,7 @@ import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, Actor
import akka.dispatch.{ ChildTerminated, Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
import akka.actor.MinimalActorRef
private[akka] trait DeathWatch { this: ActorCell
@ -16,7 +17,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def watch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && !watching.contains(a)) {
if (a != self && !watchingContains(a)) {
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching += a
@ -27,10 +28,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
override final def unwatch(subject: ActorRef): ActorRef = subject match {
case a: InternalActorRef
if (a != self && watching.contains(a)) {
if (a != self && watchingContains(a)) {
maintainAddressTerminatedSubscription(a) {
a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
watching -= a
removeWatching(a)
}
}
a
@ -41,13 +42,28 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
* it will be propagated to user's receive.
*/
protected def watchedActorTerminated(t: Terminated): Unit =
if (watching.contains(t.actor)) {
if (watchingContains(t.actor)) {
maintainAddressTerminatedSubscription(t.actor) {
watching -= t.actor
removeWatching(t.actor)
}
receiveMessage(t)
}
// TODO this should be removed and be replaced with `watching.contains(subject)`
// when all actor references have uid, i.e. actorFor is removed
private def watchingContains(subject: ActorRef): Boolean = {
watching.contains(subject) || (subject.path.uid != ActorCell.undefinedUid &&
watching.contains(new UndefinedUidActorRef(subject)))
}
// TODO this should be removed and be replaced with `watching -= subject`
// when all actor references have uid, i.e. actorFor is removed
private def removeWatching(subject: ActorRef): Unit = {
watching -= subject
if (subject.path.uid != ActorCell.undefinedUid)
watching -= new UndefinedUidActorRef(subject)
}
protected def tellWatchersWeDied(actor: Actor): Unit = {
if (!watchedBy.isEmpty) {
val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false)
@ -168,3 +184,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒
private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated])
}
private[akka] class UndefinedUidActorRef(ref: ActorRef) extends MinimalActorRef {
override val path = ref.path.withUid(ActorCell.undefinedUid)
override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide")
}

View file

@ -40,7 +40,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
* reasonably different from the previous UID of a possible actor with the same path,
* which can be achieved by using ThreadLocalRandom.current.nextInt().
*/
final def init(uid: Int, sendSupervise: Boolean): this.type = {
final def init(sendSupervise: Boolean): this.type = {
/*
* Create the mailbox and enqueue the Create() message to ensure that
* this is processed before anything else.
@ -49,11 +49,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒
mailbox.setActor(this)
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
mailbox.systemEnqueue(self, Create(uid))
mailbox.systemEnqueue(self, Create())
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid))
parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false))
parent ! NullMessage // read ScalaDoc of NullMessage to see why
}
this

View file

@ -134,7 +134,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
private def finishCreate(): Unit = {
try resumeNonRecursive()
finally clearFailed()
create(uid)
create()
}
protected def terminate() {

View file

@ -82,8 +82,8 @@ private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializab
/**
* INTERNAL API
*/
@SerialVersionUID(-4836972106317757555L)
private[akka] case class Create(uid: Int) extends SystemMessage // send to self from Dispatcher.register
@SerialVersionUID(3L)
private[akka] case class Create() extends SystemMessage // send to self from Dispatcher.register
/**
* INTERNAL API
*/
@ -107,8 +107,8 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from
/**
* INTERNAL API
*/
@SerialVersionUID(3245747602115485675L)
private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
@SerialVersionUID(3L)
private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
/**
* INTERNAL API
*/

View file

@ -872,7 +872,7 @@ trait LoggingAdapter {
}
def format(t: String, arg: Any*): String = {
val sb = new StringBuilder(64)
val sb = new java.lang.StringBuilder(64)
var p = 0
var rest = t
while (p < arg.length) {

View file

@ -55,13 +55,13 @@ trait GracefulStopSupport {
internalTarget.sendSystemMessage(Watch(target, ref))
val f = ref.result.future
f onComplete { // Just making sure we're not leaking here
case Success(Terminated(`target`)) ()
case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
case Success(Terminated(a)) if a.path == target.path ()
case _ internalTarget.sendSystemMessage(Unwatch(target, ref))
}
target ! stopMessage
f map {
case Terminated(`target`) true
case _ false
case Terminated(a) if a.path == target.path true
case _ false
}
case s throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'")
}

View file

@ -35,7 +35,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup
" is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.")
} else _props.routerConfig.verifyConfig()
override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false)
override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(sendSupervise = false)
}
@ -76,7 +76,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match {
case _: AutoReceivedMessage Destination(sender, self) :: Nil
case CurrentRoutees sender ! RouterRoutees(_routees); Nil
case CurrentRoutees { sender ! RouterRoutees(_routees); Nil }
case msg if route.isDefinedAt(sender, msg) route(sender, message)
case _ Nil
}
@ -94,13 +94,13 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo
}
/**
* Adds the routees to existing routees.
* Removes the abandoned routees from existing routees.
* Removes death watch of the routees. Doesn't stop the routees.
* Not thread safe, but intended to be called from protected points, such as
* `Resizer.resize`
*/
private[akka] def removeRoutees(abandonedRoutees: immutable.Iterable[ActorRef]): Unit = {
_routees = abandonedRoutees.foldLeft(_routees) { (xs, x) unwatch(x); xs.filterNot(_ == x) }
_routees = abandonedRoutees.foldLeft(_routees) { (xs, x) unwatch(x); xs.filterNot(_.path == x.path) }
}
/**

View file

@ -32,7 +32,7 @@ object Crypt {
}
def hexify(bytes: Array[Byte]): String = {
val builder = new StringBuilder(bytes.length * 2)
val builder = new java.lang.StringBuilder(bytes.length * 2)
bytes.foreach { byte builder.append(hex.charAt((byte & 0xF0) >> 4)).append(hex.charAt(byte & 0xF)) }
builder.toString
}

View file

@ -40,8 +40,8 @@ object Helpers {
final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~"
@tailrec
def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = {
sb += base64chars.charAt(l.toInt & 63)
def base64(l: Long, sb: java.lang.StringBuilder = new java.lang.StringBuilder("$")): String = {
sb append base64chars.charAt(l.toInt & 63)
val next = l >>> 6
if (next == 0) sb.toString
else base64(next, sb)

View file

@ -254,7 +254,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def maxDuration = results.map(_.duration).max
def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats}
def totalClusterStats = results.foldLeft(ClusterStats()) { _ :+ _.clusterStats }
def formatMetrics: String = {
import akka.cluster.Member.addressOrdering
@ -302,7 +302,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}"
def formatStats: String =
(clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }).
(clusterStatsObservedByNode map { case (monitor, stats) s"${monitor}\t${stats}" }).
mkString("ClusterStats\n", "\n", "")
}
@ -403,7 +403,7 @@ object StressMultiJvmSpec extends MultiNodeConfig {
def receive = {
case StatsTick
val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats)
reportTo foreach { _ ! res }
reportTo foreach { _ ! res }
case ReportTo(ref)
reportTo = ref
case Reset
@ -553,7 +553,9 @@ object StressMultiJvmSpec extends MultiNodeConfig {
* Used for remote death watch testing
*/
class Watchee extends Actor {
def receive = Actor.emptyBehavior
def receive = {
case Ping sender ! Pong
}
}
/**
@ -621,6 +623,9 @@ object StressMultiJvmSpec extends MultiNodeConfig {
case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int)
case object Reset
case object Ping
case object Pong
}
class StressMultiJvmNode1 extends StressSpec
@ -700,7 +705,7 @@ abstract class StressSpec
runOn(roles.head) {
val r = clusterResultAggregator
watch(r)
expectMsgPF(remaining) { case Terminated(`r`) true }
expectMsgPF(remaining) { case Terminated(a) if a.path == r.path true }
}
enterBarrier("cluster-result-done-" + step)
}
@ -773,7 +778,9 @@ abstract class StressSpec
}
enterBarrier("watchee-created-" + step)
runOn(roles.head) {
watch(system.actorFor(node(removeRole) / "user" / "watchee"))
system.actorFor(node(removeRole) / "user" / "watchee") ! Ping
expectMsg(Pong)
watch(lastSender)
}
enterBarrier("watch-estabilished-" + step)
@ -790,9 +797,9 @@ abstract class StressSpec
}
runOn(roles.head) {
val expectedRef = system.actorFor(RootActorPath(removeAddress) / "user" / "watchee")
val expectedPath = RootActorPath(removeAddress) / "user" / "watchee"
expectMsgPF(remaining) {
case Terminated(`expectedRef`) true
case Terminated(a) if a.path == expectedPath true
}
}
enterBarrier("watch-verified-" + step)
@ -939,7 +946,7 @@ abstract class StressSpec
workResult.jobsPerSecond.form,
workResult.retryCount, workResult.sendCount)
watch(m)
expectMsgPF(remaining) { case Terminated(`m`) true }
expectMsgPF(remaining) { case Terminated(a) if a.path == m.path true }
workResult
}
@ -947,7 +954,7 @@ abstract class StressSpec
within(duration + 10.seconds) {
val rounds = (duration.toMillis / oneIteration.toMillis).max(1).toInt
val supervisor = system.actorOf(Props[Supervisor], "supervisor")
for (count <- 0 until rounds) {
for (count 0 until rounds) {
createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false)
reportResult {

View file

@ -26,7 +26,8 @@ object ReliableProxy {
} else {
log.debug("received msg of {} from {} with wrong serial", msg.asInstanceOf[AnyRef].getClass, snd)
}
case Terminated(`target`) context stop self
//TODO use exact match of target when all actor references have uid, i.e. actorFor has been removed
case Terminated(a) if a.path == target.path context stop self
}
}

View file

@ -91,6 +91,26 @@ followed by the concatenation of the path elements, from root guardian to the
designated actor; the path elements are the names of the traversed actors and
are separated by slashes.
What is the Difference Between Actor Reference and Path?
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
An actor reference designates a single actor and the life-cycle of the reference
matches that actors life-cycle; an actor path represents a name which may or
may not be inhabited by an actor and the path itself does not have a life-cycle,
it never becomes invalid. You can create an actor path without creating an actor,
but you cannot create an actor reference without creating corresponding actor.
.. note::
That definition does not hold for ``actorFor``, which is one of the reasons why
``actorFor`` is deprecated in favor of ``actorSelection``.
You can create an actor, terminate it, and then create a new actor with the same
actor path. The newly created actor is a new incarnation of the actor. It is not
the same actor. An actor reference to the old incarnation is not valid for the new
incarnation. Messages sent to the old actor reference will not be delivered
to the new incarnation even though they have the same path.
Actor Path Anchors
^^^^^^^^^^^^^^^^^^
@ -180,9 +200,9 @@ the whole lifetime of the actor. In the case of a local actor reference, the
named actor needs to exist before the lookup, or else the acquired reference
will be an :class:`EmptyLocalActorRef`. This will be true even if an actor with
that exact path is created after acquiring the actor reference. For remote actor
references the behaviour is different and sending messages to such a reference
will under the hood look up the actor by path on the remote system for every
message send.
references acquired with `actorFor` the behaviour is different and sending messages
to such a reference will under the hood look up the actor by path on the remote
system for every message send.
Absolute vs. Relative Paths
```````````````````````````
@ -246,18 +266,39 @@ Summary: ``actorOf`` vs. ``actorFor``
- ``actorFor`` only ever looks up an existing actor, i.e. does not create
one.
Actor Reference and Path Equality
---------------------------------
Equality of ``ActorRef`` match the intention that an ``ActorRef`` corresponds to
the target actor incarnation. Two actor references are compared equal when they have
the same path and point to the same actor incarnation. A reference pointing to a
terminated actor does not compare equal to a reference pointing to another (re-created)
actor with the same path. Note that a restart of an actor caused by a failure still
means that it is the same actor incarnation, i.e. a restart is not visible for the
consumer of the ``ActorRef``.
Remote actor references acquired with ``actorFor`` do not include the full
information about the underlying actor identity and therefore such references
do not compare equal to references acquired with ``actorOf``, ``sender``,
or ``context.self``. Because of this ``actorFor`` is deprecated in favor of
``actorSelection``.
If you need to keep track of actor references in a collection and do not care about
the exact actor incarnation you can use the ``ActorPath`` as key, because the identifier
of the target actor is not taken into account when comparing actor paths.
Reusing Actor Paths
-------------------
When an actor is terminated, its path will point to the dead letter mailbox,
When an actor is terminated, its reference will point to the dead letter mailbox,
DeathWatch will publish its final transition and in general it is not expected
to come back to life again (since the actor life cycle does not allow this).
While it is possible to create an actor at a later time with an identical
path—simply due to it being impossible to enforce the opposite without keeping
the set of all actors ever created available—this is not good practice: remote
actor references which “died” suddenly start to work again, but without any
guarantee of ordering between this transition and any other event, hence the
new inhabitant of the path may receive messages which were destined for the
actor references acquired with ``actorFor`` which “died” suddenly start to work
again, but without any guarantee of ordering between this transition and any
other event, hence the new inhabitant of the path may receive messages which were destined for the
previous tenant.
It may be the right thing to do in very specific circumstances, but make sure

View file

@ -67,8 +67,8 @@ public class SerializationDocTestBase {
// within a piece of code that sets it,
// so either you need to supply your own,
// or simply use the local path.
if (transportAddress == null) identifier = theActorRef.path().toString();
else identifier = theActorRef.path().toStringWithAddress(transportAddress);
if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat();
else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress);
// Then just serialize the identifier however you like

View file

@ -120,9 +120,17 @@ you might want to know how to serialize and deserialize them properly, here's th
.. note::
``ActorPath.toStringWithAddress`` only differs from ``toString`` if the
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
address does not already have ``host`` and ``port`` components, i.e. it only
inserts address information for local addresses.
``toSerializationFormatWithAddress`` also adds the unique id of the actor, which will
change when the actor is stopped and then created again with the same name.
Sending messages to a reference pointing the old actor will not be delivered
to the new actor. If you do not want this behavior, e.g. in case of long term
storage of the reference, you can use ``toStringWithAddress``, which does not
include the unique id.
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,

View file

@ -152,4 +152,44 @@ available via the ``inbound`` boolean field of the event.
New configuration settings are also available, see the remoting documentation for more detail: :ref:`remoting-scala`
ActorRef equality and sending to remote actors
==============================================
Sending messages to an ``ActorRef`` must have the same semantics no matter if the target actor is located
on a remote host or in the same ``ActorSystem`` in the same JVM. This was not always the case. For example
when the target actor is terminated and created again under the same path. Sending to local references
of the previous incarnation of the actor will not be delivered to the new incarnation, but that was the case
for remote references. The reason was that the target actor was looked up by its path on every message
delivery and the path didn't distinguish between the two incarnations of the actor. This has been fixed, and
sending messages to remote references that points to a terminated actor will not be delivered to a new
actor with the same path.
Equality of ``ActorRef`` has been changed to match the intention that an ``ActorRef`` corresponds to the target
actor instance. Two actor references are compared equal when they have the same path and point to the same
actor incarnation. A reference pointing to a terminated actor does not compare equal to a reference pointing
to another (re-created) actor with the same path. Note that a restart of an actor caused by a failure still
means that it's the same actor incarnation, i.e. a restart is not visible for the consumer of the ``ActorRef``.
Equality in 2.1 was only based on the path of the ``ActorRef``. If you need to keep track of actor references
in a collection and do not care about the exact actor incarnation you can use the ``ActorPath`` as key, because
the identifier of the target actor is not taken into account when comparing actor paths.
Remote actor references acquired with ``actorFor`` do not include the full information about the underlying actor
identity and therefore such references do not compare equal to references acquired with ``actorOf``,
``sender``, or ``context.self``. Because of this ``actorFor`` is deprecated, as explained in
:ref:`migration_2.2_actorSelection`.
Note that when a parent actor is restarted its children are by default stopped and re-created, i.e. the child
after the restart will be a different incarnation than the child before the restart. This has always been the
case, but in some situations you might not have noticed, e.g. when comparing such actor references or sending
messages to remote deployed children of a restarted parent.
This may also have implications if you compare the ``ActorRef`` received in a ``Terminated`` message
with an expected ``ActorRef``.
.. _migration_2.2_actorSelection:
Use ``actorSelection`` instead of ``actorFor``
==============================================
FIXME: ticket #3074

View file

@ -171,8 +171,8 @@ package docs.serialization {
// so either you need to supply your own,
// or simply use the local path.
val identifier: String = Serialization.currentTransportAddress.value match {
case null theActorRef.path.toString
case address theActorRef.path.toStringWithAddress(address)
case null theActorRef.path.toSerializationFormat
case address theActorRef.path.toSerializationFormatWithAddress(address)
}
// Then just serialize the identifier however you like
@ -192,7 +192,7 @@ package docs.serialization {
}
def serializeTo(ref: ActorRef, remote: Address): String =
ref.path.toStringWithAddress(ExternalAddress(theActorSystem).addressFor(remote))
ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).addressFor(remote))
//#external-address
}
@ -207,7 +207,7 @@ package docs.serialization {
}
def serializeAkkaDefault(ref: ActorRef): String =
ref.path.toStringWithAddress(ExternalAddress(theActorSystem).addressForAkka)
ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).addressForAkka)
//#external-address-default
}
}

View file

@ -109,9 +109,16 @@ you might want to know how to serialize and deserialize them properly, here's th
.. note::
``ActorPath.toStringWithAddress`` only differs from ``toString`` if the
``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the
address does not already have ``host`` and ``port`` components, i.e. it only
inserts address information for local addresses.
inserts address information for local addresses.
``toSerializationFormatWithAddress`` also adds the unique id of the actor, which will
change when the actor is stopped and then created again with the same name.
Sending messages to a reference pointing the old actor will not be delivered
to the new actor. If you don't want this behavior, e.g. in case of long term
storage of the reference, you can use ``toStringWithAddress``, which doesn't
include the unique id.
This assumes that serialization happens in the context of sending a message
through the remote transport. There are other uses of serialization, though,

View file

@ -241,7 +241,8 @@ private[akka] class RemoteActorRefProvider(
} else {
try {
val localAddress = transport.localAddressForRemote(addr)
val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements
val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements).
withUid(path.uid)
new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d))
} catch {
case NonFatal(e)
@ -280,15 +281,19 @@ private[akka] class RemoteActorRefProvider(
* Called in deserialization of incoming remote messages. In this case the correct local address is known, therefore
* this method is faster than the actorFor above.
*/
def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = path match {
case ActorPathExtractor(address, elems)
if (hasAddress(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(transport, localAddress,
new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
case _ local.actorFor(ref, path)
def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = {
path match {
case ActorPathExtractor(address, elems)
if (hasAddress(address)) actorFor(rootGuardian, elems)
else new RemoteActorRef(transport, localAddress,
new RootActorPath(address) / elems, Nobody, props = None, deploy = None)
case _
local.actorFor(ref, path)
}
}
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path)
def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef =
local.actorFor(ref, path)
/**
* Using (checking out) actor on a specific node.
@ -297,7 +302,7 @@ private[akka] class RemoteActorRefProvider(
log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path)
// we dont wait for the ACK, because the remote end will process this command before any other message to the new actor
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toString, supervisor)
actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toSerializationFormat, supervisor)
}
def getExternalAddressFor(addr: Address): Option[Address] = {

View file

@ -12,6 +12,7 @@ import akka.dispatch.Watch
import akka.actor.ActorRefWithCell
import akka.actor.ActorRefScope
import akka.util.Switch
import akka.actor.RootActorPath
/**
* INTERNAL API
@ -54,11 +55,14 @@ private[akka] class RemoteSystemDaemon(
@tailrec
def rec(s: String, n: Int): (InternalActorRef, Int) = {
getChild(s) match {
import akka.actor.ActorCell._
val (childName, uid) = splitNameAndUid(s)
getChild(childName) match {
case null
val last = s.lastIndexOf('/')
if (last == -1) (Nobody, n)
else rec(s.substring(0, last), n + 1)
case ref if uid != undefinedUid && uid != ref.path.uid (Nobody, n)
case ref (ref, n)
}
}
@ -82,15 +86,21 @@ private[akka] class RemoteSystemDaemon(
// TODO RK currently the extracted address is just ignored, is that okay?
// TODO RK canonicalize path so as not to duplicate it always #1446
val subpath = elems.drop(1)
val path = this.path / subpath
val p = this.path / subpath
val childName = {
val s = subpath.mkString("/")
val i = s.indexOf('#')
if (i < 0) s
else s.substring(0, i)
}
val isTerminating = !terminating.whileOff {
val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef],
path, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(subpath.mkString("/"), actor)
p, systemService = false, Some(deploy), lookupDeploy = true, async = false)
addChild(childName, actor)
actor.sendSystemMessage(Watch(actor, this))
actor.start()
}
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address)
if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address)
case _
log.debug("remote path does not match path from message [{}]", message)
}

View file

@ -19,8 +19,8 @@ object ProtobufSerializer {
*/
def serializeActorRef(ref: ActorRef): ActorRefProtocol = {
val identifier: String = Serialization.currentTransportAddress.value match {
case null ref.path.toString
case address ref.path.toStringWithAddress(address)
case null ref.path.toSerializationFormat
case address ref.path.toSerializationFormatWithAddress(address)
}
ActorRefProtocol.newBuilder.setPath(identifier).build
}

View file

@ -182,7 +182,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec {
private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = {
ActorRefProtocol.newBuilder.setPath(
if (ref.path.address.host.isDefined) ref.path.toString else ref.path.toStringWithAddress(defaultAddress)).build()
if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build()
}
private def serializeAddress(address: Address): Option[AddressProtocol] = {

View file

@ -119,14 +119,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
test.local-address = "test://remote-sys@localhost:12346"
}
""").withFallback(system.settings.config).resolve()
val other = ActorSystem("remote-sys", conf)
val otherSystem = ActorSystem("remote-sys", conf)
for (
(name, proto) Seq(
"/gonk" -> "tcp",
"/zagzag" -> "udp",
"/roghtaar" -> "ssl.tcp")
) deploy(system, Deploy(name, scope = RemoteScope(addr(other, proto))))
) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto))))
def addr(sys: ActorSystem, proto: String) =
sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get
@ -135,12 +135,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d)
}
val remote = other.actorOf(Props[Echo2], "echo")
val remote = otherSystem.actorOf(Props[Echo2], "echo")
val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo")
override def afterTermination() {
other.shutdown()
otherSystem.shutdown()
AssociationRegistry.clear()
}
@ -168,16 +168,16 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"send dead letters on remote if actor does not exist" in {
EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept {
system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh"
}(other)
}(otherSystem)
}
"not be exhausted by sending to broken connections" in {
val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]""").
withFallback(other.settings.config)
val moreSystems = Vector.fill(5)(ActorSystem(other.name, tcpOnlyConfig))
withFallback(otherSystem.settings.config)
val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig))
moreSystems foreach (_.actorOf(Props[Echo2], name = "echo"))
val moreRefs = moreSystems map (sys system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo"))
val aliveEcho = system.actorFor(RootActorPath(addr(other, "tcp")) / "user" / "echo")
val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo")
val n = 100
// first everything is up and running
@ -223,6 +223,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
expectMsg("postStop")
}
"not send to remote re-created actor with same name" in {
val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1")
echo ! 71
expectMsg(71)
echo ! PoisonPill
expectMsg("postStop")
echo ! 72
expectNoMsg(1.second)
val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1")
echo2 ! 73
expectMsg(73)
// msg to old ActorRef (different uid) should not get through
echo2.path.uid must not be (echo.path.uid)
echo ! 74
expectNoMsg(1.second)
otherSystem.actorFor("/user/otherEcho1") ! 75
expectMsg(75)
system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76
expectMsg(76)
}
"look-up actors across node boundaries" in {
val l = system.actorOf(Props(new Actor {
def receive = {
@ -230,20 +254,41 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
case s: String sender ! context.actorFor(s)
}
}), "looker")
// child is configured to be deployed on remote-sys (otherSystem)
l ! (Props[Echo1], "child")
val r = expectMsgType[ActorRef]
r ! (Props[Echo1], "grandchild")
val remref = expectMsgType[ActorRef]
remref.asInstanceOf[ActorRefScope].isLocal must be(true)
val child = expectMsgType[ActorRef]
// grandchild is configured to be deployed on RemotingSpec (system)
child ! (Props[Echo1], "grandchild")
val grandchild = expectMsgType[ActorRef]
grandchild.asInstanceOf[ActorRefScope].isLocal must be(true)
grandchild ! 43
expectMsg(43)
val myref = system.actorFor(system / "looker" / "child" / "grandchild")
myref.isInstanceOf[RemoteActorRef] must be(true)
myref ! 43
expectMsg(43)
lastSender must be theSameInstanceAs remref
r.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs r
myref ! 44
expectMsg(44)
lastSender must be(grandchild)
lastSender must be theSameInstanceAs grandchild
child.asInstanceOf[RemoteActorRef].getParent must be(l)
system.actorFor("/user/looker/child") must be theSameInstanceAs child
Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l
watch(child)
child ! PoisonPill
expectMsg("postStop")
expectMsgType[Terminated].actor must be === child
l ! (Props[Echo1], "child")
val child2 = expectMsgType[ActorRef]
child2 ! 45
expectMsg(45)
// msg to old ActorRef (different uid) should not get through
child2.path.uid must not be (child.path.uid)
child ! 46
expectNoMsg(1.second)
system.actorFor(system / "looker" / "child") ! 47
expectMsg(47)
}
"not fail ask across node boundaries" in {
@ -255,7 +300,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (TCP)" in {
val r = system.actorOf(Props[Echo1], "gonk")
r.path.toString must be ===
s"akka.tcp://remote-sys@localhost:${port(other, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk"
r ! 42
expectMsg(42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -271,7 +316,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (UDP)" in {
val r = system.actorOf(Props[Echo1], "zagzag")
r.path.toString must be ===
s"akka.udp://remote-sys@localhost:${port(other, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag"
r ! 42
expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -287,7 +332,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
"be able to use multiple transports and use the appropriate one (SSL)" in {
val r = system.actorOf(Props[Echo1], "roghtaar")
r.path.toString must be ===
s"akka.ssl.tcp://remote-sys@localhost:${port(other, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar"
r ! 42
expectMsg(10.seconds, 42)
EventFilter[Exception]("crash", occurrences = 1).intercept {
@ -305,7 +350,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
override def beforeTermination() {
system.eventStream.publish(TestEvent.Mute(
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)")))
other.eventStream.publish(TestEvent.Mute(
otherSystem.eventStream.publish(TestEvent.Mute(
EventFilter[EndpointException](),
EventFilter.error(start = "AssociationError"),
EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))