+rem #3485 Make dispatcher for all remoting actors configurable

This commit is contained in:
Endre Sándor Varga 2013-08-19 12:06:07 +02:00
parent ffea36a8c8
commit 01b3be1242
9 changed files with 61 additions and 25 deletions

View file

@ -85,6 +85,13 @@ akka {
# Acknowledgment timeout of management commands sent to the transport stack.
command-ack-timeout = 30 s
# If set to a nonempty string remoting will use the given dispatcher for
# its internal actors otherwise the default dispatcher is used. Please note
# that since remoting can load arbitrary 3rd party drivers (see
# "enabled-transport" and "adapters" entries) it is not guaranteed that
# every module will respect this setting.
use-dispatcher = ""
### Security settings
# Enable untrusted mode for full security of server managed actors, prevents
@ -328,6 +335,9 @@ akka {
# If set to "<id.of.dispatcher>" then the specified dispatcher
# will be used to accept inbound connections, and perform IO. If "" then
# dedicated threads will be used.
# Please note that the Netty driver only uses this configuration and does
# not read the "akka.remote.use-dispatcher" entry. Instead it has to be
# configured manually to point to the same dispatcher if needed.
use-dispatcher-for-io = ""
# Sets the high water mark for the in and outbound sockets,

View file

@ -313,7 +313,7 @@ private[remote] class ReliableDeliverySupervisor(
}
private def createWriter(): ActorRef = {
context.watch(context.actorOf(EndpointWriter.props(
context.watch(context.actorOf(RARP(context.system).configureDispatcher(EndpointWriter.props(
handleOrActive = currentHandle,
localAddress = localAddress,
remoteAddress = remoteAddress,
@ -321,7 +321,7 @@ private[remote] class ReliableDeliverySupervisor(
settings = settings,
AkkaPduProtobufCodec,
receiveBuffers = receiveBuffers,
reliableDeliverySupervisor = Some(self)).withDeploy(Deploy.local), "endpointWriter"))
reliableDeliverySupervisor = Some(self))).withDeploy(Deploy.local), "endpointWriter"))
}
}
@ -602,8 +602,8 @@ private[remote] class EndpointWriter(
private def startReadEndpoint(handle: AkkaProtocolHandle): Some[ActorRef] = {
val newReader =
context.watch(context.actorOf(
EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers).withDeploy(Deploy.local),
RARP(context.system).configureDispatcher(EndpointReader.props(localAddress, remoteAddress, transport, settings, codec,
msgDispatch, inbound, reliableDeliverySupervisor, receiveBuffers)).withDeploy(Deploy.local),
"endpointReader-" + AddressUrlEncoder(remoteAddress) + "-" + readerId.next()))
handle.readHandlerPromise.success(ActorHandleEventListener(newReader))
Some(newReader)

View file

@ -155,7 +155,9 @@ private[akka] class RemoteActorRefProvider(
def init(system: ActorSystemImpl): Unit = {
local.init(system)
remotingTerminator = system.systemActorOf(Props(classOf[RemotingTerminator], local.systemGuardian), "remoting-terminator")
remotingTerminator = system.systemActorOf(
remoteSettings.configureDispatcher(Props(classOf[RemotingTerminator], local.systemGuardian)),
"remoting-terminator")
val internals = Internals(
remoteDaemon = {
@ -187,11 +189,13 @@ private[akka] class RemoteActorRefProvider(
protected def createRemoteWatcher(system: ActorSystemImpl): ActorRef = {
import remoteSettings._
val failureDetector = createRemoteWatcherFailureDetector(system)
system.systemActorOf(RemoteWatcher.props(
system.systemActorOf(
configureDispatcher(
RemoteWatcher.props(
failureDetector,
heartbeatInterval = WatchHeartBeatInterval,
unreachableReaperInterval = WatchUnreachableReaperInterval,
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter),
heartbeatExpectedResponseAfter = WatchHeartbeatExpectedResponseAfter)),
"remote-watcher")
}
@ -203,7 +207,7 @@ private[akka] class RemoteActorRefProvider(
}
protected def createRemoteDeploymentWatcher(system: ActorSystemImpl): ActorRef =
system.systemActorOf(Props[RemoteDeploymentWatcher], "remote-deployment-watcher")
system.systemActorOf(remoteSettings.configureDispatcher(Props[RemoteDeploymentWatcher]), "remote-deployment-watcher")
def actorOf(system: ActorSystemImpl, props: Props, supervisor: InternalActorRef, path: ActorPath,
systemService: Boolean, deploy: Option[Deploy], lookupDeploy: Boolean, async: Boolean): InternalActorRef =

View file

@ -10,6 +10,7 @@ import akka.util.Timeout
import scala.collection.immutable
import akka.util.Helpers.Requiring
import akka.japi.Util._
import akka.actor.Props
final class RemoteSettings(val config: Config) {
import config._
@ -23,6 +24,10 @@ final class RemoteSettings(val config: Config) {
val LogRemoteLifecycleEvents: Boolean = getBoolean("akka.remote.log-remote-lifecycle-events")
val Dispatcher: String = getString("akka.remote.use-dispatcher")
def configureDispatcher(props: Props): Props = if (Dispatcher.isEmpty) props else props.withDispatcher(Dispatcher)
val ShutdownTimeout: Timeout = {
Timeout(Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS))
} requiring (_.duration > Duration.Zero, "shutdown-timeout must be > 0")

View file

@ -35,7 +35,9 @@ private[remote] object AddressUrlEncoder {
/**
* INTERNAL API
*/
private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension
private[remote] case class RARP(provider: RemoteActorRefProvider) extends Extension {
def configureDispatcher(props: Props): Props = provider.remoteSettings.configureDispatcher(props)
}
/**
* INTERNAL API
*/
@ -89,7 +91,10 @@ private[remote] object Remoting {
}
def receive = {
case RegisterTransportActor(props, name) sender ! context.actorOf(props.withDeploy(Deploy.local), name)
case RegisterTransportActor(props, name)
sender ! context.actorOf(
RARP(context.system).configureDispatcher(props.withDeploy(Deploy.local)),
name)
}
}
@ -111,7 +116,9 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
import provider.remoteSettings._
val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(Props[TransportSupervisor], "transports")
val transportSupervisor = system.asInstanceOf[ActorSystemImpl].systemActorOf(
configureDispatcher(Props[TransportSupervisor]),
"transports")
override def localAddressForRemote(remote: Address): Address = Remoting.localAddressForRemote(transportMapping, remote)
@ -155,7 +162,8 @@ private[remote] class Remoting(_system: ExtendedActorSystem, _provider: RemoteAc
case None
log.info("Starting remoting")
val manager: ActorRef = system.asInstanceOf[ActorSystemImpl].systemActorOf(
Props(classOf[EndpointManager], provider.remoteSettings.config, log).withDeploy(Deploy.local), Remoting.EndpointManagerName)
configureDispatcher(Props(classOf[EndpointManager], provider.remoteSettings.config, log)).withDeploy(Deploy.local),
Remoting.EndpointManagerName)
endpointManager = Some(manager)
try {
@ -648,16 +656,16 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
writing: Boolean): ActorRef = {
assert(transportMapping contains localAddress)
if (writing) context.watch(context.actorOf(ReliableDeliverySupervisor.props(
if (writing) context.watch(context.actorOf(RARP(extendedSystem).configureDispatcher(ReliableDeliverySupervisor.props(
handleOption,
localAddress,
remoteAddress,
transport,
endpointSettings,
AkkaPduProtobufCodec,
receiveBuffers).withDeploy(Deploy.local),
receiveBuffers)).withDeploy(Deploy.local),
"reliableEndpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
else context.watch(context.actorOf(EndpointWriter.props(
else context.watch(context.actorOf(RARP(extendedSystem).configureDispatcher(EndpointWriter.props(
handleOption,
localAddress,
remoteAddress,
@ -665,7 +673,7 @@ private[remote] class EndpointManager(conf: Config, log: LoggingAdapter) extends
endpointSettings,
AkkaPduProtobufCodec,
receiveBuffers,
reliableDeliverySupervisor = None).withDeploy(Deploy.local),
reliableDeliverySupervisor = None)).withDeploy(Deploy.local),
"endpointWriter-" + AddressUrlEncoder(remoteAddress) + "-" + endpointId.next()))
}

View file

@ -115,27 +115,27 @@ private[transport] class AkkaProtocolManager(
val stateActorAssociationHandler = associationListener
val stateActorSettings = settings
val failureDetector = createTransportFailureDetector()
context.actorOf(Props(classOf[ProtocolStateActor],
context.actorOf(RARP(context.system).configureDispatcher(Props(classOf[ProtocolStateActor],
HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie),
handle,
stateActorAssociationHandler,
stateActorSettings,
AkkaPduProtobufCodec,
failureDetector).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress))
failureDetector)).withDeploy(Deploy.local), actorNameFor(handle.remoteAddress))
case AssociateUnderlying(remoteAddress, statusPromise)
val stateActorLocalAddress = localAddress
val stateActorSettings = settings
val stateActorWrappedTransport = wrappedTransport
val failureDetector = createTransportFailureDetector()
context.actorOf(Props(classOf[ProtocolStateActor],
context.actorOf(RARP(context.system).configureDispatcher(Props(classOf[ProtocolStateActor],
HandshakeInfo(stateActorLocalAddress, AddressUidExtension(context.system).addressUid, stateActorSettings.SecureCookie),
remoteAddress,
statusPromise,
stateActorWrappedTransport,
stateActorSettings,
AkkaPduProtobufCodec,
failureDetector).withDeploy(Deploy.local), actorNameFor(remoteAddress))
failureDetector)).withDeploy(Deploy.local), actorNameFor(remoteAddress))
}
private def createTransportFailureDetector(): FailureDetector =

View file

@ -23,6 +23,7 @@ import scala.util.{ Success, Failure }
import scala.util.control.NonFatal
import akka.dispatch.sysmsg.{ Unwatch, Watch }
import akka.dispatch.{ UnboundedMessageQueueSemantics, RequiresMessageQueue }
import akka.remote.RARP
class ThrottlerProvider extends TransportAdapterProvider {
@ -288,7 +289,9 @@ private[transport] class ThrottlerManager(wrappedTransport: Transport) extends A
val managerRef = self
ThrottlerHandle(
originalHandle,
context.actorOf(Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound).withDeploy(Deploy.local),
context.actorOf(
RARP(context.system).configureDispatcher(
Props(classOf[ThrottledAssociation], managerRef, listener, originalHandle, inbound)).withDeploy(Deploy.local),
"throttler" + nextId()))
}
}

View file

@ -28,6 +28,7 @@ import scala.util.{ Failure, Success, Try }
import scala.util.control.{ NoStackTrace, NonFatal }
import akka.util.Helpers.Requiring
import akka.util.Helpers
import akka.remote.RARP
object NettyTransportSettings {
sealed trait Mode
@ -239,7 +240,11 @@ class NettyTransport(val settings: NettyTransportSettings, val system: ExtendedA
import NettyTransport._
import settings._
implicit val executionContext: ExecutionContext = system.dispatcher
implicit val executionContext: ExecutionContext =
settings.UseDispatcherForIo.orElse(RARP(system).provider.remoteSettings.Dispatcher match {
case "" None
case dispatcherName Some(dispatcherName)
}).map(system.dispatchers.lookup).getOrElse(system.dispatcher)
override val schemeIdentifier: String = (if (EnableSsl) "ssl." else "") + TransportMode
override def maximumPayloadBytes: Int = settings.MaxFrameSize

View file

@ -33,6 +33,7 @@ class RemoteConfigSpec extends AkkaSpec(
StartupTimeout.duration must be(10 seconds)
RetryGateClosedFor must be(Duration.Zero)
UnknownAddressGateClosedFor must be(1 minute)
Dispatcher must be === ""
UsePassiveConnections must be(true)
MaximumRetriesInWindow must be(3)
RetryWindow must be(60 seconds)