Removing executionHandler from Netty remoting since we do 0 (yes, Daisy, you heard me) blocking ops in the message sends

This commit is contained in:
Viktor Klang 2011-11-09 12:41:37 +01:00
parent bd5b07c573
commit f04b6a5647
3 changed files with 45 additions and 66 deletions

View file

@ -7,7 +7,6 @@ package akka.remote
import akka.AkkaApplication
import akka.actor._
import akka.event.EventHandler
import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher }
import akka.actor.Status._
import akka.util._
import akka.util.duration._
@ -21,6 +20,7 @@ import java.net.InetSocketAddress
import com.eaio.uuid.UUID
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
/**
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
@ -303,4 +303,17 @@ trait RemoteMarshallingOps {
def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol =
createMessageSendEnvelope(createRemoteMessageProtocolBuilder(Right(request.getSender), Left(exception), None).build)
def receiveMessage(remoteMessage: RemoteMessage, untrustedMode: Boolean) {
val recipient = remoteMessage.recipient
remoteMessage.payload match {
case Left(t) throw t
case Right(r) r match {
case _: Terminate if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop()
case _: AutoReceivedMessage if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m recipient.!(m)(remoteMessage.sender)
}
}
}
}

View file

@ -44,24 +44,4 @@ class RemoteServerSettings(val app: AkkaApplication) {
val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout", 100), DefaultTimeUnit)
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), DefaultTimeUnit)
val EXECUTION_POOL_SIZE = {
val sz = config.getInt("akka.remote.server.execution-pool-size", 16)
if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
sz
}
val MAX_CHANNEL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
sz
}
val MAX_TOTAL_MEMORY_SIZE = {
val sz = config.getInt("akka.remote.server.max-total-memory-size", 0)
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
sz
}
}

View file

@ -4,7 +4,7 @@
package akka.remote.netty
import akka.actor.{ ActorRef, IllegalActorStateException, AutoReceivedMessage, simpleName }
import akka.actor.{ ActorRef, IllegalActorStateException, simpleName }
import akka.remote._
import RemoteProtocol._
import akka.util._
@ -14,19 +14,15 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap }
import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender }
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
import scala.collection.mutable.HashMap
import scala.collection.JavaConversions._
import java.net.InetSocketAddress
import java.util.concurrent._
import java.util.concurrent.atomic._
import akka.AkkaException
import akka.AkkaApplication
import akka.dispatch.{ Terminate }
class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null);
@ -150,13 +146,24 @@ abstract class RemoteClient private[akka] (
def send(request: RemoteMessageProtocol) {
if (isRunning) { //TODO FIXME RACY
app.eventHandler.debug(this, "Sending message: " + new RemoteMessage(request, remoteSupport))
// tell
try {
val future = currentChannel.write(createMessageSendEnvelope(request))
future.awaitUninterruptibly() //TODO FIXME SWITCH TO NONBLOCKING WRITE
if (!future.isCancelled && !future.isSuccess) {
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
}
val payload = createMessageSendEnvelope(request);
currentChannel.write(payload).addListener(
new ChannelFutureListener {
def operationComplete(future: ChannelFuture) {
if (future.isCancelled) {
//Not interesting at the moment
} else if (!future.isSuccess) {
val socketAddress = future.getChannel.getRemoteAddress match {
case i: InetSocketAddress Some(i)
case _ None
}
notifyListeners(RemoteClientWriteFailed(payload, future.getCause, module, remoteAddress))
}
}
})
} catch {
case e: Exception notifyListeners(RemoteClientError(e, module, remoteAddress))
}
@ -456,18 +463,11 @@ class NettyRemoteServer(val app: AkkaApplication, serverModule: NettyRemoteServe
private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool)
private val bootstrap = new ServerBootstrap(factory)
private val executor = new ExecutionHandler(
new OrderedMemoryAwareThreadPoolExecutor(
EXECUTION_POOL_SIZE,
MAX_CHANNEL_MEMORY_SIZE,
MAX_TOTAL_MEMORY_SIZE,
EXECUTION_POOL_KEEPALIVE.length,
EXECUTION_POOL_KEEPALIVE.unit))
// group of open channels, used for clean-up
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
val pipelineFactory = new RemoteServerPipelineFactory(settings, name, openChannels, executor, loader, serverModule)
val pipelineFactory = new RemoteServerPipelineFactory(settings, name, openChannels, loader, serverModule)
bootstrap.setPipelineFactory(pipelineFactory)
bootstrap.setOption("backlog", BACKLOG)
bootstrap.setOption("child.tcpNoDelay", true)
@ -490,7 +490,6 @@ class NettyRemoteServer(val app: AkkaApplication, serverModule: NettyRemoteServe
openChannels.disconnect
openChannels.close.awaitUninterruptibly
bootstrap.releaseExternalResources()
executor.releaseExternalResources()
serverModule.notifyListeners(RemoteServerShutdown(serverModule))
} catch {
case e: Exception serverModule.notifyListeners(RemoteServerError(e, serverModule))
@ -541,7 +540,6 @@ class RemoteServerPipelineFactory(
val settings: RemoteServerSettings,
val name: String,
val openChannels: ChannelGroup,
val executor: ExecutionHandler,
val loader: Option[ClassLoader],
val server: NettyRemoteServerModule) extends ChannelPipelineFactory {
@ -555,7 +553,7 @@ class RemoteServerPipelineFactory(
val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil
val remoteServer = new RemoteServerHandler(settings, name, openChannels, loader, server)
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: authenticator ::: remoteServer :: Nil
new StaticChannelPipeline(stages: _*)
}
}
@ -643,8 +641,18 @@ class RemoteServerHandler(
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = {
event.getMessage match {
case null throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]")
case remote: AkkaRemoteProtocol if remote.hasMessage handleRemoteMessageProtocol(remote.getMessage, event.getChannel)
case remote: AkkaRemoteProtocol if remote.hasMessage
try {
try {
receiveMessage(new RemoteMessage(remote.getMessage, server.remoteSupport, applicationLoader), UNTRUSTED_MODE)
} catch {
case e: SecurityException
server.notifyListeners(RemoteServerError(e, server))
write(event.getChannel, createErrorReplyMessage(e, remote.getMessage)) //TODO FIXME What is the purpose of this response?
}
} catch {
case e: Exception server.notifyListeners(RemoteServerError(e, server))
}
case remote: AkkaRemoteProtocol if remote.hasInstruction
remote.getInstruction.getCommandType match {
case CommandType.CONNECT //TODO FIXME Create passive connection here
@ -665,28 +673,6 @@ class RemoteServerHandler(
case inet: InetSocketAddress Some(inet)
case _ None
}
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try {
try {
val remoteMessage = new RemoteMessage(request, server.remoteSupport, applicationLoader)
val recipient = remoteMessage.recipient
remoteMessage.payload match {
case Left(t) throw t
case Right(r) r match {
case _: Terminate if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop()
case _: AutoReceivedMessage if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor")
case m recipient.!(m)(remoteMessage.sender)
}
}
} catch {
case e: SecurityException
server.notifyListeners(RemoteServerError(e, server))
write(channel, createErrorReplyMessage(e, request))
}
} catch {
case e: Exception server.notifyListeners(RemoteServerError(e, server))
}
}
class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {