Run independent futures on the dispatcher directly

This commit is contained in:
Derek Williams 2011-02-25 15:20:58 -07:00
parent a76e62096f
commit b2c62ba18d
5 changed files with 38 additions and 4 deletions

View file

@ -98,6 +98,10 @@ class ExecutorBasedEventDrivenDispatcher(
registerForExecution(mbox)
}
private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) {
executorService.get() execute invocation
} else log.slf4j.warn("{} is shut down", this)
/**
* @return the mailbox associated with the actor
*/

View file

@ -66,6 +66,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(
executorService.get() execute mbox
}
private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) {
executorService.get() execute invocation
} else log.slf4j.warn("{} is shut down", this)
/**
* Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by
* another thread (because then that thread is already processing the mailbox).

View file

@ -32,10 +32,7 @@ object Futures {
dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher)
(body: => T): Future[T] = {
val f = new DefaultCompletableFuture[T](timeout)
spawn({
try { f completeWithResult body }
catch { case e => f completeWithException e}
})(dispatcher)
dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body))
f
}

View file

@ -148,6 +148,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue =
mailbox(invocation.receiver).dispatch(invocation)
}
private[akka] def executeFuture(invocation: FutureInvocation): Unit =
parent execute invocation
// hawtdispatch does not have a way to get queue sizes, getting an accurate
// size can cause extra contention.. is this really needed?
// TODO: figure out if this can be optional in akka

View file

@ -27,6 +27,12 @@ final case class MessageInvocation(val receiver: ActorRef,
}
}
final case class FutureInvocation(future: CompletableFuture[Any], function: () => Any) extends Runnable {
val uuid = akka.actor.newUuid
def run = future complete (try { Right(function.apply) } catch { case e => Left(e) })
}
object MessageDispatcher {
val UNSCHEDULED = 0
val SCHEDULED = 1
@ -68,6 +74,24 @@ trait MessageDispatcher extends Logging {
dispatch(invocation)
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
private[akka] final def dispatchFuture(invocation: FutureInvocation): Unit = {
uuids add invocation.uuid
if (active.isOff) { active.switchOn { start } }
invocation.future.onComplete { f =>
if ((uuids remove invocation.uuid) && uuids.isEmpty) {
shutdownSchedule match {
case UNSCHEDULED =>
shutdownSchedule = SCHEDULED
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED =>
shutdownSchedule = RESCHEDULED
case RESCHEDULED => //Already marked for reschedule
}
}
}
executeFuture(invocation)
}
private[akka] def register(actorRef: ActorRef) {
if (actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef)
@ -150,6 +174,8 @@ trait MessageDispatcher extends Logging {
*/
private[akka] def dispatch(invocation: MessageInvocation): Unit
private[akka] def executeFuture(invocation: FutureInvocation): Unit
/**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/