From 337d34eac1ac053f9168d9938006490786962bff Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 20 Feb 2011 21:45:54 +0100 Subject: [PATCH 1/6] first shot at CallingThreadDispatcher - add some useful methods to akka.util.Switch and fix a typo - adapt ActorModelSpec not to rely on the Meet message (would not work with CallingThreadDispatcher) - add CallingThreadDispatcher --- .../dispatch/CallingThreadDispatcher.scala | 208 ++++++++++++++++++ .../src/main/scala/akka/util/LockUtil.scala | 30 ++- .../scala/akka/dispatch/ActorModelSpec.scala | 51 +++-- 3 files changed, 267 insertions(+), 22 deletions(-) create mode 100755 akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala diff --git a/akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala new file mode 100755 index 0000000000..a53d501869 --- /dev/null +++ b/akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala @@ -0,0 +1,208 @@ +package akka.dispatch + +import akka.actor.ActorRef +import java.util.concurrent.locks.ReentrantLock +import java.util.LinkedList +import java.util.concurrent.RejectedExecutionException +import akka.util.Switch +import java.lang.ref.WeakReference +import scala.annotation.tailrec + +/* + * Locking rules: + * + * While not suspended, messages are processed (!isActive) or queued + * thread-locally (isActive). While suspended, messages are queued + * thread-locally. When resuming, all messages are atomically scooped from all + * non-active threads and queued on the resuming thread's queue, to be + * processed immediately. Processing a queue checks suspend before each + * invocation, leaving the active state if suspended. For this to work + * reliably, the active flag needs to be set atomically with the initial check + * for suspend. Scooping up messages means replacing the ThreadLocal's contents + * with an empty new NestingQueue. + * + * All accesses to the queue must be done under the suspended-switch's lock, so + * within one of its methods taking a closure argument. + */ + +object CallingThreadDispatcher { + private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + + // we have to forget about long-gone threads sometime + private def gc { + queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + } + + def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { + if (queues contains mbox) { + val newSet = queues(mbox) + new WeakReference(q) + queues += mbox -> newSet + } else { + queues += mbox -> Set(new WeakReference(q)) + } + gc + } + + /* + * This method must be called with "own" being this thread's queue for the + * given mailbox. When this method returns, the queue will be entere + * (active). + */ + def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { + if (!own.isActive) own.enter + if (queues contains mbox) { + for { + ref <- queues(mbox) + q = ref.get + if (q ne null) && !q.isActive + } { + while (q.peek ne null) { + own.push(q.pop) + } + } + } + } +} + +/** + * Dispatcher which runs invocations on the current thread only. This + * dispatcher does not create any new threads, but it can be used from + * different threads concurrently for the same actor. The dispatch strategy is + * to run on the current thread unless the target actor is either suspended or + * already running on the current thread (if it is running on a different + * thread, then this thread will block until that other invocation is + * finished); if the invocation is not run, it is queued in a thread-local + * queue to be executed once the active invocation further up the call stack + * finishes. This leads to completely deterministic execution order if only one + * thread is used. + * + * Suspending and resuming are global actions for one actor, meaning they can + * affect different threads, which leads to complications. If messages are + * queued (thread-locally) during the suspended period, the only thread to run + * them upon resume is the thread actually calling the resume method. Hence, + * all thread-local queues which are not currently being drained (possible, + * since suspend-queue-resume might happen entirely during an invocation on a + * different thread) are scooped up into the current thread-local queue which + * is then executed. It is possible to suspend an actor from within its call + * stack. + * + * @author Roland Kuhn + * @since 1.1 + */ +class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { + import CallingThreadDispatcher._ + + private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox + + private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + + private[akka] override def start {} + + private[akka] override def shutdown {} + + private[akka] override def timeoutMs = 0L + + override def suspend(actor: ActorRef) { + getMailbox(actor).suspended.switchOn + } + + override def resume(actor: ActorRef) { + val mbox = getMailbox(actor) + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspended.switchOff { + gatherFromAllInactiveQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + } + + override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + + private[akka] override def dispatch(handle: MessageInvocation) { + val mbox = getMailbox(handle.receiver) + val queue = mbox.queue + val execute = mbox.suspended.ifElseYield { + queue.push(handle) + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("suspended, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } { + queue.push(handle) + if (queue.isActive) { + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + } + + /* + * This method must be called with this thread's queue, which must already + * have been entered (active). When this method returns, the queue will be + * inactive. + * + * If the catch block is executed, then a non-empty mailbox may be stalled as + * there is no-one who cares to execute it before the next message is sent or + * it is suspended and resumed. + */ + private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { + assert(queue.isActive) + val handle = mbox.suspended.ifElseYield[MessageInvocation] { + queue.leave + null + } { + val ret = queue.pop + if (ret eq null) queue.leave + ret + } + if (handle ne null) { + try { + handle.invoke + val f = handle.senderFuture + if (warnings && f.isDefined && !f.get.isCompleted) { + log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) + } + } catch { + case _ => queue.leave + } + runQueue(mbox, queue) + log.info("runQueue") + } else if (queue.isActive) { + queue.leave + } + } +} + +class NestingQueue { + private var q = new LinkedList[MessageInvocation]() + def size = q.size + def push(handle : MessageInvocation) { q.offer(handle) } + def peek = q.peek + def pop = q.poll + + @volatile private var active = false + def enter { if (active) error("already active") else active = true } + def leave { if (!active) error("not active") else active = false } + def isActive = active +} + +class CallingThreadMailbox { + + private val q = new ThreadLocal[NestingQueue]() { + override def initialValue = new NestingQueue + } + + def queue = q.get + + val suspended = new Switch(false) +} diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index ab383f5f85..e4c76160d9 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -144,7 +144,7 @@ class Switch(startAsOn: Boolean = false) { } def ifOffYield[T](action: => T): Option[T] = { - if (switch.get) Some(action) + if (!switch.get) Some(action) else None } @@ -162,6 +162,34 @@ class Switch(startAsOn: Boolean = false) { } else false } + def whileOnYield[T](action: => T): Option[T] = synchronized { + if (switch.get) Some(action) + else None + } + + def whileOffYield[T](action: => T): Option[T] = synchronized { + if (!switch.get) Some(action) + else None + } + + def whileOn(action: => Unit): Boolean = synchronized { + if (switch.get) { + action + true + } else false + } + + def whileOff(action: => Unit): Boolean = synchronized { + if (switch.get) { + action + true + } else false + } + + def ifElseYield[T](on: => T)(off: => T) = synchronized { + if (switch.get) on else off + } + def isOn = switch.get def isOff = !isOn } diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index b282e08090..b66b489e2f 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -12,7 +12,7 @@ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor -import akka.util.Duration +import akka.util.{Duration, Switch} object ActorModelSpec { @@ -25,6 +25,7 @@ object ActorModelSpec { case class Await(latch: CountDownLatch) extends ActorModelMessage case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage + case class Wait(time: Long) extends ActorModelMessage case object Restart extends ActorModelMessage val Ping = "Ping" @@ -33,22 +34,31 @@ object ActorModelSpec { class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor { self.dispatcher = dispatcher.asInstanceOf[MessageDispatcher] - def ack { dispatcher.getStats(self).msgsProcessed.incrementAndGet() } + private val busy = new Switch(false) + + def ack { + if (!busy.switchOn()) { + throw new Exception("isolation violated") + } else { + dispatcher.getStats(self).msgsProcessed.incrementAndGet() + } + } override def postRestart(reason: Throwable) { dispatcher.getStats(self).restarts.incrementAndGet() } def receive = { - case Await(latch) => ack; latch.await() - case Meet(sign, wait) => ack; sign.countDown(); wait.await() - case Reply(msg) => ack; self.reply(msg) - case Reply_?(msg) => ack; self.reply_?(msg) - case Forward(to,msg) => ack; to.forward(msg) - case CountDown(latch) => ack; latch.countDown() - case Increment(count) => ack; count.incrementAndGet() - case CountDownNStop(l)=> ack; l.countDown; self.stop - case Restart => ack; throw new Exception("Restart requested") + case Await(latch) => ack; latch.await(); busy.switchOff() + case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff() + case Wait(time) => ack; Thread.sleep(time); busy.switchOff() + case Reply(msg) => ack; self.reply(msg); busy.switchOff() + case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff() + case Forward(to,msg) => ack; to.forward(msg); busy.switchOff() + case CountDown(latch) => ack; latch.countDown(); busy.switchOff() + case Increment(count) => ack; count.incrementAndGet(); busy.switchOff() + case CountDownNStop(l)=> ack; l.countDown; self.stop; busy.switchOff() + case Restart => ack; busy.switchOff(); throw new Exception("Restart requested") } } @@ -208,23 +218,17 @@ abstract class ActorModelSpec extends JUnitSuite { @Test def dispatcherShouldProcessMessagesOneAtATime { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor - val start,step1,step2,oneAtATime = new CountDownLatch(1) + val start,oneAtATime = new CountDownLatch(1) a.start a ! CountDown(start) assertCountDown(start,3000, "Should process first message within 3 seconds") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) - a ! Meet(step1,step2) - assertCountDown(step1,3000, "Didn't process the Meet message in 3 seocnds") - assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2) - + a ! Wait(1000) a ! CountDown(oneAtATime) - assertNoCountDown(oneAtATime,500,"Processed message when not allowed to") - assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 2) - - step2.countDown() - assertCountDown(oneAtATime,500,"Processed message when allowed") + // in case of serialization violation, restart would happen instead of count down + assertCountDown(oneAtATime,1500,"Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) a.stop @@ -310,3 +314,8 @@ class HawtDispatcherModelTest extends ActorModelSpec { class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec { def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor } + +class CallingThreadDispatcherTest extends ActorModelSpec { + def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor + override def dispatcherShouldProcessMessagesInParallel {} +} From 3d28e6ad0fa20ce41592edc6107145dfee8c1190 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sat, 5 Mar 2011 14:36:08 +0100 Subject: [PATCH 2/6] create akka-testkit subproject - modify AkkaProject.scala - move CallingThreadDispatcher & Spec and TestKit into akka-testkit - update FSMTimingSpec accordingly --- .../test/scala/akka/actor/actor/FSMTimingSpec.scala | 2 +- .../test/scala/akka/dispatch/ActorModelSpec.scala | 4 ---- .../akka/testkit}/CallingThreadDispatcher.scala | 3 ++- .../src/main/scala/akka/testkit}/TestKit.scala | 5 +++-- .../akka/testkit/CallingThreadDispatcherSpec.scala | 11 +++++++++++ project/build/AkkaProject.scala | 12 ++++++++++++ 6 files changed, 29 insertions(+), 8 deletions(-) rename {akka-actor/src/main/scala/akka/dispatch => akka-testkit/src/main/scala/akka/testkit}/CallingThreadDispatcher.scala (96%) mode change 100755 => 100644 rename {akka-actor/src/main/scala/akka/util => akka-testkit/src/main/scala/akka/testkit}/TestKit.scala (99%) create mode 100644 akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index b13a61b82f..11bd7f830c 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -1,6 +1,6 @@ package akka.actor -import akka.util.TestKit +import akka.testkit.TestKit import akka.util.duration._ import org.scalatest.WordSpec diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index b66b489e2f..e006ba4198 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -315,7 +315,3 @@ class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModel def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor } -class CallingThreadDispatcherTest extends ActorModelSpec { - def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor - override def dispatcherShouldProcessMessagesInParallel {} -} diff --git a/akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala old mode 100755 new mode 100644 similarity index 96% rename from akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala rename to akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index a53d501869..afafd284a5 --- a/akka-actor/src/main/scala/akka/dispatch/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,6 +1,7 @@ -package akka.dispatch +package akka.testkit import akka.actor.ActorRef +import akka.dispatch.{MessageDispatcher, MessageInvocation} import java.util.concurrent.locks.ReentrantLock import java.util.LinkedList import java.util.concurrent.RejectedExecutionException diff --git a/akka-actor/src/main/scala/akka/util/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala similarity index 99% rename from akka-actor/src/main/scala/akka/util/TestKit.scala rename to akka-testkit/src/main/scala/akka/testkit/TestKit.scala index bb400ff992..a2d26ac4a8 100644 --- a/akka-actor/src/main/scala/akka/util/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -1,8 +1,9 @@ -package akka.util +package akka.testkit import akka.actor.{Actor, FSM} import Actor._ -import duration._ +import akka.util.Duration +import akka.util.duration._ import java.util.concurrent.{BlockingDeque, LinkedBlockingDeque, TimeUnit} diff --git a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala new file mode 100644 index 0000000000..c438a8c3d7 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala @@ -0,0 +1,11 @@ +package akka.testkit + +import akka.actor.dispatch.ActorModelSpec + +class CallingThreadDispatcherTest extends ActorModelSpec { + import ActorModelSpec._ + def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor + override def dispatcherShouldProcessMessagesInParallel {} +} + +// vim: set ts=4 sw=4 et: diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 94559e5de3..11e9dea171 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -212,6 +212,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) + lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) @@ -330,8 +331,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val junit = Dependencies.junit val scalatest = Dependencies.scalatest val multiverse_test = Dependencies.multiverse_test // StandardLatch + + // some tests depend on testkit, so include that and make sure it's compiled + override def testClasspath = super.testClasspath +++ akka_testkit.path("target") / "classes" + override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile) + } + // ------------------------------------------------------------------------------------------------------------------- + // akka-testkit subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) + // ------------------------------------------------------------------------------------------------------------------- // akka-stm subproject // ------------------------------------------------------------------------------------------------------------------- From e1b266c8408cc7c4edbc40694abb5a5aeac32e96 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 6 Mar 2011 21:53:09 +0100 Subject: [PATCH 3/6] add test to ActorModelSpec dispatcherShouldHandleQueueingFromMultipleThreads tests for possible race conditions in prohibiting multiple threads running the same actor concurrently --- .../scala/akka/dispatch/ActorModelSpec.scala | 24 ++++++++++++++++++- .../testkit/CallingThreadDispatcher.scala | 2 +- ...=> CallingThreadDispatcherModelSpec.scala} | 2 +- 3 files changed, 25 insertions(+), 3 deletions(-) rename akka-testkit/src/test/scala/akka/testkit/{CallingThreadDispatcherSpec.scala => CallingThreadDispatcherModelSpec.scala} (81%) diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index e006ba4198..86820a51cb 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -26,6 +26,7 @@ object ActorModelSpec { case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage case class Wait(time: Long) extends ActorModelMessage + case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage case object Restart extends ActorModelMessage val Ping = "Ping" @@ -52,6 +53,7 @@ object ActorModelSpec { case Await(latch) => ack; latch.await(); busy.switchOff() case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff() case Wait(time) => ack; Thread.sleep(time); busy.switchOff() + case WaitAck(time, l) => ack; Thread.sleep(time); l.countDown; busy.switchOff() case Reply(msg) => ack; self.reply(msg); busy.switchOff() case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff() case Forward(to,msg) => ack; to.forward(msg); busy.switchOff() @@ -235,6 +237,26 @@ abstract class ActorModelSpec extends JUnitSuite { assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) } + @Test def dispatcherShouldHandleQueueingFromMultipleThreads { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor + val counter = new CountDownLatch(200) + a.start + + def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } + for (i <- 1 to 10) { start } + assertCountDown(counter, 3000, "Should process 200 messages") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) + + a.stop + } + + def spawn(f : => Unit) = { + val thread = new Thread { override def run { f } } + thread.start + thread + } + @Test def dispatcherShouldProcessMessagesInParallel: Unit = { implicit val dispatcher = newInterceptedDispatcher val a, b = newTestActor.start @@ -296,7 +318,7 @@ abstract class ActorModelSpec extends JUnitSuite { } for(run <- 1 to 3) { flood(10000) - await(dispatcher.stops.get == run)(withinMs = 10000) + await(dispatcher.stops.get == run)(withinMs = 11000) assertDispatcher(dispatcher)(starts = run, stops = run) } } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index afafd284a5..3a96e3ae4f 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -101,7 +101,7 @@ class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispa private[akka] override def shutdown {} - private[akka] override def timeoutMs = 0L + private[akka] override def timeoutMs = 100L override def suspend(actor: ActorRef) { getMailbox(actor).suspended.switchOn diff --git a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala similarity index 81% rename from akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala rename to akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala index c438a8c3d7..4645b26d1a 100644 --- a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherSpec.scala +++ b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -2,7 +2,7 @@ package akka.testkit import akka.actor.dispatch.ActorModelSpec -class CallingThreadDispatcherTest extends ActorModelSpec { +class CallingThreadDispatcherModelSpec extends ActorModelSpec { import ActorModelSpec._ def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor override def dispatcherShouldProcessMessagesInParallel {} From 0e66cd0d8cf87e443e4e41762fcf245f9f1d2d8e Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 6 Mar 2011 22:45:44 +0100 Subject: [PATCH 4/6] add locking to CTD-mbox Up to now it relied on the ActorRef's lock, but that has the side effect that a large time may pass between the suspend test and the actual execution. With this lock in place, the ActorRef lock should never block and the time between suspend and the last actor execution is shortened to some cycles (modulo GC). --- .../.CallingThreadDispatcher.scala.swp | Bin 0 -> 24576 bytes .../testkit/CallingThreadDispatcher.scala | 433 +++++++++--------- 2 files changed, 224 insertions(+), 209 deletions(-) create mode 100644 akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp diff --git a/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp b/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp new file mode 100644 index 0000000000000000000000000000000000000000..4e1f044a0fb7e283aded59dfdb92e82d6b6b2fcf GIT binary patch literal 24576 zcmYc?2=nw+FxN9?U|?VnU|x5+{Jc>U0B~2`FQz11r=| z%+5{(6S^g-#UDBY~f>I*vQAgFrSZsVGbVyLpUD;g9sl3!%sEG_x`=Sg28OMXZ!isAf>m(q~Z6QgA5F z%u7yHFw`>y^Abx-GV+TQg7R|`^HLPNL2Eq}v_LY&C5g$|dLUM2u|i2kYMw%BMQUmbnUW!6uoU4ARlM{0k3QJQVQY6>|0rDf)27H6au>nVg}WELx+x-PW@9IdG(sYSUUbtRd}(1Z$( z*`k!xB8AK}1#m#*=cR(I0EZ^Xdr7HKrKu?j`JhYz3dqFdlFYJH1*GJWR$5e&ky->w zoFE0Dl&k>G9U#M?j!V(B23Z4=N79*DtN=;7qmY7qV3XS5-5{2Z%JWv8j&PdEl2bZIuVjPqUb5fxO z!;RC|f-^JI6f_DI@=_J@N^^2F71Y!e6cN_IRTV1ODio!r>7}QZz{QGE(-drUA*E=s z1}N+_p>m+ySq)}`O$QaT#h?f(NzBYERsgF6)gcNBpk%J0m|va;_n4*vh?kmIl3K)| zppcT9rjVWpN&#*~`MHicIi7itEENbg6Ktu16>9zFo0yrClwY9(GDyKn!7sJABr`7^ ztU*)3N+C2avqZsGp|~zP+lxitxKy1*|19?D8Aw9D!HBTWIVyK=% zIH>Xh=O;+Xs!)_#Qd*Q(42}j!SU|H_ssh*-sVNY1z{L$zPi|>(i9%8;xV%nH0oQB_ zO8Mn^O3+#WY(6Bvsux2I1%+uzMk<4Xg1#05gMvaW$XJE+WH1HcC_t1JD`?w-16e`W zP9Zq8L?bV?Tp>I)F*_&~Tw5lmY7}ZhqN-LQHK#Zg;(rBf+VfJ&gW*vQYA!$o6l|eZ zfnz{ZL0bWv9bl*7PXV9^E=o<$EG|hc0!IvG% z7Auq_W`l~uM1|y>{NmCgP=S`Py?V?p(wQ=Covf$2eKNnz}E-V)KiBPKdE^o#h}`wC^aWDF)61Kl4&9NT_G(e zFwHdMH$Qi zP=kw0Kq)3O4c4v&HG19hi=ee~zCw9^5!jf5qI__`rYL0Q=BB1(CW2ZcdI|v$VUVGT zkaPmExfo`iLQ-m4eo?AIY9gc^o0p%b3$+@S>_E*aa5RD1CZJXlql_#LFq5a1PG^sRPw<(8PeD77>IRif{`Q@}c<)WC+L{pZsKS8ZRo% zNiDWw0F|eixdr(}B?{mr;CkSC6x^EED@n}EDN0RdfXZhjmL=-tB<7{-fht3k+z3+w zT8pk%T9TQg7YuGcz{J5?!QzOPie6AED8HtrxPp5_nfZCH70Iaupne3VHlNJA?9>#W z%;FMEIV2-NxvN+&C^a>&q$n}31mstkda%<{pbOshsxkU?IM^V_*p8V_@*-V_@*(V_;C_V_;z6 zV_ag z&BeeF#l^s2$;H5+%f-MT%f-OJ!^Obxjgx`lIVS_dT}}putDFoB>o^%0R&g>gtmI^1 zSi;G`5YEZKz{SbH@SB5y;Ts17!!iyAhA<8W1``ej20ac2hMVjR4Exv_7*?}G!X}5E zfx(!afkB0xfkA?uf#DY$1H)xD28N?-3=F&37#QZWF)&PIV_>LcV_*noV_*%D@oB%D~{k%D`aF%D^DO%D}+F%E0iAg@NH63j@PF76yjnEDQ_>Sr{1B zu`n=9VqswDfsQA%LB|vdSQr?rSQr@8SQr>yGczz;XJ%kH!py*Mn3;iL4l@HoC^G|t z05b!_4<-hNZ%hmfUzr#fZZI)0TxVinSj@!0P{hQ*kjKQpkjuot5Xr>A5W&R2V9dn8 zU<4hD;9+850JTd%tp<30Rj>t@@Q|#b0dBx*g6cw$B2dbO%A+J-4NyB4tP0dx02gDR z>Ks&MgS3L|P*BJ(OD!tOOo7zQ&=MM4&qGZ_u2UEk6cBZ9jsmD{3u?X~)fcE$tA?gR zH3I{dp%iG{43Y)80^~ecsiR;E>ceD~ff{Te@tjmp1ECty;DwaVnhL2!MfpVDHBW0(U9La;FoW%)UYC7^N?5d=k0 zr^4(5g(}Fpg8Tw-Ko{hLhFidGo`TfWY=~HDYBsciUQk+`p^=f8my(mJV1-$PXev}I z6zb)JMrxp{G{G(b*-)HWl?pZv#6XFbB6zeQ%Drf4iG?VhKy4mKydcLfEIvSS0&6HM zfcmhA7N!oQS^+ibAf1q+)a25l;#5#fgZgj~tqR~~v>qs7>Xqh!C~%XuRv|4j4?N%k z>Es~nP=G`jSQW?^aAMU1$30XG+Tw<{ijg`#FvAqU<8M&)$Yv`jBqtW9D#R<;+CiL; zVzPoltwJ(rD-*a?3bGx9VIBdo6%=yv)AfpT(oC}SK*N3-N}!evs1Z@s zkWhxW0OWCy-Jl^yP-inuFSFPsH4QxI1?qc(yO?^J#m=D7qSTVq6j+Qvw1ditGzD9j zjm4>;hNfF-Nof%%qe0Xtz+^$~vTTHSNl_&{Xh6<@xEkK-hs&V`c@ez(12P??uPC)d zA+-?NU&f4KkTQ@nLCT=rB)x+C0z`ZwS)>3q1ug~cfuTez$S9~wA-y{I07Z&kCc-Xo z>Q#UZ;(~^7D-n_i7DO3D8j{{Xy>Uni2Sp~>DT&3!sYN9k5U;@sG*BcuK$}DG;uDmI zi%Rnl`3iT>4b)-B=)HlR0PA=|2DK1X1*p*qZf}Bz?i3WXKn>TTRM4nUUcN4)>F{rVgms(K*>n4D77K28ML1RcD^E}f)MnZE9WK1j*l+YnV z)8O6#XlxDCpoa8Lpu+$Pxrvpa{(EsrVopwKib7&BDBQqfDq0Ge&>;%w_yM?Mn3M_` zH%=`AjqO6pXmGIqZ4I*F}IDn1Ag8OICS^{kl z0hFkrp{I$bm_*JO$i)h%ZUH$Dl*(X(2zubE3N8vQ^}x{$RR*s6;R=w=hAM#NVpypH zN`Npa%#sWod*Gp01*AaGQAjRIO$3eqC_oAch2;FwoD>CE*<}qKZ?saVu2s@Oat$a* zkc%lu4%9;~Zx96+auEg6h?a4X@;gKZsltSLsuoERYRDjmG|2lqR^pfJD4N&(g%(nNI) zxU;0G2X?d`xP1d3lY+%5O3{hYB!LW=q^5wXNyYL+=*TZDvBAtBG(3u8EGXRI#$rSP zM8-cY4VoB1s^Rv6nrWai9Nu7oM+Df1#KsXMR2a}2?2uv}GeE0B;ScdCQanST}w!1Nn%k6sxol9 z8r^rOsS502J!sO?ODy)xD=taQOHTDq!#Y+NOTZD}GBFjFYrux1+6U?lp^PCy)q>20 zj)b5{BZe6D;=%oYcW7T3wEth3fq~&0KLf*6eg=l!{0s~m`572i@-r|j=4W7-&(FXx zm!E-Q4nG6K41NZNAbti0Zhi)aJ$wudxqJ)^%6tqAcX=5YCh;;bxbiYEeC1(ac*MiN zaG!^PVImI$gBuS6!*^~5hE?1Q3~}5H3<9A3Jr@H*DHj7nHWvd!1{VWEAQuCJGZzDc zJr|_^KJ->pjJkL<1V%$(Gz3ONU^E0qLtr!n21N)!hFx(q14maDj;<_(%^Z)eEX*y) z8C_WjUm%+aUV;G~i3ANd!d47|hHIfq_=eWX!jOzq1=whkLUBn^Vo7Rxr2@pCF!9tP zg_QhM$P!LS|63t3uM#{N3RxLZtfP=rS^^rqPEO1N4K9P0tb$iQgO)5Hdf1=^eaRW1 za4SYyS6Yl?WhH2-DrBiEgT9s)WI`4)o&q6}$6a9}pixH9B0TUgWg%$p0yL5a5>)_M zo|CGeQK$!=2!yX1gbnP1hA~0&|IM5X4Aoqa`G3%!{=NJR49lT?{2qP=hCJx{e@lJ_ z27P`824Q{%23CFshPQkS49EEx7&@Ty{Xcja7@k4r{P*xOFs$cgVCdmxV5s0_V94TS zUeoS0Esr{4Gz3ONU^E0qLtr!nMnhmU1V%^*fZDFLwIC8S zX$hV8OHF|;{v&C*A!w2dw5|mg<(SP|B7gDhehU1$hOb0Cc9N#UQ> zr*NSmWPK>=5)o7$INhQwA%m{I0WIf%NW#~2K_~h_eLIv@JP;3ogh$sLg4dypt~ms6 T$xcnt%S5cx!LlA#p_%~z0}J%8 literal 0 HcmV?d00001 diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 3a96e3ae4f..8b106ac1cd 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,209 +1,224 @@ -package akka.testkit - -import akka.actor.ActorRef -import akka.dispatch.{MessageDispatcher, MessageInvocation} -import java.util.concurrent.locks.ReentrantLock -import java.util.LinkedList -import java.util.concurrent.RejectedExecutionException -import akka.util.Switch -import java.lang.ref.WeakReference -import scala.annotation.tailrec - -/* - * Locking rules: - * - * While not suspended, messages are processed (!isActive) or queued - * thread-locally (isActive). While suspended, messages are queued - * thread-locally. When resuming, all messages are atomically scooped from all - * non-active threads and queued on the resuming thread's queue, to be - * processed immediately. Processing a queue checks suspend before each - * invocation, leaving the active state if suspended. For this to work - * reliably, the active flag needs to be set atomically with the initial check - * for suspend. Scooping up messages means replacing the ThreadLocal's contents - * with an empty new NestingQueue. - * - * All accesses to the queue must be done under the suspended-switch's lock, so - * within one of its methods taking a closure argument. - */ - -object CallingThreadDispatcher { - private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() - - // we have to forget about long-gone threads sometime - private def gc { - queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) - } - - def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { - if (queues contains mbox) { - val newSet = queues(mbox) + new WeakReference(q) - queues += mbox -> newSet - } else { - queues += mbox -> Set(new WeakReference(q)) - } - gc - } - - /* - * This method must be called with "own" being this thread's queue for the - * given mailbox. When this method returns, the queue will be entere - * (active). - */ - def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { - if (!own.isActive) own.enter - if (queues contains mbox) { - for { - ref <- queues(mbox) - q = ref.get - if (q ne null) && !q.isActive - } { - while (q.peek ne null) { - own.push(q.pop) - } - } - } - } -} - -/** - * Dispatcher which runs invocations on the current thread only. This - * dispatcher does not create any new threads, but it can be used from - * different threads concurrently for the same actor. The dispatch strategy is - * to run on the current thread unless the target actor is either suspended or - * already running on the current thread (if it is running on a different - * thread, then this thread will block until that other invocation is - * finished); if the invocation is not run, it is queued in a thread-local - * queue to be executed once the active invocation further up the call stack - * finishes. This leads to completely deterministic execution order if only one - * thread is used. - * - * Suspending and resuming are global actions for one actor, meaning they can - * affect different threads, which leads to complications. If messages are - * queued (thread-locally) during the suspended period, the only thread to run - * them upon resume is the thread actually calling the resume method. Hence, - * all thread-local queues which are not currently being drained (possible, - * since suspend-queue-resume might happen entirely during an invocation on a - * different thread) are scooped up into the current thread-local queue which - * is then executed. It is possible to suspend an actor from within its call - * stack. - * - * @author Roland Kuhn - * @since 1.1 - */ -class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { - import CallingThreadDispatcher._ - - private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox - - private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] - - private[akka] override def start {} - - private[akka] override def shutdown {} - - private[akka] override def timeoutMs = 100L - - override def suspend(actor: ActorRef) { - getMailbox(actor).suspended.switchOn - } - - override def resume(actor: ActorRef) { - val mbox = getMailbox(actor) - val queue = mbox.queue - val wasActive = queue.isActive - val switched = mbox.suspended.switchOff { - gatherFromAllInactiveQueues(mbox, queue) - } - if (switched && !wasActive) { - runQueue(mbox, queue) - } - } - - override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size - - private[akka] override def dispatch(handle: MessageInvocation) { - val mbox = getMailbox(handle.receiver) - val queue = mbox.queue - val execute = mbox.suspended.ifElseYield { - queue.push(handle) - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("suspended, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } { - queue.push(handle) - if (queue.isActive) { - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } else { - queue.enter - true - } - } - if (execute) runQueue(mbox, queue) - } - - /* - * This method must be called with this thread's queue, which must already - * have been entered (active). When this method returns, the queue will be - * inactive. - * - * If the catch block is executed, then a non-empty mailbox may be stalled as - * there is no-one who cares to execute it before the next message is sent or - * it is suspended and resumed. - */ - private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { - assert(queue.isActive) - val handle = mbox.suspended.ifElseYield[MessageInvocation] { - queue.leave - null - } { - val ret = queue.pop - if (ret eq null) queue.leave - ret - } - if (handle ne null) { - try { - handle.invoke - val f = handle.senderFuture - if (warnings && f.isDefined && !f.get.isCompleted) { - log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) - } - } catch { - case _ => queue.leave - } - runQueue(mbox, queue) - log.info("runQueue") - } else if (queue.isActive) { - queue.leave - } - } -} - -class NestingQueue { - private var q = new LinkedList[MessageInvocation]() - def size = q.size - def push(handle : MessageInvocation) { q.offer(handle) } - def peek = q.peek - def pop = q.poll - - @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } - def isActive = active -} - -class CallingThreadMailbox { - - private val q = new ThreadLocal[NestingQueue]() { - override def initialValue = new NestingQueue - } - - def queue = q.get - - val suspended = new Switch(false) -} +package akka.testkit + +import akka.actor.ActorRef +import akka.dispatch.{MessageDispatcher, MessageInvocation} +import java.util.concurrent.locks.ReentrantLock +import java.util.LinkedList +import java.util.concurrent.RejectedExecutionException +import akka.util.Switch +import java.lang.ref.WeakReference +import scala.annotation.tailrec + +/* + * Locking rules: + * + * While not suspended, messages are processed (!isActive) or queued + * thread-locally (isActive). While suspended, messages are queued + * thread-locally. When resuming, all messages are atomically scooped from all + * non-active threads and queued on the resuming thread's queue, to be + * processed immediately. Processing a queue checks suspend before each + * invocation, leaving the active state if suspended. For this to work + * reliably, the active flag needs to be set atomically with the initial check + * for suspend. Scooping up messages means replacing the ThreadLocal's contents + * with an empty new NestingQueue. + * + * All accesses to the queue must be done under the suspended-switch's lock, so + * within one of its methods taking a closure argument. + */ + +object CallingThreadDispatcher { + private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + + // we have to forget about long-gone threads sometime + private def gc { + queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + } + + def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { + if (queues contains mbox) { + val newSet = queues(mbox) + new WeakReference(q) + queues += mbox -> newSet + } else { + queues += mbox -> Set(new WeakReference(q)) + } + gc + } + + /* + * This method must be called with "own" being this thread's queue for the + * given mailbox. When this method returns, the queue will be entered + * (active). + */ + def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { + if (!own.isActive) own.enter + if (queues contains mbox) { + for { + ref <- queues(mbox) + q = ref.get + if (q ne null) && !q.isActive + /* + * if q.isActive was false, then it cannot change to true while we are + * holding the mbox.suspende.switch's lock under which we are currently + * executing + */ + } { + while (q.peek ne null) { + own.push(q.pop) + } + } + } + } +} + +/** + * Dispatcher which runs invocations on the current thread only. This + * dispatcher does not create any new threads, but it can be used from + * different threads concurrently for the same actor. The dispatch strategy is + * to run on the current thread unless the target actor is either suspended or + * already running on the current thread (if it is running on a different + * thread, then this thread will block until that other invocation is + * finished); if the invocation is not run, it is queued in a thread-local + * queue to be executed once the active invocation further up the call stack + * finishes. This leads to completely deterministic execution order if only one + * thread is used. + * + * Suspending and resuming are global actions for one actor, meaning they can + * affect different threads, which leads to complications. If messages are + * queued (thread-locally) during the suspended period, the only thread to run + * them upon resume is the thread actually calling the resume method. Hence, + * all thread-local queues which are not currently being drained (possible, + * since suspend-queue-resume might happen entirely during an invocation on a + * different thread) are scooped up into the current thread-local queue which + * is then executed. It is possible to suspend an actor from within its call + * stack. + * + * @author Roland Kuhn + * @since 1.1 + */ +class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { + import CallingThreadDispatcher._ + + private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox + + private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + + private[akka] override def start {} + + private[akka] override def shutdown {} + + private[akka] override def timeoutMs = 100L + + override def suspend(actor: ActorRef) { + getMailbox(actor).suspended.switchOn + } + + override def resume(actor: ActorRef) { + val mbox = getMailbox(actor) + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspended.switchOff { + gatherFromAllInactiveQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + } + + override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + + private[akka] override def dispatch(handle: MessageInvocation) { + val mbox = getMailbox(handle.receiver) + val queue = mbox.queue + val execute = mbox.suspended.ifElseYield { + queue.push(handle) + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("suspended, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } { + queue.push(handle) + if (queue.isActive) { + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + } + + /* + * This method must be called with this thread's queue, which must already + * have been entered (active). When this method returns, the queue will be + * inactive. + * + * If the catch block is executed, then a non-empty mailbox may be stalled as + * there is no-one who cares to execute it before the next message is sent or + * it is suspended and resumed. + */ + @tailrec private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { + assert(queue.isActive) + mbox.lock.lock + val recurse = try { + val handle = mbox.suspended.ifElseYield[MessageInvocation] { + queue.leave + null + } { + val ret = queue.pop + if (ret eq null) queue.leave + ret + } + if (handle ne null) { + try { + handle.invoke + val f = handle.senderFuture + if (warnings && f.isDefined && !f.get.isCompleted) { + log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) + } + } catch { + case _ => queue.leave + } + true + } else if (queue.isActive) { + queue.leave + false + } else false + } finally { + mbox.lock.unlock + } + if (recurse) { + runQueue(mbox, queue) + } + } +} + +class NestingQueue { + private var q = new LinkedList[MessageInvocation]() + def size = q.size + def push(handle : MessageInvocation) { q.offer(handle) } + def peek = q.peek + def pop = q.poll + + @volatile private var active = false + def enter { if (active) error("already active") else active = true } + def leave { if (!active) error("not active") else active = false } + def isActive = active +} + +class CallingThreadMailbox { + + private val q = new ThreadLocal[NestingQueue]() { + override def initialValue = new NestingQueue + } + + def queue = q.get + + val lock = new ReentrantLock + + val suspended = new Switch(false) +} From d15e5e79197119c7e77555d18ef3393319ff6265 Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 17 Mar 2011 17:59:22 +0100 Subject: [PATCH 5/6] ignore VIM swap files (and clean up previous accident) --- .gitignore | 3 ++- .../testkit/.CallingThreadDispatcher.scala.swp | Bin 24576 -> 0 bytes 2 files changed, 2 insertions(+), 1 deletion(-) delete mode 100644 akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp diff --git a/.gitignore b/.gitignore index 0924b60661..c0fa0f10b4 100755 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,5 @@ run-codefellow .idea .scala_dependencies multiverse.log -.eprj \ No newline at end of file +.eprj +.*.swp diff --git a/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp b/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp deleted file mode 100644 index 4e1f044a0fb7e283aded59dfdb92e82d6b6b2fcf..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 24576 zcmYc?2=nw+FxN9?U|?VnU|x5+{Jc>U0B~2`FQz11r=| z%+5{(6S^g-#UDBY~f>I*vQAgFrSZsVGbVyLpUD;g9sl3!%sEG_x`=Sg28OMXZ!isAf>m(q~Z6QgA5F z%u7yHFw`>y^Abx-GV+TQg7R|`^HLPNL2Eq}v_LY&C5g$|dLUM2u|i2kYMw%BMQUmbnUW!6uoU4ARlM{0k3QJQVQY6>|0rDf)27H6au>nVg}WELx+x-PW@9IdG(sYSUUbtRd}(1Z$( z*`k!xB8AK}1#m#*=cR(I0EZ^Xdr7HKrKu?j`JhYz3dqFdlFYJH1*GJWR$5e&ky->w zoFE0Dl&k>G9U#M?j!V(B23Z4=N79*DtN=;7qmY7qV3XS5-5{2Z%JWv8j&PdEl2bZIuVjPqUb5fxO z!;RC|f-^JI6f_DI@=_J@N^^2F71Y!e6cN_IRTV1ODio!r>7}QZz{QGE(-drUA*E=s z1}N+_p>m+ySq)}`O$QaT#h?f(NzBYERsgF6)gcNBpk%J0m|va;_n4*vh?kmIl3K)| zppcT9rjVWpN&#*~`MHicIi7itEENbg6Ktu16>9zFo0yrClwY9(GDyKn!7sJABr`7^ ztU*)3N+C2avqZsGp|~zP+lxitxKy1*|19?D8Aw9D!HBTWIVyK=% zIH>Xh=O;+Xs!)_#Qd*Q(42}j!SU|H_ssh*-sVNY1z{L$zPi|>(i9%8;xV%nH0oQB_ zO8Mn^O3+#WY(6Bvsux2I1%+uzMk<4Xg1#05gMvaW$XJE+WH1HcC_t1JD`?w-16e`W zP9Zq8L?bV?Tp>I)F*_&~Tw5lmY7}ZhqN-LQHK#Zg;(rBf+VfJ&gW*vQYA!$o6l|eZ zfnz{ZL0bWv9bl*7PXV9^E=o<$EG|hc0!IvG% z7Auq_W`l~uM1|y>{NmCgP=S`Py?V?p(wQ=Covf$2eKNnz}E-V)KiBPKdE^o#h}`wC^aWDF)61Kl4&9NT_G(e zFwHdMH$Qi zP=kw0Kq)3O4c4v&HG19hi=ee~zCw9^5!jf5qI__`rYL0Q=BB1(CW2ZcdI|v$VUVGT zkaPmExfo`iLQ-m4eo?AIY9gc^o0p%b3$+@S>_E*aa5RD1CZJXlql_#LFq5a1PG^sRPw<(8PeD77>IRif{`Q@}c<)WC+L{pZsKS8ZRo% zNiDWw0F|eixdr(}B?{mr;CkSC6x^EED@n}EDN0RdfXZhjmL=-tB<7{-fht3k+z3+w zT8pk%T9TQg7YuGcz{J5?!QzOPie6AED8HtrxPp5_nfZCH70Iaupne3VHlNJA?9>#W z%;FMEIV2-NxvN+&C^a>&q$n}31mstkda%<{pbOshsxkU?IM^V_*p8V_@*-V_@*(V_;C_V_;z6 zV_ag z&BeeF#l^s2$;H5+%f-MT%f-OJ!^Obxjgx`lIVS_dT}}putDFoB>o^%0R&g>gtmI^1 zSi;G`5YEZKz{SbH@SB5y;Ts17!!iyAhA<8W1``ej20ac2hMVjR4Exv_7*?}G!X}5E zfx(!afkB0xfkA?uf#DY$1H)xD28N?-3=F&37#QZWF)&PIV_>LcV_*noV_*%D@oB%D~{k%D`aF%D^DO%D}+F%E0iAg@NH63j@PF76yjnEDQ_>Sr{1B zu`n=9VqswDfsQA%LB|vdSQr?rSQr@8SQr>yGczz;XJ%kH!py*Mn3;iL4l@HoC^G|t z05b!_4<-hNZ%hmfUzr#fZZI)0TxVinSj@!0P{hQ*kjKQpkjuot5Xr>A5W&R2V9dn8 zU<4hD;9+850JTd%tp<30Rj>t@@Q|#b0dBx*g6cw$B2dbO%A+J-4NyB4tP0dx02gDR z>Ks&MgS3L|P*BJ(OD!tOOo7zQ&=MM4&qGZ_u2UEk6cBZ9jsmD{3u?X~)fcE$tA?gR zH3I{dp%iG{43Y)80^~ecsiR;E>ceD~ff{Te@tjmp1ECty;DwaVnhL2!MfpVDHBW0(U9La;FoW%)UYC7^N?5d=k0 zr^4(5g(}Fpg8Tw-Ko{hLhFidGo`TfWY=~HDYBsciUQk+`p^=f8my(mJV1-$PXev}I z6zb)JMrxp{G{G(b*-)HWl?pZv#6XFbB6zeQ%Drf4iG?VhKy4mKydcLfEIvSS0&6HM zfcmhA7N!oQS^+ibAf1q+)a25l;#5#fgZgj~tqR~~v>qs7>Xqh!C~%XuRv|4j4?N%k z>Es~nP=G`jSQW?^aAMU1$30XG+Tw<{ijg`#FvAqU<8M&)$Yv`jBqtW9D#R<;+CiL; zVzPoltwJ(rD-*a?3bGx9VIBdo6%=yv)AfpT(oC}SK*N3-N}!evs1Z@s zkWhxW0OWCy-Jl^yP-inuFSFPsH4QxI1?qc(yO?^J#m=D7qSTVq6j+Qvw1ditGzD9j zjm4>;hNfF-Nof%%qe0Xtz+^$~vTTHSNl_&{Xh6<@xEkK-hs&V`c@ez(12P??uPC)d zA+-?NU&f4KkTQ@nLCT=rB)x+C0z`ZwS)>3q1ug~cfuTez$S9~wA-y{I07Z&kCc-Xo z>Q#UZ;(~^7D-n_i7DO3D8j{{Xy>Uni2Sp~>DT&3!sYN9k5U;@sG*BcuK$}DG;uDmI zi%Rnl`3iT>4b)-B=)HlR0PA=|2DK1X1*p*qZf}Bz?i3WXKn>TTRM4nUUcN4)>F{rVgms(K*>n4D77K28ML1RcD^E}f)MnZE9WK1j*l+YnV z)8O6#XlxDCpoa8Lpu+$Pxrvpa{(EsrVopwKib7&BDBQqfDq0Ge&>;%w_yM?Mn3M_` zH%=`AjqO6pXmGIqZ4I*F}IDn1Ag8OICS^{kl z0hFkrp{I$bm_*JO$i)h%ZUH$Dl*(X(2zubE3N8vQ^}x{$RR*s6;R=w=hAM#NVpypH zN`Npa%#sWod*Gp01*AaGQAjRIO$3eqC_oAch2;FwoD>CE*<}qKZ?saVu2s@Oat$a* zkc%lu4%9;~Zx96+auEg6h?a4X@;gKZsltSLsuoERYRDjmG|2lqR^pfJD4N&(g%(nNI) zxU;0G2X?d`xP1d3lY+%5O3{hYB!LW=q^5wXNyYL+=*TZDvBAtBG(3u8EGXRI#$rSP zM8-cY4VoB1s^Rv6nrWai9Nu7oM+Df1#KsXMR2a}2?2uv}GeE0B;ScdCQanST}w!1Nn%k6sxol9 z8r^rOsS502J!sO?ODy)xD=taQOHTDq!#Y+NOTZD}GBFjFYrux1+6U?lp^PCy)q>20 zj)b5{BZe6D;=%oYcW7T3wEth3fq~&0KLf*6eg=l!{0s~m`572i@-r|j=4W7-&(FXx zm!E-Q4nG6K41NZNAbti0Zhi)aJ$wudxqJ)^%6tqAcX=5YCh;;bxbiYEeC1(ac*MiN zaG!^PVImI$gBuS6!*^~5hE?1Q3~}5H3<9A3Jr@H*DHj7nHWvd!1{VWEAQuCJGZzDc zJr|_^KJ->pjJkL<1V%$(Gz3ONU^E0qLtr!n21N)!hFx(q14maDj;<_(%^Z)eEX*y) z8C_WjUm%+aUV;G~i3ANd!d47|hHIfq_=eWX!jOzq1=whkLUBn^Vo7Rxr2@pCF!9tP zg_QhM$P!LS|63t3uM#{N3RxLZtfP=rS^^rqPEO1N4K9P0tb$iQgO)5Hdf1=^eaRW1 za4SYyS6Yl?WhH2-DrBiEgT9s)WI`4)o&q6}$6a9}pixH9B0TUgWg%$p0yL5a5>)_M zo|CGeQK$!=2!yX1gbnP1hA~0&|IM5X4Aoqa`G3%!{=NJR49lT?{2qP=hCJx{e@lJ_ z27P`824Q{%23CFshPQkS49EEx7&@Ty{Xcja7@k4r{P*xOFs$cgVCdmxV5s0_V94TS zUeoS0Esr{4Gz3ONU^E0qLtr!nMnhmU1V%^*fZDFLwIC8S zX$hV8OHF|;{v&C*A!w2dw5|mg<(SP|B7gDhehU1$hOb0Cc9N#UQ> zr*NSmWPK>=5)o7$INhQwA%m{I0WIf%NW#~2K_~h_eLIv@JP;3ogh$sLg4dypt~ms6 T$xcnt%S5cx!LlA#p_%~z0}J%8 From 18080cbb89ebdffb60678f8eb5adb2a0571480fc Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Thu, 17 Mar 2011 20:49:38 +0100 Subject: [PATCH 6/6] make FSMTimingSpec more deterministic The test for repeated timer had a race condition between receiving the sixth Tick and the Cancel message. Fix this by cancelling from within after the fifth Tick received. --- .../akka/actor/actor/FSMTimingSpec.scala | 23 ++++++++++--------- 1 file changed, 12 insertions(+), 11 deletions(-) diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index 11bd7f830c..a59785ab7a 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -16,7 +16,7 @@ class FSMTimingSpec val fsm = Actor.actorOf(new StateMachine(testActor)).start fsm ! SubscribeTransitionCallBack(testActor) - expectMsg(100 millis, CurrentState(fsm, Initial)) + expectMsg(200 millis, CurrentState(fsm, Initial)) ignoreMsg { case Transition(_, Initial, _) => true @@ -43,12 +43,11 @@ class FSMTimingSpec "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer - val seq = receiveWhile(550 millis) { + val seq = receiveWhile(600 millis) { case Tick => Tick } seq must have length (5) within(250 millis) { - fsm ! Cancel expectMsg(Transition(fsm, TestRepeatedTimer, Initial)) expectNoMsg } @@ -95,17 +94,17 @@ object FSMTimingSpec { case class Unhandled(msg : AnyRef) - class StateMachine(tester : ActorRef) extends Actor with FSM[State, Unit] { + class StateMachine(tester : ActorRef) extends Actor with FSM[State, Int] { import FSM._ - startWith(Initial, ()) + startWith(Initial, 0) when(Initial) { case Ev(TestSingleTimer) => setTimer("tester", Tick, 100 millis, false) goto(TestSingleTimer) case Ev(TestRepeatedTimer) => setTimer("tester", Tick, 100 millis, true) - goto(TestRepeatedTimer) + goto(TestRepeatedTimer) using 4 case Ev(x : FSMTimingSpec.State) => goto(x) } when(TestStateTimeout, stateTimeout = 100 millis) { @@ -117,12 +116,14 @@ object FSMTimingSpec { goto(Initial) } when(TestRepeatedTimer) { - case Ev(Tick) => + case Event(Tick, remaining) => tester ! Tick - stay - case Ev(Cancel) => - cancelTimer("tester") - goto(Initial) + if (remaining == 0) { + cancelTimer("tester") + goto(Initial) + } else { + stay using (remaining - 1) + } } when(TestUnhandled) { case Ev(SetHandler) =>