Initial attempt at suspend/resume
This commit is contained in:
parent
3a9994bbe0
commit
ae3768c266
8 changed files with 99 additions and 38 deletions
|
|
@ -89,6 +89,8 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
private[akka] val active = new Switch(false)
|
||||
|
||||
val name = "akka:event-driven:dispatcher:" + _name
|
||||
|
||||
//Initialize
|
||||
init
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = {
|
||||
|
|
@ -145,7 +147,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
}
|
||||
|
||||
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
|
||||
if (mbox.dispatcherLock.tryLock()) {
|
||||
if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) {
|
||||
try {
|
||||
executor execute mbox
|
||||
} catch {
|
||||
|
|
@ -158,8 +160,20 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
|
||||
override val toString = getClass.getSimpleName + "[" + name + "]"
|
||||
|
||||
def suspend(actorRef: ActorRef) {
|
||||
log.debug("Suspending %s",actorRef.uuid)
|
||||
getMailbox(actorRef).suspended.switchOn
|
||||
}
|
||||
|
||||
def resume(actorRef: ActorRef) {
|
||||
log.debug("Resuming %s",actorRef.uuid)
|
||||
val mbox = getMailbox(actorRef)
|
||||
mbox.suspended.switchOff
|
||||
registerForExecution(mbox)
|
||||
}
|
||||
|
||||
// FIXME: should we have an unbounded queue and not bounded as default ????
|
||||
private[akka] def init = {
|
||||
private[akka] def init {
|
||||
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||
config(this)
|
||||
buildThreadPool
|
||||
|
|
@ -189,28 +203,33 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
|
|||
* @return true if the processing finished before the mailbox was empty, due to the throughput constraint
|
||||
*/
|
||||
final def processMailbox(): Boolean = {
|
||||
var nextMessage = self.dequeue
|
||||
if (nextMessage ne null) {
|
||||
val throttle = dispatcher.throughput > 0
|
||||
var processedMessages = 0
|
||||
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
|
||||
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
|
||||
do {
|
||||
nextMessage.invoke
|
||||
if (self.suspended.isOn)
|
||||
true
|
||||
else {
|
||||
var nextMessage = self.dequeue
|
||||
if (nextMessage ne null) {
|
||||
val throttle = dispatcher.throughput > 0
|
||||
var processedMessages = 0
|
||||
val isDeadlineEnabled = throttle && dispatcher.throughputDeadlineTime > 0
|
||||
val started = if (isDeadlineEnabled) System.currentTimeMillis else 0
|
||||
do {
|
||||
nextMessage.invoke
|
||||
|
||||
if (nextMessage.receiver.isBeingRestarted)
|
||||
return !self.isEmpty
|
||||
if (throttle) { // Will be elided when false
|
||||
processedMessages += 1
|
||||
if ((processedMessages >= dispatcher.throughput) ||
|
||||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
|
||||
return !self.isEmpty
|
||||
}
|
||||
|
||||
if (throttle) { // Will be elided when false
|
||||
processedMessages += 1
|
||||
if ((processedMessages >= dispatcher.throughput) ||
|
||||
(isDeadlineEnabled && (System.currentTimeMillis - started) >= dispatcher.throughputDeadlineTime)) // If we're throttled, break out
|
||||
return !self.isEmpty
|
||||
}
|
||||
nextMessage = self.dequeue
|
||||
} while (nextMessage ne null)
|
||||
if (self.suspended.isOn)
|
||||
return true
|
||||
|
||||
nextMessage = self.dequeue
|
||||
} while (nextMessage ne null)
|
||||
}
|
||||
false
|
||||
}
|
||||
false
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue