Changed Failed to be a SystemMessage

- Moved system messages to their own package.
 - All queueing operations are now hidden behind a SystemMessageList value class
 - Introduced dual SystemMessageList types to encode the ordering in the type.
   - Protects against accidentally missed reverse calls or accidentally reversed lists
   - Makes ordering expectations by fields/parameters explicit
 - Fixed serialization tests
 - Fixes to logging in HierarchyStressSpec
This commit is contained in:
Endre Sándor Varga 2013-03-05 16:19:54 +01:00
parent bf813d8406
commit f8c3717ca1
34 changed files with 636 additions and 250 deletions

View file

@ -5,11 +5,11 @@
package akka.actor
import language.postfixOps
import akka.dispatch.sysmsg.Failed
import akka.pattern.ask
import akka.testkit._
import scala.concurrent.duration._
import java.util.concurrent.atomic._
import scala.concurrent.Await
import akka.pattern.ask
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class LocalDeathWatchSpec extends AkkaSpec with ImplicitSender with DefaultTimeout with DeathWatchSpec
@ -129,7 +129,7 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
case class FF(fail: Failed)
val strategy = new OneForOneStrategy(maxNrOfRetries = 0)(SupervisorStrategy.makeDecider(List(classOf[Exception]))) {
override def handleFailure(context: ActorContext, child: ActorRef, cause: Throwable, stats: ChildRestartStats, children: Iterable[ChildRestartStats]) = {
testActor.tell(FF(Failed(cause, 0)), child)
testActor.tell(FF(Failed(child, cause, 0)), child)
super.handleFailure(context, child, cause, stats, children)
}
}
@ -145,9 +145,9 @@ trait DeathWatchSpec { this: AkkaSpec with ImplicitSender with DefaultTimeout
failed ! Kill
val result = receiveWhile(3 seconds, messages = 3) {
case FF(Failed(_: ActorKilledException, _)) if lastSender eq failed 1
case FF(Failed(DeathPactException(`failed`), _)) if lastSender eq brother 2
case WrappedTerminated(Terminated(`brother`)) 3
case FF(Failed(_, _: ActorKilledException, _)) if lastSender eq failed 1
case FF(Failed(_, DeathPactException(`failed`), _)) if lastSender eq brother 2
case WrappedTerminated(Terminated(`brother`)) 3
}
testActor.isTerminated must not be true
result must be(Seq(1, 2, 3))

View file

@ -242,6 +242,8 @@ object SupervisorHierarchySpec {
override def postStop {
if (failed || suspended) {
listener ! ErrorLog("not resumed (" + failed + ", " + suspended + ")", log)
val state = stateCache.get(self)
stateCache.put(self.path, state.copy(log = log))
} else {
stateCache.put(self.path, HierarchyState(log, Map(), null))
}
@ -249,7 +251,7 @@ object SupervisorHierarchySpec {
def check(msg: Any): Boolean = {
suspended = false
log :+= Event(msg, identityHashCode(this))
log :+= Event(msg, identityHashCode(Hierarchy.this))
if (failed) {
abort("processing message while failed")
failed = false
@ -287,13 +289,15 @@ object SupervisorHierarchySpec {
val props = Props(new Hierarchy(kids, breadth, listener, myLevel + 1)).withDispatcher("hierarchy")
context.watch(context.actorOf(props, name))
} else {
log :+= Event(sender + " terminated while pongOfDeath", identityHashCode(this))
// WARNING: The Terminated that is logged by this is logged by check() above, too. It is not
// an indication of duplicate Terminate messages
log :+= Event(sender + " terminated while pongOfDeath", identityHashCode(Hierarchy.this))
}
case Abort abort("terminating")
case PingOfDeath
if (size > 1) {
pongsToGo = context.children.size
log :+= Event("sending " + pongsToGo + " pingOfDeath", identityHashCode(this))
log :+= Event("sending " + pongsToGo + " pingOfDeath", identityHashCode(Hierarchy.this))
context.children foreach (_ ! PingOfDeath)
} else {
context stop self

View file

@ -16,6 +16,7 @@ import org.scalatest.junit.JUnitRunner
import com.typesafe.config.Config
import akka.actor._
import akka.dispatch.sysmsg._
import akka.dispatch._
import akka.event.Logging.Error
import akka.pattern.ask
@ -390,7 +391,8 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
} foreach {
case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain(null)))
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " "
+ cell.mailbox.numberOfMessages + " " + cell.mailbox.systemDrain(SystemMessageList.LNil).size)
}
System.err.println("Mailbox: " + mq.numberOfMessages + " " + mq.hasMessages)

View file

@ -0,0 +1,116 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch.sysmsg
import akka.testkit.AkkaSpec
class SystemMessageListSpec extends AkkaSpec {
import SystemMessageList.LNil
import SystemMessageList.ENil
"The SystemMessageList value class" must {
"handle empty lists correctly" in {
LNil.head must be === null
LNil.isEmpty must be(true)
(LNil.reverse == ENil) must be(true)
}
"able to append messages" in {
val create0 = Create(0)
val create1 = Create(1)
val create2 = Create(2)
((create0 :: LNil).head eq create0) must be(true)
((create1 :: create0 :: LNil).head eq create1) must be(true)
((create2 :: create1 :: create0 :: LNil).head eq create2) must be(true)
(create2.next eq create1) must be(true)
(create1.next eq create0) must be(true)
(create0.next eq null) must be(true)
}
"able to deconstruct head and tail" in {
val create0 = Create(0)
val create1 = Create(1)
val create2 = Create(2)
val list = create2 :: create1 :: create0 :: LNil
(list.head eq create2) must be(true)
(list.tail.head eq create1) must be(true)
(list.tail.tail.head eq create0) must be(true)
(list.tail.tail.tail.head eq null) must be(true)
}
"properly report size and emptyness" in {
val create0 = Create(0)
val create1 = Create(1)
val create2 = Create(2)
val list = create2 :: create1 :: create0 :: LNil
list.size must be === 3
list.isEmpty must be(false)
list.tail.size must be === 2
list.tail.isEmpty must be(false)
list.tail.tail.size must be === 1
list.tail.tail.isEmpty must be(false)
list.tail.tail.tail.size must be === 0
list.tail.tail.tail.isEmpty must be(true)
}
"properly reverse contents" in {
val create0 = Create(0)
val create1 = Create(1)
val create2 = Create(2)
val list = create2 :: create1 :: create0 :: LNil
val listRev: EarliestFirstSystemMessageList = list.reverse
listRev.isEmpty must be(false)
listRev.size must be === 3
(listRev.head eq create0) must be(true)
(listRev.tail.head eq create1) must be(true)
(listRev.tail.tail.head eq create2) must be(true)
(listRev.tail.tail.tail.head eq null) must be(true)
(create0.next eq create1) must be(true)
(create1.next eq create2) must be(true)
(create2.next eq null) must be(true)
}
}
"EarliestFirstSystemMessageList" must {
"properly prepend reversed message lists to the front" in {
val create0 = Create(0)
val create1 = Create(1)
val create2 = Create(2)
val create3 = Create(3)
val create4 = Create(4)
val create5 = Create(5)
val fwdList = create3 :: create4 :: create5 :: ENil
val revList = create2 :: create1 :: create0 :: LNil
val list = revList reverse_::: fwdList
(list.head eq create0) must be(true)
(list.tail.head eq create1) must be(true)
(list.tail.tail.head eq create2) must be(true)
(list.tail.tail.tail.head eq create3) must be(true)
(list.tail.tail.tail.tail.head eq create4) must be(true)
(list.tail.tail.tail.tail.tail.head eq create5) must be(true)
(list.tail.tail.tail.tail.tail.tail.head eq null) must be(true)
(LNil reverse_::: ENil) == ENil must be(true)
((create0 :: LNil reverse_::: ENil).head eq create0) must be(true)
((LNil reverse_::: create0 :: ENil).head eq create0) must be(true)
}
}
}

View file

@ -8,7 +8,7 @@ import language.postfixOps
import akka.testkit.{ AkkaSpec, EventFilter }
import akka.actor._
import akka.dispatch._
import akka.dispatch.sysmsg._
import java.io._
import scala.concurrent.Await
import akka.util.Timeout
@ -109,7 +109,7 @@ object SerializationTests {
}
serialization-bindings {
"akka.dispatch.SystemMessage" = test
"akka.dispatch.sysmsg.SystemMessage" = test
}
}
}
@ -125,6 +125,7 @@ object SerializationTests {
classOf[ChildTerminated],
classOf[Watch],
classOf[Unwatch],
classOf[Failed],
NoMessage.getClass)
}
@ -335,31 +336,35 @@ class SerializationCompatibilitySpec extends AkkaSpec(SerializationTests.mostlyR
verify(Create(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e437265617465000000000000000302000078707671007e0003")
}
"be preserved for the Recreate SystemMessage" in {
verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720016616b6b612e64697370617463682e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
verify(Recreate(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e7379736d73672e52656372656174650987c65c8d378a800200014c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
}
"be preserved for the Suspend SystemMessage" in {
verify(Suspend(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e53757370656e6464e531d5d134b59902000078707671007e0003")
verify(Suspend(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e53757370656e6464e531d5d134b59902000078707671007e0003")
}
"be preserved for the Resume SystemMessage" in {
verify(Resume(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720014616b6b612e64697370617463682e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
verify(Resume(null), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e526573756d65dc5e646d445fcb010200014c000f63617573656442794661696c7572657400154c6a6176612f6c616e672f5468726f7761626c653b7870707671007e0003")
}
"be preserved for the Terminate SystemMessage" in {
verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e5465726d696e61746509d66ca68318700f02000078707671007e0003")
verify(Terminate(), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001e616b6b612e64697370617463682e7379736d73672e5465726d696e61746509d66ca68318700f02000078707671007e0003")
}
"be preserved for the Supervise SystemMessage" in {
verify(Supervise(FakeActorRef("child"), true), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720017616b6b612e64697370617463682e53757065727669736500000000000000030200025a00056173796e634c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b7870017372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
"be preserved for the ChildTerminated SystemMessage" in {
verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001d616b6b612e64697370617463682e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
verify(ChildTerminated(FakeActorRef("child")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720024616b6b612e64697370617463682e7379736d73672e4368696c645465726d696e617465644c84222437ed5db40200014c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b78707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
"be preserved for the Watch SystemMessage" in {
verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720013616b6b612e64697370617463682e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
verify(Watch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001a616b6b612e64697370617463682e7379736d73672e57617463682e1e65bc74394fc40200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
}
"be preserved for the Unwatch SystemMessage" in {
verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720015616b6b612e64697370617463682e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f725265660d0aa2ca1e82097602000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
verify(Unwatch(FakeActorRef("watchee"), FakeActorRef("watcher")), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001c616b6b612e64697370617463682e7379736d73672e556e776174636858501f7ee63dc2100200024c0007776174636865657400154c616b6b612f6163746f722f4163746f725265663b4c00077761746368657271007e000478707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f46940200007870740007776174636865657371007e0006740007776174636865727671007e0003")
}
"be preserved for the NoMessage SystemMessage" in {
verify(NoMessage, "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e0001787073720018616b6b612e64697370617463682e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003")
verify(NoMessage, "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001f616b6b612e64697370617463682e7379736d73672e4e6f4d65737361676524b401a3610ccb70dd02000078707671007e0003")
}
"be preserved for the Failed SystemMessage" in {
// Using null as the cause to avoid a large serialized message
verify(Failed(FakeActorRef("child"), cause = null, uid = 0), "aced00057372000c7363616c612e5475706c6532bc7daadf46211a990200024c00025f317400124c6a6176612f6c616e672f4f626a6563743b4c00025f3271007e000178707372001b616b6b612e64697370617463682e7379736d73672e4661696c656400000000000000030200034900037569644c000563617573657400154c6a6176612f6c616e672f5468726f7761626c653b4c00056368696c647400154c616b6b612f6163746f722f4163746f725265663b787000000000707372001f616b6b612e73657269616c697a6174696f6e2e46616b654163746f7252656600000000000000010200014c00046e616d657400124c6a6176612f6c616e672f537472696e673b7872001b616b6b612e6163746f722e496e7465726e616c4163746f72526566db6eaed9e69a356302000078720013616b6b612e6163746f722e4163746f72526566c3585dde655f469402000078707400056368696c647671007e0003")
}
}
}

View file

@ -26,7 +26,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import akka.dispatch.SystemMessage;
import akka.dispatch.sysmsg.SystemMessage;
import akka.util.Helpers;
import scala.concurrent.duration.Duration;
import scala.concurrent.duration.FiniteDuration;

View file

@ -28,12 +28,6 @@ trait PossiblyHarmful
*/
trait NoSerializationVerificationNeeded
/**
* INTERNAL API
*/
@SerialVersionUID(2L)
private[akka] case class Failed(cause: Throwable, uid: Int) extends AutoReceivedMessage with PossiblyHarmful
abstract class PoisonPill extends AutoReceivedMessage with PossiblyHarmful
/**

View file

@ -4,9 +4,17 @@
package akka.actor
import akka.actor.dungeon.ChildrenContainer
import akka.dispatch.Envelope
import akka.dispatch.NullMessage
import akka.dispatch.sysmsg._
import akka.event.Logging.Debug
import akka.event.Logging.{ LogEvent, Error }
import akka.japi.Procedure
import java.io.{ ObjectOutputStream, NotSerializableException }
import scala.annotation.tailrec
import scala.annotation.{ switch, tailrec }
import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration.Duration
import scala.util.control.NonFatal
import akka.actor.dungeon.ChildrenContainer
@ -16,7 +24,6 @@ import akka.event.Logging.{ LogEvent, Debug, Error }
import akka.japi.Procedure
import akka.dispatch.NullMessage
import scala.concurrent.ExecutionContext
import scala.concurrent.forkjoin.ThreadLocalRandom
/**
* The actor context - the view of the actor cell from the actor.
@ -325,6 +332,9 @@ private[akka] object ActorCell {
else (name.substring(0, i), Integer.valueOf(name.substring(i + 1)))
}
final val DefaultState = 0
final val SuspendedState = 1
final val SuspendedWaitForChildrenState = 2
}
//ACTORCELL IS 64bytes and should stay that way unless very good reason not to (machine sympathy, cache line fit)
@ -362,12 +372,24 @@ private[akka] class ActorCell(
protected def actor_=(a: Actor): Unit = _actor = a
var currentMessage: Envelope = _
private var behaviorStack: List[Actor.Receive] = emptyBehaviorStack
private[this] var sysmsgStash: LatestFirstSystemMessageList = SystemMessageList.LNil
protected def stash(msg: SystemMessage): Unit = {
assert(msg.unlinked)
sysmsgStash ::= msg
}
private def unstashAll(): LatestFirstSystemMessageList = {
val unstashed = sysmsgStash
sysmsgStash = SystemMessageList.LNil
unstashed
}
/*
* MESSAGE PROCESSING
*/
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
@tailrec final def systemInvoke(message: SystemMessage): Unit = {
final def systemInvoke(message: SystemMessage): Unit = {
/*
* When recreate/suspend/resume are received while restarting (i.e. between
* preRestart and postRestart, waiting for children to terminate), these
@ -377,36 +399,61 @@ private[akka] class ActorCell(
* types (hence the overwrite further down). Mailbox sets message.next=null
* before systemInvoke, so this will only be non-null during such a replay.
*/
var todo = message.next
try {
message match {
case Create() create()
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Recreate(cause)
waitingForChildrenOrNull match {
case null faultRecreate(cause)
case w: WaitingForChildren w.enqueue(message)
}
case Suspend()
waitingForChildrenOrNull match {
case null faultSuspend()
case w: WaitingForChildren w.enqueue(message)
}
case Resume(inRespToFailure)
waitingForChildrenOrNull match {
case null faultResume(inRespToFailure)
case w: WaitingForChildren w.enqueue(message)
}
case Terminate() terminate()
case Supervise(child, async) supervise(child, async)
case ChildTerminated(child) todo = handleChildTerminated(child)
case NoMessage // only here to suppress warning
def calculateState: Int =
if (waitingForChildrenOrNull ne null) SuspendedWaitForChildrenState
else if (mailbox.isSuspended) SuspendedState
else DefaultState
@tailrec def sendAllToDeadLetters(messages: EarliestFirstSystemMessageList): Unit =
if (messages.nonEmpty) {
val tail = messages.tail
val msg = messages.head
msg.unlink()
provider.deadLetters ! msg
sendAllToDeadLetters(tail)
}
} catch handleNonFatalOrInterruptedException { e
handleInvokeFailure(Nil, e)
def shouldStash(m: SystemMessage, state: Int): Boolean =
(state: @switch) match {
case DefaultState false
case SuspendedState m.isInstanceOf[StashWhenFailed]
case SuspendedWaitForChildrenState m.isInstanceOf[StashWhenWaitingForChildren]
}
@tailrec
def invokeAll(messages: EarliestFirstSystemMessageList, currentState: Int): Unit = {
val rest = messages.tail
val message = messages.head
message.unlink()
try {
message match {
case message: SystemMessage if shouldStash(message, currentState) stash(message)
case f: Failed handleFailure(f)
case Create() create(uid)
case Watch(watchee, watcher) addWatcher(watchee, watcher)
case Unwatch(watchee, watcher) remWatcher(watchee, watcher)
case Recreate(cause) faultRecreate(cause)
case Suspend() faultSuspend()
case Resume(inRespToFailure) faultResume(inRespToFailure)
case Terminate() terminate()
case Supervise(child, async) supervise(child, async, uid)
case ChildTerminated(child) handleChildTerminated(child)
case NoMessage // only here to suppress warning
}
} catch handleNonFatalOrInterruptedException { e
handleInvokeFailure(Nil, e)
}
val newState = calculateState
// As each state accepts a strict subset of another state, it is enough to unstash if we "walk up" the state
// chain
val todo = if (newState < currentState) unstashAll() reverse_::: rest else rest
if (isTerminated) sendAllToDeadLetters(todo)
else if (todo.nonEmpty) invokeAll(todo, newState)
}
if (todo != null) systemInvoke(todo)
invokeAll(new EarliestFirstSystemMessageList(message), calculateState)
}
//Memory consistency is handled by the Mailbox (reading mailbox status then processing messages, then writing mailbox status
@ -430,7 +477,6 @@ private[akka] class ActorCell(
publish(Debug(self.path.toString, clazz(actor), "received AutoReceiveMessage " + msg))
msg.message match {
case Failed(cause, uid) handleFailure(sender, cause, uid)
case t: Terminated watchedActorTerminated(t)
case AddressTerminated(address) addressTerminated(address)
case Kill throw new ActorKilledException("Kill")

View file

@ -5,6 +5,7 @@
package akka.actor
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.util._
import java.lang.{ UnsupportedOperationException, IllegalStateException }
import akka.serialization.{ Serialization, JavaSerializer }

View file

@ -4,7 +4,8 @@
package akka.actor
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.dispatch.NullMessage
import akka.routing._
import akka.event._
import akka.util.{ Switch, Helpers }
@ -388,17 +389,17 @@ class LocalActorRefProvider private[akka] (
override def isTerminated: Boolean = stopped.isOn
override def !(message: Any)(implicit sender: ActorRef = Actor.noSender): Unit = stopped.ifOff(message match {
case null throw new InvalidMessageException("Message is null")
case Failed(ex, _) if sender ne null { causeOfTermination = Some(ex); sender.asInstanceOf[InternalActorRef].stop() }
case NullMessage // do nothing
case _ log.error(this + " received unexpected message [" + message + "]")
case null throw new InvalidMessageException("Message is null")
case NullMessage // do nothing
case _ log.error(this + " received unexpected message [" + message + "]")
})
override def sendSystemMessage(message: SystemMessage): Unit = stopped ifOff {
message match {
case Supervise(_, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]")
case Failed(child, ex, _) { causeOfTermination = Some(ex); child.asInstanceOf[InternalActorRef].stop() }
case Supervise(_, _) // TODO register child in some map to keep track of it and enable shutdown after all dead
case ChildTerminated(_) stop()
case _ log.error(this + " received unexpected system message [" + message + "]")
}
}
}

View file

@ -10,6 +10,7 @@ import java.util.concurrent.TimeUnit.MILLISECONDS
import com.typesafe.config.{ Config, ConfigFactory }
import akka.event._
import akka.dispatch._
import akka.dispatch.sysmsg.{ SystemMessageList, EarliestFirstSystemMessageList, LatestFirstSystemMessageList, SystemMessage }
import akka.japi.Util.immutableSeq
import akka.actor.dungeon.ChildrenContainer
import akka.util._
@ -559,7 +560,7 @@ private[akka] class ActorSystemImpl(val name: String, applicationConfig: Config,
becomeClosed()
def systemEnqueue(receiver: ActorRef, handle: SystemMessage): Unit =
deadLetters ! DeadLetter(handle, receiver, receiver)
def systemDrain(newContents: SystemMessage): SystemMessage = null
def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = SystemMessageList.ENil
def hasSystemMessages = false
}

View file

@ -15,6 +15,7 @@ import akka.actor.dungeon.ChildrenContainer
import akka.event.Logging.Warning
import akka.util.Unsafe
import akka.dispatch._
import akka.dispatch.sysmsg._
import util.Try
/**

View file

@ -7,7 +7,7 @@ package akka.actor.dungeon
import scala.collection.immutable
import akka.actor.{ InvalidActorNameException, ChildStats, ChildRestartStats, ChildNameReserved, ActorRef }
import akka.dispatch.SystemMessage
import akka.dispatch.sysmsg.{ EarliestFirstSystemMessageList, SystemMessageList, LatestFirstSystemMessageList, SystemMessage }
import akka.util.Collections.{ EmptyImmutableSeq, PartialImmutableValuesIterable }
/**
@ -62,11 +62,7 @@ private[akka] object ChildrenContainer {
override final def valuesIterator = stats.valuesIterator
}
trait WaitingForChildren {
private var todo: SystemMessage = null
def enqueue(message: SystemMessage) = { message.next = todo; todo = message }
def dequeueAll(): SystemMessage = { val ret = SystemMessage.reverse(todo); todo = null; ret }
}
trait WaitingForChildren
trait EmptyChildrenContainer extends ChildrenContainer {
val emptyStats = immutable.TreeMap.empty[String, ChildStats]

View file

@ -5,7 +5,7 @@
package akka.actor.dungeon
import akka.actor.{ Terminated, InternalActorRef, ActorRef, ActorRefScope, ActorCell, Actor, Address, AddressTerminated }
import akka.dispatch.{ ChildTerminated, Watch, Unwatch }
import akka.dispatch.sysmsg.{ ChildTerminated, Watch, Unwatch }
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
import akka.actor.MinimalActorRef

View file

@ -5,7 +5,8 @@
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.dispatch.{ Terminate, SystemMessage, Suspend, Resume, Recreate, MessageDispatcher, Mailbox, Envelope, Create }
import akka.dispatch.{ MessageDispatcher, Mailbox, Envelope }
import akka.dispatch.sysmsg._
import akka.event.Logging.Error
import akka.util.Unsafe
import akka.dispatch.NullMessage
@ -53,7 +54,7 @@ private[akka] trait Dispatch { this: ActorCell ⇒
if (sendSupervise) {
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(akka.dispatch.Supervise(self, async = false))
parent.sendSystemMessage(akka.dispatch.sysmsg.Supervise(self, async = false, uid))
parent ! NullMessage // read ScalaDoc of NullMessage to see why
}
this

View file

@ -4,20 +4,19 @@
package akka.actor.dungeon
import scala.annotation.tailrec
import akka.actor.{ PreRestartException, PostRestartException, InternalActorRef, Failed, ActorRef, ActorInterruptedException, ActorCell, Actor }
import akka.dispatch._
import akka.event.Logging.{ Warning, Error, Debug }
import scala.util.control.NonFatal
import akka.event.Logging
import scala.collection.immutable
import akka.dispatch.ChildTerminated
import akka.actor.PreRestartException
import akka.actor.Failed
import akka.actor.PostRestartException
import akka.actor.PreRestartException
import akka.actor.{ InternalActorRef, ActorRef, ActorInterruptedException, ActorCell, Actor }
import akka.dispatch._
import akka.dispatch.sysmsg.ChildTerminated
import akka.dispatch.sysmsg._
import akka.event.Logging
import akka.event.Logging.Debug
import akka.event.Logging.Error
import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.util.control.Exception._
import scala.util.control.NonFatal
private[akka] trait FaultHandling { this: ActorCell
@ -134,7 +133,10 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
private def finishCreate(): Unit = {
try resumeNonRecursive()
finally clearFailed()
create()
try create()
catch handleNonFatalOrInterruptedException { e
handleInvokeFailure(Nil, e)
}
}
protected def terminate() {
@ -169,14 +171,18 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
suspendNonRecursive()
// suspend children
val skip: Set[ActorRef] = currentMessage match {
case Envelope(Failed(_, _), child) { setFailed(child); Set(child) }
case _ { setFailed(self); Set.empty }
case Envelope(Failed(_, _, _), child) setFailed(child); Set(child)
case _ setFailed(self); Set.empty
}
suspendChildren(exceptFor = skip ++ childrenNotToSuspend)
t match {
// tell supervisor
case _: InterruptedException parent.tell(Failed(new ActorInterruptedException(t), uid), self)
case _ parent.tell(Failed(t, uid), self)
case _: InterruptedException
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, new ActorInterruptedException(t), uid))
case _
// ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
parent.sendSystemMessage(Failed(self, t, uid))
}
} catch handleNonFatalOrInterruptedException { e
publish(Error(e, self.path.toString, clazz(actor),
@ -237,23 +243,25 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
}
}
final protected def handleFailure(child: ActorRef, cause: Throwable, uid: Int): Unit =
getChildByRef(child) match {
final protected def handleFailure(f: Failed): Unit = {
currentMessage = Envelope(f, f.child, system)
getChildByRef(f.child) match {
/*
* only act upon the failure, if it comes from a currently known child;
* the UID protects against reception of a Failed from a child which was
* killed in preRestart and re-created in postRestart
*/
case Some(stats) if stats.uid == uid
if (!actor.supervisorStrategy.handleFailure(this, child, cause, stats, getAllChildStats)) throw cause
case Some(stats) if stats.uid == f.uid
if (!actor.supervisorStrategy.handleFailure(this, f.child, f.cause, stats, getAllChildStats)) throw f.cause
case Some(stats)
publish(Debug(self.path.toString, clazz(actor),
"dropping Failed(" + cause + ") from old child " + child + " (uid=" + stats.uid + " != " + uid + ")"))
"dropping Failed(" + f.cause + ") from old child " + f.child + " (uid=" + stats.uid + " != " + f.uid + ")"))
case None
publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + cause + ") from unknown child " + child))
publish(Debug(self.path.toString, clazz(actor), "dropping Failed(" + f.cause + ") from unknown child " + f.child))
}
}
final protected def handleChildTerminated(child: ActorRef): SystemMessage = {
final protected def handleChildTerminated(child: ActorRef): Unit = {
val status = removeChildAndGetStateChange(child)
/*
* if this fails, we do nothing in case of terminating/restarting state,
@ -272,10 +280,10 @@ private[akka] trait FaultHandling { this: ActorCell ⇒
* then we are continuing the previously suspended recreate/create/terminate action
*/
status match {
case Some(c @ ChildrenContainer.Recreation(cause)) { finishRecreate(cause, actor); c.dequeueAll() }
case Some(c @ ChildrenContainer.Creation()) { finishCreate(); c.dequeueAll() }
case Some(ChildrenContainer.Termination) { finishTerminate(); null }
case _ null
case Some(c @ ChildrenContainer.Recreation(cause)) finishRecreate(cause, actor)
case Some(c @ ChildrenContainer.Creation()) finishCreate()
case Some(ChildrenContainer.Termination) finishTerminate()
case _
}
}

View file

@ -7,6 +7,7 @@ package akka.dispatch
import java.util.concurrent._
import akka.event.Logging.{ Error, LogEventException }
import akka.actor._
import akka.dispatch.sysmsg._
import akka.event.EventStream
import com.typesafe.config.Config
import akka.util.{ Unsafe, Index }
@ -41,95 +42,6 @@ object Envelope {
*/
case object NullMessage extends AutoReceivedMessage
/**
* INTERNAL API
*/
private[akka] object SystemMessage {
@tailrec
final def size(list: SystemMessage, acc: Int = 0): Int = {
if (list eq null) acc else size(list.next, acc + 1)
}
@tailrec
final def reverse(list: SystemMessage, acc: SystemMessage = null): SystemMessage = {
if (list eq null) acc else {
val next = list.next
list.next = acc
reverse(next, list)
}
}
}
/**
* System messages are handled specially: they form their own queue within
* each actors mailbox. This queue is encoded in the messages themselves to
* avoid extra allocations and overhead. The next pointer is a normal var, and
* it does not need to be volatile because in the enqueuing method its update
* is immediately succeeded by a volatile write and all reads happen after the
* volatile read in the dequeuing thread. Afterwards, the obtained list of
* system messages is handled in a single thread only and not ever passed around,
* hence no further synchronization is needed.
*
* INTERNAL API
*
* ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
*/
private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable {
@transient
var next: SystemMessage = _
}
/**
* INTERNAL API
*/
@SerialVersionUID(3L)
private[akka] case class Create() extends SystemMessage // send to self from Dispatcher.register
/**
* INTERNAL API
*/
@SerialVersionUID(686735569005808256L)
private[akka] case class Recreate(cause: Throwable) extends SystemMessage // sent to self from ActorCell.restart
/**
* INTERNAL API
*/
@SerialVersionUID(7270271967867221401L)
private[akka] case class Suspend() extends SystemMessage // sent to self from ActorCell.suspend
/**
* INTERNAL API
*/
@SerialVersionUID(-2567504317093262591L)
private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage // sent to self from ActorCell.resume
/**
* INTERNAL API
*/
@SerialVersionUID(708873453777219599L)
private[akka] case class Terminate() extends SystemMessage // sent to self from ActorCell.stop
/**
* INTERNAL API
*/
@SerialVersionUID(3L)
private[akka] case class Supervise(child: ActorRef, async: Boolean) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
/**
* INTERNAL API
*/
@SerialVersionUID(5513569382760799668L)
private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate
/**
* INTERNAL API
*/
@SerialVersionUID(3323205435124174788L)
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(6363620903363658256L)
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(-5475916034683997987L)
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () Unit) extends Batchable {
final override def isBatchable: Boolean = runnable match {
case b: Batchable b.isBatchable

View file

@ -5,6 +5,7 @@
package akka.dispatch
import akka.actor.{ ActorCell, ActorRef }
import akka.dispatch.sysmsg._
import scala.annotation.tailrec
import scala.concurrent.duration.Duration
import akka.util.Helpers
@ -56,13 +57,13 @@ class BalancingDispatcher(
override def cleanUp(): Unit = {
val dlq = system.deadLetterMailbox
//Don't call the original implementation of this since it scraps all messages, and we don't want to do that
var message = systemDrain(NoMessage)
while (message ne null) {
var messages = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messages.nonEmpty) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
val message = messages.head
messages = messages.tail
message.unlink()
dlq.systemEnqueue(system.deadLetters, message)
message = next
}
}
}

View file

@ -7,6 +7,7 @@ package akka.dispatch
import akka.event.Logging.Error
import akka.actor.ActorCell
import akka.event.Logging
import akka.dispatch.sysmsg.SystemMessage
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ ExecutorService, RejectedExecutionException }
import scala.concurrent.forkjoin.ForkJoinPool

View file

@ -6,6 +6,7 @@ package akka.dispatch
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ Unsafe, BoundedBlockingQueue }
import akka.event.Logging.Error
@ -14,6 +15,9 @@ import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import scala.concurrent.duration.FiniteDuration
import akka.actor.DeadLetter
import akka.dispatch.BoundedMailbox
import akka.dispatch.BoundedDequeBasedMailbox
/**
* INTERNAL API
@ -196,11 +200,16 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
/*
* AtomicReferenceFieldUpdater for system queue.
*/
protected final def systemQueueGet: SystemMessage =
Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]
protected final def systemQueueGet: LatestFirstSystemMessageList =
// Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
// it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
new LatestFirstSystemMessageList(Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage])
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)
protected final def systemQueuePut(_old: LatestFirstSystemMessageList, _new: LatestFirstSystemMessageList): Boolean =
// Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new
// are SystemMessage instances hidden during compile time behind the SystemMessageList value class.
// Without calling .head the parameters would be boxed in SystemMessageList wrapper.
Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old.head, _new.head)
final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
case Open | Scheduled hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
@ -248,28 +257,28 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
*/
final def processAllSystemMessages() {
var interruption: Throwable = null
var nextMessage = systemDrain(null)
while ((nextMessage ne null) && !isClosed) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
var messageList = systemDrain(SystemMessageList.LNil)
while ((messageList.nonEmpty) && !isClosed) {
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
if (debug) println(actor.self + " processing system message " + msg + " with " + actor.childrenRefs)
// we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
actor systemInvoke msg
if (Thread.interrupted())
interruption = new InterruptedException("Interrupted while processing system messages")
// dont ever execute normal message when system message present!
if ((nextMessage eq null) && !isClosed) nextMessage = systemDrain(null)
if ((messageList.isEmpty) && !isClosed) messageList = systemDrain(SystemMessageList.LNil)
}
/*
* if we closed the mailbox, we must dump the remaining system messages
* to deadLetters (this is essential for DeathWatch)
*/
val dlm = actor.systemImpl.deadLetterMailbox
while (nextMessage ne null) {
val msg = nextMessage
nextMessage = nextMessage.next
msg.next = null
while (messageList.nonEmpty) {
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
try dlm.systemEnqueue(actor.self, msg)
catch {
case e: InterruptedException interruption = e
@ -292,13 +301,13 @@ private[akka] abstract class Mailbox(val messageQueue: MessageQueue)
protected[dispatch] def cleanUp(): Unit =
if (actor ne null) { // actor is null for the deadLetterMailbox
val dlm = actor.systemImpl.deadLetterMailbox
var message = systemDrain(NoMessage)
while (message ne null) {
var messageList = systemDrain(new LatestFirstSystemMessageList(NoMessage))
while (messageList.nonEmpty) {
// message must be virgin before being able to systemEnqueue again
val next = message.next
message.next = null
dlm.systemEnqueue(actor.self, message)
message = next
val msg = messageList.head
messageList = messageList.tail
msg.unlink()
dlm.systemEnqueue(actor.self, msg)
}
if (messageQueue ne null) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
@ -355,7 +364,7 @@ private[akka] trait SystemMessageQueue {
/**
* Dequeue all messages from system queue and return them as single-linked list.
*/
def systemDrain(newContents: SystemMessage): SystemMessage
def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList
def hasSystemMessages: Boolean
}
@ -367,36 +376,26 @@ private[akka] trait DefaultSystemMessageQueue { self: Mailbox ⇒
@tailrec
final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
assert(message.next eq null)
assert(message.unlinked)
if (Mailbox.debug) println(receiver + " having enqueued " + message)
val head = systemQueueGet
if (head == NoMessage) {
val currentList = systemQueueGet
if (currentList.head == NoMessage) {
if (actor ne null) actor.systemImpl.deadLetterMailbox.systemEnqueue(receiver, message)
} else {
/*
* This write is safely published by the compareAndSet contained within
* systemQueuePut; Intra-Thread Semantics on page 12 of the JSR133 spec
* guarantees that head uses the value obtained from systemQueueGet above.
* Hence, SystemMessage.next does not need to be volatile.
*/
message.next = head
if (!systemQueuePut(head, message)) {
message.next = null
if (!systemQueuePut(currentList, message :: currentList)) {
message.unlink()
systemEnqueue(receiver, message)
}
}
}
@tailrec
final def systemDrain(newContents: SystemMessage): SystemMessage = systemQueueGet match {
case NoMessage null
case head if (systemQueuePut(head, newContents)) SystemMessage.reverse(head) else systemDrain(newContents)
final def systemDrain(newContents: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
val currentList = systemQueueGet
if (systemQueuePut(currentList, newContents)) currentList.reverse else systemDrain(newContents)
}
def hasSystemMessages: Boolean = systemQueueGet match {
case null | NoMessage false
case _ true
}
def hasSystemMessages: Boolean = systemQueueGet.nonEmpty
}

View file

@ -0,0 +1,257 @@
/**
* Copyright (C) 2009-2013 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.dispatch.sysmsg
import scala.annotation.tailrec
import akka.actor.{ ActorRef, PossiblyHarmful }
/**
* INTERNAL API
*
* Helper companion object for [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and
* [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]]
*/
object SystemMessageList {
final val LNil: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(null)
final val ENil: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(null)
@tailrec
private[sysmsg] def sizeInner(head: SystemMessage, acc: Int): Int = if (head eq null) acc else sizeInner(head.next, acc + 1)
@tailrec
private[sysmsg] def reverseInner(head: SystemMessage, acc: SystemMessage): SystemMessage = {
if (head eq null) acc else {
val next = head.next
head.next = acc
reverseInner(next, head)
}
}
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[akka.dispatch.sysmsg.SystemMessage]]
* is hidden, and can only accessed through the value classes [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and
* [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]], abstracting over the fact that system messages are the
* list nodes themselves. If used properly, this stays a compile time construct without any allocation overhead.
*
* This list is mutable.
*
* The type of the list also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
class LatestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._
/**
* Indicates if the list is empty or not. This operation has constant cost.
*/
final def isEmpty: Boolean = head eq null
/**
* Indicates if the list has at least one element or not. This operation has constant cost.
*/
final def nonEmpty: Boolean = head ne null
/**
* Indicates if the list is empty or not. This operation has constant cost.
*/
final def size: Int = sizeInner(head, 0)
/**
* Gives back the list containing all the elements except the first. This operation has constant cost.
*
* *Warning:* as the underlying list nodes (the [[akka.dispatch.sysmsg.SystemMessage]] instances) are mutable, care
* should be taken when passing the tail to other methods. [[akka.dispatch.sysmsg.SystemMessage#unlink]] should be
* called on the head if one wants to detach the tail permanently.
*/
final def tail: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(head.next)
/**
* Reverses the list. This operation mutates the underlying list. The cost of the call to reverse is linear in the
* number of elements.
*
* The type of the returned list is of the opposite order: [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]]
*/
final def reverse: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(reverseInner(head, null))
/**
* Attaches a message to the current head of the list. This operation has constant cost.
*/
final def ::(msg: SystemMessage): LatestFirstSystemMessageList = {
assert(msg ne null)
msg.next = head
new LatestFirstSystemMessageList(msg)
}
}
/**
*
* INTERNAL API
*
* Value class supporting list operations on system messages. The `next` field of [[akka.dispatch.sysmsg.SystemMessage]]
* is hidden, and can only accessed through the value classes [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]] and
* [[akka.dispatch.sysmsg.EarliestFirstSystemMessageList]], abstracting over the fact that system messages are the
* list nodes themselves. If used properly, this stays a compile time construct without any allocation overhead.
*
* This list is mutable.
*
* This list type also encodes that the messages contained are in reverse order, i.e. the head of the list is the
* latest appended element.
*
*/
class EarliestFirstSystemMessageList(val head: SystemMessage) extends AnyVal {
import SystemMessageList._
/**
* Indicates if the list is empty or not. This operation has constant cost.
*/
final def isEmpty: Boolean = head eq null
/**
* Indicates if the list has at least one element or not. This operation has constant cost.
*/
final def nonEmpty: Boolean = head ne null
/**
* Indicates if the list is empty or not. This operation has constant cost.
*/
final def size: Int = sizeInner(head, 0)
/**
* Gives back the list containing all the elements except the first. This operation has constant cost.
*
* *Warning:* as the underlying list nodes (the [[akka.dispatch.sysmsg.SystemMessage]] instances) are mutable, care
* should be taken when passing the tail to other methods. [[akka.dispatch.sysmsg.SystemMessage#unlink]] should be
* called on the head if one wants to detach the tail permanently.
*/
final def tail: EarliestFirstSystemMessageList = new EarliestFirstSystemMessageList(head.next)
/**
* Reverses the list. This operation mutates the underlying list. The cost of the call to reverse is linear in the
* number of elements.
*
* The type of the returned list is of the opposite order: [[akka.dispatch.sysmsg.LatestFirstSystemMessageList]]
*/
final def reverse: LatestFirstSystemMessageList = new LatestFirstSystemMessageList(reverseInner(head, null))
/**
* Attaches a message to the current head of the list. This operation has constant cost.
*/
final def ::(msg: SystemMessage): EarliestFirstSystemMessageList = {
assert(msg ne null)
msg.next = head
new EarliestFirstSystemMessageList(msg)
}
/**
* Prepends a list in a reversed order to the head of this list. The prepended list will be reversed during the process.
*
* Example: (3, 4, 5) reversePrepend (2, 1, 0) == (0, 1, 2, 3, 4, 5)
*
* The cost of this operation is linear in the size of the list that is to be prepended.
*/
final def reverse_:::(other: LatestFirstSystemMessageList): EarliestFirstSystemMessageList = {
var remaining = other
var result = this
while (remaining.nonEmpty) {
val msg = remaining.head
remaining = remaining.tail
result ::= msg
}
result
}
}
/**
* System messages are handled specially: they form their own queue within
* each actors mailbox. This queue is encoded in the messages themselves to
* avoid extra allocations and overhead. The next pointer is a normal var, and
* it does not need to be volatile because in the enqueuing method its update
* is immediately succeeded by a volatile write and all reads happen after the
* volatile read in the dequeuing thread. Afterwards, the obtained list of
* system messages is handled in a single thread only and not ever passed around,
* hence no further synchronization is needed.
*
* INTERNAL API
*
* ➡➡➡ NEVER SEND THE SAME SYSTEM MESSAGE OBJECT TO TWO ACTORS ⬅⬅⬅
*/
private[akka] sealed trait SystemMessage extends PossiblyHarmful with Serializable {
// Next fields are only modifiable via the SystemMessageList value class
@transient
private[sysmsg] var next: SystemMessage = _
def unlink(): Unit = next = null
def unlinked: Boolean = next eq null
}
trait StashWhenWaitingForChildren
trait StashWhenFailed
/**
* INTERNAL API
*/
@SerialVersionUID(-4836972106317757555L)
private[akka] case class Create(uid: Int) extends SystemMessage // send to self from Dispatcher.register
/**
* INTERNAL API
*/
@SerialVersionUID(686735569005808256L)
private[akka] case class Recreate(cause: Throwable) extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.restart
/**
* INTERNAL API
*/
@SerialVersionUID(7270271967867221401L)
private[akka] case class Suspend() extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.suspend
/**
* INTERNAL API
*/
@SerialVersionUID(-2567504317093262591L)
private[akka] case class Resume(causedByFailure: Throwable) extends SystemMessage with StashWhenWaitingForChildren // sent to self from ActorCell.resume
/**
* INTERNAL API
*/
@SerialVersionUID(708873453777219599L)
private[akka] case class Terminate() extends SystemMessage // sent to self from ActorCell.stop
/**
* INTERNAL API
*/
@SerialVersionUID(3245747602115485675L)
private[akka] case class Supervise(child: ActorRef, async: Boolean, uid: Int) extends SystemMessage // sent to supervisor ActorRef from ActorCell.start
/**
* INTERNAL API
*/
@SerialVersionUID(5513569382760799668L)
private[akka] case class ChildTerminated(child: ActorRef) extends SystemMessage // sent to supervisor from ActorCell.doTerminate
/**
* INTERNAL API
*/
@SerialVersionUID(3323205435124174788L)
private[akka] case class Watch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to establish a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(6363620903363658256L)
private[akka] case class Unwatch(watchee: ActorRef, watcher: ActorRef) extends SystemMessage // sent to tear down a DeathWatch
/**
* INTERNAL API
*/
@SerialVersionUID(-5475916034683997987L)
private[akka] case object NoMessage extends SystemMessage // switched into the mailbox to signal termination
/**
* INTERNAL API
*/
@SerialVersionUID(3L)
private[akka] case class Failed(child: ActorRef, cause: Throwable, uid: Int) extends SystemMessage
with StashWhenFailed
with StashWhenWaitingForChildren

View file

@ -7,7 +7,7 @@ import language.implicitConversions
import java.util.concurrent.TimeoutException
import akka.actor._
import akka.dispatch._
import akka.dispatch.sysmsg._
import scala.annotation.tailrec
import scala.util.control.NonFatal
import scala.concurrent.{ Future, Promise, ExecutionContext }

View file

@ -6,7 +6,7 @@ package akka.pattern
import akka.actor._
import akka.util.{ Timeout }
import akka.dispatch.{ Unwatch, Watch }
import akka.dispatch.sysmsg.{ Unwatch, Watch }
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.util.Success

View file

@ -18,7 +18,7 @@ import akka.actor.Props
import akka.actor.Scheduler
import akka.actor.Scope
import akka.actor.Terminated
import akka.dispatch.ChildTerminated
import akka.dispatch.sysmsg.ChildTerminated
import akka.event.EventStream
import akka.japi.Util.immutableSeq
import akka.remote.RemoteActorRefProvider

View file

@ -111,6 +111,8 @@ implementation; it is always possible to add stricter guarantees on top of
basic ones, but it is not possible to retro-actively remove guarantees in order
to gain more performance.
.. _message-ordering:
Discussion: Message Ordering
----------------------------
@ -153,6 +155,22 @@ Causal transitive ordering would imply that ``M2`` is never received before
violated due to different message delivery latencies when ``A``, ``B`` and
``C`` reside on different network hosts, see more below.
Communication of failure
........................
Please note, that the ordering guarantees discussed above only hold for user messages between actors. Failure of a child
of an actor is communicated by special system messages that are not ordered relative to ordinary user messages. In
particular:
Child actor ``C`` sends message ``M`` to its parent ``P``
Child actor fails with failure ``F``
Parent actor ``P`` might receive the two events either in order ``M``, ``F`` or ``F``, ``M``
The reason for this is that internal system messages has their own mailboxes therefore the ordering of enqueue calls of
a user and system message cannot guarantee the ordering of their dequeue times.
The Rules for In-JVM (Local) Message Sends
==========================================

View file

@ -55,6 +55,15 @@ actors cannot be orphaned or attached to supervisors from the outside, which
might otherwise catch them unawares. In addition, this yields a natural and
clean shutdown procedure for (sub-trees of) actor applications.
.. warning::
Supervision related parent-child communication happens by special system
messages that have their own mailboxes separate from user messages. This
implies that supervision related events are not deterministically
ordered relative to ordinary messages. In general, the user cannot influence
the order of normal messages and failure notifications. For details and
example see the :ref:`message-ordering` section.
.. _toplevel-supervisors:
The Top-Level Supervisors

View file

@ -246,6 +246,14 @@ that triggered the exception will not be received again. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
.. warning::
Be aware that the ordering of failure notifications relative to user messages
is not deterministic. In particular, a parent might restart its child before
it has processed the last messages sent by the child before the failure.
See :ref:`message-ordering` for details.
Stop Hook
---------

View file

@ -359,6 +359,13 @@ that triggered the exception will not be received again. Any message
sent to an actor while it is being restarted will be queued to its mailbox as
usual.
.. warning::
Be aware that the ordering of failure notifications relative to user messages
is not deterministic. In particular, a parent might restart its child before
it has processed the last messages sent by the child before the failure.
See :ref:`message-ordering` for details.
Stop Hook
---------

View file

@ -5,7 +5,7 @@ package akka.remote
import akka.{ OnlyCauseStackTrace, AkkaException }
import akka.actor._
import akka.dispatch.SystemMessage
import akka.dispatch.sysmsg.SystemMessage
import akka.event.LoggingAdapter
import akka.pattern.pipe
import akka.remote.EndpointManager.Send

View file

@ -5,7 +5,7 @@
package akka.remote
import akka.actor._
import akka.dispatch._
import akka.dispatch.sysmsg._
import akka.event.{ Logging, LoggingAdapter, EventStream }
import akka.event.Logging.Error
import akka.serialization.{ JavaSerializer, Serialization, SerializationExtension }

View file

@ -8,7 +8,7 @@ import scala.annotation.tailrec
import scala.util.control.NonFatal
import akka.actor.{ VirtualPathContainer, Terminated, Deploy, Props, Nobody, LocalActorRef, InternalActorRef, Address, ActorSystemImpl, ActorRef, ActorPathExtractor, ActorPath, Actor, AddressTerminated }
import akka.event.LoggingAdapter
import akka.dispatch.Watch
import akka.dispatch.sysmsg.Watch
import akka.actor.ActorRefWithCell
import akka.actor.ActorRefScope
import akka.util.Switch

View file

@ -4,12 +4,9 @@
package akka.remote
import akka.dispatch.SystemMessage
import akka.event.{ LoggingAdapter, Logging }
import akka.AkkaException
import akka.serialization.Serialization
import akka.remote.RemoteProtocol._
import akka.actor._
import akka.event.LoggingAdapter
import scala.collection.immutable
import scala.concurrent.Future

View file

@ -10,7 +10,8 @@ import java.util.concurrent.locks.ReentrantLock
import scala.annotation.tailrec
import com.typesafe.config.Config
import akka.actor.{ ActorInitializationException, ExtensionIdProvider, ExtensionId, Extension, ExtendedActorSystem, ActorRef, ActorCell }
import akka.dispatch.{ MessageQueue, MailboxType, TaskInvocation, SystemMessage, Suspend, Resume, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.dispatch.{ MessageQueue, MailboxType, TaskInvocation, MessageDispatcherConfigurator, MessageDispatcher, Mailbox, Envelope, DispatcherPrerequisites, DefaultSystemMessageQueue }
import akka.dispatch.sysmsg.{ SystemMessage, Suspend, Resume }
import scala.concurrent.duration._
import akka.util.Switch
import scala.concurrent.duration.Duration

View file

@ -10,7 +10,7 @@ import scala.collection.immutable
import scala.concurrent.duration.Duration
import scala.reflect.ClassTag
import akka.actor.{ DeadLetter, ActorSystem, Terminated, UnhandledMessage }
import akka.dispatch.{ SystemMessage, Terminate }
import akka.dispatch.sysmsg.{ SystemMessage, Terminate }
import akka.event.Logging.{ Warning, LogEvent, InitializeLogger, Info, Error, Debug, LoggerInitialized }
import akka.event.Logging
import akka.actor.NoSerializationVerificationNeeded