Adding support for safe mailboxes
This commit is contained in:
parent
c2b85ee6e4
commit
928fa63411
6 changed files with 98 additions and 82 deletions
|
|
@ -15,7 +15,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
|
|||
protected var selectorThread: Thread = _
|
||||
protected val guard = new Object
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = queue.append(invocation)
|
||||
def dispatch(invocation: MessageInvocation) = queue enqueue invocation
|
||||
|
||||
def shutdown = if (active) {
|
||||
log.debug("Shutting down %s", toString)
|
||||
|
|
@ -34,14 +34,20 @@ class ReactiveMessageQueue(name: String) extends MessageQueue {
|
|||
private[akka] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
|
||||
@volatile private var interrupted = false
|
||||
|
||||
def append(handle: MessageInvocation) = queue.synchronized {
|
||||
queue.offer(handle)
|
||||
def enqueue(handle: MessageInvocation) = queue.synchronized {
|
||||
queue offer handle
|
||||
queue.notifyAll
|
||||
}
|
||||
|
||||
def dequeue(): MessageInvocation = queue.synchronized {
|
||||
val result = queue.poll
|
||||
queue.notifyAll
|
||||
result
|
||||
}
|
||||
|
||||
def read(destination: List[MessageInvocation]) = queue.synchronized {
|
||||
while (queue.isEmpty && !interrupted) queue.wait
|
||||
if (!interrupted) while (!queue.isEmpty) destination.add(queue.remove)
|
||||
if (!interrupted) while (!queue.isEmpty) destination add queue.remove
|
||||
else interrupted = false
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -46,10 +46,10 @@ import se.scalablesolutions.akka.util.{Duration, Logging, UUID}
|
|||
object Dispatchers extends Logging {
|
||||
val THROUGHPUT = config.getInt("akka.actor.throughput", 5)
|
||||
val MAILBOX_CAPACITY = config.getInt("akka.actor.default-dispatcher.mailbox-capacity", 1000)
|
||||
val MAILBOX_BOUNDS = BoundedMailbox(
|
||||
Dispatchers.MAILBOX_CAPACITY,
|
||||
config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").
|
||||
map(Duration(_,TimeUnit.MILLISECONDS))
|
||||
val MAILBOX_CONFIG = MailboxConfig(
|
||||
capacity = Dispatchers.MAILBOX_CAPACITY,
|
||||
pushTimeOut = config.getInt("akka.actor.default-dispatcher.mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS)),
|
||||
blockingDequeue = false
|
||||
)
|
||||
|
||||
lazy val defaultGlobalDispatcher = {
|
||||
|
|
@ -58,7 +58,7 @@ object Dispatchers extends Logging {
|
|||
|
||||
object globalHawtDispatcher extends HawtDispatcher
|
||||
|
||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_BOUNDS) {
|
||||
object globalExecutorBasedEventDrivenDispatcher extends ExecutorBasedEventDrivenDispatcher("global",THROUGHPUT,MAILBOX_CONFIG) {
|
||||
override def register(actor: ActorRef) = {
|
||||
if (isShutdown) init
|
||||
super.register(actor)
|
||||
|
|
@ -99,7 +99,7 @@ object Dispatchers extends Logging {
|
|||
* <p/>
|
||||
* E.g. each actor consumes its own thread.
|
||||
*/
|
||||
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, BoundedMailbox(mailboxCapacity,Option(pushTimeOut)))
|
||||
def newThreadBasedDispatcher(actor: ActorRef, mailboxCapacity: Int, pushTimeOut: Duration) = new ThreadBasedDispatcher(actor, MailboxConfig(mailboxCapacity,Option(pushTimeOut),true))
|
||||
|
||||
/**
|
||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||
|
|
@ -200,10 +200,10 @@ object Dispatchers extends Logging {
|
|||
})
|
||||
}
|
||||
|
||||
lazy val mailboxBounds: BoundedMailbox = {
|
||||
lazy val mailboxBounds: MailboxConfig = {
|
||||
val capacity = cfg.getInt("mailbox-capacity",Dispatchers.MAILBOX_CAPACITY)
|
||||
val timeout = cfg.getInt("mailbox-push-timeout-ms").map(Duration(_,TimeUnit.MILLISECONDS))
|
||||
BoundedMailbox(capacity,timeout)
|
||||
MailboxConfig(capacity,timeout,false)
|
||||
}
|
||||
|
||||
val dispatcher: Option[MessageDispatcher] = cfg.getString("type") map {
|
||||
|
|
|
|||
|
|
@ -65,15 +65,15 @@ import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}
|
|||
class ExecutorBasedEventDrivenDispatcher(
|
||||
_name: String,
|
||||
throughput: Int = Dispatchers.THROUGHPUT,
|
||||
mailboxBounds: BoundedMailbox = Dispatchers.MAILBOX_BOUNDS,
|
||||
mailboxConfig: MailboxConfig = Dispatchers.MAILBOX_CONFIG,
|
||||
config: (ThreadPoolBuilder) => Unit = _ => ()) extends MessageDispatcher with ThreadPoolBuilder {
|
||||
|
||||
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,BoundedMailbox(capacity,None))
|
||||
def this(_name: String, throughput: Int, capacity: Int) = this(_name,throughput,MailboxConfig(capacity,None,false))
|
||||
def this(_name: String, throughput: Int) = this(_name, throughput, Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
|
||||
def this(_name: String) = this(_name,Dispatchers.THROUGHPUT,Dispatchers.MAILBOX_CAPACITY) // Needed for Java API usage
|
||||
|
||||
|
||||
mailboxCapacity = mailboxBounds.capacity
|
||||
mailboxCapacity = mailboxConfig.capacity
|
||||
|
||||
@volatile private var active: Boolean = false
|
||||
|
||||
|
|
@ -92,16 +92,7 @@ class ExecutorBasedEventDrivenDispatcher(
|
|||
|
||||
override def mailboxSize(actorRef: ActorRef) = getMailbox(actorRef).size
|
||||
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxCapacity <= 0)
|
||||
new ConcurrentLinkedQueue[MessageInvocation]
|
||||
else if (mailboxBounds.pushTimeOut.isDefined) {
|
||||
val timeout = mailboxBounds.pushTimeOut.get
|
||||
new BoundedTransferQueue[MessageInvocation](mailboxCapacity,timeout.length,timeout.unit)
|
||||
}
|
||||
else
|
||||
new LinkedBlockingQueue[MessageInvocation](mailboxCapacity)
|
||||
}
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(bounds = mailboxCapacity)
|
||||
|
||||
def dispatch(receiver: ActorRef): Unit = if (active) {
|
||||
|
||||
|
|
|
|||
|
|
@ -4,14 +4,14 @@
|
|||
|
||||
package se.scalablesolutions.akka.dispatch
|
||||
|
||||
import java.util.List
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef, ActorInitializationException}
|
||||
|
||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||
import se.scalablesolutions.akka.AkkaException
|
||||
import java.util.concurrent.{ConcurrentSkipListSet}
|
||||
import se.scalablesolutions.akka.util.{Duration, HashCode, Logging}
|
||||
import java.util.{Queue, List}
|
||||
import java.util.concurrent._
|
||||
import concurrent.forkjoin.LinkedTransferQueue
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -63,13 +63,60 @@ class MessageQueueAppendFailedException(message: String) extends AkkaException(m
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
trait MessageQueue {
|
||||
def append(handle: MessageInvocation)
|
||||
def enqueue(handle: MessageInvocation)
|
||||
def dequeue(): MessageInvocation
|
||||
}
|
||||
|
||||
/* Tells the dispatcher that it should create a bounded mailbox with the specified push timeout
|
||||
* (If capacity > 0)
|
||||
*/
|
||||
case class BoundedMailbox(capacity: Int, pushTimeOut: Option[Duration])
|
||||
case class MailboxConfig(capacity: Int, pushTimeOut: Option[Duration], blockingDequeue: Boolean) {
|
||||
|
||||
/**
|
||||
* Creates a MessageQueue (Mailbox) with the specified properties
|
||||
* bounds = whether the mailbox should be bounded (< 0 means unbounded)
|
||||
* pushTime = only used if bounded, indicates if and how long an enqueue should block
|
||||
* blockDequeue = whether dequeues should block or not
|
||||
*
|
||||
* The bounds + pushTime generates a MessageQueueAppendFailedException if enqueue times out
|
||||
*/
|
||||
def newMailbox(bounds: Int = capacity,
|
||||
pushTime: Option[Duration] = pushTimeOut,
|
||||
blockDequeue: Boolean = blockingDequeue) : MessageQueue = {
|
||||
if (bounds <= 0) {
|
||||
new LinkedTransferQueue[MessageInvocation] with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation): Unit = this add handle
|
||||
def dequeue(): MessageInvocation = {
|
||||
if(blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
else if (pushTime.isDefined) {
|
||||
val time = pushTime.get
|
||||
new BoundedTransferQueue[MessageInvocation](bounds) with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation) {
|
||||
if (!this.offer(handle,time.length,time.unit))
|
||||
throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + this.toString)
|
||||
}
|
||||
|
||||
def dequeue(): MessageInvocation = {
|
||||
if (blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
else {
|
||||
new LinkedBlockingQueue[MessageInvocation](bounds) with MessageQueue {
|
||||
def enqueue(handle: MessageInvocation): Unit = this put handle
|
||||
def dequeue(): MessageInvocation = {
|
||||
if(blockDequeue) this.take()
|
||||
else this.poll()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
|
|||
|
|
@ -9,14 +9,8 @@ import java.util.concurrent.{TimeUnit, Semaphore}
|
|||
import java.util.Iterator
|
||||
import se.scalablesolutions.akka.util.Logger
|
||||
|
||||
class BoundedTransferQueue[E <: AnyRef](
|
||||
val capacity: Int,
|
||||
val pushTimeout: Long,
|
||||
val pushTimeUnit: TimeUnit)
|
||||
extends LinkedTransferQueue[E] {
|
||||
class BoundedTransferQueue[E <: AnyRef](val capacity: Int) extends LinkedTransferQueue[E] {
|
||||
require(capacity > 0)
|
||||
require(pushTimeout > 0)
|
||||
require(pushTimeUnit ne null)
|
||||
|
||||
protected val guard = new Semaphore(capacity)
|
||||
|
||||
|
|
@ -50,7 +44,7 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def offer(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.offer(e)
|
||||
} catch {
|
||||
|
|
@ -63,9 +57,9 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def offer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.offer(e,timeout,unit)
|
||||
super.offer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
|
|
@ -76,7 +70,7 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def add(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.add(e)
|
||||
} catch {
|
||||
|
|
@ -89,17 +83,16 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def put(e :E): Unit = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
guard.acquire
|
||||
try {
|
||||
super.put(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
override def tryTransfer(e: E): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire) {
|
||||
val result = try {
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
|
|
@ -112,9 +105,9 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def tryTransfer(e: E, timeout: Long, unit: TimeUnit): Boolean = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire(timeout,unit)) {
|
||||
val result = try {
|
||||
super.tryTransfer(e,timeout,unit)
|
||||
super.tryTransfer(e)
|
||||
} catch {
|
||||
case e => guard.release; throw e
|
||||
}
|
||||
|
|
@ -125,7 +118,7 @@ class BoundedTransferQueue[E <: AnyRef](
|
|||
}
|
||||
|
||||
override def transfer(e: E): Unit = {
|
||||
if (guard.tryAcquire(pushTimeout,pushTimeUnit)) {
|
||||
if (guard.tryAcquire) {
|
||||
try {
|
||||
super.transfer(e)
|
||||
} catch {
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ import java.util.Queue
|
|||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
import se.scalablesolutions.akka.config.Config.config
|
||||
import concurrent.forkjoin.{TransferQueue, LinkedTransferQueue}
|
||||
import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
|
||||
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
|
||||
|
||||
/**
|
||||
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
|
||||
|
|
@ -17,9 +17,9 @@ import java.util.concurrent.{BlockingQueue, TimeUnit, LinkedBlockingQueue}
|
|||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class ThreadBasedDispatcher(private val actor: ActorRef,
|
||||
val mailboxBounds: BoundedMailbox
|
||||
val mailboxConfig: MailboxConfig
|
||||
) extends MessageDispatcher {
|
||||
def this(actor: ActorRef, capacity: Int) = this(actor,BoundedMailbox(capacity,None))
|
||||
def this(actor: ActorRef, capacity: Int) = this(actor,MailboxConfig(capacity,None,true))
|
||||
def this(actor: ActorRef) = this(actor, Dispatchers.MAILBOX_CAPACITY)// For Java
|
||||
|
||||
private val name = actor.getClass.getName + ":" + actor.uuid
|
||||
|
|
@ -27,16 +27,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
|
|||
private var selectorThread: Thread = _
|
||||
@volatile private var active: Boolean = false
|
||||
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = {
|
||||
if (mailboxBounds.capacity <= 0)
|
||||
new LinkedTransferQueue[MessageInvocation] with ThreadMessageBlockingQueue
|
||||
else if (mailboxBounds.pushTimeOut.isDefined) {
|
||||
val timeout = mailboxBounds.pushTimeOut.get
|
||||
new BoundedTransferQueue[MessageInvocation](mailboxBounds.capacity, timeout.length, timeout.unit) with ThreadMessageBlockingQueue
|
||||
}
|
||||
else
|
||||
new LinkedBlockingQueue[MessageInvocation](mailboxBounds.capacity) with ThreadMessageBlockingQueue
|
||||
}
|
||||
override def createMailbox(actorRef: ActorRef): AnyRef = mailboxConfig.newMailbox(blockDequeue = true)
|
||||
|
||||
override def register(actorRef: ActorRef) = {
|
||||
if(actorRef != actor)
|
||||
|
|
@ -45,11 +36,11 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
|
|||
super.register(actorRef)
|
||||
}
|
||||
|
||||
def mailbox = actor.mailbox.asInstanceOf[ThreadMessageBlockingQueue]
|
||||
def mailbox = actor.mailbox.asInstanceOf[Queue[MessageInvocation] with MessageQueue]
|
||||
|
||||
def mailboxSize(a: ActorRef) = mailbox.size
|
||||
|
||||
def dispatch(invocation: MessageInvocation) = mailbox append invocation
|
||||
def dispatch(invocation: MessageInvocation) = mailbox enqueue invocation
|
||||
|
||||
def start = if (!active) {
|
||||
log.debug("Starting up %s", toString)
|
||||
|
|
@ -58,7 +49,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
|
|||
override def run = {
|
||||
while (active) {
|
||||
try {
|
||||
actor.invoke(mailbox.next)
|
||||
actor.invoke(mailbox.dequeue)
|
||||
} catch { case e: InterruptedException => active = false }
|
||||
}
|
||||
}
|
||||
|
|
@ -77,15 +68,3 @@ class ThreadBasedDispatcher(private val actor: ActorRef,
|
|||
|
||||
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
|
||||
}
|
||||
|
||||
trait ThreadMessageBlockingQueue extends MessageQueue with BlockingQueue[MessageInvocation] {
|
||||
final def next: MessageInvocation = take
|
||||
def append(invocation: MessageInvocation): Unit = put(invocation)
|
||||
}
|
||||
|
||||
trait ThreadMessageTransferQueue extends ThreadMessageBlockingQueue with TransferQueue[MessageInvocation] {
|
||||
final override def append(invocation: MessageInvocation): Unit = {
|
||||
if(!offer(invocation)) //If no consumer found, append it to the queue, if that fails, we're aborting
|
||||
throw new MessageQueueAppendFailedException("BlockingMessageTransferQueue transfer timed out")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue