Ripping out ReadTimeout and adding Idle timeout and fixing issues with configured port on top of that

This commit is contained in:
Viktor Klang 2012-02-01 16:06:30 +01:00
parent d8d0f4486f
commit 9421f37f96
8 changed files with 448 additions and 404 deletions

View file

@ -10,7 +10,6 @@ import org.jboss.netty.channel.group.DefaultChannelGroup
import org.jboss.netty.channel.{ ChannelHandler, StaticChannelPipeline, SimpleChannelUpstreamHandler, MessageEvent, ExceptionEvent, ChannelStateEvent, ChannelPipelineFactory, ChannelPipeline, ChannelHandlerContext, ChannelFuture, Channel }
import org.jboss.netty.handler.codec.frame.{ LengthFieldPrepender, LengthFieldBasedFrameDecoder }
import org.jboss.netty.handler.execution.ExecutionHandler
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import akka.remote.RemoteProtocol.{ RemoteControlProtocol, CommandType, AkkaRemoteProtocol }
import akka.remote.{ RemoteProtocol, RemoteMessage, RemoteLifeCycleEvent, RemoteClientStarted, RemoteClientShutdown, RemoteClientException, RemoteClientError, RemoteClientDisconnected, RemoteClientConnected }
import akka.actor.{ simpleName, Address }
@ -24,6 +23,7 @@ import java.net.InetAddress
import org.jboss.netty.util.TimerTask
import org.jboss.netty.util.Timeout
import java.util.concurrent.TimeUnit
import org.jboss.netty.handler.timeout.{ IdleState, IdleStateEvent, IdleStateAwareChannelHandler, IdleStateHandler }
class RemoteClientMessageBufferException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
@ -159,7 +159,7 @@ class ActiveRemoteClient private[akka] (
executionHandler = new ExecutionHandler(netty.executor)
val b = new ClientBootstrap(netty.clientChannelFactory)
b.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, b, executionHandler, remoteAddress, this))
b.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, b, executionHandler, remoteAddress, localAddress, this))
b.setOption("tcpNoDelay", true)
b.setOption("keepAlive", true)
b.setOption("connectTimeoutMillis", settings.ConnectionTimeout.toMillis)
@ -234,14 +234,37 @@ class ActiveRemoteClientHandler(
val name: String,
val bootstrap: ClientBootstrap,
val remoteAddress: Address,
val localAddress: Address,
val timer: HashedWheelTimer,
val client: ActiveRemoteClient)
extends SimpleChannelUpstreamHandler {
extends IdleStateAwareChannelHandler {
def runOnceNow(thunk: Unit): Unit = timer.newTimeout(new TimerTask() {
def run(timeout: Timeout) = try { thunk } finally { timeout.cancel() }
}, 0, TimeUnit.MILLISECONDS)
override def channelIdle(ctx: ChannelHandlerContext, e: IdleStateEvent) {
import IdleState._
def createHeartBeat(localAddress: Address, cookie: Option[String]): AkkaRemoteProtocol = {
val beat = RemoteControlProtocol.newBuilder.setCommandType(CommandType.HEARTBEAT)
if (cookie.nonEmpty) beat.setCookie(cookie.get)
client.netty.createControlEnvelope(
beat.setOrigin(RemoteProtocol.AddressProtocol.newBuilder
.setSystem(localAddress.system)
.setHostname(localAddress.host.get)
.setPort(localAddress.port.get)
.build).build)
}
e.getState match {
case READER_IDLE e.getChannel.close()
case WRITER_IDLE e.getChannel.write(createHeartBeat(localAddress, client.netty.settings.SecureCookie))
case ALL_IDLE e.getChannel.close()
}
}
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) {
try {
event.getMessage match {
@ -291,18 +314,9 @@ class ActiveRemoteClientHandler(
}
override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = {
val cause = event.getCause
if (cause ne null) {
client.notifyListeners(RemoteClientError(cause, client.netty, client.remoteAddress))
cause match {
case e: ReadTimeoutException
runOnceNow {
client.netty.shutdownClientConnection(remoteAddress) // spawn in another thread
}
case e: Exception event.getChannel.close()
}
} else client.notifyListeners(RemoteClientError(new Exception("Unknown cause"), client.netty, client.remoteAddress))
val cause = if (event.getCause ne null) event.getCause else new Exception("Unknown cause")
client.notifyListeners(RemoteClientError(cause, client.netty, client.remoteAddress))
event.getChannel.close()
}
}
@ -311,17 +325,21 @@ class ActiveRemoteClientPipelineFactory(
bootstrap: ClientBootstrap,
executionHandler: ExecutionHandler,
remoteAddress: Address,
localAddress: Address,
client: ActiveRemoteClient) extends ChannelPipelineFactory {
import client.netty.settings
def getPipeline: ChannelPipeline = {
val timeout = new ReadTimeoutHandler(client.netty.timer, settings.ReadTimeout.length, settings.ReadTimeout.unit)
val timeout = new IdleStateHandler(client.netty.timer,
settings.ReadTimeout.toSeconds.toInt,
settings.WriteTimeout.toSeconds.toInt,
settings.AllTimeout.toSeconds.toInt)
val lenDec = new LengthFieldBasedFrameDecoder(settings.MessageFrameSize, 0, 4, 0, 4)
val lenPrep = new LengthFieldPrepender(4)
val messageDec = new RemoteMessageDecoder
val messageEnc = new RemoteMessageEncoder(client.netty)
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, client.netty.timer, client)
val remoteClient = new ActiveRemoteClientHandler(name, bootstrap, remoteAddress, localAddress, client.netty.timer, client)
new StaticChannelPipeline(timeout, lenDec, messageDec, lenPrep, messageEnc, executionHandler, remoteClient)
}