2009-05-25 14:48:43 +02:00
/* *
2015-03-07 22:58:48 -08:00
 * Copyright (C) 2009-2015 Typesafe Inc. <http://www.typesafe.com>
2009-05-25 14:48:43 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.dispatch
2009-05-25 14:48:43 +02:00
2010-09-07 18:32:50 +02:00
import java.util.concurrent._
2015-05-08 14:07:06 +08:00
import java. { util ⇒ ju }
2010-10-26 12:49:25 +02:00
import akka.actor._
2013-03-05 16:19:54 +01:00
import akka.dispatch.sysmsg._
2014-06-19 11:33:08 +02:00
import akka.event.EventStream
2015-05-08 14:07:06 +08:00
import akka.event.Logging. { Debug , Error , LogEventException }
import akka.util. { Index , Unsafe }
import com.typesafe.config.Config
2012-07-22 15:33:18 +02:00
import scala.annotation.tailrec
2015-05-08 14:07:06 +08:00
import scala.concurrent. { ExecutionContext , ExecutionContextExecutor }
import scala.concurrent.duration. { Duration , FiniteDuration }
import scala.concurrent.forkjoin. { ForkJoinPool , ForkJoinTask }
2013-01-17 18:24:57 +01:00
import scala.util.control.NonFatal
2010-02-23 19:49:01 +01:00
2012-06-13 17:57:56 +02:00
/**
 * A message paired with the reference of the actor that sent it.
 *
 * The primary constructor is private; instances are meant to be created through
 * the three-argument `Envelope.apply` in the companion object, which validates the message.
 */
final case class Envelope private (message: Any, sender: ActorRef)

object Envelope {
  /**
   * Creates an [[Envelope]] for the given message.
   *
   * @param message the payload; must not be `null`
   * @param sender  the sending actor; when it is `Actor.noSender`, `system.deadLetters` is used instead
   * @param system  the actor system that supplies the dead-letters reference
   * @throws InvalidMessageException if `message` is `null`
   */
  def apply(message: Any, sender: ActorRef, system: ActorSystem): Envelope =
    if (message == null) throw new InvalidMessageException("Message is null")
    else {
      val effectiveSender = if (sender eq Actor.noSender) system.deadLetters else sender
      new Envelope(message, effectiveSender)
    }
}
2012-08-03 23:33:45 +02:00
/**
 * Wraps a `Runnable` so that failures are published to the event stream and a
 * cleanup callback is always executed after the runnable has run.
 *
 * @param eventStream where non-fatal failures of `runnable` are published as `Error` events
 * @param runnable    the task to execute
 * @param cleanup     callback invoked after every run, whether it succeeded or failed
 */
final case class TaskInvocation(eventStream: EventStream, runnable: Runnable, cleanup: () ⇒ Unit) extends Batchable {

  /**
   * Delegates to the wrapped runnable when it is itself `Batchable`; otherwise only
   * `OnCompleteRunnable`s are considered batchable. The `Batchable` case is checked first.
   */
  final override def isBatchable: Boolean =
    runnable match {
      case batchable: Batchable                   ⇒ batchable.isBatchable
      case _: scala.concurrent.OnCompleteRunnable ⇒ true
      case _                                      ⇒ false
    }

  /**
   * Runs the wrapped runnable. Non-fatal exceptions are published to the event stream
   * instead of being rethrown; `cleanup` is invoked in all cases.
   */
  def run(): Unit =
    try {
      runnable.run()
    } catch {
      case NonFatal(failure) ⇒
        eventStream.publish(Error(failure, "TaskInvocation", this.getClass, failure.getMessage))
    } finally {
      cleanup()
    }
}
2012-05-16 17:04:13 +02:00
/**
 * INTERNAL API
 *
 * Mixin for `Executor` implementations that can report whether they are saturated.
 * The self-type restricts it to classes that are also a `java.util.concurrent.Executor`.
 */
private[akka] trait LoadMetrics { this: Executor ⇒
  /** Returns `true` when the executor considers itself fully loaded. */
  def atFullThrottle(): Boolean
}
2012-05-16 17:04:13 +02:00
/**
 * INTERNAL API
 *
 * Constants for the dispatcher shutdown-schedule state and a println-based debugging aid.
 */
private[akka] object MessageDispatcher {
  val UNSCHEDULED = 0 //WARNING DO NOT CHANGE THE VALUE OF THIS: It relies on the faster init of 0 in AbstractMessageDispatcher
  val SCHEDULED = 1
  val RESCHEDULED = 2

  // dispatcher debugging helper using println (see below)
  // since this is a compile-time constant, scalac will elide code behind if (MessageDispatcher.debug) (RK checked with 2.9.1)
  final val debug = false // Deliberately without type ascription to make it a compile-time constant

  /** Registry of dispatcher -> actors; the dispatcher class only writes to it when `debug` is enabled. */
  lazy val actors = new Index[MessageDispatcher, ActorRef](16, _ compareTo _)

  /**
   * When `debug` is enabled, prints every known dispatcher with its inhabitant count,
   * followed by one line per registered actor (liveness, message count or class, parent).
   * Does nothing when `debug` is `false`.
   */
  def printActors(): Unit =
    if (debug) {
      actors.keys foreach { dispatcher ⇒
        // The header line is printed before the dispatcher's actors are iterated.
        println(dispatcher + " inhabitants: " + dispatcher.inhabitants)
        actors.valueIterator(dispatcher) foreach { ref ⇒
          val status = if (ref.isTerminated) " (terminated)" else " (alive)"
          val messages = ref match {
            case withCell: ActorRefWithCell ⇒ " " + withCell.underlying.numberOfMessages + " messages"
            case _                          ⇒ " " + ref.getClass
          }
          val parent = ref match {
            case internal: InternalActorRef ⇒ ", parent: " + internal.getParent
            case _                          ⇒ ""
          }
          println(" -> " + ref + status + messages + parent)
        }
      }
    }
}
2014-01-13 09:31:36 +01:00
/**
 * Base class for message dispatchers. It tracks how many "inhabitants" (registered actors
 * plus in-flight tasks submitted through `unbatchedExecute`) currently use the dispatcher and
 * schedules a delayed `shutdown()` once that count drops to zero.
 *
 * @param configurator the configurator that created this dispatcher; its `prerequisites`
 *                     supply the mailboxes, event stream and scheduler used here
 */
abstract class MessageDispatcher(val configurator: MessageDispatcherConfigurator) extends AbstractMessageDispatcher with BatchingExecutor with ExecutionContextExecutor {

  import AbstractMessageDispatcher.{ inhabitantsOffset, shutdownScheduleOffset }
  import MessageDispatcher._
  import configurator.prerequisites

  val mailboxes = prerequisites.mailboxes
  val eventStream = prerequisites.eventStream

  // NOTE(review): these fields are never referenced by name in this class; presumably they are the
  // storage behind inhabitantsOffset / shutdownScheduleOffset accessed through Unsafe below - confirm
  // against AbstractMessageDispatcher.
  @volatile private[this] var _inhabitantsDoNotCallMeDirectly: Long = _ // DO NOT TOUCH!
  @volatile private[this] var _shutdownScheduleDoNotCallMeDirectly: Int = _ // DO NOT TOUCH!

  /**
   * Atomically adds `add` (which may be negative) to the inhabitants counter, retrying on
   * CAS failure, and returns the new value.
   *
   * @throws IllegalStateException if the update would make the counter negative; the
   *                               exception is also passed to `reportFailure` first
   */
  @tailrec private final def addInhabitants(add: Long): Long = {
    val c = inhabitants
    val r = c + add
    if (r < 0) {
      // We haven't succeeded in decreasing the inhabitants yet but the simple fact that we're trying to
      // go below zero means that there is an imbalance and we might as well throw the exception
      val e = new IllegalStateException("ACTOR SYSTEM CORRUPTED!!! A dispatcher can't have less than 0 inhabitants!")
      reportFailure(e)
      throw e
    }
    if (Unsafe.instance.compareAndSwapLong(this, inhabitantsOffset, c, r)) r else addInhabitants(add)
  }

  /** Current number of inhabitants, read with volatile semantics. */
  final def inhabitants: Long = Unsafe.instance.getLongVolatile(this, inhabitantsOffset)

  /** Current shutdown-schedule state (`UNSCHEDULED`, `SCHEDULED` or `RESCHEDULED`). */
  private final def shutdownSchedule: Int = Unsafe.instance.getIntVolatile(this, shutdownScheduleOffset)

  /** CAS on the shutdown-schedule state; returns `true` when the swap succeeded. */
  private final def updateShutdownSchedule(expect: Int, update: Int): Boolean = Unsafe.instance.compareAndSwapInt(this, shutdownScheduleOffset, expect, update)

  /**
   *  Creates and returns a mailbox for the given actor.
   */
  protected[akka] def createMailbox(actor: Cell, mailboxType: MailboxType): Mailbox

  /**
   * Identifier of this dispatcher, corresponds to the full key
   * of the dispatcher configuration.
   */
  def id: String

  /**
   * Attaches the specified actor instance to this dispatcher, which includes
   * scheduling it to run for the first time (Create() is expected to have
   * been enqueued by the ActorCell upon mailbox creation).
   */
  final def attach(actor: ActorCell): Unit = {
    register(actor)
    registerForExecution(actor.mailbox, hasMessageHint = false, hasSystemMessageHint = true)
  }

  /**
   * Detaches the specified actor instance from this dispatcher. A shutdown check is
   * performed afterwards even if `unregister` throws.
   */
  final def detach(actor: ActorCell): Unit = try unregister(actor) finally ifSensibleToDoSoThenScheduleShutdown()

  final protected def resubmitOnBlock: Boolean = true // We want to avoid starvation

  /**
   * Wraps `r` in a [[TaskInvocation]] and hands it to `executeTask`. The task counts as an
   * inhabitant until its cleanup callback runs; if submission itself throws, the count is
   * rolled back and the exception is rethrown.
   */
  final override protected def unbatchedExecute(r: Runnable): Unit = {
    val invocation = TaskInvocation(eventStream, r, taskCleanup)
    addInhabitants(+1)
    try {
      executeTask(invocation)
    } catch {
      case t: Throwable ⇒
        // Deliberately catches everything: the counter must be restored before rethrowing.
        addInhabitants(-1)
        throw t
    }
  }

  /**
   * Publishes the failure to the event stream: a `LogEventException` contributes its own
   * event, anything else is wrapped in an `Error` event attributed to this dispatcher class.
   */
  override def reportFailure(t: Throwable): Unit = t match {
    case e: LogEventException ⇒ eventStream.publish(e.event)
    case _                    ⇒ eventStream.publish(Error(t, getClass.getName, getClass, t.getMessage))
  }

  /**
   * If there are no inhabitants, moves the shutdown-schedule state forward:
   * UNSCHEDULED -> SCHEDULED (and schedules the shutdown action), or
   * SCHEDULED -> RESCHEDULED. Retries when a CAS is lost; does nothing when already RESCHEDULED.
   */
  @tailrec
  private final def ifSensibleToDoSoThenScheduleShutdown(): Unit = {
    if (inhabitants <= 0) shutdownSchedule match {
      case UNSCHEDULED ⇒
        if (updateShutdownSchedule(UNSCHEDULED, SCHEDULED)) scheduleShutdownAction()
        else ifSensibleToDoSoThenScheduleShutdown()
      case SCHEDULED ⇒
        if (updateShutdownSchedule(SCHEDULED, RESCHEDULED)) ()
        else ifSensibleToDoSoThenScheduleShutdown()
      case RESCHEDULED ⇒
    }
  }

  /**
   * Schedules `shutdownAction` to run once after `shutdownTimeout`, executing it directly on
   * the calling (scheduler) thread. Falls back to an immediate `shutdown()` when the scheduler
   * rejects the task with an `IllegalStateException`.
   */
  private def scheduleShutdownAction(): Unit = {
    // IllegalStateException is thrown if scheduler has been shutdown
    try prerequisites.scheduler.scheduleOnce(shutdownTimeout, shutdownAction)(new ExecutionContext {
      override def execute(runnable: Runnable): Unit = runnable.run()
      override def reportFailure(t: Throwable): Unit = MessageDispatcher.this.reportFailure(t)
    }) catch {
      case _: IllegalStateException ⇒ shutdown()
    }
  }

  /** Cleanup callback for tasks: releases the task's inhabitant slot and checks for shutdown when it was the last one. */
  private final val taskCleanup: () ⇒ Unit = () ⇒ if (addInhabitants(-1) == 0) ifSensibleToDoSoThenScheduleShutdown()

  /**
   * If you override it, you must call it. But only ever once. See "attach" for only invocation.
   *
   * INTERNAL API
   */
  protected[akka] def register(actor: ActorCell): Unit = {
    if (debug) actors.put(this, actor.self)
    addInhabitants(+1)
  }

  /**
   * If you override it, you must call it. But only ever once. See "detach" for the only invocation
   *
   * INTERNAL API
   */
  protected[akka] def unregister(actor: ActorCell): Unit = {
    if (debug) actors.remove(this, actor.self)
    addInhabitants(-1)
    // Replace the actor's mailbox with the dead-letter mailbox, then close and clean up the old one.
    val mailBox = actor.swapMailbox(mailboxes.deadLetterMailbox)
    mailBox.becomeClosed()
    mailBox.cleanUp()
  }

  /**
   * The delayed shutdown task. In state SCHEDULED it shuts the dispatcher down when there are
   * no inhabitants and then resets the state to UNSCHEDULED; in state RESCHEDULED it moves back
   * to SCHEDULED and schedules itself again; in state UNSCHEDULED it does nothing.
   */
  private val shutdownAction = new Runnable {
    @tailrec
    final def run(): Unit = {
      shutdownSchedule match {
        case SCHEDULED ⇒
          try {
            if (inhabitants == 0) shutdown() //Warning, racy
          } finally {
            // Spin until the state has been reset, whatever it currently is.
            while (!updateShutdownSchedule(shutdownSchedule, UNSCHEDULED)) {}
          }
        case RESCHEDULED ⇒
          if (updateShutdownSchedule(RESCHEDULED, SCHEDULED)) scheduleShutdownAction()
          else run()
        case UNSCHEDULED ⇒
      }
    }
  }

  /**
   * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
   * defaulting to your akka configs "akka.actor.default-dispatcher.shutdown-timeout" or default specified in
   * reference.conf
   *
   * INTERNAL API
   */
  protected[akka] def shutdownTimeout: FiniteDuration

  /**
   * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
   */
  protected[akka] def suspend(actor: ActorCell): Unit = {
    val mbox = actor.mailbox
    // Only act when the mailbox still belongs to this actor and this dispatcher.
    if ((mbox.actor eq actor) && (mbox.dispatcher eq this))
      mbox.suspend()
  }

  /**
   * After the call to this method, the dispatcher must begin any new message processing for the specified reference
   */
  protected[akka] def resume(actor: ActorCell): Unit = {
    val mbox = actor.mailbox
    // Re-register for execution only when the mailbox is ours and mbox.resume() returns true.
    if ((mbox.actor eq actor) && (mbox.dispatcher eq this) && mbox.resume())
      registerForExecution(mbox, hasMessageHint = false, hasSystemMessageHint = false)
  }

  /**
   * Will be called when the dispatcher is to queue an invocation for execution
   *
   * INTERNAL API
   */
  protected[akka] def systemDispatch(receiver: ActorCell, invocation: SystemMessage): Unit

  /**
   * Will be called when the dispatcher is to queue an invocation for execution
   *
   * INTERNAL API
   */
  protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope): Unit

  /**
   * Suggest to register the provided mailbox for execution
   *
   * INTERNAL API
   */
  protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean

  // TODO check whether this should not actually be a property of the mailbox
  /**
   * INTERNAL API
   */
  protected[akka] def throughput: Int

  /**
   * INTERNAL API
   */
  protected[akka] def throughputDeadlineTime: Duration

  /**
   * INTERNAL API
   *
   * `true` when `throughputDeadlineTime` is longer than zero milliseconds. Evaluated once,
   * during construction of this class.
   */
  @inline protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0

  /**
   * INTERNAL API
   */
  protected[akka] def executeTask(invocation: TaskInvocation): Unit

  /**
   * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
   * Must be idempotent
   *
   * INTERNAL API
   */
  protected[akka] def shutdown(): Unit
}
2011-03-04 20:55:12 +01:00
2012-05-16 17:04:13 +02:00
/**
 * Base class for configurators that, given a configuration section and the dispatcher
 * prerequisites, can create instances of ExecutorService (through the inherited
 * [[ExecutorServiceFactoryProvider]] contract).
 *
 * @param config        the executor's configuration section
 * @param prerequisites shared dispatcher prerequisites
 */
abstract class ExecutorServiceConfigurator(
  config: Config,
  prerequisites: DispatcherPrerequisites)
  extends ExecutorServiceFactoryProvider
2011-03-04 20:55:12 +01:00
/**
 * Base class to be used for hooking in new dispatchers into Dispatchers.
 *
 * @param _config       the dispatcher's configuration section; exposed as `config` wrapped in a `CachingConfig`
 * @param prerequisites shared dispatcher prerequisites
 */
abstract class MessageDispatcherConfigurator(_config: Config, val prerequisites: DispatcherPrerequisites) {

  val config: Config = new CachingConfig(_config)

  /**
   * Returns an instance of MessageDispatcher given the configuration.
   * Depending on the needs the implementation may return a new instance for
   * each invocation or return the same instance every time.
   */
  def dispatcher(): MessageDispatcher

  /**
   * Creates the [[ExecutorServiceConfigurator]] selected by the "executor" setting.
   *
   *  - "default-executor" yields a `DefaultExecutorServiceConfigurator` whose fallback is resolved
   *    from "default-executor.fallback" using the rules below;
   *  - `null`, "" and "fork-join-executor" yield a `ForkJoinExecutorConfigurator`;
   *  - "thread-pool-executor" yields a `ThreadPoolExecutorConfigurator`;
   *  - any other value is treated as a fully qualified class name and instantiated reflectively
   *    with a `(Config, DispatcherPrerequisites)` constructor.
   *
   * @throws IllegalArgumentException if the named class cannot be instantiated; the original
   *                                  failure is attached as the cause
   */
  def configureExecutor(): ExecutorServiceConfigurator = {
    def configurator(executor: String): ExecutorServiceConfigurator = executor match {
      case null | "" | "fork-join-executor" ⇒ new ForkJoinExecutorConfigurator(config.getConfig("fork-join-executor"), prerequisites)
      case "thread-pool-executor"           ⇒ new ThreadPoolExecutorConfigurator(config.getConfig("thread-pool-executor"), prerequisites)
      case fqcn ⇒
        // Constructor argument types paired with their values, in declaration order.
        val args = List(
          classOf[Config] -> config,
          classOf[DispatcherPrerequisites] -> prerequisites)
        prerequisites.dynamicAccess.createInstanceFor[ExecutorServiceConfigurator](fqcn, args).recover({
          case exception ⇒ throw new IllegalArgumentException(
            ("""Cannot instantiate ExecutorServiceConfigurator ("executor = [%s]"), defined in [%s],
                make sure it has an accessible constructor with a [%s,%s] signature""")
              .format(fqcn, config.getString("id"), classOf[Config], classOf[DispatcherPrerequisites]), exception)
        }).get
    }

    config.getString("executor") match {
      case "default-executor" ⇒ new DefaultExecutorServiceConfigurator(config.getConfig("default-executor"), prerequisites, configurator(config.getString("default-executor.fallback")))
      case other              ⇒ configurator(other)
    }
  }
}
2011-03-04 20:55:12 +01:00
2012-01-30 13:44:56 +01:00
/**
 * [[ExecutorServiceConfigurator]] that builds a thread-pool based executor from the
 * "thread-pool-executor" style configuration section it is given.
 *
 * @param config        the executor's configuration section
 * @param prerequisites shared dispatcher prerequisites
 */
class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {

  val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config

  /**
   * Translates the configuration into a [[ThreadPoolConfigBuilder]]: keep-alive time,
   * core-thread timeout and core/max pool sizes are always applied. A bounded task queue is
   * configured only when "task-queue-size" is positive, in which case "task-queue-type" must
   * be "array", "linked" or empty.
   *
   * @throws IllegalArgumentException if a positive queue size is combined with an unknown "task-queue-type"
   */
  protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
    import akka.util.Helpers.ConfigOps

    val sized = ThreadPoolConfigBuilder(ThreadPoolConfig())
      .setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
      .setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
      .setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
      .setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")

    // The queue settings are read only after the builder above has been assembled.
    sized.configure(
      Some(config getInt "task-queue-size").filter(_ > 0).map { capacity ⇒
        val queueFactory = (config getString "task-queue-type") match {
          case "array"       ⇒ ThreadPoolConfig.arrayBlockingQueue(capacity, false) //TODO config fairness?
          case "" | "linked" ⇒ ThreadPoolConfig.linkedBlockingQueue(capacity)
          case x             ⇒ throw new IllegalArgumentException("[%s] is not a valid task-queue-type [array|linked]!" format x)
        }
        (builder: ThreadPoolConfigBuilder) ⇒ builder.setQueueFactory(queueFactory)
      })
  }

  /** Delegates to `threadPoolConfig` to create the factory for the given dispatcher id and thread factory. */
  def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
    threadPoolConfig.createExecutorServiceFactory(id, threadFactory)
}
2012-01-30 15:34:56 +01:00
2012-02-06 15:19:05 +01:00
object ForkJoinExecutorConfigurator {

  /**
   * INTERNAL AKKA USAGE ONLY
   *
   * A `ForkJoinPool` that wraps plain `Runnable`s in [[AkkaForkJoinTask]] before submitting
   * them and that reports its load through [[LoadMetrics]].
   */
  final class AkkaForkJoinPool(
    parallelism: Int,
    threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
    unhandledExceptionHandler: Thread.UncaughtExceptionHandler,
    asyncMode: Boolean)
    extends ForkJoinPool(parallelism, threadFactory, unhandledExceptionHandler, asyncMode) with LoadMetrics {

    /** Convenience constructor that creates the pool with `asyncMode = true`. */
    def this(
      parallelism: Int,
      threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
      unhandledExceptionHandler: Thread.UncaughtExceptionHandler) =
      this(parallelism, threadFactory, unhandledExceptionHandler, asyncMode = true)

    /**
     * Submits `r` to the pool. A runnable that already is a `ForkJoinTask` is submitted as is;
     * anything else is wrapped in an [[AkkaForkJoinTask]].
     *
     * @throws NullPointerException if `r` is `null`
     */
    override def execute(r: Runnable): Unit = {
      if (r eq null) throw new NullPointerException("Runnable was null")
      val task = if (r.isInstanceOf[ForkJoinTask[_]]) r else new AkkaForkJoinTask(r)
      super.execute(task.asInstanceOf[ForkJoinTask[Any]])
    }

    /** `true` when at least as many threads are active as the pool's parallelism level. */
    def atFullThrottle(): Boolean = getActiveThreadCount() >= getParallelism()
  }

  /**
   * INTERNAL AKKA USAGE ONLY
   *
   * Adapts a `Runnable` to a `ForkJoinTask[Unit]`.
   */
  @SerialVersionUID(1L)
  final class AkkaForkJoinTask(runnable: Runnable) extends ForkJoinTask[Unit] {
    override def getRawResult(): Unit = ()
    override def setRawResult(unit: Unit): Unit = ()

    /**
     * Runs the wrapped runnable and returns `true` on normal completion. An
     * `InterruptedException` re-sets the thread's interrupt flag and yields `false`. Any other
     * throwable is first handed to the current thread's uncaught-exception handler (if it has
     * one) and then rethrown.
     */
    final override def exec(): Boolean =
      try {
        runnable.run()
        true
      } catch {
        case _: InterruptedException ⇒
          Thread.currentThread.interrupt()
          false
        case anything: Throwable ⇒
          val current = Thread.currentThread
          val handler = current.getUncaughtExceptionHandler
          if (handler ne null) handler.uncaughtException(current, anything)
          throw anything
      }
  }
}
2012-01-30 15:34:56 +01:00
/**
 * [[ExecutorServiceConfigurator]] that creates fork-join based executors from the
 * "fork-join-executor" style configuration section it is given.
 *
 * @param config        the executor's configuration section
 * @param prerequisites shared dispatcher prerequisites
 */
class ForkJoinExecutorConfigurator(config: Config, prerequisites: DispatcherPrerequisites) extends ExecutorServiceConfigurator(config, prerequisites) {
  import ForkJoinExecutorConfigurator._

  /**
   * Narrows a generic `ThreadFactory` to the worker-thread factory type required by `ForkJoinPool`.
   *
   * @throws IllegalStateException if `t` is not a `ForkJoinPool.ForkJoinWorkerThreadFactory`
   */
  def validate(t: ThreadFactory): ForkJoinPool.ForkJoinWorkerThreadFactory = t match {
    case correct: ForkJoinPool.ForkJoinWorkerThreadFactory ⇒ correct
    case _ ⇒ throw new IllegalStateException("The prerequisites for the ForkJoinExecutorConfigurator is a ForkJoinPool.ForkJoinWorkerThreadFactory!")
  }

