Merge branch 'wip-CallingThreadDispatcher'

Conflicts:
	akka-actor/src/test/scala/akka/dispatch/ActorModelSpec.scala
	project/build/AkkaProject.scala
both resolved by "union" approach

- change wavesOfActors test for CTD: scheduling SHUTDOWN 10000 times does not
  work so well...
- add executeFuture with trivial implementation, TBC
This commit is contained in:
Roland Kuhn 2011-03-17 22:18:39 +01:00
commit 06049642d5
8 changed files with 364 additions and 39 deletions

1
.gitignore vendored
View file

@ -44,3 +44,4 @@ run-codefellow
.scala_dependencies
multiverse.log
.eprj
.*.swp

View file

@ -164,6 +164,34 @@ class Switch(startAsOn: Boolean = false) {
} else false
}
def whileOnYield[T](action: => T): Option[T] = synchronized {
if (switch.get) Some(action)
else None
}
def whileOffYield[T](action: => T): Option[T] = synchronized {
if (!switch.get) Some(action)
else None
}
def whileOn(action: => Unit): Boolean = synchronized {
if (switch.get) {
action
true
} else false
}
def whileOff(action: => Unit): Boolean = synchronized {
if (switch.get) {
action
true
} else false
}
def ifElseYield[T](on: => T)(off: => T) = synchronized {
if (switch.get) on else off
}
def isOn = switch.get
def isOff = !isOn
}

View file

@ -1,6 +1,6 @@
package akka.actor
import akka.util.TestKit
import akka.testkit.TestKit
import akka.util.duration._
import org.scalatest.WordSpec
@ -16,7 +16,7 @@ class FSMTimingSpec
val fsm = Actor.actorOf(new StateMachine(testActor)).start
fsm ! SubscribeTransitionCallBack(testActor)
expectMsg(100 millis, CurrentState(fsm, Initial))
expectMsg(200 millis, CurrentState(fsm, Initial))
ignoreMsg {
case Transition(_, Initial, _) => true
@ -43,12 +43,11 @@ class FSMTimingSpec
"receive and cancel a repeated timer" in {
fsm ! TestRepeatedTimer
val seq = receiveWhile(550 millis) {
val seq = receiveWhile(600 millis) {
case Tick => Tick
}
seq must have length (5)
within(250 millis) {
fsm ! Cancel
expectMsg(Transition(fsm, TestRepeatedTimer, Initial))
expectNoMsg
}
@ -95,17 +94,17 @@ object FSMTimingSpec {
case class Unhandled(msg : AnyRef)
class StateMachine(tester : ActorRef) extends Actor with FSM[State, Unit] {
class StateMachine(tester : ActorRef) extends Actor with FSM[State, Int] {
import FSM._
startWith(Initial, ())
startWith(Initial, 0)
when(Initial) {
case Ev(TestSingleTimer) =>
setTimer("tester", Tick, 100 millis, false)
goto(TestSingleTimer)
case Ev(TestRepeatedTimer) =>
setTimer("tester", Tick, 100 millis, true)
goto(TestRepeatedTimer)
goto(TestRepeatedTimer) using 4
case Ev(x : FSMTimingSpec.State) => goto(x)
}
when(TestStateTimeout, stateTimeout = 100 millis) {
@ -117,12 +116,14 @@ object FSMTimingSpec {
goto(Initial)
}
when(TestRepeatedTimer) {
case Ev(Tick) =>
case Event(Tick, remaining) =>
tester ! Tick
stay
case Ev(Cancel) =>
if (remaining == 0) {
cancelTimer("tester")
goto(Initial)
} else {
stay using (remaining - 1)
}
}
when(TestUnhandled) {
case Ev(SetHandler) =>

View file

@ -12,7 +12,7 @@ import akka.actor.Actor._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent. {ConcurrentHashMap, CountDownLatch, TimeUnit}
import akka.actor.dispatch.ActorModelSpec.MessageDispatcherInterceptor
import akka.util.Duration
import akka.util.{Duration, Switch}
object ActorModelSpec {
@ -25,6 +25,8 @@ object ActorModelSpec {
case class Await(latch: CountDownLatch) extends ActorModelMessage
case class Meet(acknowledge: CountDownLatch, waitFor: CountDownLatch) extends ActorModelMessage
case class CountDownNStop(latch: CountDownLatch) extends ActorModelMessage
case class Wait(time: Long) extends ActorModelMessage
case class WaitAck(time: Long, latch: CountDownLatch) extends ActorModelMessage
case object Restart extends ActorModelMessage
val Ping = "Ping"
@ -33,22 +35,32 @@ object ActorModelSpec {
class DispatcherActor(dispatcher: MessageDispatcherInterceptor) extends Actor {
self.dispatcher = dispatcher.asInstanceOf[MessageDispatcher]
def ack { dispatcher.getStats(self).msgsProcessed.incrementAndGet() }
private val busy = new Switch(false)
def ack {
if (!busy.switchOn()) {
throw new Exception("isolation violated")
} else {
dispatcher.getStats(self).msgsProcessed.incrementAndGet()
}
}
override def postRestart(reason: Throwable) {
dispatcher.getStats(self).restarts.incrementAndGet()
}
def receive = {
case Await(latch) => ack; latch.await()
case Meet(sign, wait) => ack; sign.countDown(); wait.await()
case Reply(msg) => ack; self.reply(msg)
case Reply_?(msg) => ack; self.reply_?(msg)
case Forward(to,msg) => ack; to.forward(msg)
case CountDown(latch) => ack; latch.countDown()
case Increment(count) => ack; count.incrementAndGet()
case CountDownNStop(l)=> ack; l.countDown; self.stop
case Restart => ack; throw new Exception("Restart requested")
case Await(latch) => ack; latch.await(); busy.switchOff()
case Meet(sign, wait) => ack; sign.countDown(); wait.await(); busy.switchOff()
case Wait(time) => ack; Thread.sleep(time); busy.switchOff()
case WaitAck(time, l) => ack; Thread.sleep(time); l.countDown; busy.switchOff()
case Reply(msg) => ack; self.reply(msg); busy.switchOff()
case Reply_?(msg) => ack; self.reply_?(msg); busy.switchOff()
case Forward(to,msg) => ack; to.forward(msg); busy.switchOff()
case CountDown(latch) => ack; latch.countDown(); busy.switchOff()
case Increment(count) => ack; count.incrementAndGet(); busy.switchOff()
case CountDownNStop(l)=> ack; l.countDown; self.stop; busy.switchOff()
case Restart => ack; busy.switchOff(); throw new Exception("Restart requested")
}
}
@ -208,29 +220,43 @@ abstract class ActorModelSpec extends JUnitSuite {
@Test def dispatcherShouldProcessMessagesOneAtATime {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor
val start,step1,step2,oneAtATime = new CountDownLatch(1)
val start,oneAtATime = new CountDownLatch(1)
a.start
a ! CountDown(start)
assertCountDown(start,3000, "Should process first message within 3 seconds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 1, msgsProcessed = 1)
a ! Meet(step1,step2)
assertCountDown(step1,3000, "Didn't process the Meet message in 3 seocnds")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 2, msgsProcessed = 2)
a ! Wait(1000)
a ! CountDown(oneAtATime)
assertNoCountDown(oneAtATime,500,"Processed message when not allowed to")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 2)
step2.countDown()
assertCountDown(oneAtATime,500,"Processed message when allowed")
// in case of serialization violation, restart would happen instead of count down
assertCountDown(oneAtATime,1500,"Processed message when allowed")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 3, msgsProcessed = 3)
a.stop
assertRefDefaultZero(a)(registers = 1, unregisters = 1, msgsReceived = 3, msgsProcessed = 3)
}
@Test def dispatcherShouldHandleQueueingFromMultipleThreads {
implicit val dispatcher = newInterceptedDispatcher
val a = newTestActor
val counter = new CountDownLatch(200)
a.start
def start = spawn { for (i <- 1 to 20) { a ! WaitAck(1, counter) } }
for (i <- 1 to 10) { start }
assertCountDown(counter, 3000, "Should process 200 messages")
assertRefDefaultZero(a)(registers = 1, msgsReceived = 200, msgsProcessed = 200)
a.stop
}
def spawn(f : => Unit) = {
val thread = new Thread { override def run { f } }
thread.start
thread
}
@Test def dispatcherShouldProcessMessagesInParallel: Unit = {
implicit val dispatcher = newInterceptedDispatcher
val a, b = newTestActor.start
@ -304,5 +330,6 @@ class ExecutorBasedEventDrivenDispatcherModelTest extends ActorModelSpec {
}
class ExecutorBasedEventDrivenWorkStealingDispatcherModelTest extends ActorModelSpec {
def newInterceptedDispatcher = new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
def newInterceptedDispatcher =
new ExecutorBasedEventDrivenWorkStealingDispatcher("foo") with MessageDispatcherInterceptor
}

View file

@ -0,0 +1,224 @@
package akka.testkit
import akka.actor.{ActorRef, EventHandler}
import akka.dispatch.{MessageDispatcher, MessageInvocation, FutureInvocation}
import java.util.concurrent.locks.ReentrantLock
import java.util.LinkedList
import java.util.concurrent.RejectedExecutionException
import akka.util.Switch
import java.lang.ref.WeakReference
import scala.annotation.tailrec
/*
* Locking rules:
*
* While not suspended, messages are processed (!isActive) or queued
* thread-locally (isActive). While suspended, messages are queued
* thread-locally. When resuming, all messages are atomically scooped from all
* non-active threads and queued on the resuming thread's queue, to be
* processed immediately. Processing a queue checks suspend before each
* invocation, leaving the active state if suspended. For this to work
* reliably, the active flag needs to be set atomically with the initial check
* for suspend. Scooping up messages means replacing the ThreadLocal's contents
* with an empty new NestingQueue.
*
* All accesses to the queue must be done under the suspended-switch's lock, so
* within one of its methods taking a closure argument.
*/
object CallingThreadDispatcher {
private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]()
// we have to forget about long-gone threads sometime
private def gc {
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
}
def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized {
if (queues contains mbox) {
val newSet = queues(mbox) + new WeakReference(q)
queues += mbox -> newSet
} else {
queues += mbox -> Set(new WeakReference(q))
}
gc
}
/*
* This method must be called with "own" being this thread's queue for the
* given mailbox. When this method returns, the queue will be entered
* (active).
*/
def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized {
if (!own.isActive) own.enter
if (queues contains mbox) {
for {
ref <- queues(mbox)
q = ref.get
if (q ne null) && !q.isActive
/*
* if q.isActive was false, then it cannot change to true while we are
* holding the mbox.suspende.switch's lock under which we are currently
* executing
*/
} {
while (q.peek ne null) {
own.push(q.pop)
}
}
}
}
}
/**
* Dispatcher which runs invocations on the current thread only. This
* dispatcher does not create any new threads, but it can be used from
* different threads concurrently for the same actor. The dispatch strategy is
* to run on the current thread unless the target actor is either suspended or
* already running on the current thread (if it is running on a different
* thread, then this thread will block until that other invocation is
* finished); if the invocation is not run, it is queued in a thread-local
* queue to be executed once the active invocation further up the call stack
* finishes. This leads to completely deterministic execution order if only one
* thread is used.
*
* Suspending and resuming are global actions for one actor, meaning they can
* affect different threads, which leads to complications. If messages are
* queued (thread-locally) during the suspended period, the only thread to run
* them upon resume is the thread actually calling the resume method. Hence,
* all thread-local queues which are not currently being drained (possible,
* since suspend-queue-resume might happen entirely during an invocation on a
* different thread) are scooped up into the current thread-local queue which
* is then executed. It is possible to suspend an actor from within its call
* stack.
*
* @author Roland Kuhn
* @since 1.1
*/
class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher {
import CallingThreadDispatcher._
private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox
private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
private[akka] override def start {}
private[akka] override def shutdown {}
private[akka] override def timeoutMs = 100L
override def suspend(actor: ActorRef) {
getMailbox(actor).suspended.switchOn
}
override def resume(actor: ActorRef) {
val mbox = getMailbox(actor)
val queue = mbox.queue
val wasActive = queue.isActive
val switched = mbox.suspended.switchOff {
gatherFromAllInactiveQueues(mbox, queue)
}
if (switched && !wasActive) {
runQueue(mbox, queue)
}
}
override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size
private[akka] override def dispatch(handle: MessageInvocation) {
val mbox = getMailbox(handle.receiver)
val queue = mbox.queue
val execute = mbox.suspended.ifElseYield {
queue.push(handle)
if (warnings && handle.senderFuture.isDefined) {
EventHandler.warning(this, "suspended, creating Future could deadlock; target: %s" format handle.receiver)
}
false
} {
queue.push(handle)
if (queue.isActive) {
if (warnings && handle.senderFuture.isDefined) {
EventHandler.warning(this, "blocked on this thread, creating Future could deadlock; target: %s" format handle.receiver)
}
false
} else {
queue.enter
true
}
}
if (execute) runQueue(mbox, queue)
}
private[akka] override def executeFuture(invocation: FutureInvocation) { invocation.run }
/*
* This method must be called with this thread's queue, which must already
* have been entered (active). When this method returns, the queue will be
* inactive.
*
* If the catch block is executed, then a non-empty mailbox may be stalled as
* there is no-one who cares to execute it before the next message is sent or
* it is suspended and resumed.
*/
@tailrec private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) {
assert(queue.isActive)
mbox.lock.lock
val recurse = try {
val handle = mbox.suspended.ifElseYield[MessageInvocation] {
queue.leave
null
} {
val ret = queue.pop
if (ret eq null) queue.leave
ret
}
if (handle ne null) {
try {
handle.invoke
val f = handle.senderFuture
if (warnings && f.isDefined && !f.get.isCompleted) {
EventHandler.warning(this, "calling %s with message %s did not reply as expected, might deadlock" format (handle.receiver, handle.message))
}
} catch {
case _ => queue.leave
}
true
} else if (queue.isActive) {
queue.leave
false
} else false
} finally {
mbox.lock.unlock
}
if (recurse) {
runQueue(mbox, queue)
}
}
}
class NestingQueue {
private var q = new LinkedList[MessageInvocation]()
def size = q.size
def push(handle : MessageInvocation) { q.offer(handle) }
def peek = q.peek
def pop = q.poll
@volatile private var active = false
def enter { if (active) error("already active") else active = true }
def leave { if (!active) error("not active") else active = false }
def isActive = active
}
class CallingThreadMailbox {
private val q = new ThreadLocal[NestingQueue]() {
override def initialValue = new NestingQueue
}
def queue = q.get
val lock = new ReentrantLock
val suspended = new Switch(false)
}

View file

@ -1,8 +1,9 @@
package akka.util
package akka.testkit
import akka.actor.{Actor, FSM}
import Actor._
import duration._
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.{BlockingDeque, LinkedBlockingDeque, TimeUnit}

View file

@ -0,0 +1,32 @@
package akka.testkit
import akka.actor.dispatch.ActorModelSpec
import java.util.concurrent.CountDownLatch
class CallingThreadDispatcherModelSpec extends ActorModelSpec {
import ActorModelSpec._
def newInterceptedDispatcher = new CallingThreadDispatcher with MessageDispatcherInterceptor
override def dispatcherShouldProcessMessagesInParallel {}
override def dispatcherShouldHandleWavesOfActors {
implicit val dispatcher = newInterceptedDispatcher
def flood(num: Int) {
val cachedMessage = CountDownNStop(new CountDownLatch(num))
val keeper = newTestActor.start
(1 to num) foreach {
_ => newTestActor.start ! cachedMessage
}
keeper.stop
assertCountDown(cachedMessage.latch,10000, "Should process " + num + " countdowns")
}
for(run <- 1 to 3) {
flood(10000)
await(dispatcher.stops.get == run)(withinMs = 10000)
assertDispatcher(dispatcher)(starts = run, stops = run)
}
}
}
// vim: set ts=4 sw=4 et:

View file

@ -177,6 +177,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
// -------------------------------------------------------------------------------------------------------------------
lazy val akka_actor = project("akka-actor", "akka-actor", new AkkaActorProject(_))
lazy val akka_testkit = project("akka-testkit", "akka-testkit", new AkkaTestkitProject(_), akka_actor)
lazy val akka_stm = project("akka-stm", "akka-stm", new AkkaStmProject(_), akka_actor)
lazy val akka_typed_actor = project("akka-typed-actor", "akka-typed-actor", new AkkaTypedActorProject(_), akka_stm)
lazy val akka_remote = project("akka-remote", "akka-remote", new AkkaRemoteProject(_), akka_typed_actor)
@ -289,8 +290,18 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val multiverse_test = Dependencies.multiverse_test // StandardLatch
override def bndExportPackage = super.bndExportPackage ++ Seq("com.eaio.*;version=3.2")
// some tests depend on testkit, so include that and make sure it's compiled
override def testClasspath = super.testClasspath +++ akka_testkit.path("target") / "classes"
override def testCompileAction = super.testCompileAction dependsOn (akka_testkit.compile)
}
// -------------------------------------------------------------------------------------------------------------------
// akka-testkit subproject
// -------------------------------------------------------------------------------------------------------------------
class AkkaTestkitProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath)
// -------------------------------------------------------------------------------------------------------------------
// akka-stm subproject
// -------------------------------------------------------------------------------------------------------------------