diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 4f02473b4f..0e9e8211cb 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -235,7 +235,7 @@ abstract class ActorSystem extends ActorRefFactory { * Register a block of code to run after all actors in this actor system have * been stopped. */ - def registerOnTermination(code: ⇒ Unit) + def registerOnTermination[T](code: ⇒ T) /** * Register a block of code to run after all actors in this actor system have @@ -354,7 +354,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A def start() = _start - def registerOnTermination(code: ⇒ Unit) { terminationFuture onComplete (_ ⇒ code) } + def registerOnTermination[T](code: ⇒ T) { terminationFuture onComplete (_ ⇒ code) } def registerOnTermination(code: Runnable) { terminationFuture onComplete (_ ⇒ code.run) } // TODO shutdown all that other stuff, whatever that may be diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index d12f5ea7e4..2bf25c6186 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -59,35 +59,34 @@ abstract class RemoteClient private[akka] ( /** * Converts the message to the wireprotocol and sends the message across the wire */ - def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = + def send(message: Any, senderOption: Option[ActorRef], recipient: ActorRef): Unit = if (isRunning) { send(remoteSupport.createRemoteMessageProtocolBuilder(Left(recipient), Right(message), senderOption).build) + } else { + val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) + remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) + throw exception + } /** * Sends the message across the wire */ - def send(request: RemoteMessageProtocol) { - if (isRunning) { //TODO FIXME RACY - log.debug("Sending message: " + new RemoteMessage(request, remoteSupport)) + def send(request: RemoteMessageProtocol): Unit = { + log.debug("Sending message: {}", new RemoteMessage(request, remoteSupport)) - try { - val payload = remoteSupport.createMessageSendEnvelope(request) - currentChannel.write(payload).addListener( - new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //Not interesting at the moment - } else if (!future.isSuccess) { - remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress)) - } + try { + val payload = remoteSupport.createMessageSendEnvelope(request) + currentChannel.write(payload).addListener( + new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled) { + //Not interesting at the moment + } else if (!future.isSuccess) { + remoteSupport.notifyListeners(RemoteClientWriteFailed(payload, future.getCause, remoteSupport, remoteAddress)) } - }) - } catch { - case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) - } - } else { - val exception = new RemoteClientException("RemoteModule client is not running, make sure you have invoked 'RemoteClient.connect()' before using it.", remoteSupport, remoteAddress) - remoteSupport.notifyListeners(RemoteClientError(exception, remoteSupport, remoteAddress)) - throw exception + } + }) + } catch { + case e: Exception ⇒ remoteSupport.notifyListeners(RemoteClientError(e, remoteSupport, remoteAddress)) } } @@ -132,8 +131,7 @@ class ActiveRemoteClient private[akka] ( private[remote] var connection: ChannelFuture = _ @volatile private[remote] var openChannels: DefaultChannelGroup = _ - @volatile - private var timer: HashedWheelTimer = _ + @volatile private var reconnectionTimeWindowStart = 0L @@ -180,10 +178,9 @@ class ActiveRemoteClient private[akka] ( runSwitch switchOn { openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) - timer = new HashedWheelTimer bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)) - bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, timer, this)) + bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -219,8 +216,6 @@ class ActiveRemoteClient private[akka] ( log.debug("Shutting down remote client [{}]", name) notifyListeners(RemoteClientShutdown(remoteSupport, remoteAddress)) - timer.stop() - timer = null openChannels.close.awaitUninterruptibly openChannels = null bootstrap.releaseExternalResources() @@ -253,18 +248,17 @@ class ActiveRemoteClientPipelineFactory( name: String, bootstrap: ClientBootstrap, remoteAddress: RemoteAddress, - timer: HashedWheelTimer, client: ActiveRemoteClient) extends ChannelPipelineFactory { import client.remoteSupport.clientSettings._ def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, ReadTimeout.length, ReadTimeout.unit) + val timeout = new ReadTimeoutHandler(client.remoteSupport.timer, ReadTimeout.length, ReadTimeout.unit) val lenDec = new LengthFieldBasedFrameDecoder(MessageFrameSize, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) val protobufEnc = new ProtobufEncoder - val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, timer, client) + val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.remoteSupport.timer, client) new StaticChannelPipeline(timeout, lenDec, protobufDec, lenPrep, protobufEnc, remoteClient) } @@ -361,6 +355,10 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi val serverSettings = RemoteExtension(system).serverSettings val clientSettings = RemoteExtension(system).clientSettings + val timer: HashedWheelTimer = new HashedWheelTimer + + _system.registerOnTermination(timer.stop()) //Shut this guy down at the end + private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock