remove app argument from Dispatchers

(cascading into all dispatchers, mailboxes, other stuff; had to move
deadLetter stuff to ActorSystem again and split its initialization due
to cyclic dependency)
This commit is contained in:
Roland 2011-11-14 16:03:26 +01:00
parent 79daccdce1
commit f2bf27b4de
20 changed files with 180 additions and 130 deletions

View file

@ -150,7 +150,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd
expectNoMsg(Duration.Zero)
assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)")
val actor = new TestActorRef[TestLogActor](app, Props[TestLogActor], supervisor, "none")
val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none")
val set = receiveWhile(messages = 2) {
case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" 1

View file

@ -421,10 +421,10 @@ class DispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(app, "foo", app.AkkaConfig.DispatcherThroughput,
new Dispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "foo", app.AkkaConfig.DispatcherThroughput,
app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType,
config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor]
ThreadPoolConfig(app.eventStream)).build.asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "Dispatcher"
@ -458,10 +458,10 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(app, "foo", 1, // TODO check why 1 here? (came from old test)
new BalancingDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "foo", 1, // TODO check why 1 here? (came from old test)
app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType,
config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor]
ThreadPoolConfig(app.eventStream)).build.asInstanceOf[MessageDispatcherInterceptor]
def dispatcherType = "Balancing Dispatcher"

View file

@ -11,7 +11,7 @@ import org.junit.{ After, Test }
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher = new CallingThreadDispatcher(app, "test") with MessageDispatcherInterceptor
def newInterceptedDispatcher = new CallingThreadDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "test") with MessageDispatcherInterceptor
def dispatcherType = "Calling Thread Dispatcher"
}

View file

@ -385,8 +385,13 @@ object DeadLetterActorRef {
val serialized = new SerializedDeadLetterActorRef
}
class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val dispatcher: MessageDispatcher) extends MinimalActorRef {
val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher)
class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath) extends MinimalActorRef {
@volatile
var brokenPromise: Future[Any] = _
private[akka] def init(dispatcher: MessageDispatcher) {
brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher)
}
override val name: String = "dead-letter"
@ -401,6 +406,8 @@ class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = {
eventStream.publish(DeadLetter(message, this, this))
// leave this in: guard with good visibility against really stupid/weird errors
assert(brokenPromise != null)
brokenPromise
}

View file

