2010-09-21 18:52:41 +02:00
/* *
2011-07-14 16:03:08 +02:00
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
2010-09-21 18:52:41 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.dispatch
2010-09-21 18:52:41 +02:00
2010-10-26 12:49:25 +02:00
import akka.AkkaException
2011-05-18 17:25:30 +02:00
import java.util. { Comparator , PriorityQueue }
2010-10-26 12:49:25 +02:00
import akka.util._
2011-09-21 15:01:47 +02:00
import java.util.Queue
2011-10-18 16:44:35 +02:00
import akka.actor. { ActorContext , ActorCell }
2011-09-21 16:27:31 +02:00
import java.util.concurrent._
2011-09-26 11:50:26 +02:00
import atomic. { AtomicInteger , AtomicReferenceFieldUpdater }
import annotation.tailrec
2011-10-27 12:23:01 +02:00
import akka.event.Logging.Error
2010-09-21 18:52:41 +02:00
2011-04-29 17:15:00 +02:00
/**
 * Signals that a message could not be appended to a message queue
 * (thrown by the bounded queue semantics in this file when an offer times out).
 *
 * @param message description of the failure
 * @param cause   underlying cause; defaults to `null` when there is none
 */
class MessageQueueAppendFailedException(message: String, cause: Throwable = null)
  extends AkkaException(message, cause)
2010-09-21 18:52:41 +02:00
2011-10-21 18:47:44 +02:00
/**
 * Status constants shared by all mailboxes. A status value combines one
 * primary state (Open, Suspended or Closed) with an optional Scheduled bit.
 */
object Mailbox {
2011-09-28 19:55:42 +02:00
2011-09-26 11:39:07 +02:00
// Status values are plain Ints so they can be combined with bit operations.
type Status = Int
2011-09-28 19:55:42 +02:00
2011-10-04 14:28:05 +02:00
/*
* The following assigned numbers CANNOT be changed without looking at the code which uses them!
* (The mailbox masks the status with 3 to read the primary state and compares
* raw values against Suspended and Scheduled.)
*/
2011-09-28 19:55:42 +02:00
// primary status: only first three
2011-10-04 14:28:05 +02:00
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero!
2011-09-28 19:55:42 +02:00
final val Suspended = 1
final val Closed = 2
2011-10-04 14:28:05 +02:00
// secondary status: Scheduled bit may be added to Open/Suspended
// (value 4 lies outside the `& 3` mask used for the primary status)
final val Scheduled = 4
2011-10-20 20:45:02 +02:00
2011-10-21 15:11:43 +02:00
// mailbox debugging helper using println (see below): when true, message
// processing and system-message enqueueing are traced to standard output
2011-10-21 19:07:17 +02:00
// TODO take this out before release
2011-10-20 20:45:02 +02:00
final val debug = false
2011-09-23 13:14:17 +02:00
}
2010-09-21 18:52:41 +02:00
/* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
2011-10-18 16:44:35 +02:00
abstract class Mailbox ( val actor : ActorCell ) extends AbstractMailbox with MessageQueue with SystemMessageQueue with Runnable {
2011-09-23 13:14:17 +02:00
import Mailbox._
2011-10-04 14:44:06 +02:00
/** Current raw status value of this mailbox, read through `AbstractMailbox.updater`. */
@inline
final def status: Mailbox.Status = {
  AbstractMailbox.updater.get(this)
}
2011-09-23 13:14:17 +02:00
2011-10-04 14:44:06 +02:00
/** True when the primary status (lowest two bits) is `Open`; the Scheduled bit is ignored. */
@inline
final def isActive: Boolean = {
  val primary = status & 3
  primary == Open
}
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
/** True when the primary status (lowest two bits) is `Suspended`; the Scheduled bit is ignored. */
@inline
final def isSuspended: Boolean = {
  val primary = status & 3
  primary == Suspended
}
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
/** True when the whole status value equals `Closed` (compared without masking). */
@inline
final def isClosed: Boolean = {
  val current = status
  current == Closed
}
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
/** True when the Scheduled bit is set in the current status. */
@inline
final def isScheduled: Boolean = {
  val scheduledBit = status & Scheduled
  scheduledBit != 0
}
2011-09-23 13:14:17 +02:00
2011-10-04 15:22:41 +02:00
/**
 * Compare-and-set on the status field via `AbstractMailbox.updater`.
 *
 * @param oldStatus the value the status is expected to hold
 * @param newStatus the value to install
 * @return the result of `compareAndSet` (true when the swap happened)
 */
@inline
protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean = {
  val swapped = AbstractMailbox.updater.compareAndSet(this, oldStatus, newStatus)
  swapped
}
/** Unconditionally stores `newStatus` into the status field via `AbstractMailbox.updater`. */
@inline
protected final def setStatus(newStatus: Status): Unit = {
  AbstractMailbox.updater.set(this, newStatus)
}
2011-09-26 15:45:24 +02:00
/**
 * Internal method to enforce a volatile write of the status: the current
 * value is compare-and-set onto itself, retrying until the swap succeeds.
 */
@tailrec
final def acknowledgeStatus(): Unit = {
  val current = status
  if (!updateStatus(current, current)) acknowledgeStatus()
}
2011-09-28 19:55:42 +02:00
/**
 * Switch the primary status to Open while carrying over the Scheduled bit,
 * so the caller does not need to know whether the mailbox was Scheduled.
 * Retries until the compare-and-set succeeds.
 *
 * @return true once the status was switched; false if the mailbox is Closed
 *         (in which case Closed is written again and nothing else changes)
 */
@tailrec
final def becomeOpen(): Boolean = {
  val current = status
  if (current == Closed) {
    setStatus(Closed)
    false
  } else if (updateStatus(current, Open | (current & Scheduled))) true
  else becomeOpen()
}
2011-10-04 14:28:05 +02:00
/**
 * Switch the primary status to Suspended while carrying over the Scheduled
 * bit, so the caller does not need to know whether the mailbox was Scheduled.
 * Retries until the compare-and-set succeeds.
 *
 * @return true once the status was switched; false if the mailbox is Closed
 *         (in which case Closed is written again and nothing else changes)
 */
@tailrec
final def becomeSuspended(): Boolean = {
  val current = status
  if (current == Closed) {
    setStatus(Closed)
    false
  } else if (updateStatus(current, Suspended | (current & Scheduled))) true
  else becomeSuspended()
}
2011-10-04 14:28:05 +02:00
/**
 * Switch the status to plain Closed; any Scheduled bit is dropped, so the
 * caller does not need to know whether the mailbox was Scheduled.
 * Retries until the compare-and-set succeeds.
 *
 * @return true once the status was switched; false if the mailbox was
 *         already Closed (Closed is then written again)
 */
@tailrec
final def becomeClosed(): Boolean = {
  val current = status
  if (current == Closed) {
    setStatus(Closed)
    false
  } else if (updateStatus(current, Closed)) true
  else becomeClosed()
}
/**
 * Add the Scheduled bit, leaving the primary status untouched.
 *
 * @return true if this call set the bit; false if the mailbox was Closed or
 *         already had the Scheduled bit
 */
@tailrec
final def setAsScheduled(): Boolean = {
  val current = status
  /*
   * Anything above Suspended is either Closed or already carries the
   * Scheduled bit, so there is nothing to add (this relies on the numeric
   * values assigned in object Mailbox).
   */
  if (current > Suspended) false
  else if (updateStatus(current, current | Scheduled)) true
  else setAsScheduled()
}
/**
 * Clear the Scheduled bit, leaving the primary status untouched.
 *
 * @return true if this call cleared the bit; false if the bit was not set
 *         (which includes the Closed state)
 */
@tailrec
final def setAsIdle(): Boolean = {
  val current = status
  /*
   * Values below Scheduled are Open, Suspended or Closed without the
   * Scheduled bit (this relies on the numeric values assigned in object
   * Mailbox), so there is no bit to remove.
   */
  if (current < Scheduled) {
    acknowledgeStatus() // this write is needed to make memory consistent after processMailbox()
    false
  } else if (updateStatus(current, current & ~Scheduled)) true
  else setAsIdle()
}
2011-09-23 13:14:17 +02:00
2011-10-18 18:06:17 +02:00
/*
 * Accessors for the system message queue head, backed by
 * AbstractMailbox.systemQueueUpdater.
 */

/** Current head of the system message queue as read through the updater. */
protected final def systemQueueGet: SystemMessage = {
  AbstractMailbox.systemQueueUpdater.get(this)
}
/**
 * Compare-and-set on the system message queue head.
 *
 * @param _old the head that is expected to be installed
 * @param _new the head to install
 * @return the result of `compareAndSet` (true when the swap happened)
 */
protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean = {
  val swapped = AbstractMailbox.systemQueueUpdater.compareAndSet(this, _old, _new)
  swapped
}
2011-09-23 13:14:17 +02:00
/**
 * Decide, from a single read of the status, whether this mailbox has work
 * that warrants registering it for execution.
 *
 *  - status exactly Open or exactly Scheduled (i.e. Open plus the Scheduled
 *    bit): any hinted or actually queued user or system message counts
 *  - status Closed: never
 *  - anything else (e.g. Suspended): only system messages count
 *
 * @param hasMessageHint       caller's hint that a user message is available
 * @param hasSystemMessageHint caller's hint that a system message is available
 */
def shouldBeRegisteredForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
  val current = status
  if (current == Open || current == Scheduled)
    hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
  else if (current == Closed)
    false
  else
    hasSystemMessageHint || hasSystemMessages
}
2011-09-21 15:01:47 +02:00
/**
 * Runnable entry point: processes the mailbox once and then, whether or not
 * processing threw, clears the Scheduled bit and asks the dispatcher to
 * register this mailbox for execution again (passing `false` for both hints).
 */
final def run: Unit =
  try {
    processMailbox()
  } finally {
    setAsIdle()
    dispatcher.registerForExecution(this, false, false)
  }
/* *
* Process the messages in the mailbox.
*
* All pending system messages are processed first. If the mailbox is then
* active (primary status Open), user messages are dequeued and handed to the
* actor. When the dispatcher defines a throughput, up to `dispatcher.throughput`
* messages are processed per call, optionally bounded by the throughput
* deadline; otherwise exactly one message is processed. System messages are
* processed again after every user message.
*
* This method does not return a value; processing may stop before the mailbox
* is empty because of the throughput constraint or because the mailbox is no
* longer active.
*/
final def processMailbox ( ) {
2011-10-07 11:34:07 +02:00
processAllSystemMessages ( ) //First, process all system messages
2011-09-23 13:14:17 +02:00
2011-09-28 19:55:42 +02:00
if ( isActive ) {
2011-09-21 15:01:47 +02:00
var nextMessage = dequeue ( )
if ( nextMessage ne null ) { //If we have a message
2011-10-07 11:34:07 +02:00
if ( dispatcher . isThroughputDefined ) { //If we're using throughput, we need to do some book-keeping
2011-09-21 15:01:47 +02:00
var processedMessages = 0
2011-10-07 11:34:07 +02:00
// Absolute deadline in nanoseconds; throughputDeadlineTime is converted from milliseconds. 0 when no deadline is defined (it is then never consulted).
val deadlineNs = if ( dispatcher . isThroughputDeadlineTimeDefined ) System . nanoTime + TimeUnit . MILLISECONDS . toNanos ( dispatcher . throughputDeadlineTime ) else 0
2011-09-21 15:01:47 +02:00
do {
2011-10-21 15:11:43 +02:00
// NOTE(review): the debug trace is only emitted on this throughput path, not on the single-message path below
if ( debug ) println ( actor + " processing message " + nextMessage )
2011-10-19 13:19:44 +02:00
actor invoke nextMessage
2011-09-23 13:14:17 +02:00
2011-10-07 11:34:07 +02:00
processAllSystemMessages ( ) //After we're done, process all system messages
2011-09-23 13:14:17 +02:00
2011-09-28 19:55:42 +02:00
// A null nextMessage ends the loop: mailbox no longer active, limits reached, or queue empty
nextMessage = if ( isActive ) { // If we aren't suspended, we need to make sure we're not overstepping our boundaries
2011-09-22 17:15:51 +02:00
processedMessages += 1
2011-10-07 11:34:07 +02:00
if ( ( processedMessages >= dispatcher . throughput ) || ( dispatcher . isThroughputDeadlineTimeDefined && System . nanoTime >= deadlineNs ) ) // If we're throttled, break out
2011-09-22 17:15:51 +02:00
null //We reached our boundaries, abort
else dequeue //Dequeue the next message
2011-09-23 16:21:57 +02:00
} else null //Abort
2011-09-21 15:01:47 +02:00
} while ( nextMessage ne null )
2011-10-07 11:34:07 +02:00
} else { //If we only run one message per process
2011-10-19 13:19:44 +02:00
actor invoke nextMessage //Just run it
2011-10-07 11:34:07 +02:00
processAllSystemMessages ( ) //After we're done, process all system messages
2011-09-21 15:01:47 +02:00
}
}
}
}
2011-09-20 18:34:21 +02:00
2011-09-27 17:41:02 +02:00
/**
 * Drains the system queue and hands every system message to the actor,
 * draining again whenever the current batch is exhausted, until a drain
 * yields nothing.
 *
 * If anything is thrown while processing, an Error event mentioning the
 * number of dropped messages is published to the actor's event stream and
 * the throwable is rethrown.
 */
def processAllSystemMessages(): Unit = {
  var pending = systemDrain()
  try {
    while (pending ne null) {
      if (debug) println(actor + " processing system message " + pending)
      actor.systemInvoke(pending)
      pending = pending.next
      // don't ever execute normal message when system message present!
      if (pending eq null) pending = systemDrain()
    }
  } catch {
    case failure: Throwable ⇒
      val dropped = SystemMessage.size(pending)
      actor.app.eventStream.publish(
        Error(failure, actor.self, "exception during processing system messages, dropping " + dropped + " messages!"))
      throw failure
  }
}
2011-09-21 15:01:47 +02:00
2011-09-23 09:33:53 +02:00
// The dispatcher this mailbox consults for throughput settings and re-registers itself with after a run; supplied by concrete mailboxes.
def dispatcher : MessageDispatcher
}
/**
 * Queue of user messages (envelopes) belonging to a mailbox.
 */
trait MessageQueue {
  /*
   * These methods need to be implemented in subclasses; they should not rely on the internal stuff above.
   */

  /** Adds `handle` to the queue. */
  def enqueue(handle: Envelope): Unit

  /**
   * Removes and returns the next envelope. Callers in this file treat `null`
   * as "no message available".
   */
  def dequeue(): Envelope

  /** Number of envelopes currently queued. */
  def numberOfMessages: Int

  /** Whether at least one envelope is queued. */
  def hasMessages: Boolean
}
/**
 * Queue of system messages belonging to a mailbox.
 */
trait SystemMessageQueue {

  /**
   * Enqueue a new system message, e.g. by prepending atomically as new head of a single-linked list.
   */
  def systemEnqueue(message: SystemMessage): Unit

  /**
   * Dequeue all messages from system queue and return them as single-linked list.
   */
  def systemDrain(): SystemMessage

  /** Whether at least one system message is currently queued. */
  def hasSystemMessages: Boolean
}
2011-10-18 18:06:17 +02:00
/**
 * System message queue kept as a single-linked list whose head lives in the
 * mailbox and is swapped with compare-and-set (see systemQueueGet / systemQueuePut).
 */
