2009-12-11 16:37:44 +01:00
/* *
2009-12-27 16:01:53 +01:00
* Copyright ( C ) 2009 - 2010 Scalable Solutions AB < http : //scalablesolutions.se>
2009-12-11 16:37:44 +01:00
*/
package se.scalablesolutions.akka.dispatch
2010-07-02 11:14:49 +02:00
import se.scalablesolutions.akka.actor. { ActorRef , IllegalActorStateException }
2010-08-21 10:45:00 +02:00
import java.util.Queue
2010-09-11 15:24:09 +02:00
import java.util.concurrent. { RejectedExecutionException , ConcurrentLinkedQueue , LinkedBlockingQueue }
2010-05-27 15:50:11 +12:00
2009-12-11 16:37:44 +01:00
/* *
* Default settings are :
* < pre />
* - withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
* - NR_START_THREADS = 16
* - NR_MAX_THREADS = 128
* - KEEP_ALIVE_TIME = 60000L // one minute
* </ pre >
* < p />
*
* The dispatcher has a fluent builder interface to build up a thread pool to suite your use - case .
* There is a default thread pool defined but make use of the builder if you need it . Here are some examples .
* < p />
*
* Scala API .
* < p />
* Example usage :
* < pre />
* val dispatcher = new ExecutorBasedEventDrivenDispatcher ( "name" )
* dispatcher
* . withNewThreadPoolWithBoundedBlockingQueue ( 100 )
* . setCorePoolSize ( 16 )
* . setMaxPoolSize ( 128 )
* . setKeepAliveTimeInMillis ( 60000 )
* . setRejectionPolicy ( new CallerRunsPolicy )
* . buildThreadPool
* </ pre >
* < p />
*
* Java API .
* < p />
* Example usage :
* < pre />
* ExecutorBasedEventDrivenDispatcher dispatcher = new ExecutorBasedEventDrivenDispatcher ( "name" ) ;
* dispatcher
* . withNewThreadPoolWithBoundedBlockingQueue ( 100 )
* . setCorePoolSize ( 16 )
* . setMaxPoolSize ( 128 )
* . setKeepAliveTimeInMillis ( 60000 )
* . setRejectionPolicy ( new CallerRunsPolicy ( ) )
* . buildThreadPool ( ) ;
* </ pre >
* < p />
*
* But the preferred way of creating dispatchers is to use
2010-03-07 09:44:03 +01:00
* the { @link se . scalablesolutions . akka . dispatch . Dispatchers } factory object .
2009-12-11 16:37:44 +01:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2010-06-02 16:56:36 +02:00
* @param throughput positive integer indicates the dispatcher will only process so much messages at a time from the
* mailbox , without checking the mailboxes of other actors . Zero or negative means the dispatcher
* always continues until the mailbox is empty .
2010-06-30 16:26:15 +02:00
* Larger values ( or zero or negative ) increase througput , smaller values increase fairness
2009-12-11 16:37:44 +01:00
*/
2010-08-21 15:36:50 +02:00
class ExecutorBasedEventDrivenDispatcher (
2010-08-21 16:13:16 +02:00
_name : String ,
2010-09-09 17:34:05 +02:00
val throughput : Int = Dispatchers . THROUGHPUT ,
2010-09-15 15:10:47 +02:00
val throughputDeadlineMs : Int = Dispatchers . THROUGHPUT_DEADLINE_MS ,
2010-09-07 18:32:50 +02:00
mailboxConfig : MailboxConfig = Dispatchers . MAILBOX_CONFIG ,
2010-09-04 12:00:12 +02:00
config : ( ThreadPoolBuilder ) => Unit = _ => ( ) ) extends MessageDispatcher with ThreadPoolBuilder {
2010-08-21 15:36:50 +02:00
2010-09-15 15:10:47 +02:00
def this ( _name : String , throughput : Int , throughputDeadlineMs : Int , capacity : Int ) = this ( _name , throughput , throughputDeadlineMs , MailboxConfig ( capacity , None , false ) )
def this ( _name : String , throughput : Int ) = this ( _name , throughput , Dispatchers . THROUGHPUT_DEADLINE_MS , Dispatchers . MAILBOX_CAPACITY ) // Needed for Java API usage
def this ( _name : String ) = this ( _name , Dispatchers . THROUGHPUT , Dispatchers . THROUGHPUT_DEADLINE_MS , Dispatchers . MAILBOX_CAPACITY ) // Needed for Java API usage
2010-09-04 12:00:12 +02:00
2010-09-09 15:49:59 +02:00
//FIXME remove this from ThreadPoolBuilder
2010-09-07 18:32:50 +02:00
mailboxCapacity = mailboxConfig . capacity
2010-08-21 16:13:16 +02:00
2009-12-11 16:37:44 +01:00
@volatile private var active : Boolean = false
2010-03-04 21:22:16 +01:00
2010-07-18 07:13:43 +02:00
val name = "akka:event-driven:dispatcher:" + _name
2010-03-04 21:22:16 +01:00
init
2010-07-21 09:44:18 -04:00
/* *
2010-09-11 15:24:09 +02:00
* This is the behavior of an ExecutorBasedEventDrivenDispatchers mailbox
2010-07-21 09:44:18 -04:00
*/
2010-09-12 11:24:53 +02:00
trait ExecutableMailbox extends Runnable { self : MessageQueue =>
2010-09-12 15:36:32 +02:00
final def run = {
2010-09-12 11:24:53 +02:00
2010-09-12 21:00:50 +02:00
val reschedule = try {
processMailbox ( )
} finally {
dispatcherLock . unlock ( )
}
if ( reschedule || ! self . isEmpty )
registerForExecution ( self )
2010-09-10 18:12:09 +02:00
}
2010-06-02 16:56:36 +02:00
/* *
2010-09-10 18:12:09 +02:00
* Process the messages in the mailbox
2010-06-02 16:56:36 +02:00
*
* @return true if the processing finished before the mailbox was empty , due to the throughput constraint
*/
2010-09-12 15:36:32 +02:00
final def processMailbox ( ) : Boolean = {
2010-09-15 15:10:47 +02:00
var nextMessage = self . dequeue
if ( nextMessage ne null ) {
val throttle = throughput > 0
var processedMessages = 0
val isDeadlineEnabled = throttle && throughputDeadlineMs > 0
val started = if ( isDeadlineEnabled ) System . currentTimeMillis else 0
do {
nextMessage . invoke
if ( throttle ) { //Will be elided when false
processedMessages += 1
if ( ( processedMessages >= throughput )
|| ( isDeadlineEnabled && ( System . currentTimeMillis - started ) >= throughputDeadlineMs ) ) //If we're throttled, break out
return ! self . isEmpty
}
nextMessage = self . dequeue
}
while ( nextMessage ne null )
}
false
2010-06-02 16:56:36 +02:00
}
2010-09-10 18:12:09 +02:00
}
2010-09-09 17:34:05 +02:00
2010-09-11 15:24:09 +02:00
def dispatch ( invocation : MessageInvocation ) = {
val mbox = getMailbox ( invocation . receiver )
mbox enqueue invocation
2010-09-12 11:24:53 +02:00
registerForExecution ( mbox )
2010-09-11 15:24:09 +02:00
}
2010-09-12 21:00:50 +02:00
protected def registerForExecution ( mailbox : MessageQueue with ExecutableMailbox ) : Unit = if ( active ) {
if ( mailbox . dispatcherLock . tryLock ( ) ) {
try {
executor execute mailbox
} catch {
case e : RejectedExecutionException =>
mailbox . dispatcherLock . unlock ( )
throw e
}
}
} else {
log . warning ( "%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s" , toString , mailbox )
}
2010-09-11 15:24:09 +02:00
/* *
* @return the mailbox associated with the actor
*/
2010-09-12 11:24:53 +02:00
private def getMailbox ( receiver : ActorRef ) = receiver . mailbox . asInstanceOf [ MessageQueue with ExecutableMailbox ]
2010-09-11 15:24:09 +02:00
override def mailboxSize ( actorRef : ActorRef ) = getMailbox ( actorRef ) . size
2010-09-12 11:24:53 +02:00
override def createMailbox ( actorRef : ActorRef ) : AnyRef = {
if ( mailboxCapacity > 0 )
new DefaultBoundedMessageQueue ( mailboxCapacity , mailboxConfig . pushTimeOut , blockDequeue = false ) with ExecutableMailbox
else
new DefaultUnboundedMessageQueue ( blockDequeue = false ) with ExecutableMailbox
}
2010-09-11 15:24:09 +02:00
2010-06-02 16:56:36 +02:00
2009-12-11 16:37:44 +01:00
def start = if ( ! active ) {
2010-07-18 07:13:43 +02:00
log . debug ( "Starting up %s\n\twith throughput [%d]" , toString , throughput )
2009-12-11 16:37:44 +01:00
active = true
}
def shutdown = if ( active ) {
2010-07-18 07:13:43 +02:00
log . debug ( "Shutting down %s" , toString )
2009-12-11 16:37:44 +01:00
executor . shutdownNow
active = false
2010-08-27 12:12:33 +02:00
uuids . clear
2009-12-11 16:37:44 +01:00
}
2010-05-21 20:08:49 +02:00
2010-09-12 11:24:53 +02:00
def ensureNotActive ( ) : Unit = if ( active ) {
throw new IllegalActorStateException (
2009-12-11 16:37:44 +01:00
"Can't build a new thread pool for a dispatcher that is already up and running" )
2010-09-12 11:24:53 +02:00
}
2010-07-29 17:29:51 +02:00
2010-07-18 07:13:43 +02:00
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
2010-01-02 08:40:09 +01:00
2010-07-18 07:13:43 +02:00
// FIXME: should we have an unbounded queue and not bounded as default ????
2010-09-04 12:00:12 +02:00
private [ akka ] def init = {
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
config ( this )
buildThreadPool
}
2010-03-16 12:25:58 +01:00
}