@ -31,10 +31,6 @@ trait ActorRefProvider {
def deathWatch: DeathWatch
def deadLetters: ActorRef
def deadLetterMailbox: Mailbox
/**
* What deployer will be used to resolve deployment configuration?
*/
@ -144,19 +140,6 @@ class LocalActorRefProvider(
// currently still used for tmp actors (e.g. ask actor refs)
private val actors = new ConcurrentHashMap[String, AnyRef]
val deadLetters = new DeadLetterActorRef(eventStream, root / "nul", dispatcher)
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
override def dispatcher = null //MessageDispatcher.this
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
}
/**
* Top-level anchor for the supervision hierarchy of this actor system. Will
* receive only Supervise/ChildTerminated system messages or Failure message.

View file

@ -158,20 +158,34 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
// this provides basic logging (to stdout) until .start() is called below
val eventStream = new EventStream(DebugEventStream)
eventStream.startStdoutLogger(AkkaConfig)
val log = new BusLogging(eventStream, this)
// TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(this)
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512))
val log = new BusLogging(eventStream, this) // this used only for .getClass in tagging messages
/**
* The root actor path for this application.
*/
val root: ActorPath = new RootActorPath(this)
val deadLetters = new DeadLetterActorRef(eventStream, root / "nul")
val deadLetterMailbox = new Mailbox(null) {
becomeClosed()
override def dispatcher = null //MessageDispatcher.this
override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) }
override def dequeue() = null
override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) }
override def systemDrain(): SystemMessage = null
override def hasMessages = false
override def hasSystemMessages = false
override def numberOfMessages = 0
}
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512))
// TODO correctly pull its config from the config
val dispatcherFactory = new Dispatchers(AkkaConfig, eventStream, deadLetterMailbox, scheduler)
implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher
deadLetters.init(dispatcher)
// TODO think about memory consistency effects when doing funky stuff inside constructor
val provider: ActorRefProvider = {
val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match {
@ -191,8 +205,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF
def guardian: ActorRef = provider.guardian
def systemGuardian: ActorRef = provider.systemGuardian
def deathWatch: DeathWatch = provider.deathWatch
def deadLetters: ActorRef = provider.deadLetters
def deadLetterMailbox: Mailbox = provider.deadLetterMailbox
terminationFuture.onComplete(_ scheduler.stop())
terminationFuture.onComplete(_ dispatcher.shutdown())

View file

@ -13,6 +13,8 @@ import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy,
import akka.actor._
import akka.actor.ActorSystem
import scala.annotation.tailrec
import akka.event.EventStream
import akka.actor.ActorSystem.AkkaConfig
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -61,12 +63,12 @@ case class Supervise(child: ActorRef) extends SystemMessage // sent to superviso
case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsMonitoring
case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsMonitoring
final case class TaskInvocation(app: ActorSystem, function: () Unit, cleanup: () Unit) extends Runnable {
final case class TaskInvocation(eventStream: EventStream, function: () Unit, cleanup: () Unit) extends Runnable {
def run() {
try {
function()
} catch {
case e app.eventStream.publish(Error(e, this, e.getMessage))
case e eventStream.publish(Error(e, this, e.getMessage))
} finally {
cleanup()
}
@ -84,7 +86,11 @@ object MessageDispatcher {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
abstract class MessageDispatcher(
val deadLetterMailbox: Mailbox,
val eventStream: EventStream,
val scheduler: Scheduler) extends Serializable {
import MessageDispatcher._
protected val _tasks = new AtomicLong(0L)
@ -99,11 +105,6 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
*/
protected[akka] def createMailbox(actor: ActorCell): Mailbox
/**
* a blackhole mailbox for the purpose of replacing the real one upon actor termination
*/
import app.deadLetterMailbox
/**
* Name of this dispatcher.
*/
@ -133,7 +134,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
@ -154,7 +155,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
_tasks.getAndIncrement()
try {
startIfUnstarted()
executeTask(TaskInvocation(app, block, taskCleanup))
executeTask(TaskInvocation(eventStream, block, taskCleanup))
} catch {
case e
_tasks.decrementAndGet
@ -170,7 +171,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
shutdownSchedule match {
case UNSCHEDULED
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
shutdownSchedule = RESCHEDULED
case RESCHEDULED //Already marked for reschedule
@ -234,7 +235,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
shutdownSchedule match {
case RESCHEDULED
shutdownSchedule = SCHEDULED
app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
case SCHEDULED
if (_tasks.get == 0) {
active switchOff {
@ -329,19 +330,19 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable {
/**
* Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig
*/
abstract class MessageDispatcherConfigurator(val app: ActorSystem) {
abstract class MessageDispatcherConfigurator(val AkkaConfig: AkkaConfig, val eventStream: EventStream) {
/**
* Returns an instance of MessageDispatcher given a Configuration
*/
def configure(config: Configuration): MessageDispatcher
def mailboxType(config: Configuration): MailboxType = {
val capacity = config.getInt("mailbox-capacity", app.AkkaConfig.MailboxCapacity)
val capacity = config.getInt("mailbox-capacity", AkkaConfig.MailboxCapacity)
if (capacity < 1) UnboundedMailbox()
else {
val duration = Duration(
config.getInt("mailbox-push-timeout-time", app.AkkaConfig.MailboxPushTimeout.toMillis.toInt),
app.AkkaConfig.DefaultTimeUnit)
config.getInt("mailbox-push-timeout-time", AkkaConfig.MailboxPushTimeout.toMillis.toInt),
AkkaConfig.DefaultTimeUnit)
BoundedMailbox(capacity, duration)
}
}
@ -350,8 +351,8 @@ abstract class MessageDispatcherConfigurator(val app: ActorSystem) {
import ThreadPoolConfigDispatcherBuilder.conf_?
//Apply the following options to the config if they are present in the config
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(app)).configure(
conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, app.AkkaConfig.DefaultTimeUnit))),
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(eventStream)).configure(
conf_?(config getInt "keep-alive-time")(time _.setKeepAliveTime(Duration(time, AkkaConfig.DefaultTimeUnit))),
conf_?(config getDouble "core-pool-size-factor")(factor _.setCorePoolSizeFromFactor(factor)),
conf_?(config getDouble "max-pool-size-factor")(factor _.setMaxPoolSizeFromFactor(factor)),
conf_?(config getInt "executor-bounds")(bounds _.setExecutorBounds(bounds)),

View file

@ -10,6 +10,8 @@ import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, Concur
import java.util.{ Comparator, Queue }
import annotation.tailrec
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
/**
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
@ -28,16 +30,16 @@ import akka.actor.ActorSystem
* @author Viktor Klang
*/
class BalancingDispatcher(
_app: ActorSystem,
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
_name: String,
throughput: Int,
throughputDeadlineTime: Int,
mailboxType: MailboxType,
config: ThreadPoolConfig,
_timeoutMs: Long)
extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
import app.deadLetterMailbox
extends Dispatcher(_deadLetterMailbox, _eventStream, _scheduler, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)

View file

@ -9,6 +9,8 @@ import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue }
import akka.actor.{ ActorCell, ActorKilledException }
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
/**
* Default settings are:
@ -64,14 +66,16 @@ import akka.actor.ActorSystem
* Larger values (or zero or negative) increase throughput, smaller values increase fairness
*/
class Dispatcher(
_app: ActorSystem,
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
val name: String,
val throughput: Int,
val throughputDeadlineTime: Int,
val mailboxType: MailboxType,
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
val timeoutMs: Long)
extends MessageDispatcher(_app) {
extends MessageDispatcher(_deadLetterMailbox, _eventStream, _scheduler) {
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
@ -93,7 +97,7 @@ class Dispatcher(
executorService.get() execute invocation
} catch {
case e: RejectedExecutionException
app.eventStream.publish(Warning(this, e.toString))
eventStream.publish(Warning(this, e.toString))
throw e
}
}
@ -120,7 +124,7 @@ class Dispatcher(
} catch {
case e: RejectedExecutionException
try {
app.eventStream.publish(Warning(this, e.toString))
eventStream.publish(Warning(this, e.toString))
} finally {
mbox.setAsIdle()
}

View file

@ -10,6 +10,9 @@ import akka.util.{ Duration, ReflectiveAccess }
import akka.config.Configuration
import java.util.concurrent.TimeUnit
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
import akka.actor.ActorSystem.AkkaConfig
/**
* Scala API. Dispatcher factory.
@ -43,15 +46,20 @@ import akka.actor.ActorSystem
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Dispatchers(val app: ActorSystem) {
val ThroughputDeadlineTimeMillis = app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt
class Dispatchers(
val AkkaConfig: ActorSystem.AkkaConfig,
val eventStream: EventStream,
val deadLetterMailbox: Mailbox,
val scheduler: Scheduler) {
val ThroughputDeadlineTimeMillis = AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt
val MailboxType: MailboxType =
if (app.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox()
else BoundedMailbox(app.AkkaConfig.MailboxCapacity, app.AkkaConfig.MailboxPushTimeout)
val DispatcherShutdownMillis = app.AkkaConfig.DispatcherDefaultShutdown.toMillis
if (AkkaConfig.MailboxCapacity < 1) UnboundedMailbox()
else BoundedMailbox(AkkaConfig.MailboxCapacity, AkkaConfig.MailboxPushTimeout)
val DispatcherShutdownMillis = AkkaConfig.DispatcherDefaultShutdown.toMillis
lazy val defaultGlobalDispatcher =
app.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build
AkkaConfig.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
@ -60,8 +68,8 @@ class Dispatchers(val app: ActorSystem) {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
case null new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.address, MailboxType, DispatcherShutdownMillis)
case null new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, "anon", MailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, some.underlying, some.address, MailboxType, DispatcherShutdownMillis)
}
/**
@ -71,8 +79,8 @@ class Dispatchers(val app: ActorSystem) {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
case null new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(app, some.underlying, some.address, mailboxType, DispatcherShutdownMillis)
case null new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, "anon", mailboxType, DispatcherShutdownMillis)
case some new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, some.underlying, some.address, mailboxType, DispatcherShutdownMillis)
}
/**
@ -81,7 +89,7 @@ class Dispatchers(val app: ActorSystem) {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
new PinnedDispatcher(app, null, name, mailboxType, DispatcherShutdownMillis)
new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, name, mailboxType, DispatcherShutdownMillis)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
@ -89,7 +97,7 @@ class Dispatchers(val app: ActorSystem) {
* E.g. each actor consumes its own thread.
*/
def newPinnedDispatcher(name: String) =
new PinnedDispatcher(app, null, name, MailboxType, DispatcherShutdownMillis)
new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, name, MailboxType, DispatcherShutdownMillis)
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -97,8 +105,8 @@ class Dispatchers(val app: ActorSystem) {
* Has a fluent builder interface for configuring its semantics.
*/
def newDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new Dispatcher(app, name, app.AkkaConfig.DispatcherThroughput,
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
ThreadPoolConfigDispatcherBuilder(config new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, AkkaConfig.DispatcherThroughput,
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -107,7 +115,7 @@ class Dispatchers(val app: ActorSystem) {
*/
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
@ -116,7 +124,7 @@ class Dispatchers(val app: ActorSystem) {
*/
def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -124,8 +132,8 @@ class Dispatchers(val app: ActorSystem) {
* Has a fluent builder interface for configuring its semantics.
*/
def newBalancingDispatcher(name: String) =
ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(app, name, app.AkkaConfig.DispatcherThroughput,
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
ThreadPoolConfigDispatcherBuilder(config new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, AkkaConfig.DispatcherThroughput,
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -134,7 +142,7 @@ class Dispatchers(val app: ActorSystem) {
*/
def newBalancingDispatcher(name: String, throughput: Int) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -143,7 +151,7 @@ class Dispatchers(val app: ActorSystem) {
*/
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
@ -152,13 +160,13 @@ class Dispatchers(val app: ActorSystem) {
*/
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
ThreadPoolConfigDispatcherBuilder(config
new BalancingDispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app))
new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream))
/**
* Utility function that tries to load the specified dispatcher config from the akka.conf
* or else use the supplied default dispatcher
*/
def fromConfig(key: String, default: MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher =
app.config getSection key flatMap from getOrElse default
AkkaConfig.config getSection key flatMap from getOrElse default
/*
* Creates of obtains a dispatcher from a ConfigMap according to the format below
@ -185,8 +193,8 @@ class Dispatchers(val app: ActorSystem) {
*/
def from(cfg: Configuration): Option[MessageDispatcher] = {
cfg.getString("type") flatMap {
case "Dispatcher" Some(new DispatcherConfigurator(app))
case "BalancingDispatcher" Some(new BalancingDispatcherConfigurator(app))
case "Dispatcher" Some(new DispatcherConfigurator(AkkaConfig, deadLetterMailbox, eventStream, scheduler))
case "BalancingDispatcher" Some(new BalancingDispatcherConfigurator(AkkaConfig, deadLetterMailbox, eventStream, scheduler))
case "GlobalDispatcher" None //TODO FIXME remove this
case fqn
ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match {
@ -206,26 +214,26 @@ class Dispatchers(val app: ActorSystem) {
}
}
class DispatcherConfigurator(app: ActorSystem) extends MessageDispatcherConfigurator(app) {
class DispatcherConfigurator(AkkaConfig: AkkaConfig, deadLetterMailbox: Mailbox, eventStream: EventStream, scheduler: Scheduler) extends MessageDispatcherConfigurator(AkkaConfig, eventStream) {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig new Dispatcher(app,
configureThreadPool(config, threadPoolConfig new Dispatcher(deadLetterMailbox, eventStream, scheduler,
config.getString("name", newUuid.toString),
config.getInt("throughput", app.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
config.getInt("throughput", AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
mailboxType(config),
threadPoolConfig,
app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
}
}
class BalancingDispatcherConfigurator(app: ActorSystem) extends MessageDispatcherConfigurator(app) {
class BalancingDispatcherConfigurator(AkkaConfig: AkkaConfig, deadLetterMailbox: Mailbox, eventStream: EventStream, scheduler: Scheduler) extends MessageDispatcherConfigurator(AkkaConfig, eventStream) {
def configure(config: Configuration): MessageDispatcher = {
configureThreadPool(config, threadPoolConfig new BalancingDispatcher(app,
configureThreadPool(config, threadPoolConfig new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler,
config.getString("name", newUuid.toString),
config.getInt("throughput", app.AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
config.getInt("throughput", AkkaConfig.DispatcherThroughput),
config.getInt("throughput-deadline-time", AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt),
mailboxType(config),
threadPoolConfig,
app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
AkkaConfig.DispatcherDefaultShutdown.toMillis)).build
}
}

View file

@ -262,7 +262,7 @@ object Future {
result completeWithResult currentValue
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
result completeWithException e
} finally {
results.clear
@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] {
Right(f(res))
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
Left(e)
})
}
@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] {
future.completeWith(f(r))
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
future complete Left(e)
}
}
@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] {
if (p(res)) r else Left(new MatchError(res))
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
Left(e)
})
}
@ -811,7 +811,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
fr completeWithException e
}
}
@ -825,7 +825,7 @@ trait Promise[T] extends Future[T] {
fr completeWith cont(f)
} catch {
case e: Exception
dispatcher.app.eventStream.publish(Error(e, this, e.getMessage))
dispatcher.eventStream.publish(Error(e, this, e.getMessage))
fr completeWithException e
}
}
@ -979,12 +979,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
if (!isExpired) dispatcher.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
else func(DefaultPromise.this)
}
}
}
val timeoutFuture = dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
val timeoutFuture = dispatcher.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
onComplete(_ timeoutFuture.cancel())
false
} else true
@ -1006,18 +1006,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi
val runnable = new Runnable {
def run() {
if (!isCompleted) {
if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
if (!isExpired) dispatcher.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS)
else promise complete (try { Right(fallback) } catch { case e Left(e) })
}
}
}
dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
dispatcher.scheduler.scheduleOnce(runnable, timeLeft(), NANOS)
promise
}
} else this
private def notifyCompleted(func: Future[T] Unit) {
try { func(this) } catch { case e dispatcher.app.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
try { func(this) } catch { case e dispatcher.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really?
}
@inline

View file

@ -7,14 +7,23 @@ package akka.dispatch
import java.util.concurrent.atomic.AtomicReference
import akka.actor.ActorCell
import akka.actor.ActorSystem
import akka.event.EventStream
import akka.actor.Scheduler
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class PinnedDispatcher(_app: ActorSystem, _actor: ActorCell, _name: String, _mailboxType: MailboxType, _timeoutMs: Long)
extends Dispatcher(_app, _name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread(_app), _timeoutMs) {
class PinnedDispatcher(
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
_actor: ActorCell,
_name: String,
_mailboxType: MailboxType,
_timeoutMs: Long)
extends Dispatcher(_deadLetterMailbox, _eventStream, _scheduler, _name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread(_eventStream), _timeoutMs) {
@volatile
protected[akka] var owner: ActorCell = _actor
@ -34,6 +43,6 @@ class PinnedDispatcher(_app: ActorSystem, _actor: ActorCell, _name: String, _mai
}
object PinnedDispatcher {
def oneThread(app: ActorSystem): ThreadPoolConfig = ThreadPoolConfig(app, allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
def oneThread(eventStream: EventStream): ThreadPoolConfig = ThreadPoolConfig(eventStream, allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1)
}

View file

@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy
import akka.util.Duration
import akka.event.Logging.{ Warning, Error }
import akka.actor.ActorSystem
import akka.event.EventStream
object ThreadPoolConfig {
type Bounds = Int
@ -68,7 +69,7 @@ trait ExecutorServiceFactoryProvider {
/**
* A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher
*/
case class ThreadPoolConfig(app: ActorSystem,
case class ThreadPoolConfig(eventStream: EventStream,
allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout,
corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize,
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
@ -86,7 +87,7 @@ case class ThreadPoolConfig(app: ActorSystem,
case Right(bounds)
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory)
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
new BoundedExecutorDecorator(app, service, bounds)
new BoundedExecutorDecorator(eventStream, service, bounds)
}
}
}
@ -210,7 +211,7 @@ class MonitorableThread(runnable: Runnable, name: String)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
class BoundedExecutorDecorator(val eventStream: EventStream, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate {
protected val semaphore = new Semaphore(bound)
override def execute(command: Runnable) = {
@ -227,10 +228,10 @@ class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorServi
})
} catch {
case e: RejectedExecutionException
app.eventStream.publish(Warning(this, e.toString))
eventStream.publish(Warning(this, e.toString))
semaphore.release
case e: Throwable
app.eventStream.publish(Error(e, this, e.getMessage))
eventStream.publish(Error(e, this, e.getMessage))
throw e
}
}

View file

@ -41,8 +41,6 @@ class RemoteActorRefProvider(
import akka.dispatch.Promise
val local = new LocalActorRefProvider(app, root, eventStream, dispatcher, scheduler)
def deadLetterMailbox = local.deadLetterMailbox
def deadLetters = local.deadLetters
def deathWatch = local.deathWatch
def guardian = local.guardian
def systemGuardian = local.systemGuardian

View file

@ -151,7 +151,7 @@ class Agent[T](initialValue: T, app: ActorSystem) {
def sendOff(f: T T): Unit = {
send((value: T) {
suspend()
val pinnedDispatcher = new PinnedDispatcher(app, null, "agent-send-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis)
val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-send-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis)
val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
threadBased ! Update(f)
value
@ -169,7 +169,7 @@ class Agent[T](initialValue: T, app: ActorSystem) {
val result = new DefaultPromise[T](timeout)(app.dispatcher)
send((value: T) {
suspend()
val pinnedDispatcher = new PinnedDispatcher(app, null, "agent-alter-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis)
val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-alter-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis)
val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
value

View file

@ -12,6 +12,8 @@ import java.lang.ref.WeakReference
import scala.annotation.tailrec
import akka.actor.{ ActorCell, ActorRef, ActorSystem }
import akka.dispatch._
import akka.actor.Scheduler
import akka.event.EventStream
/*
* Locking rules:
@ -103,7 +105,11 @@ private[testkit] object CallingThreadDispatcher {
* @author Roland Kuhn
* @since 1.1
*/
class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thread") extends MessageDispatcher(_app) {
class CallingThreadDispatcher(
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
val name: String = "calling-thread") extends MessageDispatcher(_deadLetterMailbox, _eventStream, _scheduler) {
import CallingThreadDispatcher._
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
@ -213,12 +219,12 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr
true
} catch {
case ie: InterruptedException
app.eventStream.publish(Error(this, ie))
eventStream.publish(Error(this, ie))
Thread.currentThread().interrupt()
intex = ie
true
case e
app.eventStream.publish(Error(this, e))
eventStream.publish(Error(this, e))
queue.leave
false
}

View file

@ -10,6 +10,8 @@ import com.eaio.uuid.UUID
import akka.actor.Props._
import akka.actor.ActorSystem
import java.util.concurrent.atomic.AtomicLong
import akka.dispatch.Mailbox
import akka.event.EventStream
/**
* This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it
@ -19,8 +21,15 @@ import java.util.concurrent.atomic.AtomicLong
* @author Roland Kuhn
* @since 1.1
*/
class TestActorRef[T <: Actor](_app: ActorSystem, _props: Props, _supervisor: ActorRef, name: String)
extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_app)), _supervisor, _supervisor.path / name, false) {
class TestActorRef[T <: Actor](
_app: ActorSystem,
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
_props: Props,
_supervisor: ActorRef,
name: String)
extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_deadLetterMailbox, _eventStream, _scheduler)), _supervisor, _supervisor.path / name, false) {
/**
* Directly inject messages into actor receive behavior. Any exceptions
* thrown will be available to you, while still being able to use
@ -57,7 +66,7 @@ object TestActorRef {
def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, app.guardian, name)
def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit app: ActorSystem): TestActorRef[T] = {
new TestActorRef(app, props, supervisor, name)
new TestActorRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, props, supervisor, name)
}
def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](randomName)

View file

@ -8,6 +8,8 @@ import akka.actor._
import akka.util._
import com.eaio.uuid.UUID
import akka.actor.ActorSystem
import akka.dispatch.Mailbox
import akka.event.EventStream
/**
* This is a specialised form of the TestActorRef with support for querying and
@ -34,8 +36,15 @@ import akka.actor.ActorSystem
* @author Roland Kuhn
* @since 1.2
*/
class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D])
extends TestActorRef(app, props, supervisor, name) {
class TestFSMRef[S, D, T <: Actor](
app: ActorSystem,
_deadLetterMailbox: Mailbox,
_eventStream: EventStream,
_scheduler: Scheduler,
props: Props,
supervisor: ActorRef,
name: String)(implicit ev: T <:< FSM[S, D])
extends TestActorRef(app, _deadLetterMailbox, _eventStream, _scheduler, props, supervisor, name) {
private def fsm: T = underlyingActor
@ -81,8 +90,8 @@ class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: A
object TestFSMRef {
def apply[S, D, T <: Actor](factory: T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
new TestFSMRef(app, Props(creator = () factory), app.guardian, TestActorRef.randomName)
new TestFSMRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(creator = () factory), app.guardian, TestActorRef.randomName)
def apply[S, D, T <: Actor](factory: T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] =
new TestFSMRef(app, Props(creator = () factory), app.guardian, name)
new TestFSMRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(creator = () factory), app.guardian, name)
}

View file

@ -91,7 +91,8 @@ class TestKit(_app: ActorSystem) {
* ActorRef of the test actor. Access is provided to enable e.g.
* registration as message target.
*/
val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)),
val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue))
.copy(dispatcher = new CallingThreadDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler)),
"testActor" + TestKit.testActorId.incrementAndGet)
private var end: Duration = Duration.Undefined

View file

@ -171,7 +171,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach {
val boss = TestActorRef(Props(new TActor {
val ref = new TestActorRef(app, Props(new TActor {
val ref = new TestActorRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(new TActor {
def receiveT = { case _ }
override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 }
override def postRestart(reason: Throwable) { counter -= 1 }