Merge remote-tracking branch 'origin/0deps' into 0deps-future-dispatch

Conflicts:
	akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala
	akka-actor/src/main/scala/akka/dispatch/Future.scala
	akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala
This commit is contained in:
Derek Williams 2011-03-02 16:44:44 -07:00
commit d1fcb6d398
87 changed files with 2234 additions and 1999 deletions

View file

@ -4,7 +4,7 @@
package akka.dispatch
import akka.actor.{ActorRef, IllegalActorStateException}
import akka.actor.{ActorRef, IllegalActorStateException, EventHandler}
import akka.util.{ReflectiveAccess, Switch}
import java.util.Queue
@ -99,13 +99,18 @@ class ExecutorBasedEventDrivenDispatcher(
}
private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) {
executorService.get() execute invocation
} else log.slf4j.warn("{} is shut down", this)
try executorService.get() execute invocation
catch {
case e: RejectedExecutionException =>
EventHandler notifyListeners EventHandler.Warning(e, this, _name)
throw e
}
}
/**
* @return the mailbox associated with the actor
*/
private def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
protected def getMailbox(receiver: ActorRef) = receiver.mailbox.asInstanceOf[MessageQueue with ExecutableMailbox]
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
@ -120,41 +125,45 @@ class ExecutorBasedEventDrivenDispatcher(
}
}
private[akka] def start = log.slf4j.debug("Starting up {}\n\twith throughput [{}]", this, throughput)
private[akka] def start {}
private[akka] def shutdown {
val old = executorService.getAndSet(config.createLazyExecutorService(threadFactory))
if (old ne null) {
log.slf4j.debug("Shutting down {}", this)
old.shutdownNow()
}
}
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) {
if (!mbox.suspended.locked && mbox.dispatcherLock.tryLock()) {
try {
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
mbox.dispatcherLock.unlock()
throw e
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
try {
executorService.get() execute mbox
} catch {
case e: RejectedExecutionException =>
EventHandler notifyListeners EventHandler.Warning(e, this, _name)
mbox.dispatcherLock.unlock()
throw e
}
} else {
mbox.dispatcherLock.unlock() //If the dispatcher isn't active or if the actor is suspended, unlock the dispatcher lock
}
}
} else log.slf4j.warn("{} is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t{}", this, mbox)
}
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox)
override val toString = getClass.getSimpleName + "[" + name + "]"
def suspend(actorRef: ActorRef) {
log.slf4j.debug("Suspending {}",actorRef.uuid)
getMailbox(actorRef).suspended.tryLock
}
def resume(actorRef: ActorRef) {
log.slf4j.debug("Resuming {}",actorRef.uuid)
val mbox = getMailbox(actorRef)
mbox.suspended.tryUnlock
registerForExecution(mbox)
reRegisterForExecution(mbox)
}
}
@ -174,7 +183,7 @@ trait ExecutableMailbox extends Runnable { self: MessageQueue =>
dispatcherLock.unlock()
}
if (!self.isEmpty)
dispatcher.registerForExecution(this)
dispatcher.reRegisterForExecution(this)
}
/**