diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala index bdefe3e7b7..2d7b7bc13d 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorDSLSpec.scala @@ -208,14 +208,14 @@ class ActorDSLSpec extends AkkaSpec { // here we pass in the ActorRefFactory explicitly as an example val a = actor(system, "fred")(new Act { val b = actor("barney")(new Act { - whenStarting { context.parent ! ("hello from " + self) } + whenStarting { context.parent ! ("hello from " + self.path) } }) become { case x ⇒ testActor ! x } }) //#nested-actor - expectMsg("hello from Actor[akka://ActorDSLSpec/user/fred/barney]") + expectMsg("hello from akka://ActorDSLSpec/user/fred/barney") lastSender must be(a) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala index 02b69f83d8..6ac5aa8361 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorLookupSpec.scala @@ -71,11 +71,33 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { } "find actors by looking up their string representation" in { + // this is only true for local actor references system.actorFor(c1.path.toString) must be === c1 system.actorFor(c2.path.toString) must be === c2 system.actorFor(c21.path.toString) must be === c21 } + "take actor incarnation into account when comparing actor references" in { + val name = "abcdefg" + val a1 = system.actorOf(p, name) + watch(a1) + a1 ! PoisonPill + expectMsgType[Terminated].actor must be === a1 + + // not equal because it's terminated + system.actorFor(a1.path.toString) must not be (a1) + + val a2 = system.actorOf(p, name) + a2.path must be(a1.path) + a2.path.toString must be(a1.path.toString) + a2 must not be (a1) + a2.toString must not be (a1.toString) + + watch(a2) + a2 ! PoisonPill + expectMsgType[Terminated].actor must be === a2 + } + "find actors by looking up their root-anchored relative path" in { system.actorFor(c1.path.elements.mkString("/", "/", "")) must be === c1 system.actorFor(c2.path.elements.mkString("/", "/", "")) must be === c2 @@ -163,6 +185,9 @@ class ActorLookupSpec extends AkkaSpec with DefaultTimeout { "find actors by looking up their string representation" in { def check(looker: ActorRef, pathOf: ActorRef, result: ActorRef) { Await.result(looker ? LookupString(pathOf.path.toString), timeout.duration) must be === result + // with uid + Await.result(looker ? LookupString(pathOf.path.toSerializationFormat), timeout.duration) must be === result + // with trailing / Await.result(looker ? LookupString(pathOf.path.toString + "/"), timeout.duration) must be === result } for { diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorPathSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorPathSpec.scala new file mode 100644 index 0000000000..ef29bb2603 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorPathSpec.scala @@ -0,0 +1,46 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.actor + +import org.scalatest.WordSpec +import org.scalatest.matchers.MustMatchers + +class ActorPathSpec extends WordSpec with MustMatchers { + + "ActorPath" must { + + "create correct toString" in { + val a = Address("akka.tcp", "mysys") + RootActorPath(a).toString must be("akka.tcp://mysys/") + (RootActorPath(a) / "user").toString must be("akka.tcp://mysys/user") + (RootActorPath(a) / "user" / "foo").toString must be("akka.tcp://mysys/user/foo") + (RootActorPath(a) / "user" / "foo" / "bar").toString must be("akka.tcp://mysys/user/foo/bar") + } + + "create correct toStringWithAddress" in { + val local = Address("akka.tcp", "mysys") + val a = local.copy(host = Some("aaa"), port = Some(2552)) + val b = a.copy(host = Some("bb")) + val c = a.copy(host = Some("cccc")) + val root = RootActorPath(local) + root.toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/") + (root / "user").toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/user") + (root / "user" / "foo").toStringWithAddress(a) must be("akka.tcp://mysys@aaa:2552/user/foo") + + // root.toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/") + (root / "user").toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/user") + (root / "user" / "foo").toStringWithAddress(b) must be("akka.tcp://mysys@bb:2552/user/foo") + + root.toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/") + (root / "user").toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/user") + (root / "user" / "foo").toStringWithAddress(c) must be("akka.tcp://mysys@cccc:2552/user/foo") + + val rootA = RootActorPath(a) + rootA.toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/") + (rootA / "user").toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/user") + (rootA / "user" / "foo").toStringWithAddress(b) must be("akka.tcp://mysys@aaa:2552/user/foo") + + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala index fd076463c7..5cf80eac88 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/RelativeActorPathSpec.scala @@ -23,5 +23,8 @@ class RelativeActorPathSpec extends WordSpec with MustMatchers { val name = URLEncoder.encode("akka://ClusterSystem@127.0.0.1:2552", "UTF-8") elements(name) must be(List(name)) } + "match path with uid fragment" in { + elements("foo/bar/baz#1234") must be(List("foo", "bar", "baz#1234")) + } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index a36af3d895..8e91abb570 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -54,7 +54,7 @@ object SupervisorHierarchySpec { } case class Ready(ref: ActorRef) - case class Died(ref: ActorRef) + case class Died(path: ActorPath) case object Abort case object PingOfDeath case object PongOfDeath @@ -112,17 +112,17 @@ object SupervisorHierarchySpec { * upon Restart or would have to be managed by the highest supervisor (which * is undesirable). */ - case class HierarchyState(log: Vector[Event], kids: Map[ActorRef, Int], failConstr: Failure) - val stateCache = new ConcurrentHashMap[ActorRef, HierarchyState]() + case class HierarchyState(log: Vector[Event], kids: Map[ActorPath, Int], failConstr: Failure) + val stateCache = new ConcurrentHashMap[ActorPath, HierarchyState]() class Hierarchy(size: Int, breadth: Int, listener: ActorRef, myLevel: Int) extends Actor { var log = Vector.empty[Event] - stateCache.get(self) match { + stateCache.get(self.path) match { case hs @ HierarchyState(l: Vector[Event], _, f: Failure) if f.failConstr > 0 ⇒ val log = l :+ Event("Failed in constructor", identityHashCode(this)) - stateCache.put(self, hs.copy(log = log, failConstr = f.copy(failConstr = f.failConstr - 1))) + stateCache.put(self.path, hs.copy(log = log, failConstr = f.copy(failConstr = f.failConstr - 1))) throw f case _ ⇒ } @@ -149,7 +149,7 @@ object SupervisorHierarchySpec { log :+= Event("started", identityHashCode(this)) listener ! Ready(self) val s = size - 1 // subtract myself - val kidInfo: Map[ActorRef, Int] = + val kidInfo: Map[ActorPath, Int] = if (s > 0) { val kids = Random.nextInt(Math.min(breadth, s)) + 1 val sizes = s / kids @@ -158,10 +158,10 @@ object SupervisorHierarchySpec { (1 to kids).map { (id) ⇒ val kidSize = if (rest > 0) { rest -= 1; sizes + 1 } else sizes val props = propsTemplate.withCreator(new Hierarchy(kidSize, breadth, listener, myLevel + 1)) - (context.watch(context.actorOf(props, id.toString)), kidSize) + (context.watch(context.actorOf(props, id.toString)).path, kidSize) }(collection.breakOut) } else Map() - stateCache.put(self, HierarchyState(log, kidInfo, null)) + stateCache.put(self.path, HierarchyState(log, kidInfo, null)) } var preRestartCalled = false @@ -178,12 +178,12 @@ object SupervisorHierarchySpec { context.unwatch(child) context.stop(child) } - stateCache.put(self, stateCache.get(self).copy(log = log)) + stateCache.put(self.path, stateCache.get(self.path).copy(log = log)) if (f.failPre > 0) { f.failPre -= 1 throw f } - case _ ⇒ stateCache.put(self, stateCache.get(self).copy(log = log)) + case _ ⇒ stateCache.put(self.path, stateCache.get(self.path).copy(log = log)) } } } @@ -217,14 +217,14 @@ object SupervisorHierarchySpec { }) override def postRestart(cause: Throwable) { - val state = stateCache.get(self) + val state = stateCache.get(self.path) log = state.log log :+= Event("restarted " + suspendCount + " " + cause, identityHashCode(this)) state.kids foreach { - case (child, kidSize) ⇒ - val name = child.path.name - if (context.actorFor(name).isTerminated) { - listener ! Died(child) + case (childPath, kidSize) ⇒ + val name = childPath.name + if (context.child(name).isEmpty) { + listener ! Died(childPath) val props = Props(new Hierarchy(kidSize, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) } @@ -243,7 +243,7 @@ object SupervisorHierarchySpec { if (failed || suspended) { listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) } else { - stateCache.put(self, HierarchyState(log, Map(), null)) + stateCache.put(self.path, HierarchyState(log, Map(), null)) } } @@ -270,7 +270,7 @@ object SupervisorHierarchySpec { val handler: Receive = { case f: Failure ⇒ setFlags(f.directive) - stateCache.put(self, stateCache.get(self).copy(failConstr = f.copy())) + stateCache.put(self.path, stateCache.get(self.path).copy(failConstr = f.copy())) throw f case "ping" ⇒ { Thread.sleep((Random.nextFloat * 1.03).toLong); sender ! "pong" } case Dump(0) ⇒ abort("dump") @@ -281,9 +281,9 @@ object SupervisorHierarchySpec { * (if the unwatch() came too late), so just ignore in this case. */ val name = ref.path.name - if (pongsToGo == 0 && context.actorFor(name).isTerminated) { - listener ! Died(ref) - val kids = stateCache.get(self).kids(ref) + if (pongsToGo == 0 && context.child(name).isEmpty) { + listener ! Died(ref.path) + val kids = stateCache.get(self.path).kids(ref.path) val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) } else { @@ -469,8 +469,8 @@ object SupervisorHierarchySpec { case x if x > 0.03 ⇒ 1 case _ ⇒ 2 } - private def bury(ref: ActorRef): Unit = { - val deadGuy = ref.path.elements + private def bury(path: ActorPath): Unit = { + val deadGuy = path.elements val deadGuySize = deadGuy.size val isChild = (other: ActorRef) ⇒ other.path.elements.take(deadGuySize) == deadGuy idleChildren = idleChildren filterNot isChild @@ -499,8 +499,8 @@ object SupervisorHierarchySpec { else context.system.scheduler.scheduleOnce(workSchedule, self, Work)(context.dispatcher) stay using (x - 1) case Event(Work, _) ⇒ if (pingChildren.isEmpty) goto(LastPing) else goto(Finishing) - case Event(Died(ref), _) ⇒ - bury(ref) + case Event(Died(path), _) ⇒ + bury(path) stay case Event("pong", _) ⇒ pingChildren -= sender @@ -631,7 +631,7 @@ object SupervisorHierarchySpec { case l: LocalActorRef ⇒ l.underlying.actor match { case h: Hierarchy ⇒ errors :+= target -> ErrorLog("forced", h.log) - case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log) + case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target.path).log) } if (depth > 0) { l.underlying.children foreach (getErrors(_, depth - 1)) @@ -644,7 +644,7 @@ object SupervisorHierarchySpec { case l: LocalActorRef ⇒ l.underlying.actor match { case h: Hierarchy ⇒ errors :+= target -> ErrorLog("forced", h.log) - case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target).log) + case _ ⇒ errors :+= target -> ErrorLog("fetched", stateCache.get(target.path).log) } if (target != hierarchy) getErrorsUp(l.getParent) } diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala index 10576187d2..6a0bc9abbb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorSpec.scala @@ -393,10 +393,10 @@ class SupervisorSpec extends AkkaSpec with BeforeAndAfterEach with ImplicitSende override def postRestart(reason: Throwable): Unit = testActor ! "parent restarted" def receive = { - case t @ Terminated(`child`) ⇒ testActor ! "child terminated" - case l: TestLatch ⇒ child ! l - case "test" ⇒ sender ! "green" - case "testchild" ⇒ child forward "test" + case Terminated(a) if a.path == child.path ⇒ testActor ! "child terminated" // FIXME case t @ Terminated(`child`) ticket #3156 + case l: TestLatch ⇒ child ! l + case "test" ⇒ sender ! "green" + case "testchild" ⇒ child forward "test" } })) diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 8e1ccd1bdc..14076463f5 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -332,7 +332,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR String.valueOf(encodeHex(ser.serialize(obj, obj.getClass).get)) must be(asExpected) "be preserved for the Create SystemMessage" in { - verify(Create(1234), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465bcdf9f7f2675038d0200014900037569647870000004d27671007e0003") + verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465000000000000000302000078707671007e0003") } "be preserved for the Recreate SystemMessage" in { verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003") @@ -347,7 +347,7 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003") } "be preserved for the Supervise SystemMessage" in { - verify(Supervise(FakeActorRef("child"), true, 2468), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5375706572766973652d0b363f56ab5feb0200035a00056173796e634900037569644c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787001000009a47372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") + verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e53757065727669736500000000000000030200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } "be preserved for the ChildTerminated SystemMessage" in { verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index e570718149..6b1d42a529 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -16,6 +16,7 @@ import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure import akka.dispatch.NullMessage import scala.concurrent.ExecutionContext +import scala.concurrent.forkjoin.ThreadLocalRandom /** * The actor context - the view of the actor cell from the actor. @@ -304,8 +305,26 @@ private[akka] object ActorCell { final val emptyBehaviorStack: List[Actor.Receive] = Nil final val emptyActorRefSet: Set[ActorRef] = immutable.TreeSet.empty + final val emptyActorRefMap: Map[ActorPath, ActorRef] = immutable.TreeMap.empty final val terminatedProps: Props = Props(() ⇒ throw new IllegalActorStateException("This Actor has been terminated")) + + final val undefinedUid = 0 + + @tailrec final def newUid(): Int = { + // Note that this uid is also used as hashCode in ActorRef, so be careful + // to not break hashing if you change the way uid is generated + val uid = ThreadLocalRandom.current.nextInt() + if (uid == undefinedUid) newUid + else uid + } + + final def splitNameAndUid(name: String): (String, Int) = { + val i = name.indexOf('#') + if (i < 0) (name, undefinedUid) + else (name.substring(0, i), Integer.valueOf(name.substring(i + 1))) + } + } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) @@ -337,7 +356,7 @@ private[akka] class ActorCell( protected final def lookupRoot = self final def provider = system.provider - protected var uid: Int = 0 + protected def uid: Int = self.path.uid private[this] var _actor: Actor = _ def actor: Actor = _actor protected def actor_=(a: Actor): Unit = _actor = a @@ -361,7 +380,7 @@ private[akka] class ActorCell( var todo = message.next try { message match { - case Create(uid) ⇒ create(uid) + case Create() ⇒ create() case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) case Recreate(cause) ⇒ @@ -379,10 +398,10 @@ private[akka] class ActorCell( case null ⇒ faultResume(inRespToFailure) case w: WaitingForChildren ⇒ w.enqueue(message) } - case Terminate() ⇒ terminate() - case Supervise(child, async, uid) ⇒ supervise(child, async, uid) - case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning + case Terminate() ⇒ terminate() + case Supervise(child, async) ⇒ supervise(child, async) + case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning } } catch handleNonFatalOrInterruptedException { e ⇒ handleInvokeFailure(Nil, e) @@ -473,7 +492,7 @@ private[akka] class ActorCell( } } - protected def create(uid: Int): Unit = { + protected def create(): Unit = { def clearOutActorIfNonNull(): Unit = { if (actor != null) { clearActorFields(actor) @@ -481,7 +500,6 @@ private[akka] class ActorCell( } } try { - this.uid = uid val created = newActor() actor = created created.preStart() @@ -505,12 +523,11 @@ private[akka] class ActorCell( } } - private def supervise(child: ActorRef, async: Boolean, uid: Int): Unit = + private def supervise(child: ActorRef, async: Boolean): Unit = if (!isTerminating) { // Supervise is the first thing we get from a new child, so store away the UID for later use in handleFailure() initChild(child) match { case Some(crs) ⇒ - crs.uid = uid handleSupervise(child, async) if (system.settings.DebugLifecycle) publish(Debug(self.path.toString, clazz(actor), "now supervising " + child)) case None ⇒ publish(Error(self.path.toString, clazz(actor), "received Supervise from unregistered child " + child + ", this will not end well")) diff --git a/akka-actor/src/main/scala/akka/actor/ActorPath.scala b/akka-actor/src/main/scala/akka/actor/ActorPath.scala index 4b87568cb0..20800e2785 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorPath.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorPath.scala @@ -6,6 +6,7 @@ import scala.annotation.tailrec import scala.collection.immutable import akka.japi.Util.immutableSeq import java.net.MalformedURLException +import java.lang.{ StringBuilder ⇒ JStringBuilder } object ActorPath { /** @@ -35,6 +36,13 @@ object ActorPath { * as possible, which owing to the bottom-up recursive nature of ActorPath * is sorted by path elements FROM RIGHT TO LEFT, where RootActorPath > * ChildActorPath in case the number of elements is different. + * + * Two actor paths are compared equal when they have the same name and parent + * elements, including the root address information. That does not necessarily + * mean that they point to the same incarnation of the actor if the actor is + * re-created with the same path. In other words, in contrast to how actor + * references are compared the unique id of the actor is not taken into account + * when comparing actor paths. */ @SerialVersionUID(1L) sealed trait ActorPath extends Comparable[ActorPath] with Serializable { @@ -96,6 +104,37 @@ sealed trait ActorPath extends Comparable[ActorPath] with Serializable { * information. */ def toStringWithAddress(address: Address): String + + /** + * Generate full String representation including the + * uid for the actor cell instance as URI fragment. + * This representation should be used as serialized + * representation instead of `toString`. + */ + def toSerializationFormat: String + + /** + * Generate full String representation including the uid for the actor cell + * instance as URI fragment, replacing the Address in the RootActor Path + * with the given one unless this path’s address includes host and port + * information. This representation should be used as serialized + * representation instead of `toStringWithAddress`. + */ + def toSerializationFormatWithAddress(address: Address): String + + /** + * INTERNAL API + * Unique identifier of the actor. Used for distinguishing + * different incarnations of actors with same path (name elements). + */ + private[akka] def uid: Int + + /** + * INTERNAL API + * Creates a new ActorPath with same elements but with the specified `uid`. + */ + private[akka] def withUid(uid: Int): ActorPath + } /** @@ -109,29 +148,55 @@ final case class RootActorPath(address: Address, name: String = "/") extends Act override def root: RootActorPath = this - override def /(child: String): ActorPath = new ChildActorPath(this, child) + override def /(child: String): ActorPath = { + val (childName, uid) = ActorCell.splitNameAndUid(child) + new ChildActorPath(this, childName, uid) + } override def elements: immutable.Iterable[String] = ActorPath.emptyActorPath override val toString: String = address + name + override val toSerializationFormat: String = toString + override def toStringWithAddress(addr: Address): String = if (address.host.isDefined) address + name else addr + name + override def toSerializationFormatWithAddress(addr: Address): String = toStringWithAddress(addr) + override def compareTo(other: ActorPath): Int = other match { case r: RootActorPath ⇒ toString compareTo r.toString // FIXME make this cheaper by comparing address and name in isolation case c: ChildActorPath ⇒ 1 } + + /** + * INTERNAL API + */ + private[akka] def uid: Int = ActorCell.undefinedUid + + /** + * INTERNAL API + */ + override private[akka] def withUid(uid: Int): ActorPath = + if (uid == ActorCell.undefinedUid) this + else throw new IllegalStateException("RootActorPath must not have uid") + } @SerialVersionUID(1L) -final class ChildActorPath(val parent: ActorPath, val name: String) extends ActorPath { +final class ChildActorPath private[akka] (val parent: ActorPath, val name: String, override private[akka] val uid: Int) extends ActorPath { if (name.indexOf('/') != -1) throw new IllegalArgumentException("/ is a path separator and is not legal in ActorPath names: [%s]" format name) + if (name.indexOf('#') != -1) throw new IllegalArgumentException("# is a fragment separator and is not legal in ActorPath names: [%s]" format name) + + def this(parent: ActorPath, name: String) = this(parent, name, ActorCell.undefinedUid) override def address: Address = root.address - override def /(child: String): ActorPath = new ChildActorPath(this, child) + override def /(child: String): ActorPath = { + val (childName, uid) = ActorCell.splitNameAndUid(child) + new ChildActorPath(this, childName, uid) + } override def elements: immutable.Iterable[String] = { @tailrec @@ -151,28 +216,82 @@ final class ChildActorPath(val parent: ActorPath, val name: String) extends Acto rec(this) } - // TODO research whether this should be cached somehow (might be fast enough, but creates GC pressure) - /* - * idea: add one field which holds the total length (because that is known) - * so that only one String needs to be allocated before traversal; this is - * cheaper than any cache + /** + * INTERNAL API */ - override def toString = { - @tailrec - def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match { - case r: RootActorPath ⇒ s.insert(0, r.toString) - case _ ⇒ rec(p.parent, s.insert(0, '/').insert(0, p.name)) - } - rec(parent, new StringBuilder(32).append(name)).toString + override private[akka] def withUid(uid: Int): ActorPath = + if (uid == this.uid) this + else new ChildActorPath(parent, name, uid) + + override def toString: String = { + val length = toStringLength + buildToString(new JStringBuilder(length), length, 0, _.toString).toString } - override def toStringWithAddress(addr: Address) = { + override def toSerializationFormat: String = { + val length = toStringLength + val sb = buildToString(new JStringBuilder(length + 12), length, 0, _.toString) + appendUidFragment(sb).toString + } + + private def toStringLength: Int = toStringOffset + name.length + + private val toStringOffset: Int = parent match { + case r: RootActorPath ⇒ r.address.toString.length + r.name.length + case c: ChildActorPath ⇒ c.toStringLength + 1 + } + + override def toStringWithAddress(addr: Address): String = { + val diff = addressStringLengthDiff(addr) + val length = toStringLength + diff + buildToString(new JStringBuilder(length), length, diff, _.toStringWithAddress(addr)).toString + } + + override def toSerializationFormatWithAddress(addr: Address): String = { + val diff = addressStringLengthDiff(addr) + val length = toStringLength + diff + val sb = buildToString(new JStringBuilder(length + 12), length, diff, _.toStringWithAddress(addr)) + appendUidFragment(sb).toString + } + + private def addressStringLengthDiff(addr: Address): Int = { + val r = root + if (r.address.host.isDefined) 0 + else (addr.toString.length - r.address.toString.length) + } + + /** + * Optimized toString construction. Used by `toString`, `toSerializationFormat`, + * and friends `WithAddress` + * @param sb builder that will be modified (and same instance is returned) + * @param length pre-calculated length of the to be constructed String, not + * necessarily same as sb.capacity because more things may be appended to the + * sb afterwards + * @param diff difference in offset for each child element, due to different address + * @param rootString function to construct the root element string + */ + private def buildToString(sb: JStringBuilder, length: Int, diff: Int, rootString: RootActorPath ⇒ String): JStringBuilder = { @tailrec - def rec(p: ActorPath, s: StringBuilder): StringBuilder = p match { - case r: RootActorPath ⇒ s.insert(0, r.toStringWithAddress(addr)) - case _ ⇒ rec(p.parent, s.insert(0, '/').insert(0, p.name)) + def rec(p: ActorPath): JStringBuilder = p match { + case r: RootActorPath ⇒ + val rootStr = rootString(r) + sb.replace(0, rootStr.length, rootStr) + case c: ChildActorPath ⇒ + val start = c.toStringOffset + diff + val end = start + c.name.length + sb.replace(start, end, c.name) + if (c ne this) + sb.replace(end, end + 1, "/") + rec(c.parent) } - rec(parent, new StringBuilder(32).append(name)).toString + + sb.setLength(length) + rec(this) + } + + private def appendUidFragment(sb: JStringBuilder): JStringBuilder = { + if (uid == ActorCell.undefinedUid) sb + else sb.append("#").append(uid) } override def equals(other: Any): Boolean = { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 99aaaa69fa..ff42918ebe 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -12,7 +12,6 @@ import akka.event.EventStream import scala.annotation.tailrec import java.util.concurrent.ConcurrentHashMap import akka.event.LoggingAdapter -import scala.concurrent.forkjoin.ThreadLocalRandom import scala.collection.JavaConverters /** @@ -73,6 +72,17 @@ import scala.collection.JavaConverters * * ActorRef does not have a method for terminating the actor it points to, use * [[akka.actor.ActorRefFactory]]`.stop(child)` for this purpose. + * + * Two actor references are compared equal when they have the same path and point to + * the same actor incarnation. A reference pointing to a terminated actor doesn't compare + * equal to a reference pointing to another (re-created) actor with the same path. + * Actor references acquired with `actorFor` do not always include the full information + * about the underlying actor identity and therefore such references do not always compare + * equal to references acquired with `actorOf`, `sender`, or `context.self`. + * + * If you need to keep track of actor references in a collection and do not care + * about the exact actor incarnation you can use the ``ActorPath`` as key because + * the unique id of the actor is not taken into account when comparing actor paths. */ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable { scalaRef: InternalActorRef ⇒ @@ -83,9 +93,13 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable def path: ActorPath /** - * Comparison only takes address into account. + * Comparison takes path and the unique id of the actor cell into account. */ - final def compareTo(other: ActorRef) = this.path compareTo other.path + final def compareTo(other: ActorRef) = { + val x = this.path compareTo other.path + if (x == 0) this.path.uid compareTo other.path.uid + else x + } /** * Sends the specified message to the sender, i.e. fire-and-forget semantics. @@ -122,15 +136,22 @@ abstract class ActorRef extends java.lang.Comparable[ActorRef] with Serializable */ def isTerminated: Boolean - // FIXME RK check if we should scramble the bits or whether they can stay the same - final override def hashCode: Int = path.hashCode + final override def hashCode: Int = { + if (path.uid == ActorCell.undefinedUid) path.hashCode + else path.uid + } + /** + * Equals takes path and the unique id of the actor cell into account. + */ final override def equals(that: Any): Boolean = that match { - case other: ActorRef ⇒ path == other.path + case other: ActorRef ⇒ path.uid == other.path.uid && path == other.path case _ ⇒ false } - override def toString = "Actor[%s]".format(path) + override def toString: String = + if (path.uid == ActorCell.undefinedUid) s"Actor[${path}]" + else s"Actor[${path}#${path.uid}]" } /** @@ -270,7 +291,7 @@ private[akka] class LocalActorRef private[akka] ( * object from another thread as soon as we run init. */ private val actorCell: ActorCell = newActorCell(_system, this, _props, _supervisor) - actorCell.init(ThreadLocalRandom.current.nextInt(), sendSupervise = true) + actorCell.init(sendSupervise = true) protected def newActorCell(system: ActorSystemImpl, ref: InternalActorRef, props: Props, supervisor: InternalActorRef): ActorCell = new ActorCell(system, ref, props, supervisor) @@ -316,11 +337,14 @@ private[akka] class LocalActorRef private[akka] ( * Method for looking up a single child beneath this actor. Override in order * to inject “synthetic” actor paths like “/temp”. */ - protected def getSingleChild(name: String): InternalActorRef = - actorCell.getChildByName(name) match { - case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef] - case _ ⇒ Nobody + protected def getSingleChild(name: String): InternalActorRef = { + val (childName, uid) = ActorCell.splitNameAndUid(name) + actorCell.getChildByName(childName) match { + case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒ + crs.child.asInstanceOf[InternalActorRef] + case _ ⇒ Nobody } + } override def getChild(names: Iterator[String]): InternalActorRef = { /* @@ -384,8 +408,8 @@ private[akka] case class SerializedActorRef private (path: String) { private[akka] object SerializedActorRef { def apply(path: ActorPath): SerializedActorRef = { Serialization.currentTransportAddress.value match { - case null ⇒ new SerializedActorRef(path.toString) - case addr ⇒ new SerializedActorRef(path.toStringWithAddress(addr)) + case null ⇒ new SerializedActorRef(path.toSerializationFormat) + case addr ⇒ new SerializedActorRef(path.toSerializationFormatWithAddress(addr)) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index cb262495fe..4c6b13d9b6 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -396,7 +396,7 @@ class LocalActorRefProvider private[akka] ( override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(_, _, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead case ChildTerminated(_) ⇒ stop() case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } diff --git a/akka-actor/src/main/scala/akka/actor/Address.scala b/akka-actor/src/main/scala/akka/actor/Address.scala index 72f0a0c38a..8bde7ef8cc 100644 --- a/akka-actor/src/main/scala/akka/actor/Address.scala +++ b/akka-actor/src/main/scala/akka/actor/Address.scala @@ -47,7 +47,7 @@ final case class Address private (protocol: String, system: String, host: Option */ @transient override lazy val toString: String = { - val sb = (new StringBuilder(protocol)).append("://").append(system) + val sb = (new java.lang.StringBuilder(protocol)).append("://").append(system) if (host.isDefined) sb.append('@').append(host.get) if (port.isDefined) sb.append(':').append(port.get) @@ -76,12 +76,14 @@ object Address { } private[akka] trait PathUtils { - protected def split(s: String): List[String] = { + protected def split(s: String, fragment: String): List[String] = { @tailrec def rec(pos: Int, acc: List[String]): List[String] = { val from = s.lastIndexOf('/', pos - 1) val sub = s.substring(from + 1, pos) - val l = sub :: acc + val l = + if ((fragment ne null) && acc.isEmpty) sub + "#" + fragment :: acc + else sub :: acc if (from == -1) l else rec(from, l) } rec(s.length, Nil) @@ -93,7 +95,7 @@ object RelativeActorPath extends PathUtils { try { val uri = new URI(addr) if (uri.isAbsolute) None - else Some(split(uri.getRawPath)) + else Some(split(uri.getRawPath, uri.getRawFragment)) } catch { case _: URISyntaxException ⇒ None } @@ -142,7 +144,7 @@ object ActorPathExtractor extends PathUtils { val uri = new URI(addr) uri.getRawPath match { case null ⇒ None - case path ⇒ AddressFromURIString.unapply(uri).map((_, split(path).drop(1))) + case path ⇒ AddressFromURIString.unapply(uri).map((_, split(path, uri.getRawFragment).drop(1))) } } catch { case _: URISyntaxException ⇒ None diff --git a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala index 171290b4d1..4612bff4e5 100644 --- a/akka-actor/src/main/scala/akka/actor/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/FaultHandling.scala @@ -32,7 +32,7 @@ private[akka] case object ChildNameReserved extends ChildStats case class ChildRestartStats(child: ActorRef, var maxNrOfRetriesCount: Int = 0, var restartTimeWindowStartNanos: Long = 0L) extends ChildStats { - var uid: Int = 0 + def uid: Int = child.path.uid //FIXME How about making ChildRestartStats immutable and then move these methods into the actual supervisor strategies? def requestRestartPermission(retriesWindow: (Option[Int], Option[Int])): Boolean = diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 5b265a6055..8a1a6a6a03 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -10,7 +10,6 @@ import java.util.concurrent.TimeUnit import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec -import scala.concurrent.forkjoin.ThreadLocalRandom import akka.actor.dungeon.ChildrenContainer import akka.event.Logging.Warning @@ -73,10 +72,9 @@ private[akka] class RepointableActorRef( def initialize(async: Boolean): this.type = underlying match { case null ⇒ - val uid = ThreadLocalRandom.current.nextInt() - swapCell(new UnstartedCell(system, this, props, supervisor, uid)) + swapCell(new UnstartedCell(system, this, props, supervisor)) swapLookup(underlying) - supervisor.sendSystemMessage(Supervise(this, async, uid)) + supervisor.sendSystemMessage(Supervise(this, async)) if (!async) point() this case other ⇒ throw new IllegalStateException("initialize called more than once!") @@ -112,7 +110,7 @@ private[akka] class RepointableActorRef( * unstarted cell. The cell must be fully functional. */ def newCell(old: UnstartedCell): Cell = - new ActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) + new ActorCell(system, this, props, supervisor).init(sendSupervise = false) def start(): Unit = () @@ -144,9 +142,11 @@ private[akka] class RepointableActorRef( case ".." ⇒ getParent.getChild(name) case "" ⇒ getChild(name) case other ⇒ - lookup.getChildByName(other) match { - case Some(crs: ChildRestartStats) ⇒ crs.child.asInstanceOf[InternalActorRef].getChild(name) - case _ ⇒ Nobody + val (childName, uid) = ActorCell.splitNameAndUid(other) + lookup.getChildByName(childName) match { + case Some(crs: ChildRestartStats) if uid == ActorCell.undefinedUid || uid == crs.uid ⇒ + crs.child.asInstanceOf[InternalActorRef].getChild(name) + case _ ⇒ Nobody } } } else this @@ -162,8 +162,7 @@ private[akka] class RepointableActorRef( private[akka] class UnstartedCell(val systemImpl: ActorSystemImpl, val self: RepointableActorRef, val props: Props, - val supervisor: InternalActorRef, - val uid: Int) extends Cell { + val supervisor: InternalActorRef) extends Cell { /* * This lock protects all accesses to this cell’s queues. It also ensures diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala index 9c64d79a64..c419f1a14f 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Children.scala @@ -182,7 +182,8 @@ private[akka] trait Children { this: ActorCell ⇒ // this name will either be unreserved or overwritten with a real child below val actor = try { - cell.provider.actorOf(cell.systemImpl, props, cell.self, cell.self.path / name, + val childPath = (cell.self.path / name).withUid(ActorCell.newUid()) + cell.provider.actorOf(cell.systemImpl, props, cell.self, childPath, systemService = systemService, deploy = None, lookupDeploy = true, async = async) } catch { case e: InterruptedException ⇒ diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index a0fd5f1632..cff5665ad3 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -8,6 +8,7 @@ import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, Actor import akka.dispatch.{ ChildTerminated, Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal +import akka.actor.MinimalActorRef private[akka] trait DeathWatch { this: ActorCell ⇒ @@ -16,7 +17,7 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def watch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ - if (a != self && !watching.contains(a)) { + if (a != self && !watchingContains(a)) { maintainAddressTerminatedSubscription(a) { a.sendSystemMessage(Watch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ watching += a @@ -27,10 +28,10 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ override final def unwatch(subject: ActorRef): ActorRef = subject match { case a: InternalActorRef ⇒ - if (a != self && watching.contains(a)) { + if (a != self && watchingContains(a)) { maintainAddressTerminatedSubscription(a) { a.sendSystemMessage(Unwatch(a, self)) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - watching -= a + removeWatching(a) } } a @@ -41,13 +42,28 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ * it will be propagated to user's receive. */ protected def watchedActorTerminated(t: Terminated): Unit = - if (watching.contains(t.actor)) { + if (watchingContains(t.actor)) { maintainAddressTerminatedSubscription(t.actor) { - watching -= t.actor + removeWatching(t.actor) } receiveMessage(t) } + // TODO this should be removed and be replaced with `watching.contains(subject)` + // when all actor references have uid, i.e. actorFor is removed + private def watchingContains(subject: ActorRef): Boolean = { + watching.contains(subject) || (subject.path.uid != ActorCell.undefinedUid && + watching.contains(new UndefinedUidActorRef(subject))) + } + + // TODO this should be removed and be replaced with `watching -= subject` + // when all actor references have uid, i.e. actorFor is removed + private def removeWatching(subject: ActorRef): Unit = { + watching -= subject + if (subject.path.uid != ActorCell.undefinedUid) + watching -= new UndefinedUidActorRef(subject) + } + protected def tellWatchersWeDied(actor: Actor): Unit = { if (!watchedBy.isEmpty) { val terminated = Terminated(self)(existenceConfirmed = true, addressTerminated = false) @@ -168,3 +184,8 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ private def subscribeAddressTerminated(): Unit = system.eventStream.subscribe(self, classOf[AddressTerminated]) } + +private[akka] class UndefinedUidActorRef(ref: ActorRef) extends MinimalActorRef { + override val path = ref.path.withUid(ActorCell.undefinedUid) + override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide") +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index a931164d43..075ff49a0b 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -40,7 +40,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ * reasonably different from the previous UID of a possible actor with the same path, * which can be achieved by using ThreadLocalRandom.current.nextInt(). */ - final def init(uid: Int, sendSupervise: Boolean): this.type = { + final def init(sendSupervise: Boolean): this.type = { /* * Create the mailbox and enqueue the Create() message to ensure that * this is processed before anything else. @@ -49,11 +49,11 @@ private[akka] trait Dispatch { this: ActorCell ⇒ mailbox.setActor(this) // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - mailbox.systemEnqueue(self, Create(uid)) + mailbox.systemEnqueue(self, Create()) if (sendSupervise) { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false, uid)) + parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false)) parent ! NullMessage // read ScalaDoc of NullMessage to see why } this diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index 7764d966e0..ac5d1a48e0 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -134,7 +134,7 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ private def finishCreate(): Unit = { try resumeNonRecursive() finally clearFailed() - create(uid) + create() } protected def terminate() { diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 3ec4e24e90..b0e97f2f0c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -82,8 +82,8 @@ private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializab /** * INTERNAL API */ -@SerialVersionUID(-4836972106317757555L) -private[akka] case class Create(uid: Int) extends SystemMessage // send to self from Dispatcher.register +@SerialVersionUID(3L) +private[akka] case class Create() extends SystemMessage // send to self from Dispatcher.register /** * INTERNAL API */ @@ -107,8 +107,8 @@ private[akka] case class Terminate() extends SystemMessage // sent to self from /** * INTERNAL API */ -@SerialVersionUID(3245747602115485675L) -private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +@SerialVersionUID(3L) +private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start /** * INTERNAL API */ diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 53504c5d2c..266958bf83 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -872,7 +872,7 @@ trait LoggingAdapter { } def format(t: String, arg: Any*): String = { - val sb = new StringBuilder(64) + val sb = new java.lang.StringBuilder(64) var p = 0 var rest = t while (p < arg.length) { diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index dba1cd6461..84980e4ee0 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -55,13 +55,13 @@ trait GracefulStopSupport { internalTarget.sendSystemMessage(Watch(target, ref)) val f = ref.result.future f onComplete { // Just making sure we're not leaking here - case Success(Terminated(`target`)) ⇒ () - case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) + case Success(Terminated(a)) if a.path == target.path ⇒ () + case _ ⇒ internalTarget.sendSystemMessage(Unwatch(target, ref)) } target ! stopMessage f map { - case Terminated(`target`) ⇒ true - case _ ⇒ false + case Terminated(a) if a.path == target.path ⇒ true + case _ ⇒ false } case s ⇒ throw new IllegalArgumentException("Unknown ActorSystem implementation: '" + s + "'") } diff --git a/akka-actor/src/main/scala/akka/routing/Routing.scala b/akka-actor/src/main/scala/akka/routing/Routing.scala index f09168ed73..c880496077 100644 --- a/akka-actor/src/main/scala/akka/routing/Routing.scala +++ b/akka-actor/src/main/scala/akka/routing/Routing.scala @@ -35,7 +35,7 @@ private[akka] class RoutedActorRef(_system: ActorSystemImpl, _props: Props, _sup " is invalid - you can not use a 'BalancingDispatcher' as a Router's dispatcher, you can however use it for the routees.") } else _props.routerConfig.verifyConfig() - override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(old.uid, sendSupervise = false) + override def newCell(old: UnstartedCell): Cell = new RoutedActorCell(system, this, props, supervisor).init(sendSupervise = false) } @@ -76,7 +76,7 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo def applyRoute(sender: ActorRef, message: Any): immutable.Iterable[Destination] = message match { case _: AutoReceivedMessage ⇒ Destination(sender, self) :: Nil - case CurrentRoutees ⇒ sender ! RouterRoutees(_routees); Nil + case CurrentRoutees ⇒ { sender ! RouterRoutees(_routees); Nil } case msg if route.isDefinedAt(sender, msg) ⇒ route(sender, message) case _ ⇒ Nil } @@ -94,13 +94,13 @@ private[akka] class RoutedActorCell(_system: ActorSystemImpl, _ref: InternalActo } /** - * Adds the routees to existing routees. + * Removes the abandoned routees from existing routees. * Removes death watch of the routees. Doesn't stop the routees. * Not thread safe, but intended to be called from protected points, such as * `Resizer.resize` */ private[akka] def removeRoutees(abandonedRoutees: immutable.Iterable[ActorRef]): Unit = { - _routees = abandonedRoutees.foldLeft(_routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_ == x) } + _routees = abandonedRoutees.foldLeft(_routees) { (xs, x) ⇒ unwatch(x); xs.filterNot(_.path == x.path) } } /** diff --git a/akka-actor/src/main/scala/akka/util/Crypt.scala b/akka-actor/src/main/scala/akka/util/Crypt.scala index 86b98a2cfd..b398ae98ea 100644 --- a/akka-actor/src/main/scala/akka/util/Crypt.scala +++ b/akka-actor/src/main/scala/akka/util/Crypt.scala @@ -32,7 +32,7 @@ object Crypt { } def hexify(bytes: Array[Byte]): String = { - val builder = new StringBuilder(bytes.length * 2) + val builder = new java.lang.StringBuilder(bytes.length * 2) bytes.foreach { byte ⇒ builder.append(hex.charAt((byte & 0xF0) >> 4)).append(hex.charAt(byte & 0xF)) } builder.toString } diff --git a/akka-actor/src/main/scala/akka/util/Helpers.scala b/akka-actor/src/main/scala/akka/util/Helpers.scala index c0963b30fa..f62724e065 100644 --- a/akka-actor/src/main/scala/akka/util/Helpers.scala +++ b/akka-actor/src/main/scala/akka/util/Helpers.scala @@ -40,8 +40,8 @@ object Helpers { final val base64chars = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789+~" @tailrec - def base64(l: Long, sb: StringBuilder = new StringBuilder("$")): String = { - sb += base64chars.charAt(l.toInt & 63) + def base64(l: Long, sb: java.lang.StringBuilder = new java.lang.StringBuilder("$")): String = { + sb append base64chars.charAt(l.toInt & 63) val next = l >>> 6 if (next == 0) sb.toString else base64(next, sb) diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala index ec851b1594..fdf33cb391 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/StressSpec.scala @@ -254,7 +254,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def maxDuration = results.map(_.duration).max - def totalClusterStats = results.foldLeft(ClusterStats()){_ :+ _.clusterStats} + def totalClusterStats = results.foldLeft(ClusterStats()) { _ :+ _.clusterStats } def formatMetrics: String = { import akka.cluster.Member.addressOrdering @@ -302,7 +302,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { s"${monitor}\t${subject}\t${phi.count}\t${phi.countAboveOne}\t${phi.max.form}" def formatStats: String = - (clusterStatsObservedByNode map { case (monitor, stats) => s"${monitor}\t${stats}" }). + (clusterStatsObservedByNode map { case (monitor, stats) ⇒ s"${monitor}\t${stats}" }). mkString("ClusterStats\n", "\n", "") } @@ -403,7 +403,7 @@ object StressMultiJvmSpec extends MultiNodeConfig { def receive = { case StatsTick ⇒ val res = StatsResult(cluster.selfAddress, cluster.readView.latestStats :- startStats) - reportTo foreach { _ ! res } + reportTo foreach { _ ! res } case ReportTo(ref) ⇒ reportTo = ref case Reset ⇒ @@ -553,7 +553,9 @@ object StressMultiJvmSpec extends MultiNodeConfig { * Used for remote death watch testing */ class Watchee extends Actor { - def receive = Actor.emptyBehavior + def receive = { + case Ping ⇒ sender ! Pong + } } /** @@ -621,6 +623,9 @@ object StressMultiJvmSpec extends MultiNodeConfig { case class ChildrenCount(numberOfChildren: Int, numberOfChildRestarts: Int) case object Reset + case object Ping + case object Pong + } class StressMultiJvmNode1 extends StressSpec @@ -700,7 +705,7 @@ abstract class StressSpec runOn(roles.head) { val r = clusterResultAggregator watch(r) - expectMsgPF(remaining) { case Terminated(`r`) ⇒ true } + expectMsgPF(remaining) { case Terminated(a) if a.path == r.path ⇒ true } } enterBarrier("cluster-result-done-" + step) } @@ -773,7 +778,9 @@ abstract class StressSpec } enterBarrier("watchee-created-" + step) runOn(roles.head) { - watch(system.actorFor(node(removeRole) / "user" / "watchee")) + system.actorFor(node(removeRole) / "user" / "watchee") ! Ping + expectMsg(Pong) + watch(lastSender) } enterBarrier("watch-estabilished-" + step) @@ -790,9 +797,9 @@ abstract class StressSpec } runOn(roles.head) { - val expectedRef = system.actorFor(RootActorPath(removeAddress) / "user" / "watchee") + val expectedPath = RootActorPath(removeAddress) / "user" / "watchee" expectMsgPF(remaining) { - case Terminated(`expectedRef`) ⇒ true + case Terminated(a) if a.path == expectedPath ⇒ true } } enterBarrier("watch-verified-" + step) @@ -939,7 +946,7 @@ abstract class StressSpec workResult.jobsPerSecond.form, workResult.retryCount, workResult.sendCount) watch(m) - expectMsgPF(remaining) { case Terminated(`m`) ⇒ true } + expectMsgPF(remaining) { case Terminated(a) if a.path == m.path ⇒ true } workResult } @@ -947,7 +954,7 @@ abstract class StressSpec within(duration + 10.seconds) { val rounds = (duration.toMillis / oneIteration.toMillis).max(1).toInt val supervisor = system.actorOf(Props[Supervisor], "supervisor") - for (count <- 0 until rounds) { + for (count ← 0 until rounds) { createResultAggregator(title, expectedResults = nbrUsedRoles, includeInHistory = false) reportResult { diff --git a/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala b/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala index b3e24e2361..f34b73d589 100644 --- a/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala +++ b/akka-contrib/src/main/scala/akka/contrib/pattern/ReliableProxy.scala @@ -26,7 +26,8 @@ object ReliableProxy { } else { log.debug("received msg of {} from {} with wrong serial", msg.asInstanceOf[AnyRef].getClass, snd) } - case Terminated(`target`) ⇒ context stop self + //TODO use exact match of target when all actor references have uid, i.e. actorFor has been removed + case Terminated(a) if a.path == target.path ⇒ context stop self } } diff --git a/akka-docs/rst/general/addressing.rst b/akka-docs/rst/general/addressing.rst index dfd7ea3946..12d1d81417 100644 --- a/akka-docs/rst/general/addressing.rst +++ b/akka-docs/rst/general/addressing.rst @@ -91,6 +91,26 @@ followed by the concatenation of the path elements, from root guardian to the designated actor; the path elements are the names of the traversed actors and are separated by slashes. +What is the Difference Between Actor Reference and Path? +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +An actor reference designates a single actor and the life-cycle of the reference +matches that actor’s life-cycle; an actor path represents a name which may or +may not be inhabited by an actor and the path itself does not have a life-cycle, +it never becomes invalid. You can create an actor path without creating an actor, +but you cannot create an actor reference without creating corresponding actor. + +.. note:: + + That definition does not hold for ``actorFor``, which is one of the reasons why + ``actorFor`` is deprecated in favor of ``actorSelection``. + +You can create an actor, terminate it, and then create a new actor with the same +actor path. The newly created actor is a new incarnation of the actor. It is not +the same actor. An actor reference to the old incarnation is not valid for the new +incarnation. Messages sent to the old actor reference will not be delivered +to the new incarnation even though they have the same path. + Actor Path Anchors ^^^^^^^^^^^^^^^^^^ @@ -180,9 +200,9 @@ the whole lifetime of the actor. In the case of a local actor reference, the named actor needs to exist before the lookup, or else the acquired reference will be an :class:`EmptyLocalActorRef`. This will be true even if an actor with that exact path is created after acquiring the actor reference. For remote actor -references the behaviour is different and sending messages to such a reference -will under the hood look up the actor by path on the remote system for every -message send. +references acquired with `actorFor` the behaviour is different and sending messages +to such a reference will under the hood look up the actor by path on the remote +system for every message send. Absolute vs. Relative Paths ``````````````````````````` @@ -246,18 +266,39 @@ Summary: ``actorOf`` vs. ``actorFor`` - ``actorFor`` only ever looks up an existing actor, i.e. does not create one. +Actor Reference and Path Equality +--------------------------------- + +Equality of ``ActorRef`` match the intention that an ``ActorRef`` corresponds to +the target actor incarnation. Two actor references are compared equal when they have +the same path and point to the same actor incarnation. A reference pointing to a +terminated actor does not compare equal to a reference pointing to another (re-created) +actor with the same path. Note that a restart of an actor caused by a failure still +means that it is the same actor incarnation, i.e. a restart is not visible for the +consumer of the ``ActorRef``. + +Remote actor references acquired with ``actorFor`` do not include the full +information about the underlying actor identity and therefore such references +do not compare equal to references acquired with ``actorOf``, ``sender``, +or ``context.self``. Because of this ``actorFor`` is deprecated in favor of +``actorSelection``. + +If you need to keep track of actor references in a collection and do not care about +the exact actor incarnation you can use the ``ActorPath`` as key, because the identifier +of the target actor is not taken into account when comparing actor paths. + Reusing Actor Paths ------------------- -When an actor is terminated, its path will point to the dead letter mailbox, +When an actor is terminated, its reference will point to the dead letter mailbox, DeathWatch will publish its final transition and in general it is not expected to come back to life again (since the actor life cycle does not allow this). While it is possible to create an actor at a later time with an identical path—simply due to it being impossible to enforce the opposite without keeping the set of all actors ever created available—this is not good practice: remote -actor references which “died” suddenly start to work again, but without any -guarantee of ordering between this transition and any other event, hence the -new inhabitant of the path may receive messages which were destined for the +actor references acquired with ``actorFor`` which “died” suddenly start to work +again, but without any guarantee of ordering between this transition and any +other event, hence the new inhabitant of the path may receive messages which were destined for the previous tenant. It may be the right thing to do in very specific circumstances, but make sure diff --git a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java index bd4f246d18..3eac0502d2 100644 --- a/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java +++ b/akka-docs/rst/java/code/docs/serialization/SerializationDocTestBase.java @@ -67,8 +67,8 @@ public class SerializationDocTestBase { // within a piece of code that sets it, // so either you need to supply your own, // or simply use the local path. - if (transportAddress == null) identifier = theActorRef.path().toString(); - else identifier = theActorRef.path().toStringWithAddress(transportAddress); + if (transportAddress == null) identifier = theActorRef.path().toSerializationFormat(); + else identifier = theActorRef.path().toSerializationFormatWithAddress(transportAddress); // Then just serialize the identifier however you like diff --git a/akka-docs/rst/java/serialization.rst b/akka-docs/rst/java/serialization.rst index 4668597c4f..7d72b6ef43 100644 --- a/akka-docs/rst/java/serialization.rst +++ b/akka-docs/rst/java/serialization.rst @@ -120,9 +120,17 @@ you might want to know how to serialize and deserialize them properly, here's th .. note:: - ``ActorPath.toStringWithAddress`` only differs from ``toString`` if the + ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the address does not already have ``host`` and ``port`` components, i.e. it only inserts address information for local addresses. + + ``toSerializationFormatWithAddress`` also adds the unique id of the actor, which will + change when the actor is stopped and then created again with the same name. + Sending messages to a reference pointing the old actor will not be delivered + to the new actor. If you do not want this behavior, e.g. in case of long term + storage of the reference, you can use ``toStringWithAddress``, which does not + include the unique id. + This assumes that serialization happens in the context of sending a message through the remote transport. There are other uses of serialization, though, diff --git a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst index f414095a79..fef4462db8 100644 --- a/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst +++ b/akka-docs/rst/project/migration-guide-2.1.x-2.2.x.rst @@ -152,4 +152,44 @@ available via the ``inbound`` boolean field of the event. New configuration settings are also available, see the remoting documentation for more detail: :ref:`remoting-scala` +ActorRef equality and sending to remote actors +============================================== +Sending messages to an ``ActorRef`` must have the same semantics no matter if the target actor is located +on a remote host or in the same ``ActorSystem`` in the same JVM. This was not always the case. For example +when the target actor is terminated and created again under the same path. Sending to local references +of the previous incarnation of the actor will not be delivered to the new incarnation, but that was the case +for remote references. The reason was that the target actor was looked up by its path on every message +delivery and the path didn't distinguish between the two incarnations of the actor. This has been fixed, and +sending messages to remote references that points to a terminated actor will not be delivered to a new +actor with the same path. + +Equality of ``ActorRef`` has been changed to match the intention that an ``ActorRef`` corresponds to the target +actor instance. Two actor references are compared equal when they have the same path and point to the same +actor incarnation. A reference pointing to a terminated actor does not compare equal to a reference pointing +to another (re-created) actor with the same path. Note that a restart of an actor caused by a failure still +means that it's the same actor incarnation, i.e. a restart is not visible for the consumer of the ``ActorRef``. + +Equality in 2.1 was only based on the path of the ``ActorRef``. If you need to keep track of actor references +in a collection and do not care about the exact actor incarnation you can use the ``ActorPath`` as key, because +the identifier of the target actor is not taken into account when comparing actor paths. + +Remote actor references acquired with ``actorFor`` do not include the full information about the underlying actor +identity and therefore such references do not compare equal to references acquired with ``actorOf``, +``sender``, or ``context.self``. Because of this ``actorFor`` is deprecated, as explained in +:ref:`migration_2.2_actorSelection`. + +Note that when a parent actor is restarted its children are by default stopped and re-created, i.e. the child +after the restart will be a different incarnation than the child before the restart. This has always been the +case, but in some situations you might not have noticed, e.g. when comparing such actor references or sending +messages to remote deployed children of a restarted parent. + +This may also have implications if you compare the ``ActorRef`` received in a ``Terminated`` message +with an expected ``ActorRef``. + +.. _migration_2.2_actorSelection: + +Use ``actorSelection`` instead of ``actorFor`` +============================================== + +FIXME: ticket #3074 \ No newline at end of file diff --git a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala index 33d7ea09b7..0dacdeff3b 100644 --- a/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/serialization/SerializationDocSpec.scala @@ -171,8 +171,8 @@ package docs.serialization { // so either you need to supply your own, // or simply use the local path. val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ theActorRef.path.toString - case address ⇒ theActorRef.path.toStringWithAddress(address) + case null ⇒ theActorRef.path.toSerializationFormat + case address ⇒ theActorRef.path.toSerializationFormatWithAddress(address) } // Then just serialize the identifier however you like @@ -192,7 +192,7 @@ package docs.serialization { } def serializeTo(ref: ActorRef, remote: Address): String = - ref.path.toStringWithAddress(ExternalAddress(theActorSystem).addressFor(remote)) + ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).addressFor(remote)) //#external-address } @@ -207,7 +207,7 @@ package docs.serialization { } def serializeAkkaDefault(ref: ActorRef): String = - ref.path.toStringWithAddress(ExternalAddress(theActorSystem).addressForAkka) + ref.path.toSerializationFormatWithAddress(ExternalAddress(theActorSystem).addressForAkka) //#external-address-default } } diff --git a/akka-docs/rst/scala/serialization.rst b/akka-docs/rst/scala/serialization.rst index 70a02faecd..1e59226b7d 100644 --- a/akka-docs/rst/scala/serialization.rst +++ b/akka-docs/rst/scala/serialization.rst @@ -109,9 +109,16 @@ you might want to know how to serialize and deserialize them properly, here's th .. note:: - ``ActorPath.toStringWithAddress`` only differs from ``toString`` if the + ``ActorPath.toSerializationFormatWithAddress`` differs from ``toString`` if the address does not already have ``host`` and ``port`` components, i.e. it only - inserts address information for local addresses. + inserts address information for local addresses. + + ``toSerializationFormatWithAddress`` also adds the unique id of the actor, which will + change when the actor is stopped and then created again with the same name. + Sending messages to a reference pointing the old actor will not be delivered + to the new actor. If you don't want this behavior, e.g. in case of long term + storage of the reference, you can use ``toStringWithAddress``, which doesn't + include the unique id. This assumes that serialization happens in the context of sending a message through the remote transport. There are other uses of serialization, though, diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 2066aacc58..c23e3c66b2 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -241,7 +241,8 @@ private[akka] class RemoteActorRefProvider( } else { try { val localAddress = transport.localAddressForRemote(addr) - val rpath = RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements + val rpath = (RootActorPath(addr) / "remote" / localAddress.protocol / localAddress.hostPort / path.elements). + withUid(path.uid) new RemoteActorRef(transport, localAddress, rpath, supervisor, Some(props), Some(d)) } catch { case NonFatal(e) ⇒ @@ -280,15 +281,19 @@ private[akka] class RemoteActorRefProvider( * Called in deserialization of incoming remote messages. In this case the correct local address is known, therefore * this method is faster than the actorFor above. */ - def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = path match { - case ActorPathExtractor(address, elems) ⇒ - if (hasAddress(address)) actorFor(rootGuardian, elems) - else new RemoteActorRef(transport, localAddress, - new RootActorPath(address) / elems, Nobody, props = None, deploy = None) - case _ ⇒ local.actorFor(ref, path) + def actorForWithLocalAddress(ref: InternalActorRef, path: String, localAddress: Address): InternalActorRef = { + path match { + case ActorPathExtractor(address, elems) ⇒ + if (hasAddress(address)) actorFor(rootGuardian, elems) + else new RemoteActorRef(transport, localAddress, + new RootActorPath(address) / elems, Nobody, props = None, deploy = None) + case _ ⇒ + local.actorFor(ref, path) + } } - def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = local.actorFor(ref, path) + def actorFor(ref: InternalActorRef, path: Iterable[String]): InternalActorRef = + local.actorFor(ref, path) /** * Using (checking out) actor on a specific node. @@ -297,7 +302,7 @@ private[akka] class RemoteActorRefProvider( log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path) // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor - actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toString, supervisor) + actorFor(RootActorPath(path.address) / "remote") ! DaemonMsgCreate(props, deploy, path.toSerializationFormat, supervisor) } def getExternalAddressFor(addr: Address): Option[Address] = { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 45160e718f..6f8b994d54 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -12,6 +12,7 @@ import akka.dispatch.Watch import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope import akka.util.Switch +import akka.actor.RootActorPath /** * INTERNAL API @@ -54,11 +55,14 @@ private[akka] class RemoteSystemDaemon( @tailrec def rec(s: String, n: Int): (InternalActorRef, Int) = { - getChild(s) match { + import akka.actor.ActorCell._ + val (childName, uid) = splitNameAndUid(s) + getChild(childName) match { case null ⇒ val last = s.lastIndexOf('/') if (last == -1) (Nobody, n) else rec(s.substring(0, last), n + 1) + case ref if uid != undefinedUid && uid != ref.path.uid ⇒ (Nobody, n) case ref ⇒ (ref, n) } } @@ -82,15 +86,21 @@ private[akka] class RemoteSystemDaemon( // TODO RK currently the extracted “address” is just ignored, is that okay? // TODO RK canonicalize path so as not to duplicate it always #1446 val subpath = elems.drop(1) - val path = this.path / subpath + val p = this.path / subpath + val childName = { + val s = subpath.mkString("/") + val i = s.indexOf('#') + if (i < 0) s + else s.substring(0, i) + } val isTerminating = !terminating.whileOff { val actor = system.provider.actorOf(system, props, supervisor.asInstanceOf[InternalActorRef], - path, systemService = false, Some(deploy), lookupDeploy = true, async = false) - addChild(subpath.mkString("/"), actor) + p, systemService = false, Some(deploy), lookupDeploy = true, async = false) + addChild(childName, actor) actor.sendSystemMessage(Watch(actor, this)) actor.start() } - if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, path.address) + if (isTerminating) log.error("Skipping [{}] to RemoteSystemDaemon on [{}] while terminating", message, p.address) case _ ⇒ log.debug("remote path does not match path from message [{}]", message) } diff --git a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala index 42f2978db6..056439c23e 100644 --- a/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/serialization/ProtobufSerializer.scala @@ -19,8 +19,8 @@ object ProtobufSerializer { */ def serializeActorRef(ref: ActorRef): ActorRefProtocol = { val identifier: String = Serialization.currentTransportAddress.value match { - case null ⇒ ref.path.toString - case address ⇒ ref.path.toStringWithAddress(address) + case null ⇒ ref.path.toSerializationFormat + case address ⇒ ref.path.toSerializationFormatWithAddress(address) } ActorRefProtocol.newBuilder.setPath(identifier).build } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala index a64d011404..58bcea77d8 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaPduCodec.scala @@ -182,7 +182,7 @@ private[remote] object AkkaPduProtobufCodec extends AkkaPduCodec { private def serializeActorRef(defaultAddress: Address, ref: ActorRef): ActorRefProtocol = { ActorRefProtocol.newBuilder.setPath( - if (ref.path.address.host.isDefined) ref.path.toString else ref.path.toStringWithAddress(defaultAddress)).build() + if (ref.path.address.host.isDefined) ref.path.toSerializationFormat else ref.path.toSerializationFormatWithAddress(defaultAddress)).build() } private def serializeAddress(address: Address): Option[AddressProtocol] = { diff --git a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala index 013330f2b2..98b4cd455c 100644 --- a/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemotingSpec.scala @@ -119,14 +119,14 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D test.local-address = "test://remote-sys@localhost:12346" } """).withFallback(system.settings.config).resolve() - val other = ActorSystem("remote-sys", conf) + val otherSystem = ActorSystem("remote-sys", conf) for ( (name, proto) ← Seq( "/gonk" -> "tcp", "/zagzag" -> "udp", "/roghtaar" -> "ssl.tcp") - ) deploy(system, Deploy(name, scope = RemoteScope(addr(other, proto)))) + ) deploy(system, Deploy(name, scope = RemoteScope(addr(otherSystem, proto)))) def addr(sys: ActorSystem, proto: String) = sys.asInstanceOf[ExtendedActorSystem].provider.getExternalAddressFor(Address(s"akka.$proto", "", "", 0)).get @@ -135,12 +135,12 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D sys.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].deployer.deploy(d) } - val remote = other.actorOf(Props[Echo2], "echo") + val remote = otherSystem.actorOf(Props[Echo2], "echo") val here = system.actorFor("akka.test://remote-sys@localhost:12346/user/echo") override def afterTermination() { - other.shutdown() + otherSystem.shutdown() AssociationRegistry.clear() } @@ -168,16 +168,16 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "send dead letters on remote if actor does not exist" in { EventFilter.warning(pattern = "dead.*buh", occurrences = 1).intercept { system.actorFor("akka.test://remote-sys@localhost:12346/does/not/exist") ! "buh" - }(other) + }(otherSystem) } "not be exhausted by sending to broken connections" in { val tcpOnlyConfig = ConfigFactory.parseString("""akka.remote.enabled-transports = ["akka.remote.netty.tcp"]"""). - withFallback(other.settings.config) - val moreSystems = Vector.fill(5)(ActorSystem(other.name, tcpOnlyConfig)) + withFallback(otherSystem.settings.config) + val moreSystems = Vector.fill(5)(ActorSystem(otherSystem.name, tcpOnlyConfig)) moreSystems foreach (_.actorOf(Props[Echo2], name = "echo")) val moreRefs = moreSystems map (sys ⇒ system.actorFor(RootActorPath(addr(sys, "tcp")) / "user" / "echo")) - val aliveEcho = system.actorFor(RootActorPath(addr(other, "tcp")) / "user" / "echo") + val aliveEcho = system.actorFor(RootActorPath(addr(otherSystem, "tcp")) / "user" / "echo") val n = 100 // first everything is up and running @@ -223,6 +223,30 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D expectMsg("postStop") } + "not send to remote re-created actor with same name" in { + val echo = otherSystem.actorOf(Props[Echo1], "otherEcho1") + echo ! 71 + expectMsg(71) + echo ! PoisonPill + expectMsg("postStop") + echo ! 72 + expectNoMsg(1.second) + + val echo2 = otherSystem.actorOf(Props[Echo1], "otherEcho1") + echo2 ! 73 + expectMsg(73) + // msg to old ActorRef (different uid) should not get through + echo2.path.uid must not be (echo.path.uid) + echo ! 74 + expectNoMsg(1.second) + + otherSystem.actorFor("/user/otherEcho1") ! 75 + expectMsg(75) + + system.actorFor("akka.test://remote-sys@localhost:12346/user/otherEcho1") ! 76 + expectMsg(76) + } + "look-up actors across node boundaries" in { val l = system.actorOf(Props(new Actor { def receive = { @@ -230,20 +254,41 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D case s: String ⇒ sender ! context.actorFor(s) } }), "looker") + // child is configured to be deployed on remote-sys (otherSystem) l ! (Props[Echo1], "child") - val r = expectMsgType[ActorRef] - r ! (Props[Echo1], "grandchild") - val remref = expectMsgType[ActorRef] - remref.asInstanceOf[ActorRefScope].isLocal must be(true) + val child = expectMsgType[ActorRef] + // grandchild is configured to be deployed on RemotingSpec (system) + child ! (Props[Echo1], "grandchild") + val grandchild = expectMsgType[ActorRef] + grandchild.asInstanceOf[ActorRefScope].isLocal must be(true) + grandchild ! 43 + expectMsg(43) val myref = system.actorFor(system / "looker" / "child" / "grandchild") myref.isInstanceOf[RemoteActorRef] must be(true) - myref ! 43 - expectMsg(43) - lastSender must be theSameInstanceAs remref - r.asInstanceOf[RemoteActorRef].getParent must be(l) - system.actorFor("/user/looker/child") must be theSameInstanceAs r + myref ! 44 + expectMsg(44) + lastSender must be(grandchild) + lastSender must be theSameInstanceAs grandchild + child.asInstanceOf[RemoteActorRef].getParent must be(l) + system.actorFor("/user/looker/child") must be theSameInstanceAs child Await.result(l ? "child/..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l Await.result(system.actorFor(system / "looker" / "child") ? "..", timeout.duration).asInstanceOf[AnyRef] must be theSameInstanceAs l + + watch(child) + child ! PoisonPill + expectMsg("postStop") + expectMsgType[Terminated].actor must be === child + l ! (Props[Echo1], "child") + val child2 = expectMsgType[ActorRef] + child2 ! 45 + expectMsg(45) + // msg to old ActorRef (different uid) should not get through + child2.path.uid must not be (child.path.uid) + child ! 46 + expectNoMsg(1.second) + system.actorFor(system / "looker" / "child") ! 47 + expectMsg(47) + } "not fail ask across node boundaries" in { @@ -255,7 +300,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (TCP)" in { val r = system.actorOf(Props[Echo1], "gonk") r.path.toString must be === - s"akka.tcp://remote-sys@localhost:${port(other, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" + s"akka.tcp://remote-sys@localhost:${port(otherSystem, "tcp")}/remote/akka.tcp/RemotingSpec@localhost:${port(system, "tcp")}/user/gonk" r ! 42 expectMsg(42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -271,7 +316,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (UDP)" in { val r = system.actorOf(Props[Echo1], "zagzag") r.path.toString must be === - s"akka.udp://remote-sys@localhost:${port(other, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" + s"akka.udp://remote-sys@localhost:${port(otherSystem, "udp")}/remote/akka.udp/RemotingSpec@localhost:${port(system, "udp")}/user/zagzag" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -287,7 +332,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "be able to use multiple transports and use the appropriate one (SSL)" in { val r = system.actorOf(Props[Echo1], "roghtaar") r.path.toString must be === - s"akka.ssl.tcp://remote-sys@localhost:${port(other, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" + s"akka.ssl.tcp://remote-sys@localhost:${port(otherSystem, "ssl.tcp")}/remote/akka.ssl.tcp/RemotingSpec@localhost:${port(system, "ssl.tcp")}/user/roghtaar" r ! 42 expectMsg(10.seconds, 42) EventFilter[Exception]("crash", occurrences = 1).intercept { @@ -305,7 +350,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D override def beforeTermination() { system.eventStream.publish(TestEvent.Mute( EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate)"))) - other.eventStream.publish(TestEvent.Mute( + otherSystem.eventStream.publish(TestEvent.Mute( EventFilter[EndpointException](), EventFilter.error(start = "AssociationError"), EventFilter.warning(pattern = "received dead letter.*(InboundPayload|Disassociate|HandleListener)")))