From 298bab0cd8d42019cd28112de29abefab2c671fb Mon Sep 17 00:00:00 2001 From: Roland Date: Thu, 20 Dec 2012 23:28:08 +0100 Subject: [PATCH] fix sometimes-off-by-one in PeekMailbox, see #2851 --- .../akka/contrib/mailbox/PeekMailbox.scala | 26 ++++++++++--------- .../contrib/mailbox/PeekMailboxSpec.scala | 2 +- 2 files changed, 15 insertions(+), 13 deletions(-) diff --git a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala index bb580bccb9..7f929626cf 100644 --- a/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala +++ b/akka-contrib/src/main/scala/akka/contrib/mailbox/PeekMailbox.scala @@ -40,15 +40,16 @@ class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension { class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType { override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match { case (Some(o), Some(s)) ⇒ - val tries = config.getInt("max-tries") ensuring (_ >= 1, "max-tries must be at least 1") - val mailbox = new PeekMailbox(o, s, tries) + val retries = config.getInt("max-retries") + if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1") + val mailbox = new PeekMailbox(o, s, retries) PeekMailboxExtension(s).register(o, mailbox) mailbox case _ ⇒ throw new Exception("no mailbox owner or system given") } } -class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int) +class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int) extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics { final val queue = new ConcurrentLinkedQueue[Envelope]() @@ -58,20 +59,21 @@ class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int) * continue handing back out that same message until ACKed, peek() must be * used. The retry limit logic is then formulated in terms of the `tries` * field, which holds - * 0 if clean slate (i.e. last dequeue was ack()ed) - * 1..maxTries if not yet ack()ed - * Marker if last try was done (at which point we had to poll()) - * -1 during cleanUp (in order to disable the ack() requirement) + * 0 if clean slate (i.e. last dequeue was ack()ed) + * 1..maxRetries if not yet ack()ed + * Marker if last try was done (at which point we had to poll()) + * -1 during cleanUp (in order to disable the ack() requirement) */ // the mutable state is only ever accessed by the actor (i.e. dequeue() side) var tries = 0 - val Marker = maxTries + 1 + val Marker = maxRetries + 1 + // this logic does not work if maxRetries==0, but then you could also use a normal mailbox override def dequeue(): Envelope = tries match { - case -1 ⇒ queue.poll() - case 0 | Marker ⇒ tries = 1; queue.peek() - case `maxTries` ⇒ tries = Marker; queue.poll() - case n ⇒ tries = n + 1; queue.peek() + case -1 ⇒ queue.poll() + case 0 | Marker ⇒ val e = queue.peek(); tries = if (e eq null) 0 else 1; e + case `maxRetries` ⇒ tries = Marker; queue.poll() + case n ⇒ tries = n + 1; queue.peek() } def ack(): Unit = { diff --git a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala index 1c27ce4828..4a2f68109d 100644 --- a/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala +++ b/akka-contrib/src/test/scala/akka/contrib/mailbox/PeekMailboxSpec.scala @@ -32,7 +32,7 @@ object PeekMailboxSpec { class PeekMailboxSpec extends AkkaSpec(""" peek-dispatcher { mailbox-type = "akka.contrib.mailbox.PeekMailboxType" - max-tries = 3 + max-retries = 2 } """) with ImplicitSender {