pekko/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala

314 lines
10 KiB
Scala
Raw Normal View History

2011-05-19 21:34:21 +02:00
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
2011-05-19 21:34:21 +02:00
*/
package akka.testkit
import akka.event.Logging.{ Warning, Error }
import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import java.util.concurrent.RejectedExecutionException
import akka.util.Switch
import java.lang.ref.WeakReference
import scala.annotation.tailrec
import akka.actor.{ ActorCell, ActorRef, ActorSystem }
import akka.dispatch._
import akka.actor.Scheduler
import akka.event.EventStream
import akka.util.Duration
2011-12-02 10:32:17 +01:00
import akka.util.duration._
import java.util.concurrent.TimeUnit
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.actor.ActorSystemImpl
import akka.actor.Extension
import com.typesafe.config.Config
/*
* Locking rules:
*
* While not suspendSwitch, messages are processed (!isActive) or queued
* thread-locally (isActive). While suspendSwitch, messages are queued
* thread-locally. When resuming, all messages are atomically scooped from all
* non-active threads and queued on the resuming thread's queue, to be
* processed immediately. Processing a queue checks suspend before each
* invocation, leaving the active state if suspendSwitch. For this to work
* reliably, the active flag needs to be set atomically with the initial check
* for suspend. Scooping up messages means replacing the ThreadLocal's contents
* with an empty new NestingQueue.
*
* All accesses to the queue must be done under the suspendSwitch-switch's lock, so
* within one of its methods taking a closure argument.
*/
private[testkit] object CallingThreadDispatcherQueues extends ExtensionId[CallingThreadDispatcherQueues] with ExtensionIdProvider {
override def lookup = CallingThreadDispatcherQueues
override def createExtension(system: ActorSystemImpl): CallingThreadDispatcherQueues = new CallingThreadDispatcherQueues
}
private[testkit] class CallingThreadDispatcherQueues extends Extension {
// PRIVATE DATA
private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]()
private var lastGC = 0l
// we have to forget about long-gone threads sometime
private def gc {
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
}
protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized {
if (queues contains mbox) {
val newSet = queues(mbox) + new WeakReference(q)
queues += mbox -> newSet
} else {
queues += mbox -> Set(new WeakReference(q))
}
val now = System.nanoTime
if (now - lastGC > 1000000000l) {
lastGC = now
gc
}
}
/*
* This method must be called with "own" being this thread's queue for the
* given mailbox. When this method returns, the queue will be entered
* (active).
*/
protected[akka] def gatherFromAllOtherQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized {
if (!own.isActive) own.enter
if (queues contains mbox) {
for {
ref queues(mbox)
val q = ref.get
if (q ne null) && (q ne own)
} {
while (q.peek ne null) {
// this is safe because this method is only ever called while holding the suspendSwitch monitor
own.push(q.pop)
}
}
}
}
}
object CallingThreadDispatcher {
val Id = "akka.test.calling-thread-dispatcher"
}
/**
* Dispatcher which runs invocations on the current thread only. This
* dispatcher does not create any new threads, but it can be used from
* different threads concurrently for the same actor. The dispatch strategy is
* to run on the current thread unless the target actor is either suspendSwitch or
* already running on the current thread (if it is running on a different
* thread, then this thread will block until that other invocation is
* finished); if the invocation is not run, it is queued in a thread-local
* queue to be executed once the active invocation further up the call stack
* finishes. This leads to completely deterministic execution order if only one
* thread is used.
*
* Suspending and resuming are global actions for one actor, meaning they can
* affect different threads, which leads to complications. If messages are
* queued (thread-locally) during the suspendSwitch period, the only thread to run
* them upon resume is the thread actually calling the resume method. Hence,
* all thread-local queues which are not currently being drained (possible,
* since suspend-queue-resume might happen entirely during an invocation on a
* different thread) are scooped up into the current thread-local queue which
* is then executed. It is possible to suspend an actor from within its call
* stack.
*
* @author Roland Kuhn
* @since 1.1
*/
class CallingThreadDispatcher(
_prerequisites: DispatcherPrerequisites,
val name: String = "calling-thread") extends MessageDispatcher(_prerequisites) {
import CallingThreadDispatcher._
val log = akka.event.Logging(prerequisites.eventStream, "CallingThreadDispatcher")
override def id: String = Id
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(actor)
protected[akka] override def shutdown() {}
protected[akka] override def throughput = 0
protected[akka] override def throughputDeadlineTime = Duration.Zero
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false
2011-12-02 10:32:17 +01:00
protected[akka] override def shutdownTimeout = 1 second
2011-09-20 15:43:57 +02:00
override def suspend(actor: ActorCell) {
actor.mailbox match {
case m: CallingThreadMailbox m.suspendSwitch.switchOn
case m m.systemEnqueue(actor.self, Suspend())
}
}
2011-09-20 15:43:57 +02:00
override def resume(actor: ActorCell) {
actor.mailbox match {
case mbox: CallingThreadMailbox
val queue = mbox.queue
val wasActive = queue.isActive
val switched = mbox.suspendSwitch.switchOff {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(mbox, queue)
}
if (switched && !wasActive) {
runQueue(mbox, queue)
}
case m m.systemEnqueue(actor.self, Resume())
}
}
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
receiver.mailbox match {
case mbox: CallingThreadMailbox
mbox.systemEnqueue(receiver.self, message)
val queue = mbox.queue
if (!queue.isActive) {
queue.enter
runQueue(mbox, queue)
}
case m m.systemEnqueue(receiver.self, message)
}
}
protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
receiver.mailbox match {
case mbox: CallingThreadMailbox
val queue = mbox.queue
val execute = mbox.suspendSwitch.fold {
queue.push(handle)
false
} {
queue.push(handle)
if (!queue.isActive) {
queue.enter
true
} else false
}
if (execute) runQueue(mbox, queue)
case m m.enqueue(receiver.self, handle)
}
}
protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run }
/*
* This method must be called with this thread's queue, which must already
* have been entered (active). When this method returns, the queue will be
* inactive.
*
* If the catch block is executed, then a non-empty mailbox may be stalled as
* there is no-one who cares to execute it before the next message is sent or
* it is suspendSwitch and resumed.
*/
@tailrec
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue, interruptedex: InterruptedException = null) {
var intex = interruptedex;
assert(queue.isActive)
mbox.ctdLock.lock
val recurse = try {
mbox.processAllSystemMessages()
val handle = mbox.suspendSwitch.fold[Envelope] {
queue.leave
null
} {
val ret = if (mbox.isClosed) null else queue.pop
if (ret eq null) queue.leave
ret
}
if (handle ne null) {
try {
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
mbox.actor.invoke(handle)
true
} catch {
case ie: InterruptedException
log.error(ie, "Interrupted during message processing")
Thread.currentThread().interrupt()
intex = ie
true
case e
log.error(e, "Error during message processing")
queue.leave
false
}
} else if (queue.isActive) {
queue.leave
false
} else false
} catch {
case e queue.leave; throw e
} finally {
mbox.ctdLock.unlock
}
if (recurse) {
runQueue(mbox, queue, intex)
} else {
if (intex ne null) {
Thread.interrupted // clear flag
throw intex
}
}
}
}
class CallingThreadDispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
extends MessageDispatcherConfigurator(config, prerequisites) {
private val instance = new CallingThreadDispatcher(prerequisites)
override def dispatcher(): MessageDispatcher = instance
}
class NestingQueue {
private var q = new LinkedList[Envelope]()
def size = q.size
def isEmpty = q.isEmpty
def push(handle: Envelope) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
@volatile
private var active = false
2011-04-08 14:43:15 +02:00
def enter { if (active) sys.error("already active") else active = true }
def leave { if (!active) sys.error("not active") else active = false }
def isActive = active
}
class CallingThreadMailbox(_receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = {
val queue = new NestingQueue
CallingThreadDispatcherQueues(actor.system).registerQueue(CallingThreadMailbox.this, queue)
queue
}
}
def queue = q.get
val ctdLock = new ReentrantLock
val suspendSwitch = new Switch
override def enqueue(receiver: ActorRef, msg: Envelope) {}
override def dequeue() = null
override def hasMessages = queue.isEmpty
override def numberOfMessages = queue.size
override def cleanUp(): Unit = {
/*
* This is called from dispatcher.unregister, i.e. under this.lock. If
* another thread obtained a reference to this mailbox and enqueues after
* the gather operation, tough luck: no guaranteed delivery to deadLetters.
*/
suspendSwitch.locked {
CallingThreadDispatcherQueues(actor.system).gatherFromAllOtherQueues(this, queue)
super.cleanUp()
}
}
}