diff --git a/akka-actor/src/main/scala/akka/actor/ActorRef.scala b/akka-actor/src/main/scala/akka/actor/ActorRef.scala index 46e3440f95..460bd02076 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRef.scala @@ -154,17 +154,16 @@ trait ScalaActorRef { ref: ActorRef ⇒ } -//FIXME should ActorScope be private[akka], me thinks so - √ /** * All ActorRefs have a scope which describes where they live. Since it is * often necessary to distinguish between local and non-local references, this * is the only method provided on the scope. */ -trait ActorRefScope { +private[akka] trait ActorRefScope { def isLocal: Boolean } -trait LocalRef extends ActorRefScope { +private[akka] trait LocalRef extends ActorRefScope { final def isLocal = true } diff --git a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala index 65777d49ca..6bd61dd812 100644 --- a/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala +++ b/akka-remote/src/main/scala/akka/remote/MessageSerializer.scala @@ -9,8 +9,14 @@ import com.google.protobuf.ByteString import akka.actor.ExtendedActorSystem import akka.serialization.SerializationExtension -object MessageSerializer { +/** + * MessageSerializer is a helper for serialize and deserialize messages + */ +private[akka] object MessageSerializer { + /** + * Uses Akka Serialization for the specified ActorSystem to transform the given MessageProtocol to a message + */ def deserialize(system: ExtendedActorSystem, messageProtocol: MessageProtocol): AnyRef = { val clazz = if (messageProtocol.hasMessageManifest) { @@ -24,6 +30,9 @@ object MessageSerializer { } } + /** + * Uses Akka Serialization for the specified ActorSystem to transform the given message to a MessageProtocol + */ def serialize(system: ExtendedActorSystem, message: AnyRef): MessageProtocol = { val s = SerializationExtension(system) val serializer = s.findSerializerFor(message) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala index bf55edf24c..a12c5f5578 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala @@ -4,34 +4,26 @@ package akka.remote -import akka.AkkaException import akka.actor._ import akka.dispatch._ import akka.event.{ DeathWatch, Logging, LoggingAdapter } import akka.event.EventStream -import akka.ConfigurationException -import java.util.concurrent.{ TimeoutException } -import com.typesafe.config.Config import akka.serialization.Serialization import akka.serialization.SerializationExtension -class RemoteException(msg: String) extends AkkaException(msg) -class RemoteCommunicationException(msg: String) extends RemoteException(msg) -class RemoteConnectionException(msg: String) extends RemoteException(msg) - /** * Remote ActorRefProvider. Starts up actor on remote node and creates a RemoteActorRef representing it. */ -class RemoteActorRefProvider( +private[akka] class RemoteActorRefProvider( val systemName: String, val settings: ActorSystem.Settings, val eventStream: EventStream, val scheduler: Scheduler, val dynamicAccess: DynamicAccess) extends ActorRefProvider { - val remoteSettings = new RemoteSettings(settings.config, systemName) + val remoteSettings: RemoteSettings = new RemoteSettings(settings.config, systemName) - val deployer = new RemoteDeployer(settings, dynamicAccess) + val deployer: RemoteDeployer = new RemoteDeployer(settings, dynamicAccess) private val local = new LocalActorRefProvider(systemName, settings, eventStream, scheduler, deployer) @@ -39,21 +31,21 @@ class RemoteActorRefProvider( private var _log = local.log def log: LoggingAdapter = _log - def rootPath = local.rootPath - def deadLetters = local.deadLetters + override def rootPath: ActorPath = local.rootPath + override def deadLetters: InternalActorRef = local.deadLetters - val deathWatch = new RemoteDeathWatch(local.deathWatch, this) + override val deathWatch: DeathWatch = new RemoteDeathWatch(local.deathWatch, this) // these are only available after init() - def rootGuardian = local.rootGuardian - def guardian = local.guardian - def systemGuardian = local.systemGuardian - def terminationFuture = local.terminationFuture - def dispatcher = local.dispatcher - def registerTempActor(actorRef: InternalActorRef, path: ActorPath) = local.registerTempActor(actorRef, path) - def unregisterTempActor(path: ActorPath) = local.unregisterTempActor(path) - def tempPath() = local.tempPath() - def tempContainer = local.tempContainer + override def rootGuardian: InternalActorRef = local.rootGuardian + override def guardian: InternalActorRef = local.guardian + override def systemGuardian: InternalActorRef = local.systemGuardian + override def terminationFuture: Promise[Unit] = local.terminationFuture + override def dispatcher: MessageDispatcher = local.dispatcher + override def registerTempActor(actorRef: InternalActorRef, path: ActorPath): Unit = local.registerTempActor(actorRef, path) + override def unregisterTempActor(path: ActorPath): Unit = local.unregisterTempActor(path) + override def tempPath(): ActorPath = local.tempPath() + override def tempContainer: VirtualPathContainer = local.tempContainer @volatile private var _transport: RemoteTransport = _ @@ -61,13 +53,13 @@ class RemoteActorRefProvider( @volatile private var _serialization: Serialization = _ - def serialization = _serialization + def serialization: Serialization = _serialization @volatile private var _remoteDaemon: InternalActorRef = _ - def remoteDaemon = _remoteDaemon + def remoteDaemon: InternalActorRef = _remoteDaemon - def init(system: ActorSystemImpl) { + def init(system: ActorSystemImpl): Unit = { local.init(system) _remoteDaemon = new RemoteSystemDaemon(system, rootPath / "remote", rootGuardian, log) @@ -193,7 +185,7 @@ class RemoteActorRefProvider( /** * Using (checking out) actor on a specific node. */ - def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef) { + def useActorOnNode(path: ActorPath, props: Props, deploy: Deploy, supervisor: ActorRef): Unit = { log.debug("[{}] Instantiating Remote Actor [{}]", rootPath, path) // we don’t wait for the ACK, because the remote end will process this command before any other message to the new actor @@ -211,7 +203,7 @@ class RemoteActorRefProvider( } } -trait RemoteRef extends ActorRefScope { +private[akka] trait RemoteRef extends ActorRefScope { final def isLocal = false } @@ -256,7 +248,7 @@ private[akka] class RemoteActorRef private[akka] ( private def writeReplace(): AnyRef = SerializedActorRef(path) } -class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { +private[akka] class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvider) extends DeathWatch { override def subscribe(watcher: ActorRef, watched: ActorRef): Boolean = watched match { case r: RemoteRef ⇒ @@ -275,5 +267,4 @@ class RemoteDeathWatch(val local: DeathWatch, val provider: RemoteActorRefProvid override def unsubscribe(watcher: ActorRef): Unit = local.unsubscribe(watcher) override def publish(event: Terminated): Unit = local.publish(event) - } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala index e869c4ef4c..25df64795d 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteDeployer.scala @@ -12,8 +12,7 @@ case class RemoteScope(node: Address) extends Scope { def withFallback(other: Scope): Scope = this } -class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) { - +private[akka] class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extends Deployer(_settings, _pm) { override protected def parseConfig(path: String, config: Config): Option[Deploy] = { import scala.collection.JavaConverters._ @@ -30,5 +29,4 @@ class RemoteDeployer(_settings: ActorSystem.Settings, _pm: DynamicAccess) extend case None ⇒ None } } - } \ No newline at end of file diff --git a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala index 0b26311286..951c007fbc 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteSettings.scala @@ -6,17 +6,12 @@ package akka.remote import com.typesafe.config.Config import akka.util.Duration import java.util.concurrent.TimeUnit.MILLISECONDS -import java.net.InetAddress -import akka.ConfigurationException -import scala.collection.JavaConverters._ -import akka.actor.Address -import akka.actor.AddressFromURIString class RemoteSettings(val config: Config, val systemName: String) { import config._ - val RemoteTransport = getString("akka.remote.transport") - val LogReceive = getBoolean("akka.remote.log-received-messages") - val LogSend = getBoolean("akka.remote.log-sent-messages") - val RemoteSystemDaemonAckTimeout = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) - val UntrustedMode = getBoolean("akka.remote.untrusted-mode") + val RemoteTransport: String = getString("akka.remote.transport") + val LogReceive: Boolean = getBoolean("akka.remote.log-received-messages") + val LogSend: Boolean = getBoolean("akka.remote.log-sent-messages") + val RemoteSystemDaemonAckTimeout: Duration = Duration(getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) + val UntrustedMode: Boolean = getBoolean("akka.remote.untrusted-mode") } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 3bade97460..d912d1d878 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -10,7 +10,6 @@ import akka.event.{ LoggingAdapter, Logging } import akka.AkkaException import akka.serialization.Serialization import akka.remote.RemoteProtocol._ -import akka.dispatch.ChildTerminated import akka.actor._ /** @@ -27,54 +26,67 @@ trait RemoteClientLifeCycleEvent extends RemoteLifeCycleEvent { def remoteAddress: Address } +/** + * A RemoteClientError is a general error that is thrown within or from a RemoteClient + */ case class RemoteClientError( @BeanProperty cause: Throwable, @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.ErrorLevel - override def toString = - "RemoteClientError@" + remoteAddress + ": Error[" + cause + "]" + override def logLevel: Logging.LogLevel = Logging.ErrorLevel + override def toString: String = "RemoteClientError@" + remoteAddress + ": Error[" + cause + "]" } +/** + * RemoteClientDisconnected is published when a RemoteClient's connection is disconnected + */ case class RemoteClientDisconnected( @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteClientDisconnected@" + remoteAddress + override def logLevel: Logging.LogLevel = Logging.DebugLevel + override def toString: String = "RemoteClientDisconnected@" + remoteAddress } +/** + * RemoteClientConnected is published when a RemoteClient's connection is established + */ case class RemoteClientConnected( @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteClientConnected@" + remoteAddress + override def logLevel: Logging.LogLevel = Logging.DebugLevel + override def toString: String = "RemoteClientConnected@" + remoteAddress } +/** + * RemoteClientStarted is published when a RemoteClient has started up + */ case class RemoteClientStarted( @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteClientStarted@" + remoteAddress + override def logLevel: Logging.LogLevel = Logging.InfoLevel + override def toString: String = "RemoteClientStarted@" + remoteAddress } +/** + * RemoteClientShutdown is published when a RemoteClient has shut down + */ case class RemoteClientShutdown( @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteClientShutdown@" + remoteAddress + override def logLevel: Logging.LogLevel = Logging.InfoLevel + override def toString: String = "RemoteClientShutdown@" + remoteAddress } +/** + * RemoteClientWriteFailed is published when a remote send of a message detectably fails (throws an exception). + */ case class RemoteClientWriteFailed( @BeanProperty request: AnyRef, @BeanProperty cause: Throwable, @transient @BeanProperty remote: RemoteTransport, @BeanProperty remoteAddress: Address) extends RemoteClientLifeCycleEvent { - override def logLevel = Logging.WarningLevel - override def toString = + override def logLevel: Logging.LogLevel = Logging.WarningLevel + override def toString: String = "RemoteClientWriteFailed@" + remoteAddress + ": MessageClass[" + (if (request ne null) request.getClass.getName else "no message") + "] Error[" + cause + "]" @@ -85,53 +97,65 @@ case class RemoteClientWriteFailed( */ trait RemoteServerLifeCycleEvent extends RemoteLifeCycleEvent +/** + * RemoteServerStarted is published when a local RemoteServer has started up + */ case class RemoteServerStarted( @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteServerStarted@" + remote + override def logLevel: Logging.LogLevel = Logging.InfoLevel + override def toString: String = "RemoteServerStarted@" + remote } +/** + * RemoteServerShutdown is published when a local RemoteServer has shut down + */ case class RemoteServerShutdown( @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.InfoLevel - override def toString = - "RemoteServerShutdown@" + remote + override def logLevel: Logging.LogLevel = Logging.InfoLevel + override def toString: String = "RemoteServerShutdown@" + remote } +/** + * A RemoteServerError is a general error that is thrown within or from a RemoteServer + */ case class RemoteServerError( @BeanProperty val cause: Throwable, @transient @BeanProperty remote: RemoteTransport) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.ErrorLevel - override def toString = - "RemoteServerError@" + remote + "] Error[" + cause + "]" + override def logLevel: Logging.LogLevel = Logging.ErrorLevel + override def toString: String = "RemoteServerError@" + remote + "] Error[" + cause + "]" } +/** + * RemoteServerClientConnected is published when an inbound connection has been established + */ case class RemoteServerClientConnected( @transient @BeanProperty remote: RemoteTransport, @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientConnected@" + remote + - ": Client[" + clientAddress.getOrElse("no address") + "]" + override def logLevel: Logging.LogLevel = Logging.DebugLevel + override def toString: String = + "RemoteServerClientConnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" } +/** + * RemoteServerClientConnected is published when an inbound connection has been disconnected + */ case class RemoteServerClientDisconnected( @transient @BeanProperty remote: RemoteTransport, @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientDisconnected@" + remote + - ": Client[" + clientAddress.getOrElse("no address") + "]" + override def logLevel: Logging.LogLevel = Logging.DebugLevel + override def toString: String = + "RemoteServerClientDisconnected@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" } +/** + * RemoteServerClientClosed is published when an inbound RemoteClient is closed + */ case class RemoteServerClientClosed( @transient @BeanProperty remote: RemoteTransport, @BeanProperty val clientAddress: Option[Address]) extends RemoteServerLifeCycleEvent { - override def logLevel = Logging.DebugLevel - override def toString = - "RemoteServerClientClosed@" + remote + - ": Client[" + clientAddress.getOrElse("no address") + "]" + override def logLevel: Logging.LogLevel = Logging.DebugLevel + override def toString: String = + "RemoteServerClientClosed@" + remote + ": Client[" + clientAddress.getOrElse("no address") + "]" } /** @@ -142,6 +166,10 @@ class RemoteClientException private[akka] ( @transient @BeanProperty val client: RemoteTransport, val remoteAddress: Address, cause: Throwable = null) extends AkkaException(message, cause) +/** + * RemoteTransportException represents a general failure within a RemoteTransport, + * such as inability to start, wrong configuration etc. + */ class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) /** @@ -178,71 +206,56 @@ abstract class RemoteTransport(val system: ExtendedActorSystem, val provider: Re */ def restartClientConnection(address: Address): Boolean - /** Methods that needs to be implemented by a transport **/ - - def send(message: Any, - senderOption: Option[ActorRef], - recipient: RemoteActorRef): Unit + /** + * Sends the given message to the recipient supplying the sender if any + */ + def send(message: Any, senderOption: Option[ActorRef], recipient: RemoteActorRef): Unit + /** + * Default implementation both publishes the message to the eventStream as well as logs it using the system logger + */ def notifyListeners(message: RemoteLifeCycleEvent): Unit = { system.eventStream.publish(message) system.log.log(message.logLevel, "{}", message) } - override def toString = address.toString -} - -class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { - - def originalReceiver = input.getRecipient.getPath - - lazy val sender: ActorRef = - if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) - else system.deadLetters - - lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) - - lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage) - - override def toString = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender -} - -trait RemoteMarshallingOps { + /** + * Returns this RemoteTransports Address' textual representation + */ + override def toString: String = address.toString + /** + * A Logger that can be used to log issues that may occur + */ def log: LoggingAdapter - def system: ExtendedActorSystem - - def provider: RemoteActorRefProvider - - def address: Address - + /** + * When this method returns true, some functionality will be turned off for security purposes. + */ protected def useUntrustedMode: Boolean - def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { - val arp = AkkaRemoteProtocol.newBuilder - arp.setMessage(rmp) - arp.build - } + /** + * Returns a newly created AkkaRemoteProtocol with the given message payload. + */ + def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = + AkkaRemoteProtocol.newBuilder.setMessage(rmp).build - def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = { - val arp = AkkaRemoteProtocol.newBuilder - arp.setInstruction(rcp) - arp.build - } + /** + * Returns a newly created AkkaRemoteProtocol with the given control payload. + */ + def createControlEnvelope(rcp: RemoteControlProtocol): AkkaRemoteProtocol = + AkkaRemoteProtocol.newBuilder.setInstruction(rcp).build /** * Serializes the ActorRef instance into a Protocol Buffers (protobuf) Message. */ - def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = { + def toRemoteActorRefProtocol(actor: ActorRef): ActorRefProtocol = ActorRefProtocol.newBuilder.setPath(actor.path.toStringWithAddress(address)).build - } - - def createRemoteMessageProtocolBuilder( - recipient: ActorRef, - message: Any, - senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { + /** + * Returns a new RemoteMessageProtocol containing the serialized representation of the given parameters. + */ + def createRemoteMessageProtocolBuilder(recipient: ActorRef, message: Any, senderOption: Option[ActorRef]): RemoteMessageProtocol.Builder = { val messageBuilder = RemoteMessageProtocol.newBuilder.setRecipient(toRemoteActorRefProtocol(recipient)) if (senderOption.isDefined) messageBuilder.setSender(toRemoteActorRefProtocol(senderOption.get)) @@ -253,7 +266,12 @@ trait RemoteMarshallingOps { messageBuilder } - def receiveMessage(remoteMessage: RemoteMessage) { + /** + * Call this method with an inbound RemoteMessage and this will take care of security (see: "useUntrustedMode") + * as well as making sure that the message ends up at its destination (best effort). + * There is also a fair amount of logging produced by this method, which is good for debugging. + */ + def receiveMessage(remoteMessage: RemoteMessage): Unit = { val remoteDaemon = provider.remoteDaemon remoteMessage.recipient match { @@ -289,3 +307,43 @@ trait RemoteMarshallingOps { } } } + +/** + * RemoteMessage is a wrapper around a message that has come in over the wire, + * it allows to easily obtain references to the deserialized message, its intended recipient + * and the sender. + */ +class RemoteMessage(input: RemoteMessageProtocol, system: ExtendedActorSystem) { + /** + * Returns a String-representation of the ActorPath that this RemoteMessage is destined for + */ + def originalReceiver: String = input.getRecipient.getPath + + /** + * Returns an Option with the String representation of the ActorPath of the Actor who is the sender of this message + */ + def originalSender: Option[String] = if (input.hasSender) Some(input.getSender.getPath) else None + + /** + * Returns a reference to the Actor that sent this message, or DeadLetterActorRef if not present or found. + */ + lazy val sender: ActorRef = + if (input.hasSender) system.provider.actorFor(system.provider.rootGuardian, input.getSender.getPath) + else system.deadLetters + + /** + * Returns a reference to the Actor that this message is destined for. + * In case this returns a DeadLetterActorRef, you have access to the path using the "originalReceiver" method. + */ + lazy val recipient: InternalActorRef = system.provider.actorFor(system.provider.rootGuardian, originalReceiver) + + /** + * Returns the message + */ + lazy val payload: AnyRef = MessageSerializer.deserialize(system, input.getMessage) + + /** + * Returns a String representation of this RemoteMessage, intended for debugging purposes. + */ + override def toString: String = "RemoteMessage: " + payload + " to " + recipient + "<+{" + originalReceiver + "} from " + sender +} diff --git a/akka-remote/src/main/scala/akka/remote/netty/Client.scala b/akka-remote/src/main/scala/akka/remote/netty/Client.scala index 3c52179e4a..f4f200aef6 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Client.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Client.scala @@ -12,7 +12,6 @@ import org.jboss.netty.channel.{ ChannelFutureListener, ChannelHandler, StaticCh import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder } import org.jboss.netty.handler.execution.ExecutionHandler import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler } - import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol } import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected, RemoteClientWriteFailed } import akka.actor.{ Address, ActorRef } @@ -20,18 +19,12 @@ import akka.AkkaException import akka.event.Logging import akka.util.Switch -class RemoteClientMessageBufferException(message: String, cause: Throwable) extends AkkaException(message, cause) { - def this(msg: String) = this(msg, null) -} - /** * This is the abstract baseclass for netty remote clients, currently there's only an * ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that * reuses an already established connection. */ -abstract class RemoteClient private[akka] ( - val netty: NettyRemoteTransport, - val remoteAddress: Address) { +private[akka] abstract class RemoteClient private[akka] (val netty: NettyRemoteTransport, val remoteAddress: Address) { val log = Logging(netty.system, "RemoteClient") @@ -92,7 +85,7 @@ abstract class RemoteClient private[akka] ( /** * RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node. */ -class ActiveRemoteClient private[akka] ( +private[akka] class ActiveRemoteClient private[akka] ( netty: NettyRemoteTransport, remoteAddress: Address, localAddress: Address) @@ -225,7 +218,7 @@ class ActiveRemoteClient private[akka] ( } @ChannelHandler.Sharable -class ActiveRemoteClientHandler( +private[akka] class ActiveRemoteClientHandler( val name: String, val bootstrap: ClientBootstrap, val remoteAddress: Address, @@ -314,7 +307,7 @@ class ActiveRemoteClientHandler( } } -class ActiveRemoteClientPipelineFactory( +private[akka] class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, executionHandler: ExecutionHandler, @@ -339,9 +332,9 @@ class ActiveRemoteClientPipelineFactory( } } -class PassiveRemoteClient(val currentChannel: Channel, - netty: NettyRemoteTransport, - remoteAddress: Address) +private[akka] class PassiveRemoteClient(val currentChannel: Channel, + netty: NettyRemoteTransport, + remoteAddress: Address) extends RemoteClient(netty, remoteAddress) { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4fd70b822f..d09c17f160 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -16,17 +16,16 @@ import org.jboss.netty.channel.{ ChannelHandlerContext, Channel } import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder } import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor import org.jboss.netty.util.HashedWheelTimer -import akka.dispatch.MonitorableThreadFactory import akka.event.Logging import akka.remote.RemoteProtocol.AkkaRemoteProtocol -import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteSettings, RemoteMarshallingOps, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } +import akka.remote.{ RemoteTransportException, RemoteTransport, RemoteActorRefProvider, RemoteActorRef, RemoteServerStarted } import akka.util.NonFatal import akka.actor.{ ExtendedActorSystem, Address, ActorRef } /** * Provides the implementation of the Netty remote support */ -class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) with RemoteMarshallingOps { +private[akka] class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorRefProvider) extends RemoteTransport(_system, _provider) { import provider.remoteSettings @@ -192,7 +191,7 @@ class NettyRemoteTransport(_system: ExtendedActorSystem, _provider: RemoteActorR } -class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder { +private[akka] class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder { override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = { msg match { case (message: Any, sender: Option[_], recipient: ActorRef) ⇒ @@ -207,9 +206,9 @@ class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends Protobuf } } -class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) +private[akka] class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) -class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) { +private[akka] class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) { protected val guard = new ReentrantReadWriteLock protected val open = new AtomicBoolean(true) diff --git a/akka-remote/src/main/scala/akka/remote/netty/Server.scala b/akka-remote/src/main/scala/akka/remote/netty/Server.scala index 674023dd52..5c18bc6933 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Server.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Server.scala @@ -20,7 +20,7 @@ import java.net.InetAddress import akka.actor.ActorSystemImpl import org.jboss.netty.channel._ -class NettyRemoteServer(val netty: NettyRemoteTransport) { +private[akka] class NettyRemoteServer(val netty: NettyRemoteTransport) { import netty.settings @@ -82,7 +82,7 @@ class NettyRemoteServer(val netty: NettyRemoteTransport) { } } -class RemoteServerPipelineFactory( +private[akka] class RemoteServerPipelineFactory( val openChannels: ChannelGroup, val executionHandler: ExecutionHandler, val netty: NettyRemoteTransport) extends ChannelPipelineFactory { @@ -103,7 +103,7 @@ class RemoteServerPipelineFactory( } @ChannelHandler.Sharable -class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { +private[akka] class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler { val authenticated = new AnyRef override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match { @@ -130,7 +130,7 @@ class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends Si } @ChannelHandler.Sharable -class RemoteServerHandler( +private[akka] class RemoteServerHandler( val openChannels: ChannelGroup, val netty: NettyRemoteTransport) extends SimpleChannelUpstreamHandler { diff --git a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala index bb33cb9570..64bc184408 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/Settings.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/Settings.scala @@ -9,37 +9,37 @@ import java.util.concurrent.TimeUnit._ import java.net.InetAddress import akka.ConfigurationException -class NettySettings(config: Config, val systemName: String) { +private[akka] class NettySettings(config: Config, val systemName: String) { import config._ - val BackoffTimeout = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS) + val BackoffTimeout: Duration = Duration(getMilliseconds("backoff-timeout"), MILLISECONDS) val SecureCookie: Option[String] = getString("secure-cookie") match { case "" ⇒ None case cookie ⇒ Some(cookie) } - val RequireCookie = { + val RequireCookie: Boolean = { val requireCookie = getBoolean("require-cookie") if (requireCookie && SecureCookie.isEmpty) throw new ConfigurationException( "Configuration option 'akka.remote.netty.require-cookie' is turned on but no secure cookie is defined in 'akka.remote.netty.secure-cookie'.") requireCookie } - val UsePassiveConnections = getBoolean("use-passive-connections") - val UseDispatcherForIO = getString("use-dispatcher-for-io") match { + val UsePassiveConnections: Boolean = getBoolean("use-passive-connections") + val UseDispatcherForIO: Option[String] = getString("use-dispatcher-for-io") match { case "" | null ⇒ None case dispatcher ⇒ Some(dispatcher) } - val ReconnectionTimeWindow = Duration(getMilliseconds("reconnection-time-window"), MILLISECONDS) - val ReadTimeout = Duration(getMilliseconds("read-timeout"), MILLISECONDS) - val WriteTimeout = Duration(getMilliseconds("write-timeout"), MILLISECONDS) - val AllTimeout = Duration(getMilliseconds("all-timeout"), MILLISECONDS) - val ReconnectDelay = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) - val MessageFrameSize = getBytes("message-frame-size").toInt + val ReconnectionTimeWindow: Duration = Duration(getMilliseconds("reconnection-time-window"), MILLISECONDS) + val ReadTimeout: Duration = Duration(getMilliseconds("read-timeout"), MILLISECONDS) + val WriteTimeout: Duration = Duration(getMilliseconds("write-timeout"), MILLISECONDS) + val AllTimeout: Duration = Duration(getMilliseconds("all-timeout"), MILLISECONDS) + val ReconnectDelay: Duration = Duration(getMilliseconds("reconnect-delay"), MILLISECONDS) + val MessageFrameSize: Int = getBytes("message-frame-size").toInt - val Hostname = getString("hostname") match { + val Hostname: String = getString("hostname") match { case "" ⇒ InetAddress.getLocalHost.getHostAddress case value ⇒ value } @@ -50,25 +50,25 @@ class NettySettings(config: Config, val systemName: String) { } @deprecated("WARNING: This should only be used by professionals.", "2.0") - val PortSelector = getInt("port") + val PortSelector: Int = getInt("port") - val ConnectionTimeout = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) + val ConnectionTimeout: Duration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS) - val Backlog = getInt("backlog") + val Backlog: Int = getInt("backlog") - val ExecutionPoolKeepalive = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) + val ExecutionPoolKeepalive: Duration = Duration(getMilliseconds("execution-pool-keepalive"), MILLISECONDS) - val ExecutionPoolSize = getInt("execution-pool-size") match { + val ExecutionPoolSize: Int = getInt("execution-pool-size") match { case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.netty.execution-pool-size is less than 1") case sz ⇒ sz } - val MaxChannelMemorySize = getBytes("max-channel-memory-size") match { + val MaxChannelMemorySize: Long = getBytes("max-channel-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-channel-memory-size is less than 0 bytes") case sz ⇒ sz } - val MaxTotalMemorySize = getBytes("max-total-memory-size") match { + val MaxTotalMemorySize: Long = getBytes("max-total-memory-size") match { case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.netty.max-total-memory-size is less than 0 bytes") case sz ⇒ sz }