diff --git a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala index 10e9235c89..b1f23e60e9 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/LoggingReceiveSpec.scala @@ -150,7 +150,7 @@ class LoggingReceiveSpec extends WordSpec with BeforeAndAfterEach with BeforeAnd expectNoMsg(Duration.Zero) assert(supervisorSet == Set(1, 2), supervisorSet + " was not Set(1, 2)") - val actor = new TestActorRef[TestLogActor](app, Props[TestLogActor], supervisor, "none") + val actor = TestActorRef[TestLogActor](Props[TestLogActor], supervisor, "none") val set = receiveWhile(messages = 2) { case Logging.Debug(`supervisor`, msg: String) if msg startsWith "now supervising" ⇒ 1 diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 8d1230a3df..6f50112362 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -421,10 +421,10 @@ class DispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(app, "foo", app.AkkaConfig.DispatcherThroughput, + new Dispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "foo", app.AkkaConfig.DispatcherThroughput, app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, - ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] + ThreadPoolConfig(app.eventStream)).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Dispatcher" @@ -458,10 +458,10 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(app, "foo", 1, // TODO check why 1 here? (came from old test) + new BalancingDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "foo", 1, // TODO check why 1 here? (came from old test) app.dispatcherFactory.ThroughputDeadlineTimeMillis, app.dispatcherFactory.MailboxType, config, app.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, - ThreadPoolConfig(app)).build.asInstanceOf[MessageDispatcherInterceptor] + ThreadPoolConfig(app.eventStream)).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Balancing Dispatcher" diff --git a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index bd3816417e..e82d6d9dcb 100644 --- a/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -11,7 +11,7 @@ import org.junit.{ After, Test } class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ - def newInterceptedDispatcher = new CallingThreadDispatcher(app, "test") with MessageDispatcherInterceptor + def newInterceptedDispatcher = new CallingThreadDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, "test") with MessageDispatcherInterceptor def dispatcherType = "Calling Thread Dispatcher" } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index e6d470111e..20feee1e02 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -385,8 +385,13 @@ object DeadLetterActorRef { val serialized = new SerializedDeadLetterActorRef } -class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val dispatcher: MessageDispatcher) extends MinimalActorRef { - val brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher) +class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath) extends MinimalActorRef { + @volatile + var brokenPromise: Future[Any] = _ + + private[akka] def init(dispatcher: MessageDispatcher) { + brokenPromise = new KeptPromise[Any](Left(new ActorKilledException("In DeadLetterActorRef, promises are always broken.")))(dispatcher) + } override val name: String = "dead-letter" @@ -401,6 +406,8 @@ class DeadLetterActorRef(val eventStream: EventStream, val path: ActorPath, val override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = { eventStream.publish(DeadLetter(message, this, this)) + // leave this in: guard with good visibility against really stupid/weird errors + assert(brokenPromise != null) brokenPromise } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ae26fda28d..95db57ce78 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -31,10 +31,6 @@ trait ActorRefProvider { def deathWatch: DeathWatch - def deadLetters: ActorRef - - def deadLetterMailbox: Mailbox - /** * What deployer will be used to resolve deployment configuration? */ @@ -144,19 +140,6 @@ class LocalActorRefProvider( // currently still used for tmp actors (e.g. ask actor refs) private val actors = new ConcurrentHashMap[String, AnyRef] - val deadLetters = new DeadLetterActorRef(eventStream, root / "nul", dispatcher) - val deadLetterMailbox = new Mailbox(null) { - becomeClosed() - override def dispatcher = null //MessageDispatcher.this - override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } - override def dequeue() = null - override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } - override def systemDrain(): SystemMessage = null - override def hasMessages = false - override def hasSystemMessages = false - override def numberOfMessages = 0 - } - /** * Top-level anchor for the supervision hierarchy of this actor system. Will * receive only Supervise/ChildTerminated system messages or Failure message. diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index f0fd0f5854..43dfa99837 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -158,20 +158,34 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF // this provides basic logging (to stdout) until .start() is called below val eventStream = new EventStream(DebugEventStream) eventStream.startStdoutLogger(AkkaConfig) - val log = new BusLogging(eventStream, this) - - // TODO correctly pull its config from the config - val dispatcherFactory = new Dispatchers(this) - - implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher - - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) + val log = new BusLogging(eventStream, this) // “this” used only for .getClass in tagging messages /** * The root actor path for this application. */ val root: ActorPath = new RootActorPath(this) + val deadLetters = new DeadLetterActorRef(eventStream, root / "nul") + val deadLetterMailbox = new Mailbox(null) { + becomeClosed() + override def dispatcher = null //MessageDispatcher.this + override def enqueue(receiver: ActorRef, envelope: Envelope) { deadLetters ! DeadLetter(envelope.message, envelope.sender, receiver) } + override def dequeue() = null + override def systemEnqueue(receiver: ActorRef, handle: SystemMessage) { deadLetters ! DeadLetter(handle, receiver, receiver) } + override def systemDrain(): SystemMessage = null + override def hasMessages = false + override def hasSystemMessages = false + override def numberOfMessages = 0 + } + + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) + + // TODO correctly pull its config from the config + val dispatcherFactory = new Dispatchers(AkkaConfig, eventStream, deadLetterMailbox, scheduler) + implicit val dispatcher = dispatcherFactory.defaultGlobalDispatcher + + deadLetters.init(dispatcher) + // TODO think about memory consistency effects when doing funky stuff inside constructor val provider: ActorRefProvider = { val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { @@ -191,8 +205,6 @@ class ActorSystem(val name: String, val config: Configuration) extends ActorRefF def guardian: ActorRef = provider.guardian def systemGuardian: ActorRef = provider.systemGuardian def deathWatch: DeathWatch = provider.deathWatch - def deadLetters: ActorRef = provider.deadLetters - def deadLetterMailbox: Mailbox = provider.deadLetterMailbox terminationFuture.onComplete(_ ⇒ scheduler.stop()) terminationFuture.onComplete(_ ⇒ dispatcher.shutdown()) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index d7945f3409..4df4eeac39 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -13,6 +13,8 @@ import java.util.concurrent.ThreadPoolExecutor.{ AbortPolicy, CallerRunsPolicy, import akka.actor._ import akka.actor.ActorSystem import scala.annotation.tailrec +import akka.event.EventStream +import akka.actor.ActorSystem.AkkaConfig /** * @author Jonas Bonér @@ -61,12 +63,12 @@ case class Supervise(child: ActorRef) extends SystemMessage // sent to superviso case class Link(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.startsMonitoring case class Unlink(subject: ActorRef) extends SystemMessage // sent to self from ActorCell.stopsMonitoring -final case class TaskInvocation(app: ActorSystem, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { +final case class TaskInvocation(eventStream: EventStream, function: () ⇒ Unit, cleanup: () ⇒ Unit) extends Runnable { def run() { try { function() } catch { - case e ⇒ app.eventStream.publish(Error(e, this, e.getMessage)) + case e ⇒ eventStream.publish(Error(e, this, e.getMessage)) } finally { cleanup() } @@ -84,7 +86,11 @@ object MessageDispatcher { /** * @author Jonas Bonér */ -abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { +abstract class MessageDispatcher( + val deadLetterMailbox: Mailbox, + val eventStream: EventStream, + val scheduler: Scheduler) extends Serializable { + import MessageDispatcher._ protected val _tasks = new AtomicLong(0L) @@ -99,11 +105,6 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { */ protected[akka] def createMailbox(actor: ActorCell): Mailbox - /** - * a blackhole mailbox for the purpose of replacing the real one upon actor termination - */ - import app.deadLetterMailbox - /** * Name of this dispatcher. */ @@ -133,7 +134,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED - app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ shutdownSchedule = RESCHEDULED case RESCHEDULED ⇒ //Already marked for reschedule @@ -154,7 +155,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { _tasks.getAndIncrement() try { startIfUnstarted() - executeTask(TaskInvocation(app, block, taskCleanup)) + executeTask(TaskInvocation(eventStream, block, taskCleanup)) } catch { case e ⇒ _tasks.decrementAndGet @@ -170,7 +171,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { shutdownSchedule match { case UNSCHEDULED ⇒ shutdownSchedule = SCHEDULED - app.scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ shutdownSchedule = RESCHEDULED case RESCHEDULED ⇒ //Already marked for reschedule @@ -234,7 +235,7 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { shutdownSchedule match { case RESCHEDULED ⇒ shutdownSchedule = SCHEDULED - app.scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) case SCHEDULED ⇒ if (_tasks.get == 0) { active switchOff { @@ -329,19 +330,19 @@ abstract class MessageDispatcher(val app: ActorSystem) extends Serializable { /** * Trait to be used for hooking in new dispatchers into Dispatchers.fromConfig */ -abstract class MessageDispatcherConfigurator(val app: ActorSystem) { +abstract class MessageDispatcherConfigurator(val AkkaConfig: AkkaConfig, val eventStream: EventStream) { /** * Returns an instance of MessageDispatcher given a Configuration */ def configure(config: Configuration): MessageDispatcher def mailboxType(config: Configuration): MailboxType = { - val capacity = config.getInt("mailbox-capacity", app.AkkaConfig.MailboxCapacity) + val capacity = config.getInt("mailbox-capacity", AkkaConfig.MailboxCapacity) if (capacity < 1) UnboundedMailbox() else { val duration = Duration( - config.getInt("mailbox-push-timeout-time", app.AkkaConfig.MailboxPushTimeout.toMillis.toInt), - app.AkkaConfig.DefaultTimeUnit) + config.getInt("mailbox-push-timeout-time", AkkaConfig.MailboxPushTimeout.toMillis.toInt), + AkkaConfig.DefaultTimeUnit) BoundedMailbox(capacity, duration) } } @@ -350,8 +351,8 @@ abstract class MessageDispatcherConfigurator(val app: ActorSystem) { import ThreadPoolConfigDispatcherBuilder.conf_? //Apply the following options to the config if they are present in the config - ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(app)).configure( - conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, app.AkkaConfig.DefaultTimeUnit))), + ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(eventStream)).configure( + conf_?(config getInt "keep-alive-time")(time ⇒ _.setKeepAliveTime(Duration(time, AkkaConfig.DefaultTimeUnit))), conf_?(config getDouble "core-pool-size-factor")(factor ⇒ _.setCorePoolSizeFromFactor(factor)), conf_?(config getDouble "max-pool-size-factor")(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), conf_?(config getInt "executor-bounds")(bounds ⇒ _.setExecutorBounds(bounds)), diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index b64b7ee513..7f614be5a8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -10,6 +10,8 @@ import java.util.concurrent.{ LinkedBlockingQueue, ConcurrentLinkedQueue, Concur import java.util.{ Comparator, Queue } import annotation.tailrec import akka.actor.ActorSystem +import akka.event.EventStream +import akka.actor.Scheduler /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -28,16 +30,16 @@ import akka.actor.ActorSystem * @author Viktor Klang */ class BalancingDispatcher( - _app: ActorSystem, + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, _name: String, throughput: Int, throughputDeadlineTime: Int, mailboxType: MailboxType, config: ThreadPoolConfig, _timeoutMs: Long) - extends Dispatcher(_app, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { - - import app.deadLetterMailbox + extends Dispatcher(_deadLetterMailbox, _eventStream, _scheduler, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { private val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index bf12a5cafc..8fa84eafb6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -9,6 +9,8 @@ import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ TimeUnit, ExecutorService, RejectedExecutionException, ConcurrentLinkedQueue } import akka.actor.{ ActorCell, ActorKilledException } import akka.actor.ActorSystem +import akka.event.EventStream +import akka.actor.Scheduler /** * Default settings are: @@ -64,14 +66,16 @@ import akka.actor.ActorSystem * Larger values (or zero or negative) increase throughput, smaller values increase fairness */ class Dispatcher( - _app: ActorSystem, + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, val name: String, val throughput: Int, val throughputDeadlineTime: Int, val mailboxType: MailboxType, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, val timeoutMs: Long) - extends MessageDispatcher(_app) { + extends MessageDispatcher(_deadLetterMailbox, _eventStream, _scheduler) { protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) @@ -93,7 +97,7 @@ class Dispatcher( executorService.get() execute invocation } catch { case e: RejectedExecutionException ⇒ - app.eventStream.publish(Warning(this, e.toString)) + eventStream.publish(Warning(this, e.toString)) throw e } } @@ -120,7 +124,7 @@ class Dispatcher( } catch { case e: RejectedExecutionException ⇒ try { - app.eventStream.publish(Warning(this, e.toString)) + eventStream.publish(Warning(this, e.toString)) } finally { mbox.setAsIdle() } diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 2797652f77..654c3b338e 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -10,6 +10,9 @@ import akka.util.{ Duration, ReflectiveAccess } import akka.config.Configuration import java.util.concurrent.TimeUnit import akka.actor.ActorSystem +import akka.event.EventStream +import akka.actor.Scheduler +import akka.actor.ActorSystem.AkkaConfig /** * Scala API. Dispatcher factory. @@ -43,15 +46,20 @@ import akka.actor.ActorSystem * * @author Jonas Bonér */ -class Dispatchers(val app: ActorSystem) { - val ThroughputDeadlineTimeMillis = app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt +class Dispatchers( + val AkkaConfig: ActorSystem.AkkaConfig, + val eventStream: EventStream, + val deadLetterMailbox: Mailbox, + val scheduler: Scheduler) { + + val ThroughputDeadlineTimeMillis = AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt val MailboxType: MailboxType = - if (app.AkkaConfig.MailboxCapacity < 1) UnboundedMailbox() - else BoundedMailbox(app.AkkaConfig.MailboxCapacity, app.AkkaConfig.MailboxPushTimeout) - val DispatcherShutdownMillis = app.AkkaConfig.DispatcherDefaultShutdown.toMillis + if (AkkaConfig.MailboxCapacity < 1) UnboundedMailbox() + else BoundedMailbox(AkkaConfig.MailboxCapacity, AkkaConfig.MailboxPushTimeout) + val DispatcherShutdownMillis = AkkaConfig.DispatcherDefaultShutdown.toMillis lazy val defaultGlobalDispatcher = - app.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build + AkkaConfig.config.getSection("akka.actor.default-dispatcher").flatMap(from) getOrElse newDispatcher("AkkaDefaultGlobalDispatcher", 1, MailboxType).build /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -60,8 +68,8 @@ class Dispatchers(val app: ActorSystem) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef) = actor match { - case null ⇒ new PinnedDispatcher(app, null, "anon", MailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(app, some.underlying, some.address, MailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, "anon", MailboxType, DispatcherShutdownMillis) + case some ⇒ new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, some.underlying, some.address, MailboxType, DispatcherShutdownMillis) } /** @@ -71,8 +79,8 @@ class Dispatchers(val app: ActorSystem) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { - case null ⇒ new PinnedDispatcher(app, null, "anon", mailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(app, some.underlying, some.address, mailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, "anon", mailboxType, DispatcherShutdownMillis) + case some ⇒ new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, some.underlying, some.address, mailboxType, DispatcherShutdownMillis) } /** @@ -81,7 +89,7 @@ class Dispatchers(val app: ActorSystem) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String, mailboxType: MailboxType) = - new PinnedDispatcher(app, null, name, mailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, name, mailboxType, DispatcherShutdownMillis) /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -89,7 +97,7 @@ class Dispatchers(val app: ActorSystem) { * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String) = - new PinnedDispatcher(app, null, name, MailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(deadLetterMailbox, eventStream, scheduler, null, name, MailboxType, DispatcherShutdownMillis) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -97,8 +105,8 @@ class Dispatchers(val app: ActorSystem) { * Has a fluent builder interface for configuring its semantics. */ def newDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(app, name, app.AkkaConfig.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, AkkaConfig.DispatcherThroughput, + ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -107,7 +115,7 @@ class Dispatchers(val app: ActorSystem) { */ def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -116,7 +124,7 @@ class Dispatchers(val app: ActorSystem) { */ def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + new Dispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -124,8 +132,8 @@ class Dispatchers(val app: ActorSystem) { * Has a fluent builder interface for configuring its semantics. */ def newBalancingDispatcher(name: String) = - ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(app, name, app.AkkaConfig.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, AkkaConfig.DispatcherThroughput, + ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -134,7 +142,7 @@ class Dispatchers(val app: ActorSystem) { */ def newBalancingDispatcher(name: String, throughput: Int) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -143,7 +151,7 @@ class Dispatchers(val app: ActorSystem) { */ def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(app, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -152,13 +160,13 @@ class Dispatchers(val app: ActorSystem) { */ def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(app, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(app)) + new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig(eventStream)) /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher */ def fromConfig(key: String, default: ⇒ MessageDispatcher = defaultGlobalDispatcher): MessageDispatcher = - app.config getSection key flatMap from getOrElse default + AkkaConfig.config getSection key flatMap from getOrElse default /* * Creates of obtains a dispatcher from a ConfigMap according to the format below @@ -185,8 +193,8 @@ class Dispatchers(val app: ActorSystem) { */ def from(cfg: Configuration): Option[MessageDispatcher] = { cfg.getString("type") flatMap { - case "Dispatcher" ⇒ Some(new DispatcherConfigurator(app)) - case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator(app)) + case "Dispatcher" ⇒ Some(new DispatcherConfigurator(AkkaConfig, deadLetterMailbox, eventStream, scheduler)) + case "BalancingDispatcher" ⇒ Some(new BalancingDispatcherConfigurator(AkkaConfig, deadLetterMailbox, eventStream, scheduler)) case "GlobalDispatcher" ⇒ None //TODO FIXME remove this case fqn ⇒ ReflectiveAccess.getClassFor[MessageDispatcherConfigurator](fqn) match { @@ -206,26 +214,26 @@ class Dispatchers(val app: ActorSystem) { } } -class DispatcherConfigurator(app: ActorSystem) extends MessageDispatcherConfigurator(app) { +class DispatcherConfigurator(AkkaConfig: AkkaConfig, deadLetterMailbox: Mailbox, eventStream: EventStream, scheduler: Scheduler) extends MessageDispatcherConfigurator(AkkaConfig, eventStream) { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(app, + configureThreadPool(config, threadPoolConfig ⇒ new Dispatcher(deadLetterMailbox, eventStream, scheduler, config.getString("name", newUuid.toString), - config.getInt("throughput", app.AkkaConfig.DispatcherThroughput), - config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), + config.getInt("throughput", AkkaConfig.DispatcherThroughput), + config.getInt("throughput-deadline-time", AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), mailboxType(config), threadPoolConfig, - app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build + AkkaConfig.DispatcherDefaultShutdown.toMillis)).build } } -class BalancingDispatcherConfigurator(app: ActorSystem) extends MessageDispatcherConfigurator(app) { +class BalancingDispatcherConfigurator(AkkaConfig: AkkaConfig, deadLetterMailbox: Mailbox, eventStream: EventStream, scheduler: Scheduler) extends MessageDispatcherConfigurator(AkkaConfig, eventStream) { def configure(config: Configuration): MessageDispatcher = { - configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher(app, + configureThreadPool(config, threadPoolConfig ⇒ new BalancingDispatcher(deadLetterMailbox, eventStream, scheduler, config.getString("name", newUuid.toString), - config.getInt("throughput", app.AkkaConfig.DispatcherThroughput), - config.getInt("throughput-deadline-time", app.AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), + config.getInt("throughput", AkkaConfig.DispatcherThroughput), + config.getInt("throughput-deadline-time", AkkaConfig.DispatcherThroughputDeadlineTime.toMillis.toInt), mailboxType(config), threadPoolConfig, - app.AkkaConfig.DispatcherDefaultShutdown.toMillis)).build + AkkaConfig.DispatcherDefaultShutdown.toMillis)).build } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index 990f3832f1..d62543af4c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -262,7 +262,7 @@ object Future { result completeWithResult currentValue } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) result completeWithException e } finally { results.clear @@ -631,7 +631,7 @@ sealed trait Future[+T] extends japi.Future[T] { Right(f(res)) } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -683,7 +683,7 @@ sealed trait Future[+T] extends japi.Future[T] { future.completeWith(f(r)) } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) future complete Left(e) } } @@ -716,7 +716,7 @@ sealed trait Future[+T] extends japi.Future[T] { if (p(res)) r else Left(new MatchError(res)) } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) Left(e) }) } @@ -811,7 +811,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -825,7 +825,7 @@ trait Promise[T] extends Future[T] { fr completeWith cont(f) } catch { case e: Exception ⇒ - dispatcher.app.eventStream.publish(Error(e, this, e.getMessage)) + dispatcher.eventStream.publish(Error(e, this, e.getMessage)) fr completeWithException e } } @@ -979,12 +979,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + val timeoutFuture = dispatcher.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -1006,18 +1006,18 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.app.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) } } } - dispatcher.app.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) promise } } else this private def notifyCompleted(func: Future[T] ⇒ Unit) { - try { func(this) } catch { case e ⇒ dispatcher.app.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? + try { func(this) } catch { case e ⇒ dispatcher.eventStream.publish(Error(e, this, "Future onComplete-callback raised an exception")) } //TODO catch, everything? Really? } @inline diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index f40fe953d0..2bf36dc375 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -7,14 +7,23 @@ package akka.dispatch import java.util.concurrent.atomic.AtomicReference import akka.actor.ActorCell import akka.actor.ActorSystem +import akka.event.EventStream +import akka.actor.Scheduler /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class PinnedDispatcher(_app: ActorSystem, _actor: ActorCell, _name: String, _mailboxType: MailboxType, _timeoutMs: Long) - extends Dispatcher(_app, _name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread(_app), _timeoutMs) { +class PinnedDispatcher( + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, + _actor: ActorCell, + _name: String, + _mailboxType: MailboxType, + _timeoutMs: Long) + extends Dispatcher(_deadLetterMailbox, _eventStream, _scheduler, _name, Int.MaxValue, -1, _mailboxType, PinnedDispatcher.oneThread(_eventStream), _timeoutMs) { @volatile protected[akka] var owner: ActorCell = _actor @@ -34,6 +43,6 @@ class PinnedDispatcher(_app: ActorSystem, _actor: ActorCell, _name: String, _mai } object PinnedDispatcher { - def oneThread(app: ActorSystem): ThreadPoolConfig = ThreadPoolConfig(app, allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) + def oneThread(eventStream: EventStream): ThreadPoolConfig = ThreadPoolConfig(eventStream, allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1) } diff --git a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala index b1a9547ccf..867f913d75 100644 --- a/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala +++ b/akka-actor/src/main/scala/akka/dispatch/ThreadPoolBuilder.scala @@ -11,6 +11,7 @@ import ThreadPoolExecutor.CallerRunsPolicy import akka.util.Duration import akka.event.Logging.{ Warning, Error } import akka.actor.ActorSystem +import akka.event.EventStream object ThreadPoolConfig { type Bounds = Int @@ -68,7 +69,7 @@ trait ExecutorServiceFactoryProvider { /** * A small configuration DSL to create ThreadPoolExecutors that can be provided as an ExecutorServiceFactoryProvider to Dispatcher */ -case class ThreadPoolConfig(app: ActorSystem, +case class ThreadPoolConfig(eventStream: EventStream, allowCorePoolTimeout: Boolean = ThreadPoolConfig.defaultAllowCoreThreadTimeout, corePoolSize: Int = ThreadPoolConfig.defaultCorePoolSize, maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize, @@ -86,7 +87,7 @@ case class ThreadPoolConfig(app: ActorSystem, case Right(bounds) ⇒ val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory) service.allowCoreThreadTimeOut(allowCorePoolTimeout) - new BoundedExecutorDecorator(app, service, bounds) + new BoundedExecutorDecorator(eventStream, service, bounds) } } } @@ -210,7 +211,7 @@ class MonitorableThread(runnable: Runnable, name: String) /** * @author Jonas Bonér */ -class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { +class BoundedExecutorDecorator(val eventStream: EventStream, val executor: ExecutorService, bound: Int) extends ExecutorServiceDelegate { protected val semaphore = new Semaphore(bound) override def execute(command: Runnable) = { @@ -227,10 +228,10 @@ class BoundedExecutorDecorator(val app: ActorSystem, val executor: ExecutorServi }) } catch { case e: RejectedExecutionException ⇒ - app.eventStream.publish(Warning(this, e.toString)) + eventStream.publish(Warning(this, e.toString)) semaphore.release case e: Throwable ⇒ - app.eventStream.publish(Error(e, this, e.getMessage)) + eventStream.publish(Error(e, this, e.getMessage)) throw e } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5f0223176e..31d7b682ef 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -41,8 +41,6 @@ class RemoteActorRefProvider( import akka.dispatch.Promise val local = new LocalActorRefProvider(app, root, eventStream, dispatcher, scheduler) - def deadLetterMailbox = local.deadLetterMailbox - def deadLetters = local.deadLetters def deathWatch = local.deathWatch def guardian = local.guardian def systemGuardian = local.systemGuardian diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index 29bda7cea6..2cbef7b719 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -151,7 +151,7 @@ class Agent[T](initialValue: T, app: ActorSystem) { def sendOff(f: T ⇒ T): Unit = { send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(app, null, "agent-send-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-send-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis) val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) threadBased ! Update(f) value @@ -169,7 +169,7 @@ class Agent[T](initialValue: T, app: ActorSystem) { val result = new DefaultPromise[T](timeout)(app.dispatcher) send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(app, null, "agent-alter-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler, null, "agent-alter-off", UnboundedMailbox(), app.AkkaConfig.ActorTimeoutMillis) val threadBased = app.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 58ad446728..ac2e9a2bfa 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -12,6 +12,8 @@ import java.lang.ref.WeakReference import scala.annotation.tailrec import akka.actor.{ ActorCell, ActorRef, ActorSystem } import akka.dispatch._ +import akka.actor.Scheduler +import akka.event.EventStream /* * Locking rules: @@ -103,7 +105,11 @@ private[testkit] object CallingThreadDispatcher { * @author Roland Kuhn * @since 1.1 */ -class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thread") extends MessageDispatcher(_app) { +class CallingThreadDispatcher( + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, + val name: String = "calling-thread") extends MessageDispatcher(_deadLetterMailbox, _eventStream, _scheduler) { import CallingThreadDispatcher._ protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor) @@ -213,12 +219,12 @@ class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thr true } catch { case ie: InterruptedException ⇒ - app.eventStream.publish(Error(this, ie)) + eventStream.publish(Error(this, ie)) Thread.currentThread().interrupt() intex = ie true case e ⇒ - app.eventStream.publish(Error(this, e)) + eventStream.publish(Error(this, e)) queue.leave false } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala index 8843330c03..23fdddea2d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestActorRef.scala @@ -10,6 +10,8 @@ import com.eaio.uuid.UUID import akka.actor.Props._ import akka.actor.ActorSystem import java.util.concurrent.atomic.AtomicLong +import akka.dispatch.Mailbox +import akka.event.EventStream /** * This special ActorRef is exclusively for use during unit testing in a single-threaded environment. Therefore, it @@ -19,8 +21,15 @@ import java.util.concurrent.atomic.AtomicLong * @author Roland Kuhn * @since 1.1 */ -class TestActorRef[T <: Actor](_app: ActorSystem, _props: Props, _supervisor: ActorRef, name: String) - extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_app)), _supervisor, _supervisor.path / name, false) { +class TestActorRef[T <: Actor]( + _app: ActorSystem, + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, + _props: Props, + _supervisor: ActorRef, + name: String) + extends LocalActorRef(_app, _props.withDispatcher(new CallingThreadDispatcher(_deadLetterMailbox, _eventStream, _scheduler)), _supervisor, _supervisor.path / name, false) { /** * Directly inject messages into actor receive behavior. Any exceptions * thrown will be available to you, while still being able to use @@ -57,7 +66,7 @@ object TestActorRef { def apply[T <: Actor](props: Props, name: String)(implicit app: ActorSystem): TestActorRef[T] = apply[T](props, app.guardian, name) def apply[T <: Actor](props: Props, supervisor: ActorRef, name: String)(implicit app: ActorSystem): TestActorRef[T] = { - new TestActorRef(app, props, supervisor, name) + new TestActorRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, props, supervisor, name) } def apply[T <: Actor](implicit m: Manifest[T], app: ActorSystem): TestActorRef[T] = apply[T](randomName) diff --git a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala index 87b6aa6765..3c0564331a 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestFSMRef.scala @@ -8,6 +8,8 @@ import akka.actor._ import akka.util._ import com.eaio.uuid.UUID import akka.actor.ActorSystem +import akka.dispatch.Mailbox +import akka.event.EventStream /** * This is a specialised form of the TestActorRef with support for querying and @@ -34,8 +36,15 @@ import akka.actor.ActorSystem * @author Roland Kuhn * @since 1.2 */ -class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: ActorRef, name: String)(implicit ev: T <:< FSM[S, D]) - extends TestActorRef(app, props, supervisor, name) { +class TestFSMRef[S, D, T <: Actor]( + app: ActorSystem, + _deadLetterMailbox: Mailbox, + _eventStream: EventStream, + _scheduler: Scheduler, + props: Props, + supervisor: ActorRef, + name: String)(implicit ev: T <:< FSM[S, D]) + extends TestActorRef(app, _deadLetterMailbox, _eventStream, _scheduler, props, supervisor, name) { private def fsm: T = underlyingActor @@ -81,8 +90,8 @@ class TestFSMRef[S, D, T <: Actor](app: ActorSystem, props: Props, supervisor: A object TestFSMRef { def apply[S, D, T <: Actor](factory: ⇒ T)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, TestActorRef.randomName) + new TestFSMRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(creator = () ⇒ factory), app.guardian, TestActorRef.randomName) def apply[S, D, T <: Actor](factory: ⇒ T, name: String)(implicit ev: T <:< FSM[S, D], app: ActorSystem): TestFSMRef[S, D, T] = - new TestFSMRef(app, Props(creator = () ⇒ factory), app.guardian, name) + new TestFSMRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(creator = () ⇒ factory), app.guardian, name) } diff --git a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala index 0815942e81..fa7c20e000 100644 --- a/akka-testkit/src/main/scala/akka/testkit/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -91,7 +91,8 @@ class TestKit(_app: ActorSystem) { * ActorRef of the test actor. Access is provided to enable e.g. * registration as message target. */ - val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)).copy(dispatcher = new CallingThreadDispatcher(app)), + val testActor: ActorRef = app.systemActorOf(Props(new TestActor(queue)) + .copy(dispatcher = new CallingThreadDispatcher(app.deadLetterMailbox, app.eventStream, app.scheduler)), "testActor" + TestKit.testActorId.incrementAndGet) private var end: Duration = Duration.Undefined diff --git a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala index 7672074b08..4d7fb0283d 100644 --- a/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/TestActorRefSpec.scala @@ -171,7 +171,7 @@ class TestActorRefSpec extends AkkaSpec with BeforeAndAfterEach { val boss = TestActorRef(Props(new TActor { - val ref = new TestActorRef(app, Props(new TActor { + val ref = new TestActorRef(app, app.deadLetterMailbox, app.eventStream, app.scheduler, Props(new TActor { def receiveT = { case _ ⇒ } override def preRestart(reason: Throwable, msg: Option[Any]) { counter -= 1 } override def postRestart(reason: Throwable) { counter -= 1 }