=act #17372 warn when non-zero push timeout detected

This commit is contained in:
Konrad Malawski 2015-08-20 15:16:32 +02:00
parent e1f54d5367
commit 3036012e58
5 changed files with 216 additions and 180 deletions

View file

@ -3,21 +3,22 @@
*/
package akka.dispatch
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import java.util.{ Comparator, Deque, PriorityQueue, Queue }
import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import akka.dispatch.sysmsg._
import akka.event.Logging.Error
import akka.util.Helpers.ConfigOps
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -505,6 +506,15 @@ trait BoundedMessageQueueSemantics {
def pushTimeOut: Duration
}
/**
* INTERNAL API
* Used to determine mailbox factories which create [[BoundedMessageQueueSemantics]]
* mailboxes, and thus should be validated that the `pushTimeOut` is greater than 0.
*/
private[akka] trait ProducesPushTimeoutSemanticsMailbox {
def pushTimeOut: Duration
}
trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics {
override def queue: BlockingQueue[Envelope]
@ -635,8 +645,9 @@ case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with
/**
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/
final case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] {
final case class BoundedMailbox(val capacity: Int, override val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))
@ -677,8 +688,9 @@ object UnboundedPriorityMailbox {
* BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] {
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
@ -719,8 +731,9 @@ object UnboundedStablePriorityMailbox {
* [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] {
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
@ -757,8 +770,9 @@ object UnboundedDequeBasedMailbox {
/**
* BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
*/
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] {
case class BoundedDequeBasedMailbox( final val capacity: Int, override final val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))
@ -833,7 +847,9 @@ object UnboundedControlAwareMailbox {
* BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
*/
final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] {
final case class BoundedControlAwareMailbox(capacity: Int, override final val pushTimeOut: FiniteDuration) extends MailboxType
with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))