2010-09-21 18:52:41 +02:00
/* *
2015-03-07 22:58:48 -08:00
* Copyright ( C ) 2009 - 2015 Typesafe Inc . < http : //www.typesafe.com>
2010-09-21 18:52:41 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.dispatch
2010-09-21 18:52:41 +02:00
2011-09-21 16:27:31 +02:00
import java.util.concurrent._
2015-08-20 15:16:32 +02:00
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import java.util. { Comparator , Deque , PriorityQueue , Queue }
import akka.actor. { ActorCell , ActorRef , ActorSystem , DeadLetter , InternalActorRef }
2013-03-05 16:19:54 +01:00
import akka.dispatch.sysmsg._
2011-10-27 12:23:01 +02:00
import akka.event.Logging.Error
2015-08-20 15:16:32 +02:00
import akka.util.Helpers.ConfigOps
import akka.util. { BoundedBlockingQueue , StablePriorityBlockingQueue , StablePriorityQueue , Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec
2015-05-11 13:21:05 +02:00
import scala.concurrent.duration. { Duration , FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
2012-07-22 15:33:18 +02:00
import scala.util.control.NonFatal
2014-03-11 17:03:05 +01:00
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* INTERNAL API
2012-05-16 17:37:23 +02:00
*/
private [ akka ] object Mailbox {
2011-09-28 19:55:42 +02:00
2011-09-26 11:39:07 +02:00
type Status = Int
2011-09-28 19:55:42 +02:00
2011-10-04 14:28:05 +02:00
/*
2013-02-18 10:29:01 +01:00
* The following assigned numbers CANNOT be changed without looking at the code which uses them !
2011-10-04 14:28:05 +02:00
*/
2013-02-18 10:29:01 +01:00
// Primary status
2012-05-23 15:17:49 +02:00
final val Open = 0 // _status is not initialized in AbstractMailbox, so default must be zero! Deliberately without type ascription to make it a compile-time constant
2012-06-19 11:02:06 +02:00
final val Closed = 1 // Deliberately without type ascription to make it a compile-time constant
2013-02-18 10:29:01 +01:00
// Secondary status: Scheduled bit may be added to Open/Suspended
2012-06-19 11:02:06 +02:00
final val Scheduled = 2 // Deliberately without type ascription to make it a compile-time constant
2013-02-18 10:29:01 +01:00
// Shifted by 2: the suspend count!
2012-06-19 11:02:06 +02:00
final val shouldScheduleMask = 3
2012-07-04 09:20:17 +02:00
final val shouldNotProcessMask = ~ 2
2012-06-19 11:02:06 +02:00
final val suspendMask = ~ 3
final val suspendUnit = 4
2011-10-20 20:45:02 +02:00
2011-12-03 18:16:41 +01:00
// mailbox debugging helper using println (see below)
2011-12-10 22:52:49 +01:00
// since this is a compile-time constant, scalac will elide code behind if (Mailbox.debug) (RK checked with 2.9.1)
2012-05-23 15:17:49 +02:00
final val debug = false // Deliberately without type ascription to make it a compile-time constant
2011-09-23 13:14:17 +02:00
}
2011-12-19 21:46:37 +01:00
/* *
* Mailbox and InternalMailbox is separated in two classes because ActorCell is needed for implementation ,
* but can 't be exposed to user defined mailbox subclasses .
*
2012-05-18 13:37:26 +02:00
* INTERNAL API
2011-12-19 21:46:37 +01:00
*/
2012-06-13 17:57:56 +02:00
private [ akka ] abstract class Mailbox ( val messageQueue : MessageQueue )
2015-05-11 13:21:05 +02:00
extends ForkJoinTask [ Unit ] with SystemMessageQueue with Runnable {
2012-02-19 10:28:56 +01:00
2011-09-23 13:14:17 +02:00
import Mailbox._
2012-06-13 17:57:56 +02:00
/*
* This is needed for actually executing the mailbox , i . e . invoking the
* ActorCell . There are situations ( e . g . RepointableActorRef ) where a Mailbox
* is constructed but we know that we will not execute it , in which case this
2013-02-04 12:41:58 +01:00
* will be null . It must be a var to support switching into an “ active ”
2012-06-13 17:57:56 +02:00
* mailbox , should the owning ActorRef turn local .
2013-02-04 12:41:58 +01:00
*
2012-06-13 17:57:56 +02:00
* ANOTHER THING , IMPORTANT :
2013-02-04 12:41:58 +01:00
*
2012-06-13 17:57:56 +02:00
* actorCell . start ( ) publishes actorCell & self to the dispatcher , which
* means that messages may be processed theoretically before self ’ s constructor
* ends . The JMM guarantees visibility for final fields only after the end
* of the constructor , so safe publication requires that THIS WRITE BELOW
* stay as it is .
*/
@volatile
var actor : ActorCell = _
def setActor ( cell : ActorCell ) : Unit = actor = cell
def dispatcher : MessageDispatcher = actor . dispatcher
2012-04-03 16:24:50 +02:00
/* *
* Try to enqueue the message to this queue , or throw an exception .
*/
2012-02-21 13:22:25 +01:00
def enqueue ( receiver : ActorRef , msg : Envelope ) : Unit = messageQueue . enqueue ( receiver , msg )
2012-04-04 11:08:28 +02:00
2012-04-03 16:24:50 +02:00
/* *
* Try to dequeue the next message from this queue , return null failing that .
*/
2012-02-21 13:22:25 +01:00
def dequeue ( ) : Envelope = messageQueue . dequeue ( )
2012-04-03 16:24:50 +02:00
/* *
* Indicates whether this queue is non - empty .
*/
2012-02-21 13:22:25 +01:00
def hasMessages : Boolean = messageQueue . hasMessages
2012-04-03 16:24:50 +02:00
/* *
* Should return the current number of messages held in this queue ; may
* always return 0 if no other value is available efficiently . Do not use
* this for testing for presence of messages , use `hasMessages` instead .
*/
2012-02-21 13:22:25 +01:00
def numberOfMessages : Int = messageQueue . numberOfMessages
2011-11-21 17:49:21 +01:00
@volatile
2011-11-29 16:51:30 +01:00
protected var _statusDoNotCallMeDirectly : Status = _ //0 by default
2011-11-21 17:49:21 +01:00
@volatile
2011-11-29 16:51:30 +01:00
protected var _systemQueueDoNotCallMeDirectly : SystemMessage = _ //null by default
2011-11-21 17:49:21 +01:00
2011-10-04 14:44:06 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def currentStatus : Mailbox . Status = Unsafe . instance . getIntVolatile ( this , AbstractMailbox . mailboxStatusOffset )
2011-09-23 13:14:17 +02:00
2011-10-04 14:44:06 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def shouldProcessMessage : Boolean = ( currentStatus & shouldNotProcessMask ) == 0
2011-09-28 19:55:42 +02:00
2012-08-07 22:11:40 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def suspendCount : Int = currentStatus / suspendUnit
2012-08-07 22:11:40 +02:00
2011-10-04 14:44:06 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def isSuspended : Boolean = ( currentStatus & suspendMask ) != 0
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def isClosed : Boolean = currentStatus == Closed
2011-09-28 19:55:42 +02:00
2011-10-04 14:44:06 +02:00
@inline
2014-10-29 11:24:40 +01:00
final def isScheduled : Boolean = ( currentStatus & Scheduled ) != 0
2011-09-23 13:14:17 +02:00
2011-10-04 15:22:41 +02:00
@inline
protected final def updateStatus ( oldStatus : Status , newStatus : Status ) : Boolean =
2011-11-21 17:49:21 +01:00
Unsafe . instance . compareAndSwapInt ( this , AbstractMailbox . mailboxStatusOffset , oldStatus , newStatus )
2011-10-04 15:22:41 +02:00
@inline
2011-11-29 16:51:30 +01:00
protected final def setStatus ( newStatus : Status ) : Unit =
Unsafe . instance . putIntVolatile ( this , AbstractMailbox . mailboxStatusOffset , newStatus )
2011-10-04 15:22:41 +02:00
2011-09-28 19:55:42 +02:00
/* *
2012-07-13 12:25:26 +02:00
* Reduce the suspend count by one . Caller does not need to worry about whether
2011-10-04 14:28:05 +02:00
* status was Scheduled or not .
2012-06-19 11:02:06 +02:00
*
2012-07-24 16:09:23 +02:00
* @return true if the suspend count reached zero
2011-09-28 19:55:42 +02:00
*/
@tailrec
2014-10-29 11:24:40 +01:00
final def resume ( ) : Boolean = currentStatus match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
2012-06-19 11:02:06 +02:00
case s ⇒
val next = if ( s < suspendUnit ) s else s - suspendUnit
if ( updateStatus ( s , next ) ) next < suspendUnit
2012-07-13 12:25:26 +02:00
else resume ( )
2011-09-28 19:55:42 +02:00
}
2011-10-04 14:28:05 +02:00
/* *
2012-07-13 12:25:26 +02:00
* Increment the suspend count by one . Caller does not need to worry about whether
2011-10-04 14:28:05 +02:00
* status was Scheduled or not .
2012-06-19 11:02:06 +02:00
*
2012-07-24 16:09:23 +02:00
* @return true if the previous suspend count was zero
2011-10-04 14:28:05 +02:00
*/
2011-09-28 19:55:42 +02:00
@tailrec
2014-10-29 11:24:40 +01:00
final def suspend ( ) : Boolean = currentStatus match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
2012-06-19 11:02:06 +02:00
case s ⇒
if ( updateStatus ( s , s + suspendUnit ) ) s < suspendUnit
2012-07-13 12:25:26 +02:00
else suspend ( )
2011-09-28 19:55:42 +02:00
}
2011-10-04 14:28:05 +02:00
/* *
* set new primary status Closed . Caller does not need to worry about whether
* status was Scheduled or not .
*/
2011-09-28 19:55:42 +02:00
@tailrec
2014-10-29 11:24:40 +01:00
final def becomeClosed ( ) : Boolean = currentStatus match {
2013-02-04 12:41:58 +01:00
case Closed ⇒
setStatus ( Closed ) ; false
case s ⇒ updateStatus ( s , Closed ) || becomeClosed ( )
2011-10-04 14:28:05 +02:00
}
/* *
* Set Scheduled status , keeping primary status as is .
*/
@tailrec
final def setAsScheduled ( ) : Boolean = {
2014-10-29 11:24:40 +01:00
val s = currentStatus
2011-10-04 14:28:05 +02:00
/*
2013-02-18 10:29:01 +01:00
* Only try to add Scheduled bit if pure Open / Suspended , not Closed or with
* Scheduled bit already set .
2011-10-04 14:28:05 +02:00
*/
2012-06-19 11:02:06 +02:00
if ( ( s & shouldScheduleMask ) != Open ) false
else updateStatus ( s , s | Scheduled ) || setAsScheduled ( )
2011-10-04 14:28:05 +02:00
}
/* *
* Reset Scheduled status , keeping primary status as is .
*/
@tailrec
final def setAsIdle ( ) : Boolean = {
2014-10-29 11:24:40 +01:00
val s = currentStatus
2011-11-14 19:19:44 +01:00
updateStatus ( s , s & ~ Scheduled ) || setAsIdle ( )
2011-09-28 19:55:42 +02:00
}
2011-10-18 18:06:17 +02:00
/*
2013-02-18 10:29:01 +01:00
* AtomicReferenceFieldUpdater for system queue .
2011-10-18 18:06:17 +02:00
*/
2013-03-05 16:19:54 +01:00
protected final def systemQueueGet : LatestFirstSystemMessageList =
// Note: contrary how it looks, there is no allocation here, as SystemMessageList is a value class and as such
// it just exists as a typed view during compile-time. The actual return type is still SystemMessage.
new LatestFirstSystemMessageList ( Unsafe . instance . getObjectVolatile ( this , AbstractMailbox . systemMessageOffset ) . asInstanceOf [ SystemMessage ] )
2012-06-04 12:18:30 +02:00
2013-03-05 16:19:54 +01:00
protected final def systemQueuePut ( _old : LatestFirstSystemMessageList , _new : LatestFirstSystemMessageList ) : Boolean =
// Note: calling .head is not actually existing on the bytecode level as the parameters _old and _new
// are SystemMessage instances hidden during compile time behind the SystemMessageList value class.
// Without calling .head the parameters would be boxed in SystemMessageList wrapper.
Unsafe . instance . compareAndSwapObject ( this , AbstractMailbox . systemMessageOffset , _old . head , _new . head )
2011-10-18 18:06:17 +02:00
2014-10-29 11:24:40 +01:00
final def canBeScheduledForExecution ( hasMessageHint : Boolean , hasSystemMessageHint : Boolean ) : Boolean = currentStatus match {
2011-10-04 14:28:05 +02:00
case Open | Scheduled ⇒ hasMessageHint || hasSystemMessageHint || hasSystemMessages || hasMessages
case Closed ⇒ false
case _ ⇒ hasSystemMessageHint || hasSystemMessages
2011-09-23 13:14:17 +02:00
}
2011-09-21 15:01:47 +02:00
2014-10-29 11:24:40 +01:00
override final def run ( ) : Unit = {
2011-11-18 17:03:35 +01:00
try {
if ( ! isClosed ) { //Volatile read, needed here
processAllSystemMessages ( ) //First, deal with any system messages
processMailbox ( ) //Then deal with messages
}
} finally {
setAsIdle ( ) //Volatile write, needed here
2011-09-23 13:14:17 +02:00
dispatcher . registerForExecution ( this , false , false )
2011-09-21 15:01:47 +02:00
}
}
2015-05-11 13:21:05 +02:00
override final def getRawResult ( ) : Unit = ( )
override final def setRawResult ( unit : Unit ) : Unit = ( )
final override def exec ( ) : Boolean = try { run ( ) ; false } catch {
case ie : InterruptedException ⇒
Thread . currentThread . interrupt ( )
false
case anything : Throwable ⇒
val t = Thread . currentThread
t . getUncaughtExceptionHandler match {
case null ⇒
case some ⇒ some . uncaughtException ( t , anything )
}
throw anything
}
2011-09-21 15:01:47 +02:00
/* *
* Process the messages in the mailbox
*/
2012-01-21 01:33:07 +01:00
@tailrec private final def processMailbox (
2012-01-21 13:04:45 +01:00
left : Int = java . lang . Math . max ( dispatcher . throughput , 1 ) ,
2012-01-21 13:12:30 +01:00
deadlineNs : Long = if ( dispatcher . isThroughputDeadlineTimeDefined == true ) System . nanoTime + dispatcher . throughputDeadlineTime . toNanos else 0L ) : Unit =
2012-01-21 01:33:07 +01:00
if ( shouldProcessMessage ) {
2012-01-21 01:06:32 +01:00
val next = dequeue ( )
if ( next ne null ) {
2012-01-21 01:09:06 +01:00
if ( Mailbox . debug ) println ( actor . self + " processing message " + next )
2012-01-21 01:06:32 +01:00
actor invoke next
2013-02-11 14:57:44 +01:00
if ( Thread . interrupted ( ) )
throw new InterruptedException ( "Interrupted while processing actor messages" )
2012-01-21 01:06:32 +01:00
processAllSystemMessages ( )
2013-02-11 14:57:44 +01:00
if ( ( left > 1 ) && ( ( dispatcher . isThroughputDeadlineTimeDefined == false ) || ( System . nanoTime - deadlineNs ) < 0 ) )
2012-01-21 01:33:07 +01:00
processMailbox ( left - 1 , deadlineNs )
2012-01-21 01:15:00 +01:00
}
}
2011-09-20 18:34:21 +02:00
2012-06-07 15:19:28 +02:00
/* *
* Will at least try to process all queued system messages : in case of
* failure simply drop and go on to the next , because there is nothing to
* restart here ( failure is in ActorCell somewhere … ) . In case the mailbox
* becomes closed ( because of processing a Terminate message ) , dump all
* already dequeued message to deadLetters .
*/
2011-11-18 17:03:35 +01:00
final def processAllSystemMessages ( ) {
2012-08-16 11:14:35 +02:00
var interruption : Throwable = null
2013-03-05 16:19:54 +01:00
var messageList = systemDrain ( SystemMessageList . LNil )
while ( ( messageList . nonEmpty ) && ! isClosed ) {
val msg = messageList . head
messageList = messageList . tail
msg . unlink ( )
2012-06-05 16:58:24 +02:00
if ( debug ) println ( actor . self + " processing system message " + msg + " with " + actor . childrenRefs )
2013-02-11 14:57:44 +01:00
// we know here that systemInvoke ensures that only "fatal" exceptions get rethrown
actor systemInvoke msg
if ( Thread . interrupted ( ) )
interruption = new InterruptedException ( "Interrupted while processing system messages" )
2012-06-05 16:58:24 +02:00
// don’ t ever execute normal message when system message present!
2013-03-05 16:19:54 +01:00
if ( ( messageList . isEmpty ) && ! isClosed ) messageList = systemDrain ( SystemMessageList . LNil )
2012-06-05 16:58:24 +02:00
}
/*
* if we closed the mailbox , we must dump the remaining system messages
2013-02-04 12:41:58 +01:00
* to deadLetters ( this is essential for DeathWatch )
2012-06-05 16:58:24 +02:00
*/
2013-06-03 11:41:11 +02:00
val dlm = actor . dispatcher . mailboxes . deadLetterMailbox
2013-03-05 16:19:54 +01:00
while ( messageList . nonEmpty ) {
val msg = messageList . head
messageList = messageList . tail
msg . unlink ( )
2012-06-13 17:57:56 +02:00
try dlm . systemEnqueue ( actor . self , msg )
2012-06-05 16:58:24 +02:00
catch {
2012-08-16 13:14:16 +02:00
case e : InterruptedException ⇒ interruption = e
2012-06-05 16:58:24 +02:00
case NonFatal ( e ) ⇒ actor . system . eventStream . publish (
Error ( e , actor . self . path . toString , this . getClass , "error while enqueuing " + msg + " to deadLetters: " + e . getMessage ) )
2011-10-18 18:06:17 +02:00
}
2011-09-20 18:34:21 +02:00
}
2012-08-16 11:14:35 +02:00
// if we got an interrupted exception while handling system messages, then rethrow it
if ( interruption ne null ) {
2012-08-16 13:14:16 +02:00
Thread . interrupted ( ) // clear interrupted flag before throwing according to java convention
2012-08-16 11:14:35 +02:00
throw interruption
}
2011-09-20 18:34:21 +02:00
}
2011-09-21 15:01:47 +02:00
2011-11-15 14:39:43 +01:00
/* *
* Overridable callback to clean up the mailbox ,
* called when an actor is unregistered .
2011-12-07 15:51:46 +01:00
* By default it dequeues all system messages + messages and ships them to the owning actors ' systems ' DeadLetterMailbox
2011-11-15 14:39:43 +01:00
*/
2012-01-10 13:33:57 +01:00
protected [ dispatch ] def cleanUp ( ) : Unit =
if ( actor ne null ) { // actor is null for the deadLetterMailbox
2013-06-03 11:41:11 +02:00
val dlm = actor . dispatcher . mailboxes . deadLetterMailbox
2013-03-05 16:19:54 +01:00
var messageList = systemDrain ( new LatestFirstSystemMessageList ( NoMessage ) )
while ( messageList . nonEmpty ) {
2012-06-04 12:18:30 +02:00
// message must be “virgin” before being able to systemEnqueue again
2013-03-05 16:19:54 +01:00
val msg = messageList . head
messageList = messageList . tail
msg . unlink ( )
dlm . systemEnqueue ( actor . self , msg )
2011-12-07 15:51:46 +01:00
}
2012-02-19 10:28:56 +01:00
if ( messageQueue ne null ) // needed for CallingThreadDispatcher, which never calls Mailbox.run()
2013-06-03 11:41:11 +02:00
messageQueue . cleanUp ( actor . self , actor . dispatcher . mailboxes . deadLetterMailbox . messageQueue )
2011-12-07 15:51:46 +01:00
}
2011-09-23 09:33:53 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
* A MessageQueue is one of the core components in forming an Akka Mailbox .
* The MessageQueue is where the normal messages that are sent to Actors will be enqueued ( and subsequently dequeued )
2013-02-18 10:29:01 +01:00
* It needs to at least support N producers and 1 consumer thread - safely .
2012-05-16 17:37:23 +02:00
*/
2011-09-23 09:33:53 +02:00
trait MessageQueue {
2012-02-13 12:38:59 +01:00
/* *
* Try to enqueue the message to this queue , or throw an exception .
2011-09-21 15:01:47 +02:00
*/
2012-02-13 12:38:59 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit // NOTE: receiver is used only in two places, but cannot be removed
2011-09-23 09:33:53 +02:00
2012-02-13 12:38:59 +01:00
/* *
* Try to dequeue the next message from this queue , return null failing that .
*/
2011-09-21 15:01:47 +02:00
def dequeue ( ) : Envelope
2012-02-13 12:38:59 +01:00
/* *
* Should return the current number of messages held in this queue ; may
* always return 0 if no other value is available efficiently . Do not use
* this for testing for presence of messages , use `hasMessages` instead .
*/
2011-09-21 16:27:31 +02:00
def numberOfMessages : Int
2012-02-13 12:38:59 +01:00
/* *
* Indicates whether this queue is non - empty .
*/
2011-09-23 09:33:53 +02:00
def hasMessages : Boolean
2012-02-19 10:28:56 +01:00
/* *
* Called when the mailbox this queue belongs to is disposed of . Normally it
* is expected to transfer all remaining messages into the dead letter queue
* which is passed in . The owner of this MessageQueue is passed in if
* available ( e . g . for creating DeadLetters ( ) ) , “ / deadletters ” otherwise .
*/
2012-06-13 17:57:56 +02:00
def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit
2011-09-23 09:33:53 +02:00
}
2013-06-01 21:58:34 +02:00
class NodeMessageQueue extends AbstractNodeQueue [ Envelope ] with MessageQueue with UnboundedMessageQueueSemantics {
2013-03-07 15:58:26 +01:00
final def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = add ( handle )
final def dequeue ( ) : Envelope = poll ( )
2013-04-03 20:05:20 +02:00
final def numberOfMessages : Int = count ( )
2013-03-07 15:58:26 +01:00
2013-04-03 20:05:20 +02:00
final def hasMessages : Boolean = ! isEmpty ( )
2013-03-07 15:58:26 +01:00
@tailrec final def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit = {
val envelope = dequeue ( )
if ( envelope ne null ) {
deadLetters . enqueue ( owner , envelope )
cleanUp ( owner , deadLetters )
}
}
}
2014-07-05 10:34:11 +02:00
//Discards overflowing messages into DeadLetters
class BoundedNodeMessageQueue ( capacity : Int ) extends AbstractBoundedNodeQueue [ Envelope ] ( capacity )
2014-08-25 10:26:28 +02:00
with MessageQueue with BoundedMessageQueueSemantics with MultipleConsumerSemantics {
2014-07-05 10:34:11 +02:00
final def pushTimeOut : Duration = Duration . Undefined
final def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit =
if ( ! add ( handle ) )
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
final def dequeue ( ) : Envelope = poll ( )
final def numberOfMessages : Int = size ( )
final def hasMessages : Boolean = ! isEmpty ( )
@tailrec final def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit = {
val envelope = dequeue ( )
if ( envelope ne null ) {
deadLetters . enqueue ( owner , envelope )
cleanUp ( owner , deadLetters )
}
}
}
2012-02-19 10:28:56 +01:00
/* *
2013-02-08 13:13:52 +01:00
* INTERNAL API
2012-02-19 10:28:56 +01:00
*/
private [ akka ] trait SystemMessageQueue {
2011-10-18 18:06:17 +02:00
/* *
* Enqueue a new system message , e . g . by prepending atomically as new head of a single - linked list .
*/
2011-11-12 10:57:28 +01:00
def systemEnqueue ( receiver : ActorRef , message : SystemMessage ) : Unit
2011-09-23 09:33:53 +02:00
2011-10-18 18:06:17 +02:00
/* *
* Dequeue all messages from system queue and return them as single - linked list .
*/
2013-03-05 16:19:54 +01:00
def systemDrain ( newContents : LatestFirstSystemMessageList ) : EarliestFirstSystemMessageList
2011-09-21 16:27:31 +02:00
def hasSystemMessages : Boolean
}
2012-02-19 10:28:56 +01:00
/* *
2013-02-08 13:13:52 +01:00
* INTERNAL API
2012-02-19 10:28:56 +01:00
*/
private [ akka ] trait DefaultSystemMessageQueue { self : Mailbox ⇒
2011-09-21 16:27:31 +02:00
2011-10-18 18:06:17 +02:00
@tailrec
2011-11-12 10:57:28 +01:00
final def systemEnqueue ( receiver : ActorRef , message : SystemMessage ) : Unit = {
2013-03-05 16:19:54 +01:00
assert ( message . unlinked )
2012-06-13 17:57:56 +02:00
if ( Mailbox . debug ) println ( receiver + " having enqueued " + message )
2013-03-05 16:19:54 +01:00
val currentList = systemQueueGet
if ( currentList . head == NoMessage ) {
2013-06-03 11:41:11 +02:00
if ( actor ne null ) actor . dispatcher . mailboxes . deadLetterMailbox . systemEnqueue ( receiver , message )
2012-06-13 17:57:56 +02:00
} else {
2013-03-05 16:19:54 +01:00
if ( ! systemQueuePut ( currentList , message : : currentList ) ) {
message . unlink ( )
2012-06-04 12:18:30 +02:00
systemEnqueue ( receiver , message )
}
2011-11-11 17:44:57 +01:00
}
2011-10-18 18:06:17 +02:00
}
2011-10-18 16:44:35 +02:00
2011-10-18 18:06:17 +02:00
@tailrec
2013-03-05 16:19:54 +01:00
final def systemDrain ( newContents : LatestFirstSystemMessageList ) : EarliestFirstSystemMessageList = {
val currentList = systemQueueGet
2013-03-26 13:55:18 +01:00
if ( currentList . head == NoMessage ) new EarliestFirstSystemMessageList ( null )
else if ( systemQueuePut ( currentList , newContents ) ) currentList . reverse
else systemDrain ( newContents )
2011-10-18 18:06:17 +02:00
}
2011-09-21 16:27:31 +02:00
2013-03-26 13:55:18 +01:00
def hasSystemMessages : Boolean = systemQueueGet . head match {
case null | NoMessage ⇒ false
case _ ⇒ true
}
2012-06-04 12:18:30 +02:00
2011-09-21 15:01:47 +02:00
}
2013-06-01 21:58:34 +02:00
/* *
* This is a marker trait for message queues which support multiple consumers ,
* as is required by the BalancingDispatcher .
*/
trait MultipleConsumerSemantics
2012-05-16 17:37:23 +02:00
/* *
2013-02-18 10:29:01 +01:00
* A QueueBasedMessageQueue is a MessageQueue backed by a java . util . Queue .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait QueueBasedMessageQueue extends MessageQueue with MultipleConsumerSemantics {
2012-02-13 12:10:35 +01:00
def queue : Queue [ Envelope ]
2012-02-24 16:32:00 +01:00
def numberOfMessages = queue . size
def hasMessages = ! queue . isEmpty
2012-06-13 17:57:56 +02:00
def cleanUp ( owner : ActorRef , deadLetters : MessageQueue ) : Unit = {
2012-02-24 16:32:00 +01:00
if ( hasMessages ) {
var envelope = dequeue
while ( envelope ne null ) {
2012-06-13 17:57:56 +02:00
deadLetters . enqueue ( owner , envelope )
2012-02-24 16:32:00 +01:00
envelope = dequeue
}
}
}
2012-02-13 12:10:35 +01:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedMessageQueueSemantics adds unbounded semantics to a QueueBasedMessageQueue ,
* i . e . a non - blocking enqueue and dequeue .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait UnboundedMessageQueueSemantics
trait UnboundedQueueBasedMessageQueue extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
2012-02-13 12:38:59 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = queue add handle
def dequeue ( ) : Envelope = queue . poll ( )
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMessageQueueSemantics adds bounded semantics to a QueueBasedMessageQueue ,
2013-02-18 10:29:01 +01:00
* i . e . blocking enqueue with timeout .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait BoundedMessageQueueSemantics {
2011-09-21 15:01:47 +02:00
def pushTimeOut : Duration
2013-06-01 21:58:34 +02:00
}
2015-08-20 15:16:32 +02:00
/* *
* INTERNAL API
* Used to determine mailbox factories which create [ [ BoundedMessageQueueSemantics ] ]
* mailboxes , and thus should be validated that the `pushTimeOut` is greater than 0.
*/
private [ akka ] trait ProducesPushTimeoutSemanticsMailbox {
def pushTimeOut : Duration
}
2013-06-01 21:58:34 +02:00
trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics {
2011-09-23 09:33:53 +02:00
override def queue : BlockingQueue [ Envelope ]
2011-09-21 15:01:47 +02:00
2012-06-08 13:56:53 +02:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit =
2013-05-13 15:39:52 +02:00
if ( pushTimeOut . length >= 0 ) {
2012-06-08 13:56:53 +02:00
if ( ! queue . offer ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2011-09-21 15:01:47 +02:00
} else queue put handle
2012-02-13 12:38:59 +01:00
def dequeue ( ) : Envelope = queue . poll ( )
2011-09-21 15:01:47 +02:00
}
2012-05-16 17:37:23 +02:00
/* *
2013-02-18 10:29:01 +01:00
* DequeBasedMessageQueue refines QueueBasedMessageQueue to be backed by a java . util . Deque .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait DequeBasedMessageQueueSemantics {
2012-02-13 12:10:35 +01:00
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit
2011-09-21 15:01:47 +02:00
}
2013-06-01 21:58:34 +02:00
trait UnboundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with UnboundedMessageQueueSemantics
trait BoundedDequeBasedMessageQueueSemantics extends DequeBasedMessageQueueSemantics with BoundedMessageQueueSemantics
trait DequeBasedMessageQueue extends QueueBasedMessageQueue with DequeBasedMessageQueueSemantics {
def queue : Deque [ Envelope ]
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedDequeBasedMessageQueueSemantics adds unbounded semantics to a DequeBasedMessageQueue ,
* i . e . a non - blocking enqueue and dequeue .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait UnboundedDequeBasedMessageQueue extends DequeBasedMessageQueue with UnboundedDequeBasedMessageQueueSemantics {
2012-02-24 16:32:00 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = queue add handle
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit = queue addFirst handle
def dequeue ( ) : Envelope = queue . poll ( )
2012-02-08 18:26:54 +01:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMessageQueueSemantics adds bounded semantics to a DequeBasedMessageQueue ,
2013-02-18 10:29:01 +01:00
* i . e . blocking enqueue with timeout .
2012-05-16 17:37:23 +02:00
*/
2013-06-01 21:58:34 +02:00
trait BoundedDequeBasedMessageQueue extends DequeBasedMessageQueue with BoundedDequeBasedMessageQueueSemantics {
2012-02-08 18:26:54 +01:00
def pushTimeOut : Duration
override def queue : BlockingDeque [ Envelope ]
2012-02-24 16:32:00 +01:00
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit =
2013-05-13 15:39:52 +02:00
if ( pushTimeOut . length >= 0 ) {
2012-06-08 13:56:53 +02:00
if ( ! queue . offer ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2012-06-08 13:56:53 +02:00
} else queue put handle
2012-02-08 18:26:54 +01:00
2012-02-24 16:32:00 +01:00
def enqueueFirst ( receiver : ActorRef , handle : Envelope ) : Unit =
2013-05-13 15:39:52 +02:00
if ( pushTimeOut . length >= 0 ) {
2012-06-08 13:56:53 +02:00
if ( ! queue . offerFirst ( handle , pushTimeOut . length , pushTimeOut . unit ) )
2013-02-04 12:41:58 +01:00
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( handle . message , handle . sender , receiver ) , handle . sender )
2012-06-08 13:56:53 +02:00
} else queue putFirst handle
2012-02-08 18:26:54 +01:00
2012-02-24 16:32:00 +01:00
def dequeue ( ) : Envelope = queue . poll ( )
2012-02-08 18:26:54 +01:00
}
2010-09-21 18:52:41 +02:00
/* *
2012-06-25 10:49:49 +02:00
* MailboxType is a factory to create MessageQueues for an optionally
* provided ActorContext .
*
* < b > Possibly Important Notice </ b >
*
* When implementing a custom mailbox type , be aware that there is special
* semantics attached to `system.actorOf()` in that sending to the returned
* ActorRef may — for a short period of time — enqueue the messages first in a
* dummy queue . Top - level actors are created in two steps , and only after the
* guardian actor has performed that second step will all previously sent
* messages be transferred from the dummy queue into the real mailbox .
2010-09-21 18:52:41 +02:00
*/
2011-09-21 15:01:47 +02:00
trait MailboxType {
2012-06-13 17:57:56 +02:00
def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue
2011-09-21 15:01:47 +02:00
}
2013-06-01 21:58:34 +02:00
trait ProducesMessageQueue [ T <: MessageQueue ]
2011-09-23 09:33:53 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedMailbox is the default unbounded MailboxType used by Akka Actors .
2011-09-23 09:33:53 +02:00
*/
2014-03-07 13:20:01 +01:00
final case class UnboundedMailbox ( ) extends MailboxType with ProducesMessageQueue [ UnboundedMailbox . MessageQueue ] {
2012-02-21 16:40:34 +01:00
2012-02-26 21:26:25 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
2012-02-21 16:40:34 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new UnboundedMailbox . MessageQueue
}
object UnboundedMailbox {
class MessageQueue extends ConcurrentLinkedQueue [ Envelope ] with UnboundedQueueBasedMessageQueue {
final def queue : Queue [ Envelope ] = this
}
2011-09-21 15:01:47 +02:00
}
2010-09-21 18:52:41 +02:00
2013-03-07 15:58:26 +01:00
/* *
2013-04-03 20:05:20 +02:00
* SingleConsumerOnlyUnboundedMailbox is a high - performance , multiple producer — single consumer , unbounded MailboxType ,
2013-03-07 15:58:26 +01:00
* the only drawback is that you can 't have multiple consumers ,
2014-01-10 17:14:10 +01:00
* which rules out using it with BalancingPool ( BalancingDispatcher ) for instance .
2013-03-07 15:58:26 +01:00
*/
2014-03-07 13:20:01 +01:00
final case class SingleConsumerOnlyUnboundedMailbox ( ) extends MailboxType with ProducesMessageQueue [ NodeMessageQueue ] {
2013-03-07 15:58:26 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
2013-06-01 21:58:34 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue = new NodeMessageQueue
2013-03-07 15:58:26 +01:00
}
2014-07-05 10:34:11 +02:00
/* *
* NonBlockingBoundedMailbox is a high - performance , multiple producer — multiple consumer , bounded MailboxType ,
* Noteworthy is that it discards overflow as DeadLetters .
*
* NOTE : NonBlockingBoundedMailbox does not use `mailbox-push-timeout-time` as it is non - blocking .
*/
case class NonBlockingBoundedMailbox ( val capacity : Int ) extends MailboxType with ProducesMessageQueue [ BoundedNodeMessageQueue ] {
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) )
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for NonBlockingBoundedMailbox can not be negative" )
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
new BoundedNodeMessageQueue ( capacity )
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedMailbox is the default bounded MailboxType used by Akka Actors .
2012-05-16 17:37:23 +02:00
*/
2015-08-20 15:16:32 +02:00
final case class BoundedMailbox ( val capacity : Int , override val pushTimeOut : FiniteDuration )
extends MailboxType with ProducesMessageQueue [ BoundedMailbox . MessageQueue ]
with ProducesPushTimeoutSemanticsMailbox {
2011-09-21 15:01:47 +02:00
2012-02-26 21:26:25 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) ,
2014-01-09 14:09:52 +01:00
config . getNanosDuration ( "mailbox-push-timeout-time" ) )
2012-02-21 16:40:34 +01:00
2011-05-18 17:25:30 +02:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedMailbox can not be negative" )
2010-09-21 18:52:41 +02:00
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedMailbox can not be null" )
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new BoundedMailbox . MessageQueue ( capacity , pushTimeOut )
}
object BoundedMailbox {
class MessageQueue ( capacity : Int , final val pushTimeOut : FiniteDuration )
extends LinkedBlockingQueue [ Envelope ] ( capacity ) with BoundedQueueBasedMessageQueue {
final def queue : BlockingQueue [ Envelope ] = this
}
2011-09-21 15:01:47 +02:00
}
2010-09-21 18:52:41 +02:00
2012-02-21 17:23:54 +01:00
/* *
2013-04-18 13:35:36 +02:00
* UnboundedPriorityMailbox is an unbounded mailbox that allows for prioritization of its contents .
2012-05-18 13:37:26 +02:00
* Extend this class and provide the Comparator in the constructor .
2012-02-21 17:23:54 +01:00
*/
2013-06-01 21:58:34 +02:00
class UnboundedPriorityMailbox ( val cmp : Comparator [ Envelope ] , val initialCapacity : Int )
extends MailboxType with ProducesMessageQueue [ UnboundedPriorityMailbox . MessageQueue ] {
2012-05-18 13:37:26 +02:00
def this ( cmp : Comparator [ Envelope ] ) = this ( cmp , 11 )
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new UnboundedPriorityMailbox . MessageQueue ( initialCapacity , cmp )
}
object UnboundedPriorityMailbox {
class MessageQueue ( initialCapacity : Int , cmp : Comparator [ Envelope ] )
extends PriorityBlockingQueue [ Envelope ] ( initialCapacity , cmp ) with UnboundedQueueBasedMessageQueue {
final def queue : Queue [ Envelope ] = this
}
2011-03-09 18:11:45 +01:00
}
2012-02-21 17:23:54 +01:00
/* *
2013-04-18 13:35:36 +02:00
* BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents .
2012-05-18 13:37:26 +02:00
* Extend this class and provide the Comparator in the constructor .
2012-02-21 17:23:54 +01:00
*/
2015-08-20 15:16:32 +02:00
class BoundedPriorityMailbox ( final val cmp : Comparator [ Envelope ] , final val capacity : Int , override final val pushTimeOut : Duration )
extends MailboxType with ProducesMessageQueue [ BoundedPriorityMailbox . MessageQueue ]
with ProducesPushTimeoutSemanticsMailbox {
2011-03-09 18:11:45 +01:00
2011-09-21 15:01:47 +02:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedMailbox can not be negative" )
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedMailbox can not be null" )
2011-03-09 18:11:45 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new BoundedPriorityMailbox . MessageQueue ( capacity , cmp , pushTimeOut )
}
object BoundedPriorityMailbox {
class MessageQueue ( capacity : Int , cmp : Comparator [ Envelope ] , val pushTimeOut : Duration )
extends BoundedBlockingQueue [ Envelope ] ( capacity , new PriorityQueue [ Envelope ] ( 11 , cmp ) )
with BoundedQueueBasedMessageQueue {
final def queue : BlockingQueue [ Envelope ] = this
}
2011-09-21 15:01:47 +02:00
}
2011-03-09 18:11:45 +01:00
2015-01-10 14:44:25 +00:00
/* *
* UnboundedStablePriorityMailbox is an unbounded mailbox that allows for prioritization of its contents . Unlike the
* [ [ UnboundedPriorityMailbox ] ] it preserves ordering for messages of equal priority .
* Extend this class and provide the Comparator in the constructor .
*/
class UnboundedStablePriorityMailbox ( val cmp : Comparator [ Envelope ] , val initialCapacity : Int )
extends MailboxType with ProducesMessageQueue [ UnboundedStablePriorityMailbox . MessageQueue ] {
def this ( cmp : Comparator [ Envelope ] ) = this ( cmp , 11 )
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
new UnboundedStablePriorityMailbox . MessageQueue ( initialCapacity , cmp )
}
object UnboundedStablePriorityMailbox {
class MessageQueue ( initialCapacity : Int , cmp : Comparator [ Envelope ] )
extends StablePriorityBlockingQueue [ Envelope ] ( initialCapacity , cmp ) with UnboundedQueueBasedMessageQueue {
final def queue : Queue [ Envelope ] = this
}
}
/* *
* BoundedStablePriorityMailbox is a bounded mailbox that allows for prioritization of its contents . Unlike the
* [ [ BoundedPriorityMailbox ] ] it preserves ordering for messages of equal priority .
* Extend this class and provide the Comparator in the constructor .
*/
2015-08-20 15:16:32 +02:00
class BoundedStablePriorityMailbox ( final val cmp : Comparator [ Envelope ] , final val capacity : Int , override final val pushTimeOut : Duration )
extends MailboxType with ProducesMessageQueue [ BoundedStablePriorityMailbox . MessageQueue ]
with ProducesPushTimeoutSemanticsMailbox {
2015-01-10 14:44:25 +00:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedMailbox can not be negative" )
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedMailbox can not be null" )
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
new BoundedStablePriorityMailbox . MessageQueue ( capacity , cmp , pushTimeOut )
}
object BoundedStablePriorityMailbox {
class MessageQueue ( capacity : Int , cmp : Comparator [ Envelope ] , val pushTimeOut : Duration )
extends BoundedBlockingQueue [ Envelope ] ( capacity , new StablePriorityQueue [ Envelope ] ( 11 , cmp ) )
with BoundedQueueBasedMessageQueue {
final def queue : BlockingQueue [ Envelope ] = this
}
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* UnboundedDequeBasedMailbox is an unbounded MailboxType , backed by a Deque .
2012-05-16 17:37:23 +02:00
*/
2014-03-07 13:20:01 +01:00
final case class UnboundedDequeBasedMailbox ( ) extends MailboxType with ProducesMessageQueue [ UnboundedDequeBasedMailbox . MessageQueue ] {
2012-02-24 16:32:00 +01:00
2012-02-27 23:26:15 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
2012-02-24 16:32:00 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new UnboundedDequeBasedMailbox . MessageQueue
}
object UnboundedDequeBasedMailbox {
class MessageQueue extends LinkedBlockingDeque [ Envelope ] with UnboundedDequeBasedMessageQueue {
final val queue = this
}
2012-02-08 18:26:54 +01:00
}
2012-05-16 17:37:23 +02:00
/* *
2012-05-18 13:37:26 +02:00
* BoundedDequeBasedMailbox is an bounded MailboxType , backed by a Deque .
2012-05-16 17:37:23 +02:00
*/
2015-08-20 15:16:32 +02:00
case class BoundedDequeBasedMailbox ( final val capacity : Int , override final val pushTimeOut : FiniteDuration )
extends MailboxType with ProducesMessageQueue [ BoundedDequeBasedMailbox . MessageQueue ]
with ProducesPushTimeoutSemanticsMailbox {
2012-02-08 18:26:54 +01:00
2012-02-27 23:26:15 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) ,
2014-01-09 14:09:52 +01:00
config . getNanosDuration ( "mailbox-push-timeout-time" ) )
2012-02-24 16:32:00 +01:00
if ( capacity < 0 ) throw new IllegalArgumentException ( "The capacity for BoundedDequeBasedMailbox can not be negative" )
if ( pushTimeOut eq null ) throw new IllegalArgumentException ( "The push time-out for BoundedDequeBasedMailbox can not be null" )
2012-02-08 18:26:54 +01:00
2012-06-13 17:57:56 +02:00
final override def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue =
2013-06-01 21:58:34 +02:00
new BoundedDequeBasedMailbox . MessageQueue ( capacity , pushTimeOut )
}
object BoundedDequeBasedMailbox {
class MessageQueue ( capacity : Int , val pushTimeOut : FiniteDuration )
extends LinkedBlockingDeque [ Envelope ] ( capacity ) with BoundedDequeBasedMessageQueue {
final val queue = this
}
2012-02-08 18:26:54 +01:00
}
2013-04-18 13:35:36 +02:00
2014-03-11 17:03:05 +01:00
/* *
* ControlAwareMessageQueue handles messages that extend [ [ akka . dispatch . ControlMessage ] ] with priority .
*/
trait ControlAwareMessageQueueSemantics extends QueueBasedMessageQueue {
def controlQueue : Queue [ Envelope ]
def queue : Queue [ Envelope ]
def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = handle match {
case envelope @ Envelope ( _ : ControlMessage , _ ) ⇒ controlQueue add envelope
case envelope ⇒ queue add envelope
}
def dequeue ( ) : Envelope = {
val controlMsg = controlQueue . poll ( )
if ( controlMsg ne null ) controlMsg
else queue . poll ( )
}
override def numberOfMessages : Int = controlQueue . size ( ) + queue . size ( )
override def hasMessages : Boolean = ! ( queue . isEmpty && controlQueue . isEmpty )
}
trait UnboundedControlAwareMessageQueueSemantics extends UnboundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
trait BoundedControlAwareMessageQueueSemantics extends BoundedMessageQueueSemantics with ControlAwareMessageQueueSemantics
/* *
* Messages that extend this trait will be handled with priority by control aware mailboxes .
*/
trait ControlMessage
/* *
* UnboundedControlAwareMailbox is an unbounded MailboxType , that maintains two queues
* to allow messages that extend [ [ akka . dispatch . ControlMessage ] ] to be delivered with priority .
*/
final case class UnboundedControlAwareMailbox ( ) extends MailboxType with ProducesMessageQueue [ UnboundedControlAwareMailbox . MessageQueue ] {
// this constructor will be called via reflection when this mailbox type
// is used in the application config
def this ( settings : ActorSystem . Settings , config : Config ) = this ( )
def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue = new UnboundedControlAwareMailbox . MessageQueue
}
object UnboundedControlAwareMailbox {
class MessageQueue extends ControlAwareMessageQueueSemantics with UnboundedMessageQueueSemantics {
val controlQueue : Queue [ Envelope ] = new ConcurrentLinkedQueue [ Envelope ] ( )
val queue : Queue [ Envelope ] = new ConcurrentLinkedQueue [ Envelope ] ( )
}
}
/* *
* BoundedControlAwareMailbox is a bounded MailboxType , that maintains two queues
* to allow messages that extend [ [ akka . dispatch . ControlMessage ] ] to be delivered with priority .
*/
2015-08-20 15:16:32 +02:00
final case class BoundedControlAwareMailbox ( capacity : Int , override final val pushTimeOut : FiniteDuration ) extends MailboxType
with ProducesMessageQueue [ BoundedControlAwareMailbox . MessageQueue ]
with ProducesPushTimeoutSemanticsMailbox {
2014-03-11 17:03:05 +01:00
def this ( settings : ActorSystem . Settings , config : Config ) = this ( config . getInt ( "mailbox-capacity" ) ,
config . getNanosDuration ( "mailbox-push-timeout-time" ) )
def create ( owner : Option [ ActorRef ] , system : Option [ ActorSystem ] ) : MessageQueue = new BoundedControlAwareMailbox . MessageQueue ( capacity , pushTimeOut )
}
object BoundedControlAwareMailbox {
class MessageQueue ( val capacity : Int , val pushTimeOut : FiniteDuration ) extends BoundedControlAwareMessageQueueSemantics {
private final val size = new AtomicInteger ( 0 )
private final val putLock = new ReentrantLock ( )
private final val notFull = putLock . newCondition ( )
// no need to use blocking queues here, as blocking is being handled in `enqueueWithTimeout`
val controlQueue = new ConcurrentLinkedQueue [ Envelope ] ( )
val queue = new ConcurrentLinkedQueue [ Envelope ] ( )
override def enqueue ( receiver : ActorRef , handle : Envelope ) : Unit = handle match {
case envelope @ Envelope ( _ : ControlMessage , _ ) ⇒ enqueueWithTimeout ( controlQueue , receiver , envelope )
case envelope ⇒ enqueueWithTimeout ( queue , receiver , envelope )
}
override def numberOfMessages : Int = size . get ( )
override def hasMessages : Boolean = numberOfMessages > 0
@tailrec
final override def dequeue ( ) : Envelope = {
val count = size . get ( )
// if both queues are empty return null
if ( count > 0 ) {
// if there are messages try to fetch the current head
// or retry if other consumer dequeued in the mean time
if ( size . compareAndSet ( count , count - 1 ) ) {
val item = super . dequeue ( )
if ( size . get < capacity ) signalNotFull ( )
item
} else {
dequeue ( )
}
} else {
null
}
}
private def signalNotFull ( ) {
putLock . lock ( )
try {
notFull . signal ( )
} finally {
putLock . unlock ( )
}
}
private final def enqueueWithTimeout ( q : Queue [ Envelope ] , receiver : ActorRef , envelope : Envelope ) {
var remaining = pushTimeOut . toNanos
putLock . lockInterruptibly ( )
val inserted = try {
var stop = false
while ( size . get ( ) == capacity && ! stop ) {
remaining = notFull . awaitNanos ( remaining )
stop = remaining <= 0
}
if ( stop ) {
false
} else {
q . add ( envelope )
val c = size . incrementAndGet ( )
if ( c < capacity ) notFull . signal ( )
true
}
} finally {
putLock . unlock ( )
}
if ( ! inserted ) {
receiver . asInstanceOf [ InternalActorRef ] . provider . deadLetters . tell (
DeadLetter ( envelope . message , envelope . sender , receiver ) , envelope . sender )
}
}
}
}
2013-04-18 13:35:36 +02:00
/* *
* Trait to signal that an Actor requires a certain type of message queue semantics .
*
* The mailbox type will be looked up by mapping the type T via akka . actor . mailbox . requirements in the config ,
* to a mailbox configuration . If no mailbox is assigned on Props or in deployment config then this one will be used .
*
2013-04-24 08:39:29 +02:00
* The queue type of the created mailbox will be checked against the type T and actor creation will fail if it doesn ' t
* fulfill the requirements .
2013-04-18 13:35:36 +02:00
*/
trait RequiresMessageQueue [ T ]