Partial fix for the raciness of BalancingDispatcher
This commit is contained in:
parent
a38a26f8bd
commit
288287e182
5 changed files with 50 additions and 45 deletions
|
|
@ -20,7 +20,6 @@ final case class Envelope(val receiver: ActorCell, val message: Any, val channel
|
|||
if (receiver eq null) throw new IllegalArgumentException("Receiver can't be null")
|
||||
|
||||
final def invoke() {
|
||||
System.err.println("Invoking message [" + message + "] for " + receiver + " with channel " + channel)
|
||||
receiver invoke this
|
||||
}
|
||||
}
|
||||
|
|
@ -38,7 +37,6 @@ final case class SystemEnvelope(val receiver: ActorCell, val message: SystemMess
|
|||
* @return whether to proceed with processing other messages
|
||||
*/
|
||||
final def invoke(): Unit = {
|
||||
System.err.println("Invoking System message [" + message + "] for " + receiver + " with channel " + channel)
|
||||
receiver systemInvoke this
|
||||
}
|
||||
}
|
||||
|
|
@ -84,12 +82,13 @@ abstract class MessageDispatcher extends Serializable {
|
|||
/**
|
||||
* Create a blackhole mailbox for the purpose of replacing the real one upon actor termination
|
||||
*/
|
||||
protected[akka] val deadLetterMailbox = new Mailbox {
|
||||
protected[akka] val deadLetterMailbox: Mailbox = DeadLetterMailbox
|
||||
|
||||
object DeadLetterMailbox extends Mailbox {
|
||||
dispatcherLock.tryLock()
|
||||
become(Mailbox.CLOSED)
|
||||
override def become(newStatus: Mailbox.Status) { super.become(Mailbox.CLOSED) } //Always transcend to CLOSED to preserve the volatile write
|
||||
override def dispatcher = null //MessageDispatcher.this
|
||||
dispatcherLock.tryLock()
|
||||
|
||||
override def enqueue(envelope: Envelope) { envelope.channel sendException new ActorKilledException("Actor has been stopped") }
|
||||
override def dequeue() = null
|
||||
override def systemEnqueue(handle: SystemEnvelope): Unit = ()
|
||||
|
|
@ -165,18 +164,17 @@ abstract class MessageDispatcher extends Serializable {
|
|||
* and only call it under the dispatcher-guard, see "attach" for the only invocation
|
||||
*/
|
||||
protected[akka] def register(actor: ActorCell): Unit = {
|
||||
if (actor.mailbox eq null) {
|
||||
val mbox = createMailbox(actor)
|
||||
actor.mailbox = mbox
|
||||
systemDispatch(SystemEnvelope(actor, Create, NullChannel))
|
||||
}
|
||||
|
||||
uuids add actor.uuid
|
||||
if (active.isOff) {
|
||||
active.switchOn {
|
||||
start()
|
||||
if (uuids add actor.uuid) {
|
||||
if (actor.mailbox eq null) {
|
||||
actor.mailbox = createMailbox(actor)
|
||||
systemDispatch(SystemEnvelope(actor, Create, NullChannel))
|
||||
}
|
||||
}
|
||||
if (active.isOff) {
|
||||
active.switchOn {
|
||||
start()
|
||||
}
|
||||
}
|
||||
} else System.err.println("Couldn't register: " + actor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -186,7 +184,8 @@ abstract class MessageDispatcher extends Serializable {
|
|||
protected[akka] def unregister(actor: ActorCell): Unit = {
|
||||
if (uuids remove actor.uuid) {
|
||||
val mailBox = actor.mailbox
|
||||
actor.mailbox = deadLetterMailbox //FIXME switch to getAndSet semantics
|
||||
mailBox.become(Mailbox.CLOSED)
|
||||
actor.mailbox = deadLetterMailbox //FIXME getAndSet would be preferrable here
|
||||
cleanUpMailboxFor(actor, mailBox)
|
||||
if (uuids.isEmpty && _tasks.get == 0) {
|
||||
shutdownSchedule match {
|
||||
|
|
@ -198,7 +197,7 @@ abstract class MessageDispatcher extends Serializable {
|
|||
case RESCHEDULED ⇒ //Already marked for reschedule
|
||||
}
|
||||
}
|
||||
}
|
||||
} else System.err.println("Couldn't unregister: " + actor)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue