2010-09-21 18:52:41 +02:00
/* *
2012-01-19 18:21:06 +01:00
* Copyright ( C ) 2009 - 2012 Typesafe Inc . < http : //www.typesafe.com>
2010-09-21 18:52:41 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.dispatch
2010-09-21 18:52:41 +02:00
2010-10-26 12:49:25 +02:00
import akka.AkkaException
2011-11-14 14:09:23 +01:00
import java.util. { Comparator , PriorityQueue , Queue }
2010-10-26 12:49:25 +02:00
import akka.util._
2011-11-14 14:09:23 +01:00
import akka.actor. { ActorCell , ActorRef }
2011-09-21 16:27:31 +02:00
import java.util.concurrent._
2011-09-26 11:50:26 +02:00
import annotation.tailrec
2011-10-27 12:23:01 +02:00
import akka.event.Logging.Error
2011-12-19 20:36:06 +01:00
import com.typesafe.config.Config
2011-12-19 21:46:37 +01:00
import akka.actor.ActorContext
2010-09-21 18:52:41 +02:00
2011-04-29 17:15:00 +02:00
/**
 * Signals that a message could not be appended to a message queue.
 *
 * @param message description of the failure
 * @param cause   underlying cause, `null` if there is none
 */
class MessageQueueAppendFailedException(message: String, cause: Throwable = null)
  extends AkkaException(message, cause)
2010-09-21 18:52:41 +02:00
2011-10-21 18:47:44 +02:00
object Mailbox {

  /** A mailbox status is encoded in a plain Int. */
  type Status = Int

  /*
   * The numeric values below CANNOT be changed without reviewing every piece of code
   * which uses them: the Mailbox class does bit arithmetic and ordering comparisons on them.
   */

  // Primary status: exactly one of the following three values.
  // The status field of a mailbox is never initialized explicitly, hence Open must be zero.
  final val Open = 0
  final val Suspended = 1
  final val Closed = 2

  // Secondary status: the Scheduled bit may be combined with Open or Suspended.
  final val Scheduled = 4

  // Switch for println-based mailbox debugging.
  // Being a compile-time constant, scalac elides the code guarded by `if (Mailbox.debug)`
  // (RK checked with 2.9.1).
  final val debug = false
}
2011-12-19 21:46:37 +01:00
/**
 * Base class for user defined mailbox implementations.
 *
 * A custom mailbox is created by extending this class and mixing in the desired queue
 * semantics, e.g.
 * {{{
 * class MyMailbox(owner: ActorContext) extends CustomMailbox(owner)
 *   with QueueBasedMessageQueue with UnboundedMessageQueueSemantics with DefaultSystemMessageQueue {
 *   val queue = new ConcurrentLinkedQueue[Envelope]()
 * }
 * }}}
 *
 * @param actorContext context of the owning actor; it is cast to `ActorCell` when handed
 *                     to the underlying [[Mailbox]]
 */
abstract class CustomMailbox(val actorContext: ActorContext)
  extends Mailbox(actorContext.asInstanceOf[ActorCell])
/**
 * Base class of all mailboxes: it combines the user message queue, the system message queue
 * and the `Runnable` which processes both on behalf of the owning actor.
 *
 * `CustomMailbox` and `Mailbox` are two separate classes because the `ActorCell` is needed
 * for the implementation, but can't be exposed to user defined mailbox subclasses.
 *
 * @param actor the cell of the owning actor; `cleanUp()` tolerates `null` here
 */
