Fixing ReadTimeoutException, and implement proper shutdown after timeout

This commit is contained in:
Viktor Klang 2011-03-14 11:21:41 +01:00
parent af97128c36
commit 49338dc9ed

View file

@ -27,7 +27,7 @@ import org.jboss.netty.bootstrap.{ServerBootstrap,ClientBootstrap}
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.ReadTimeoutHandler
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import org.jboss.netty.handler.ssl.SslHandler
@ -80,14 +80,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
lock.readLock.lock
try {
val c = remoteClients.get(key) match {
case Some(client) => client
case s: Some[RemoteClient] => s.get
case None =>
lock.readLock.unlock
lock.writeLock.lock //Lock upgrade, not supported natively
try {
try {
remoteClients.get(key) match { //Recheck for addition, race between upgrades
case Some(client) => client //If already populated by other writer
case s: Some[RemoteClient] => s.get //If already populated by other writer
case None => //Populate map
val client = new ActiveRemoteClient(this, address, loader, self.notifyListeners _)
client.connect()
@ -103,14 +103,14 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
def shutdownClientConnection(address: InetSocketAddress): Boolean = lock withWriteGuard {
remoteClients.remove(Address(address)) match {
case Some(client) => client.shutdown
case s: Some[RemoteClient] => s.get.shutdown
case None => false
}
}
def restartClientConnection(address: InetSocketAddress): Boolean = lock withReadGuard {
remoteClients.get(Address(address)) match {
case Some(client) => client.connect(reconnectIfAlreadyConnected = true)
case s: Some[RemoteClient] => s.get.connect(reconnectIfAlreadyConnected = true)
case None => false
}
}
@ -120,7 +120,7 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
private[akka] def deregisterSupervisorForActor(actorRef: ActorRef): ActorRef = lock withReadGuard {
remoteClients.get(Address(actorRef.homeAddress.get)) match {
case Some(client) => client.deregisterSupervisorForActor(actorRef)
case s: Some[RemoteClient] => s.get.deregisterSupervisorForActor(actorRef)
case None => actorRef
}
}
@ -325,6 +325,7 @@ class ActiveRemoteClient private[akka] (
}
}
//Please note that this method does _not_ remove the ARC from the NettyRemoteClientModule's map of clients
def shutdown = runSwitch switchOff {
notifyListeners(RemoteClientShutdown(module, remoteAddress))
timer.stop
@ -356,12 +357,12 @@ class ActiveRemoteClientPipelineFactory(
futures: ConcurrentMap[Uuid, CompletableFuture[_]],
supervisors: ConcurrentMap[Uuid, ActorRef],
bootstrap: ClientBootstrap,
remoteAddress: SocketAddress,
remoteAddress: InetSocketAddress,
timer: HashedWheelTimer,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.toMillis.toInt)
val timeout = new ReadTimeoutHandler(timer, RemoteClientSettings.READ_TIMEOUT.length, RemoteClientSettings.READ_TIMEOUT.unit)
val lenDec = new LengthFieldBasedFrameDecoder(RemoteClientSettings.MESSAGE_FRAME_SIZE, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val protobufDec = new ProtobufDecoder(AkkaRemoteProtocol.getDefaultInstance)
@ -386,25 +387,18 @@ class ActiveRemoteClientHandler(
val futures: ConcurrentMap[Uuid, CompletableFuture[_]],
val supervisors: ConcurrentMap[Uuid, ActorRef],
val bootstrap: ClientBootstrap,
val remoteAddress: SocketAddress,
val remoteAddress: InetSocketAddress,
val timer: HashedWheelTimer,
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] &&
event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
}
super.handleUpstream(ctx, event)
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
event.getMessage match {
case arp: AkkaRemoteProtocol if arp.hasInstruction =>
val rcp = arp.getInstruction
rcp.getCommandType match {
case CommandType.SHUTDOWN => spawn { client.shutdown }
case CommandType.SHUTDOWN => spawn { client.module.shutdownClientConnection(remoteAddress) }
}
case arp: AkkaRemoteProtocol if arp.hasMessage =>
val reply = arp.getMessage
@ -451,7 +445,7 @@ class ActiveRemoteClientHandler(
}
}
}, RemoteClientSettings.RECONNECT_DELAY.toMillis, TimeUnit.MILLISECONDS)
} else spawn { client.shutdown }
} else spawn { client.module.shutdownClientConnection(remoteAddress) }
}
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
@ -464,9 +458,13 @@ class ActiveRemoteClientHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
client.notifyListeners(RemoteClientError(event.getCause, client.module, client.remoteAddress))
if (event.getCause ne null) event.getCause.printStackTrace
event.getChannel.close
event.getCause match {
case e: ReadTimeoutException =>
spawn { client.module.shutdownClientConnection(remoteAddress) }
case e =>
client.notifyListeners(RemoteClientError(e, client.module, client.remoteAddress))
event.getChannel.close //FIXME Is this the correct behavior?
}
}
private def parseException(reply: RemoteMessageProtocol, loader: Option[ClassLoader]): Throwable = {
@ -569,13 +567,14 @@ trait NettyRemoteServerModule extends RemoteServerModule { self: RemoteModule =>
import RemoteServerSettings._
private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None)
def address = currentServer.get match {
case Some(s) => s.address
case s: Some[NettyRemoteServer] => s.get.address
case None => ReflectiveAccess.Remote.configDefaultAddress
}
def name = currentServer.get match {
case Some(s) => s.name
case s: Some[NettyRemoteServer] => s.get.name
case None =>
val a = ReflectiveAccess.Remote.configDefaultAddress
"NettyRemoteServer@" + a.getAddress.getHostAddress + ":" + a.getPort