trait DefaultSystemMessageQueue { self: Mailbox ⇒

  /**
   * Prepends `message` as the new head of the list, retrying until the
   * compare-and-set on the head succeeds. The message must not be linked
   * into a list yet (`message.next` must be null).
   */
  @tailrec
  final def systemEnqueue(message: SystemMessage): Unit = {
    assert(message.next eq null)
    if (Mailbox.debug) println(actor + " having enqueued " + message)
    val currentHead = systemQueueGet
    /*
     * This write is safely published by the compareAndSet contained within
     * systemQueuePut; "Intra-Thread Semantics" on page 12 of the JSR133 spec
     * guarantees that the link uses the value obtained from systemQueueGet
     * above. Hence, SystemMessage.next does not need to be volatile.
     */
    message.next = currentHead
    if (systemQueuePut(currentHead, message)) ()
    else {
      // lost the race: unlink again so the assertion holds on the retry
      message.next = null
      systemEnqueue(message)
    }
  }

  /**
   * Detaches the whole list by swapping the head to null and returns it
   * passed through `SystemMessage.reverse`; retries when the head changed
   * concurrently.
   */
  @tailrec
  final def systemDrain(): SystemMessage = {
    val currentHead = systemQueueGet
    if (!systemQueuePut(currentHead, null)) systemDrain()
    else SystemMessage.reverse(currentHead)
  }

  /** True when the queue head is non-null. */
  def hasSystemMessages: Boolean = {
    val currentHead = systemQueueGet
    currentHead ne null
  }
}
2011-09-23 09:33:53 +02:00
/**
 * Enqueue/dequeue semantics for a queue without a capacity limit: messages
 * are simply added to, and polled from, the underlying queue.
 */
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {

  /** Adds `handle` to the underlying queue; the Boolean result of `add` is discarded. */
  final def enqueue(handle: Envelope): Unit = {
    queue.add(handle)
  }

  /** Polls the underlying queue for its next envelope. */
  final def dequeue(): Envelope = {
    queue.poll()
  }
}
2011-09-23 09:33:53 +02:00
/**
 * Enqueue/dequeue semantics for a capacity-limited blocking queue.
 */
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {

  /** Time-out applied when offering a message; a length of zero or less selects a plain `put`. */
  def pushTimeOut: Duration

  override def queue: BlockingQueue[Envelope]

  /**
   * Adds `handle` to the queue. With a positive push time-out the message is
   * offered with that time-out and a MessageQueueAppendFailedException is
   * thrown when the offer is rejected; otherwise the message is `put`.
   */
  final def enqueue(handle: Envelope): Unit =
    if (pushTimeOut.length <= 0) queue.put(handle)
    else {
      val accepted = queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)
      if (!accepted)
        throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
    }

  /** Polls the underlying queue for its next envelope. */
  final def dequeue(): Envelope = {
    queue.poll()
  }
}
2011-09-23 09:33:53 +02:00
/**
 * MessageQueue whose size queries are answered by an underlying `java.util.Queue`.
 */
trait QueueBasedMessageQueue extends MessageQueue {

  /** The queue holding the envelopes. */
  def queue: Queue[Envelope]

  /** Size of the underlying queue. */
  final def numberOfMessages = {
    queue.size
  }

  /** True when the underlying queue is not empty. */
  final def hasMessages = {
    val empty = queue.isEmpty
    !empty
  }
}
2010-09-21 18:52:41 +02:00
/**
 * Mailbox configuration: a factory for the mailbox of one actor.
 */
trait MailboxType {

  /**
   * Creates a new mailbox.
   *
   * @param dispatcher the dispatcher the mailbox will use
   * @param receiver   the actor cell the mailbox belongs to
   */
  def create(dispatcher: MessageDispatcher, receiver: ActorCell): Mailbox
}
2011-09-23 09:33:53 +02:00
/**
 * Mailbox type backed by a ConcurrentLinkedQueue without capacity limit.
 *
 * It is a case class (rather than an object) so that Java code can write
 * `new UnboundedMailbox`.
 */
