diff --git a/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp b/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp new file mode 100644 index 0000000000..4e1f044a0f Binary files /dev/null and b/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp differ diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 3a96e3ae4f..8b106ac1cd 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,209 +1,224 @@ -package akka.testkit - -import akka.actor.ActorRef -import akka.dispatch.{MessageDispatcher, MessageInvocation} -import java.util.concurrent.locks.ReentrantLock -import java.util.LinkedList -import java.util.concurrent.RejectedExecutionException -import akka.util.Switch -import java.lang.ref.WeakReference -import scala.annotation.tailrec - -/* - * Locking rules: - * - * While not suspended, messages are processed (!isActive) or queued - * thread-locally (isActive). While suspended, messages are queued - * thread-locally. When resuming, all messages are atomically scooped from all - * non-active threads and queued on the resuming thread's queue, to be - * processed immediately. Processing a queue checks suspend before each - * invocation, leaving the active state if suspended. For this to work - * reliably, the active flag needs to be set atomically with the initial check - * for suspend. Scooping up messages means replacing the ThreadLocal's contents - * with an empty new NestingQueue. - * - * All accesses to the queue must be done under the suspended-switch's lock, so - * within one of its methods taking a closure argument. - */ - -object CallingThreadDispatcher { - private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() - - // we have to forget about long-gone threads sometime - private def gc { - queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) - } - - def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { - if (queues contains mbox) { - val newSet = queues(mbox) + new WeakReference(q) - queues += mbox -> newSet - } else { - queues += mbox -> Set(new WeakReference(q)) - } - gc - } - - /* - * This method must be called with "own" being this thread's queue for the - * given mailbox. When this method returns, the queue will be entere - * (active). - */ - def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { - if (!own.isActive) own.enter - if (queues contains mbox) { - for { - ref <- queues(mbox) - q = ref.get - if (q ne null) && !q.isActive - } { - while (q.peek ne null) { - own.push(q.pop) - } - } - } - } -} - -/** - * Dispatcher which runs invocations on the current thread only. This - * dispatcher does not create any new threads, but it can be used from - * different threads concurrently for the same actor. The dispatch strategy is - * to run on the current thread unless the target actor is either suspended or - * already running on the current thread (if it is running on a different - * thread, then this thread will block until that other invocation is - * finished); if the invocation is not run, it is queued in a thread-local - * queue to be executed once the active invocation further up the call stack - * finishes. This leads to completely deterministic execution order if only one - * thread is used. - * - * Suspending and resuming are global actions for one actor, meaning they can - * affect different threads, which leads to complications. If messages are - * queued (thread-locally) during the suspended period, the only thread to run - * them upon resume is the thread actually calling the resume method. Hence, - * all thread-local queues which are not currently being drained (possible, - * since suspend-queue-resume might happen entirely during an invocation on a - * different thread) are scooped up into the current thread-local queue which - * is then executed. It is possible to suspend an actor from within its call - * stack. - * - * @author Roland Kuhn - * @since 1.1 - */ -class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { - import CallingThreadDispatcher._ - - private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox - - private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] - - private[akka] override def start {} - - private[akka] override def shutdown {} - - private[akka] override def timeoutMs = 100L - - override def suspend(actor: ActorRef) { - getMailbox(actor).suspended.switchOn - } - - override def resume(actor: ActorRef) { - val mbox = getMailbox(actor) - val queue = mbox.queue - val wasActive = queue.isActive - val switched = mbox.suspended.switchOff { - gatherFromAllInactiveQueues(mbox, queue) - } - if (switched && !wasActive) { - runQueue(mbox, queue) - } - } - - override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size - - private[akka] override def dispatch(handle: MessageInvocation) { - val mbox = getMailbox(handle.receiver) - val queue = mbox.queue - val execute = mbox.suspended.ifElseYield { - queue.push(handle) - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("suspended, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } { - queue.push(handle) - if (queue.isActive) { - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } else { - queue.enter - true - } - } - if (execute) runQueue(mbox, queue) - } - - /* - * This method must be called with this thread's queue, which must already - * have been entered (active). When this method returns, the queue will be - * inactive. - * - * If the catch block is executed, then a non-empty mailbox may be stalled as - * there is no-one who cares to execute it before the next message is sent or - * it is suspended and resumed. - */ - private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { - assert(queue.isActive) - val handle = mbox.suspended.ifElseYield[MessageInvocation] { - queue.leave - null - } { - val ret = queue.pop - if (ret eq null) queue.leave - ret - } - if (handle ne null) { - try { - handle.invoke - val f = handle.senderFuture - if (warnings && f.isDefined && !f.get.isCompleted) { - log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) - } - } catch { - case _ => queue.leave - } - runQueue(mbox, queue) - log.info("runQueue") - } else if (queue.isActive) { - queue.leave - } - } -} - -class NestingQueue { - private var q = new LinkedList[MessageInvocation]() - def size = q.size - def push(handle : MessageInvocation) { q.offer(handle) } - def peek = q.peek - def pop = q.poll - - @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } - def isActive = active -} - -class CallingThreadMailbox { - - private val q = new ThreadLocal[NestingQueue]() { - override def initialValue = new NestingQueue - } - - def queue = q.get - - val suspended = new Switch(false) -} +package akka.testkit + +import akka.actor.ActorRef +import akka.dispatch.{MessageDispatcher, MessageInvocation} +import java.util.concurrent.locks.ReentrantLock +import java.util.LinkedList +import java.util.concurrent.RejectedExecutionException +import akka.util.Switch +import java.lang.ref.WeakReference +import scala.annotation.tailrec + +/* + * Locking rules: + * + * While not suspended, messages are processed (!isActive) or queued + * thread-locally (isActive). While suspended, messages are queued + * thread-locally. When resuming, all messages are atomically scooped from all + * non-active threads and queued on the resuming thread's queue, to be + * processed immediately. Processing a queue checks suspend before each + * invocation, leaving the active state if suspended. For this to work + * reliably, the active flag needs to be set atomically with the initial check + * for suspend. Scooping up messages means replacing the ThreadLocal's contents + * with an empty new NestingQueue. + * + * All accesses to the queue must be done under the suspended-switch's lock, so + * within one of its methods taking a closure argument. + */ + +object CallingThreadDispatcher { + private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + + // we have to forget about long-gone threads sometime + private def gc { + queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + } + + def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { + if (queues contains mbox) { + val newSet = queues(mbox) + new WeakReference(q) + queues += mbox -> newSet + } else { + queues += mbox -> Set(new WeakReference(q)) + } + gc + } + + /* + * This method must be called with "own" being this thread's queue for the + * given mailbox. When this method returns, the queue will be entered + * (active). + */ + def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { + if (!own.isActive) own.enter + if (queues contains mbox) { + for { + ref <- queues(mbox) + q = ref.get + if (q ne null) && !q.isActive + /* + * if q.isActive was false, then it cannot change to true while we are + * holding the mbox.suspende.switch's lock under which we are currently + * executing + */ + } { + while (q.peek ne null) { + own.push(q.pop) + } + } + } + } +} + +/** + * Dispatcher which runs invocations on the current thread only. This + * dispatcher does not create any new threads, but it can be used from + * different threads concurrently for the same actor. The dispatch strategy is + * to run on the current thread unless the target actor is either suspended or + * already running on the current thread (if it is running on a different + * thread, then this thread will block until that other invocation is + * finished); if the invocation is not run, it is queued in a thread-local + * queue to be executed once the active invocation further up the call stack + * finishes. This leads to completely deterministic execution order if only one + * thread is used. + * + * Suspending and resuming are global actions for one actor, meaning they can + * affect different threads, which leads to complications. If messages are + * queued (thread-locally) during the suspended period, the only thread to run + * them upon resume is the thread actually calling the resume method. Hence, + * all thread-local queues which are not currently being drained (possible, + * since suspend-queue-resume might happen entirely during an invocation on a + * different thread) are scooped up into the current thread-local queue which + * is then executed. It is possible to suspend an actor from within its call + * stack. + * + * @author Roland Kuhn + * @since 1.1 + */ +class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { + import CallingThreadDispatcher._ + + private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox + + private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + + private[akka] override def start {} + + private[akka] override def shutdown {} + + private[akka] override def timeoutMs = 100L + + override def suspend(actor: ActorRef) { + getMailbox(actor).suspended.switchOn + } + + override def resume(actor: ActorRef) { + val mbox = getMailbox(actor) + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspended.switchOff { + gatherFromAllInactiveQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + } + + override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + + private[akka] override def dispatch(handle: MessageInvocation) { + val mbox = getMailbox(handle.receiver) + val queue = mbox.queue + val execute = mbox.suspended.ifElseYield { + queue.push(handle) + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("suspended, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } { + queue.push(handle) + if (queue.isActive) { + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + } + + /* + * This method must be called with this thread's queue, which must already + * have been entered (active). When this method returns, the queue will be + * inactive. + * + * If the catch block is executed, then a non-empty mailbox may be stalled as + * there is no-one who cares to execute it before the next message is sent or + * it is suspended and resumed. + */ + @tailrec private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { + assert(queue.isActive) + mbox.lock.lock + val recurse = try { + val handle = mbox.suspended.ifElseYield[MessageInvocation] { + queue.leave + null + } { + val ret = queue.pop + if (ret eq null) queue.leave + ret + } + if (handle ne null) { + try { + handle.invoke + val f = handle.senderFuture + if (warnings && f.isDefined && !f.get.isCompleted) { + log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) + } + } catch { + case _ => queue.leave + } + true + } else if (queue.isActive) { + queue.leave + false + } else false + } finally { + mbox.lock.unlock + } + if (recurse) { + runQueue(mbox, queue) + } + } +} + +class NestingQueue { + private var q = new LinkedList[MessageInvocation]() + def size = q.size + def push(handle : MessageInvocation) { q.offer(handle) } + def peek = q.peek + def pop = q.poll + + @volatile private var active = false + def enter { if (active) error("already active") else active = true } + def leave { if (!active) error("not active") else active = false } + def isActive = active +} + +class CallingThreadMailbox { + + private val q = new ThreadLocal[NestingQueue]() { + override def initialValue = new NestingQueue + } + + def queue = q.get + + val lock = new ReentrantLock + + val suspended = new Switch(false) +}