diff --git a/akka-actor/src/main/scala/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/dispatch/MessageHandling.scala index a7bc8165aa..dd55384151 100644 --- a/akka-actor/src/main/scala/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/dispatch/MessageHandling.scala @@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.concurrent._ -import atomic. {AtomicReference, AtomicLong} +import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong} import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode} import se.scalablesolutions.akka.actor._ @@ -52,13 +52,19 @@ final class MessageInvocation(val receiver: ActorRef, } } +object MessageDispatcher { + val UNSCHEDULED = 0 + val SCHEDULED = 1 + val RESCHEDULED = 2 +} + /** * @author Jonas Bonér */ trait MessageDispatcher extends MailboxFactory with Logging { protected val uuids = new ConcurrentSkipListSet[Uuid] protected val guard = new ReentrantGuard - private val scheduledShutdown = new AtomicReference[ScheduledFuture[AnyRef]](null) + private val shutdownSchedule = new AtomicInteger(MessageDispatcher.UNSCHEDULED) protected val active = new Switch(false) /** @@ -81,10 +87,7 @@ trait MessageDispatcher extends MailboxFactory with Logging { protected def register(actorRef: ActorRef) { if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) - if (uuids add actorRef.uuid) { - val future = scheduledShutdown.getAndSet(null) - if (future ne null) future.cancel(false) - } + uuids add actorRef.uuid if (active.isOff) { active.switchOn { start @@ -96,14 +99,14 @@ trait MessageDispatcher extends MailboxFactory with Logging { if (uuids remove actorRef.uuid) { actorRef.mailbox = null if (uuids.isEmpty){ - val future = scheduledShutdown.getAndSet(Scheduler.scheduleOnce(() => guard withGuard { - if (uuids.isEmpty()) { - active switchOff { - shutdown // shut down in the dispatcher's references is zero - } - } - }, 1, TimeUnit.SECONDS)) - if (future ne null) future.cancel(false) + shutdownSchedule.get() match { + case MessageDispatcher.UNSCHEDULED => + shutdownSchedule.set(MessageDispatcher.SCHEDULED) + Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + case MessageDispatcher.SCHEDULED => + shutdownSchedule.set(MessageDispatcher.RESCHEDULED) + case MessageDispatcher.RESCHEDULED => //Already marked for reschedule + } } } } @@ -123,6 +126,26 @@ trait MessageDispatcher extends MailboxFactory with Logging { } } + private val shutdownAction = new Runnable { + def run = guard withGuard { + shutdownSchedule.get() match { + case MessageDispatcher.RESCHEDULED => + shutdownSchedule.set(MessageDispatcher.SCHEDULED) + Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + case MessageDispatcher.SCHEDULED => + if (uuids.isEmpty()) { + active switchOff { + shutdown // shut down in the dispatcher's references is zero + } + } + shutdownSchedule.set(MessageDispatcher.UNSCHEDULED) + case MessageDispatcher.UNSCHEDULED => //Do nothing + } + } + } + + protected def timeoutMs: Long = 1000 + /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference */