diff --git a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala index 54d30d4b65..5fa63fc0f8 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/DeathWatchSpec.scala @@ -5,11 +5,11 @@ package akka.actor import language.postfixOps +import akka.dispatch.sysmsg.Failed +import akka.pattern.ask import akka.testkit._ import scala.concurrent.duration._ -import java.util.concurrent.atomic._ import scala.concurrent.Await -import akka.pattern.ask @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec @@ -129,7 +129,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout case class FF(fail: Failed) val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) { override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = { - testActor.tell(FF(Failed(cause, 0)), child) + testActor.tell(FF(Failed(child, cause, 0)), child) super.handleFailure(context, child, cause, stats, children) } } @@ -145,9 +145,9 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout failed ! Kill val result = receiveWhile(3 seconds, messages = 3) { - case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed ⇒ 1 - case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother ⇒ 2 - case WrappedTerminated(Terminated(`brother`)) ⇒ 3 + case FF(Failed(_, _: ActorKilledException, _)) if lastSender eq failed ⇒ 1 + case FF(Failed(_, DeathPactException(`failed`), _)) if lastSender eq brother ⇒ 2 + case WrappedTerminated(Terminated(`brother`)) ⇒ 3 } testActor.isTerminated must not be true result must be(Seq(1, 2, 3)) diff --git a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala index 8e91abb570..b94936ce84 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SupervisorHierarchySpec.scala @@ -242,6 +242,8 @@ object SupervisorHierarchySpec { override def postStop { if (failed || suspended) { listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log) + val state = stateCache.get(self) + stateCache.put(self.path, state.copy(log = log)) } else { stateCache.put(self.path, HierarchyState(log, Map(), null)) } @@ -249,7 +251,7 @@ object SupervisorHierarchySpec { def check(msg: Any): Boolean = { suspended = false - log :+= Event(msg, identityHashCode(this)) + log :+= Event(msg, identityHashCode(Hierarchy.this)) if (failed) { abort("processing message while failed") failed = false @@ -287,13 +289,15 @@ object SupervisorHierarchySpec { val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy") context.watch(context.actorOf(props, name)) } else { - log :+= Event(sender + " terminated while pongOfDeath", identityHashCode(this)) + // WARNING: The Terminated that is logged by this is logged by check() above, too. It is not + // an indication of duplicate Terminate messages + log :+= Event(sender + " terminated while pongOfDeath", identityHashCode(Hierarchy.this)) } case Abort ⇒ abort("terminating") case PingOfDeath ⇒ if (size > 1) { pongsToGo = context.children.size - log :+= Event("sending " + pongsToGo + " pingOfDeath", identityHashCode(this)) + log :+= Event("sending " + pongsToGo + " pingOfDeath", identityHashCode(Hierarchy.this)) context.children foreach (_ ! PingOfDeath) } else { context stop self diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index d2cc572a69..fcaf47d49a 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -16,6 +16,7 @@ import org.scalatest.junit.JUnitRunner import com.typesafe.config.Config import akka.actor._ +import akka.dispatch.sysmsg._ import akka.dispatch._ import akka.event.Logging.Error import akka.pattern.ask @@ -390,7 +391,8 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ⇒ ll.self.path compareTo rr.self.path } } foreach { case cell: ActorCell ⇒ - System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain(null))) + System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + + cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size) } System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/sysmsg/SystemMessageListSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/sysmsg/SystemMessageListSpec.scala new file mode 100644 index 0000000000..a7059cb748 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/dispatch/sysmsg/SystemMessageListSpec.scala @@ -0,0 +1,116 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.dispatch.sysmsg + +import akka.testkit.AkkaSpec + +class SystemMessageListSpec extends AkkaSpec { + import SystemMessageList.LNil + import SystemMessageList.ENil + + "The SystemMessageList value class" must { + + "handle empty lists correctly" in { + LNil.head must be === null + LNil.isEmpty must be(true) + (LNil.reverse == ENil) must be(true) + } + + "able to append messages" in { + val create0 = Create(0) + val create1 = Create(1) + val create2 = Create(2) + ((create0 :: LNil).head eq create0) must be(true) + ((create1 :: create0 :: LNil).head eq create1) must be(true) + ((create2 :: create1 :: create0 :: LNil).head eq create2) must be(true) + + (create2.next eq create1) must be(true) + (create1.next eq create0) must be(true) + (create0.next eq null) must be(true) + } + + "able to deconstruct head and tail" in { + val create0 = Create(0) + val create1 = Create(1) + val create2 = Create(2) + val list = create2 :: create1 :: create0 :: LNil + + (list.head eq create2) must be(true) + (list.tail.head eq create1) must be(true) + (list.tail.tail.head eq create0) must be(true) + (list.tail.tail.tail.head eq null) must be(true) + } + + "properly report size and emptyness" in { + val create0 = Create(0) + val create1 = Create(1) + val create2 = Create(2) + val list = create2 :: create1 :: create0 :: LNil + + list.size must be === 3 + list.isEmpty must be(false) + + list.tail.size must be === 2 + list.tail.isEmpty must be(false) + + list.tail.tail.size must be === 1 + list.tail.tail.isEmpty must be(false) + + list.tail.tail.tail.size must be === 0 + list.tail.tail.tail.isEmpty must be(true) + + } + + "properly reverse contents" in { + val create0 = Create(0) + val create1 = Create(1) + val create2 = Create(2) + val list = create2 :: create1 :: create0 :: LNil + val listRev: EarliestFirstSystemMessageList = list.reverse + + listRev.isEmpty must be(false) + listRev.size must be === 3 + + (listRev.head eq create0) must be(true) + (listRev.tail.head eq create1) must be(true) + (listRev.tail.tail.head eq create2) must be(true) + (listRev.tail.tail.tail.head eq null) must be(true) + + (create0.next eq create1) must be(true) + (create1.next eq create2) must be(true) + (create2.next eq null) must be(true) + } + + } + + "EarliestFirstSystemMessageList" must { + + "properly prepend reversed message lists to the front" in { + val create0 = Create(0) + val create1 = Create(1) + val create2 = Create(2) + val create3 = Create(3) + val create4 = Create(4) + val create5 = Create(5) + + val fwdList = create3 :: create4 :: create5 :: ENil + val revList = create2 :: create1 :: create0 :: LNil + + val list = revList reverse_::: fwdList + + (list.head eq create0) must be(true) + (list.tail.head eq create1) must be(true) + (list.tail.tail.head eq create2) must be(true) + (list.tail.tail.tail.head eq create3) must be(true) + (list.tail.tail.tail.tail.head eq create4) must be(true) + (list.tail.tail.tail.tail.tail.head eq create5) must be(true) + (list.tail.tail.tail.tail.tail.tail.head eq null) must be(true) + + (LNil reverse_::: ENil) == ENil must be(true) + ((create0 :: LNil reverse_::: ENil).head eq create0) must be(true) + ((LNil reverse_::: create0 :: ENil).head eq create0) must be(true) + } + + } +} diff --git a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala index 14076463f5..26c551e8be 100644 --- a/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/serialization/SerializeSpec.scala @@ -8,7 +8,7 @@ import language.postfixOps import akka.testkit.{ AkkaSpec, EventFilter } import akka.actor._ -import akka.dispatch._ +import akka.dispatch.sysmsg._ import java.io._ import scala.concurrent.Await import akka.util.Timeout @@ -109,7 +109,7 @@ object SerializationTests { } serialization-bindings { - "akka.dispatch.SystemMessage" = test + "akka.dispatch.sysmsg.SystemMessage" = test } } } @@ -125,6 +125,7 @@ object SerializationTests { classOf[ChildTerminated], classOf[Watch], classOf[Unwatch], + classOf[Failed], NoMessage.getClass) } @@ -335,31 +336,35 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465000000000000000302000078707671007e0003") } "be preserved for the Recreate SystemMessage" in { - verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003") + verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e7379736d73672e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003") } "be preserved for the Suspend SystemMessage" in { - verify(Suspend(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e53757370656e6464e531d5d134b59902000078707671007e0003") + verify(Suspend(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e53757370656e6464e531d5d134b59902000078707671007e0003") } "be preserved for the Resume SystemMessage" in { - verify(Resume(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003") + verify(Resume(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003") } "be preserved for the Terminate SystemMessage" in { - verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003") + verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001e616b6b612e64697370617463682e7379736d73672e5465726d696e61746509d66ca68318700f02000078707671007e0003") } "be preserved for the Supervise SystemMessage" in { verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e53757065727669736500000000000000030200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } "be preserved for the ChildTerminated SystemMessage" in { - verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") + verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720024616b6b612e64697370617463682e7379736d73672e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } "be preserved for the Watch SystemMessage" in { - verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720013616b6b612e64697370617463682e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") + verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") } "be preserved for the Unwatch SystemMessage" in { - verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") + verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003") } "be preserved for the NoMessage SystemMessage" in { - verify(NoMessage, "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720018616b6b612e64697370617463682e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003") + verify(NoMessage, "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001f616b6b612e64697370617463682e7379736d73672e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003") + } + "be preserved for the Failed SystemMessage" in { + // Using null as the cause to avoid a large serialized message + verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000030200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003") } } } diff --git a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java index c0c38887a3..0c1765cde9 100644 --- a/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java +++ b/akka-actor/src/main/java/akka/util/internal/HashedWheelTimer.java @@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock; -import akka.dispatch.SystemMessage; +import akka.dispatch.sysmsg.SystemMessage; import akka.util.Helpers; import scala.concurrent.duration.Duration; import scala.concurrent.duration.FiniteDuration; diff --git a/akka-actor/src/main/scala/akka/actor/Actor.scala b/akka-actor/src/main/scala/akka/actor/Actor.scala index acd93b21f9..2659c10a19 100644 --- a/akka-actor/src/main/scala/akka/actor/Actor.scala +++ b/akka-actor/src/main/scala/akka/actor/Actor.scala @@ -28,12 +28,6 @@ trait PossiblyHarmful */ trait NoSerializationVerificationNeeded -/** - * INTERNAL API - */ -@SerialVersionUID(2L) -private[akka] case class Failed(cause: Throwable, uid: Int) extends AutoReceivedMessage with PossiblyHarmful - abstract class PoisonPill extends AutoReceivedMessage with PossiblyHarmful /** diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 6b1d42a529..649db7cd02 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -4,9 +4,17 @@ package akka.actor +import akka.actor.dungeon.ChildrenContainer +import akka.dispatch.Envelope +import akka.dispatch.NullMessage +import akka.dispatch.sysmsg._ +import akka.event.Logging.Debug +import akka.event.Logging.{ LogEvent, Error } +import akka.japi.Procedure import java.io.{ ObjectOutputStream, NotSerializableException } -import scala.annotation.tailrec +import scala.annotation.{ switch, tailrec } import scala.collection.immutable +import scala.concurrent.ExecutionContext import scala.concurrent.duration.Duration import scala.util.control.NonFatal import akka.actor.dungeon.ChildrenContainer @@ -16,7 +24,6 @@ import akka.event.Logging.{ LogEvent, Debug, Error } import akka.japi.Procedure import akka.dispatch.NullMessage import scala.concurrent.ExecutionContext -import scala.concurrent.forkjoin.ThreadLocalRandom /** * The actor context - the view of the actor cell from the actor. @@ -325,6 +332,9 @@ private[akka] object ActorCell { else (name.substring(0, i), Integer.valueOf(name.substring(i + 1))) } + final val DefaultState = 0 + final val SuspendedState = 1 + final val SuspendedWaitForChildrenState = 2 } //ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit) @@ -362,12 +372,24 @@ private[akka] class ActorCell( protected def actor_=(a: Actor): Unit = _actor = a var currentMessage: Envelope = _ private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack + private[this] var sysmsgStash: LatestFirstSystemMessageList = SystemMessageList.LNil + + protected def stash(msg: SystemMessage): Unit = { + assert(msg.unlinked) + sysmsgStash ::= msg + } + + private def unstashAll(): LatestFirstSystemMessageList = { + val unstashed = sysmsgStash + sysmsgStash = SystemMessageList.LNil + unstashed + } /* * MESSAGE PROCESSING */ //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status - @tailrec final def systemInvoke(message: SystemMessage): Unit = { + final def systemInvoke(message: SystemMessage): Unit = { /* * When recreate/suspend/resume are received while restarting (i.e. between * preRestart and postRestart, waiting for children to terminate), these @@ -377,36 +399,61 @@ private[akka] class ActorCell( * types (hence the overwrite further down). Mailbox sets message.next=null * before systemInvoke, so this will only be non-null during such a replay. */ - var todo = message.next - try { - message match { - case Create() ⇒ create() - case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) - case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) - case Recreate(cause) ⇒ - waitingForChildrenOrNull match { - case null ⇒ faultRecreate(cause) - case w: WaitingForChildren ⇒ w.enqueue(message) - } - case Suspend() ⇒ - waitingForChildrenOrNull match { - case null ⇒ faultSuspend() - case w: WaitingForChildren ⇒ w.enqueue(message) - } - case Resume(inRespToFailure) ⇒ - waitingForChildrenOrNull match { - case null ⇒ faultResume(inRespToFailure) - case w: WaitingForChildren ⇒ w.enqueue(message) - } - case Terminate() ⇒ terminate() - case Supervise(child, async) ⇒ supervise(child, async) - case ChildTerminated(child) ⇒ todo = handleChildTerminated(child) - case NoMessage ⇒ // only here to suppress warning + + def calculateState: Int = + if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState + else if (mailbox.isSuspended) SuspendedState + else DefaultState + + @tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit = + if (messages.nonEmpty) { + val tail = messages.tail + val msg = messages.head + msg.unlink() + provider.deadLetters ! msg + sendAllToDeadLetters(tail) } - } catch handleNonFatalOrInterruptedException { e ⇒ - handleInvokeFailure(Nil, e) + + def shouldStash(m: SystemMessage, state: Int): Boolean = + (state: @switch) match { + case DefaultState ⇒ false + case SuspendedState ⇒ m.isInstanceOf[StashWhenFailed] + case SuspendedWaitForChildrenState ⇒ m.isInstanceOf[StashWhenWaitingForChildren] + } + + @tailrec + def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = { + val rest = messages.tail + val message = messages.head + message.unlink() + try { + message match { + case message: SystemMessage if shouldStash(message, currentState) ⇒ stash(message) + case f: Failed ⇒ handleFailure(f) + case Create() ⇒ create(uid) + case Watch(watchee, watcher) ⇒ addWatcher(watchee, watcher) + case Unwatch(watchee, watcher) ⇒ remWatcher(watchee, watcher) + case Recreate(cause) ⇒ faultRecreate(cause) + case Suspend() ⇒ faultSuspend() + case Resume(inRespToFailure) ⇒ faultResume(inRespToFailure) + case Terminate() ⇒ terminate() + case Supervise(child, async) ⇒ supervise(child, async, uid) + case ChildTerminated(child) ⇒ handleChildTerminated(child) + case NoMessage ⇒ // only here to suppress warning + } + } catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e) + } + val newState = calculateState + // As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state + // chain + val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest + + if (isTerminated) sendAllToDeadLetters(todo) + else if (todo.nonEmpty) invokeAll(todo, newState) } - if (todo != null) systemInvoke(todo) + + invokeAll(new EarliestFirstSystemMessageList(message), calculateState) } //Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status @@ -430,7 +477,6 @@ private[akka] class ActorCell( publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg)) msg.message match { - case Failed(cause, uid) ⇒ handleFailure(sender, cause, uid) case t: Terminated ⇒ watchedActorTerminated(t) case AddressTerminated(address) ⇒ addressTerminated(address) case Kill ⇒ throw new ActorKilledException("Kill") diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index ff42918ebe..fe0f66f5f5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -5,6 +5,7 @@ package akka.actor import akka.dispatch._ +import akka.dispatch.sysmsg._ import akka.util._ import java.lang.{ UnsupportedOperationException, IllegalStateException } import akka.serialization.{ Serialization, JavaSerializer } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 4c6b13d9b6..4a01b87f95 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -4,7 +4,8 @@ package akka.actor -import akka.dispatch._ +import akka.dispatch.sysmsg._ +import akka.dispatch.NullMessage import akka.routing._ import akka.event._ import akka.util.{ Switch, Helpers } @@ -388,17 +389,17 @@ class LocalActorRefProvider private[akka] ( override def isTerminated: Boolean = stopped.isOn override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match { - case null ⇒ throw new InvalidMessageException("Message is null") - case Failed(ex, _) if sender ne null ⇒ { causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() } - case NullMessage ⇒ // do nothing - case _ ⇒ log.error(this + " received unexpected message [" + message + "]") + case null ⇒ throw new InvalidMessageException("Message is null") + case NullMessage ⇒ // do nothing + case _ ⇒ log.error(this + " received unexpected message [" + message + "]") }) override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff { message match { - case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead - case ChildTerminated(_) ⇒ stop() - case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") + case Failed(child, ex, _) ⇒ { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() } + case Supervise(_, _) ⇒ // TODO register child in some map to keep track of it and enable shutdown after all dead + case ChildTerminated(_) ⇒ stop() + case _ ⇒ log.error(this + " received unexpected system message [" + message + "]") } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 6a39dd513b..3b5408b5f9 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS import com.typesafe.config.{ Config, ConfigFactory } import akka.event._ import akka.dispatch._ +import akka.dispatch.sysmsg.{ SystemMessageList, EarliestFirstSystemMessageList, LatestFirstSystemMessageList, SystemMessage } import akka.japi.Util.immutableSeq import akka.actor.dungeon.ChildrenContainer import akka.util._ @@ -559,7 +560,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config, becomeClosed() def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit = deadLetters ! DeadLetter(handle, receiver, receiver) - def systemDrain(newContents: SystemMessage): SystemMessage = null + def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil def hasSystemMessages = false } diff --git a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala index 8a1a6a6a03..123a576462 100644 --- a/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/RepointableActorRef.scala @@ -15,6 +15,7 @@ import akka.actor.dungeon.ChildrenContainer import akka.event.Logging.Warning import akka.util.Unsafe import akka.dispatch._ +import akka.dispatch.sysmsg._ import util.Try /** diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala index 1dbd117904..2a60fc5fca 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/ChildrenContainer.scala @@ -7,7 +7,7 @@ package akka.actor.dungeon import scala.collection.immutable import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef } -import akka.dispatch.SystemMessage +import akka.dispatch.sysmsg.{ EarliestFirstSystemMessageList, SystemMessageList, LatestFirstSystemMessageList, SystemMessage } import akka.util.Collections.{ EmptyImmutableSeq, PartialImmutableValuesIterable } /** @@ -62,11 +62,7 @@ private[akka] object ChildrenContainer { override final def valuesIterator = stats.valuesIterator } - trait WaitingForChildren { - private var todo: SystemMessage = null - def enqueue(message: SystemMessage) = { message.next = todo; todo = message } - def dequeueAll(): SystemMessage = { val ret = SystemMessage.reverse(todo); todo = null; ret } - } + trait WaitingForChildren trait EmptyChildrenContainer extends ChildrenContainer { val emptyStats = immutable.TreeMap.empty[String, ChildStats] diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala index cff5665ad3..6dbebb806b 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/DeathWatch.scala @@ -5,7 +5,7 @@ package akka.actor.dungeon import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated } -import akka.dispatch.{ ChildTerminated, Watch, Unwatch } +import akka.dispatch.sysmsg.{ ChildTerminated, Watch, Unwatch } import akka.event.Logging.{ Warning, Error, Debug } import scala.util.control.NonFatal import akka.actor.MinimalActorRef @@ -188,4 +188,4 @@ private[akka] trait DeathWatch { this: ActorCell ⇒ private[akka] class UndefinedUidActorRef(ref: ActorRef) extends MinimalActorRef { override val path = ref.path.withUid(ActorCell.undefinedUid) override def provider = throw new UnsupportedOperationException("UndefinedUidActorRef does not provide") -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala index 075ff49a0b..4235330b7e 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/Dispatch.scala @@ -5,7 +5,8 @@ package akka.actor.dungeon import scala.annotation.tailrec -import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create } +import akka.dispatch.{ MessageDispatcher, Mailbox, Envelope } +import akka.dispatch.sysmsg._ import akka.event.Logging.Error import akka.util.Unsafe import akka.dispatch.NullMessage @@ -53,7 +54,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒ if (sendSupervise) { // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false)) + parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false, uid)) parent ! NullMessage // read ScalaDoc of NullMessage to see why } this diff --git a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala index ac5d1a48e0..5badb8c180 100644 --- a/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala +++ b/akka-actor/src/main/scala/akka/actor/dungeon/FaultHandling.scala @@ -4,20 +4,19 @@ package akka.actor.dungeon -import scala.annotation.tailrec -import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor } -import akka.dispatch._ -import akka.event.Logging.{ Warning, Error, Debug } -import scala.util.control.NonFatal -import akka.event.Logging -import scala.collection.immutable -import akka.dispatch.ChildTerminated -import akka.actor.PreRestartException -import akka.actor.Failed import akka.actor.PostRestartException +import akka.actor.PreRestartException +import akka.actor.{ InternalActorRef, ActorRef, ActorInterruptedException, ActorCell, Actor } +import akka.dispatch._ +import akka.dispatch.sysmsg.ChildTerminated +import akka.dispatch.sysmsg._ +import akka.event.Logging import akka.event.Logging.Debug +import akka.event.Logging.Error +import scala.collection.immutable import scala.concurrent.duration.Duration import scala.util.control.Exception._ +import scala.util.control.NonFatal private[akka] trait FaultHandling { this: ActorCell ⇒ @@ -134,7 +133,10 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ private def finishCreate(): Unit = { try resumeNonRecursive() finally clearFailed() - create() + try create() + catch handleNonFatalOrInterruptedException { e ⇒ + handleInvokeFailure(Nil, e) + } } protected def terminate() { @@ -169,14 +171,18 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ suspendNonRecursive() // suspend children val skip: Set[ActorRef] = currentMessage match { - case Envelope(Failed(_, _), child) ⇒ { setFailed(child); Set(child) } - case _ ⇒ { setFailed(self); Set.empty } + case Envelope(Failed(_, _, _), child) ⇒ setFailed(child); Set(child) + case _ ⇒ setFailed(self); Set.empty } suspendChildren(exceptFor = skip ++ childrenNotToSuspend) t match { // tell supervisor - case _: InterruptedException ⇒ parent.tell(Failed(new ActorInterruptedException(t), uid), self) - case _ ⇒ parent.tell(Failed(t, uid), self) + case _: InterruptedException ⇒ + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid)) + case _ ⇒ + // ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + parent.sendSystemMessage(Failed(self, t, uid)) } } catch handleNonFatalOrInterruptedException { e ⇒ publish(Error(e, self.path.toString, clazz(actor), @@ -237,23 +243,25 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ } } - final protected def handleFailure(child: ActorRef, cause: Throwable, uid: Int): Unit = - getChildByRef(child) match { + final protected def handleFailure(f: Failed): Unit = { + currentMessage = Envelope(f, f.child, system) + getChildByRef(f.child) match { /* * only act upon the failure, if it comes from a currently known child; * the UID protects against reception of a Failed from a child which was * killed in preRestart and re-created in postRestart */ - case Some(stats) if stats.uid == uid ⇒ - if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause + case Some(stats) if stats.uid == f.uid ⇒ + if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause case Some(stats) ⇒ publish(Debug(self.path.toString, clazz(actor), - "dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")")) + "dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")")) case None ⇒ - publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child)) + publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child)) } + } - final protected def handleChildTerminated(child: ActorRef): SystemMessage = { + final protected def handleChildTerminated(child: ActorRef): Unit = { val status = removeChildAndGetStateChange(child) /* * if this fails, we do nothing in case of terminating/restarting state, @@ -272,10 +280,10 @@ private[akka] trait FaultHandling { this: ActorCell ⇒ * then we are continuing the previously suspended recreate/create/terminate action */ status match { - case Some(c @ ChildrenContainer.Recreation(cause)) ⇒ { finishRecreate(cause, actor); c.dequeueAll() } - case Some(c @ ChildrenContainer.Creation()) ⇒ { finishCreate(); c.dequeueAll() } - case Some(ChildrenContainer.Termination) ⇒ { finishTerminate(); null } - case _ ⇒ null + case Some(c @ ChildrenContainer.Recreation(cause)) ⇒ finishRecreate(cause, actor) + case Some(c @ ChildrenContainer.Creation()) ⇒ finishCreate() + case Some(ChildrenContainer.Termination) ⇒ finishTerminate() + case _ ⇒ } } diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index b0e97f2f0c..282a66e796 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -7,6 +7,7 @@ package akka.dispatch import java.util.concurrent._ import akka.event.Logging.{ Error, LogEventException } import akka.actor._ +import akka.dispatch.sysmsg._ import akka.event.EventStream import com.typesafe.config.Config import akka.util.{ Unsafe, Index } @@ -41,95 +42,6 @@ object Envelope { */ case object NullMessage extends AutoReceivedMessage -/** - * INTERNAL API - */ -private[akka] object SystemMessage { - @tailrec - final def size(list: SystemMessage, acc: Int = 0): Int = { - if (list eq null) acc else size(list.next, acc + 1) - } - - @tailrec - final def reverse(list: SystemMessage, acc: SystemMessage = null): SystemMessage = { - if (list eq null) acc else { - val next = list.next - list.next = acc - reverse(next, list) - } - } -} - -/** - * System messages are handled specially: they form their own queue within - * each actor’s mailbox. This queue is encoded in the messages themselves to - * avoid extra allocations and overhead. The next pointer is a normal var, and - * it does not need to be volatile because in the enqueuing method its update - * is immediately succeeded by a volatile write and all reads happen after the - * volatile read in the dequeuing thread. Afterwards, the obtained list of - * system messages is handled in a single thread only and not ever passed around, - * hence no further synchronization is needed. - * - * INTERNAL API - * - * ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ - */ -private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable { - @transient - var next: SystemMessage = _ -} - -/** - * INTERNAL API - */ -@SerialVersionUID(3L) -private[akka] case class Create() extends SystemMessage // send to self from Dispatcher.register -/** - * INTERNAL API - */ -@SerialVersionUID(686735569005808256L) -private[akka] case class Recreate(cause: Throwable) extends SystemMessage // sent to self from ActorCell.restart -/** - * INTERNAL API - */ -@SerialVersionUID(7270271967867221401L) -private[akka] case class Suspend() extends SystemMessage // sent to self from ActorCell.suspend -/** - * INTERNAL API - */ -@SerialVersionUID(-2567504317093262591L) -private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume -/** - * INTERNAL API - */ -@SerialVersionUID(708873453777219599L) -private[akka] case class Terminate() extends SystemMessage // sent to self from ActorCell.stop -/** - * INTERNAL API - */ -@SerialVersionUID(3L) -private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start -/** - * INTERNAL API - */ -@SerialVersionUID(5513569382760799668L) -private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate -/** - * INTERNAL API - */ -@SerialVersionUID(3323205435124174788L) -private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch -/** - * INTERNAL API - */ -@SerialVersionUID(6363620903363658256L) -private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch -/** - * INTERNAL API - */ -@SerialVersionUID(-5475916034683997987L) -private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination - final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable { final override def isBatchable: Boolean = runnable match { case b: Batchable ⇒ b.isBatchable diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index 841a359b87..3942762e09 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -5,6 +5,7 @@ package akka.dispatch import akka.actor.{ ActorCell, ActorRef } +import akka.dispatch.sysmsg._ import scala.annotation.tailrec import scala.concurrent.duration.Duration import akka.util.Helpers @@ -56,13 +57,13 @@ class BalancingDispatcher( override def cleanUp(): Unit = { val dlq = system.deadLetterMailbox //Don't call the original implementation of this since it scraps all messages, and we don't want to do that - var message = systemDrain(NoMessage) - while (message ne null) { + var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage)) + while (messages.nonEmpty) { // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null + val message = messages.head + messages = messages.tail + message.unlink() dlq.systemEnqueue(system.deadLetters, message) - message = next } } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index cc101e6311..74c765f401 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -7,6 +7,7 @@ package akka.dispatch import akka.event.Logging.Error import akka.actor.ActorCell import akka.event.Logging +import akka.dispatch.sysmsg.SystemMessage import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ ExecutorService, RejectedExecutionException } import scala.concurrent.forkjoin.ForkJoinPool diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index e055c4b327..2d4efe6464 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -6,6 +6,7 @@ package akka.dispatch import java.util.{ Comparator, PriorityQueue, Queue, Deque } import java.util.concurrent._ import akka.AkkaException +import akka.dispatch.sysmsg._ import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter } import akka.util.{ Unsafe, BoundedBlockingQueue } import akka.event.Logging.Error @@ -14,6 +15,9 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import com.typesafe.config.Config import scala.concurrent.duration.FiniteDuration +import akka.actor.DeadLetter +import akka.dispatch.BoundedMailbox +import akka.dispatch.BoundedDequeBasedMailbox /** * INTERNAL API @@ -196,11 +200,16 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) /* * AtomicReferenceFieldUpdater for system queue. */ - protected final def systemQueueGet: SystemMessage = - Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage] + protected final def systemQueueGet: LatestFirstSystemMessageList = + // Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such + // it just exists as a typed view during compile-time. The actual return type is still SystemMessage. + new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]) - protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = - Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new) + protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean = + // Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new + // are SystemMessage instances hidden during compile time behind the SystemMessageList value class. + // Without calling .head the parameters would be boxed in SystemMessageList wrapper. + Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head) final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match { case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages @@ -248,28 +257,28 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) */ final def processAllSystemMessages() { var interruption: Throwable = null - var nextMessage = systemDrain(null) - while ((nextMessage ne null) && !isClosed) { - val msg = nextMessage - nextMessage = nextMessage.next - msg.next = null + var messageList = systemDrain(SystemMessageList.LNil) + while ((messageList.nonEmpty) && !isClosed) { + val msg = messageList.head + messageList = messageList.tail + msg.unlink() if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs) // we know here that systemInvoke ensures that only "fatal" exceptions get rethrown actor systemInvoke msg if (Thread.interrupted()) interruption = new InterruptedException("Interrupted while processing system messages") // don’t ever execute normal message when system message present! - if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null) + if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil) } /* * if we closed the mailbox, we must dump the remaining system messages * to deadLetters (this is essential for DeathWatch) */ val dlm = actor.systemImpl.deadLetterMailbox - while (nextMessage ne null) { - val msg = nextMessage - nextMessage = nextMessage.next - msg.next = null + while (messageList.nonEmpty) { + val msg = messageList.head + messageList = messageList.tail + msg.unlink() try dlm.systemEnqueue(actor.self, msg) catch { case e: InterruptedException ⇒ interruption = e @@ -292,13 +301,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue) protected[dispatch] def cleanUp(): Unit = if (actor ne null) { // actor is null for the deadLetterMailbox val dlm = actor.systemImpl.deadLetterMailbox - var message = systemDrain(NoMessage) - while (message ne null) { + var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage)) + while (messageList.nonEmpty) { // message must be “virgin” before being able to systemEnqueue again - val next = message.next - message.next = null - dlm.systemEnqueue(actor.self, message) - message = next + val msg = messageList.head + messageList = messageList.tail + msg.unlink() + dlm.systemEnqueue(actor.self, msg) } if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run() @@ -355,7 +364,7 @@ private[akka] trait SystemMessageQueue { /** * Dequeue all messages from system queue and return them as single-linked list. */ - def systemDrain(newContents: SystemMessage): SystemMessage + def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList def hasSystemMessages: Boolean } @@ -367,36 +376,26 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒ @tailrec final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = { - assert(message.next eq null) + assert(message.unlinked) if (Mailbox.debug) println(receiver + " having enqueued " + message) - val head = systemQueueGet - if (head == NoMessage) { + val currentList = systemQueueGet + if (currentList.head == NoMessage) { if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message) } else { - /* - * This write is safely published by the compareAndSet contained within - * systemQueuePut; “Intra-Thread Semantics” on page 12 of the JSR133 spec - * guarantees that “head” uses the value obtained from systemQueueGet above. - * Hence, SystemMessage.next does not need to be volatile. - */ - message.next = head - if (!systemQueuePut(head, message)) { - message.next = null + if (!systemQueuePut(currentList, message :: currentList)) { + message.unlink() systemEnqueue(receiver, message) } } } @tailrec - final def systemDrain(newContents: SystemMessage): SystemMessage = systemQueueGet match { - case NoMessage ⇒ null - case head ⇒ if (systemQueuePut(head, newContents)) SystemMessage.reverse(head) else systemDrain(newContents) + final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = { + val currentList = systemQueueGet + if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents) } - def hasSystemMessages: Boolean = systemQueueGet match { - case null | NoMessage ⇒ false - case _ ⇒ true - } + def hasSystemMessages: Boolean = systemQueueGet.nonEmpty } diff --git a/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala new file mode 100644 index 0000000000..a62dd03141 --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/sysmsg/SystemMessage.scala @@ -0,0 +1,257 @@ +/** + * Copyright (C) 2009-2013 Typesafe Inc. + */ +package akka.dispatch.sysmsg + +import scala.annotation.tailrec +import akka.actor.{ ActorRef, PossiblyHarmful } + +/** + * INTERNAL API + * + * Helper companion object for [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and + * [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]] + */ +object SystemMessageList { + final val LNil: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(null) + final val ENil: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(null) + + @tailrec + private[sysmsg] def sizeInner(head: SystemMessage, acc: Int): Int = if (head eq null) acc else sizeInner(head.next, acc + 1) + + @tailrec + private[sysmsg] def reverseInner(head: SystemMessage, acc: SystemMessage): SystemMessage = { + if (head eq null) acc else { + val next = head.next + head.next = acc + reverseInner(next, head) + } + } +} + +/** + * + * INTERNAL API + * + * Value class supporting list operations on system messages. The `next` field of [[akka.dispatch.sysmsg.SystemMessage]] + * is hidden, and can only accessed through the value classes [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and + * [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]], abstracting over the fact that system messages are the + * list nodes themselves. If used properly, this stays a compile time construct without any allocation overhead. + * + * This list is mutable. + * + * The type of the list also encodes that the messages contained are in reverse order, i.e. the head of the list is the + * latest appended element. + * + */ +class LatestFirstSystemMessageList(val head: SystemMessage) extends AnyVal { + import SystemMessageList._ + + /** + * Indicates if the list is empty or not. This operation has constant cost. + */ + final def isEmpty: Boolean = head eq null + + /** + * Indicates if the list has at least one element or not. This operation has constant cost. + */ + final def nonEmpty: Boolean = head ne null + + /** + * Indicates if the list is empty or not. This operation has constant cost. + */ + final def size: Int = sizeInner(head, 0) + + /** + * Gives back the list containing all the elements except the first. This operation has constant cost. + * + * *Warning:* as the underlying list nodes (the [[akka.dispatch.sysmsg.SystemMessage]] instances) are mutable, care + * should be taken when passing the tail to other methods. [[akka.dispatch.sysmsg.SystemMessage#unlink]] should be + * called on the head if one wants to detach the tail permanently. + */ + final def tail: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(head.next) + + /** + * Reverses the list. This operation mutates the underlying list. The cost of the call to reverse is linear in the + * number of elements. + * + * The type of the returned list is of the opposite order: [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]] + */ + final def reverse: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(reverseInner(head, null)) + + /** + * Attaches a message to the current head of the list. This operation has constant cost. + */ + final def ::(msg: SystemMessage): LatestFirstSystemMessageList = { + assert(msg ne null) + msg.next = head + new LatestFirstSystemMessageList(msg) + } + +} + +/** + * + * INTERNAL API + * + * Value class supporting list operations on system messages. The `next` field of [[akka.dispatch.sysmsg.SystemMessage]] + * is hidden, and can only accessed through the value classes [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and + * [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]], abstracting over the fact that system messages are the + * list nodes themselves. If used properly, this stays a compile time construct without any allocation overhead. + * + * This list is mutable. + * + * This list type also encodes that the messages contained are in reverse order, i.e. the head of the list is the + * latest appended element. + * + */ +class EarliestFirstSystemMessageList(val head: SystemMessage) extends AnyVal { + import SystemMessageList._ + + /** + * Indicates if the list is empty or not. This operation has constant cost. + */ + final def isEmpty: Boolean = head eq null + + /** + * Indicates if the list has at least one element or not. This operation has constant cost. + */ + final def nonEmpty: Boolean = head ne null + + /** + * Indicates if the list is empty or not. This operation has constant cost. + */ + final def size: Int = sizeInner(head, 0) + + /** + * Gives back the list containing all the elements except the first. This operation has constant cost. + * + * *Warning:* as the underlying list nodes (the [[akka.dispatch.sysmsg.SystemMessage]] instances) are mutable, care + * should be taken when passing the tail to other methods. [[akka.dispatch.sysmsg.SystemMessage#unlink]] should be + * called on the head if one wants to detach the tail permanently. + */ + final def tail: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(head.next) + + /** + * Reverses the list. This operation mutates the underlying list. The cost of the call to reverse is linear in the + * number of elements. + * + * The type of the returned list is of the opposite order: [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] + */ + final def reverse: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(reverseInner(head, null)) + + /** + * Attaches a message to the current head of the list. This operation has constant cost. + */ + final def ::(msg: SystemMessage): EarliestFirstSystemMessageList = { + assert(msg ne null) + msg.next = head + new EarliestFirstSystemMessageList(msg) + } + + /** + * Prepends a list in a reversed order to the head of this list. The prepended list will be reversed during the process. + * + * Example: (3, 4, 5) reversePrepend (2, 1, 0) == (0, 1, 2, 3, 4, 5) + * + * The cost of this operation is linear in the size of the list that is to be prepended. + */ + final def reverse_:::(other: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = { + var remaining = other + var result = this + while (remaining.nonEmpty) { + val msg = remaining.head + remaining = remaining.tail + result ::= msg + } + result + } + +} + +/** + * System messages are handled specially: they form their own queue within + * each actor’s mailbox. This queue is encoded in the messages themselves to + * avoid extra allocations and overhead. The next pointer is a normal var, and + * it does not need to be volatile because in the enqueuing method its update + * is immediately succeeded by a volatile write and all reads happen after the + * volatile read in the dequeuing thread. Afterwards, the obtained list of + * system messages is handled in a single thread only and not ever passed around, + * hence no further synchronization is needed. + * + * INTERNAL API + * + * ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅ + */ +private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable { + // Next fields are only modifiable via the SystemMessageList value class + @transient + private[sysmsg] var next: SystemMessage = _ + + def unlink(): Unit = next = null + + def unlinked: Boolean = next eq null +} + +trait StashWhenWaitingForChildren + +trait StashWhenFailed + +/** + * INTERNAL API + */ +@SerialVersionUID(-4836972106317757555L) +private[akka] case class Create(uid: Int) extends SystemMessage // send to self from Dispatcher.register +/** + * INTERNAL API + */ +@SerialVersionUID(686735569005808256L) +private[akka] case class Recreate(cause: Throwable) extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.restart +/** + * INTERNAL API + */ +@SerialVersionUID(7270271967867221401L) +private[akka] case class Suspend() extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.suspend +/** + * INTERNAL API + */ +@SerialVersionUID(-2567504317093262591L) +private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.resume +/** + * INTERNAL API + */ +@SerialVersionUID(708873453777219599L) +private[akka] case class Terminate() extends SystemMessage // sent to self from ActorCell.stop +/** + * INTERNAL API + */ +@SerialVersionUID(3245747602115485675L) +private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start +/** + * INTERNAL API + */ +@SerialVersionUID(5513569382760799668L) +private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate +/** + * INTERNAL API + */ +@SerialVersionUID(3323205435124174788L) +private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch +/** + * INTERNAL API + */ +@SerialVersionUID(6363620903363658256L) +private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch +/** + * INTERNAL API + */ +@SerialVersionUID(-5475916034683997987L) +private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination + +/** + * INTERNAL API + */ +@SerialVersionUID(3L) +private[akka] case class Failed(child: ActorRef, cause: Throwable, uid: Int) extends SystemMessage + with StashWhenFailed + with StashWhenWaitingForChildren \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala index 1937a81110..7905acbea3 100644 --- a/akka-actor/src/main/scala/akka/pattern/AskSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/AskSupport.scala @@ -7,7 +7,7 @@ import language.implicitConversions import java.util.concurrent.TimeoutException import akka.actor._ -import akka.dispatch._ +import akka.dispatch.sysmsg._ import scala.annotation.tailrec import scala.util.control.NonFatal import scala.concurrent.{ Future, Promise, ExecutionContext } diff --git a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala index 84980e4ee0..b8f60ed00d 100644 --- a/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala +++ b/akka-actor/src/main/scala/akka/pattern/GracefulStopSupport.scala @@ -6,7 +6,7 @@ package akka.pattern import akka.actor._ import akka.util.{ Timeout } -import akka.dispatch.{ Unwatch, Watch } +import akka.dispatch.sysmsg.{ Unwatch, Watch } import scala.concurrent.Future import scala.concurrent.duration.Duration import scala.util.Success diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala index 0d06d4d64b..bde47e3c34 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterActorRefProvider.scala @@ -18,7 +18,7 @@ import akka.actor.Props import akka.actor.Scheduler import akka.actor.Scope import akka.actor.Terminated -import akka.dispatch.ChildTerminated +import akka.dispatch.sysmsg.ChildTerminated import akka.event.EventStream import akka.japi.Util.immutableSeq import akka.remote.RemoteActorRefProvider diff --git a/akka-docs/rst/general/message-delivery-guarantees.rst b/akka-docs/rst/general/message-delivery-guarantees.rst index d9544d7016..e398346254 100644 --- a/akka-docs/rst/general/message-delivery-guarantees.rst +++ b/akka-docs/rst/general/message-delivery-guarantees.rst @@ -111,6 +111,8 @@ implementation; it is always possible to add stricter guarantees on top of basic ones, but it is not possible to retro-actively remove guarantees in order to gain more performance. +.. _message-ordering: + Discussion: Message Ordering ---------------------------- @@ -153,6 +155,22 @@ Causal transitive ordering would imply that ``M2`` is never received before violated due to different message delivery latencies when ``A``, ``B`` and ``C`` reside on different network hosts, see more below. +Communication of failure +........................ + +Please note, that the ordering guarantees discussed above only hold for user messages between actors. Failure of a child +of an actor is communicated by special system messages that are not ordered relative to ordinary user messages. In +particular: + + Child actor ``C`` sends message ``M`` to its parent ``P`` + + Child actor fails with failure ``F`` + + Parent actor ``P`` might receive the two events either in order ``M``, ``F`` or ``F``, ``M`` + +The reason for this is that internal system messages has their own mailboxes therefore the ordering of enqueue calls of +a user and system message cannot guarantee the ordering of their dequeue times. + The Rules for In-JVM (Local) Message Sends ========================================== diff --git a/akka-docs/rst/general/supervision.rst b/akka-docs/rst/general/supervision.rst index 470079639a..88097a5b88 100644 --- a/akka-docs/rst/general/supervision.rst +++ b/akka-docs/rst/general/supervision.rst @@ -55,6 +55,15 @@ actors cannot be orphaned or attached to supervisors from the outside, which might otherwise catch them unawares. In addition, this yields a natural and clean shutdown procedure for (sub-trees of) actor applications. +.. warning:: + + Supervision related parent-child communication happens by special system + messages that have their own mailboxes separate from user messages. This + implies that supervision related events are not deterministically + ordered relative to ordinary messages. In general, the user cannot influence + the order of normal messages and failure notifications. For details and + example see the :ref:`message-ordering` section. + .. _toplevel-supervisors: The Top-Level Supervisors diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index 5365aa5130..8ae06710c3 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -246,6 +246,14 @@ that triggered the exception will not be received again. Any message sent to an actor while it is being restarted will be queued to its mailbox as usual. +.. warning:: + + Be aware that the ordering of failure notifications relative to user messages + is not deterministic. In particular, a parent might restart its child before + it has processed the last messages sent by the child before the failure. + See :ref:`message-ordering` for details. + + Stop Hook --------- diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 8ec241d046..a383d47228 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -359,6 +359,13 @@ that triggered the exception will not be received again. Any message sent to an actor while it is being restarted will be queued to its mailbox as usual. +.. warning:: + + Be aware that the ordering of failure notifications relative to user messages + is not deterministic. In particular, a parent might restart its child before + it has processed the last messages sent by the child before the failure. + See :ref:`message-ordering` for details. + Stop Hook --------- diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index fe3f96f6ca..f133453162 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -5,7 +5,7 @@ package akka.remote import akka.{ OnlyCauseStackTrace, AkkaException } import akka.actor._ -import akka.dispatch.SystemMessage +import akka.dispatch.sysmsg.SystemMessage import akka.event.LoggingAdapter import akka.pattern.pipe import akka.remote.EndpointManager.Send diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index c23e3c66b2..4ac2b95229 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -5,7 +5,7 @@ package akka.remote import akka.actor._ -import akka.dispatch._ +import akka.dispatch.sysmsg._ import akka.event.{ Logging, LoggingAdapter, EventStream } import akka.event.Logging.Error import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 6f8b994d54..ed0e9b3134 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -8,7 +8,7 @@ import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated } import akka.event.LoggingAdapter -import akka.dispatch.Watch +import akka.dispatch.sysmsg.Watch import akka.actor.ActorRefWithCell import akka.actor.ActorRefScope import akka.util.Switch diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 5304ed5c77..c697068b07 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -4,12 +4,9 @@ package akka.remote -import akka.dispatch.SystemMessage -import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException -import akka.serialization.Serialization -import akka.remote.RemoteProtocol._ import akka.actor._ +import akka.event.LoggingAdapter import scala.collection.immutable import scala.concurrent.Future diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index cd026e1902..361c6d4c29 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -10,7 +10,8 @@ import java.util.concurrent.locks.ReentrantLock import scala.annotation.tailrec import com.typesafe.config.Config import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell } -import akka.dispatch.{ MessageQueue, MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } +import akka.dispatch.{ MessageQueue, MailboxType, TaskInvocation, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue } +import akka.dispatch.sysmsg.{ SystemMessage, Suspend, Resume } import scala.concurrent.duration._ import akka.util.Switch import scala.concurrent.duration.Duration diff --git a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala index 57e3230e98..51541814d7 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestEventListener.scala @@ -10,7 +10,7 @@ import scala.collection.immutable import scala.concurrent.duration.Duration import scala.reflect.ClassTag import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage } -import akka.dispatch.{ SystemMessage, Terminate } +import akka.dispatch.sysmsg.{ SystemMessage, Terminate } import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized } import akka.event.Logging import akka.actor.NoSerializationVerificationNeeded