case class UnboundedMailbox() extends MailboxType {

  /** Creates the mailbox for `receiver`, bound to `_dispatcher`. */
  override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
    new Mailbox(receiver)
      with QueueBasedMessageQueue
      with UnboundedMessageQueueSemantics
      with DefaultSystemMessageQueue {

      final val queue = new ConcurrentLinkedQueue[Envelope]()
      final val dispatcher = _dispatcher
    }
}
2010-09-21 18:52:41 +02:00
2011-10-07 15:59:18 +02:00
/**
 * Mailbox type backed by a LinkedBlockingQueue of the given capacity.
 *
 * NOTE(review): a capacity of 0 passes the check below, but
 * `java.util.concurrent.LinkedBlockingQueue` rejects non-positive capacities,
 * so `create` would presumably fail for it — confirm the intended lower bound.
 *
 * @param capacity    maximum number of queued envelopes; must not be negative
 * @param pushTimeOut time-out used when enqueueing; must not be null
 * @throws IllegalArgumentException if `capacity` is negative or `pushTimeOut` is null
 */
case class BoundedMailbox(final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {

  if (capacity < 0)
    throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
  if (pushTimeOut eq null)
    throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")

  /** Creates the mailbox for `receiver`, bound to `_dispatcher`. */
  override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
    new Mailbox(receiver)
      with QueueBasedMessageQueue
      with BoundedMessageQueueSemantics
      with DefaultSystemMessageQueue {

      final val queue = new LinkedBlockingQueue[Envelope](capacity)
      final val pushTimeOut = BoundedMailbox.this.pushTimeOut
      final val dispatcher = _dispatcher
    }
}
2010-09-21 18:52:41 +02:00
2011-10-07 11:34:07 +02:00
/**
 * Mailbox type backed by a PriorityBlockingQueue ordered by `cmp`.
 *
 * @param cmp comparator that determines the order in which envelopes are dequeued
 */
case class UnboundedPriorityMailbox(final val cmp: Comparator[Envelope]) extends MailboxType {

  /** Creates the mailbox for `receiver`, bound to `_dispatcher`. */
  override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
    new Mailbox(receiver)
      with QueueBasedMessageQueue
      with UnboundedMessageQueueSemantics
      with DefaultSystemMessageQueue {

      // 11 is the initial capacity handed to the queue constructor
      final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
      final val dispatcher = _dispatcher
    }
}
2011-10-07 15:59:18 +02:00
/**
 * Mailbox type backed by a BoundedBlockingQueue that wraps a PriorityQueue
 * ordered by `cmp`.
 *
 * @param cmp         comparator that determines the order in which envelopes are dequeued
 * @param capacity    capacity handed to the bounded queue; must not be negative
 * @param pushTimeOut time-out used when enqueueing; must not be null
 * @throws IllegalArgumentException if `capacity` is negative or `pushTimeOut` is null
 */
case class BoundedPriorityMailbox(final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {

  // The messages name this class (they previously named BoundedMailbox, a copy-paste slip
  // that pointed users at the wrong mailbox type).
  if (capacity < 0)
    throw new IllegalArgumentException("The capacity for BoundedPriorityMailbox can not be negative")
  if (pushTimeOut eq null)
    throw new IllegalArgumentException("The push time-out for BoundedPriorityMailbox can not be null")

  /** Creates the mailbox for `receiver`, bound to `_dispatcher`. */
  override def create(_dispatcher: MessageDispatcher, receiver: ActorCell) =
    new Mailbox(receiver)
      with QueueBasedMessageQueue
      with BoundedMessageQueueSemantics
      with DefaultSystemMessageQueue {

      // 11 is the initial capacity handed to the backing PriorityQueue
      final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
      final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
      final val dispatcher = _dispatcher
    }
}
2011-03-09 18:11:45 +01:00