Merge pull request #18283 from ktoso/wip--†-blocking-ktoso

=act #17372 warn when non-zero push timeout detected
This commit is contained in:
Konrad Malawski 2015-08-21 15:37:05 +02:00
commit fb18464b73
5 changed files with 216 additions and 180 deletions

View file

@ -3,21 +3,22 @@
*/
package akka.dispatch
import java.util.{ Comparator, PriorityQueue, Queue, Deque }
import java.util.concurrent._
import akka.AkkaException
import akka.dispatch.sysmsg._
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import akka.util.Helpers.ConfigOps
import akka.event.Logging.Error
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import scala.annotation.tailrec
import scala.util.control.NonFatal
import com.typesafe.config.Config
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.locks.ReentrantLock
import java.util.{ Comparator, Deque, PriorityQueue, Queue }
import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef }
import akka.dispatch.sysmsg._
import akka.event.Logging.Error
import akka.util.Helpers.ConfigOps
import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe }
import com.typesafe.config.Config
import scala.annotation.tailrec
import scala.concurrent.duration.{ Duration, FiniteDuration }
import scala.concurrent.forkjoin.ForkJoinTask
import scala.util.control.NonFatal
/**
* INTERNAL API
@ -505,6 +506,15 @@ trait BoundedMessageQueueSemantics {
def pushTimeOut: Duration
}
/**
* INTERNAL API
* Used to determine mailbox factories which create [[BoundedMessageQueueSemantics]]
* mailboxes, and thus should be validated that the `pushTimeOut` is greater than 0.
*/
private[akka] trait ProducesPushTimeoutSemanticsMailbox {
def pushTimeOut: Duration
}
trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics {
override def queue: BlockingQueue[Envelope]
@ -635,8 +645,9 @@ case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with
/**
* BoundedMailbox is the default bounded MailboxType used by Akka Actors.
*/
final case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] {
final case class BoundedMailbox(val capacity: Int, override val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))
@ -677,8 +688,9 @@ object UnboundedPriorityMailbox {
* BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] {
class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
@ -719,8 +731,9 @@ object UnboundedStablePriorityMailbox {
* [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority.
* Extend this class and provide the Comparator in the constructor.
*/
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] {
class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration)
extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
@ -757,8 +770,9 @@ object UnboundedDequeBasedMailbox {
/**
* BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque.
*/
case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] {
case class BoundedDequeBasedMailbox( final val capacity: Int, override final val pushTimeOut: FiniteDuration)
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))
@ -833,7 +847,9 @@ object UnboundedControlAwareMailbox {
* BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues
* to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority.
*/
final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] {
final case class BoundedControlAwareMailbox(capacity: Int, override final val pushTimeOut: FiniteDuration) extends MailboxType
with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue]
with ProducesPushTimeoutSemanticsMailbox {
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
config.getNanosDuration("mailbox-push-timeout-time"))

View file

@ -4,26 +4,18 @@
package akka.dispatch
import com.typesafe.config.{ ConfigFactory, Config }
import akka.actor.{ Actor, DynamicAccess, ActorSystem }
import akka.event.EventStream
import java.util.concurrent.ConcurrentHashMap
import akka.event.Logging.Warning
import akka.ConfigurationException
import scala.annotation.tailrec
import java.lang.reflect.ParameterizedType
import java.util.concurrent.ConcurrentHashMap
import akka.ConfigurationException
import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props }
import akka.dispatch.sysmsg.{ EarliestFirstSystemMessageList, LatestFirstSystemMessageList, SystemMessage, SystemMessageList }
import akka.event.EventStream
import akka.event.Logging.Warning
import akka.util.Reflect
import akka.actor.Props
import akka.actor.Deploy
import scala.util.Try
import scala.util.Failure
import com.typesafe.config.{ Config, ConfigFactory }
import scala.util.control.NonFatal
import akka.actor.ActorRef
import akka.actor.DeadLetter
import akka.dispatch.sysmsg.SystemMessage
import akka.dispatch.sysmsg.LatestFirstSystemMessageList
import akka.dispatch.sysmsg.EarliestFirstSystemMessageList
import akka.dispatch.sysmsg.SystemMessageList
object Mailboxes {
final val DefaultMailboxId = "akka.actor.default-mailbox"
@ -96,6 +88,7 @@ private[akka] class Mailboxes(
// dont care if this happens twice
private var mailboxSizeWarningIssued = false
private var mailboxNonZeroPushTimeoutWarningIssued = false
def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match {
case NoMailboxRequirement classOf[MessageQueue]
@ -188,18 +181,32 @@ private[akka] class Mailboxes(
case _
if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [${id}] not configured")
val conf = config(id)
conf.getString("mailbox-type") match {
val mailboxType = conf.getString("mailbox-type") match {
case "" throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty")
case fqcn
val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> conf)
dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({
case exception
throw new IllegalArgumentException(
(s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" +
" constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"),
s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" +
" constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters",
exception)
}).get
}
if (!mailboxNonZeroPushTimeoutWarningIssued) {
mailboxType match {
case m: ProducesPushTimeoutSemanticsMailbox if m.pushTimeOut.toNanos > 0L
warn(s"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), " +
s"which can lead to blocking behaviour when sending messages to this mailbox. " +
s"Avoid this by setting `$id.mailbox-push-timeout-time` to `0`.")
mailboxNonZeroPushTimeoutWarningIssued = true
case _ // good; nothing to see here, move along, sir.
}
}
mailboxType
}
mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match {
@ -213,6 +220,9 @@ private[akka] class Mailboxes(
private val defaultMailboxConfig = settings.config.getConfig(DefaultMailboxId)
private final def warn(msg: String): Unit =
eventStream.publish(Warning("mailboxes", getClass, msg))
//INTERNAL API
private def config(id: String): Config = {
import scala.collection.JavaConverters._

View file

@ -110,7 +110,8 @@ Builtin Mailbox Implementations
Akka comes shipped with a number of mailbox implementations:
* UnboundedMailbox
* **UnboundedMailbox** (default)
- The default mailbox
- Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
@ -119,87 +120,29 @@ Akka comes shipped with a number of mailbox implementations:
- Bounded: No
- Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
- Configuration name: ``"unbounded"`` or ``"akka.dispatch.UnboundedMailbox"``
* SingleConsumerOnlyUnboundedMailbox
* **SingleConsumerOnlyUnboundedMailbox**
- Backed by a very efficient Multiple Producer Single Consumer queue, cannot be used with BalancingDispatcher
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher``
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
- Configuration name: ``"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"``
* NonBlockingBoundedMailbox
* **NonBlockingBoundedMailbox**
- Backed by a very efficient MultiPle-Producer Multiple-Consumer queue
- Backed by a very efficient Multiple-Producer Multiple-Consumer queue
- Blocking: No
- Bounded: Yes
- Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
- Configuration name: ``"akka.dispatch.NonBlockingBoundedMailbox"``
* BoundedMailbox
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
* UnboundedPriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
* BoundedPriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
* UnboundedStablePriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
* BoundedStablePriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
* UnboundedControlAwareMailbox
* **UnboundedControlAwareMailbox**
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority
@ -211,7 +154,68 @@ Akka comes shipped with a number of mailbox implementations:
- Configuration name: "akka.dispatch.UnboundedControlAwareMailbox"
* BoundedControlAwareMailbox
Aditional implementations exist which can be potencially blocking on pushing into the queue,
however they can be configured using
* **BoundedMailbox**
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
* **UnboundedPriorityMailbox**
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: ``"akka.dispatch.UnboundedPriorityMailbox"``
* **BoundedPriorityMailbox**
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the ``BoundedStablePriorityMailbox``
- Blocking: Yes
- Bounded: Yes
- Configuration name: ``"akka.dispatch.BoundedPriorityMailbox"``
* **UnboundedStablePriorityMailbox**
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: ``"akka.dispatch.UnboundedStablePriorityMailbox"``
* **BoundedStablePriorityMailbox**
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: ``"akka.dispatch.BoundedStablePriorityMailbox"``
* **BoundedControlAwareMailbox**
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority
@ -221,7 +225,7 @@ Akka comes shipped with a number of mailbox implementations:
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedControlAwareMailbox"
- Configuration name: ``"akka.dispatch.BoundedControlAwareMailbox"``
Mailbox configuration examples
==============================

View file

@ -7,6 +7,8 @@ package docs.persistence
import akka.actor._
import akka.pattern.BackoffSupervisor
import akka.persistence._
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Source, Sink, Flow}
import scala.concurrent.duration._
import scala.language.postfixOps

View file

@ -105,12 +105,13 @@ configuration section from the actor systems configuration, overriding its
``id`` key with the configuration path of the mailbox type and adding a
fall-back to the default mailbox configuration section.
Builtin implementations
=======================
Builtin Mailbox Implementations
===============================
Akka comes shipped with a number of mailbox implementations:
* UnboundedMailbox
* **UnboundedMailbox** (default)
- The default mailbox
- Backed by a ``java.util.concurrent.ConcurrentLinkedQueue``
@ -119,87 +120,29 @@ Akka comes shipped with a number of mailbox implementations:
- Bounded: No
- Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox"
- Configuration name: ``"unbounded"`` or ``"akka.dispatch.UnboundedMailbox"``
* SingleConsumerOnlyUnboundedMailbox
* **SingleConsumerOnlyUnboundedMailbox**
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher
- Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher``
- Blocking: No
- Bounded: No
- Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox"
- Configuration name: ``"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"``
* BoundedMailbox
* **NonBlockingBoundedMailbox**
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
* NonBlockingBoundedMailbox
- Backed by a very efficient MultiPle-Producer Multiple-Consumer queue
- Backed by a very efficient Multiple-Producer Multiple-Consumer queue
- Blocking: No
- Bounded: Yes
- Configuration name: "akka.dispatch.NonBlockingBoundedMailbox"
- Configuration name: ``"akka.dispatch.NonBlockingBoundedMailbox"``
* UnboundedPriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
* BoundedPriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedPriorityMailbox"
* UnboundedStablePriorityMailbox
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
* BoundedStablePriorityMailbox
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
* UnboundedControlAwareMailbox
* **UnboundedControlAwareMailbox**
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority
@ -211,7 +154,67 @@ Akka comes shipped with a number of mailbox implementations:
- Configuration name: "akka.dispatch.UnboundedControlAwareMailbox"
* BoundedControlAwareMailbox
Previously available mailbox implementations which were deprecated because they might block:
* **BoundedMailbox**
- Backed by a ``java.util.concurrent.LinkedBlockingQueue``
- Blocking: Yes
- Bounded: Yes
- Configuration name: "bounded" or "akka.dispatch.BoundedMailbox"
* **UnboundedPriorityMailbox**
- Backed by a ``java.util.concurrent.PriorityBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedPriorityMailbox"
* **BoundedPriorityMailbox**
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue``
- Delivery order for messages of equal priority is undefined - contrast with the ``BoundedStablePriorityMailbox``
- Blocking: Yes
- Bounded: Yes
- Configuration name: ``"akka.dispatch.BoundedPriorityMailbox"``
* **UnboundedStablePriorityMailbox**
- Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer``
- FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox
- Blocking: Yes
- Bounded: No
- Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox"
* **BoundedStablePriorityMailbox**
- Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue``
- FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox
- Blocking: Yes
- Bounded: Yes
- Configuration name: "akka.dispatch.BoundedStablePriorityMailbox"
* **BoundedControlAwareMailbox**
- Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority
@ -223,6 +226,7 @@ Akka comes shipped with a number of mailbox implementations:
- Configuration name: "akka.dispatch.BoundedControlAwareMailbox"
Mailbox configuration examples
==============================