From db1b50a6b6c38dff69d738b455a1ce390b5c442e Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 11:31:19 +0100 Subject: [PATCH 1/3] Adding OrderedMemoryAwareThreadPoolExecutor with an ExecutionHandler to the NettyRemoteServer --- .../main/scala/akka/remote/RemoteShared.scala | 20 +++++++++++++++++++ .../remote/netty/NettyRemoteSupport.scala | 15 ++++++++++---- .../src/test/scala/config/ConfigSpec.scala | 4 ++++ config/akka-reference.conf | 4 ++++ 4 files changed, 39 insertions(+), 4 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala index ee4e5cf809..9fa9d1b5c0 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteShared.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteShared.scala @@ -44,4 +44,24 @@ object RemoteServerSettings { } val BACKLOG = config.getInt("akka.remote.server.backlog", 4096) + + val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT) + + val EXECUTION_POOL_SIZE = { + val sz = config.getInt("akka.remote.server.execution-pool-size",16) + if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1") + sz + } + + val MAX_CHANNEL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0") + sz + } + + val MAX_TOTAL_MEMORY_SIZE = { + val sz = config.getInt("akka.remote.server.max-total-memory-size", 0) + if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0") + sz + } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 893b22b059..3fab1b20c1 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -33,6 +33,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException } +import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler } import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer } import org.jboss.netty.handler.ssl.SslHandler @@ -753,9 +754,17 @@ class RemoteServerPipelineFactory( case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil) case _ => (Nil, Nil) } - + val execution = new ExecutionHandler( + new OrderedMemoryAwareThreadPoolExecutor( + EXECUTION_POOL_SIZE, + MAX_CHANNEL_MEMORY_SIZE, + MAX_TOTAL_MEMORY_SIZE, + EXECUTION_POOL_KEEPALIVE.length, + EXECUTION_POOL_KEEPALIVE.unit + ) + ) val remoteServer = new RemoteServerHandler(name, openChannels, loader, server) - val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil + val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil new StaticChannelPipeline(stages: _*) } } @@ -856,8 +865,6 @@ class RemoteServerHandler( } private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = { - //FIXME we should definitely spawn off this in a thread pool or something, - // potentially using Actor.spawn or something similar request.getActorInfo.getActorType match { case SCALA_ACTOR => dispatchToActor(request, channel) case TYPED_ACTOR => dispatchToTypedActor(request, channel) diff --git a/akka-remote/src/test/scala/config/ConfigSpec.scala b/akka-remote/src/test/scala/config/ConfigSpec.scala index 7ba5193f6f..e37edcfc34 100644 --- a/akka-remote/src/test/scala/config/ConfigSpec.scala +++ b/akka-remote/src/test/scala/config/ConfigSpec.scala @@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers { getBool("akka.remote.ssl.debug") must equal(None) getBool("akka.remote.ssl.service") must equal(None) getInt("akka.remote.zlib-compression-level") must equal(Some(6)) + getInt("akka.remote.server.execution-pool-size") must equal(Some(16)) + getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60)) + getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0)) + getInt("akka.remote.server.max-total-memory-size") must equal(Some(0)) } } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 458f937e5e..b27c383de6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -142,6 +142,10 @@ akka { require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. backlog = 4096 # Sets the size of the connection backlog + execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling + execution-pool-size = 16# Size of the core pool of the remote execution unit + max-channel-memory-size = 0 # Maximum channel size, 0 for off + max-total-memory-size = 0 # Maximum total size of all channels, 0 for off } client { From 1747f2120c11df0a63222ba72e47ff1e2a1f9085 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 13:10:47 +0100 Subject: [PATCH 2/3] Removing printlns --- .../scala/serialization/SerializableTypeClassActorSpec.scala | 4 ++-- akka-remote/src/test/scala/serialization/Ticket435Spec.scala | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala index 7d5524bcd2..2eec948698 100644 --- a/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala +++ b/akka-remote/src/test/scala/serialization/SerializableTypeClassActorSpec.scala @@ -243,7 +243,7 @@ class MyStatelessActor extends Actor { class MyStatelessActorWithMessagesInMailbox extends Actor { def receive = { case "hello" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } @@ -263,7 +263,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor { class MyActorWithSerializableMessages extends Actor { def receive = { case MyMessage(s, t) => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello-reply" => self.reply("world") } diff --git a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala index f22c876808..a6193d9914 100644 --- a/akka-remote/src/test/scala/serialization/Ticket435Spec.scala +++ b/akka-remote/src/test/scala/serialization/Ticket435Spec.scala @@ -117,7 +117,7 @@ class MyStatefulActor extends Actor { def receive = { case "hi" => - println("# messages in mailbox " + self.mailboxSize) + //println("# messages in mailbox " + self.mailboxSize) Thread.sleep(500) case "hello" => count = count + 1 From dfbc694059bf11c0b6446c2c323960f4f016b9da Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Wed, 23 Mar 2011 15:00:29 +0100 Subject: [PATCH 3/3] Adding synchronous writes to NettyRemoteSupport --- .../remote/netty/NettyRemoteSupport.scala | 22 +++++++++---------- 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 3fab1b20c1..5baddfed89 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -81,8 +81,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem private[akka] def withClientFor[T]( address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = { - loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY - val key = Address(address) lock.readLock.lock try { @@ -217,15 +215,13 @@ abstract class RemoteClient private[akka] ( senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = { if (isRunning) { if (request.getOneWay) { - currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener { - def operationComplete(future: ChannelFuture) { - if (future.isCancelled) { - //We don't care about that right now - } else if (!future.isSuccess) { - notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) - } - } - }) + val future = currentChannel.write(RemoteEncoder.encode(request)) + future.awaitUninterruptibly() + if (!future.isCancelled && !future.isSuccess) { + notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) + throw future.getCause + } + None } else { val futureResult = if (senderFuture.isDefined) senderFuture.get @@ -238,7 +234,9 @@ abstract class RemoteClient private[akka] ( futures.remove(futureUuid) //Clean this up //We don't care about that right now } else if (!future.isSuccess) { - futures.remove(futureUuid) //Clean this up + val f = futures.remove(futureUuid) //Clean this up + if (f ne null) + f.completeWithException(future.getCause) notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress)) } }