private[akka] abstract class Mailbox(val actor: ActorCell) extends MessageQueue with SystemMessageQueue with Runnable {
  import Mailbox._

  // NOTE(review): presumably the field addressed by AbstractMailbox.mailboxStatusOffset;
  // this class only touches the status through Unsafe, never through this name.
  @volatile
  protected var _statusDoNotCallMeDirectly: Status = _ //0 by default

  // NOTE(review): presumably the field addressed by AbstractMailbox.systemMessageOffset.
  @volatile
  protected var _systemQueueDoNotCallMeDirectly: SystemMessage = _ //null by default

  /** Volatile read of the current status (primary status plus optional Scheduled bit). */
  @inline
  final def status: Mailbox.Status = Unsafe.instance.getIntVolatile(this, AbstractMailbox.mailboxStatusOffset)

  /** True if the primary status (the lowest two bits) is Open. */
  @inline
  final def shouldProcessMessage: Boolean = (status & 3) == Open

  /** True if the primary status (the lowest two bits) is Suspended. */
  @inline
  final def isSuspended: Boolean = (status & 3) == Suspended

  /** True if the status is exactly Closed. */
  @inline
  final def isClosed: Boolean = status == Closed

  /** True if the Scheduled bit is set. */
  @inline
  final def isScheduled: Boolean = (status & Scheduled) != 0

  /** Compare-and-swap of the status; returns true if the swap succeeded. */
  @inline
  protected final def updateStatus(oldStatus: Status, newStatus: Status): Boolean =
    Unsafe.instance.compareAndSwapInt(this, AbstractMailbox.mailboxStatusOffset, oldStatus, newStatus)

  /** Unconditional volatile write of the status. */
  @inline
  protected final def setStatus(newStatus: Status): Unit =
    Unsafe.instance.putIntVolatile(this, AbstractMailbox.mailboxStatusOffset, newStatus)

  /**
   * Set new primary status Open. Caller does not need to worry about whether
   * status was Scheduled or not.
   *
   * @return false if the mailbox is Closed (which is left untouched), true otherwise
   */
  @tailrec
  final def becomeOpen(): Boolean = status match {
    case Closed ⇒ setStatus(Closed); false
    case s      ⇒ updateStatus(s, Open | (s & Scheduled)) || becomeOpen()
  }

  /**
   * Set new primary status Suspended. Caller does not need to worry about whether
   * status was Scheduled or not.
   *
   * @return false if the mailbox is Closed (which is left untouched), true otherwise
   */
  @tailrec
  final def becomeSuspended(): Boolean = status match {
    case Closed ⇒ setStatus(Closed); false
    case s      ⇒ updateStatus(s, Suspended | (s & Scheduled)) || becomeSuspended()
  }

  /**
   * Set new primary status Closed. Caller does not need to worry about whether
   * status was Scheduled or not; the Scheduled bit is dropped.
   *
   * @return false if the mailbox was already Closed, true otherwise
   */
  @tailrec
  final def becomeClosed(): Boolean = status match {
    case Closed ⇒ setStatus(Closed); false
    case s      ⇒ updateStatus(s, Closed) || becomeClosed()
  }

  /**
   * Set Scheduled status, keeping primary status as is.
   *
   * @return true if this call added the Scheduled bit, false if the mailbox is Closed
   *         or was already Scheduled
   */
  @tailrec
  final def setAsScheduled(): Boolean = {
    val s = status
    /*
     * only try to add Scheduled bit if pure Open/Suspended, not Closed or with
     * Scheduled bit already set (this is one of the reasons why the numbers
     * cannot be changed in object Mailbox)
     */
    if (s <= Suspended) updateStatus(s, s | Scheduled) || setAsScheduled()
    else false
  }

  /**
   * Reset Scheduled status, keeping primary status as is.
   *
   * The compare-and-swap is attempted unconditionally and retried until it succeeds;
   * for a status without the Scheduled bit (including Closed) it rewrites the same value.
   *
   * @return always true once the swap has succeeded
   */
  @tailrec
  final def setAsIdle(): Boolean = {
    val s = status
    updateStatus(s, s & ~Scheduled) || setAsIdle()
  }

  /*
   * Volatile read / compare-and-swap access to the head of the system message queue,
   * done through Unsafe.
   */
  protected final def systemQueueGet: SystemMessage =
    Unsafe.instance.getObjectVolatile(this, AbstractMailbox.systemMessageOffset).asInstanceOf[SystemMessage]

  protected final def systemQueuePut(_old: SystemMessage, _new: SystemMessage): Boolean =
    Unsafe.instance.compareAndSwapObject(this, AbstractMailbox.systemMessageOffset, _old, _new)

  /**
   * Decides whether it makes sense to schedule this mailbox for execution.
   *
   * @param hasMessageHint       caller's hint that a normal message is waiting
   * @param hasSystemMessageHint caller's hint that a system message is waiting
   */
  final def canBeScheduledForExecution(hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = status match {
    // pattern alternative: matches the value 0 (Open) or 4 (Open with the Scheduled bit)
    case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
    case Closed           ⇒ false
    // everything else: only system messages justify scheduling
    case _                ⇒ hasSystemMessageHint || hasSystemMessages
  }

  /**
   * Processes system messages and then normal messages unless the mailbox is Closed;
   * afterwards, even on failure, clears the Scheduled bit and asks the dispatcher to
   * register the mailbox for execution again.
   */
  final def run: Unit = {
    try {
      if (!isClosed) { //Volatile read, needed here
        processAllSystemMessages() //First, deal with any system messages
        processMailbox() //Then deal with messages
      }
    } finally {
      setAsIdle() //Volatile write, needed here
      dispatcher.registerForExecution(this, false, false)
    }
  }

  /**
   * Process the messages in the mailbox: at most `dispatcher.throughput` messages if a
   * throughput is defined, otherwise a single one.
   */
  private final def processMailbox(): Unit =
    if (dispatcher.isThroughputDefined) process(dispatcher.throughput) else process(1)

  /**
   * Processes up to `left` messages while the mailbox is Open and the optional throughput
   * deadline has not passed. All pending system messages are processed after every message.
   *
   * @param left       maximum number of messages still to be processed
   * @param deadlineNs `System.nanoTime` based deadline; only consulted if the dispatcher
   *                   defines a throughput deadline time
   */
  @tailrec private final def process(
    left: Int,
    deadlineNs: Long = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0L): Unit =
    // nanoTime values must be compared via their difference, a direct `<` is not overflow-safe
    if ((left > 0) && shouldProcessMessage && (!dispatcher.isThroughputDeadlineTimeDefined || (System.nanoTime - deadlineNs) < 0)) {
      val next = dequeue()
      if (next ne null) {
        if (Mailbox.debug) println(actor.self + " processing message " + next)
        actor invoke next
        processAllSystemMessages()
        process(left - 1, deadlineNs)
      }
    }

  /**
   * Drains the system queue and invokes the actor for every system message, draining again
   * until no system message is left. A failure is published to the event stream as an
   * `Error` (together with the number of dropped messages) and then rethrown.
   */
  final def processAllSystemMessages(): Unit = {
    var nextMessage = systemDrain()
    try {
      while (nextMessage ne null) {
        if (debug) println(actor.self + " processing system message " + nextMessage + " with children " + actor.childrenRefs)
        actor systemInvoke nextMessage
        nextMessage = nextMessage.next
        // don't ever execute normal message when system message present!
        if (nextMessage eq null) nextMessage = systemDrain()
      }
    } catch {
      case e ⇒
        actor.system.eventStream.publish(Error(e, actor.self.path.toString, this.getClass, "exception during processing system messages, dropping " + SystemMessage.size(nextMessage) + " messages!"))
        throw e
    }
  }

  /** The dispatcher of the owning actor. */
  @inline
  final def dispatcher: MessageDispatcher = actor.dispatcher

  /**
   * Overridable callback to clean up the mailbox,
   * called when an actor is unregistered.
   * By default it dequeues all system messages + messages and ships them to the
   * dead letter mailbox of the owning actor's system.
   */
  protected[dispatch] def cleanUp(): Unit =
    if (actor ne null) { // actor is null for the deadLetterMailbox
      val dlq = actor.systemImpl.deadLetterMailbox
      if (hasSystemMessages) {
        var message = systemDrain()
        while (message ne null) {
          // the message must be unlinked (next == null) before it can be systemEnqueued again
          val next = message.next
          message.next = null
          dlq.systemEnqueue(actor.self, message)
          message = next
        }
      }

      if (hasMessages) {
        var envelope = dequeue()
        while (envelope ne null) {
          dlq.enqueue(actor.self, envelope)
          envelope = dequeue()
        }
      }
    }
}
/**
 * Queue of normal (user) messages.
 *
 * These methods need to be implemented in subclasses; they should not rely on the
 * internals of the Mailbox class.
 */
trait MessageQueue {

  /** Appends `handle` to this queue on behalf of `receiver`. */
  def enqueue(receiver: ActorRef, handle: Envelope): Unit

  /**
   * Removes and returns the next envelope.
   * NOTE(review): the queue based implementations in this file return `null` when empty.
   */
  def dequeue(): Envelope

  /** Number of messages currently held by this queue. */
  def numberOfMessages: Int

  /** True if at least one message is waiting. */
  def hasMessages: Boolean
}
/**
 * Queue of system messages.
 */
trait SystemMessageQueue {

  /**
   * Enqueue a new system message, e.g. by prepending atomically as new head of a
   * single-linked list.
   */
  def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit

  /**
   * Dequeue all messages from system queue and return them as single-linked list.
   */
  def systemDrain(): SystemMessage

  /** True if at least one system message is waiting. */
  def hasSystemMessages: Boolean
}
2011-10-18 18:06:17 +02:00
/**
 * System message queue kept as a single-linked list whose head is swapped atomically
 * through the mailbox's `systemQueueGet` / `systemQueuePut`.
 */
trait DefaultSystemMessageQueue { self: Mailbox ⇒

  /**
   * Prepends `message` as the new head of the list, retrying until the compare-and-swap
   * of the head succeeds. The message must not be linked to another one (`next` is null).
   */
  @tailrec
  final def systemEnqueue(receiver: ActorRef, message: SystemMessage): Unit = {
    assert(message.next eq null)
    if (Mailbox.debug) println(actor.self + " having enqueued " + message)
    val currentHead = systemQueueGet
    /*
     * this write is safely published by the compareAndSet contained within
     * systemQueuePut; "Intra-Thread Semantics" on page 12 of the JSR133 spec
     * guarantees that "currentHead" uses the value obtained from systemQueueGet above.
     * Hence, SystemMessage.next does not need to be volatile.
     */
    message.next = currentHead
    val swapped = systemQueuePut(currentHead, message)
    if (!swapped) {
      // lost the race: unlink again and start over
      message.next = null
      systemEnqueue(receiver, message)
    }
  }

  /**
   * Atomically detaches the whole list and returns it passed through
   * `SystemMessage.reverse`; retries until the compare-and-swap of the head succeeds.
   */
  @tailrec
  final def systemDrain(): SystemMessage = {
    val detached = systemQueueGet
    if (!systemQueuePut(detached, null)) systemDrain()
    else SystemMessage.reverse(detached)
  }

  /** True if the list currently has a head. */
  def hasSystemMessages: Boolean = systemQueueGet ne null
}
2011-09-23 09:33:53 +02:00
/**
 * Message queue semantics delegating directly to the underlying `queue`:
 * `add` for enqueueing and `poll` for dequeueing.
 */
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {

  /** Adds `handle` to the underlying queue; `receiver` is not used. */
  final def enqueue(receiver: ActorRef, handle: Envelope): Unit = queue.add(handle)

  /** Polls the underlying queue, i.e. yields `null` if it is empty. */
  final def dequeue(): Envelope = queue.poll()
}
2011-09-23 09:33:53 +02:00
/**
 * Message queue semantics on top of a `BlockingQueue`, with an optional time-out
 * for enqueueing.
 */
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {

  /** How long `enqueue` waits for space; a length of zero or less means waiting indefinitely. */
  def pushTimeOut: Duration

  override def queue: BlockingQueue[Envelope]

  /**
   * Offers `handle` to the queue for at most `pushTimeOut` if that is positive,
   * otherwise blocks in `put` until there is space. `receiver` is not used.
   *
   * @throws MessageQueueAppendFailedException if the timed offer was not accepted
   */
  final def enqueue(receiver: ActorRef, handle: Envelope): Unit =
    if (pushTimeOut.length > 0) {
      val accepted = queue.offer(handle, pushTimeOut.length, pushTimeOut.unit)
      if (!accepted)
        throw new MessageQueueAppendFailedException("Couldn't enqueue message " + handle + " to " + toString)
    } else queue.put(handle)

  /** Polls the underlying queue without blocking, i.e. yields `null` if it is empty. */
  final def dequeue(): Envelope = queue.poll()
}
2011-09-23 09:33:53 +02:00
/**
 * A [[MessageQueue]] backed by a `java.util.Queue`.
 */
trait QueueBasedMessageQueue extends MessageQueue {

  /** The backing queue. */
  def queue: Queue[Envelope]

  /** Size of the backing queue. */
  final def numberOfMessages: Int = queue.size()

  /** True unless the backing queue is empty. */
  final def hasMessages: Boolean = !queue.isEmpty()
}
2010-09-21 18:52:41 +02:00
/**
 * Mailbox configuration: a factory for the mailbox of an actor.
 */
trait MailboxType {

  /**
   * Creates a new mailbox for the given actor context.
   * NOTE(review): all implementations in this file cast `receiver` to `ActorCell`.
   */
  def create(receiver: ActorContext): Mailbox
}
2011-09-23 09:33:53 +02:00
/**
 * Mailbox type producing mailboxes backed by a `ConcurrentLinkedQueue`.
 *
 * It is a case class (rather than an object) so that it can be instantiated
 * from Java with `new UnboundedMailbox`.
 */
case class UnboundedMailbox() extends MailboxType {

  override def create(receiver: ActorContext) =
    new Mailbox(receiver.asInstanceOf[ActorCell])
      with QueueBasedMessageQueue
      with UnboundedMessageQueueSemantics
      with DefaultSystemMessageQueue {
      final val queue = new ConcurrentLinkedQueue[Envelope]()
    }
}
2010-09-21 18:52:41 +02:00
2011-10-07 15:59:18 +02:00
/**
 * Mailbox type producing mailboxes backed by a `LinkedBlockingQueue` of the given capacity.
 *
 * @param capacity    capacity of the queue, must not be negative
 * @param pushTimeOut time-out applied when enqueueing, must not be null
 * @throws IllegalArgumentException if one of the arguments is invalid
 */
case class BoundedMailbox(final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {

  // argument validation, executed on construction
  if (capacity < 0)
    throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
  if (pushTimeOut eq null)
    throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")

  override def create(receiver: ActorContext) =
    new Mailbox(receiver.asInstanceOf[ActorCell])
      with QueueBasedMessageQueue
      with BoundedMessageQueueSemantics
      with DefaultSystemMessageQueue {
      final val queue = new LinkedBlockingQueue[Envelope](capacity)
      final val pushTimeOut = BoundedMailbox.this.pushTimeOut
    }
}
2010-09-21 18:52:41 +02:00
2011-10-07 11:34:07 +02:00
/**
 * Mailbox type producing mailboxes backed by a `PriorityBlockingQueue`
 * (initial capacity 11) ordered by the given comparator.
 *
 * @param cmp comparator defining the order of the envelopes
 */
case class UnboundedPriorityMailbox(final val cmp: Comparator[Envelope]) extends MailboxType {

  override def create(receiver: ActorContext) =
    new Mailbox(receiver.asInstanceOf[ActorCell])
      with QueueBasedMessageQueue
      with UnboundedMessageQueueSemantics
      with DefaultSystemMessageQueue {
      final val queue = new PriorityBlockingQueue[Envelope](11, cmp)
    }
}
2011-10-07 15:59:18 +02:00
/**
 * Mailbox type producing mailboxes backed by a `BoundedBlockingQueue` of the given capacity
 * which wraps a `PriorityQueue` (initial capacity 11) ordered by the given comparator.
 *
 * @param cmp         comparator defining the order of the envelopes
 * @param capacity    capacity of the queue, must not be negative
 * @param pushTimeOut time-out applied when enqueueing, must not be null
 * @throws IllegalArgumentException if `capacity` is negative or `pushTimeOut` is null
 */
case class BoundedPriorityMailbox(final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) extends MailboxType {

  // The messages name this class (they used to say "BoundedMailbox", a copy-paste slip).
  if (capacity < 0)
    throw new IllegalArgumentException("The capacity for BoundedPriorityMailbox can not be negative")
  if (pushTimeOut eq null)
    throw new IllegalArgumentException("The push time-out for BoundedPriorityMailbox can not be null")

  override def create(receiver: ActorContext) =
    new Mailbox(receiver.asInstanceOf[ActorCell])
      with QueueBasedMessageQueue
      with BoundedMessageQueueSemantics
      with DefaultSystemMessageQueue {
      final val queue = new BoundedBlockingQueue[Envelope](capacity, new PriorityQueue[Envelope](11, cmp))
      final val pushTimeOut = BoundedPriorityMailbox.this.pushTimeOut
    }
}
2011-03-09 18:11:45 +01:00