Naïve implementation of timeout completed
This commit is contained in:
parent
b745f9839d
commit
c90580a026
2 changed files with 17 additions and 6 deletions
|
|
@ -7,6 +7,7 @@ package se.scalablesolutions.akka.dispatch
|
|||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
|
||||
import java.util.concurrent._
|
||||
import atomic. {AtomicReference, AtomicLong}
|
||||
import se.scalablesolutions.akka.util. {Switch, ReentrantGuard, Logging, HashCode}
|
||||
import se.scalablesolutions.akka.actor._
|
||||
|
||||
|
|
@ -57,6 +58,7 @@ final class MessageInvocation(val receiver: ActorRef,
|
|||
trait MessageDispatcher extends MailboxFactory with Logging {
|
||||
protected val uuids = new ConcurrentSkipListSet[Uuid]
|
||||
protected val guard = new ReentrantGuard
|
||||
private val scheduledShutdown = new AtomicReference[ScheduledFuture[AnyRef]](null)
|
||||
protected val active = new Switch(false)
|
||||
|
||||
/**
|
||||
|
|
@ -78,22 +80,30 @@ trait MessageDispatcher extends MailboxFactory with Logging {
|
|||
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started")
|
||||
|
||||
protected def register(actorRef: ActorRef) {
|
||||
if (uuids.isEmpty()) {
|
||||
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
|
||||
if (uuids add actorRef.uuid) {
|
||||
val future = scheduledShutdown.getAndSet(null)
|
||||
if (future ne null) future.cancel(false)
|
||||
}
|
||||
if (active.isOff) {
|
||||
active.switchOn {
|
||||
start
|
||||
}
|
||||
}
|
||||
if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef)
|
||||
uuids add actorRef.uuid
|
||||
}
|
||||
|
||||
protected def unregister(actorRef: ActorRef) = {
|
||||
if (uuids remove actorRef.uuid) {
|
||||
actorRef.mailbox = null
|
||||
if (uuids.isEmpty){
|
||||
active switchOff {
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
val future = scheduledShutdown.getAndSet(Scheduler.scheduleOnce(() => guard withGuard {
|
||||
if (uuids.isEmpty()) {
|
||||
active switchOff {
|
||||
shutdown // shut down in the dispatcher's references is zero
|
||||
}
|
||||
}
|
||||
}, 1, TimeUnit.SECONDS))
|
||||
if (future ne null) future.cancel(false)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue