Fixing ticket 413
This commit is contained in:
parent
e02744425d
commit
fc77a138e7
2 changed files with 13 additions and 6 deletions
|
|
@ -116,6 +116,10 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
|
||||
do {
|
||||
nextMessage.invoke
|
||||
|
||||
if (nextMessage.receiver.isBeingRestarted)
|
||||
return !self.isEmpty
|
||||
|
||||
if (throttle) { // Will be elided when false
|
||||
processedMessages += 1
|
||||
if ((processedMessages >= throughput) ||
|
||||
|
|
|
|||
|
|
@ -75,33 +75,36 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
|
|||
* @return true if the mailbox was processed, false otherwise
|
||||
*/
|
||||
private def tryProcessMailbox(mailbox: MessageQueue): Boolean = {
|
||||
var lockAcquiredOnce = false
|
||||
var mailboxWasProcessed = false
|
||||
|
||||
// this do-wile loop is required to prevent missing new messages between the end of processing
|
||||
// the mailbox and releasing the lock
|
||||
do {
|
||||
if (mailbox.dispatcherLock.tryLock) {
|
||||
lockAcquiredOnce = true
|
||||
try {
|
||||
processMailbox(mailbox)
|
||||
mailboxWasProcessed = processMailbox(mailbox)
|
||||
} finally {
|
||||
mailbox.dispatcherLock.unlock
|
||||
}
|
||||
}
|
||||
} while ((lockAcquiredOnce && !mailbox.isEmpty))
|
||||
} while ((mailboxWasProcessed && !mailbox.isEmpty))
|
||||
|
||||
lockAcquiredOnce
|
||||
mailboxWasProcessed
|
||||
}
|
||||
|
||||
/**
|
||||
* Process the messages in the mailbox of the given actor.
|
||||
* @return
|
||||
*/
|
||||
private def processMailbox(mailbox: MessageQueue) = {
|
||||
private def processMailbox(mailbox: MessageQueue): Boolean = {
|
||||
var messageInvocation = mailbox.dequeue
|
||||
while (messageInvocation ne null) {
|
||||
messageInvocation.invoke
|
||||
if (messageInvocation.receiver.isBeingRestarted)
|
||||
return false
|
||||
messageInvocation = mailbox.dequeue
|
||||
}
|
||||
true
|
||||
}
|
||||
|
||||
private def findThief(receiver: ActorRef): Option[ActorRef] = {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue