Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
a8991ac538
6 changed files with 52 additions and 19 deletions
|
|
@ -44,4 +44,24 @@ object RemoteServerSettings {
|
|||
}
|
||||
|
||||
val BACKLOG = config.getInt("akka.remote.server.backlog", 4096)
|
||||
|
||||
val EXECUTION_POOL_KEEPALIVE = Duration(config.getInt("akka.remote.server.execution-pool-keepalive", 60), TIME_UNIT)
|
||||
|
||||
val EXECUTION_POOL_SIZE = {
|
||||
val sz = config.getInt("akka.remote.server.execution-pool-size",16)
|
||||
if (sz < 1) throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
|
||||
sz
|
||||
}
|
||||
|
||||
val MAX_CHANNEL_MEMORY_SIZE = {
|
||||
val sz = config.getInt("akka.remote.server.max-channel-memory-size", 0)
|
||||
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
|
||||
sz
|
||||
}
|
||||
|
||||
val MAX_TOTAL_MEMORY_SIZE = {
|
||||
val sz = config.getInt("akka.remote.server.max-total-memory-size", 0)
|
||||
if (sz < 0) throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
|
||||
sz
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -34,6 +34,7 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt
|
|||
import org.jboss.netty.handler.codec.compression.{ ZlibDecoder, ZlibEncoder }
|
||||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
|
||||
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
|
||||
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
|
||||
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
|
||||
import org.jboss.netty.handler.ssl.SslHandler
|
||||
|
||||
|
|
@ -81,8 +82,6 @@ trait NettyRemoteClientModule extends RemoteClientModule { self: ListenerManagem
|
|||
|
||||
private[akka] def withClientFor[T](
|
||||
address: InetSocketAddress, loader: Option[ClassLoader])(fun: RemoteClient => T): T = {
|
||||
loader.foreach(MessageSerializer.setClassLoader(_))//TODO: REVISIT: THIS SMELLS FUNNY
|
||||
|
||||
val key = Address(address)
|
||||
lock.readLock.lock
|
||||
try {
|
||||
|
|
@ -217,15 +216,13 @@ abstract class RemoteClient private[akka] (
|
|||
senderFuture: Option[CompletableFuture[T]]): Option[CompletableFuture[T]] = {
|
||||
if (isRunning) {
|
||||
if (request.getOneWay) {
|
||||
currentChannel.write(RemoteEncoder.encode(request)).addListener(new ChannelFutureListener {
|
||||
def operationComplete(future: ChannelFuture) {
|
||||
if (future.isCancelled) {
|
||||
//We don't care about that right now
|
||||
} else if (!future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
}
|
||||
}
|
||||
})
|
||||
val future = currentChannel.write(RemoteEncoder.encode(request))
|
||||
future.awaitUninterruptibly()
|
||||
if (!future.isCancelled && !future.isSuccess) {
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
throw future.getCause
|
||||
}
|
||||
|
||||
None
|
||||
} else {
|
||||
val futureResult = if (senderFuture.isDefined) senderFuture.get
|
||||
|
|
@ -238,7 +235,9 @@ abstract class RemoteClient private[akka] (
|
|||
futures.remove(futureUuid) //Clean this up
|
||||
//We don't care about that right now
|
||||
} else if (!future.isSuccess) {
|
||||
futures.remove(futureUuid) //Clean this up
|
||||
val f = futures.remove(futureUuid) //Clean this up
|
||||
if (f ne null)
|
||||
f.completeWithException(future.getCause)
|
||||
notifyListeners(RemoteClientWriteFailed(request, future.getCause, module, remoteAddress))
|
||||
}
|
||||
}
|
||||
|
|
@ -754,9 +753,17 @@ class RemoteServerPipelineFactory(
|
|||
case "zlib" => (new ZlibEncoder(ZLIB_COMPRESSION_LEVEL) :: Nil, new ZlibDecoder :: Nil)
|
||||
case _ => (Nil, Nil)
|
||||
}
|
||||
|
||||
val execution = new ExecutionHandler(
|
||||
new OrderedMemoryAwareThreadPoolExecutor(
|
||||
EXECUTION_POOL_SIZE,
|
||||
MAX_CHANNEL_MEMORY_SIZE,
|
||||
MAX_TOTAL_MEMORY_SIZE,
|
||||
EXECUTION_POOL_KEEPALIVE.length,
|
||||
EXECUTION_POOL_KEEPALIVE.unit
|
||||
)
|
||||
)
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, server)
|
||||
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: remoteServer :: Nil
|
||||
val stages: List[ChannelHandler] = dec ::: lenDec :: protobufDec :: enc ::: lenPrep :: protobufEnc :: execution :: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
@ -857,8 +864,6 @@ class RemoteServerHandler(
|
|||
}
|
||||
|
||||
private def handleRemoteMessageProtocol(request: RemoteMessageProtocol, channel: Channel) = {
|
||||
//FIXME we should definitely spawn off this in a thread pool or something,
|
||||
// potentially using Actor.spawn or something similar
|
||||
request.getActorInfo.getActorType match {
|
||||
case SCALA_ACTOR => dispatchToActor(request, channel)
|
||||
case TYPED_ACTOR => dispatchToTypedActor(request, channel)
|
||||
|
|
|
|||
|
|
@ -36,6 +36,10 @@ class ConfigSpec extends WordSpec with MustMatchers {
|
|||
getBool("akka.remote.ssl.debug") must equal(None)
|
||||
getBool("akka.remote.ssl.service") must equal(None)
|
||||
getInt("akka.remote.zlib-compression-level") must equal(Some(6))
|
||||
getInt("akka.remote.server.execution-pool-size") must equal(Some(16))
|
||||
getInt("akka.remote.server.execution-pool-keepalive") must equal(Some(60))
|
||||
getInt("akka.remote.server.max-channel-memory-size") must equal(Some(0))
|
||||
getInt("akka.remote.server.max-total-memory-size") must equal(Some(0))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -243,7 +243,7 @@ class MyStatelessActor extends Actor {
|
|||
class MyStatelessActorWithMessagesInMailbox extends Actor {
|
||||
def receive = {
|
||||
case "hello" =>
|
||||
println("# messages in mailbox " + self.mailboxSize)
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" => self.reply("world")
|
||||
}
|
||||
|
|
@ -263,7 +263,7 @@ class MyStatelessActorWithMessagesInMailbox extends Actor {
|
|||
class MyActorWithSerializableMessages extends Actor {
|
||||
def receive = {
|
||||
case MyMessage(s, t) =>
|
||||
println("# messages in mailbox " + self.mailboxSize)
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello-reply" => self.reply("world")
|
||||
}
|
||||
|
|
|
|||
|
|
@ -117,7 +117,7 @@ class MyStatefulActor extends Actor {
|
|||
|
||||
def receive = {
|
||||
case "hi" =>
|
||||
println("# messages in mailbox " + self.mailboxSize)
|
||||
//println("# messages in mailbox " + self.mailboxSize)
|
||||
Thread.sleep(500)
|
||||
case "hello" =>
|
||||
count = count + 1
|
||||
|
|
|
|||
|
|
@ -139,6 +139,10 @@ akka {
|
|||
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
|
||||
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
|
||||
backlog = 4096 # Sets the size of the connection backlog
|
||||
execution-pool-keepalive = 60# Length in akka.time-unit how long core threads will be kept alive if idling
|
||||
execution-pool-size = 16# Size of the core pool of the remote execution unit
|
||||
max-channel-memory-size = 0 # Maximum channel size, 0 for off
|
||||
max-total-memory-size = 0 # Maximum total size of all channels, 0 for off
|
||||
}
|
||||
|
||||
client {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue