diff --git a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 45b1dfa886..5cbb6b3c8b 100644 --- a/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -141,6 +141,7 @@ class ExecutorBasedEventDrivenDispatcher( } } + private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = if (active.isOn) { if (mbox.suspended.isOff && mbox.dispatcherLock.tryLock()) { try { diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index 919d4b674d..a7bc8165aa 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -7,6 +7,7 @@ package se.scalablesolutions.akka.dispatch import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.concurrent._ +import atomic. {AtomicReference, AtomicLong} import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode} import se.scalablesolutions.akka.actor._ @@ -57,6 +58,7 @@ final class MessageInvocation(val receiver: ActorRef, trait MessageDispatcher extends MailboxFactory with Logging { protected val uuids = new ConcurrentSkipListSet[Uuid] protected val guard = new ReentrantGuard + private val scheduledShutdown = new AtomicReference[ScheduledFuture[AnyRef]](null) protected val active = new Switch(false) /** @@ -78,22 +80,30 @@ trait MessageDispatcher extends MailboxFactory with Logging { } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") protected def register(actorRef: ActorRef) { - if (uuids.isEmpty()) { + if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) + if (uuids add actorRef.uuid) { + val future = scheduledShutdown.getAndSet(null) + if (future ne null) future.cancel(false) + } + if (active.isOff) { active.switchOn { start } } - if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) - uuids add actorRef.uuid } protected def unregister(actorRef: ActorRef) = { if (uuids remove actorRef.uuid) { actorRef.mailbox = null if (uuids.isEmpty){ - active switchOff { - shutdown // shut down in the dispatcher's references is zero - } + val future = scheduledShutdown.getAndSet(Scheduler.scheduleOnce(() => guard withGuard { + if (uuids.isEmpty()) { + active switchOff { + shutdown // shut down in the dispatcher's references is zero + } + } + }, 1, TimeUnit.SECONDS)) + if (future ne null) future.cancel(false) } } }