From 0e66cd0d8cf87e443e4e41762fcf245f9f1d2d8e Mon Sep 17 00:00:00 2001 From: Roland Kuhn Date: Sun, 6 Mar 2011 22:45:44 +0100 Subject: [PATCH] add locking to CTD-mbox Up to now it relied on the ActorRef's lock, but that has the side effect that a large time may pass between the suspend test and the actual execution. With this lock in place, the ActorRef lock should never block and the time between suspend and the last actor execution is shortened to some cycles (modulo GC). --- .../.CallingThreadDispatcher.scala.swp | Bin 0 -> 24576 bytes .../testkit/CallingThreadDispatcher.scala | 433 +++++++++--------- 2 files changed, 224 insertions(+), 209 deletions(-) create mode 100644 akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp diff --git a/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp b/akka-testkit/src/main/scala/akka/testkit/.CallingThreadDispatcher.scala.swp new file mode 100644 index 0000000000000000000000000000000000000000..4e1f044a0fb7e283aded59dfdb92e82d6b6b2fcf GIT binary patch literal 24576 zcmYc?2=nw+FxN9?U|?VnU|x5+{Jc>U0B~2`FQz11r=| z%+5{(6S^g-#UDBY~f>I*vQAgFrSZsVGbVyLpUD;g9sl3!%sEG_x`=Sg28OMXZ!isAf>m(q~Z6QgA5F z%u7yHFw`>y^Abx-GV+TQg7R|`^HLPNL2Eq}v_LY&C5g$|dLUM2u|i2kYMw%BMQUmbnUW!6uoU4ARlM{0k3QJQVQY6>|0rDf)27H6au>nVg}WELx+x-PW@9IdG(sYSUUbtRd}(1Z$( z*`k!xB8AK}1#m#*=cR(I0EZ^Xdr7HKrKu?j`JhYz3dqFdlFYJH1*GJWR$5e&ky->w zoFE0Dl&k>G9U#M?j!V(B23Z4=N79*DtN=;7qmY7qV3XS5-5{2Z%JWv8j&PdEl2bZIuVjPqUb5fxO z!;RC|f-^JI6f_DI@=_J@N^^2F71Y!e6cN_IRTV1ODio!r>7}QZz{QGE(-drUA*E=s z1}N+_p>m+ySq)}`O$QaT#h?f(NzBYERsgF6)gcNBpk%J0m|va;_n4*vh?kmIl3K)| zppcT9rjVWpN&#*~`MHicIi7itEENbg6Ktu16>9zFo0yrClwY9(GDyKn!7sJABr`7^ ztU*)3N+C2avqZsGp|~zP+lxitxKy1*|19?D8Aw9D!HBTWIVyK=% zIH>Xh=O;+Xs!)_#Qd*Q(42}j!SU|H_ssh*-sVNY1z{L$zPi|>(i9%8;xV%nH0oQB_ zO8Mn^O3+#WY(6Bvsux2I1%+uzMk<4Xg1#05gMvaW$XJE+WH1HcC_t1JD`?w-16e`W zP9Zq8L?bV?Tp>I)F*_&~Tw5lmY7}ZhqN-LQHK#Zg;(rBf+VfJ&gW*vQYA!$o6l|eZ zfnz{ZL0bWv9bl*7PXV9^E=o<$EG|hc0!IvG% z7Auq_W`l~uM1|y>{NmCgP=S`Py?V?p(wQ=Covf$2eKNnz}E-V)KiBPKdE^o#h}`wC^aWDF)61Kl4&9NT_G(e zFwHdMH$Qi zP=kw0Kq)3O4c4v&HG19hi=ee~zCw9^5!jf5qI__`rYL0Q=BB1(CW2ZcdI|v$VUVGT zkaPmExfo`iLQ-m4eo?AIY9gc^o0p%b3$+@S>_E*aa5RD1CZJXlql_#LFq5a1PG^sRPw<(8PeD77>IRif{`Q@}c<)WC+L{pZsKS8ZRo% zNiDWw0F|eixdr(}B?{mr;CkSC6x^EED@n}EDN0RdfXZhjmL=-tB<7{-fht3k+z3+w zT8pk%T9TQg7YuGcz{J5?!QzOPie6AED8HtrxPp5_nfZCH70Iaupne3VHlNJA?9>#W z%;FMEIV2-NxvN+&C^a>&q$n}31mstkda%<{pbOshsxkU?IM^V_*p8V_@*-V_@*(V_;C_V_;z6 zV_ag z&BeeF#l^s2$;H5+%f-MT%f-OJ!^Obxjgx`lIVS_dT}}putDFoB>o^%0R&g>gtmI^1 zSi;G`5YEZKz{SbH@SB5y;Ts17!!iyAhA<8W1``ej20ac2hMVjR4Exv_7*?}G!X}5E zfx(!afkB0xfkA?uf#DY$1H)xD28N?-3=F&37#QZWF)&PIV_>LcV_*noV_*%D@oB%D~{k%D`aF%D^DO%D}+F%E0iAg@NH63j@PF76yjnEDQ_>Sr{1B zu`n=9VqswDfsQA%LB|vdSQr?rSQr@8SQr>yGczz;XJ%kH!py*Mn3;iL4l@HoC^G|t z05b!_4<-hNZ%hmfUzr#fZZI)0TxVinSj@!0P{hQ*kjKQpkjuot5Xr>A5W&R2V9dn8 zU<4hD;9+850JTd%tp<30Rj>t@@Q|#b0dBx*g6cw$B2dbO%A+J-4NyB4tP0dx02gDR z>Ks&MgS3L|P*BJ(OD!tOOo7zQ&=MM4&qGZ_u2UEk6cBZ9jsmD{3u?X~)fcE$tA?gR zH3I{dp%iG{43Y)80^~ecsiR;E>ceD~ff{Te@tjmp1ECty;DwaVnhL2!MfpVDHBW0(U9La;FoW%)UYC7^N?5d=k0 zr^4(5g(}Fpg8Tw-Ko{hLhFidGo`TfWY=~HDYBsciUQk+`p^=f8my(mJV1-$PXev}I z6zb)JMrxp{G{G(b*-)HWl?pZv#6XFbB6zeQ%Drf4iG?VhKy4mKydcLfEIvSS0&6HM zfcmhA7N!oQS^+ibAf1q+)a25l;#5#fgZgj~tqR~~v>qs7>Xqh!C~%XuRv|4j4?N%k z>Es~nP=G`jSQW?^aAMU1$30XG+Tw<{ijg`#FvAqU<8M&)$Yv`jBqtW9D#R<;+CiL; zVzPoltwJ(rD-*a?3bGx9VIBdo6%=yv)AfpT(oC}SK*N3-N}!evs1Z@s zkWhxW0OWCy-Jl^yP-inuFSFPsH4QxI1?qc(yO?^J#m=D7qSTVq6j+Qvw1ditGzD9j zjm4>;hNfF-Nof%%qe0Xtz+^$~vTTHSNl_&{Xh6<@xEkK-hs&V`c@ez(12P??uPC)d zA+-?NU&f4KkTQ@nLCT=rB)x+C0z`ZwS)>3q1ug~cfuTez$S9~wA-y{I07Z&kCc-Xo z>Q#UZ;(~^7D-n_i7DO3D8j{{Xy>Uni2Sp~>DT&3!sYN9k5U;@sG*BcuK$}DG;uDmI zi%Rnl`3iT>4b)-B=)HlR0PA=|2DK1X1*p*qZf}Bz?i3WXKn>TTRM4nUUcN4)>F{rVgms(K*>n4D77K28ML1RcD^E}f)MnZE9WK1j*l+YnV z)8O6#XlxDCpoa8Lpu+$Pxrvpa{(EsrVopwKib7&BDBQqfDq0Ge&>;%w_yM?Mn3M_` zH%=`AjqO6pXmGIqZ4I*F}IDn1Ag8OICS^{kl z0hFkrp{I$bm_*JO$i)h%ZUH$Dl*(X(2zubE3N8vQ^}x{$RR*s6;R=w=hAM#NVpypH zN`Npa%#sWod*Gp01*AaGQAjRIO$3eqC_oAch2;FwoD>CE*<}qKZ?saVu2s@Oat$a* zkc%lu4%9;~Zx96+auEg6h?a4X@;gKZsltSLsuoERYRDjmG|2lqR^pfJD4N&(g%(nNI) zxU;0G2X?d`xP1d3lY+%5O3{hYB!LW=q^5wXNyYL+=*TZDvBAtBG(3u8EGXRI#$rSP zM8-cY4VoB1s^Rv6nrWai9Nu7oM+Df1#KsXMR2a}2?2uv}GeE0B;ScdCQanST}w!1Nn%k6sxol9 z8r^rOsS502J!sO?ODy)xD=taQOHTDq!#Y+NOTZD}GBFjFYrux1+6U?lp^PCy)q>20 zj)b5{BZe6D;=%oYcW7T3wEth3fq~&0KLf*6eg=l!{0s~m`572i@-r|j=4W7-&(FXx zm!E-Q4nG6K41NZNAbti0Zhi)aJ$wudxqJ)^%6tqAcX=5YCh;;bxbiYEeC1(ac*MiN zaG!^PVImI$gBuS6!*^~5hE?1Q3~}5H3<9A3Jr@H*DHj7nHWvd!1{VWEAQuCJGZzDc zJr|_^KJ->pjJkL<1V%$(Gz3ONU^E0qLtr!n21N)!hFx(q14maDj;<_(%^Z)eEX*y) z8C_WjUm%+aUV;G~i3ANd!d47|hHIfq_=eWX!jOzq1=whkLUBn^Vo7Rxr2@pCF!9tP zg_QhM$P!LS|63t3uM#{N3RxLZtfP=rS^^rqPEO1N4K9P0tb$iQgO)5Hdf1=^eaRW1 za4SYyS6Yl?WhH2-DrBiEgT9s)WI`4)o&q6}$6a9}pixH9B0TUgWg%$p0yL5a5>)_M zo|CGeQK$!=2!yX1gbnP1hA~0&|IM5X4Aoqa`G3%!{=NJR49lT?{2qP=hCJx{e@lJ_ z27P`824Q{%23CFshPQkS49EEx7&@Ty{Xcja7@k4r{P*xOFs$cgVCdmxV5s0_V94TS zUeoS0Esr{4Gz3ONU^E0qLtr!nMnhmU1V%^*fZDFLwIC8S zX$hV8OHF|;{v&C*A!w2dw5|mg<(SP|B7gDhehU1$hOb0Cc9N#UQ> zr*NSmWPK>=5)o7$INhQwA%m{I0WIf%NW#~2K_~h_eLIv@JP;3ogh$sLg4dypt~ms6 T$xcnt%S5cx!LlA#p_%~z0}J%8 literal 0 HcmV?d00001 diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 3a96e3ae4f..8b106ac1cd 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -1,209 +1,224 @@ -package akka.testkit - -import akka.actor.ActorRef -import akka.dispatch.{MessageDispatcher, MessageInvocation} -import java.util.concurrent.locks.ReentrantLock -import java.util.LinkedList -import java.util.concurrent.RejectedExecutionException -import akka.util.Switch -import java.lang.ref.WeakReference -import scala.annotation.tailrec - -/* - * Locking rules: - * - * While not suspended, messages are processed (!isActive) or queued - * thread-locally (isActive). While suspended, messages are queued - * thread-locally. When resuming, all messages are atomically scooped from all - * non-active threads and queued on the resuming thread's queue, to be - * processed immediately. Processing a queue checks suspend before each - * invocation, leaving the active state if suspended. For this to work - * reliably, the active flag needs to be set atomically with the initial check - * for suspend. Scooping up messages means replacing the ThreadLocal's contents - * with an empty new NestingQueue. - * - * All accesses to the queue must be done under the suspended-switch's lock, so - * within one of its methods taking a closure argument. - */ - -object CallingThreadDispatcher { - private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() - - // we have to forget about long-gone threads sometime - private def gc { - queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) - } - - def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { - if (queues contains mbox) { - val newSet = queues(mbox) + new WeakReference(q) - queues += mbox -> newSet - } else { - queues += mbox -> Set(new WeakReference(q)) - } - gc - } - - /* - * This method must be called with "own" being this thread's queue for the - * given mailbox. When this method returns, the queue will be entere - * (active). - */ - def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { - if (!own.isActive) own.enter - if (queues contains mbox) { - for { - ref <- queues(mbox) - q = ref.get - if (q ne null) && !q.isActive - } { - while (q.peek ne null) { - own.push(q.pop) - } - } - } - } -} - -/** - * Dispatcher which runs invocations on the current thread only. This - * dispatcher does not create any new threads, but it can be used from - * different threads concurrently for the same actor. The dispatch strategy is - * to run on the current thread unless the target actor is either suspended or - * already running on the current thread (if it is running on a different - * thread, then this thread will block until that other invocation is - * finished); if the invocation is not run, it is queued in a thread-local - * queue to be executed once the active invocation further up the call stack - * finishes. This leads to completely deterministic execution order if only one - * thread is used. - * - * Suspending and resuming are global actions for one actor, meaning they can - * affect different threads, which leads to complications. If messages are - * queued (thread-locally) during the suspended period, the only thread to run - * them upon resume is the thread actually calling the resume method. Hence, - * all thread-local queues which are not currently being drained (possible, - * since suspend-queue-resume might happen entirely during an invocation on a - * different thread) are scooped up into the current thread-local queue which - * is then executed. It is possible to suspend an actor from within its call - * stack. - * - * @author Roland Kuhn - * @since 1.1 - */ -class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { - import CallingThreadDispatcher._ - - private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox - - private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] - - private[akka] override def start {} - - private[akka] override def shutdown {} - - private[akka] override def timeoutMs = 100L - - override def suspend(actor: ActorRef) { - getMailbox(actor).suspended.switchOn - } - - override def resume(actor: ActorRef) { - val mbox = getMailbox(actor) - val queue = mbox.queue - val wasActive = queue.isActive - val switched = mbox.suspended.switchOff { - gatherFromAllInactiveQueues(mbox, queue) - } - if (switched && !wasActive) { - runQueue(mbox, queue) - } - } - - override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size - - private[akka] override def dispatch(handle: MessageInvocation) { - val mbox = getMailbox(handle.receiver) - val queue = mbox.queue - val execute = mbox.suspended.ifElseYield { - queue.push(handle) - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("suspended, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } { - queue.push(handle) - if (queue.isActive) { - if (warnings && handle.senderFuture.isDefined) { - log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", - handle.receiver) - } - false - } else { - queue.enter - true - } - } - if (execute) runQueue(mbox, queue) - } - - /* - * This method must be called with this thread's queue, which must already - * have been entered (active). When this method returns, the queue will be - * inactive. - * - * If the catch block is executed, then a non-empty mailbox may be stalled as - * there is no-one who cares to execute it before the next message is sent or - * it is suspended and resumed. - */ - private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { - assert(queue.isActive) - val handle = mbox.suspended.ifElseYield[MessageInvocation] { - queue.leave - null - } { - val ret = queue.pop - if (ret eq null) queue.leave - ret - } - if (handle ne null) { - try { - handle.invoke - val f = handle.senderFuture - if (warnings && f.isDefined && !f.get.isCompleted) { - log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) - } - } catch { - case _ => queue.leave - } - runQueue(mbox, queue) - log.info("runQueue") - } else if (queue.isActive) { - queue.leave - } - } -} - -class NestingQueue { - private var q = new LinkedList[MessageInvocation]() - def size = q.size - def push(handle : MessageInvocation) { q.offer(handle) } - def peek = q.peek - def pop = q.poll - - @volatile private var active = false - def enter { if (active) error("already active") else active = true } - def leave { if (!active) error("not active") else active = false } - def isActive = active -} - -class CallingThreadMailbox { - - private val q = new ThreadLocal[NestingQueue]() { - override def initialValue = new NestingQueue - } - - def queue = q.get - - val suspended = new Switch(false) -} +package akka.testkit + +import akka.actor.ActorRef +import akka.dispatch.{MessageDispatcher, MessageInvocation} +import java.util.concurrent.locks.ReentrantLock +import java.util.LinkedList +import java.util.concurrent.RejectedExecutionException +import akka.util.Switch +import java.lang.ref.WeakReference +import scala.annotation.tailrec + +/* + * Locking rules: + * + * While not suspended, messages are processed (!isActive) or queued + * thread-locally (isActive). While suspended, messages are queued + * thread-locally. When resuming, all messages are atomically scooped from all + * non-active threads and queued on the resuming thread's queue, to be + * processed immediately. Processing a queue checks suspend before each + * invocation, leaving the active state if suspended. For this to work + * reliably, the active flag needs to be set atomically with the initial check + * for suspend. Scooping up messages means replacing the ThreadLocal's contents + * with an empty new NestingQueue. + * + * All accesses to the queue must be done under the suspended-switch's lock, so + * within one of its methods taking a closure argument. + */ + +object CallingThreadDispatcher { + private var queues = Map[CallingThreadMailbox, Set[WeakReference[NestingQueue]]]() + + // we have to forget about long-gone threads sometime + private def gc { + queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) + } + + def registerQueue(mbox : CallingThreadMailbox, q : NestingQueue) : Unit = synchronized { + if (queues contains mbox) { + val newSet = queues(mbox) + new WeakReference(q) + queues += mbox -> newSet + } else { + queues += mbox -> Set(new WeakReference(q)) + } + gc + } + + /* + * This method must be called with "own" being this thread's queue for the + * given mailbox. When this method returns, the queue will be entered + * (active). + */ + def gatherFromAllInactiveQueues(mbox : CallingThreadMailbox, own : NestingQueue) : Unit = synchronized { + if (!own.isActive) own.enter + if (queues contains mbox) { + for { + ref <- queues(mbox) + q = ref.get + if (q ne null) && !q.isActive + /* + * if q.isActive was false, then it cannot change to true while we are + * holding the mbox.suspende.switch's lock under which we are currently + * executing + */ + } { + while (q.peek ne null) { + own.push(q.pop) + } + } + } + } +} + +/** + * Dispatcher which runs invocations on the current thread only. This + * dispatcher does not create any new threads, but it can be used from + * different threads concurrently for the same actor. The dispatch strategy is + * to run on the current thread unless the target actor is either suspended or + * already running on the current thread (if it is running on a different + * thread, then this thread will block until that other invocation is + * finished); if the invocation is not run, it is queued in a thread-local + * queue to be executed once the active invocation further up the call stack + * finishes. This leads to completely deterministic execution order if only one + * thread is used. + * + * Suspending and resuming are global actions for one actor, meaning they can + * affect different threads, which leads to complications. If messages are + * queued (thread-locally) during the suspended period, the only thread to run + * them upon resume is the thread actually calling the resume method. Hence, + * all thread-local queues which are not currently being drained (possible, + * since suspend-queue-resume might happen entirely during an invocation on a + * different thread) are scooped up into the current thread-local queue which + * is then executed. It is possible to suspend an actor from within its call + * stack. + * + * @author Roland Kuhn + * @since 1.1 + */ +class CallingThreadDispatcher(val warnings: Boolean = true) extends MessageDispatcher { + import CallingThreadDispatcher._ + + private[akka] override def createMailbox(actor: ActorRef) = new CallingThreadMailbox + + private def getMailbox(actor: ActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] + + private[akka] override def start {} + + private[akka] override def shutdown {} + + private[akka] override def timeoutMs = 100L + + override def suspend(actor: ActorRef) { + getMailbox(actor).suspended.switchOn + } + + override def resume(actor: ActorRef) { + val mbox = getMailbox(actor) + val queue = mbox.queue + val wasActive = queue.isActive + val switched = mbox.suspended.switchOff { + gatherFromAllInactiveQueues(mbox, queue) + } + if (switched && !wasActive) { + runQueue(mbox, queue) + } + } + + override def mailboxSize(actor: ActorRef) = getMailbox(actor).queue.size + + private[akka] override def dispatch(handle: MessageInvocation) { + val mbox = getMailbox(handle.receiver) + val queue = mbox.queue + val execute = mbox.suspended.ifElseYield { + queue.push(handle) + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("suspended, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } { + queue.push(handle) + if (queue.isActive) { + if (warnings && handle.senderFuture.isDefined) { + log.slf4j.warn("blocked on this thread, creating Future could deadlock; target: {}", + handle.receiver) + } + false + } else { + queue.enter + true + } + } + if (execute) runQueue(mbox, queue) + } + + /* + * This method must be called with this thread's queue, which must already + * have been entered (active). When this method returns, the queue will be + * inactive. + * + * If the catch block is executed, then a non-empty mailbox may be stalled as + * there is no-one who cares to execute it before the next message is sent or + * it is suspended and resumed. + */ + @tailrec private def runQueue(mbox : CallingThreadMailbox, queue : NestingQueue) { + assert(queue.isActive) + mbox.lock.lock + val recurse = try { + val handle = mbox.suspended.ifElseYield[MessageInvocation] { + queue.leave + null + } { + val ret = queue.pop + if (ret eq null) queue.leave + ret + } + if (handle ne null) { + try { + handle.invoke + val f = handle.senderFuture + if (warnings && f.isDefined && !f.get.isCompleted) { + log.slf4j.warn("calling {} with message {} did not reply as expected, might deadlock", handle.receiver, handle.message) + } + } catch { + case _ => queue.leave + } + true + } else if (queue.isActive) { + queue.leave + false + } else false + } finally { + mbox.lock.unlock + } + if (recurse) { + runQueue(mbox, queue) + } + } +} + +class NestingQueue { + private var q = new LinkedList[MessageInvocation]() + def size = q.size + def push(handle : MessageInvocation) { q.offer(handle) } + def peek = q.peek + def pop = q.poll + + @volatile private var active = false + def enter { if (active) error("already active") else active = true } + def leave { if (!active) error("not active") else active = false } + def isActive = active +} + +class CallingThreadMailbox { + + private val q = new ThreadLocal[NestingQueue]() { + override def initialValue = new NestingQueue + } + + def queue = q.get + + val lock = new ReentrantLock + + val suspended = new Switch(false) +}