#2005 - Putting the required fields into RemoteTransport and took the opportunity to clean up use of ActorSystemImpl
This commit is contained in:
parent
7cc4c03018
commit
5f2b23c0c8
5 changed files with 16 additions and 16 deletions
|
|
@ -17,9 +17,9 @@ import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap
|
|||
import java.io.Closeable
|
||||
import akka.dispatch.Await.Awaitable
|
||||
import akka.dispatch.Await.CanAwait
|
||||
import java.util.concurrent.{ CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||
import akka.util._
|
||||
import collection.immutable.Stack
|
||||
import java.util.concurrent.{ ThreadFactory, CountDownLatch, TimeoutException, RejectedExecutionException }
|
||||
|
||||
object ActorSystem {
|
||||
|
||||
|
|
@ -404,6 +404,11 @@ abstract class ExtendedActorSystem extends ActorSystem {
|
|||
*/
|
||||
def deathWatch: DeathWatch
|
||||
|
||||
/**
|
||||
* A ThreadFactory that can be used if the transport needs to create any Threads
|
||||
*/
|
||||
def threadFactory: ThreadFactory
|
||||
|
||||
/**
|
||||
* ClassLoader wrapper which is used for reflective accesses internally. This is set
|
||||
* to use the context class loader, if one is set, or the class loader which
|
||||
|
|
|
|||
|
|
@ -78,8 +78,7 @@ class RemoteActorRefProvider(
|
|||
_transport = {
|
||||
val fqn = remoteSettings.RemoteTransport
|
||||
val args = Seq(
|
||||
classOf[RemoteSettings] -> remoteSettings,
|
||||
classOf[ActorSystemImpl] -> system,
|
||||
classOf[ExtendedActorSystem] -> system,
|
||||
classOf[RemoteActorRefProvider] -> this)
|
||||
|
||||
system.dynamicAccess.createInstanceFor[RemoteTransport](fqn, args) match {
|
||||
|
|
|
|||
|
|
@ -5,13 +5,13 @@
|
|||
package akka.remote
|
||||
|
||||
import scala.reflect.BeanProperty
|
||||
import akka.actor.{ Terminated, LocalRef, InternalActorRef, AutoReceivedMessage, AddressFromURIString, Address, ActorSystemImpl, ActorSystem, ActorRef }
|
||||
import akka.dispatch.SystemMessage
|
||||
import akka.event.{ LoggingAdapter, Logging }
|
||||
import akka.AkkaException
|
||||
import akka.serialization.Serialization
|
||||
import akka.remote.RemoteProtocol._
|
||||
import akka.dispatch.ChildTerminated
|
||||
import akka.actor._
|
||||
|
||||
/**
|
||||
* Remote life-cycle events.
|
||||
|
|
@ -152,7 +152,7 @@ class RemoteTransportException(message: String, cause: Throwable) extends AkkaEx
|
|||
* be available (i.e. fully initialized) by the time the first message is
|
||||
* received or when the start() method returns, whatever happens first.
|
||||
*/
|
||||
abstract class RemoteTransport {
|
||||
abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: RemoteActorRefProvider) {
|
||||
/**
|
||||
* Shuts down the remoting
|
||||
*/
|
||||
|
|
@ -163,11 +163,6 @@ abstract class RemoteTransport {
|
|||
*/
|
||||
def address: Address
|
||||
|
||||
/**
|
||||
* The actor system, for which this transport is instantiated. Will publish to its eventStream.
|
||||
*/
|
||||
def system: ActorSystem
|
||||
|
||||
/**
|
||||
* Start up the transport, i.e. enable incoming connections.
|
||||
*/
|
||||
|
|
@ -197,7 +192,7 @@ abstract class RemoteTransport {
|
|||
override def toString = address.toString
|
||||
}
|
||||
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ActorSystemImpl) {
|
||||
class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) {
|
||||
|
||||
def originalReceiver = input.getRecipient.getPath
|
||||
|
||||
|
|
@ -216,7 +211,7 @@ trait RemoteMarshallingOps {
|
|||
|
||||
def log: LoggingAdapter
|
||||
|
||||
def system: ActorSystemImpl
|
||||
def system: ExtendedActorSystem
|
||||
|
||||
def provider: RemoteActorRefProvider
|
||||
|
||||
|
|
|
|||
|
|
@ -58,7 +58,7 @@ abstract class RemoteClient private[akka] (
|
|||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
|
||||
if (netty.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
if (netty.provider.remoteSettings.LogSend) log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
send((message, senderOption, recipient))
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", netty, remoteAddress)
|
||||
|
|
|
|||
|
|
@ -16,18 +16,19 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel }
|
|||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
|
||||
import org.jboss.netty.util.HashedWheelTimer
|
||||
import akka.actor.{ Address, ActorSystemImpl, ActorRef }
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
|
||||
import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted }
|
||||
import akka.util.NonFatal
|
||||
import akka.actor.{ ExtendedActorSystem, Address, ActorRef }
|
||||
|
||||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
class NettyRemoteTransport(val remoteSettings: RemoteSettings, val system: ActorSystemImpl, val provider: RemoteActorRefProvider)
|
||||
extends RemoteTransport with RemoteMarshallingOps {
|
||||
class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps {
|
||||
|
||||
import provider.remoteSettings
|
||||
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue