Adding DispatcherPrerequisites to hold the common dependencies that a dispatcher needs to be created
This commit is contained in:
commit
80d766b07b
139 changed files with 1126 additions and 948 deletions
|
|
@ -14,6 +14,8 @@ import akka.actor._
|
|||
import akka.actor.ActorSystem
|
||||
import locks.ReentrantLock
|
||||
import scala.annotation.tailrec
|
||||
import akka.event.EventStream
|
||||
import akka.actor.ActorSystem.Settings
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
|
|
@ -62,12 +64,12 @@ case class Supervise(child: ActorRef) extends SystemMessage // sent to superviso
|
|||
case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsWatching
|
||||
case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsWatching
|
||||
|
||||
final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable {
|
||||
final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable {
|
||||
def run() {
|
||||
try {
|
||||
function()
|
||||
} catch {
|
||||
case e ⇒ app.eventStream.publish(Error(e, this, e.getMessage))
|
||||
case e ⇒ eventStream.publish(Error(e, this, e.getMessage))
|
||||
} finally {
|
||||
cleanup()
|
||||
}
|
||||
|
|
@ -79,26 +81,23 @@ object MessageDispatcher {
|
|||
val SCHEDULED = 1
|
||||
val RESCHEDULED = 2
|
||||
|
||||
implicit def defaultDispatcher(implicit app: ActorSystem) = app.dispatcher
|
||||
implicit def defaultDispatcher(implicit system: ActorSystem) = system.dispatcher
|
||||
}
|
||||
|
||||
/**
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDispatcher with Serializable {
|
||||
abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) extends AbstractMessageDispatcher with Serializable {
|
||||
|
||||
import MessageDispatcher._
|
||||
import AbstractMessageDispatcher.{ inhabitantsUpdater, shutdownScheduleUpdater }
|
||||
import prerequisites._
|
||||
|
||||
/**
|
||||
* Creates and returns a mailbox for the given actor.
|
||||
*/
|
||||
protected[akka] def createMailbox(actor: ActorCell): Mailbox
|
||||
|
||||
/**
|
||||
* a blackhole mailbox for the purpose of replacing the real one upon actor termination
|
||||
*/
|
||||
import app.deadLetterMailbox
|
||||
|
||||
/**
|
||||
* Name of this dispatcher.
|
||||
*/
|
||||
|
|
@ -119,7 +118,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDi
|
|||
}
|
||||
|
||||
protected[akka] final def dispatchTask(block: () ⇒ Unit) {
|
||||
val invocation = TaskInvocation(app, block, taskCleanup)
|
||||
val invocation = TaskInvocation(eventStream, block, taskCleanup)
|
||||
inhabitantsUpdater.incrementAndGet(this)
|
||||
try {
|
||||
executeTask(invocation)
|
||||
|
|
@ -136,7 +135,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDi
|
|||
shutdownScheduleUpdater.get(this) match {
|
||||
case UNSCHEDULED ⇒
|
||||
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
||||
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
()
|
||||
} else ifSensibleToDoSoThenScheduleShutdown()
|
||||
case SCHEDULED ⇒
|
||||
|
|
@ -211,7 +210,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDi
|
|||
}
|
||||
case RESCHEDULED ⇒
|
||||
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
||||
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
|
||||
else run()
|
||||
}
|
||||
}
|
||||
|
|
@ -289,29 +288,31 @@ abstract class MessageDispatcher(val app: ActorSystem) extends AbstractMessageDi
|
|||
/**
|
||||
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
|
||||
*/
|
||||
abstract class MessageDispatcherConfigurator(val app: ActorSystem) {
|
||||
abstract class MessageDispatcherConfigurator() {
|
||||
/**
|
||||
* Returns an instance of MessageDispatcher given a Configuration
|
||||
*/
|
||||
def configure(config: Configuration): MessageDispatcher
|
||||
def configure(config: Configuration, settings: Settings, prerequisites: DispatcherPrerequisites): MessageDispatcher
|
||||
|
||||
def mailboxType(config: Configuration): MailboxType = {
|
||||
val capacity = config.getInt("mailbox-capacity", app.AkkaConfig.MailboxCapacity)
|
||||
def mailboxType(config: Configuration, settings: Settings): MailboxType = {
|
||||
val capacity = config.getInt("mailbox-capacity", settings.MailboxCapacity)
|
||||
if (capacity < 1) UnboundedMailbox()
|
||||
else {
|
||||
val duration = Duration(
|
||||
config.getInt("mailbox-push-timeout-time", app.AkkaConfig.MailboxPushTimeout.toMillis.toInt),
|
||||
app.AkkaConfig.DefaultTimeUnit)
|
||||
config.getInt("mailbox-push-timeout-time", settings.MailboxPushTimeout.toMillis.toInt),
|
||||
settings.DefaultTimeUnit)
|
||||
BoundedMailbox(capacity, duration)
|
||||
}
|
||||
}
|
||||
|
||||
def configureThreadPool(config: Configuration, createDispatcher: ⇒ (ThreadPoolConfig) ⇒ MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||
def configureThreadPool(config: Configuration,
|
||||
settings: Settings,
|
||||
createDispatcher: ⇒ (ThreadPoolConfig) ⇒ MessageDispatcher): ThreadPoolConfigDispatcherBuilder = {
|
||||
import ThreadPoolConfigDispatcherBuilder.conf_?
|
||||
|
||||
//Apply the following options to the config if they are present in the config
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(app)).configure(
|
||||
conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, app.AkkaConfig.DefaultTimeUnit))),
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
|
||||
conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, settings.DefaultTimeUnit))),
|
||||
conf_?(config getDouble "core-pool-size-factor")(factor ⇒ _.setCorePoolSizeFromFactor(factor)),
|
||||
conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)),
|
||||
conf_?(config getBool "allow-core-timeout")(allow ⇒ _.setAllowCoreThreadTimeout(allow)),
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue