Merge branch 'master' into wip-ActorPath-rk

This commit is contained in:
Roland 2011-12-02 13:31:48 +01:00
commit e38cd19af9
152 changed files with 3741 additions and 1749 deletions

View file

@ -59,35 +59,34 @@ abstract class RemoteClient private[akka] (
/**
* Converts the message to the wireprotocol and sends the message across the wire
*/
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit =
def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) {
send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build)
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
/**
* Sends the message across the wire
*/
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
log.debug("Sending message: " + new RemoteMessage(request, remoteSupport))
def send(request: RemoteMessageProtocol): Unit = {
log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport))
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
try {
val payload = remoteSupport.createMessageSendEnvelope(request)
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress))
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
} else {
val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress)
remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress))
throw exception
}
})
} catch {
case e: Exception remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress))
}
}
@ -125,15 +124,14 @@ class ActiveRemoteClient private[akka] (
import remoteSupport.clientSettings._
//FIXME rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
//TODO rewrite to a wrapper object (minimize volatile access and maximize encapsulation)
@volatile
private var bootstrap: ClientBootstrap = _
@volatile
private[remote] var connection: ChannelFuture = _
@volatile
private[remote] var openChannels: DefaultChannelGroup = _
@volatile
private var timer: HashedWheelTimer = _
@volatile
private var reconnectionTimeWindowStart = 0L
@ -161,7 +159,7 @@ class ActiveRemoteClient private[akka] (
def closeChannel(connection: ChannelFuture) = {
val channel = connection.getChannel
openChannels.remove(channel)
channel.close
channel.close()
}
def attemptReconnect(): Boolean = {
@ -180,10 +178,9 @@ class ActiveRemoteClient private[akka] (
runSwitch switchOn {
openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName)
timer = new HashedWheelTimer
bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, timer, this))
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -219,8 +216,6 @@ class ActiveRemoteClient private[akka] (
log.debug("Shutting down remote client [{}]", name)
notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress))
timer.stop()
timer = null
openChannels.close.awaitUninterruptibly
openChannels = null
bootstrap.releaseExternalResources()
@ -253,18 +248,17 @@ class ActiveRemoteClientPipelineFactory(
name: String,
bootstrap: ClientBootstrap,
remoteAddress: RemoteAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
import client.remoteSupport.clientSettings._
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit)
val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit)
val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
val protobufEnc = new ProtobufEncoder
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, timer, client)
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client)
new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient)
}
@ -345,7 +339,7 @@ class ActiveRemoteClientHandler(
client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread
}
case e: Exception
event.getChannel.close //FIXME Is this the correct behavior?
event.getChannel.close() //FIXME Is this the correct behavior???
}
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.remoteSupport, client.remoteAddress))
@ -361,6 +355,10 @@ class NettyRemoteSupport(_system: ActorSystem, val remote: Remote) extends Remot
val serverSettings = RemoteExtension(system).serverSettings
val clientSettings = RemoteExtension(system).clientSettings
val timer: HashedWheelTimer = new HashedWheelTimer
_system.registerOnTermination(timer.stop()) //Shut this guy down at the end
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
private val clientsLock = new ReentrantReadWriteLock
@ -519,6 +517,10 @@ class NettyRemoteServer(val remoteSupport: NettyRemoteSupport, val loader: Optio
try {
val shutdownSignal = {
val b = RemoteControlProtocol.newBuilder.setCommandType(CommandType.SHUTDOWN)
b.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setHostname(address.host)
.setPort(address.port)
.build)
if (SecureCookie.nonEmpty)
b.setCookie(SecureCookie.get)
b.build
@ -649,7 +651,7 @@ class RemoteServerHandler(
val inbound = RemoteAddress("BORKED", origin.getHostname, origin.getPort)
val client = new PassiveRemoteClient(event.getChannel, remoteSupport, inbound)
remoteSupport.bindClient(inbound, client)
case CommandType.SHUTDOWN //TODO FIXME Dispose passive connection here
case CommandType.SHUTDOWN //FIXME Dispose passive connection here, ticket #1410
case _ //Unknown command
}
case _ //ignore
@ -660,7 +662,7 @@ class RemoteServerHandler(
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
remoteSupport.notifyListeners(RemoteServerError(event.getCause, remoteSupport))
event.getChannel.close
event.getChannel.close()
}
private def getClientAddress(c: Channel): Option[RemoteAddress] =
@ -680,7 +682,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
if (open.get) {
super.add(channel)
} else {
channel.close
channel.close()
false
}
} finally {
@ -691,7 +693,7 @@ class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(na
override def close(): ChannelGroupFuture = {
guard.writeLock().lock()
try {
if (open.getAndSet(false)) super.close else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
if (open.getAndSet(false)) super.close() else throw new IllegalStateException("ChannelGroup already closed, cannot add new channel")
} finally {
guard.writeLock().unlock()
}