2010-09-21 18:52:41 +02:00
/* *
2013-01-09 01:47:48 +01:00
* Copyright ( C ) 2009 - 2013 Typesafe Inc . < http : //www.typesafe.com>
2010-09-21 18:52:41 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.dispatch
2010-09-21 18:52:41 +02:00
2012-02-06 14:49:23 +01:00
import java.util. { Comparator , PriorityQueue , Queue , Deque }
2011-09-21 16:27:31 +02:00
import java.util.concurrent._
2012-07-22 15:33:18 +02:00
import akka.AkkaException
import akka.actor. { ActorCell , ActorRef , Cell , ActorSystem , InternalActorRef , DeadLetter }
import akka.util. { Unsafe , BoundedBlockingQueue }
2011-10-27 12:23:01 +02:00
import akka.event.Logging.Error
2012-09-21 14:50:06 +02:00
import scala.concurrent.duration.Duration
2012-07-22 15:33:18 +02:00
import scala.annotation.tailrec
import scala.util.control.NonFatal
2012-02-06 14:49:23 +01:00
import com.typesafe.config.Config
2012-09-21 14:50:06 +02:00
import scala.concurrent.duration.FiniteDuration
2010-09-21 18:52:41 +02:00
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* INTERNAL API
2012-05-16 17:37:23 +02:00
*/
private [ akka ] object Mailbox {
2011-09-28 19:55:42 +02:00
2011-09-26 11:39:07 +02:00
type Status = Int
2011-09-28 19:55:42 +02:00
2011-10-04 14:28:05 +02:00
/*
* the following assigned numbers CANNOT be changed without looking at the code which uses them !
*/
2012-06-19 11:02:06 +02:00
// primary status
2012-05-23 15:17:49 +02:00
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
2012-06-19 11:02:06 +02:00
final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
2011-10-04 14:28:05 +02:00
// secondary status: Scheduled bit may be added to Open/Suspended
2012-06-19 11:02:06 +02:00
final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
// shifted by 2: the suspend count!
final val shouldScheduleMask = 3
2012-07-04 09:20:17 +02:00
final val shouldNotProcessMask = ~ 2
2012-06-19 11:02:06 +02:00
final val suspendMask = ~ 3
final val suspendUnit = 4
2011-10-20 20:45:02 +02:00
2011-12-03 18:16:41 +01:00
// mailbox debugging helper using println (see below)
2011-12-10 22:52:49 +01:00
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
2012-05-23 15:17:49 +02:00
final val debug = false // Deliberately without type ascription to make it a compile-time constant
2011-09-23 13:14:17 +02:00
}
2011-12-19 21:46:37 +01:00
/* *
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation ,
* but can 't be exposed to user defined mailbox subclasses .
*
2012-05-18 13:37:26 +02:00
* INTERNAL API
2011-12-19 21:46:37 +01:00
*/
2012-06-13 17:57:56 +02:00
private [ akka ] abstract class Mailbox ( val messageQueue : MessageQueue )
2012-02-19 10:28:56 +01:00
extends SystemMessageQueue with Runnable {
2011-09-23 13:14:17 +02:00
import Mailbox._
2012-06-13 17:57:56 +02:00
/*
* This is needed for actually executing the mailbox , i . e . invoking the
* ActorCell . There are situations ( e . g . RepointableActorRef ) where a Mailbox
* is constructed but we know that we will not execute it , in which case this
2013-02-04 12:41:58 +01:00
* will be null . It must be a var to support switching into an “ active ”
2012-06-13 17:57:56 +02:00
* mailbox , should the owning ActorRef turn local .
2013-02-04 12:41:58 +01:00
*
2012-06-13 17:57:56 +02:00
* ANOTHER THING , IMPORTANT :
2013-02-04 12:41:58 +01:00
*
2012-06-13 17:57:56 +02:00
* actorCell . start ( ) publishes actorCell & self to the dispatcher , which
* means that messages may be processed theoretically before self ’ s constructor
* ends . The JMM guarantees visibility for final fields only after the end
* of the constructor , so safe publication requires that THIS WRITE BELOW
* stay as it is .
*/
@volatile
var actor : ActorCell = _
def setActor ( cell : ActorCell ) : Unit = actor = cell
def dispatcher : MessageDispatcher = actor . dispatcher
2012-04-03 16:24:50 +02:00
/* *
* Try to enqueue the message to this queue , or throw an exception .
*/
2012-02-21 13:22:25 +01:00
def enqueue ( receiver : ActorRef , msg : Envelope ) : Unit = messageQueue . enqueue ( receiver , msg )
2012-04-04 11:08:28 +02:00
2012-04-03 16:24:50 +02:00
/* *
* Try to dequeue the next message from this queue , return null failing that .
*/
2012-02-21 13:22:25 +01:00
def dequeue ( ) : Envelope = messageQueue . dequeue ( )
2012-04-03 16:24:50 +02:00
/* *
* Indicates whether this queue is non - empty .
*/
2012-02-21 13:22:25 +01:00
def hasMessages : Boolean = messageQueue . hasMessages
2012-04-03 16:24:50 +02:00
/* *
* Should return the current number of messages held in this queue ; may
* always return 0 if no other value is available efficiently . Do not use
* this for testing for presence of messages , use `hasMessages` instead .
*/
2012-02-21 13:22:25 +01:00
def numberOfMessages : Int = messageQueue . numberOfMessages
2011-11-21 17:49:21 +01:00
@volatile
2011-11-29 16:51:30 +01:00
protected var _statusDoNotCallMeDirectly : Status = _ //0 by default
2011-11-21 17:49:21 +01:00
@volatile
2011-11-29 16:51:30 +01:00
protected var _systemQueueDoNotCallMeDirectly : SystemMessage = _ //null by default
2011-11-21 17:49:21 +01:00
2011-10-04 14:44:06 +02:00
@inline
2011-11-29 16:51:30 +01:00
final def status : Mailbox . Status = Unsafe . instance . getIntVolatile ( this , AbstractMailbox . mailboxStatusOffset )
2011-09-23 13:14:17 +02:00
2011-10-04 14:44:06 +02:00
@inline
2012-07-04 09:20:17 +02:00
final def shouldProcessMessage : Boolean = ( status & shouldNotProcessMask ) == 0
2011-09-28 19:55:42 +02:00
2012-08-07 22:11:40 +02:00
@inline
final def suspendCount : Int = status / suspendUnit
2011-10-04 14:44:06 +02:00
@inline
2012-06-19 11:02:06 +02:00
final def isSuspended : Boolean = ( status & suspendMask ) != 0
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
@inline
2011-09-26 11:39:07 +02:00
final def isClosed : Boolean = status == Closed
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
@inline
2011-10-04 14:28:05 +02:00
final def isScheduled : Boolean = ( status & Scheduled ) != 0
2011-09-23 13:14:17 +02:00
2011-10-04 15:22:41 +02:00
@inline
protected final def updateStatus ( oldStatus : Status , newStatus : Status ) : Boolean =
2011-11-21 17:49:21 +01:00
Unsafe . instance . compareAndSwapInt ( this , AbstractMailbox . mailboxStatusOffset , oldStatus , newStatus )
2011-10-04 15:22:41 +02:00
@inline
2011-11-29 16:51:30 +01:00
protected final def setStatus ( newStatus : Status ) : Unit =
Unsafe . instance . putIntVolatile ( this , AbstractMailbox . mailboxStatusOffset , newStatus )
2011-10-04 15:22:41 +02:00
2011-09-28 19:55:42 +02:00
/* *
2012-07-13 12:25:26 +02:00
* Reduce the suspend count by one . Caller does not need to worry about whether
2011-10-04 14:28:05 +02:00
* status was Scheduled or not .
2012-06-19 11:02:06 +02:00
*
2012-07-24 16:09:23 +02:00
* @return true if the suspend count reached zero
2011-09-28 19:55:42 +02:00
*/
@tailrec
2012-07-13 12:25:26 +02:00
final def resume ( ) : Boolean = status match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
2012-06-19 11:02:06 +02:00
case s ⇒
val next = if ( s < suspendUnit ) s else s - suspendUnit
if ( updateStatus ( s , next ) ) next < suspendUnit
2012-07-13 12:25:26 +02:00
else resume ( )
2011-09-28 19:55:42 +02:00
}
2011-10-04 14:28:05 +02:00
/* *
2012-07-13 12:25:26 +02:00
* Increment the suspend count by one . Caller does not need to worry about whether
2011-10-04 14:28:05 +02:00
* status was Scheduled or not .
2012-06-19 11:02:06 +02:00
*
2012-07-24 16:09:23 +02:00
* @return true if the previous suspend count was zero
2011-10-04 14:28:05 +02:00
*/
2011-09-28 19:55:42 +02:00
@tailrec
2012-07-13 12:25:26 +02:00
final def suspend ( ) : Boolean = status match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
2012-06-19 11:02:06 +02:00
case s ⇒
if ( updateStatus ( s , s + suspendUnit ) ) s < suspendUnit
2012-07-13 12:25:26 +02:00
else suspend ( )
2011-09-28 19:55:42 +02:00
}
2011-10-04 14:28:05 +02:00
/* *
* set new primary status Closed . Caller does not need to worry about whether
* status was Scheduled or not .
*/
2011-09-28 19:55:42 +02:00
@tailrec
2011-10-04 14:28:05 +02:00
final def becomeClosed ( ) : Boolean = status match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
case s ⇒ updateStatus ( s , Closed ) || becomeClosed ( )
2011-10-04 14:28:05 +02:00
}
/* *
* Set Scheduled status , keeping primary status as is .
*/
@tailrec
final def setAsScheduled ( ) : Boolean = {
val s = status
/*
* only try to add Scheduled bit if pure Open / Suspended , not Closed or with
2012-06-19 11:02:06 +02:00
* Scheduled bit already set
2011-10-04 14:28:05 +02:00
*/
2012-06-19 11:02:06 +02:00
if ( ( s & shouldScheduleMask ) != Open ) false
else updateStatus ( s , s | Scheduled ) || setAsScheduled ( )
2011-10-04 14:28:05 +02:00
}
/* *
* Reset Scheduled status , keeping primary status as is .
*/
@tailrec
final def setAsIdle ( ) : Boolean = {
val s = status
2011-11-14 19:19:44 +01:00
updateStatus ( s , s & ~ Scheduled ) || setAsIdle ( )
2011-09-28 19:55:42 +02:00
}
2011-09-23 13:14:17 +02:00
2011-10-18 18:06:17 +02:00
/*
* AtomicReferenceFieldUpdater for system queue
*/
2011-11-29 16:51:30 +01:00
protected final def systemQueueGet : SystemMessage =
Unsafe . instance . getObjectVolatile ( this , AbstractMailbox . systemMessageOffset ) . asInstanceOf [ SystemMessage ]
2012-06-04 12:18:30 +02:00
2011-11-22 22:28:37 +01:00
protected final def systemQueuePut ( _old : SystemMessage , _new : SystemMessage ) : Boolean =
2011-11-21 17:49:21 +01:00
Unsafe . instance . compareAndSwapObject ( this , AbstractMailbox . systemMessageOffset , _old , _new )
2011-10-18 18:06:17 +02:00
2011-11-18 17:03:35 +01:00
final def canBeScheduledForExecution ( hasMessageHint : Boolean , hasSystemMessageHint : Boolean ) : Boolean = status match {
2011-10-04 14:28:05 +02:00
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed ⇒ false
case _ ⇒ hasSystemMessageHint || hasSystemMessages
2011-09-23 13:14:17 +02:00
}
2011-09-21 15:01:47 +02:00
final def run = {
2011-11-18 17:03:35 +01:00
try {
if ( ! isClosed ) { //Volatile read, needed here
processAllSystemMessages ( ) //First, deal with any system messages
processMailbox ( ) //Then deal with messages
}
} finally {
setAsIdle ( ) //Volatile write, needed here
2011-09-23 13:14:17 +02:00
dispatcher . registerForExecution ( this , false , false )
2011-09-21 15:01:47 +02:00
}
}
/* *
* Process the messages in the mailbox
*/
2012-01-21 01:33:07 +01:00
@tailrec private final def processMailbox (
2012-01-21 13:04:45 +01:00
left : Int = java . lang . Math . max ( dispatcher . throughput , 1 ) ,
2012-01-21 13:12:30 +01:00
deadlineNs : Long = if ( dispatcher . isThroughputDeadlineTimeDefined == true ) System . nanoTime + dispatcher . throughputDeadlineTime . toNanos else 0L ) : Unit =
2012-01-21 01:33:07 +01:00
if ( shouldProcessMessage ) {
2012-01-21 01:06:32 +01:00
val next = dequeue ( )
if ( next ne null ) {
2012-01-21 01:09:06 +01:00
if ( Mailbox . debug ) println ( actor . self + " processing message " + next )
2012-01-21 01:06:32 +01:00
actor invoke next
processAllSystemMessages ( )
2012-08-15 15:17:39 +02:00
if ( ( left > 1 ) && ( ( dispatcher . isThroughputDeadlineTimeDefined == false ) || ( System . nanoTime - deadlineNs ) < 0 ) ) {
2012-01-21 01:33:07 +01:00
processMailbox ( left - 1 , deadlineNs )
2012-08-15 15:17:39 +02:00
} else if ( Thread . interrupted ( ) ) {
throw new InterruptedException ( "Interrupted while processing actor messages" )
}
2012-01-21 01:15:00 +01:00
}
}
2011-09-20 18:34:21 +02:00
2012-06-07 15:19:28 +02:00
/* *
* Will at least try to process all queued system messages : in case of
* failure simply drop and go on to the next , because there is nothing to
* restart here ( failure is in ActorCell somewhere … ) . In case the mailbox
* becomes closed ( because of processing a Terminate message ) , dump all
* already dequeued message to deadLetters .
*/
2011-11-18 17:03:35 +01:00
final def processAllSystemMessages ( ) {
2012-08-16 11:14:35 +02:00
var interruption : Throwable = null
2012-06-04 12:18:30 +02:00
var nextMessage = systemDrain ( null )
2012-06-05 16:58:24 +02:00
while ( ( nextMessage ne null ) && ! isClosed ) {
val msg = nextMessage
nextMessage = nextMessage . next
msg . next = null
if ( debug ) println ( actor . self + " processing system message " + msg + " with " + actor . childrenRefs )
2012-06-07 15:19:28 +02:00
try {
actor systemInvoke msg
} catch {
2012-08-16 11:14:35 +02:00
// we know here that systemInvoke ensures that only InterruptedException and "fatal" exceptions get rethrown
2012-08-16 13:14:16 +02:00
case e : InterruptedException ⇒ interruption = e
2012-06-05 16:58:24 +02:00
}
// don’ t ever execute normal message when system message present!
2012-08-16 11:14:35 +02:00
if ( ( nextMessage eq null ) && ! isClosed ) nextMessage = systemDrain ( null )
2012-06-05 16:58:24 +02:00
}
/*
* if we closed the mailbox , we must dump the remaining system messages
2013-02-04 12:41:58 +01:00
* to deadLetters ( this is essential for DeathWatch )
2012-06-05 16:58:24 +02:00
*/
2012-06-13 17:57:56 +02:00
val dlm = actor . systemImpl . deadLetterMailbox
2012-06-05 16:58:24 +02:00
while ( nextMessage ne null ) {
val msg = nextMessage
nextMessage = nextMessage . next
msg . next = null
2012-06-13 17:57:56 +02:00
try dlm . systemEnqueue ( actor . self , msg )
2012-06-05 16:58:24 +02:00
catch {
2012-08-16 13:14:16 +02:00
case e : InterruptedException ⇒ interruption = e
2012-06-05 16:58:24 +02:00
case NonFatal ( e ) ⇒ actor . system . eventStream . publish (
Error ( e , actor . self . path . toString , this . getClass , "error while enqueuing " + msg + " to deadLetters: " + e . getMessage ) )
2011-10-18 18:06:17 +02:00
}
2011-09-20 18:34:21 +02:00
}
2012-08-16 11:14:35 +02:00
// if we got an interrupted exception while handling system messages, then rethrow it
if ( interruption ne null ) {
2012-08-16 13:14:16 +02:00
Thread . interrupted ( ) // clear interrupted flag before throwing according to java convention
2012-08-16 11:14:35 +02:00
throw interruption
}
2011-09-20 18:34:21 +02:00
}
2011-09-21 15:01:47 +02:00
2011-11-15 14:39:43 +01:00
/* *
* Overridable callback to clean up the mailbox ,
* called when an actor is unregistered .
2011-12-07 15:51:46 +01:00
* By default it dequeues all system messages + messages and ships them to the owning actors ' systems ' DeadLetterMailbox
2011-11-15 14:39:43 +01:00
*/
2012-01-10 13:33:57 +01:00
protected [ dispatch ] def cleanUp ( ) : Unit =
if ( actor ne null ) { // actor is null for the deadLetterMailbox
2012-02-19 10:28:56 +01:00
val dlm = actor . systemImpl . deadLetterMailbox
2012-06-04 12:18:30 +02:00
var message = systemDrain ( NoMessage )
while ( message ne null ) {
// message must be “virgin” before being able to systemEnqueue again
val next = message . next
message . next = null
dlm . systemEnqueue ( actor . self , message )
message = next
2011-12-07 15:51:46 +01:00
}
2012-02-19 10:28:56 +01:00
if ( messageQueue ne null ) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
2012-06-13 17:57:56 +02:00
messageQueue . cleanUp ( actor . self , actor . systemImpl . deadLetterQueue )
2011-12-07 15:51:46 +01:00
}
2011-09-23 09:33:53 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
* A MessageQueue is one of the core components in forming an Akka Mailbox .
* The MessageQueue is where the normal messages that are sent to Actors will be enqueued ( and subsequently dequeued )
2012-05-18 13:37:26 +02:00
* It needs to atleast support N producers and 1 consumer thread - safely .
2012-05-16 17:37:23 +02:00
*/
2011-09-23 09:33:53 +02:00
trait MessageQueue {
2012-02-13 12:38:59 +01:00
/* *
* Try to enqueue the message to this queue , or throw an exception .
2011-09-21 15:01:47 +02:00
*/
2012-02-13 12:38:59 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit // NOTE: receiver is used only in two places, but cannot be removed
2011-09-23 09:33:53 +02:00
2012-02-13 12:38:59 +01:00
/* *
* Try to dequeue the next message from this queue , return null failing that .
*/
2011-09-21 15:01:47 +02:00
def dequeue ( ) : Envelope
2012-02-13 12:38:59 +01:00
/* *
* Should return the current number of messages held in this queue ; may
* always return 0 if no other value is available efficiently . Do not use
* this for testing for presence of messages , use `hasMessages` instead .
*/
2011-09-21 16:27:31 +02:00
def numberOfMessages : Int
2012-02-13 12:38:59 +01:00
/* *
* Indicates whether this queue is non - empty .
*/
2011-09-23 09:33:53 +02:00
def hasMessages : Boolean
2012-02-19 10:28:56 +01:00
/* *
* Called when the mailbox this queue belongs to is disposed of . Normally it
* is expected to transfer all remaining messages into the dead letter queue
* which is passed in . The owner of this MessageQueue is passed in if
* available ( e . g . for creating DeadLetters ( ) ) , “ / deadletters ” otherwise .
*/
2012-06-13 17:57:56 +02:00
def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit
2011-09-23 09:33:53 +02:00
}
2012-02-19 10:28:56 +01:00
/* *
2012-05-16 17:37:23 +02:00
* INTERNAL USE ONLY
2012-02-19 10:28:56 +01:00
*/
private [ akka ] trait SystemMessageQueue {
2011-10-18 18:06:17 +02:00
/* *
* Enqueue a new system message , e . g . by prepending atomically as new head of a single - linked list .
*/
2011-11-12 10:57:28 +01:00
def systemEnqueue ( receiver : ActorRef , message : SystemMessage ) : Unit
2011-09-23 09:33:53 +02:00
2011-10-18 18:06:17 +02:00
/* *
* Dequeue all messages from system queue and return them as single - linked list .
*/
2012-06-04 12:18:30 +02:00
def systemDrain ( newContents : SystemMessage ) : SystemMessage
2011-09-21 16:27:31 +02:00
def hasSystemMessages : Boolean
}
2012-02-19 10:28:56 +01:00
/* *
2012-05-16 17:37:23 +02:00
* INTERNAL USE ONLY
2012-02-19 10:28:56 +01:00
*/
private [ akka ] trait DefaultSystemMessageQueue { self : Mailbox ⇒
2011-09-21 16:27:31 +02:00
2011-10-18 18:06:17 +02:00
@tailrec
2011-11-12 10:57:28 +01:00
final def systemEnqueue ( receiver : ActorRef , message : SystemMessage ) : Unit = {
2011-11-11 17:44:57 +01:00
assert ( message . next eq null )
2012-06-13 17:57:56 +02:00
if ( Mailbox . debug ) println ( receiver + " having enqueued " + message )
2011-10-18 18:06:17 +02:00
val head = systemQueueGet
2012-06-13 17:57:56 +02:00
if ( head == NoMessage ) {
if ( actor ne null ) actor . systemImpl . deadLetterMailbox . systemEnqueue ( receiver , message )
} else {
2012-06-04 12:18:30 +02:00
/*
* this write is safely published by the compareAndSet contained within
* systemQueuePut ; “ Intra - Thread Semantics ” on page 12 of the JSR133 spec
* guarantees that “ head ” uses the value obtained from systemQueueGet above .
* Hence , SystemMessage . next does not need to be volatile .
*/
message . next = head
if ( ! systemQueuePut ( head , message ) ) {
message . next = null
systemEnqueue ( receiver , message )
}
2011-11-11 17:44:57 +01:00
}
2011-10-18 18:06:17 +02:00
}
2011-10-18 16:44:35 +02:00
2011-10-18 18:06:17 +02:00
@tailrec
2012-06-04 12:18:30 +02:00
final def systemDrain ( newContents : SystemMessage ) : SystemMessage = {
2011-10-18 18:06:17 +02:00
val head = systemQueueGet
2012-06-04 12:18:30 +02:00
if ( systemQueuePut ( head , newContents ) ) SystemMessage . reverse ( head ) else systemDrain ( newContents )
2011-10-18 18:06:17 +02:00
}
2011-09-21 16:27:31 +02:00
2011-10-18 18:06:17 +02:00
def hasSystemMessages : Boolean = systemQueueGet ne null
2012-06-04 12:18:30 +02:00
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* A QueueBasedMessageQueue is a MessageQueue backed by a java . util . Queue
2012-05-16 17:37:23 +02:00
*/
2012-02-13 12:10:35 +01:00
trait QueueBasedMessageQueue extends MessageQueue {
def queue : Queue [ Envelope ]
2012-02-24 16:32:00 +01:00
def numberOfMessages = queue . size
def hasMessages = ! queue . isEmpty
2012-06-13 17:57:56 +02:00
def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit = {
2012-02-24 16:32:00 +01:00
if ( hasMessages ) {
var envelope = dequeue
while ( envelope ne null ) {
2012-06-13 17:57:56 +02:00
deadLetters . enqueue ( owner , envelope )
2012-02-24 16:32:00 +01:00
envelope = dequeue
}
}
}
2012-02-13 12:10:35 +01:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue ,
* i . e . a non - blocking enqueue and dequeue .
2012-05-16 17:37:23 +02:00
*/
2011-09-23 09:33:53 +02:00
trait UnboundedMessageQueueSemantics extends QueueBasedMessageQueue {
2012-02-13 12:38:59 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = queue add handle
def dequeue ( ) : Envelope = queue . poll ( )
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue ,
* i . e . blocking enqueue with timeout
2012-05-16 17:37:23 +02:00
*/
2011-09-23 09:33:53 +02:00
trait BoundedMessageQueueSemantics extends QueueBasedMessageQueue {
2011-09-21 15:01:47 +02:00
def pushTimeOut : Duration
2011-09-23 09:33:53 +02:00
override def queue : BlockingQueue [ Envelope ]
2011-09-21 15:01:47 +02:00
2012-06-08 13:56:53 +02:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit =
2011-09-21 15:01:47 +02:00
if ( pushTimeOut . length > 0 ) {
2012-06-08 13:56:53 +02:00
if ( ! queue . offer ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2011-09-21 15:01:47 +02:00
} else queue put handle
2012-02-13 12:38:59 +01:00
def dequeue ( ) : Envelope = queue . poll ( )
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java . util . Deque
2012-05-16 17:37:23 +02:00
*/
2012-02-13 12:10:35 +01:00
trait DequeBasedMessageQueue extends QueueBasedMessageQueue {
def queue : Deque [ Envelope ]
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue ,
* i . e . a non - blocking enqueue and dequeue .
2012-05-16 17:37:23 +02:00
*/
2012-02-08 18:26:54 +01:00
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
2012-02-24 16:32:00 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = queue add handle
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit = queue addFirst handle
def dequeue ( ) : Envelope = queue . poll ( )
2012-02-08 18:26:54 +01:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue ,
* i . e . blocking enqueue with timeout
2012-05-16 17:37:23 +02:00
*/
2012-02-08 18:26:54 +01:00
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueue {
def pushTimeOut : Duration
override def queue : BlockingDeque [ Envelope ]
2012-02-24 16:32:00 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit =
2012-06-08 13:56:53 +02:00
if ( pushTimeOut . length > 0 ) {
if ( ! queue . offer ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2012-06-08 13:56:53 +02:00
} else queue put handle
2012-02-08 18:26:54 +01:00
2012-02-24 16:32:00 +01:00
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit =
2012-06-08 13:56:53 +02:00
if ( pushTimeOut . length > 0 ) {
if ( ! queue . offerFirst ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2012-06-08 13:56:53 +02:00
} else queue putFirst handle
2012-02-08 18:26:54 +01:00
2012-02-24 16:32:00 +01:00
def dequeue ( ) : Envelope = queue . poll ( )
2012-02-08 18:26:54 +01:00
}
2010-09-21 18:52:41 +02:00
/* *
2012-06-25 10:49:49 +02:00
* MailboxType is a factory to create MessageQueues for an optionally
* provided ActorContext .
*
* < b > Possibly Important Notice </ b >
*
* When implementing a custom mailbox type , be aware that there is special
* semantics attached to `system.actorOf()` in that sending to the returned
* ActorRef may — for a short period of time — enqueue the messages first in a
* dummy queue . Top - level actors are created in two steps , and only after the
* guardian actor has performed that second step will all previously sent
* messages be transferred from the dummy queue into the real mailbox .
2010-09-21 18:52:41 +02:00
*/
2011-09-21 15:01:47 +02:00
trait MailboxType {
2012-06-13 17:57:56 +02:00
def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue
2011-09-21 15:01:47 +02:00
}
2011-09-23 09:33:53 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors .
2011-09-23 09:33:53 +02:00
*/
2011-09-21 15:01:47 +02:00
case class UnboundedMailbox ( ) extends MailboxType {
2012-02-21 16:40:34 +01:00
2012-02-26 21:26:25 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
2012-02-21 16:40:34 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-02-24 13:13:55 +01:00
new ConcurrentLinkedQueue [ Envelope ] ( ) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
final def queue : Queue [ Envelope ] = this
2011-10-18 16:44:35 +02:00
}
2011-09-21 15:01:47 +02:00
}
2010-09-21 18:52:41 +02:00
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMailbox is the default bounded MailboxType used by Akka Actors .
2012-05-16 17:37:23 +02:00
*/
2012-08-21 09:22:09 +02:00
case class BoundedMailbox ( final val capacity : Int , final val pushTimeOut : FiniteDuration ) extends MailboxType {
2011-09-21 15:01:47 +02:00
2012-02-26 21:26:25 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) ,
2012-02-21 16:40:34 +01:00
Duration ( config . getNanoseconds ( "mailbox-push-timeout-time" ) , TimeUnit . NANOSECONDS ) )
2011-05-18 17:25:30 +02:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedMailbox can not be negative" )
2010-09-21 18:52:41 +02:00
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedMailbox can not be null" )
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-02-24 13:13:55 +01:00
new LinkedBlockingQueue [ Envelope ] ( capacity ) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue : BlockingQueue [ Envelope ] = this
2011-10-18 16:44:35 +02:00
final val pushTimeOut = BoundedMailbox . this . pushTimeOut
}
2011-09-21 15:01:47 +02:00
}
2010-09-21 18:52:41 +02:00
2012-02-21 17:23:54 +01:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedPriorityMailbox is an unbounded mailbox that allows for priorization of its contents .
* Extend this class and provide the Comparator in the constructor .
2012-02-21 17:23:54 +01:00
*/
2012-05-18 13:37:26 +02:00
class UnboundedPriorityMailbox ( final val cmp : Comparator [ Envelope ] , final val initialCapacity : Int ) extends MailboxType {
def this ( cmp : Comparator [ Envelope ] ) = this ( cmp , 11 )
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-05-18 13:37:26 +02:00
new PriorityBlockingQueue [ Envelope ] ( initialCapacity , cmp ) with QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
2012-02-24 13:13:55 +01:00
final def queue : Queue [ Envelope ] = this
2011-10-18 16:44:35 +02:00
}
2011-03-09 18:11:45 +01:00
}
2012-02-21 17:23:54 +01:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedPriorityMailbox is a bounded mailbox that allows for priorization of its contents .
* Extend this class and provide the Comparator in the constructor .
2012-02-21 17:23:54 +01:00
*/
class BoundedPriorityMailbox ( final val cmp : Comparator [ Envelope ] , final val capacity : Int , final val pushTimeOut : Duration ) extends MailboxType {
2011-03-09 18:11:45 +01:00
2011-09-21 15:01:47 +02:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedMailbox can not be negative" )
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedMailbox can not be null" )
2011-03-09 18:11:45 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-02-24 13:13:55 +01:00
new BoundedBlockingQueue [ Envelope ] ( capacity , new PriorityQueue [ Envelope ] ( 11 , cmp ) ) with QueueBasedMessageQueue with BoundedMessageQueueSemantics {
final def queue : BlockingQueue [ Envelope ] = this
2011-10-18 16:44:35 +02:00
final val pushTimeOut = BoundedPriorityMailbox . this . pushTimeOut
}
2011-09-21 15:01:47 +02:00
}
2011-03-09 18:11:45 +01:00
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedDequeBasedMailbox is an unbounded MailboxType , backed by a Deque .
2012-05-16 17:37:23 +02:00
*/
2012-02-24 16:32:00 +01:00
case class UnboundedDequeBasedMailbox ( ) extends MailboxType {
2012-02-27 23:26:15 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
2012-02-24 16:32:00 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-02-24 16:32:00 +01:00
new LinkedBlockingDeque [ Envelope ] ( ) with DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
final val queue = this
2012-02-08 18:26:54 +01:00
}
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedDequeBasedMailbox is an bounded MailboxType , backed by a Deque .
2012-05-16 17:37:23 +02:00
*/
2012-08-21 09:22:09 +02:00
case class BoundedDequeBasedMailbox ( final val capacity : Int , final val pushTimeOut : FiniteDuration ) extends MailboxType {
2012-02-08 18:26:54 +01:00
2012-02-27 23:26:15 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) ,
2012-02-24 16:32:00 +01:00
Duration ( config . getNanoseconds ( "mailbox-push-timeout-time" ) , TimeUnit . NANOSECONDS ) )
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedDequeBasedMailbox can not be negative" )
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedDequeBasedMailbox can not be null" )
2012-02-08 18:26:54 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2012-02-24 16:32:00 +01:00
new LinkedBlockingDeque [ Envelope ] ( capacity ) with DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
final val queue = this
2012-02-08 18:26:54 +01:00
final val pushTimeOut = BoundedDequeBasedMailbox . this . pushTimeOut
}
}