Adding support for BoundedTransferQueue to EBEDD

This commit is contained in:
Viktor Klang 2010-09-06 19:12:05 +02:00
parent db5a8c1b6a
commit 4841996520
7 changed files with 60 additions and 29 deletions

View file

@ -65,14 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
class ExecutorBasedEventDrivenDispatcher(
_name: String,
throughput: Int = Dispatchers.THROUGHPUT,
capacity: Int = Dispatchers.MAILBOX_CAPACITY,
mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS,
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None))
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
mailboxCapacity = capacity
mailboxCapacity = mailboxBounds.capacity
@volatile private var active: Boolean = false
@ -92,8 +93,14 @@ class ExecutorBasedEventDrivenDispatcher(
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
override def createMailbox(actorRef: ActorRef): AnyRef = {
if (mailboxCapacity <= 0) new ConcurrentLinkedQueue[MessageInvocation]
else new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
if (mailboxCapacity <= 0)
new ConcurrentLinkedQueue[MessageInvocation]
else if (mailboxBounds.pushTimeOut.isDefined) {
val timeout = mailboxBounds.pushTimeOut.get
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit)
}
else
new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
}
def dispatch(receiver: ActorRef): Unit = if (active) {