fix sometimes-off-by-one in PeekMailbox, see #2851
This commit is contained in:
parent
a7f8fe7e8e
commit
298bab0cd8
2 changed files with 15 additions and 13 deletions
|
|
@ -40,15 +40,16 @@ class PeekMailboxExtension(val system: ExtendedActorSystem) extends Extension {
|
|||
class PeekMailboxType(settings: ActorSystem.Settings, config: Config) extends MailboxType {
|
||||
override def create(owner: Option[ActorRef], system: Option[ActorSystem]) = (owner, system) match {
|
||||
case (Some(o), Some(s)) ⇒
|
||||
val tries = config.getInt("max-tries") ensuring (_ >= 1, "max-tries must be at least 1")
|
||||
val mailbox = new PeekMailbox(o, s, tries)
|
||||
val retries = config.getInt("max-retries")
|
||||
if (retries < 1) throw new akka.ConfigurationException("max-retries must be at least 1")
|
||||
val mailbox = new PeekMailbox(o, s, retries)
|
||||
PeekMailboxExtension(s).register(o, mailbox)
|
||||
mailbox
|
||||
case _ ⇒ throw new Exception("no mailbox owner or system given")
|
||||
}
|
||||
}
|
||||
|
||||
class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int)
|
||||
class PeekMailbox(owner: ActorRef, system: ActorSystem, maxRetries: Int)
|
||||
extends QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
|
||||
final val queue = new ConcurrentLinkedQueue[Envelope]()
|
||||
|
||||
|
|
@ -58,20 +59,21 @@ class PeekMailbox(owner: ActorRef, system: ActorSystem, maxTries: Int)
|
|||
* continue handing back out that same message until ACKed, peek() must be
|
||||
* used. The retry limit logic is then formulated in terms of the `tries`
|
||||
* field, which holds
|
||||
* 0 if clean slate (i.e. last dequeue was ack()ed)
|
||||
* 1..maxTries if not yet ack()ed
|
||||
* Marker if last try was done (at which point we had to poll())
|
||||
* -1 during cleanUp (in order to disable the ack() requirement)
|
||||
* 0 if clean slate (i.e. last dequeue was ack()ed)
|
||||
* 1..maxRetries if not yet ack()ed
|
||||
* Marker if last try was done (at which point we had to poll())
|
||||
* -1 during cleanUp (in order to disable the ack() requirement)
|
||||
*/
|
||||
// the mutable state is only ever accessed by the actor (i.e. dequeue() side)
|
||||
var tries = 0
|
||||
val Marker = maxTries + 1
|
||||
val Marker = maxRetries + 1
|
||||
|
||||
// this logic does not work if maxRetries==0, but then you could also use a normal mailbox
|
||||
override def dequeue(): Envelope = tries match {
|
||||
case -1 ⇒ queue.poll()
|
||||
case 0 | Marker ⇒ tries = 1; queue.peek()
|
||||
case `maxTries` ⇒ tries = Marker; queue.poll()
|
||||
case n ⇒ tries = n + 1; queue.peek()
|
||||
case -1 ⇒ queue.poll()
|
||||
case 0 | Marker ⇒ val e = queue.peek(); tries = if (e eq null) 0 else 1; e
|
||||
case `maxRetries` ⇒ tries = Marker; queue.poll()
|
||||
case n ⇒ tries = n + 1; queue.peek()
|
||||
}
|
||||
|
||||
def ack(): Unit = {
|
||||
|
|
|
|||
|
|
@ -32,7 +32,7 @@ object PeekMailboxSpec {
|
|||
class PeekMailboxSpec extends AkkaSpec("""
|
||||
peek-dispatcher {
|
||||
mailbox-type = "akka.contrib.mailbox.PeekMailboxType"
|
||||
max-tries = 3
|
||||
max-retries = 2
|
||||
}
|
||||
""") with ImplicitSender {
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue