diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 369e1429db..56c3389072 100755 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -298,12 +298,13 @@ class LocalActorRefProvider( def this(_systemName: String, settings: ActorSystem.Settings, eventStream: EventStream, - scheduler: Scheduler) = + scheduler: Scheduler, + classloader: ClassLoader) = this(_systemName, settings, eventStream, scheduler, - new Deployer(settings)) + new Deployer(settings, classloader)) val rootPath: ActorPath = RootActorPath(Address("akka", _systemName)) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 81e0761bb0..e3235a5cec 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -322,6 +322,14 @@ abstract class ExtendedActorSystem extends ActorSystem { */ def deathWatch: DeathWatch + /** + * ClassLoader which is used for reflective accesses internally. This is set + * to the context class loader, if one is set, or the class loader which + * loaded the ActorSystem implementation. The context class loader is also + * set on all threads created by the ActorSystem, if one was set during + * creation. + */ + def internalClassLoader: ClassLoader } class ActorSystemImpl(val name: String, applicationConfig: Config) extends ExtendedActorSystem { @@ -386,16 +394,17 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten val scheduler: Scheduler = createScheduler() + val internalClassLoader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader + val provider: ActorRefProvider = { val arguments = Seq( classOf[String] -> name, classOf[Settings] -> settings, classOf[EventStream] -> eventStream, - classOf[Scheduler] -> scheduler) + classOf[Scheduler] -> scheduler, + classOf[ClassLoader] -> internalClassLoader) - val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader - - ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, loader) match { + ReflectiveAccess.createInstance[ActorRefProvider](ProviderClass, arguments, internalClassLoader) match { case Left(e) ⇒ throw e case Right(p) ⇒ p } @@ -416,7 +425,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten def locker: Locker = provider.locker - val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler)) + val dispatchers: Dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites( + threadFactory, eventStream, deadLetterMailbox, scheduler, internalClassLoader)) + val dispatcher: MessageDispatcher = dispatchers.defaultGlobalDispatcher def terminationFuture: Future[Unit] = provider.terminationFuture @@ -533,10 +544,9 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Exten private def loadExtensions() { import scala.collection.JavaConversions._ - val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse this.getClass.getClassLoader settings.config.getStringList("akka.extensions") foreach { fqcn ⇒ import ReflectiveAccess.{ getObjectFor, createInstance, noParams, noArgs } - getObjectFor[AnyRef](fqcn, loader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), 
Right(_)) match { + getObjectFor[AnyRef](fqcn, internalClassLoader).fold(_ ⇒ createInstance[AnyRef](fqcn, noParams, noArgs), Right(_)) match { case Right(p: ExtensionIdProvider) ⇒ registerExtension(p.lookup()); case Right(p: ExtensionId[_]) ⇒ registerExtension(p); case Right(other) ⇒ log.error("[{}] is not an 'ExtensionIdProvider' or 'ExtensionId', skipping...", fqcn) diff --git a/akka-actor/src/main/scala/akka/actor/Deployer.scala b/akka-actor/src/main/scala/akka/actor/Deployer.scala index c8e780d5c2..36d82b2cec 100644 --- a/akka-actor/src/main/scala/akka/actor/Deployer.scala +++ b/akka-actor/src/main/scala/akka/actor/Deployer.scala @@ -23,7 +23,7 @@ case object LocalScope extends Scope * * @author Jonas Bonér */ -class Deployer(val settings: ActorSystem.Settings) { +class Deployer(val settings: ActorSystem.Settings, val classloader: ClassLoader) { import scala.collection.JavaConverters._ @@ -41,7 +41,6 @@ class Deployer(val settings: ActorSystem.Settings) { def deploy(d: Deploy): Unit = deployments.put(d.path, d) protected def parseConfig(key: String, config: Config): Option[Deploy] = { - import akka.util.ReflectiveAccess.getClassFor val deployment = config.withFallback(default) @@ -65,8 +64,8 @@ class Deployer(val settings: ActorSystem.Settings) { case "scatter-gather" ⇒ ScatterGatherFirstCompletedRouter(nrOfInstances, routees, within, resizer) case "broadcast" ⇒ BroadcastRouter(nrOfInstances, routees, resizer) case fqn ⇒ - val constructorSignature = Array[Class[_]](classOf[Config]) - ReflectiveAccess.createInstance[RouterConfig](fqn, constructorSignature, Array[AnyRef](deployment)) match { + val args = Seq(classOf[Config] -> deployment) + ReflectiveAccess.createInstance[RouterConfig](fqn, args, classloader) match { case Right(router) ⇒ router case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 2f6b330cc8..3a788c0fd7 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -322,8 +322,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit } case fqcn ⇒ val args = Seq(classOf[Config] -> config) - val loader = Option(Thread.currentThread.getContextClassLoader) getOrElse getClass.getClassLoader - ReflectiveAccess.createInstance[MailboxType](fqcn, args, loader) match { + ReflectiveAccess.createInstance[MailboxType](fqcn, args, prerequisites.classloader) match { case Right(instance) ⇒ instance case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index 3871905905..17a2410784 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -19,13 +19,15 @@ trait DispatcherPrerequisites { def eventStream: EventStream def deadLetterMailbox: Mailbox def scheduler: Scheduler + def classloader: ClassLoader } case class DefaultDispatcherPrerequisites( val threadFactory: ThreadFactory, val eventStream: EventStream, val deadLetterMailbox: Mailbox, - val scheduler: Scheduler) extends DispatcherPrerequisites + val scheduler: Scheduler, + val classloader: ClassLoader) extends DispatcherPrerequisites object Dispatchers { /** @@ -134,8 +136,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: 
Dispatc case "BalancingDispatcher" ⇒ new BalancingDispatcherConfigurator(cfg, prerequisites) case "PinnedDispatcher" ⇒ new PinnedDispatcherConfigurator(cfg, prerequisites) case fqn ⇒ - val constructorSignature = Array[Class[_]](classOf[Config], classOf[DispatcherPrerequisites]) - ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, constructorSignature, Array[AnyRef](cfg, prerequisites)) match { + val args = Seq(classOf[Config] -> cfg, classOf[DispatcherPrerequisites] -> prerequisites) + ReflectiveAccess.createInstance[MessageDispatcherConfigurator](fqn, args, prerequisites.classloader) match { case Right(configurator) ⇒ configurator case Left(exception) ⇒ throw new IllegalArgumentException( diff --git a/akka-actor/src/main/scala/akka/event/Logging.scala b/akka-actor/src/main/scala/akka/event/Logging.scala index 35a84b511f..c8bbe5f9eb 100644 --- a/akka-actor/src/main/scala/akka/event/Logging.scala +++ b/akka-actor/src/main/scala/akka/event/Logging.scala @@ -101,7 +101,7 @@ trait LoggingBus extends ActorEventBus { if loggerName != StandardOutLoggerName } yield { try { - ReflectiveAccess.getClassFor[Actor](loggerName) match { + ReflectiveAccess.getClassFor[Actor](loggerName, system.internalClassLoader) match { case Right(actorClass) ⇒ addLogger(system, actorClass, level, logName) case Left(exception) ⇒ throw exception } diff --git a/akka-docs/general/configuration.rst b/akka-docs/general/configuration.rst index 8109e7a358..1a01ba24c9 100644 --- a/akka-docs/general/configuration.rst +++ b/akka-docs/general/configuration.rst @@ -217,6 +217,20 @@ and parsed by the actor system can be displayed like this: println(system.settings()); // this is a shortcut for system.settings().config().root().render() +A Word About ClassLoaders +------------------------- + +In several places of the configuration file it is possible to specify the +fully-qualified class name of something to be instantiated by Akka. This is +done using Java reflection, which in turn uses a :class:`ClassLoader`. Getting +the right one in challenging environments like application containers or OSGi +bundles is not always trivial, the current approach of Akka is that each +:class:`ActorSystem` implementation stores the current thread’s context class +loader (if available, otherwise just its own loader as in +``this.getClass.getClassLoader``) and uses that for all reflective accesses. +This implies that putting Akka on the boot class path will yield +:class:`NullPointerException` from strange places: this is simply not +supported. 
Application specific settings ----------------------------- diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala index 6638c380bf..5e319dafac 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableMailbox.scala @@ -3,7 +3,7 @@ */ package akka.actor.mailbox -import akka.actor.{ ActorContext, ActorRef } +import akka.actor.{ ActorContext, ActorRef, ExtendedActorSystem } import akka.dispatch.{ Envelope, DefaultSystemMessageQueue, CustomMailbox } import akka.remote.MessageSerializer import akka.remote.RemoteProtocol.{ ActorRefProtocol, RemoteMessageProtocol } @@ -15,7 +15,7 @@ private[akka] object DurableExecutableMailboxConfig { abstract class DurableMailbox(owner: ActorContext) extends CustomMailbox(owner) with DefaultSystemMessageQueue { import DurableExecutableMailboxConfig._ - def system = owner.system + def system: ExtendedActorSystem = owner.system.asInstanceOf[ExtendedActorSystem] def ownerPath = owner.self.path val ownerPathString = ownerPath.elements.mkString("/") val name = "mailbox_" + Name.replaceAllIn(ownerPathString, "_") diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala index 217f46d6ec..cc36286e36 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/BSONSerialization.scala @@ -16,11 +16,9 @@ import org.bson.DefaultBSONSerializer import akka.remote.RemoteProtocol.MessageProtocol import akka.remote.MessageSerializer -import akka.actor.{ ActorSystem, ActorSystemImpl } +import akka.actor.ExtendedActorSystem -class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging { - - val systemImpl = system.asInstanceOf[ActorSystemImpl] +class BSONSerializableMailbox(system: ExtendedActorSystem) extends SerializableBSONObject[MongoDurableMessage] with Logging { protected[akka] def serializeDurableMsg(msg: MongoDurableMessage)(implicit serializer: BSONSerializer) = { @@ -67,10 +65,10 @@ class BSONSerializableMailbox(system: ActorSystem) extends SerializableBSONObjec val doc = deserializer.decodeAndFetch(in).asInstanceOf[BSONDocument] system.log.debug("Deserializing a durable message from MongoDB: {}", doc) val msgData = MessageProtocol.parseFrom(doc.as[org.bson.types.Binary]("message").getData) - val msg = MessageSerializer.deserialize(system, msgData, getClass.getClassLoader) + val msg = MessageSerializer.deserialize(system, msgData, system.internalClassLoader) val ownerPath = doc.as[String]("ownerPath") val senderPath = doc.as[String]("senderPath") - val sender = systemImpl.actorFor(senderPath) + val sender = system.actorFor(senderPath) MongoDurableMessage(ownerPath, msg, sender) } diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 5b96b15d43..86de93527c 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -55,69 +55,66 @@ akka { # If this is "on", Akka will log all outbound messages at DEBUG level, if off 
then they are not logged log-sent-messages = off
- # Each property is annotated with (C) or (S) or (C&S), where C stands for “client” and S for “server” role.
+ # Each property is annotated with (I) or (O) or (I&O), where I stands for “inbound” and O for “outbound” connections.
# The NettyRemoteTransport always starts the server role to allow inbound connections, and it starts # active client connections whenever sending to a destination which is not yet connected; if configured # it reuses inbound connections for replies, which is called a passive client connection (i.e. from server # to client). netty {
- # (C) In case of increased latency / overflow how long
+ # (O) In case of increased latency / overflow how long
# should we wait (blocking the sender) until we deem the send to be cancelled? # 0 means "never backoff", any positive number will indicate time to block at most. backoff-timeout = 0ms
- # (C&S) Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
+ # (I&O) Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
# or using 'akka.util.Crypt.generateSecureCookie' secure-cookie = ""
- # (S) Should the remote server require that it peers share the same secure-cookie
+ # (I) Should the remote server require that its peers share the same secure-cookie
# (defined in the 'remote' section)? require-cookie = off
- # (S) Reuse inbound connections for outbound messages
+ # (I) Reuse inbound connections for outbound messages
use-passive-connections = on
- # (C&S) Whether any Threds created by the remoting should be daemons or not
- daemonic = on
-
- # (S) The hostname or ip to bind the remoting to,
+ # (I) The hostname or ip to bind the remoting to,
# InetAddress.getLocalHost.getHostAddress is used if empty hostname = ""
- # (S) The default remote server port clients should connect to.
+ # (I) The default remote server port clients should connect to.
# Default is 2552 (AKKA), use 0 if you want a random available port port = 2552 - # (C&S) Increase this if you want to be able to send messages with large payloads + # (I&O) Increase this if you want to be able to send messages with large payloads message-frame-size = 1 MiB - # (C) Timeout duration + # (O) Timeout duration connection-timeout = 120s - # (S) Sets the size of the connection backlog + # (I) Sets the size of the connection backlog backlog = 4096 - # (S) Length in akka.time-unit how long core threads will be kept alive if idling + # (I) Length in akka.time-unit how long core threads will be kept alive if idling execution-pool-keepalive = 60s - # (S) Size of the core pool of the remote execution unit + # (I) Size of the core pool of the remote execution unit execution-pool-size = 4 - # (S) Maximum channel size, 0 for off + # (I) Maximum channel size, 0 for off max-channel-memory-size = 0b - # (S) Maximum total size of all channels, 0 for off + # (I) Maximum total size of all channels, 0 for off max-total-memory-size = 0b - # (C) Time between reconnect attempts for active clients + # (O) Time between reconnect attempts for active clients reconnect-delay = 5s - # (C) Inactivity period after which active client connection is shutdown; will be + # (O) Inactivity period after which active client connection is shutdown; will be # re-established in case of new communication requests read-timeout = 3600s - # (C) Maximum time window that a client should try to reconnect for + # (O) Maximum time window that a client should try to reconnect for reconnection-time-window = 600s } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 61b675fe55..6081372e6b 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -22,11 +22,12 @@ class RemoteActorRefProvider( val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, - val scheduler: Scheduler) extends ActorRefProvider { + val scheduler: Scheduler, + val classloader: ClassLoader) extends ActorRefProvider { val remoteSettings = new RemoteSettings(settings.config, systemName) - val deployer = new RemoteDeployer(settings) + val deployer = new RemoteDeployer(settings, classloader) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) @@ -87,7 +88,7 @@ class RemoteActorRefProvider( classOf[ActorSystemImpl] -> system, classOf[RemoteActorRefProvider] -> this) - ReflectiveAccess.createInstance[RemoteTransport](fqn, args, getClass.getClassLoader) match { + ReflectiveAccess.createInstance[RemoteTransport](fqn, args, system.internalClassLoader) match { case Left(problem) ⇒ throw new RemoteTransportException("Could not load remote transport layer " + fqn, problem) case Right(remote) ⇒ remote } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala index 1274e99416..bda71bcc00 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDaemon.scala @@ -17,8 +17,10 @@ case class DaemonMsgWatch(watcher: ActorRef, watched: ActorRef) extends DaemonMs * Internal system "daemon" actor for remote internal communication. * * It acts as the brain of the remote that responds to system remote events (messages) and undertakes action. 
+ * + * INTERNAL USE ONLY! */ -class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) +private[akka] class RemoteSystemDaemon(system: ActorSystemImpl, _path: ActorPath, _parent: InternalActorRef, _log: LoggingAdapter) extends VirtualPathContainer(system.provider, _path, _parent, _log) { /** diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index 6ce486b464..799bba13e3 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -10,7 +10,7 @@ import akka.config.ConfigurationException case class RemoteScope(node: Address) extends Scope -class RemoteDeployer(_settings: ActorSystem.Settings) extends Deployer(_settings) { +class RemoteDeployer(_settings: ActorSystem.Settings, _classloader: ClassLoader) extends Deployer(_settings, _classloader) { override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 2d7d6218f7..2947d9db26 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -158,11 +158,12 @@ class ActiveRemoteClient private[akka] ( executionHandler = new ExecutionHandler(netty.executor) - bootstrap = new ClientBootstrap(netty.clientChannelFactory) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("keepAlive", true) - bootstrap.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) + val b = new ClientBootstrap(netty.clientChannelFactory) + b.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, b, executionHandler, remoteAddress, this)) + b.setOption("tcpNoDelay", true) + b.setOption("keepAlive", true) + b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis) + bootstrap = b val remoteIP = InetAddress.getByName(remoteAddress.host.get) log.debug("Starting remote client connection to [{}|{}]", remoteAddress, remoteIP) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4179beeb3d..e9fe83dd7e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -33,20 +33,19 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName) - val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic, Some(getClass.getClassLoader)) - val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) + val timer: HashedWheelTimer = new HashedWheelTimer(system.threadFactory) val executor = new OrderedMemoryAwareThreadPoolExecutor( settings.ExecutionPoolSize, settings.MaxChannelMemorySize, settings.MaxTotalMemorySize, - settings.ExecutionPoolKeepAlive.length, - settings.ExecutionPoolKeepAlive.unit, - threadFactory) + settings.ExecutionPoolKeepalive.length, + settings.ExecutionPoolKeepalive.unit, + system.threadFactory) val clientChannelFactory = new 
NioClientSocketChannelFactory( - Executors.newCachedThreadPool(threadFactory), - Executors.newCachedThreadPool(threadFactory)) + Executors.newCachedThreadPool(system.threadFactory), + Executors.newCachedThreadPool(system.threadFactory)) private val remoteClients = new HashMap[Address, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -79,7 +78,11 @@ class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: Actor def shutdown(): Unit = { clientsLock.writeLock().lock() try { - remoteClients foreach { case (_, client) ⇒ client.shutdown() } + remoteClients foreach { + case (_, client) ⇒ try client.shutdown() catch { + case e ⇒ log.error(e, "failure while shutting down [{}]", client) + } + } remoteClients.clear() } finally { clientsLock.writeLock().unlock() diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 9ebeb8f3e8..a8bc6ef67b 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -29,28 +29,29 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) { val ip = InetAddress.getByName(settings.Hostname) private val factory = new NioServerSocketChannelFactory( - Executors.newCachedThreadPool(netty.threadFactory), - Executors.newCachedThreadPool(netty.threadFactory)) - - private val bootstrap = new ServerBootstrap(factory) + Executors.newCachedThreadPool(netty.system.threadFactory), + Executors.newCachedThreadPool(netty.system.threadFactory)) private val executionHandler = new ExecutionHandler(netty.executor) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(openChannels, executionHandler, netty) - bootstrap.setPipelineFactory(pipelineFactory) - bootstrap.setOption("backlog", settings.Backlog) - bootstrap.setOption("tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("reuseAddress", true) + private val bootstrap = { + val b = new ServerBootstrap(factory) + b.setPipelineFactory(new RemoteServerPipelineFactory(openChannels, executionHandler, netty)) + b.setOption("backlog", settings.Backlog) + b.setOption("tcpNoDelay", true) + b.setOption("child.keepAlive", true) + b.setOption("reuseAddress", true) + b + } @volatile private[akka] var channel: Channel = _ def start(): Unit = { - channel = bootstrap.bind(new InetSocketAddress(ip, settings.Port)) + channel = bootstrap.bind(new InetSocketAddress(ip, settings.DesiredPortFromConfig)) openChannels.add(channel) netty.notifyListeners(RemoteServerStarted(netty)) } @@ -62,7 +63,7 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) { b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder .setSystem(settings.systemName) .setHostname(settings.Hostname) - .setPort(settings.Port) + .setPort(settings.DesiredPortFromConfig) .build) if (settings.SecureCookie.nonEmpty) b.setCookie(settings.SecureCookie.get) @@ -139,6 +140,7 @@ class RemoteServerHandler( private var addressToSet = true + // TODO look into moving that into onBind or similar, but verify that that is guaranteed to be the first to be called override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (addressToSet) { netty.setAddressFromChannel(event.getChannel) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala 
b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index 123e2d53dc..3f7c8f83de 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -13,7 +13,6 @@ class NettySettings(config: Config, val systemName: String) { import config._ - val Daemonic = getBoolean("daemonic") val BackoffTimeout = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS) val SecureCookie: Option[String] = getString("secure-cookie") match { @@ -38,13 +37,13 @@ class NettySettings(config: Config, val systemName: String) { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } - val Port = getInt("port") + val DesiredPortFromConfig = getInt("port") val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) val Backlog = getInt("backlog") - val ExecutionPoolKeepAlive = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) + val ExecutionPoolKeepalive = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) val ExecutionPoolSize = getInt("execution-pool-size") match { case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 1") diff --git a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala index 0eb3caa8e4..8a6bad79fa 100644 --- a/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala +++ b/akka-remote/src/multi-jvm/scala/akka/remote/ScatterGatherRoutedRemoteActorMultiJvmSpec.scala @@ -89,7 +89,7 @@ class ScatterGatherRoutedRemoteActorMultiJvmNode4 extends AkkaRemoteSpec(Scatter } val replies = (receiveWhile(5 seconds, messages = connectionCount * iterationCount) { - case ref: ActorRef ⇒ (ref.asInstanceOf[ActorRef].path.address.hostPort, 1) + case ref: ActorRef ⇒ (ref.path.address.hostPort, 1) }).foldLeft(Map(akkaSpec(0) -> 0, akkaSpec(1) -> 0, akkaSpec(2) -> 0)) { case (m, (n, c)) ⇒ m + (n -> (m(n) + c)) } diff --git a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala index 837846d058..b60b90b900 100644 --- a/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/RemoteConfigSpec.scala @@ -6,6 +6,8 @@ package akka.remote import akka.testkit.AkkaSpec import akka.actor.ExtendedActorSystem import akka.util.duration._ +import akka.util.Duration +import akka.remote.netty.NettyRemoteTransport @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class RemoteConfigSpec extends AkkaSpec( @@ -17,8 +19,9 @@ class RemoteConfigSpec extends AkkaSpec( } """) { - "RemoteExtension" must { - "be able to parse remote and cluster config elements" in { + "Remoting" must { + + "be able to parse generic remote config elements" in { val settings = system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].remoteSettings import settings._ @@ -33,5 +36,32 @@ class RemoteConfigSpec extends AkkaSpec( GossipFrequency must be(1 second) SeedNodes must be(Set()) } + + "be able to parse Netty config elements" in { + val settings = + system.asInstanceOf[ExtendedActorSystem] + .provider.asInstanceOf[RemoteActorRefProvider] + .transport.asInstanceOf[NettyRemoteTransport] + .settings + import settings._ + + BackoffTimeout must be(Duration.Zero) + SecureCookie must be(None) + RequireCookie must 
be(false) + UsePassiveConnections must be(true) + Hostname must not be "" // will be set to the local IP + DesiredPortFromConfig must be(2552) + MessageFrameSize must be(1048576) + ConnectionTimeout must be(2 minutes) + Backlog must be(4096) + ExecutionPoolKeepalive must be(1 minute) + ExecutionPoolSize must be(4) + MaxChannelMemorySize must be(0) + MaxTotalMemorySize must be(0) + ReconnectDelay must be(5 seconds) + ReadTimeout must be(1 hour) + ReconnectionTimeWindow must be(10 minutes) + } + } }
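
For reference, code outside the patched files reaches the new loader through ExtendedActorSystem, using the same downcast pattern as the durable mailbox and RemoteConfigSpec changes above. A minimal usage sketch (the LoaderDemo object name is only illustrative):

    import akka.actor.{ ActorSystem, ExtendedActorSystem }

    object LoaderDemo extends App {
      val system = ActorSystem("demo")
      // internalClassLoader is declared on ExtendedActorSystem only,
      // so user code has to downcast to reach it
      val loader: ClassLoader = system.asInstanceOf[ExtendedActorSystem].internalClassLoader
      println(loader)
      system.shutdown()
    }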