diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala index 5a7fa7ce90..0ac89fa654 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorMailboxSpec.scala @@ -102,7 +102,7 @@ class ActorMailboxSpec extends AkkaSpec(ActorMailboxSpec.mailboxConf) with Defau actor ! "ping" val q = expectMsgType[MessageQueue] - types foreach (t ⇒ assert(t isInstance q, s"Type [${q.getClass}] is not assignable to [${t}]")) + types foreach (t ⇒ assert(t isInstance q, s"Type [${q.getClass.getName}] is not assignable to [${t.getName}]")) } "An Actor" must { diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index cd66fc5ac1..f4814b6dc5 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -319,7 +319,15 @@ akka { # an instance of that actor its mailbox type will be decided by looking # up a mailbox configuration via T in this mapping requirements { + "akka.dispatch.UnboundedMessageQueueSemantics" = akka.actor.mailbox.unbounded-queue-based "akka.dispatch.DequeBasedMessageQueue" = akka.actor.mailbox.unbounded-deque-based + "akka.dispatch.UnboundedDequeBasedMessageQueueSemantics" = akka.actor.mailbox.unbounded-deque-based + } + + unbounded-queue-based { + # FQCN of the MailboxType, The Class of the FQCN must have a public constructor + # with (akka.actor.ActorSystem.Settings, com.typesafe.config.Config) parameters. + mailbox-type = "akka.dispatch.UnboundedMailbox" } unbounded-deque-based { diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index ad32898ee5..71a444c0ea 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -6,7 +6,7 @@ package akka.actor import scala.collection.immutable import akka.dispatch.sysmsg._ -import akka.dispatch.NullMessage +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue, NullMessage } import akka.routing._ import akka.event._ import akka.util.{ Switch, Helpers } @@ -367,7 +367,8 @@ private[akka] object LocalActorRefProvider { /* * Root and user guardian */ - private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor { + private class Guardian(override val supervisorStrategy: SupervisorStrategy) extends Actor + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { def receive = { case Terminated(_) ⇒ context.stop(self) @@ -382,7 +383,8 @@ private[akka] object LocalActorRefProvider { /** * System guardian */ - private class SystemGuardian(override val supervisorStrategy: SupervisorStrategy, val guardian: ActorRef) extends Actor { + private class SystemGuardian(override val supervisorStrategy: SupervisorStrategy, val guardian: ActorRef) + extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import SystemGuardian._ var terminationHooks = Set.empty[ActorRef] diff --git a/akka-actor/src/main/scala/akka/actor/Stash.scala b/akka-actor/src/main/scala/akka/actor/Stash.scala index abefa9524d..beb20573de 100644 --- a/akka-actor/src/main/scala/akka/actor/Stash.scala +++ b/akka-actor/src/main/scala/akka/actor/Stash.scala @@ -3,7 +3,7 @@ */ package akka.actor -import akka.dispatch.{ RequiresMessageQueue, Envelope, DequeBasedMessageQueue } +import akka.dispatch.{ UnboundedDequeBasedMessageQueueSemantics, RequiresMessageQueue, Envelope, DequeBasedMessageQueue } import akka.AkkaException /** @@ -47,8 +47,14 @@ import akka.AkkaException * any trait/class that overrides the `preRestart` callback. This means it's not possible to write * `Actor with MyActor with Stash` if `MyActor` overrides `preRestart`. */ -trait Stash extends Actor with RequiresMessageQueue[DequeBasedMessageQueue] { +trait Stash extends UnrestrictedStash with RequiresMessageQueue[DequeBasedMessageQueue] +/** + * The `UnboundedStash` trait is a version of `Stash` that enforces an unbounded stash for you actor. + */ +trait UnboundedStash extends UnrestrictedStash with RequiresMessageQueue[UnboundedDequeBasedMessageQueueSemantics] + +trait UnrestrictedStash extends Actor { /* The private stash of the actor. It is only accessible using `stash()` and * `unstashAll()`. */ @@ -68,12 +74,12 @@ trait Stash extends Actor with RequiresMessageQueue[DequeBasedMessageQueue] { private val mailbox: DequeBasedMessageQueue = { context.asInstanceOf[ActorCell].mailbox.messageQueue match { case queue: DequeBasedMessageQueue ⇒ queue - case other ⇒ throw ActorInitializationException(self, "DequeBasedMailbox required, got: " + other.getClass() + """ -An (unbounded) deque-based mailbox can be configured as follows: - my-custom-dispatcher { - mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" - } -""") + case other ⇒ throw ActorInitializationException(self, s"DequeBasedMailbox required, got: ${other.getClass.getName}\n" + + """An (unbounded) deque-based mailbox can be configured as follows: + | my-custom-mailbox { + | mailbox-type = "akka.dispatch.UnboundedDequeBasedMailbox" + | } + |""".stripMargin) } } diff --git a/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala b/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala index a41e03e8cf..210fade3fd 100644 --- a/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala +++ b/akka-actor/src/main/scala/akka/actor/UntypedActorWithStash.scala @@ -31,3 +31,8 @@ package akka.actor * */ abstract class UntypedActorWithStash extends UntypedActor with Stash + +/** + * Actor base class that enforces an unbounded stash for the actor. + */ +abstract class UntypedActorWithUnboundedStash extends UntypedActor with UnboundedStash diff --git a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala index 950b5fa63f..c608324f70 100644 --- a/akka-actor/src/main/scala/akka/io/SelectionHandler.scala +++ b/akka-actor/src/main/scala/akka/io/SelectionHandler.scala @@ -19,6 +19,7 @@ import akka.io.IO.HasFailureMessage import akka.util.Helpers.Requiring import akka.event.LoggingAdapter import akka.util.SerializedSuspendableExecutionContext +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } abstract class SelectionHandlerSettings(config: Config) { import config._ @@ -56,7 +57,8 @@ private[io] object SelectionHandler { case object WriteInterest } -private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends Actor with ActorLogging { +private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends Actor with ActorLogging + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import SelectionHandler._ import settings._ @@ -211,4 +213,4 @@ private[io] class SelectionHandler(settings: SelectionHandlerSettings) extends A } } } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/io/TcpConnection.scala b/akka-actor/src/main/scala/akka/io/TcpConnection.scala index addae22ec9..16ba1ca7fe 100644 --- a/akka-actor/src/main/scala/akka/io/TcpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpConnection.scala @@ -17,14 +17,17 @@ import akka.util.ByteString import akka.io.Inet.SocketOption import akka.io.Tcp._ import akka.io.SelectionHandler._ +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * Base class for TcpIncomingConnection and TcpOutgoingConnection. * * INTERNAL API */ -private[io] abstract class TcpConnection(val channel: SocketChannel, - val tcp: TcpExt) extends Actor with ActorLogging { +private[io] abstract class TcpConnection( + val channel: SocketChannel, + val tcp: TcpExt) + extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import tcp.Settings._ import tcp.bufferPool import TcpConnection._ diff --git a/akka-actor/src/main/scala/akka/io/TcpListener.scala b/akka-actor/src/main/scala/akka/io/TcpListener.scala index a9a7397aee..35db90fa25 100644 --- a/akka-actor/src/main/scala/akka/io/TcpListener.scala +++ b/akka-actor/src/main/scala/akka/io/TcpListener.scala @@ -12,6 +12,7 @@ import akka.io.SelectionHandler._ import akka.io.Tcp._ import akka.io.IO.HasFailureMessage import java.net.InetSocketAddress +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API @@ -29,10 +30,12 @@ private[io] object TcpListener { /** * INTERNAL API */ -private[io] class TcpListener(val selectorRouter: ActorRef, - val tcp: TcpExt, - val bindCommander: ActorRef, - val bind: Bind) extends Actor with ActorLogging { +private[io] class TcpListener( + val selectorRouter: ActorRef, + val tcp: TcpExt, + val bindCommander: ActorRef, + val bind: Bind) + extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import TcpListener._ import tcp.Settings._ diff --git a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala index 9c881bb1eb..9e5515c492 100644 --- a/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala +++ b/akka-actor/src/main/scala/akka/io/TcpPipelineHandler.scala @@ -13,6 +13,7 @@ import scala.util.Failure import akka.actor.Terminated import akka.actor.Props import akka.util.ByteString +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } object TcpPipelineHandler { @@ -87,7 +88,7 @@ class TcpPipelineHandler[Ctx <: PipelineContext, Cmd, Evt]( init: TcpPipelineHandler.Init[Ctx, Cmd, Evt], connection: ActorRef, handler: ActorRef) - extends Actor { + extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import init._ import TcpPipelineHandler._ @@ -150,4 +151,4 @@ class TcpReadWriteAdapter[Ctx <: PipelineContext] extends PipelineStage[Ctx, Byt case cmd: Tcp.Command ⇒ ctx.singleCommand(cmd) } } -} \ No newline at end of file +} diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index 6beded47c8..5407d16bd7 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -12,13 +12,16 @@ import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API */ -private[io] class UdpConnection(val udpConn: UdpConnectedExt, - val commander: ActorRef, - val connect: Connect) extends Actor with ActorLogging { +private[io] class UdpConnection( + val udpConn: UdpConnectedExt, + val commander: ActorRef, + val connect: Connect) + extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { def selector: ActorRef = context.parent diff --git a/akka-actor/src/main/scala/akka/io/UdpListener.scala b/akka-actor/src/main/scala/akka/io/UdpListener.scala index aab2608323..ef3cf9b3ee 100644 --- a/akka-actor/src/main/scala/akka/io/UdpListener.scala +++ b/akka-actor/src/main/scala/akka/io/UdpListener.scala @@ -13,14 +13,16 @@ import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ import scala.annotation.tailrec import scala.util.control.NonFatal +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API */ -private[io] class UdpListener(val udp: UdpExt, - val bindCommander: ActorRef, - val bind: Bind) - extends Actor with ActorLogging with WithUdpSend { +private[io] class UdpListener( + val udp: UdpExt, + val bindCommander: ActorRef, + val bind: Bind) + extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import udp.bufferPool import udp.settings._ diff --git a/akka-actor/src/main/scala/akka/io/UdpSender.scala b/akka-actor/src/main/scala/akka/io/UdpSender.scala index 5f45550591..74450c2991 100644 --- a/akka-actor/src/main/scala/akka/io/UdpSender.scala +++ b/akka-actor/src/main/scala/akka/io/UdpSender.scala @@ -10,12 +10,16 @@ import akka.io.SelectionHandler.{ ChannelRegistered, RegisterChannel } import scala.collection.immutable import akka.io.Inet.SocketOption import scala.util.control.NonFatal +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API */ -private[io] class UdpSender(val udp: UdpExt, options: immutable.Traversable[SocketOption], val commander: ActorRef) - extends Actor with ActorLogging with WithUdpSend { +private[io] class UdpSender( + val udp: UdpExt, + options: immutable.Traversable[SocketOption], + val commander: ActorRef) + extends Actor with ActorLogging with WithUdpSend with RequiresMessageQueue[UnboundedMessageQueueSemantics] { def selector: ActorRef = context.parent diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala index 8f95269b9d..e31a6fae87 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterDaemon.scala @@ -16,6 +16,7 @@ import akka.actor.SupervisorStrategy.Stop import akka.cluster.MemberStatus._ import akka.cluster.ClusterEvent._ import akka.actor.ActorSelection +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * Base trait for all cluster messages. All ClusterMessage's are serializable. @@ -175,7 +176,8 @@ private[cluster] object ClusterLeaderAction { * * Supervisor managing the different Cluster daemons. */ -private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging { +private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Actor with ActorLogging + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ // Important - don't use Cluster(context.system) here because that would // cause deadlock. The Cluster extension is currently being created and is waiting @@ -206,7 +208,8 @@ private[cluster] final class ClusterDaemon(settings: ClusterSettings) extends Ac * ClusterCoreDaemon and ClusterDomainEventPublisher can't be restarted because the state * would be obsolete. Shutdown the member if any those actors crashed. */ -private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging { +private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLogging + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ val publisher = context.actorOf(Props[ClusterDomainEventPublisher]. @@ -234,7 +237,8 @@ private[cluster] final class ClusterCoreSupervisor extends Actor with ActorLoggi /** * INTERNAL API. */ -private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging { +private[cluster] final class ClusterCoreDaemon(publisher: ActorRef) extends Actor with ActorLogging + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import ClusterLeaderAction._ import InternalClusterAction._ diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala index 8c4bc77b2e..a48c3a4121 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterEvent.scala @@ -11,6 +11,7 @@ import akka.cluster.ClusterEvent._ import akka.cluster.MemberStatus._ import akka.event.EventStream import akka.actor.AddressTerminated +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * Domain events published to the event bus. @@ -252,7 +253,8 @@ object ClusterEvent { * Responsible for domain event subscriptions and publishing of * domain events to event bus. */ -private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging { +private[cluster] final class ClusterDomainEventPublisher extends Actor with ActorLogging + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import InternalClusterAction._ var latestGossip: Gossip = Gossip.empty diff --git a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala index 436ee38203..0b7d7442a6 100644 --- a/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala +++ b/akka-cluster/src/main/scala/akka/cluster/ClusterReadView.scala @@ -9,6 +9,7 @@ import scala.collection.immutable import akka.actor.{ Actor, ActorRef, ActorSystemImpl, Address, Props } import akka.cluster.ClusterEvent._ import akka.actor.PoisonPill +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API @@ -40,7 +41,7 @@ private[akka] class ClusterReadView(cluster: Cluster) extends Closeable { // create actor that subscribes to the cluster eventBus to update current read view state private val eventBusListener: ActorRef = { - cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor { + cluster.system.asInstanceOf[ActorSystemImpl].systemActorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { override def preStart(): Unit = cluster.subscribe(self, classOf[ClusterDomainEvent]) override def postStop(): Unit = cluster.unsubscribe(self) diff --git a/akka-docs/rst/java/untyped-actors.rst b/akka-docs/rst/java/untyped-actors.rst index bd7b6355d3..a014d9358e 100644 --- a/akka-docs/rst/java/untyped-actors.rst +++ b/akka-docs/rst/java/untyped-actors.rst @@ -733,6 +733,12 @@ actor's state which have the same property. The :class:`Stash` trait’s implementation of :meth:`preRestart` will call ``unstashAll()``, which is usually the desired behavior. +.. note:: + + If you want to enforce that your actor can only work with an unbounded stash, + then you should use the ``UntypedActorWithUnboundedStash`` class instead. + + .. _killing-actors-java: Killing an Actor diff --git a/akka-docs/rst/scala/actors.rst b/akka-docs/rst/scala/actors.rst index 4070f7b027..472d94a636 100644 --- a/akka-docs/rst/scala/actors.rst +++ b/akka-docs/rst/scala/actors.rst @@ -837,6 +837,12 @@ actor's state which have the same property. The :class:`Stash` trait’s implementation of :meth:`preRestart` will call ``unstashAll()``, which is usually the desired behavior. +.. note:: + + If you want to enforce that your actor can only work with an unbounded stash, + then you should use the ``UnboundedStash`` trait instead. + + .. _killing-actors-scala: Killing an Actor diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala index 8b816c8820..94ddf5760c 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testconductor/Player.scala @@ -19,6 +19,7 @@ import akka.pattern.{ ask, pipe, AskTimeoutException } import akka.event.{ LoggingAdapter, Logging } import java.net.{ InetSocketAddress, ConnectException } import akka.remote.transport.ThrottlerTransportAdapter.{ SetThrottle, TokenBucket, Blackhole, Unthrottled } +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * The Player is the client component of the @@ -49,7 +50,7 @@ trait Player { this: TestConductorExt ⇒ if (_client ne null) throw new IllegalStateException("TestConductorClient already started") _client = system.actorOf(Props(classOf[ClientFSM], name, controllerAddr), "TestConductorClient") - val a = system.actorOf(Props(new Actor { + val a = system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { var waiting: ActorRef = _ def receive = { case fsm: ActorRef ⇒ @@ -140,7 +141,8 @@ private[akka] object ClientFSM { * * INTERNAL API. */ -private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor with LoggingFSM[ClientFSM.State, ClientFSM.Data] { +private[akka] class ClientFSM(name: RoleName, controllerAddr: InetSocketAddress) extends Actor + with LoggingFSM[ClientFSM.State, ClientFSM.Data] with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import ClientFSM._ val settings = TestConductor().Settings diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index 6dfae9130a..dc197ac999 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -390,7 +390,9 @@ private[remote] class EndpointWriter( codec: AkkaPduCodec, val refuseUid: Option[Int], val receiveBuffers: ConcurrentHashMap[Link, AckedReceiveBuffer[Message]], - val reliableDeliverySupervisor: Option[ActorRef]) extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with Stash with FSM[EndpointWriter.State, Unit] { + val reliableDeliverySupervisor: Option[ActorRef]) + extends EndpointActor(localAddress, remoteAddress, transport, settings, codec) with UnboundedStash + with FSM[EndpointWriter.State, Unit] { import EndpointWriter._ import context.dispatcher diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 9ee4668c3f..91273cc20b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -17,6 +17,7 @@ import scala.concurrent.{ ExecutionContext, Future } import scala.concurrent.forkjoin.ThreadLocalRandom import com.typesafe.config.Config import akka.ConfigurationException +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } /** * INTERNAL API @@ -31,7 +32,8 @@ private[akka] object RemoteActorRefProvider { case object WaitTransportShutdown extends TerminatorState case object Finished extends TerminatorState - private class RemotingTerminator(systemGuardian: ActorRef) extends Actor with FSM[TerminatorState, Option[Internals]] { + private class RemotingTerminator(systemGuardian: ActorRef) extends Actor with FSM[TerminatorState, Option[Internals]] + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import context.dispatcher startWith(Uninitialized, None) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala index 8276c2bda0..448640d253 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeploymentWatcher.scala @@ -9,6 +9,7 @@ import akka.actor.Terminated import akka.actor.Actor import akka.actor.ActorRef import akka.dispatch.sysmsg.DeathWatchNotification +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API @@ -23,7 +24,7 @@ private[akka] object RemoteDeploymentWatcher { * Responsible for cleaning up child references of remote deployed actors when remote node * goes down (jvm crash, network failure), i.e. triggered by [[akka.actor.AddressTerminated]]. */ -private[akka] class RemoteDeploymentWatcher extends Actor { +private[akka] class RemoteDeploymentWatcher extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import RemoteDeploymentWatcher._ var supervisors = Map.empty[ActorRef, InternalActorRef] diff --git a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala index 70701698db..9421ea10c0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteWatcher.scala @@ -14,6 +14,7 @@ import akka.actor.RootActorPath import akka.actor.Terminated import akka.actor.ExtendedActorSystem import akka.ConfigurationException +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * INTERNAL API @@ -92,7 +93,7 @@ private[akka] class RemoteWatcher( unreachableReaperInterval: FiniteDuration, heartbeatExpectedResponseAfter: FiniteDuration, numberOfEndHeartbeatRequests: Int) - extends Actor with ActorLogging { + extends Actor with ActorLogging with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import RemoteWatcher._ import context.dispatcher diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index a80804550e..2f1282df26 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -23,6 +23,7 @@ import scala.util.control.NonFatal import scala.util.{ Failure, Success } import akka.remote.transport.AkkaPduCodec.Message import java.util.concurrent.ConcurrentHashMap +import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } /** * INTERNAL API @@ -82,7 +83,7 @@ private[remote] object Remoting { case class RegisterTransportActor(props: Props, name: String) - private[Remoting] class TransportSupervisor extends Actor { + private[Remoting] class TransportSupervisor extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { override def supervisorStrategy = OneForOneStrategy() { case NonFatal(e) ⇒ Restart } @@ -348,7 +349,8 @@ private[remote] object EndpointManager { /** * INTERNAL API */ -private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor { +private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends Actor + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import EndpointManager._ import context.dispatcher diff --git a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala index 07ae4d9ef5..f7b8193214 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AbstractTransportAdapter.scala @@ -12,7 +12,7 @@ import akka.util.Timeout import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ ExecutionContext, Promise, Future } -import scala.util.Success +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } trait TransportAdapterProvider { /** @@ -162,7 +162,8 @@ abstract class ActorTransportAdapter(wrappedTransport: Transport, system: ActorS override def shutdown(): Unit = manager ! PoisonPill } -abstract class ActorTransportAdapterManager extends Actor { +abstract class ActorTransportAdapterManager extends Actor + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import ActorTransportAdapter.{ ListenUnderlying, ListenerRegistered } private var delayedEvents = immutable.Queue.empty[Any] diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index 0386e7d449..ec418b3ac4 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -22,6 +22,7 @@ import scala.collection.immutable import scala.concurrent.duration._ import scala.concurrent.{ Future, Promise } import scala.util.control.NonFatal +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } @SerialVersionUID(1L) class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException(msg, cause) with OnlyCauseStackTrace { @@ -227,7 +228,8 @@ private[transport] class ProtocolStateActor(initialData: InitialProtocolStateDat private val settings: AkkaProtocolSettings, private val codec: AkkaPduCodec, private val failureDetector: FailureDetector) - extends Actor with FSM[AssociationState, ProtocolStateData] { + extends Actor with FSM[AssociationState, ProtocolStateData] + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import ProtocolStateActor._ import context.dispatcher diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 85d48e2344..df4a96238b 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -22,6 +22,7 @@ import scala.math.min import scala.util.{ Success, Failure } import scala.util.control.NonFatal import akka.dispatch.sysmsg.{ Unwatch, Watch } +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } class ThrottlerProvider extends TransportAdapterProvider { @@ -336,7 +337,8 @@ private[transport] class ThrottledAssociation( val associationHandler: AssociationEventListener, val originalHandle: AssociationHandle, val inbound: Boolean) - extends Actor with LoggingFSM[ThrottledAssociation.ThrottlerState, ThrottledAssociation.ThrottlerData] { + extends Actor with LoggingFSM[ThrottledAssociation.ThrottlerState, ThrottledAssociation.ThrottlerData] + with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import ThrottledAssociation._ import context.dispatcher diff --git a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala index 4eab0c1dcd..8a45f713be 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/ZeroMQExtension.scala @@ -14,6 +14,7 @@ import java.util.concurrent.TimeUnit import akka.util.Timeout import org.zeromq.ZMQException import scala.concurrent.duration.FiniteDuration +import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } /** * A Model to represent a version of the zeromq library @@ -241,7 +242,7 @@ class ZeroMQExtension(system: ActorSystem) extends Extension { private val zeromqGuardian: ActorRef = { verifyZeroMQVersion() - system.actorOf(Props(new Actor { + system.actorOf(Props(new Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] { import SupervisorStrategy._ override def supervisorStrategy = OneForOneStrategy() { case ex: ZMQException if nonfatal(ex) ⇒ Resume