diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 50d1ebb87f..a0f933ce7c 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -7,7 +7,6 @@ package akka.remote import akka.AkkaApplication import akka.actor._ import akka.event.EventHandler -import akka.dispatch.{ Dispatchers, Future, PinnedDispatcher } import akka.actor.Status._ import akka.util._ import akka.util.duration._ @@ -21,6 +20,7 @@ import java.net.InetSocketAddress import com.eaio.uuid.UUID import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression } +import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher } /** * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. @@ -303,4 +303,17 @@ trait RemoteMarshallingOps { def createErrorReplyMessage(exception: Throwable, request: RemoteMessageProtocol): AkkaRemoteProtocol = createMessageSendEnvelope(createRemoteMessageProtocolBuilder(Right(request.getSender), Left(exception), None).build) + + def receiveMessage(remoteMessage: RemoteMessage, untrustedMode: Boolean) { + val recipient = remoteMessage.recipient + + remoteMessage.payload match { + case Left(t) ⇒ throw t + case Right(r) ⇒ r match { + case _: Terminate ⇒ if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() + case _: AutoReceivedMessage if (untrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ recipient.!(m)(remoteMessage.sender) + } + } + } } diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala index 4cae594a68..21eb5009f7 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala @@ -44,24 +44,4 @@ class RemoteServerSettings(val app: AkkaApplication) { val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout", 100), DefaultTimeUnit) val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) - - val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), DefaultTimeUnit) - - val EXECUTION_POOL_SIZE = { - val sz = config.getInt("akka.remote.server.execution-pool-size", 16) - if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") - sz - } - - val MAX_CHANNEL_MEMORY_SIZE = { - val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0) - if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") - sz - } - - val MAX_TOTAL_MEMORY_SIZE = { - val sz = config.getInt("akka.remote.server.max-total-memory-size", 0) - if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") - sz - } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 68f1a52d72..f06ae37e0d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -4,7 +4,7 @@ package akka.remote.netty -import akka.actor.{ ActorRef, IllegalActorStateException, AutoReceivedMessage, simpleName } +import akka.actor.{ ActorRef, IllegalActorStateException, simpleName } import akka.remote._ import RemoteProtocol._ import akka.util._ @@ -14,19 +14,15 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } -import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } -import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import scala.collection.mutable.HashMap -import scala.collection.JavaConversions._ import java.net.InetSocketAddress import java.util.concurrent._ import java.util.concurrent.atomic._ import akka.AkkaException import akka.AkkaApplication -import akka.dispatch.{ Terminate } class RemoteClientMessageBufferException(message: String, cause: Throwable = null) extends AkkaException(message, cause) { def this(msg: String) = this(msg, null); @@ -150,13 +146,24 @@ abstract class RemoteClient private[akka] ( def send(request: RemoteMessageProtocol) { if (isRunning) { //TODO FIXME RACY app.eventHandler.debug(this, "Sending message: " + new RemoteMessage(request, remoteSupport)) + // tell try { - val future = currentChannel.write(createMessageSendEnvelope(request)) - future.awaitUninterruptibly() //TODO FIXME SWITCH TO NONBLOCKING WRITE - if (!future.isCancelled && !future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } + val payload = createMessageSendEnvelope(request); + currentChannel.write(payload).addListener( + new ChannelFutureListener { + def operationComplete(future: ChannelFuture) { + if (future.isCancelled) { + //Not interesting at the moment + } else if (!future.isSuccess) { + val socketAddress = future.getChannel.getRemoteAddress match { + case i: InetSocketAddress ⇒ Some(i) + case _ ⇒ None + } + notifyListeners(RemoteClientWriteFailed(payload, future.getCause, module, remoteAddress)) + } + } + }) } catch { case e: Exception ⇒ notifyListeners(RemoteClientError(e, module, remoteAddress)) } @@ -456,18 +463,11 @@ class NettyRemoteServer(val app: AkkaApplication, serverModule: NettyRemoteServe private val factory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool, Executors.newCachedThreadPool) private val bootstrap = new ServerBootstrap(factory) - private val executor = new ExecutionHandler( - new OrderedMemoryAwareThreadPoolExecutor( - EXECUTION_POOL_SIZE, - MAX_CHANNEL_MEMORY_SIZE, - MAX_TOTAL_MEMORY_SIZE, - EXECUTION_POOL_KEEPALIVE.length, - EXECUTION_POOL_KEEPALIVE.unit)) // group of open channels, used for clean-up private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server") - val pipelineFactory = new RemoteServerPipelineFactory(settings, name, openChannels, executor, loader, serverModule) + val pipelineFactory = new RemoteServerPipelineFactory(settings, name, openChannels, loader, serverModule) bootstrap.setPipelineFactory(pipelineFactory) bootstrap.setOption("backlog", BACKLOG) bootstrap.setOption("child.tcpNoDelay", true) @@ -490,7 +490,6 @@ class NettyRemoteServer(val app: AkkaApplication, serverModule: NettyRemoteServe openChannels.disconnect openChannels.close.awaitUninterruptibly bootstrap.releaseExternalResources() - executor.releaseExternalResources() serverModule.notifyListeners(RemoteServerShutdown(serverModule)) } catch { case e: Exception ⇒ serverModule.notifyListeners(RemoteServerError(e, serverModule)) @@ -541,7 +540,6 @@ class RemoteServerPipelineFactory( val settings: RemoteServerSettings, val name: String, val openChannels: ChannelGroup, - val executor: ExecutionHandler, val loader: Option[ClassLoader], val server: NettyRemoteServerModule) extends ChannelPipelineFactory { @@ -555,7 +553,7 @@ class RemoteServerPipelineFactory( val authenticator = if (REQUIRE_COOKIE) new RemoteServerAuthenticationHandler(SECURE_COOKIE) :: Nil else Nil val remoteServer = new RemoteServerHandler(settings, name, openChannels, loader, server) - val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil + val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: authenticator ::: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -643,8 +641,18 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = { event.getMessage match { - case null ⇒ throw new IllegalActorStateException("Message in remote MessageEvent is null [" + event + "]") - case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ handleRemoteMessageProtocol(remote.getMessage, event.getChannel) + case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ + try { + try { + receiveMessage(new RemoteMessage(remote.getMessage, server.remoteSupport, applicationLoader), UNTRUSTED_MODE) + } catch { + case e: SecurityException ⇒ + server.notifyListeners(RemoteServerError(e, server)) + write(event.getChannel, createErrorReplyMessage(e, remote.getMessage)) //TODO FIXME What is the purpose of this response? + } + } catch { + case e: Exception ⇒ server.notifyListeners(RemoteServerError(e, server)) + } case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ remote.getInstruction.getCommandType match { case CommandType.CONNECT ⇒ //TODO FIXME Create passive connection here @@ -665,28 +673,6 @@ class RemoteServerHandler( case inet: InetSocketAddress ⇒ Some(inet) case _ ⇒ None } - - private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = try { - try { - val remoteMessage = new RemoteMessage(request, server.remoteSupport, applicationLoader) - val recipient = remoteMessage.recipient - - remoteMessage.payload match { - case Left(t) ⇒ throw t - case Right(r) ⇒ r match { - case _: Terminate ⇒ if (UNTRUSTED_MODE) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() - case _: AutoReceivedMessage if (UNTRUSTED_MODE) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - case m ⇒ recipient.!(m)(remoteMessage.sender) - } - } - } catch { - case e: SecurityException ⇒ - server.notifyListeners(RemoteServerError(e, server)) - write(channel, createErrorReplyMessage(e, request)) - } - } catch { - case e: Exception ⇒ server.notifyListeners(RemoteServerError(e, server)) - } } class DefaultDisposableChannelGroup(name: String) extends DefaultChannelGroup(name) {