  /**
   * Factory that creates an `AkkaForkJoinPool` with the given settings and
   * `MonitorableThreadFactory.doNothing` as its uncaught-exception handler.
   */
  class ForkJoinExecutorServiceFactory(
    val threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory,
    val parallelism: Int,
    val asyncMode: Boolean) extends ExecutorServiceFactory {
    /** Convenience constructor that uses `asyncMode = true`. */
    def this(threadFactory: ForkJoinPool.ForkJoinWorkerThreadFactory, parallelism: Int) = this(threadFactory, parallelism, asyncMode = true)
    def createExecutorService: ExecutorService = new AkkaForkJoinPool(parallelism, threadFactory, MonitorableThreadFactory.doNothing, asyncMode)
  }

  /**
   * Builds the executor-service factory for the dispatcher with the given id.
   *
   * "task-peeking-mode" selects the pool's async mode ("FIFO" = `true`, "LIFO" = `false`); the
   * parallelism is derived from "parallelism-min", "parallelism-factor" and "parallelism-max".
   *
   * @throws IllegalArgumentException if "task-peeking-mode" is neither "FIFO" nor "LIFO"
   * @throws IllegalStateException    if the thread factory is not a `ForkJoinPool.ForkJoinWorkerThreadFactory`
   */
  final def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = {
    val tf = threadFactory match {
      case m: MonitorableThreadFactory ⇒
        // add the dispatcher id to the thread names
        m.withName(m.name + "-" + id)
      case other ⇒ other
    }

    val asyncMode = config.getString("task-peeking-mode") match {
      case "FIFO" ⇒ true
      case "LIFO" ⇒ false
      case unsupported ⇒
        // The valid values are FIFO and LIFO; the rejected value is included to ease troubleshooting.
        throw new IllegalArgumentException(
          s"""Cannot instantiate ForkJoinExecutorServiceFactory. "task-peeking-mode" in "fork-join-executor" section could only set to "FIFO" or "LIFO", but was "$unsupported".""")
    }

    new ForkJoinExecutorServiceFactory(
      validate(tf),
      ThreadPoolConfig.scaledPoolSize(
        config.getInt("parallelism-min"),
        config.getDouble("parallelism-factor"),
        config.getInt("parallelism-max")),
      asyncMode)
  }
}
2014-01-16 23:24:06 +01:00
/**
 * [[ExecutorServiceConfigurator]] that prefers the `ExecutionContext` supplied through
 * `prerequisites.defaultExecutionContext` and otherwise delegates to `fallback`.
 *
 * @param config        the executor's configuration section
 * @param prerequisites shared dispatcher prerequisites
 * @param fallback      configurator used when no default execution context was passed in
 */
class DefaultExecutorServiceConfigurator(config: Config, prerequisites: DispatcherPrerequisites, fallback: ExecutorServiceConfigurator) extends ExecutorServiceConfigurator(config, prerequisites) {

  /**
   * The provider that actually creates factories. When an execution context is available it
   * is wrapped in a single object that acts as provider, factory and executor service at once;
   * a Debug event announcing this is published when that wrapper is created.
   */
  val provider: ExecutorServiceFactoryProvider =
    prerequisites.defaultExecutionContext match {
      case None ⇒ fallback
      case Some(ec) ⇒
        val notice = Debug("DefaultExecutorServiceConfigurator", this.getClass, s" Using passed in ExecutionContext as default executor for this ActorSystem. If you want to use a different executor, please specify one in akka.actor.default-dispatcher.default-executor. ")
        prerequisites.eventStream.publish(notice)
        new AbstractExecutorService with ExecutorServiceFactory with ExecutorServiceFactoryProvider {
          // Provider and factory roles: always hand out this same instance.
          def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory = this
          def createExecutorService: ExecutorService = this

          // Execution is forwarded to the wrapped execution context.
          def execute(command: Runnable): Unit = ec.execute(command)

          // Lifecycle methods are no-ops: the wrapper never reports itself as shut down or terminated.
          def shutdown(): Unit = ()
          def shutdownNow(): ju.List[Runnable] = ju.Collections.emptyList()
          def isShutdown: Boolean = false
          def isTerminated: Boolean = false
          def awaitTermination(timeout: Long, unit: TimeUnit): Boolean = false
        }
    }

  /** Delegates to `provider`. */
  def createExecutorServiceFactory(id: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
    provider.createExecutorServiceFactory(id, threadFactory)
}