diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 2fa16eca71..b4891d58d3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -98,6 +98,10 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } + private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) { + executorService.get() execute invocation + } else log.slf4j.warn("{} is shut down", this) + /** * @return the mailbox associated with the actor */ diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 54aec2607d..5cbd52d13b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -66,6 +66,10 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher( executorService.get() execute mbox } + private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) { + executorService.get() execute invocation + } else log.slf4j.warn("{} is shut down", this) + /** * Try processing the mailbox of the given actor. Fails if the dispatching lock on the actor is already held by * another thread (because then that thread is already processing the mailbox). diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index bf23ab91d9..b39aa20fc2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -32,10 +32,7 @@ object Futures { dispatcher: MessageDispatcher = Dispatchers.defaultGlobalDispatcher) (body: => T): Future[T] = { val f = new DefaultCompletableFuture[T](timeout) - spawn({ - try { f completeWithResult body } - catch { case e => f completeWithException e} - })(dispatcher) + dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) f } diff --git a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala index e5e54ea0e9..df57f0d440 100644 --- a/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/HawtDispatcher.scala @@ -148,6 +148,9 @@ class HawtDispatcher(val aggregate: Boolean = true, val parent: DispatchQueue = mailbox(invocation.receiver).dispatch(invocation) } + private[akka] def executeFuture(invocation: FutureInvocation): Unit = + parent execute invocation + // hawtdispatch does not have a way to get queue sizes, getting an accurate // size can cause extra contention.. is this really needed? // TODO: figure out if this can be optional in akka diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 63b68e262c..b5d1276602 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -27,6 +27,12 @@ final case class MessageInvocation(val receiver: ActorRef, } } +final case class FutureInvocation(future: CompletableFuture[Any], function: () => Any) extends Runnable { + val uuid = akka.actor.newUuid + + def run = future complete (try { Right(function.apply) } catch { case e => Left(e) }) +} + object MessageDispatcher { val UNSCHEDULED = 0 val SCHEDULED = 1 @@ -68,6 +74,24 @@ trait MessageDispatcher extends Logging { dispatch(invocation) } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") + private[akka] final def dispatchFuture(invocation: FutureInvocation): Unit = { + uuids add invocation.uuid + if (active.isOff) { active.switchOn { start } } + invocation.future.onComplete { f => + if ((uuids remove invocation.uuid) && uuids.isEmpty) { + shutdownSchedule match { + case UNSCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + shutdownSchedule = RESCHEDULED + case RESCHEDULED => //Already marked for reschedule + } + } + } + executeFuture(invocation) + } + private[akka] def register(actorRef: ActorRef) { if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) @@ -150,6 +174,8 @@ trait MessageDispatcher extends Logging { */ private[akka] def dispatch(invocation: MessageInvocation): Unit + private[akka] def executeFuture(invocation: FutureInvocation): Unit + /** * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown */