Merge branch 'master' into wip-1378-fixme-patriknw

Conflicts:
	akka-actor/src/main/scala/akka/actor/ActorRef.scala
	akka-remote/src/main/scala/akka/remote/RemoteActorRefProvider.scala
	akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala
This commit is contained in:
Patrik Nordwall 2011-12-02 09:03:03 +01:00
commit 82bbca43ab
112 changed files with 3126 additions and 1458 deletions

View file

@ -59,35 +59,34 @@ abstract class RemoteClient private[akka] (
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit =
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
/**
* Sends the message across the wire
*/
def send(request: RemoteMessageProtocol) {
if (isRunning) { //FIXME RACY, ticket #1409
log.debug("Sending message: " + new RemoteMessage(request, remoteSupport))
def send(request: RemoteMessageProtocol): Unit = {
log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport))
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
}
@ -132,8 +131,7 @@ class ActiveRemoteClient private[akka] (
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -180,10 +178,9 @@ class ActiveRemoteClient private[akka] (
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, timer, this))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -219,8 +216,6 @@ class ActiveRemoteClient private[akka] (
log.debug("Shutting down remote client [{}]", name)
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
timer.stop()
timer = null
openChannels.close.awaitUninterruptibly
openChannels = null
bootstrap.releaseExternalResources()
@ -253,18 +248,17 @@ class ActiveRemoteClientPipelineFactory(
name: String,
bootstrap: ClientBootstrap,
remoteAddress: RemoteAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
import client.remoteSupport.clientSettings._
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit)
val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, timer, client)
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client)
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
}
@ -361,6 +355,10 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi
val serverSettings = RemoteExtension(system).serverSettings
val clientSettings = RemoteExtension(system).clientSettings
val timer: HashedWheelTimer = new HashedWheelTimer
_system.registerOnTermination(timer.stop()) //Shut this guy down at the end
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -519,6 +517,10 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setHostname(address.hostname)
.setPort(address.port)
.build)
if (SecureCookie.nonEmpty)
b.setCookie(SecureCookie.get)
b.build