diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index e7de6cbc1f..adbca60133 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -27,7 +27,7 @@ import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap} import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } -import org.jboss.netty.handler.timeout.ReadTimeoutHandler +import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler @@ -80,14 +80,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem lock.readLock.lock try { val c = remoteClients.get(key) match { - case Some(client) => client + case s: Some[RemoteClient] => s.get case None => lock.readLock.unlock lock.writeLock.lock //Lock upgrade, not supported natively try { try { remoteClients.get(key) match { //Recheck for addition, race between upgrades - case Some(client) => client //If already populated by other writer + case s: Some[RemoteClient] => s.get //If already populated by other writer case None => //Populate map val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _) client.connect() @@ -103,14 +103,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard { remoteClients.remove(Address(address)) match { - case Some(client) => client.shutdown + case s: Some[RemoteClient] => s.get.shutdown case None => false } } def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard { remoteClients.get(Address(address)) match { - case Some(client) => client.connect(reconnectIfAlreadyConnected = true) + case s: Some[RemoteClient] => s.get.connect(reconnectIfAlreadyConnected = true) case None => false } } @@ -120,7 +120,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard { remoteClients.get(Address(actorRef.homeAddress.get)) match { - case Some(client) => client.deregisterSupervisorForActor(actorRef) + case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef) case None => actorRef } } @@ -325,6 +325,7 @@ class ActiveRemoteClient private[akka] ( } } + //Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients def shutdown = runSwitch switchOff { notifyListeners(RemoteClientShutdown(module, remoteAddress)) timer.stop @@ -356,12 +357,12 @@ class ActiveRemoteClientPipelineFactory( futures: ConcurrentMap[Uuid, CompletableFuture[_]], supervisors: ConcurrentMap[Uuid, ActorRef], bootstrap: ClientBootstrap, - remoteAddress: SocketAddress, + remoteAddress: InetSocketAddress, timer: HashedWheelTimer, client: ActiveRemoteClient) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { - val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt) + val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit) val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4) val lenPrep = new LengthFieldPrepender(4) val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance) @@ -386,25 +387,18 @@ class ActiveRemoteClientHandler( val futures: ConcurrentMap[Uuid, CompletableFuture[_]], val supervisors: ConcurrentMap[Uuid, ActorRef], val bootstrap: ClientBootstrap, - val remoteAddress: SocketAddress, + val remoteAddress: InetSocketAddress, val timer: HashedWheelTimer, val client: ActiveRemoteClient) extends SimpleChannelUpstreamHandler { - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { - if (event.isInstanceOf[ChannelStateEvent] && - event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { - } - super.handleUpstream(ctx, event) - } - override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { try { event.getMessage match { case arp: AkkaRemoteProtocol if arp.hasInstruction => val rcp = arp.getInstruction rcp.getCommandType match { - case CommandType.SHUTDOWN => spawn { client.shutdown } + case CommandType.SHUTDOWN => spawn { client.module.shutdownClientConnection(remoteAddress) } } case arp: AkkaRemoteProtocol if arp.hasMessage => val reply = arp.getMessage @@ -451,7 +445,7 @@ class ActiveRemoteClientHandler( } } }, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS) - } else spawn { client.shutdown } + } else spawn { client.module.shutdownClientConnection(remoteAddress) } } override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { @@ -464,9 +458,13 @@ class ActiveRemoteClientHandler( } override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { - client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress)) - if (event.getCause ne null) event.getCause.printStackTrace - event.getChannel.close + event.getCause match { + case e: ReadTimeoutException => + spawn { client.module.shutdownClientConnection(remoteAddress) } + case e => + client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress)) + event.getChannel.close //FIXME Is this the correct behavior? + } } private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = { @@ -569,13 +567,14 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule => import RemoteServerSettings._ private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) + def address = currentServer.get match { - case Some(s) => s.address + case s: Some[NettyRemoteServer] => s.get.address case None => ReflectiveAccess.Remote.configDefaultAddress } def name = currentServer.get match { - case Some(s) => s.name + case s: Some[NettyRemoteServer] => s.get.name case None => val a = ReflectiveAccess.Remote.configDefaultAddress "NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort