From 485013a353d952ddeb4fb294a6929a2ab53c33f0 Mon Sep 17 00:00:00 2001 From: Derek Williams Date: Wed, 27 Apr 2011 20:45:39 -0600 Subject: [PATCH] Dispatcher executed Future will be cleaned up even after expiring --- .../test/scala/akka/dispatch/FutureSpec.scala | 14 ++-- .../ExecutorBasedEventDrivenDispatcher.scala | 2 +- .../src/main/scala/akka/dispatch/Future.scala | 7 +- .../scala/akka/dispatch/MessageHandling.scala | 71 ++++++++++--------- .../testkit/CallingThreadDispatcher.scala | 2 +- 5 files changed, 49 insertions(+), 47 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala index d848eede53..1f7dae9270 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/FutureSpec.scala @@ -324,7 +324,7 @@ class FutureSpec extends JUnitSuite { assert(f3.resultOrException === Some("SUCCESS")) // make sure all futures are completed in dispatcher - assert(Dispatchers.defaultGlobalDispatcher.futureQueueSize === 0) + assert(Dispatchers.defaultGlobalDispatcher.pendingFutures === 0) } @Test def shouldBlockUntilResult { @@ -341,6 +341,10 @@ class FutureSpec extends JUnitSuite { intercept[FutureTimeoutException] { f3() } + Thread.sleep(100) + + // make sure all futures are completed in dispatcher + assert(Dispatchers.defaultGlobalDispatcher.pendingFutures === 0) } @Test def shouldNotAddOrRunCallbacksAfterFailureToBeCompletedBeforeExpiry { @@ -378,11 +382,11 @@ class FutureSpec extends JUnitSuite { @Test def ticket812FutureDispatchCleanup { val dispatcher = implicitly[MessageDispatcher] - assert(dispatcher.futureQueueSize === 0) + assert(dispatcher.pendingFutures === 0) val future = Future({Thread.sleep(100);"Done"}, 10) intercept[FutureTimeoutException] { future.await } - assert(dispatcher.futureQueueSize === 1) - Thread.sleep(200) - assert(dispatcher.futureQueueSize === 0) + assert(dispatcher.pendingFutures === 1) + Thread.sleep(100) + assert(dispatcher.pendingFutures === 0) } } diff --git a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 494fa85f28..dca2f2f822 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -99,7 +99,7 @@ class ExecutorBasedEventDrivenDispatcher( registerForExecution(mbox) } - private[akka] def executeFuture(invocation: FutureInvocation): Unit = if (active.isOn) { + private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit = if (active.isOn) { try executorService.get() execute invocation catch { case e: RejectedExecutionException => diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 1f2c8d63e4..72cab081a2 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -232,11 +232,8 @@ object Future { * This method constructs and returns a Future that will eventually hold the result of the execution of the supplied body * The execution is performed by the specified Dispatcher. */ - def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = { - val f = new DefaultCompletableFuture[T](timeout) - dispatcher.dispatchFuture(FutureInvocation(f.asInstanceOf[CompletableFuture[Any]], () => body)) - f - } + def apply[T](body: => T, timeout: Long = Actor.TIMEOUT)(implicit dispatcher: MessageDispatcher): Future[T] = + dispatcher.dispatchFuture(() => body, timeout) /** * Construct a completable channel diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 9e53bb09ca..8261a0f485 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -5,6 +5,7 @@ package akka.dispatch import java.util.concurrent._ +import java.util.concurrent.atomic.AtomicLong import akka.event.EventHandler import akka.config.Configuration import akka.config.Config.TIME_UNIT @@ -29,16 +30,18 @@ final case class MessageInvocation(val receiver: ActorRef, } } -final case class FutureInvocation(future: CompletableFuture[Any], function: () => Any) extends Runnable { - val uuid = akka.actor.newUuid - - def run = future complete (try { - Right(function.apply) - } catch { - case e => - EventHandler.error(e, this, e.getMessage) - Left(e) - }) +final case class FutureInvocation[T](future: CompletableFuture[T], function: () => T, cleanup: () => Unit) extends Runnable { + def run = { + future complete (try { + Right(function()) + } catch { + case e => + EventHandler.error(e, this, e.getMessage) + Left(e) + } finally { + cleanup() + }) + } } object MessageDispatcher { @@ -56,7 +59,7 @@ trait MessageDispatcher { import MessageDispatcher._ protected val uuids = new ConcurrentSkipListSet[Uuid] - protected val futures = new ConcurrentSkipListSet[Uuid] + protected val futures = new AtomicLong(0L) protected val guard = new ReentrantGuard protected val active = new Switch(false) @@ -83,27 +86,25 @@ trait MessageDispatcher { private[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation) - private[akka] final def dispatchFuture(invocation: FutureInvocation): Unit = { - guard withGuard { - futures add invocation.uuid - if (active.isOff) { active.switchOn { start } } - } - invocation.future.onComplete { f => - guard withGuard { - futures remove invocation.uuid - if (futures.isEmpty && uuids.isEmpty) { - shutdownSchedule match { - case UNSCHEDULED => - shutdownSchedule = SCHEDULED - Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) - case SCHEDULED => - shutdownSchedule = RESCHEDULED - case RESCHEDULED => //Already marked for reschedule - } - } + private[akka] final def dispatchFuture[T](block: () => T, timeout: Long): Future[T] = { + futures.getAndIncrement() + val future = new DefaultCompletableFuture[T](timeout) + if (active.isOff) { active.switchOn { start } } + executeFuture(FutureInvocation[T](future, block, futureCleanup)) + future + } + + private val futureCleanup: () => Unit = { () => + if (futures.decrementAndGet() == 0 && uuids.isEmpty) { + shutdownSchedule match { + case UNSCHEDULED => + shutdownSchedule = SCHEDULED + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case SCHEDULED => + shutdownSchedule = RESCHEDULED + case RESCHEDULED => //Already marked for reschedule } } - executeFuture(invocation) } private[akka] def register(actorRef: ActorRef) { @@ -121,7 +122,7 @@ trait MessageDispatcher { private[akka] def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { actorRef.mailbox = null - if (uuids.isEmpty && futures.isEmpty){ + if (uuids.isEmpty && futures.get == 0){ shutdownSchedule match { case UNSCHEDULED => shutdownSchedule = SCHEDULED @@ -155,7 +156,7 @@ trait MessageDispatcher { shutdownSchedule = SCHEDULED Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED => - if (uuids.isEmpty() && futures.isEmpty) { + if (uuids.isEmpty() && futures.get == 0) { active switchOff { shutdown // shut down in the dispatcher's references is zero } @@ -187,7 +188,7 @@ trait MessageDispatcher { */ private[akka] def dispatch(invocation: MessageInvocation): Unit - private[akka] def executeFuture(invocation: FutureInvocation): Unit + private[akka] def executeFuture(invocation: FutureInvocation[_]): Unit /** * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown @@ -205,9 +206,9 @@ trait MessageDispatcher { def mailboxSize(actorRef: ActorRef): Int /** - * Returns the size of the Future queue + * Returns the amount of futures queued for execution */ - def futureQueueSize: Int = futures.size + def pendingFutures: Long = futures.get } /** diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 971ee2e89f..dcf20158d8 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -156,7 +156,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa if (execute) runQueue(mbox, queue) } - private[akka] override def executeFuture(invocation: FutureInvocation) { invocation.run } + private[akka] override def executeFuture(invocation: FutureInvocation[_]) { invocation.run } /* * This method must be called with this thread's queue, which must already