diff --git a/.gitignore b/.gitignore index 0924b60661..c0fa0f10b4 100755 --- a/.gitignore +++ b/.gitignore @@ -43,4 +43,5 @@ run-codefellow .idea .scala_dependencies multiverse.log -.eprj \ No newline at end of file +.eprj +.*.swp diff --git a/akka-actor/src/main/scala/akka/util/LockUtil.scala b/akka-actor/src/main/scala/akka/util/LockUtil.scala index 3e234b8dea..1869dbb5e3 100644 --- a/akka-actor/src/main/scala/akka/util/LockUtil.scala +++ b/akka-actor/src/main/scala/akka/util/LockUtil.scala @@ -164,6 +164,34 @@ class Switch(startAsOn: Boolean = false) { } else false } + def whileOnYield[T](action: => T): Option[T] = synchronized { + if (switch.get) Some(action) + else None + } + + def whileOffYield[T](action: => T): Option[T] = synchronized { + if (!switch.get) Some(action) + else None + } + + def whileOn(action: => Unit): Boolean = synchronized { + if (switch.get) { + action + true + } else false + } + + def whileOff(action: => Unit): Boolean = synchronized { + if (switch.get) { + action + true + } else false + } + + def ifElseYield[T](on: => T)(off: => T) = synchronized { + if (switch.get) on else off + } + def isOn = switch.get def isOff = !isOn } diff --git a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala index b13a61b82f..a59785ab7a 100644 --- a/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala +++ b/akka-actor/src/test/scala/akka/actor/actor/FSMTimingSpec.scala @@ -1,6 +1,6 @@ package akka.actor -import akka.util.TestKit +import akka.testkit.TestKit import akka.util.duration._ import org.scalatest.WordSpec @@ -16,7 +16,7 @@ class FSMTimingSpec val fsm = Actor.actorOf(new StateMachine(testActor)).start fsm ! SubscribeTransitionCallBack(testActor) - expectMsg(100 millis, CurrentState(fsm, Initial)) + expectMsg(200 millis, CurrentState(fsm, Initial)) ignoreMsg { case Transition(_, Initial, _) => true @@ -43,12 +43,11 @@ class FSMTimingSpec "receive and cancel a repeated timer" in { fsm ! TestRepeatedTimer - val seq = receiveWhile(550 millis) { + val seq = receiveWhile(600 millis) { case Tick => Tick } seq must have length (5) within(250 millis) { - fsm ! Cancel expectMsg(Transition(fsm, TestRepeatedTimer, Initial)) expectNoMsg } @@ -95,17 +94,17 @@ object FSMTimingSpec { case class Unhandled(msg : AnyRef) - class StateMachine(tester : ActorRef) extends Actor with FSM[State, Unit] { + class StateMachine(tester : ActorRef) extends Actor with FSM[State, Int] { import FSM._ - startWith(Initial, ()) + startWith(Initial, 0) when(Initial) { case Ev(TestSingleTimer) => setTimer("tester", Tick, 100 millis, false) goto(TestSingleTimer) case Ev(TestRepeatedTimer) => setTimer("tester", Tick, 100 millis, true) - goto(TestRepeatedTimer) + goto(TestRepeatedTimer) using 4 case Ev(x : FSMTimingSpec.State) => goto(x) } when(TestStateTimeout, stateTimeout = 100 millis) { @@ -117,12 +116,14 @@ object FSMTimingSpec { goto(Initial) } when(TestRepeatedTimer) { - case Ev(Tick) => + case Event(Tick, remaining) => tester ! Tick - stay - case Ev(Cancel) => - cancelTimer("tester") - goto(Initial) + if (remaining == 0) { + cancelTimer("tester") + goto(Initial) + } else { + stay using (remaining - 1) + } } when(TestUnhandled) { case Ev(SetHandler) => diff --git a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala index 1a5e9753b8..c7bdd61241 100644 --- a/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -12,7 +12,7 @@ import akka.actor.Actor._ import java.util.concurrent.atomic.AtomicLong import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit} import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor -import akka.util.Duration +import akka.util.{Duration, Switch} object ActorModelSpec { @@ -25,6 +25,8 @@ object ActorModelSpec { case class Await(latch: CountDownLatch) extends ActorModelMessage case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage + case class Wait(time: Long) extends ActorModelMessage + case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage case object Restart extends ActorModelMessage val Ping = "Ping" @@ -33,22 +35,32 @@ object ActorModelSpec { class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor { self.dispatcher = dispatcher.asInstanceOf[MessageDispatcher] - def ack { dispatcher.getStats(self).msgsProcessed.incrementAndGet() } + private val busy = new Switch(false) + + def ack { + if (!busy.switchOn()) { + throw new Exception("isolation violated") + } else { + dispatcher.getStats(self).msgsProcessed.incrementAndGet() + } + } override def postRestart(reason: Throwable) { dispatcher.getStats(self).restarts.incrementAndGet() } def receive = { - case Await(latch) => ack; latch.await() - case Meet(sign, wait) => ack; sign.countDown(); wait.await() - case Reply(msg) => ack; self.reply(msg) - case Reply_?(msg) => ack; self.reply_?(msg) - case Forward(to,msg) => ack; to.forward(msg) - case CountDown(latch) => ack; latch.countDown() - case Increment(count) => ack; count.incrementAndGet() - case CountDownNStop(l)=> ack; l.countDown; self.stop - case Restart => ack; throw new Exception("Restart requested") + case Await(latch) => ack; latch.await(); busy.switchOff() + case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff() + case Wait(time) => ack; Thread.sleep(time); busy.switchOff() + case WaitAck(time, l) => ack; Thread.sleep(time); l.countDown; busy.switchOff() + case Reply(msg) => ack; self.reply(msg); busy.switchOff() + case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff() + case Forward(to,msg) => ack; to.forward(msg); busy.switchOff() + case CountDown(latch) => ack; latch.countDown(); busy.switchOff() + case Increment(count) => ack; count.incrementAndGet(); busy.switchOff() + case CountDownNStop(l)=> ack; l.countDown; self.stop; busy.switchOff() + case Restart => ack; busy.switchOff(); throw new Exception("Restart requested") } } @@ -208,29 +220,43 @@ abstract class ActorModelSpec extends JUnitSuite { @Test def dispatcherShouldProcessMessagesOneAtATime { implicit val dispatcher = newInterceptedDispatcher val a = newTestActor - val start,step1,step2,oneAtATime = new CountDownLatch(1) + val start,oneAtATime = new CountDownLatch(1) a.start a ! CountDown(start) assertCountDown(start,3000, "Should process first message within 3 seconds") assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1) - a ! Meet(step1,step2) - assertCountDown(step1,3000, "Didn't process the Meet message in 3 seocnds") - assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2) - + a ! Wait(1000) a ! CountDown(oneAtATime) - assertNoCountDown(oneAtATime,500,"Processed message when not allowed to") - assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 2) - - step2.countDown() - assertCountDown(oneAtATime,500,"Processed message when allowed") + // in case of serialization violation, restart would happen instead of count down + assertCountDown(oneAtATime,1500,"Processed message when allowed") assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3) a.stop assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3) } + @Test def dispatcherShouldHandleQueueingFromMultipleThreads { + implicit val dispatcher = newInterceptedDispatcher + val a = newTestActor + val counter = new CountDownLatch(200) + a.start + + def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } } + for (i <- 1 to 10) { start } + assertCountDown(counter, 3000, "Should process 200 messages") + assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200) + + a.stop + } + + def spawn(f : => Unit) = { + val thread = new Thread { override def run { f } } + thread.start + thread + } + @Test def dispatcherShouldProcessMessagesInParallel: Unit = { implicit val dispatcher = newInterceptedDispatcher val a, b = newTestActor.start @@ -304,5 +330,6 @@ class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec { } class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec { - def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor -} \ No newline at end of file + def newInterceptedDispatcher = + new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor +} diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala new file mode 100644 index 0000000000..131c18b279 --- /dev/null +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -0,0 +1,224 @@ +package akka.testkit + +import akka.actor.{ActorRef, EventHandler} +import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation} +import java.util.concurrent.locks.ReentrantLock +import java.util.LinkedList +import java.util.concurrent.RejectedExecutionException +import akka.util.Switch +import java.lang.ref.WeakReference +import scala.annotation.tailrec + +/* + * Locking rules: + * + * While not suspended, messages are processed (!isActive) or queued + * thread-locally (isActive). While suspended, messages are queued + * thread-locally. When resuming, all messages are atomically scooped from all + * non-active threads and queued on the resuming thread's queue, to be + * processed immediately. Processing a queue checks suspend before each + * invocation, leaving the active state if suspended. For this to work + * reliably, the active flag needs to be set atomically with the initial check + * for suspend. Scooping up messages means replacing the ThreadLocal's contents + * with an empty new NestingQueue. + * + * All accesses to the queue must be done under the suspended-switch's lock, so + * within one of its methods taking a closure argument. + */ + +object CallingThreadDispatcher { + private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + + // we have to forget about long-gone threads sometime + private def gc { + queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + } + + def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { + if (queues contains mbox) { + val newSet = queues(mbox) + new WeakReference(q) + queues += mbox -> newSet + } else { + queues += mbox -> Set(new WeakReference(q)) + } + gc + } + + /* + * This method must be called with "own" being this thread's queue for the + * given mailbox. When this method returns, the queue will be entered + * (active). + */ + def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { + if (!own.isActive) own.enter + if (queues contains mbox) { + for { + ref <- queues(mbox) + q = ref.get + if (q ne null) && !q.isActive + /* + * if q.isActive was false, then it cannot change to true while we are + * holding the mbox.suspende.switch's lock under which we are currently + * executing + */ + } { + while (q.peek ne null) { + own.push(q.pop) + } + } + } + } +} + +/** + * Dispatcher which runs invocations on the current thread only. This + * dispatcher does not create any new threads, but it can be used from + * different threads concurrently for the same actor. The dispatch strategy is + * to run on the current thread unless the target actor is either suspended or + * already running on the current thread (if it is running on a different + * thread, then this thread will block until that other invocation is + * finished); if the invocation is not run, it is queued in a thread-local + * queue to be executed once the active invocation further up the call stack + * finishes. This leads to completely deterministic execution order if only one + * thread is used. + * + * Suspending and resuming are global actions for one actor, meaning they can + * affect different threads, which leads to complications. If messages are + * queued (thread-locally) during the suspended period, the only thread to run + * them upon resume is the thread actually calling the resume method. Hence, + * all thread-local queues which are not currently being drained (possible, + * since suspend-queue-resume might happen entirely during an invocation on a + * different thread) are scooped up into the current thread-local queue which + * is then executed. It is possible to suspend an actor from within its call + * stack. + * + * @author Roland Kuhn + * @since 1.1 + */ +class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { + import CallingThreadDispatcher._ + + private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox + + private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + + private[akka] override def start {} + + private[akka] override def shutdown {} + + private[akka] override def timeoutMs = 100L + + override def suspend(actor: ActorRef) { + getMailbox(actor).suspended.switchOn + } + + override def resume(actor: ActorRef) { + val mbox = getMailbox(actor) + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspended.switchOff { + gatherFromAllInactiveQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + } + + override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + + private[akka] override def dispatch(handle: MessageInvocation) { + val mbox = getMailbox(handle.receiver) + val queue = mbox.queue + val execute = mbox.suspended.ifElseYield { + queue.push(handle) + if (warnings && handle.senderFuture.isDefined) { + EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver) + } + false + } { + queue.push(handle) + if (queue.isActive) { + if (warnings && handle.senderFuture.isDefined) { + EventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver) + } + false + } else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + } + + private[akka] override def executeFuture(invocation: FutureInvocation) { invocation.run } + + /* + * This method must be called with this thread's queue, which must already + * have been entered (active). When this method returns, the queue will be + * inactive. + * + * If the catch block is executed, then a non-empty mailbox may be stalled as + * there is no-one who cares to execute it before the next message is sent or + * it is suspended and resumed. + */ + @tailrec private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { + assert(queue.isActive) + mbox.lock.lock + val recurse = try { + val handle = mbox.suspended.ifElseYield[MessageInvocation] { + queue.leave + null + } { + val ret = queue.pop + if (ret eq null) queue.leave + ret + } + if (handle ne null) { + try { + handle.invoke + val f = handle.senderFuture + if (warnings && f.isDefined && !f.get.isCompleted) { + EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message)) + } + } catch { + case _ => queue.leave + } + true + } else if (queue.isActive) { + queue.leave + false + } else false + } finally { + mbox.lock.unlock + } + if (recurse) { + runQueue(mbox, queue) + } + } +} + +class NestingQueue { + private var q = new LinkedList[MessageInvocation]() + def size = q.size + def push(handle : MessageInvocation) { q.offer(handle) } + def peek = q.peek + def pop = q.poll + + @volatile private var active = false + def enter { if (active) error("already active") else active = true } + def leave { if (!active) error("not active") else active = false } + def isActive = active +} + +class CallingThreadMailbox { + + private val q = new ThreadLocal[NestingQueue]() { + override def initialValue = new NestingQueue + } + + def queue = q.get + + val lock = new ReentrantLock + + val suspended = new Switch(false) +} diff --git a/akka-actor/src/main/scala/akka/util/TestKit.scala b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala similarity index 99% rename from akka-actor/src/main/scala/akka/util/TestKit.scala rename to akka-testkit/src/main/scala/akka/testkit/TestKit.scala index bb400ff992..a2d26ac4a8 100644 --- a/akka-actor/src/main/scala/akka/util/TestKit.scala +++ b/akka-testkit/src/main/scala/akka/testkit/TestKit.scala @@ -1,8 +1,9 @@ -package akka.util +package akka.testkit import akka.actor.{Actor, FSM} import Actor._ -import duration._ +import akka.util.Duration +import akka.util.duration._ import java.util.concurrent.{BlockingDeque, LinkedBlockingDeque, TimeUnit} diff --git a/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala new file mode 100644 index 0000000000..22e16abdd9 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/CallingThreadDispatcherModelSpec.scala @@ -0,0 +1,32 @@ +package akka.testkit + +import akka.actor.dispatch.ActorModelSpec +import java.util.concurrent.CountDownLatch + +class CallingThreadDispatcherModelSpec extends ActorModelSpec { + import ActorModelSpec._ + def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor + + override def dispatcherShouldProcessMessagesInParallel {} + + override def dispatcherShouldHandleWavesOfActors { + implicit val dispatcher = newInterceptedDispatcher + + def flood(num: Int) { + val cachedMessage = CountDownNStop(new CountDownLatch(num)) + val keeper = newTestActor.start + (1 to num) foreach { + _ => newTestActor.start ! cachedMessage + } + keeper.stop + assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns") + } + for(run <- 1 to 3) { + flood(10000) + await(dispatcher.stops.get == run)(withinMs = 10000) + assertDispatcher(dispatcher)(starts = run, stops = run) + } + } +} + +// vim: set ts=4 sw=4 et: diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 709f55bc6b..e094bb1bf0 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -177,6 +177,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { // ------------------------------------------------------------------------------------------------------------------- lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_)) + lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor) lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor) lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm) lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor) @@ -288,9 +289,19 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val scalatest = Dependencies.scalatest val multiverse_test = Dependencies.multiverse_test // StandardLatch - override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2") + override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2") + + // some tests depend on testkit, so include that and make sure it's compiled + override def testClasspath = super.testClasspath +++ akka_testkit.path("target") / "classes" + override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile) } + // ------------------------------------------------------------------------------------------------------------------- + // akka-testkit subproject + // ------------------------------------------------------------------------------------------------------------------- + + class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) + // ------------------------------------------------------------------------------------------------------------------- // akka-stm subproject // -------------------------------------------------------------------------------------------------------------------