2011-05-19 21:34:21 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2011-05-19 21:34:21 +02:00
|
|
|
*/
|
2011-03-06 22:45:44 +01:00
|
|
|
package akka.testkit
|
|
|
|
|
|
2011-10-27 12:23:01 +02:00
|
|
|
import akka.event.Logging.{ Warning, Error }
|
2011-03-06 22:45:44 +01:00
|
|
|
import java.util.concurrent.locks.ReentrantLock
|
|
|
|
|
import java.util.LinkedList
|
|
|
|
|
import java.util.concurrent.RejectedExecutionException
|
|
|
|
|
import akka.util.Switch
|
|
|
|
|
import java.lang.ref.WeakReference
|
|
|
|
|
import scala.annotation.tailrec
|
2011-11-12 10:57:28 +01:00
|
|
|
import akka.actor.{ ActorCell, ActorRef, ActorSystem }
|
2011-09-20 18:34:21 +02:00
|
|
|
import akka.dispatch._
|
2011-03-06 22:45:44 +01:00
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* Locking rules:
|
|
|
|
|
*
|
2011-09-21 15:01:47 +02:00
|
|
|
* While not suspendSwitch, messages are processed (!isActive) or queued
|
|
|
|
|
* thread-locally (isActive). While suspendSwitch, messages are queued
|
2011-03-06 22:45:44 +01:00
|
|
|
* thread-locally. When resuming, all messages are atomically scooped from all
|
|
|
|
|
* non-active threads and queued on the resuming thread's queue, to be
|
|
|
|
|
* processed immediately. Processing a queue checks suspend before each
|
2011-09-21 15:01:47 +02:00
|
|
|
* invocation, leaving the active state if suspendSwitch. For this to work
|
2011-03-06 22:45:44 +01:00
|
|
|
* reliably, the active flag needs to be set atomically with the initial check
|
|
|
|
|
* for suspend. Scooping up messages means replacing the ThreadLocal's contents
|
|
|
|
|
* with an empty new NestingQueue.
|
|
|
|
|
*
|
2011-09-21 15:01:47 +02:00
|
|
|
* All accesses to the queue must be done under the suspendSwitch-switch's lock, so
|
2011-03-06 22:45:44 +01:00
|
|
|
* within one of its methods taking a closure argument.
|
|
|
|
|
*/
|
|
|
|
|
|
2011-10-21 18:47:44 +02:00
|
|
|
private[testkit] object CallingThreadDispatcher {
|
2011-04-16 22:20:04 +02:00
|
|
|
|
|
|
|
|
// PRIVATE DATA
|
|
|
|
|
|
2011-03-06 22:45:44 +01:00
|
|
|
private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]()
|
2011-10-21 18:47:44 +02:00
|
|
|
private var lastGC = 0l
|
2011-03-06 22:45:44 +01:00
|
|
|
|
|
|
|
|
// we have to forget about long-gone threads sometime
|
|
|
|
|
private def gc {
|
|
|
|
|
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-30 15:50:52 +02:00
|
|
|
protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized {
|
2011-03-06 22:45:44 +01:00
|
|
|
if (queues contains mbox) {
|
|
|
|
|
val newSet = queues(mbox) + new WeakReference(q)
|
|
|
|
|
queues += mbox -> newSet
|
|
|
|
|
} else {
|
|
|
|
|
queues += mbox -> Set(new WeakReference(q))
|
|
|
|
|
}
|
2011-10-21 18:47:44 +02:00
|
|
|
val now = System.nanoTime
|
|
|
|
|
if (now - lastGC > 1000000000l) {
|
|
|
|
|
lastGC = now
|
|
|
|
|
gc
|
|
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/*
|
|
|
|
|
* This method must be called with "own" being this thread's queue for the
|
|
|
|
|
* given mailbox. When this method returns, the queue will be entered
|
|
|
|
|
* (active).
|
|
|
|
|
*/
|
2011-10-21 18:47:44 +02:00
|
|
|
protected[akka] def gatherFromAllOtherQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized {
|
2011-03-06 22:45:44 +01:00
|
|
|
if (!own.isActive) own.enter
|
|
|
|
|
if (queues contains mbox) {
|
|
|
|
|
for {
|
2011-05-18 17:25:30 +02:00
|
|
|
ref ← queues(mbox)
|
2011-10-21 18:47:44 +02:00
|
|
|
val q = ref.get
|
|
|
|
|
if (q ne null) && (q ne own)
|
2011-03-06 22:45:44 +01:00
|
|
|
} {
|
|
|
|
|
while (q.peek ne null) {
|
2011-10-21 18:47:44 +02:00
|
|
|
// this is safe because this method is only ever called while holding the suspendSwitch monitor
|
2011-03-06 22:45:44 +01:00
|
|
|
own.push(q.pop)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Dispatcher which runs invocations on the current thread only. This
|
|
|
|
|
* dispatcher does not create any new threads, but it can be used from
|
|
|
|
|
* different threads concurrently for the same actor. The dispatch strategy is
|
2011-09-21 15:01:47 +02:00
|
|
|
* to run on the current thread unless the target actor is either suspendSwitch or
|
2011-03-06 22:45:44 +01:00
|
|
|
* already running on the current thread (if it is running on a different
|
|
|
|
|
* thread, then this thread will block until that other invocation is
|
|
|
|
|
* finished); if the invocation is not run, it is queued in a thread-local
|
|
|
|
|
* queue to be executed once the active invocation further up the call stack
|
|
|
|
|
* finishes. This leads to completely deterministic execution order if only one
|
|
|
|
|
* thread is used.
|
|
|
|
|
*
|
|
|
|
|
* Suspending and resuming are global actions for one actor, meaning they can
|
|
|
|
|
* affect different threads, which leads to complications. If messages are
|
2011-09-21 15:01:47 +02:00
|
|
|
* queued (thread-locally) during the suspendSwitch period, the only thread to run
|
2011-03-06 22:45:44 +01:00
|
|
|
* them upon resume is the thread actually calling the resume method. Hence,
|
|
|
|
|
* all thread-local queues which are not currently being drained (possible,
|
|
|
|
|
* since suspend-queue-resume might happen entirely during an invocation on a
|
|
|
|
|
* different thread) are scooped up into the current thread-local queue which
|
|
|
|
|
* is then executed. It is possible to suspend an actor from within its call
|
|
|
|
|
* stack.
|
|
|
|
|
*
|
|
|
|
|
* @author Roland Kuhn
|
|
|
|
|
* @since 1.1
|
|
|
|
|
*/
|
2011-11-10 20:08:00 +01:00
|
|
|
class CallingThreadDispatcher(_app: ActorSystem, val name: String = "calling-thread") extends MessageDispatcher(_app) {
|
2011-03-06 22:45:44 +01:00
|
|
|
import CallingThreadDispatcher._
|
|
|
|
|
|
2011-10-18 16:44:35 +02:00
|
|
|
protected[akka] override def createMailbox(actor: ActorCell) = new CallingThreadMailbox(this, actor)
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-10-26 14:18:29 +02:00
|
|
|
private def getMailbox(actor: ActorCell): Option[CallingThreadMailbox] = actor.mailbox match {
|
|
|
|
|
case m: CallingThreadMailbox ⇒ Some(m)
|
|
|
|
|
case _ ⇒ None
|
|
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-08-30 15:50:52 +02:00
|
|
|
protected[akka] override def start() {}
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-08-30 15:50:52 +02:00
|
|
|
protected[akka] override def shutdown() {}
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-09-21 15:01:47 +02:00
|
|
|
protected[akka] override def throughput = 0
|
|
|
|
|
protected[akka] override def throughputDeadlineTime = 0
|
2011-09-23 13:14:17 +02:00
|
|
|
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false
|
2011-09-21 15:01:47 +02:00
|
|
|
|
2011-08-30 15:50:52 +02:00
|
|
|
protected[akka] override def timeoutMs = 100L
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-09-20 15:43:57 +02:00
|
|
|
override def suspend(actor: ActorCell) {
|
2011-10-26 14:18:29 +02:00
|
|
|
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
|
2011-09-20 15:43:57 +02:00
|
|
|
override def resume(actor: ActorCell) {
|
2011-11-12 10:57:28 +01:00
|
|
|
actor.mailbox match {
|
|
|
|
|
case mbox: CallingThreadMailbox ⇒
|
|
|
|
|
val queue = mbox.queue
|
|
|
|
|
val wasActive = queue.isActive
|
|
|
|
|
val switched = mbox.suspendSwitch.switchOff {
|
|
|
|
|
gatherFromAllOtherQueues(mbox, queue)
|
|
|
|
|
}
|
|
|
|
|
if (switched && !wasActive) {
|
|
|
|
|
runQueue(mbox, queue)
|
|
|
|
|
}
|
|
|
|
|
case m ⇒ m.systemEnqueue(actor.self, Resume())
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-10-26 14:18:29 +02:00
|
|
|
override def mailboxSize(actor: ActorCell) = getMailbox(actor) map (_.queue.size) getOrElse 0
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-10-26 14:18:29 +02:00
|
|
|
override def mailboxIsEmpty(actor: ActorCell): Boolean = getMailbox(actor) map (_.queue.isEmpty) getOrElse true
|
2011-06-07 13:23:24 -05:00
|
|
|
|
2011-10-18 16:44:35 +02:00
|
|
|
protected[akka] override def systemDispatch(receiver: ActorCell, message: SystemMessage) {
|
2011-11-12 10:57:28 +01:00
|
|
|
receiver.mailbox match {
|
|
|
|
|
case mbox: CallingThreadMailbox ⇒
|
|
|
|
|
mbox.systemEnqueue(receiver.self, message)
|
|
|
|
|
val queue = mbox.queue
|
|
|
|
|
if (!queue.isActive) {
|
|
|
|
|
queue.enter
|
|
|
|
|
runQueue(mbox, queue)
|
|
|
|
|
}
|
|
|
|
|
case m ⇒ m.systemEnqueue(receiver.self, message)
|
2011-09-21 08:25:08 +02:00
|
|
|
}
|
2011-09-20 18:34:21 +02:00
|
|
|
}
|
|
|
|
|
|
2011-10-19 13:19:44 +02:00
|
|
|
protected[akka] override def dispatch(receiver: ActorCell, handle: Envelope) {
|
2011-11-12 10:57:28 +01:00
|
|
|
receiver.mailbox match {
|
|
|
|
|
case mbox: CallingThreadMailbox ⇒
|
|
|
|
|
val queue = mbox.queue
|
|
|
|
|
val execute = mbox.suspendSwitch.fold {
|
|
|
|
|
queue.push(handle)
|
|
|
|
|
false
|
|
|
|
|
} {
|
|
|
|
|
queue.push(handle)
|
|
|
|
|
if (queue.isActive)
|
|
|
|
|
false
|
|
|
|
|
else {
|
|
|
|
|
queue.enter
|
|
|
|
|
true
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
if (execute) runQueue(mbox, queue)
|
|
|
|
|
case m ⇒ m.enqueue(receiver.self, handle)
|
2011-05-18 17:25:30 +02:00
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
|
2011-08-30 15:50:52 +02:00
|
|
|
protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run }
|
2011-03-17 22:18:39 +01:00
|
|
|
|
2011-03-06 22:45:44 +01:00
|
|
|
/*
|
|
|
|
|
* This method must be called with this thread's queue, which must already
|
|
|
|
|
* have been entered (active). When this method returns, the queue will be
|
|
|
|
|
* inactive.
|
|
|
|
|
*
|
|
|
|
|
* If the catch block is executed, then a non-empty mailbox may be stalled as
|
|
|
|
|
* there is no-one who cares to execute it before the next message is sent or
|
2011-09-21 15:01:47 +02:00
|
|
|
* it is suspendSwitch and resumed.
|
2011-03-06 22:45:44 +01:00
|
|
|
*/
|
2011-05-18 17:25:30 +02:00
|
|
|
@tailrec
|
2011-10-25 15:07:20 +02:00
|
|
|
private def runQueue(mbox: CallingThreadMailbox, queue: NestingQueue, interruptedex: InterruptedException = null) {
|
|
|
|
|
var intex = interruptedex;
|
2011-03-06 22:45:44 +01:00
|
|
|
assert(queue.isActive)
|
|
|
|
|
mbox.lock.lock
|
|
|
|
|
val recurse = try {
|
2011-10-21 18:47:44 +02:00
|
|
|
mbox.processAllSystemMessages()
|
2011-09-21 15:01:47 +02:00
|
|
|
val handle = mbox.suspendSwitch.fold[Envelope] {
|
2011-05-18 17:25:30 +02:00
|
|
|
queue.leave
|
|
|
|
|
null
|
|
|
|
|
} {
|
|
|
|
|
val ret = queue.pop
|
|
|
|
|
if (ret eq null) queue.leave
|
|
|
|
|
ret
|
|
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
if (handle ne null) {
|
|
|
|
|
try {
|
2011-11-12 22:37:12 +01:00
|
|
|
if (Mailbox.debug) println(mbox.actor.self + " processing message " + handle)
|
2011-10-19 13:19:44 +02:00
|
|
|
mbox.actor.invoke(handle)
|
2011-06-13 22:36:46 +02:00
|
|
|
true
|
2011-03-06 22:45:44 +01:00
|
|
|
} catch {
|
2011-10-21 18:47:44 +02:00
|
|
|
case ie: InterruptedException ⇒
|
2011-11-10 20:48:50 +01:00
|
|
|
app.eventStream.publish(Error(this, ie))
|
2011-10-21 18:47:44 +02:00
|
|
|
Thread.currentThread().interrupt()
|
2011-10-25 15:07:20 +02:00
|
|
|
intex = ie
|
2011-10-21 18:47:44 +02:00
|
|
|
true
|
2011-06-13 22:36:46 +02:00
|
|
|
case e ⇒
|
2011-11-10 20:48:50 +01:00
|
|
|
app.eventStream.publish(Error(this, e))
|
2011-06-13 22:36:46 +02:00
|
|
|
queue.leave
|
|
|
|
|
false
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
} else if (queue.isActive) {
|
|
|
|
|
queue.leave
|
|
|
|
|
false
|
|
|
|
|
} else false
|
2011-10-21 18:47:44 +02:00
|
|
|
} catch {
|
|
|
|
|
case e ⇒ queue.leave; throw e
|
2011-03-06 22:45:44 +01:00
|
|
|
} finally {
|
|
|
|
|
mbox.lock.unlock
|
|
|
|
|
}
|
|
|
|
|
if (recurse) {
|
2011-10-25 15:07:20 +02:00
|
|
|
runQueue(mbox, queue, intex)
|
|
|
|
|
} else {
|
|
|
|
|
if (intex ne null) {
|
|
|
|
|
Thread.interrupted // clear flag
|
|
|
|
|
throw intex
|
|
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
class NestingQueue {
|
2011-09-21 15:01:47 +02:00
|
|
|
private var q = new LinkedList[Envelope]()
|
2011-03-06 22:45:44 +01:00
|
|
|
def size = q.size
|
2011-06-07 13:23:24 -05:00
|
|
|
def isEmpty = q.isEmpty
|
2011-09-21 15:01:47 +02:00
|
|
|
def push(handle: Envelope) { q.offer(handle) }
|
2011-03-06 22:45:44 +01:00
|
|
|
def peek = q.peek
|
|
|
|
|
def pop = q.poll
|
|
|
|
|
|
2011-05-18 17:25:30 +02:00
|
|
|
@volatile
|
|
|
|
|
private var active = false
|
2011-04-08 14:43:15 +02:00
|
|
|
def enter { if (active) sys.error("already active") else active = true }
|
|
|
|
|
def leave { if (!active) sys.error("not active") else active = false }
|
2011-03-06 22:45:44 +01:00
|
|
|
def isActive = active
|
|
|
|
|
}
|
|
|
|
|
|
2011-10-18 16:44:35 +02:00
|
|
|
class CallingThreadMailbox(val dispatcher: MessageDispatcher, _receiver: ActorCell) extends Mailbox(_receiver) with DefaultSystemMessageQueue {
|
2011-03-06 22:45:44 +01:00
|
|
|
|
|
|
|
|
private val q = new ThreadLocal[NestingQueue]() {
|
2011-10-21 18:47:44 +02:00
|
|
|
override def initialValue = {
|
|
|
|
|
val queue = new NestingQueue
|
|
|
|
|
CallingThreadDispatcher.registerQueue(CallingThreadMailbox.this, queue)
|
|
|
|
|
queue
|
|
|
|
|
}
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def queue = q.get
|
|
|
|
|
|
|
|
|
|
val lock = new ReentrantLock
|
2011-09-21 15:01:47 +02:00
|
|
|
val suspendSwitch = new Switch
|
2011-03-06 22:45:44 +01:00
|
|
|
|
2011-11-12 10:57:28 +01:00
|
|
|
override def enqueue(receiver: ActorRef, msg: Envelope) {}
|
2011-09-21 15:01:47 +02:00
|
|
|
override def dequeue() = null
|
2011-09-21 16:27:31 +02:00
|
|
|
override def hasMessages = true
|
|
|
|
|
override def numberOfMessages = 0
|
2011-03-06 22:45:44 +01:00
|
|
|
}
|