Rewrote timed shutdown facility, causes less than 5% overhead now
This commit is contained in:
parent
c90580a026
commit
a630caecf3
1 changed files with 37 additions and 14 deletions
|
|
@ -7,7 +7,7 @@ package se.scalablesolutions.akka.dispatch
|
|||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
|
||||
import java.util.concurrent._
|
||||
import atomic. {AtomicReference, AtomicLong}
|
||||
import atomic. {AtomicInteger, AtomicBoolean, AtomicReference, AtomicLong}
|
||||
import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode}
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
|
|
@ -52,13 +52,19 @@ final class MessageInvocation(val receiver: ActorRef,
|
|||
}
|
||||
}
|
||||
|
||||
object MessageDispatcher {
|
||||
val UNSCHEDULED = 0
|
||||
val SCHEDULED = 1
|
||||
val RESCHEDULED = 2
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageDispatcher extends MailboxFactory with Logging {
|
||||
protected val uuids = new ConcurrentSkipListSet[Uuid]
|
||||
protected val guard = new ReentrantGuard
|
||||
private val scheduledShutdown = new AtomicReference[ScheduledFuture[AnyRef]](null)
|
||||
private val shutdownSchedule = new AtomicInteger(MessageDispatcher.UNSCHEDULED)
|
||||
protected val active = new Switch(false)
|
||||
|
||||
/**
|
||||
|
|
@ -81,10 +87,7 @@ trait MessageDispatcher extends MailboxFactory with Logging {
|
|||
|
||||
protected def register(actorRef: ActorRef) {
|
||||
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
|
||||
if (uuids add actorRef.uuid) {
|
||||
val future = scheduledShutdown.getAndSet(null)
|
||||
if (future ne null) future.cancel(false)
|
||||
}
|
||||
uuids add actorRef.uuid
|
||||
if (active.isOff) {
|
||||
active.switchOn {
|
||||
start
|
||||
|
|
@ -96,14 +99,14 @@ trait MessageDispatcher extends MailboxFactory with Logging {
|
|||
if (uuids remove actorRef.uuid) {
|
||||
actorRef.mailbox = null
|
||||
if (uuids.isEmpty){
|
||||
val future = scheduledShutdown.getAndSet(Scheduler.scheduleOnce(() => guard withGuard {
|
||||
if (uuids.isEmpty()) {
|
||||
active switchOff {
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS))
|
||||
if (future ne null) future.cancel(false)
|
||||
shutdownSchedule.get() match {
|
||||
case MessageDispatcher.UNSCHEDULED =>
|
||||
shutdownSchedule.set(MessageDispatcher.SCHEDULED)
|
||||
Scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
case MessageDispatcher.SCHEDULED =>
|
||||
shutdownSchedule.set(MessageDispatcher.RESCHEDULED)
|
||||
case MessageDispatcher.RESCHEDULED => //Already marked for reschedule
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -123,6 +126,26 @@ trait MessageDispatcher extends MailboxFactory with Logging {
|
|||
}
|
||||
}
|
||||
|
||||
private val shutdownAction = new Runnable {
|
||||
def run = guard withGuard {
|
||||
shutdownSchedule.get() match {
|
||||
case MessageDispatcher.RESCHEDULED =>
|
||||
shutdownSchedule.set(MessageDispatcher.SCHEDULED)
|
||||
Scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
case MessageDispatcher.SCHEDULED =>
|
||||
if (uuids.isEmpty()) {
|
||||
active switchOff {
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
}
|
||||
shutdownSchedule.set(MessageDispatcher.UNSCHEDULED)
|
||||
case MessageDispatcher.UNSCHEDULED => //Do nothing
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected def timeoutMs: Long = 1000
|
||||
|
||||
/**
|
||||
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue