restructure remoting
- remove Remote, incorporate its few fields into RemoteActorRefProvider - rename RemoteSupport to RemoteTransport to sync up with conf and the way we talk about it - remove LocalAddress/RemoteAddress etc. and just have a final case class Address(protocol, system, host, port) - split netty settings out or RemoteSettings into NettySettings - split out from NettyRemoteSupport.scala: Server.scala, Client.scala, Settings.scala plus a few fixes, including using the contextClassLoader when loading the provider for ActorSystemImpl
This commit is contained in:
parent
6db3e59ce1
commit
edceda8edf
34 changed files with 1415 additions and 1520 deletions
|
|
@ -4,394 +4,114 @@
|
|||
|
||||
package akka.remote.netty
|
||||
|
||||
import akka.actor.{ ActorRef, IllegalActorStateException, simpleName }
|
||||
import akka.remote._
|
||||
import RemoteProtocol._
|
||||
import akka.util._
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture }
|
||||
import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory }
|
||||
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
|
||||
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
|
||||
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
|
||||
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
|
||||
import java.net.{ UnknownHostException, InetAddress }
|
||||
import java.util.concurrent.atomic.AtomicBoolean
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock
|
||||
import java.util.concurrent.Executors
|
||||
import scala.collection.mutable.HashMap
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.AkkaException
|
||||
import akka.event.Logging
|
||||
import org.jboss.netty.channel._
|
||||
import akka.actor.ActorSystemImpl
|
||||
import org.jboss.netty.handler.execution.{ ExecutionHandler, OrderedMemoryAwareThreadPoolExecutor }
|
||||
import java.util.concurrent._
|
||||
import locks.ReentrantReadWriteLock
|
||||
import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroupFuture }
|
||||
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
|
||||
import org.jboss.netty.channel.{ ChannelHandlerContext, ChannelFutureListener, ChannelFuture, Channel }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufEncoder, ProtobufDecoder }
|
||||
import org.jboss.netty.handler.execution.OrderedMemoryAwareThreadPoolExecutor
|
||||
import org.jboss.netty.util.HashedWheelTimer
|
||||
import akka.actor.{ ActorSystemImpl, ActorRef, simpleName }
|
||||
import akka.dispatch.MonitorableThreadFactory
|
||||
|
||||
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
|
||||
def this(msg: String) = this(msg, null)
|
||||
}
|
||||
|
||||
/**
|
||||
* This is the abstract baseclass for netty remote clients, currently there's only an
|
||||
* ActiveRemoteClient, but others could be feasible, like a PassiveRemoteClient that
|
||||
* reuses an already established connection.
|
||||
*/
|
||||
abstract class RemoteClient private[akka] (
|
||||
val remoteSupport: NettyRemoteSupport,
|
||||
val remoteAddress: RemoteNettyAddress) {
|
||||
|
||||
val log = Logging(remoteSupport.system, "RemoteClient")
|
||||
|
||||
val name = simpleName(this) + "@" + remoteAddress
|
||||
|
||||
private[remote] val runSwitch = new Switch()
|
||||
|
||||
private[remote] def isRunning = runSwitch.isOn
|
||||
|
||||
protected def currentChannel: Channel
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean
|
||||
|
||||
def shutdown(): Boolean
|
||||
|
||||
def isBoundTo(address: RemoteNettyAddress): Boolean = remoteAddress == address
|
||||
|
||||
/**
|
||||
* Converts the message to the wireprotocol and sends the message across the wire
|
||||
*/
|
||||
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
|
||||
log.debug("Sending message {} from {} to {}", message, senderOption, recipient)
|
||||
send((message, senderOption, recipient))
|
||||
} else {
|
||||
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
|
||||
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
|
||||
throw exception
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the message across the wire
|
||||
*/
|
||||
private def send(request: (Any, Option[ActorRef], ActorRef)): Unit = {
|
||||
try {
|
||||
val channel = currentChannel
|
||||
val f = channel.write(request)
|
||||
f.addListener(
|
||||
new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled || !future.isSuccess) {
|
||||
remoteSupport.notifyListeners(RemoteClientWriteFailed(request, future.getCause, remoteSupport, remoteAddress))
|
||||
}
|
||||
}
|
||||
})
|
||||
// Check if we should back off
|
||||
if (!channel.isWritable) {
|
||||
val backoff = remoteSupport.remote.remoteSettings.BackoffTimeout
|
||||
if (backoff.length > 0 && !f.await(backoff.length, backoff.unit)) f.cancel() //Waited as long as we could, now back off
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def toString = name
|
||||
}
|
||||
|
||||
class PassiveRemoteClient(val currentChannel: Channel,
|
||||
remoteSupport: NettyRemoteSupport,
|
||||
remoteAddress: RemoteNettyAddress)
|
||||
extends RemoteClient(remoteSupport, remoteAddress) {
|
||||
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn {
|
||||
remoteSupport.notifyListeners(RemoteClientStarted(remoteSupport, remoteAddress))
|
||||
log.debug("Starting remote client connection to [{}]", remoteAddress)
|
||||
}
|
||||
|
||||
def shutdown() = runSwitch switchOff {
|
||||
log.debug("Shutting down remote client [{}]", name)
|
||||
|
||||
remoteSupport.notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
|
||||
log.debug("[{}] has been shut down", name)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* RemoteClient represents a connection to an Akka node. Is used to send messages to remote actors on the node.
|
||||
*/
|
||||
class ActiveRemoteClient private[akka] (
|
||||
remoteSupport: NettyRemoteSupport,
|
||||
remoteAddress: RemoteNettyAddress,
|
||||
localAddress: RemoteSystemAddress[ParsedTransportAddress],
|
||||
val loader: Option[ClassLoader] = None)
|
||||
extends RemoteClient(remoteSupport, remoteAddress) {
|
||||
|
||||
if (remoteAddress.ip.isEmpty) throw new java.net.UnknownHostException(remoteAddress.host)
|
||||
|
||||
import remoteSupport.clientSettings._
|
||||
|
||||
//TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
|
||||
@volatile
|
||||
private var bootstrap: ClientBootstrap = _
|
||||
@volatile
|
||||
private var connection: ChannelFuture = _
|
||||
@volatile
|
||||
private[remote] var openChannels: DefaultChannelGroup = _
|
||||
@volatile
|
||||
private var executionHandler: ExecutionHandler = _
|
||||
|
||||
@volatile
|
||||
private var reconnectionTimeWindowStart = 0L
|
||||
|
||||
def notifyListeners(msg: RemoteLifeCycleEvent): Unit = remoteSupport.notifyListeners(msg)
|
||||
|
||||
def currentChannel = connection.getChannel
|
||||
|
||||
/**
|
||||
* Connect to remote server.
|
||||
*/
|
||||
def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = {
|
||||
|
||||
def sendSecureCookie(connection: ChannelFuture) {
|
||||
val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT)
|
||||
if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get)
|
||||
handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
|
||||
.setSystem(localAddress.system)
|
||||
.setHostname(localAddress.transport.host)
|
||||
.setPort(localAddress.transport.port)
|
||||
.build)
|
||||
connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build))
|
||||
}
|
||||
|
||||
def attemptReconnect(): Boolean = {
|
||||
log.debug("Remote client reconnecting to [{}]", remoteAddress)
|
||||
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
|
||||
|
||||
if (!connection.isSuccess) {
|
||||
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
|
||||
false
|
||||
} else {
|
||||
sendSecureCookie(connection)
|
||||
true
|
||||
}
|
||||
}
|
||||
|
||||
runSwitch switchOn {
|
||||
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
|
||||
|
||||
executionHandler = new ExecutionHandler(remoteSupport.executor)
|
||||
|
||||
bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory)
|
||||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
|
||||
log.debug("Starting remote client connection to [{}]", remoteAddress)
|
||||
|
||||
connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port))
|
||||
|
||||
openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails.
|
||||
|
||||
if (!connection.isSuccess) {
|
||||
notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress))
|
||||
false
|
||||
} else {
|
||||
sendSecureCookie(connection)
|
||||
notifyListeners(RemoteClientStarted(remoteSupport, remoteAddress))
|
||||
true
|
||||
}
|
||||
} match {
|
||||
case true ⇒ true
|
||||
case false if reconnectIfAlreadyConnected ⇒
|
||||
connection.getChannel.close()
|
||||
openChannels.remove(connection.getChannel)
|
||||
|
||||
log.debug("Remote client reconnecting to [{}]", remoteAddress)
|
||||
attemptReconnect()
|
||||
|
||||
case false ⇒ false
|
||||
}
|
||||
}
|
||||
|
||||
// Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
|
||||
def shutdown() = runSwitch switchOff {
|
||||
log.debug("Shutting down remote client [{}]", name)
|
||||
|
||||
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
|
||||
try {
|
||||
if ((connection ne null) && (connection.getChannel ne null))
|
||||
connection.getChannel.close()
|
||||
} finally {
|
||||
try {
|
||||
if (openChannels ne null) openChannels.close.awaitUninterruptibly()
|
||||
} finally {
|
||||
connection = null
|
||||
executionHandler = null
|
||||
}
|
||||
}
|
||||
|
||||
log.debug("[{}] has been shut down", name)
|
||||
}
|
||||
|
||||
private[akka] def isWithinReconnectionTimeWindow: Boolean = {
|
||||
if (reconnectionTimeWindowStart == 0L) {
|
||||
reconnectionTimeWindowStart = System.currentTimeMillis
|
||||
true
|
||||
} else {
|
||||
val timeLeft = (ReconnectionTimeWindow.toMillis - (System.currentTimeMillis - reconnectionTimeWindowStart)) > 0
|
||||
if (timeLeft)
|
||||
log.info("Will try to reconnect to remote server for another [{}] milliseconds", timeLeft)
|
||||
|
||||
timeLeft
|
||||
}
|
||||
}
|
||||
|
||||
private[akka] def resetReconnectionTimeWindow = reconnectionTimeWindowStart = 0L
|
||||
}
|
||||
|
||||
class ActiveRemoteClientPipelineFactory(
|
||||
name: String,
|
||||
bootstrap: ClientBootstrap,
|
||||
executionHandler: ExecutionHandler,
|
||||
remoteAddress: RemoteNettyAddress,
|
||||
client: ActiveRemoteClient) extends ChannelPipelineFactory {
|
||||
|
||||
import client.remoteSupport.clientSettings._
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit)
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val messageDec = new RemoteMessageDecoder
|
||||
val messageEnc = new RemoteMessageEncoder(client.remoteSupport)
|
||||
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client)
|
||||
|
||||
new StaticChannelPipeline(timeout, lenDec, messageDec, lenPrep, messageEnc, executionHandler, remoteClient)
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteMessageEncoder(remoteSupport: NettyRemoteSupport) extends ProtobufEncoder {
|
||||
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
|
||||
msg match {
|
||||
case (message: Any, sender: Option[_], recipient: ActorRef) ⇒
|
||||
super.encode(ctx, channel,
|
||||
remoteSupport.createMessageSendEnvelope(
|
||||
remoteSupport.createRemoteMessageProtocolBuilder(
|
||||
recipient,
|
||||
message,
|
||||
sender.asInstanceOf[Option[ActorRef]]).build))
|
||||
case _ ⇒ super.encode(ctx, channel, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class ActiveRemoteClientHandler(
|
||||
val name: String,
|
||||
val bootstrap: ClientBootstrap,
|
||||
val remoteAddress: RemoteNettyAddress,
|
||||
val timer: HashedWheelTimer,
|
||||
val client: ActiveRemoteClient)
|
||||
extends SimpleChannelUpstreamHandler {
|
||||
|
||||
def runOnceNow(thunk: ⇒ Unit): Unit = timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() }
|
||||
}, 0, TimeUnit.MILLISECONDS)
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
|
||||
try {
|
||||
event.getMessage match {
|
||||
case arp: AkkaRemoteProtocol if arp.hasInstruction ⇒
|
||||
val rcp = arp.getInstruction
|
||||
rcp.getCommandType match {
|
||||
case CommandType.SHUTDOWN ⇒ runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) }
|
||||
case _ ⇒ //Ignore others
|
||||
}
|
||||
|
||||
case arp: AkkaRemoteProtocol if arp.hasMessage ⇒
|
||||
client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport.system, client.loader))
|
||||
|
||||
case other ⇒
|
||||
throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress)
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.remoteSupport, client.remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = client.runSwitch ifOn {
|
||||
if (client.isWithinReconnectionTimeWindow) {
|
||||
timer.newTimeout(new TimerTask() {
|
||||
def run(timeout: Timeout) =
|
||||
if (client.isRunning) {
|
||||
client.openChannels.remove(event.getChannel)
|
||||
client.connect(reconnectIfAlreadyConnected = true)
|
||||
}
|
||||
}, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS)
|
||||
} else runOnceNow {
|
||||
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
|
||||
}
|
||||
}
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
try {
|
||||
client.notifyListeners(RemoteClientConnected(client.remoteSupport, client.remoteAddress))
|
||||
client.resetReconnectionTimeWindow
|
||||
} catch {
|
||||
case e: Exception ⇒ client.notifyListeners(RemoteClientError(e, client.remoteSupport, client.remoteAddress))
|
||||
}
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
client.notifyListeners(RemoteClientDisconnected(client.remoteSupport, client.remoteAddress))
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
val cause = event.getCause
|
||||
if (cause ne null) {
|
||||
client.notifyListeners(RemoteClientError(cause, client.remoteSupport, client.remoteAddress))
|
||||
cause match {
|
||||
case e: ReadTimeoutException ⇒
|
||||
runOnceNow {
|
||||
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
|
||||
}
|
||||
case e: Exception ⇒ event.getChannel.close()
|
||||
}
|
||||
|
||||
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress))
|
||||
}
|
||||
}
|
||||
import akka.event.Logging
|
||||
import akka.remote.RemoteProtocol.AkkaRemoteProtocol
|
||||
import akka.remote.{ RemoteTransport, RemoteMarshallingOps, RemoteClientWriteFailed, RemoteClientException, RemoteClientError, RemoteActorRef }
|
||||
import akka.util.Switch
|
||||
import akka.AkkaException
|
||||
import com.typesafe.config.Config
|
||||
import akka.remote.RemoteSettings
|
||||
import akka.actor.Address
|
||||
import java.net.InetSocketAddress
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.remote.RemoteActorRefProvider
|
||||
import akka.event.LoggingAdapter
|
||||
|
||||
/**
|
||||
* Provides the implementation of the Netty remote support
|
||||
*/
|
||||
class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val address: RemoteSystemAddress[RemoteNettyAddress])
|
||||
extends RemoteSupport[RemoteNettyAddress](_system) with RemoteMarshallingOps {
|
||||
val log = Logging(system, "NettyRemoteSupport")
|
||||
class NettyRemoteTransport(val remoteSettings: RemoteSettings)
|
||||
extends RemoteTransport with RemoteMarshallingOps {
|
||||
|
||||
val serverSettings = remote.remoteSettings.serverSettings
|
||||
val clientSettings = remote.remoteSettings.clientSettings
|
||||
val settings = new NettySettings(remoteSettings.config.getConfig("akka.remote.netty"), remoteSettings.systemName)
|
||||
|
||||
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic)
|
||||
val threadFactory = new MonitorableThreadFactory("NettyRemoteTransport", settings.Daemonic)
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory)
|
||||
|
||||
val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
serverSettings.ExecutionPoolSize,
|
||||
serverSettings.MaxChannelMemorySize,
|
||||
serverSettings.MaxTotalMemorySize,
|
||||
serverSettings.ExecutionPoolKeepAlive.length,
|
||||
serverSettings.ExecutionPoolKeepAlive.unit,
|
||||
settings.ExecutionPoolSize,
|
||||
settings.MaxChannelMemorySize,
|
||||
settings.MaxTotalMemorySize,
|
||||
settings.ExecutionPoolKeepAlive.length,
|
||||
settings.ExecutionPoolKeepAlive.unit,
|
||||
threadFactory)
|
||||
|
||||
val clientChannelFactory = new NioClientSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(threadFactory),
|
||||
Executors.newCachedThreadPool(threadFactory))
|
||||
|
||||
private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient]
|
||||
private val remoteClients = new HashMap[Address, RemoteClient]
|
||||
private val clientsLock = new ReentrantReadWriteLock
|
||||
|
||||
override protected def useUntrustedMode = serverSettings.UntrustedMode
|
||||
override protected def useUntrustedMode = remoteSettings.UntrustedMode
|
||||
|
||||
val server = try new NettyRemoteServer(this, Some(getClass.getClassLoader)) catch {
|
||||
case ex ⇒ shutdown(); throw ex
|
||||
}
|
||||
|
||||
val address = {
|
||||
server.channel.getLocalAddress match {
|
||||
case ia: InetSocketAddress ⇒ Address("akka", remoteSettings.systemName, Some(ia.getHostName), Some(ia.getPort))
|
||||
case x ⇒
|
||||
shutdown()
|
||||
throw new IllegalArgumentException("unknown address format " + x + ":" + x.getClass)
|
||||
}
|
||||
}
|
||||
|
||||
@volatile
|
||||
private var _system: ActorSystemImpl = _
|
||||
def system = _system
|
||||
|
||||
@volatile
|
||||
private var _provider: RemoteActorRefProvider = _
|
||||
def provider = _provider
|
||||
|
||||
@volatile
|
||||
private var _log: LoggingAdapter = _
|
||||
def log = _log
|
||||
|
||||
def start(system: ActorSystemImpl, provider: RemoteActorRefProvider): Unit = {
|
||||
_system = system
|
||||
_provider = provider
|
||||
_log = Logging(system, "NettyRemoteTransport")
|
||||
server.start(system)
|
||||
}
|
||||
|
||||
def shutdown(): Unit = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients foreach { case (_, client) ⇒ client.shutdown() }
|
||||
remoteClients.clear()
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
try {
|
||||
if (server != null) server.shutdown()
|
||||
} finally {
|
||||
try {
|
||||
timer.stop()
|
||||
} finally {
|
||||
try {
|
||||
clientChannelFactory.releaseExternalResources()
|
||||
} finally {
|
||||
executor.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected[akka] def send(
|
||||
message: Any,
|
||||
|
|
@ -399,13 +119,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
recipient: RemoteActorRef,
|
||||
loader: Option[ClassLoader]): Unit = {
|
||||
|
||||
val recipientAddress = recipient.path.address match {
|
||||
case RemoteSystemAddress(sys, transport) ⇒
|
||||
transport match {
|
||||
case x: RemoteNettyAddress ⇒ x
|
||||
case _ ⇒ throw new IllegalArgumentException("invoking NettyRemoteSupport.send with foreign target address " + transport)
|
||||
}
|
||||
}
|
||||
val recipientAddress = recipient.path.address
|
||||
|
||||
clientsLock.readLock.lock
|
||||
try {
|
||||
|
|
@ -420,7 +134,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
//Recheck for addition, race between upgrades
|
||||
case Some(client) ⇒ client //If already populated by other writer
|
||||
case None ⇒ //Populate map
|
||||
val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader)
|
||||
val client = new ActiveRemoteClient(this, recipientAddress, address, loader)
|
||||
client.connect()
|
||||
remoteClients += recipientAddress -> client
|
||||
client
|
||||
|
|
@ -438,7 +152,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
}
|
||||
}
|
||||
|
||||
def bindClient(remoteAddress: RemoteNettyAddress, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = {
|
||||
def bindClient(remoteAddress: Address, client: RemoteClient, putIfAbsent: Boolean = false): Boolean = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
if (putIfAbsent && remoteClients.contains(remoteAddress)) false
|
||||
|
|
@ -452,7 +166,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
}
|
||||
}
|
||||
|
||||
def unbindClient(remoteAddress: RemoteNettyAddress): Unit = {
|
||||
def unbindClient(remoteAddress: Address): Unit = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients.foreach { case (k, v) ⇒ if (v.isBoundTo(remoteAddress)) { v.shutdown(); remoteClients.remove(k) } }
|
||||
|
|
@ -461,7 +175,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
}
|
||||
}
|
||||
|
||||
def shutdownClientConnection(remoteAddress: RemoteNettyAddress): Boolean = {
|
||||
def shutdownClientConnection(remoteAddress: Address): Boolean = {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients.remove(remoteAddress) match {
|
||||
|
|
@ -473,7 +187,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
}
|
||||
}
|
||||
|
||||
def restartClientConnection(remoteAddress: RemoteNettyAddress): Boolean = {
|
||||
def restartClientConnection(remoteAddress: Address): Boolean = {
|
||||
clientsLock.readLock().lock()
|
||||
try {
|
||||
remoteClients.get(remoteAddress) match {
|
||||
|
|
@ -485,229 +199,24 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Server section
|
||||
*/
|
||||
@volatile
|
||||
private var currentServer: NettyRemoteServer = _
|
||||
}
|
||||
|
||||
def name = currentServer match {
|
||||
case null ⇒ remote.remoteAddress.toString
|
||||
case server ⇒ server.name
|
||||
}
|
||||
|
||||
private val _isRunning = new Switch(false)
|
||||
|
||||
def isRunning = _isRunning.isOn
|
||||
|
||||
def start(loader: Option[ClassLoader] = None): Unit =
|
||||
_isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) }
|
||||
|
||||
/**
|
||||
* Common section
|
||||
*/
|
||||
|
||||
def shutdown(): Unit = _isRunning switchOff {
|
||||
clientsLock.writeLock().lock()
|
||||
try {
|
||||
remoteClients foreach { case (_, client) ⇒ client.shutdown() }
|
||||
remoteClients.clear()
|
||||
} finally {
|
||||
clientsLock.writeLock().unlock()
|
||||
try {
|
||||
val s = currentServer
|
||||
currentServer = null
|
||||
s.shutdown()
|
||||
} finally {
|
||||
try {
|
||||
timer.stop()
|
||||
} finally {
|
||||
try {
|
||||
clientChannelFactory.releaseExternalResources()
|
||||
} finally {
|
||||
executor.shutdown()
|
||||
}
|
||||
}
|
||||
}
|
||||
class RemoteMessageEncoder(remoteSupport: NettyRemoteTransport) extends ProtobufEncoder {
|
||||
override def encode(ctx: ChannelHandlerContext, channel: Channel, msg: AnyRef): AnyRef = {
|
||||
msg match {
|
||||
case (message: Any, sender: Option[_], recipient: ActorRef) ⇒
|
||||
super.encode(ctx, channel,
|
||||
remoteSupport.createMessageSendEnvelope(
|
||||
remoteSupport.createRemoteMessageProtocolBuilder(
|
||||
recipient,
|
||||
message,
|
||||
sender.asInstanceOf[Option[ActorRef]]).build))
|
||||
case _ ⇒ super.encode(ctx, channel, msg)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class NettyRemoteServer(
|
||||
val remoteSupport: NettyRemoteSupport,
|
||||
val loader: Option[ClassLoader],
|
||||
val address: RemoteSystemAddress[RemoteNettyAddress]) {
|
||||
val log = Logging(remoteSupport.system, "NettyRemoteServer")
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
if (address.transport.ip.isEmpty) throw new java.net.UnknownHostException(address.transport.host)
|
||||
|
||||
val name = "NettyRemoteServer@" + address
|
||||
|
||||
private val factory = new NioServerSocketChannelFactory(
|
||||
Executors.newCachedThreadPool(remoteSupport.threadFactory),
|
||||
Executors.newCachedThreadPool(remoteSupport.threadFactory))
|
||||
|
||||
private val bootstrap = new ServerBootstrap(factory)
|
||||
|
||||
private val executionHandler = new ExecutionHandler(remoteSupport.executor)
|
||||
|
||||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("backlog", Backlog)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
bootstrap.setOption("child.keepAlive", true)
|
||||
bootstrap.setOption("child.reuseAddress", true)
|
||||
bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis)
|
||||
|
||||
openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port)))
|
||||
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
|
||||
|
||||
def shutdown() {
|
||||
try {
|
||||
val shutdownSignal = {
|
||||
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
|
||||
b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
|
||||
.setSystem(address.system)
|
||||
.setHostname(address.transport.host)
|
||||
.setPort(address.transport.port)
|
||||
.build)
|
||||
if (SecureCookie.nonEmpty)
|
||||
b.setCookie(SecureCookie.get)
|
||||
b.build
|
||||
}
|
||||
openChannels.write(remoteSupport.createControlEnvelope(shutdownSignal)).awaitUninterruptibly
|
||||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources()
|
||||
remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport))
|
||||
} catch {
|
||||
case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
class RemoteServerPipelineFactory(
|
||||
val name: String,
|
||||
val openChannels: ChannelGroup,
|
||||
val executionHandler: ExecutionHandler,
|
||||
val loader: Option[ClassLoader],
|
||||
val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory {
|
||||
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
def getPipeline: ChannelPipeline = {
|
||||
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
|
||||
val lenPrep = new LengthFieldPrepender(4)
|
||||
val messageDec = new RemoteMessageDecoder
|
||||
val messageEnc = new RemoteMessageEncoder(remoteSupport)
|
||||
|
||||
val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport)
|
||||
val stages: List[ChannelHandler] = lenDec :: messageDec :: lenPrep :: messageEnc :: executionHandler :: authenticator ::: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class RemoteServerAuthenticationHandler(secureCookie: Option[String]) extends SimpleChannelUpstreamHandler {
|
||||
val authenticated = new AnyRef
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = secureCookie match {
|
||||
case None ⇒ ctx.sendUpstream(event)
|
||||
case Some(cookie) ⇒
|
||||
ctx.getAttachment match {
|
||||
case `authenticated` ⇒ ctx.sendUpstream(event)
|
||||
case null ⇒ event.getMessage match {
|
||||
case remoteProtocol: AkkaRemoteProtocol if remoteProtocol.hasInstruction ⇒
|
||||
val instruction = remoteProtocol.getInstruction
|
||||
instruction.getCookie match {
|
||||
case `cookie` ⇒
|
||||
ctx.setAttachment(authenticated)
|
||||
ctx.sendUpstream(event)
|
||||
case _ ⇒
|
||||
throw new SecurityException(
|
||||
"The remote client [" + ctx.getChannel.getRemoteAddress + "] secure cookie is not the same as remote server secure cookie")
|
||||
}
|
||||
case _ ⇒
|
||||
throw new SecurityException("The remote client [" + ctx.getChannel.getRemoteAddress + "] is not authorized!")
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ChannelHandler.Sharable
|
||||
class RemoteServerHandler(
|
||||
val name: String,
|
||||
val openChannels: ChannelGroup,
|
||||
val applicationLoader: Option[ClassLoader],
|
||||
val remoteSupport: NettyRemoteSupport) extends SimpleChannelUpstreamHandler {
|
||||
|
||||
val log = Logging(remoteSupport.system, "RemoteServerHandler")
|
||||
|
||||
import remoteSupport.serverSettings._
|
||||
|
||||
/**
|
||||
* ChannelOpen overridden to store open channels for a clean postStop of a node.
|
||||
* If a channel is closed before, it is automatically removed from the open channels group.
|
||||
*/
|
||||
override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = openChannels.add(ctx.getChannel)
|
||||
|
||||
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx.getChannel)
|
||||
remoteSupport.notifyListeners(RemoteServerClientConnected(remoteSupport, clientAddress))
|
||||
}
|
||||
|
||||
override def channelDisconnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
|
||||
val clientAddress = getClientAddress(ctx.getChannel)
|
||||
remoteSupport.notifyListeners(RemoteServerClientDisconnected(remoteSupport, clientAddress))
|
||||
}
|
||||
|
||||
override def channelClosed(ctx: ChannelHandlerContext, event: ChannelStateEvent) = getClientAddress(ctx.getChannel) match {
|
||||
case s @ Some(address) ⇒
|
||||
if (UsePassiveConnections)
|
||||
remoteSupport.unbindClient(address)
|
||||
remoteSupport.notifyListeners(RemoteServerClientClosed(remoteSupport, s))
|
||||
case None ⇒
|
||||
remoteSupport.notifyListeners(RemoteServerClientClosed[RemoteNettyAddress](remoteSupport, None))
|
||||
}
|
||||
|
||||
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try {
|
||||
event.getMessage match {
|
||||
case remote: AkkaRemoteProtocol if remote.hasMessage ⇒
|
||||
remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport.system, applicationLoader))
|
||||
|
||||
case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒
|
||||
val instruction = remote.getInstruction
|
||||
instruction.getCommandType match {
|
||||
case CommandType.CONNECT if UsePassiveConnections ⇒
|
||||
val origin = instruction.getOrigin
|
||||
val inbound = RemoteNettyAddress(origin.getHostname, origin.getPort)
|
||||
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
|
||||
remoteSupport.bindClient(inbound, client)
|
||||
case CommandType.SHUTDOWN ⇒ //Will be unbound in channelClosed
|
||||
case _ ⇒ //Unknown command
|
||||
}
|
||||
case _ ⇒ //ignore
|
||||
}
|
||||
} catch {
|
||||
case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport))
|
||||
}
|
||||
|
||||
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
|
||||
remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport))
|
||||
event.getChannel.close()
|
||||
}
|
||||
|
||||
private def getClientAddress(c: Channel): Option[RemoteNettyAddress] =
|
||||
c.getRemoteAddress match {
|
||||
case inet: InetSocketAddress ⇒ Some(RemoteNettyAddress(inet.getHostName, Some(inet.getAddress), inet.getPort))
|
||||
case _ ⇒ None
|
||||
}
|
||||
}
|
||||
class RemoteMessageDecoder extends ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
|
||||
|
||||
class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {
|
||||
protected val guard = new ReentrantReadWriteLock
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue