From 01b3be1242fdf61f1fb6182f901db1cc150b8677 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Mon, 19 Aug 2013 12:06:07 +0200 Subject: [PATCH] +rem #3485 Make dispatcher for all remoting actors configurable --- akka-remote/src/main/resources/reference.conf | 10 ++++++++ .../src/main/scala/akka/remote/Endpoint.scala | 8 +++---- .../akka/remote/RemoteActorRefProvider.scala | 18 ++++++++------ .../scala/akka/remote/RemoteSettings.scala | 5 ++++ .../src/main/scala/akka/remote/Remoting.scala | 24 ++++++++++++------- .../transport/AkkaProtocolTransport.scala | 8 +++---- .../transport/ThrottlerTransportAdapter.scala | 5 +++- .../transport/netty/NettyTransport.scala | 7 +++++- .../scala/akka/remote/RemoteConfigSpec.scala | 1 + 9 files changed, 61 insertions(+), 25 deletions(-) diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 7350d3d96f..3ab5d3d412 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -85,6 +85,13 @@ akka { # Acknowledgment timeout of management commands sent to the transport stack. command-ack-timeout = 30 s + # If set to a nonempty string remoting will use the given dispatcher for + # its internal actors otherwise the default dispatcher is used. Please note + # that since remoting can load arbitrary 3rd party drivers (see + # "enabled-transport" and "adapters" entries) it is not guaranteed that + # every module will respect this setting. + use-dispatcher = "" + ### Security settings # Enable untrusted mode for full security of server managed actors, prevents @@ -328,6 +335,9 @@ akka { # If set to "" then the specified dispatcher # will be used to accept inbound connections, and perform IO. If "" then # dedicated threads will be used. + # Please note that the Netty driver only uses this configuration and does + # not read the "akka.remote.use-dispatcher" entry. Instead it has to be + # configured manually to point to the same dispatcher if needed. use-dispatcher-for-io = "" # Sets the high water mark for the in and outbound sockets, diff --git a/akka-remote/src/main/scala/akka/remote/Endpoint.scala b/akka-remote/src/main/scala/akka/remote/Endpoint.scala index c3ab140e2d..209079aae0 100644 --- a/akka-remote/src/main/scala/akka/remote/Endpoint.scala +++ b/akka-remote/src/main/scala/akka/remote/Endpoint.scala @@ -313,7 +313,7 @@ private[remote] class ReliableDeliverySupervisor( } private def createWriter(): ActorRef = { - context.watch(context.actorOf(EndpointWriter.props( + context.watch(context.actorOf(RARP(context.system).configureDispatcher(EndpointWriter.props( handleOrActive = currentHandle, localAddress = localAddress, remoteAddress = remoteAddress, @@ -321,7 +321,7 @@ private[remote] class ReliableDeliverySupervisor( settings = settings, AkkaPduProtobufCodec, receiveBuffers = receiveBuffers, - reliableDeliverySupervisor = Some(self)).withDeploy(Deploy.local), "endpointWriter")) + reliableDeliverySupervisor = Some(self))).withDeploy(Deploy.local), "endpointWriter")) } } @@ -602,8 +602,8 @@ private[remote] class EndpointWriter( private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = { val newReader = context.watch(context.actorOf( - EndpointReader.props(localAddress, remoteAddress, transport, settings, codec, - msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local), + RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec, + msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local), "endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next())) handle.readHandlerPromise.success(ActorHandleEventListener(newReader)) Some(newReader) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index aa971b3b43..a84e572955 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -155,7 +155,9 @@ private[akka] class RemoteActorRefProvider( def init(system: ActorSystemImpl): Unit = { local.init(system) - remotingTerminator = system.systemActorOf(Props(classOf[RemotingTerminator], local.systemGuardian), "remoting-terminator") + remotingTerminator = system.systemActorOf( + remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)), + "remoting-terminator") val internals = Internals( remoteDaemon = { @@ -187,11 +189,13 @@ private[akka] class RemoteActorRefProvider( protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = { import remoteSettings._ val failureDetector = createRemoteWatcherFailureDetector(system) - system.systemActorOf(RemoteWatcher.props( - failureDetector, - heartbeatInterval = WatchHeartBeatInterval, - unreachableReaperInterval = WatchUnreachableReaperInterval, - heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter), + system.systemActorOf( + configureDispatcher( + RemoteWatcher.props( + failureDetector, + heartbeatInterval = WatchHeartBeatInterval, + unreachableReaperInterval = WatchUnreachableReaperInterval, + heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter)), "remote-watcher") } @@ -203,7 +207,7 @@ private[akka] class RemoteActorRefProvider( } protected def createRemoteDeploymentWatcher(system: ActorSystemImpl): ActorRef = - system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher") + system.systemActorOf(remoteSettings.configureDispatcher(Props[RemoteDeploymentWatcher]), "remote-deployment-watcher") def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath, systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef = diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index de0d8119a9..2a91568df1 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -10,6 +10,7 @@ import akka.util.Timeout import scala.collection.immutable import akka.util.Helpers.Requiring import akka.japi.Util._ +import akka.actor.Props final class RemoteSettings(val config: Config) { import config._ @@ -23,6 +24,10 @@ final class RemoteSettings(val config: Config) { val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events") + val Dispatcher: String = getString("akka.remote.use-dispatcher") + + def configureDispatcher(props: Props): Props = if (Dispatcher.isEmpty) props else props.withDispatcher(Dispatcher) + val ShutdownTimeout: Timeout = { Timeout(Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS)) } requiring (_.duration > Duration.Zero, "shutdown-timeout must be > 0") diff --git a/akka-remote/src/main/scala/akka/remote/Remoting.scala b/akka-remote/src/main/scala/akka/remote/Remoting.scala index aa74a186af..c4dc61a10f 100644 --- a/akka-remote/src/main/scala/akka/remote/Remoting.scala +++ b/akka-remote/src/main/scala/akka/remote/Remoting.scala @@ -35,7 +35,9 @@ private[remote] object AddressUrlEncoder { /** * INTERNAL API */ -private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension +private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension { + def configureDispatcher(props: Props): Props = provider.remoteSettings.configureDispatcher(props) +} /** * INTERNAL API */ @@ -89,7 +91,10 @@ private[remote] object Remoting { } def receive = { - case RegisterTransportActor(props, name) ⇒ sender ! context.actorOf(props.withDeploy(Deploy.local), name) + case RegisterTransportActor(props, name) ⇒ + sender ! context.actorOf( + RARP(context.system).configureDispatcher(props.withDeploy(Deploy.local)), + name) } } @@ -111,7 +116,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc import provider.remoteSettings._ - val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[TransportSupervisor], "transports") + val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf( + configureDispatcher(Props[TransportSupervisor]), + "transports") override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote) @@ -155,7 +162,8 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc case None ⇒ log.info("Starting remoting") val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf( - Props(classOf[EndpointManager], provider.remoteSettings.config, log).withDeploy(Deploy.local), Remoting.EndpointManagerName) + configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local), + Remoting.EndpointManagerName) endpointManager = Some(manager) try { @@ -648,16 +656,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends writing: Boolean): ActorRef = { assert(transportMapping contains localAddress) - if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor.props( + if (writing) context.watch(context.actorOf(RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props( handleOption, localAddress, remoteAddress, transport, endpointSettings, AkkaPduProtobufCodec, - receiveBuffers).withDeploy(Deploy.local), + receiveBuffers)).withDeploy(Deploy.local), "reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) - else context.watch(context.actorOf(EndpointWriter.props( + else context.watch(context.actorOf(RARP(extendedSystem).configureDispatcher(EndpointWriter.props( handleOption, localAddress, remoteAddress, @@ -665,7 +673,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends endpointSettings, AkkaPduProtobufCodec, receiveBuffers, - reliableDeliverySupervisor = None).withDeploy(Deploy.local), + reliableDeliverySupervisor = None)).withDeploy(Deploy.local), "endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next())) } diff --git a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala index ad91494682..9a1ad860c7 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/AkkaProtocolTransport.scala @@ -115,27 +115,27 @@ private[transport] class AkkaProtocolManager( val stateActorAssociationHandler = associationListener val stateActorSettings = settings val failureDetector = createTransportFailureDetector() - context.actorOf(Props(classOf[ProtocolStateActor], + context.actorOf(RARP(context.system).configureDispatcher(Props(classOf[ProtocolStateActor], HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie), handle, stateActorAssociationHandler, stateActorSettings, AkkaPduProtobufCodec, - failureDetector).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress)) + failureDetector)).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress)) case AssociateUnderlying(remoteAddress, statusPromise) ⇒ val stateActorLocalAddress = localAddress val stateActorSettings = settings val stateActorWrappedTransport = wrappedTransport val failureDetector = createTransportFailureDetector() - context.actorOf(Props(classOf[ProtocolStateActor], + context.actorOf(RARP(context.system).configureDispatcher(Props(classOf[ProtocolStateActor], HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie), remoteAddress, statusPromise, stateActorWrappedTransport, stateActorSettings, AkkaPduProtobufCodec, - failureDetector).withDeploy(Deploy.local), actorNameFor(remoteAddress)) + failureDetector)).withDeploy(Deploy.local), actorNameFor(remoteAddress)) } private def createTransportFailureDetector(): FailureDetector = diff --git a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala index 373361ef39..2f80ccf475 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/ThrottlerTransportAdapter.scala @@ -23,6 +23,7 @@ import scala.util.{ Success, Failure } import scala.util.control.NonFatal import akka.dispatch.sysmsg.{ Unwatch, Watch } import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue } +import akka.remote.RARP class ThrottlerProvider extends TransportAdapterProvider { @@ -288,7 +289,9 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A val managerRef = self ThrottlerHandle( originalHandle, - context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound).withDeploy(Deploy.local), + context.actorOf( + RARP(context.system).configureDispatcher( + Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound)).withDeploy(Deploy.local), "throttler" + nextId())) } } diff --git a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala index 8843620b16..60322758f6 100644 --- a/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/transport/netty/NettyTransport.scala @@ -28,6 +28,7 @@ import scala.util.{ Failure, Success, Try } import scala.util.control.{ NoStackTrace, NonFatal } import akka.util.Helpers.Requiring import akka.util.Helpers +import akka.remote.RARP object NettyTransportSettings { sealed trait Mode @@ -239,7 +240,11 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA import NettyTransport._ import settings._ - implicit val executionContext: ExecutionContext = system.dispatcher + implicit val executionContext: ExecutionContext = + settings.UseDispatcherForIo.orElse(RARP(system).provider.remoteSettings.Dispatcher match { + case "" ⇒ None + case dispatcherName ⇒ Some(dispatcherName) + }).map(system.dispatchers.lookup).getOrElse(system.dispatcher) override val schemeIdentifier: String = (if (EnableSsl) "ssl." else "") + TransportMode override def maximumPayloadBytes: Int = settings.MaxFrameSize diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index ca8e455e12..5d905c2470 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -33,6 +33,7 @@ class RemoteConfigSpec extends AkkaSpec( StartupTimeout.duration must be(10 seconds) RetryGateClosedFor must be(Duration.Zero) UnknownAddressGateClosedFor must be(1 minute) + Dispatcher must be === "" UsePassiveConnections must be(true) MaximumRetriesInWindow must be(3) RetryWindow must be(60 seconds)