diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index ef0d09fca3..a5689946b7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -3,21 +3,22 @@ */ package akka.dispatch -import java.util.{ Comparator, PriorityQueue, Queue, Deque } import java.util.concurrent._ -import akka.AkkaException -import akka.dispatch.sysmsg._ -import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter } -import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe } -import akka.util.Helpers.ConfigOps -import akka.event.Logging.Error -import scala.concurrent.duration.{ Duration, FiniteDuration } -import scala.concurrent.forkjoin.ForkJoinTask -import scala.annotation.tailrec -import scala.util.control.NonFatal -import com.typesafe.config.Config import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.locks.ReentrantLock +import java.util.{ Comparator, Deque, PriorityQueue, Queue } + +import akka.actor.{ ActorCell, ActorRef, ActorSystem, DeadLetter, InternalActorRef } +import akka.dispatch.sysmsg._ +import akka.event.Logging.Error +import akka.util.Helpers.ConfigOps +import akka.util.{ BoundedBlockingQueue, StablePriorityBlockingQueue, StablePriorityQueue, Unsafe } +import com.typesafe.config.Config + +import scala.annotation.tailrec +import scala.concurrent.duration.{ Duration, FiniteDuration } +import scala.concurrent.forkjoin.ForkJoinTask +import scala.util.control.NonFatal /** * INTERNAL API @@ -505,6 +506,15 @@ trait BoundedMessageQueueSemantics { def pushTimeOut: Duration } +/** + * INTERNAL API + * Used to determine mailbox factories which create [[BoundedMessageQueueSemantics]] + * mailboxes, and thus should be validated that the `pushTimeOut` is greater than 0. + */ +private[akka] trait ProducesPushTimeoutSemanticsMailbox { + def pushTimeOut: Duration +} + trait BoundedQueueBasedMessageQueue extends QueueBasedMessageQueue with BoundedMessageQueueSemantics { override def queue: BlockingQueue[Envelope] @@ -635,8 +645,9 @@ case class NonBlockingBoundedMailbox(val capacity: Int) extends MailboxType with /** * BoundedMailbox is the default bounded MailboxType used by Akka Actors. */ -final case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration) - extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] { +final case class BoundedMailbox(val capacity: Int, override val pushTimeOut: FiniteDuration) + extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) @@ -677,8 +688,9 @@ object UnboundedPriorityMailbox { * BoundedPriorityMailbox is a bounded mailbox that allows for prioritization of its contents. * Extend this class and provide the Comparator in the constructor. */ -class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) - extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] { +class BoundedPriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration) + extends MailboxType with ProducesMessageQueue[BoundedPriorityMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") @@ -719,8 +731,9 @@ object UnboundedStablePriorityMailbox { * [[BoundedPriorityMailbox]] it preserves ordering for messages of equal priority. * Extend this class and provide the Comparator in the constructor. */ -class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, final val pushTimeOut: Duration) - extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] { +class BoundedStablePriorityMailbox( final val cmp: Comparator[Envelope], final val capacity: Int, override final val pushTimeOut: Duration) + extends MailboxType with ProducesMessageQueue[BoundedStablePriorityMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative") if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null") @@ -757,8 +770,9 @@ object UnboundedDequeBasedMailbox { /** * BoundedDequeBasedMailbox is an bounded MailboxType, backed by a Deque. */ -case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTimeOut: FiniteDuration) - extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] { +case class BoundedDequeBasedMailbox( final val capacity: Int, override final val pushTimeOut: FiniteDuration) + extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) @@ -833,7 +847,9 @@ object UnboundedControlAwareMailbox { * BoundedControlAwareMailbox is a bounded MailboxType, that maintains two queues * to allow messages that extend [[akka.dispatch.ControlMessage]] to be delivered with priority. */ -final case class BoundedControlAwareMailbox(capacity: Int, pushTimeOut: FiniteDuration) extends MailboxType with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] { +final case class BoundedControlAwareMailbox(capacity: Int, override final val pushTimeOut: FiniteDuration) extends MailboxType + with ProducesMessageQueue[BoundedControlAwareMailbox.MessageQueue] + with ProducesPushTimeoutSemanticsMailbox { def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"), config.getNanosDuration("mailbox-push-timeout-time")) diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala index 7454674a9b..9747c10073 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailboxes.scala @@ -4,26 +4,18 @@ package akka.dispatch -import com.typesafe.config.{ ConfigFactory, Config } -import akka.actor.{ Actor, DynamicAccess, ActorSystem } -import akka.event.EventStream -import java.util.concurrent.ConcurrentHashMap -import akka.event.Logging.Warning -import akka.ConfigurationException -import scala.annotation.tailrec import java.lang.reflect.ParameterizedType +import java.util.concurrent.ConcurrentHashMap + +import akka.ConfigurationException +import akka.actor.{ Actor, ActorRef, ActorSystem, DeadLetter, Deploy, DynamicAccess, Props } +import akka.dispatch.sysmsg.{ EarliestFirstSystemMessageList, LatestFirstSystemMessageList, SystemMessage, SystemMessageList } +import akka.event.EventStream +import akka.event.Logging.Warning import akka.util.Reflect -import akka.actor.Props -import akka.actor.Deploy -import scala.util.Try -import scala.util.Failure +import com.typesafe.config.{ Config, ConfigFactory } + import scala.util.control.NonFatal -import akka.actor.ActorRef -import akka.actor.DeadLetter -import akka.dispatch.sysmsg.SystemMessage -import akka.dispatch.sysmsg.LatestFirstSystemMessageList -import akka.dispatch.sysmsg.EarliestFirstSystemMessageList -import akka.dispatch.sysmsg.SystemMessageList object Mailboxes { final val DefaultMailboxId = "akka.actor.default-mailbox" @@ -96,6 +88,7 @@ private[akka] class Mailboxes( // don’t care if this happens twice private var mailboxSizeWarningIssued = false + private var mailboxNonZeroPushTimeoutWarningIssued = false def getMailboxRequirement(config: Config) = config.getString("mailbox-requirement") match { case NoMailboxRequirement ⇒ classOf[MessageQueue] @@ -188,18 +181,32 @@ private[akka] class Mailboxes( case _ ⇒ if (!settings.config.hasPath(id)) throw new ConfigurationException(s"Mailbox Type [${id}] not configured") val conf = config(id) - conf.getString("mailbox-type") match { + + val mailboxType = conf.getString("mailbox-type") match { case "" ⇒ throw new ConfigurationException(s"The setting mailbox-type, defined in [$id] is empty") case fqcn ⇒ val args = List(classOf[ActorSystem.Settings] -> settings, classOf[Config] -> conf) dynamicAccess.createInstanceFor[MailboxType](fqcn, args).recover({ case exception ⇒ throw new IllegalArgumentException( - (s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" + - " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters"), + s"Cannot instantiate MailboxType [$fqcn], defined in [$id], make sure it has a public" + + " constructor with [akka.actor.ActorSystem.Settings, com.typesafe.config.Config] parameters", exception) }).get } + + if (!mailboxNonZeroPushTimeoutWarningIssued) { + mailboxType match { + case m: ProducesPushTimeoutSemanticsMailbox if m.pushTimeOut.toNanos > 0L ⇒ + warn(s"Configured potentially-blocking mailbox [$id] configured with non-zero pushTimeOut (${m.pushTimeOut}), " + + s"which can lead to blocking behaviour when sending messages to this mailbox. " + + s"Avoid this by setting `$id.mailbox-push-timeout-time` to `0`.") + mailboxNonZeroPushTimeoutWarningIssued = true + case _ ⇒ // good; nothing to see here, move along, sir. + } + } + + mailboxType } mailboxTypeConfigurators.putIfAbsent(id, newConfigurator) match { @@ -213,6 +220,9 @@ private[akka] class Mailboxes( private val defaultMailboxConfig = settings.config.getConfig(DefaultMailboxId) + private final def warn(msg: String): Unit = + eventStream.publish(Warning("mailboxes", getClass, msg)) + //INTERNAL API private def config(id: String): Config = { import scala.collection.JavaConverters._ diff --git a/akka-docs/rst/java/mailboxes.rst b/akka-docs/rst/java/mailboxes.rst index e320d0c06a..ebbd8c09a2 100644 --- a/akka-docs/rst/java/mailboxes.rst +++ b/akka-docs/rst/java/mailboxes.rst @@ -110,7 +110,8 @@ Builtin Mailbox Implementations Akka comes shipped with a number of mailbox implementations: -* UnboundedMailbox +* **UnboundedMailbox** (default) + - The default mailbox - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` @@ -119,87 +120,29 @@ Akka comes shipped with a number of mailbox implementations: - Bounded: No - - Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox" + - Configuration name: ``"unbounded"`` or ``"akka.dispatch.UnboundedMailbox"`` -* SingleConsumerOnlyUnboundedMailbox +* **SingleConsumerOnlyUnboundedMailbox** - - Backed by a very efficient Multiple Producer Single Consumer queue, cannot be used with BalancingDispatcher + - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` - Blocking: No - Bounded: No - - Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + - Configuration name: ``"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"`` -* NonBlockingBoundedMailbox +* **NonBlockingBoundedMailbox** - - Backed by a very efficient MultiPle-Producer Multiple-Consumer queue + - Backed by a very efficient Multiple-Producer Multiple-Consumer queue - Blocking: No - Bounded: Yes - - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox" + - Configuration name: ``"akka.dispatch.NonBlockingBoundedMailbox"`` -* BoundedMailbox - - - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox" - -* UnboundedPriorityMailbox - - - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` - - - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox - - - Blocking: Yes - - - Bounded: No - - - Configuration name: "akka.dispatch.UnboundedPriorityMailbox" - -* BoundedPriorityMailbox - - - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` - - - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "akka.dispatch.BoundedPriorityMailbox" - -* UnboundedStablePriorityMailbox - - - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` - - - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox - - - Blocking: Yes - - - Bounded: No - - - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox" - -* BoundedStablePriorityMailbox - - - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` - - - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox" - -* UnboundedControlAwareMailbox +* **UnboundedControlAwareMailbox** - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority @@ -211,7 +154,68 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox" -* BoundedControlAwareMailbox +Aditional implementations exist which can be potencially blocking on pushing into the queue, +however they can be configured using + +* **BoundedMailbox** + + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox" + +* **UnboundedPriorityMailbox** + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: ``"akka.dispatch.UnboundedPriorityMailbox"`` + +* **BoundedPriorityMailbox** + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the ``BoundedStablePriorityMailbox`` + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: ``"akka.dispatch.BoundedPriorityMailbox"`` + +* **UnboundedStablePriorityMailbox** + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` + + - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: ``"akka.dispatch.UnboundedStablePriorityMailbox"`` + +* **BoundedStablePriorityMailbox** + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` + + - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: ``"akka.dispatch.BoundedStablePriorityMailbox"`` + +* **BoundedControlAwareMailbox** - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority @@ -221,7 +225,7 @@ Akka comes shipped with a number of mailbox implementations: - Bounded: Yes - - Configuration name: "akka.dispatch.BoundedControlAwareMailbox" + - Configuration name: ``"akka.dispatch.BoundedControlAwareMailbox"`` Mailbox configuration examples ============================== diff --git a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala index dddf53e0ae..3f806b2f86 100644 --- a/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala +++ b/akka-docs/rst/scala/code/docs/persistence/PersistenceDocSpec.scala @@ -7,6 +7,8 @@ package docs.persistence import akka.actor._ import akka.pattern.BackoffSupervisor import akka.persistence._ +import akka.stream.ActorMaterializer +import akka.stream.scaladsl.{Source, Sink, Flow} import scala.concurrent.duration._ import scala.language.postfixOps diff --git a/akka-docs/rst/scala/mailboxes.rst b/akka-docs/rst/scala/mailboxes.rst index cf2ed4a8a7..579aec8488 100644 --- a/akka-docs/rst/scala/mailboxes.rst +++ b/akka-docs/rst/scala/mailboxes.rst @@ -105,12 +105,13 @@ configuration section from the actor system’s configuration, overriding its ``id`` key with the configuration path of the mailbox type and adding a fall-back to the default mailbox configuration section. -Builtin implementations -======================= +Builtin Mailbox Implementations +=============================== Akka comes shipped with a number of mailbox implementations: -* UnboundedMailbox +* **UnboundedMailbox** (default) + - The default mailbox - Backed by a ``java.util.concurrent.ConcurrentLinkedQueue`` @@ -119,87 +120,29 @@ Akka comes shipped with a number of mailbox implementations: - Bounded: No - - Configuration name: "unbounded" or "akka.dispatch.UnboundedMailbox" + - Configuration name: ``"unbounded"`` or ``"akka.dispatch.UnboundedMailbox"`` -* SingleConsumerOnlyUnboundedMailbox +* **SingleConsumerOnlyUnboundedMailbox** - - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with BalancingDispatcher + - Backed by a very efficient Multiple-Producer Single-Consumer queue, cannot be used with ``BalancingDispatcher`` - Blocking: No - Bounded: No - - Configuration name: "akka.dispatch.SingleConsumerOnlyUnboundedMailbox" + - Configuration name: ``"akka.dispatch.SingleConsumerOnlyUnboundedMailbox"`` -* BoundedMailbox +* **NonBlockingBoundedMailbox** - - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox" - -* NonBlockingBoundedMailbox - - - Backed by a very efficient MultiPle-Producer Multiple-Consumer queue + - Backed by a very efficient Multiple-Producer Multiple-Consumer queue - Blocking: No - Bounded: Yes - - Configuration name: "akka.dispatch.NonBlockingBoundedMailbox" + - Configuration name: ``"akka.dispatch.NonBlockingBoundedMailbox"`` -* UnboundedPriorityMailbox - - - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` - - - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox - - - Blocking: Yes - - - Bounded: No - - - Configuration name: "akka.dispatch.UnboundedPriorityMailbox" - -* BoundedPriorityMailbox - - - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` - - - Delivery order for messages of equal priority is undefined - contrast with the BoundedStablePriorityMailbox - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "akka.dispatch.BoundedPriorityMailbox" - -* UnboundedStablePriorityMailbox - - - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` - - - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox - - - Blocking: Yes - - - Bounded: No - - - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox" - -* BoundedStablePriorityMailbox - - - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` - - - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox - - - Blocking: Yes - - - Bounded: Yes - - - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox" - -* UnboundedControlAwareMailbox +* **UnboundedControlAwareMailbox** - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority @@ -211,7 +154,67 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.UnboundedControlAwareMailbox" -* BoundedControlAwareMailbox +Previously available mailbox implementations which were deprecated because they might block: + +* **BoundedMailbox** + + - Backed by a ``java.util.concurrent.LinkedBlockingQueue`` + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "bounded" or "akka.dispatch.BoundedMailbox" + +* **UnboundedPriorityMailbox** + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the UnboundedStablePriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedPriorityMailbox" + +* **BoundedPriorityMailbox** + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.BoundedBlockingQueue`` + + - Delivery order for messages of equal priority is undefined - contrast with the ``BoundedStablePriorityMailbox`` + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: ``"akka.dispatch.BoundedPriorityMailbox"`` + +* **UnboundedStablePriorityMailbox** + + - Backed by a ``java.util.concurrent.PriorityBlockingQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` + + - FIFO order is preserved for messages of equal priority - contrast with the UnboundedPriorityMailbox + + - Blocking: Yes + + - Bounded: No + + - Configuration name: "akka.dispatch.UnboundedStablePriorityMailbox" + +* **BoundedStablePriorityMailbox** + + - Backed by a ``java.util.PriorityQueue`` wrapped in an ``akka.util.PriorityQueueStabilizer`` and an ``akka.util.BoundedBlockingQueue`` + + - FIFO order is preserved for messages of equal priority - contrast with the BoundedPriorityMailbox + + - Blocking: Yes + + - Bounded: Yes + + - Configuration name: "akka.dispatch.BoundedStablePriorityMailbox" + +* **BoundedControlAwareMailbox** - Delivers messages that extend ``akka.dispatch.ControlMessage`` with higher priority @@ -223,6 +226,7 @@ Akka comes shipped with a number of mailbox implementations: - Configuration name: "akka.dispatch.BoundedControlAwareMailbox" + Mailbox configuration examples ==============================