From 5f2b23c0c8bf26287d3bec743bc27de719d72fee Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 23 Apr 2012 18:59:49 +0200 Subject: [PATCH] #2005 - Putting the required fields into RemoteTransport and took the opportunity to clean up use of ActorSystemImpl --- .../src/main/scala/akka/actor/ActorSystem.scala | 7 ++++++- .../scala/akka/remote/RemoteActorRefProvider.scala | 3 +-- .../main/scala/akka/remote/RemoteTransport.scala | 13 ++++--------- .../src/main/scala/akka/remote/netty/Client.scala | 2 +- .../akka/remote/netty/NettyRemoteSupport.scala | 7 ++++--- 5 files changed, 16 insertions(+), 16 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4112905711..486dbe3ae5 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -17,9 +17,9 @@ import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap import java.io.Closeable import akka.dispatch.Await.Awaitable import akka.dispatch.Await.CanAwait -import java.util.concurrent.{ CountDownLatch, TimeoutException, RejectedExecutionException } import akka.util._ import collection.immutable.Stack +import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException } object ActorSystem { @@ -404,6 +404,11 @@ abstract class ExtendedActorSystem extends ActorSystem { */ def deathWatch: DeathWatch + /** + * A ThreadFactory that can be used if the transport needs to create any Threads + */ + def threadFactory: ThreadFactory + /** * ClassLoader wrapper which is used for reflective accesses internally. This is set * to use the context class loader, if one is set, or the class loader which diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index 5870af9f95..a4d9a8d0c6 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -78,8 +78,7 @@ class RemoteActorRefProvider( _transport = { val fqn = remoteSettings.RemoteTransport val args = Seq( - classOf[RemoteSettings] -> remoteSettings, - classOf[ActorSystemImpl] -> system, + classOf[ExtendedActorSystem] -> system, classOf[RemoteActorRefProvider] -> this) system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match { diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 703803163a..3bade97460 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -5,13 +5,13 @@ package akka.remote import scala.reflect.BeanProperty -import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef } import akka.dispatch.SystemMessage import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException import akka.serialization.Serialization import akka.remote.RemoteProtocol._ import akka.dispatch.ChildTerminated +import akka.actor._ /** * Remote life-cycle events. @@ -152,7 +152,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx * be available (i.e. fully initialized) by the time the first message is * received or when the start() method returns, whatever happens first. */ -abstract class RemoteTransport { +abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) { /** * Shuts down the remoting */ @@ -163,11 +163,6 @@ abstract class RemoteTransport { */ def address: Address - /** - * The actor system, for which this transport is instantiated. Will publish to its eventStream. - */ - def system: ActorSystem - /** * Start up the transport, i.e. enable incoming connections. */ @@ -197,7 +192,7 @@ abstract class RemoteTransport { override def toString = address.toString } -class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) { +class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { def originalReceiver = input.getRecipient.getPath @@ -216,7 +211,7 @@ trait RemoteMarshallingOps { def log: LoggingAdapter - def system: ActorSystemImpl + def system: ExtendedActorSystem def provider: RemoteActorRefProvider diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index a0e91398fc..7baf3011ee 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] ( * Converts the message to the wireprotocol and sends the message across the wire */ def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { - if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) + if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient) send((message, senderOption, recipient)) } else { val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 6062321841..cf859c3db2 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,18 +16,19 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer -import akka.actor.{ Address, ActorSystemImpl, ActorRef } import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } import akka.util.NonFatal +import akka.actor.{ ExtendedActorSystem, Address, ActorRef } /** * Provides the implementation of the Netty remote support */ -class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider) - extends RemoteTransport with RemoteMarshallingOps { +class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps { + + import provider.remoteSettings val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)