Re-adding the ExecutionHandler to try to combat OOME
This commit is contained in:
parent
6e9126bb67
commit
64fb2d853d
3 changed files with 42 additions and 3 deletions
|
|
@ -105,6 +105,18 @@ akka {
|
|||
|
||||
# Sets the size of the connection backlog
|
||||
backlog = 4096
|
||||
|
||||
# Length in akka.time-unit how long core threads will be kept alive if idling
|
||||
execution-pool-keepalive = 60s
|
||||
|
||||
# Size of the core pool of the remote execution unit
|
||||
execution-pool-size = 4
|
||||
|
||||
# Maximum channel size, 0 for off
|
||||
max-channel-memory-size = 0
|
||||
|
||||
# Maximum total size of all channels, 0 for off
|
||||
max-total-memory-size = 0
|
||||
}
|
||||
|
||||
client {
|
||||
|
|
|
|||
|
|
@ -76,6 +76,23 @@ class RemoteSettings(val config: Config, val systemName: String) extends Extensi
|
|||
|
||||
val Backlog = config.getInt("akka.remote.server.backlog")
|
||||
|
||||
val ExecutionPoolKeepAlive = Duration(config.getMilliseconds("akka.remote.server.execution-pool-keepalive"), MILLISECONDS)
|
||||
|
||||
val ExecutionPoolSize = config.getInt("akka.remote.server.execution-pool-size") match {
|
||||
case sz if sz < 1 ⇒ throw new IllegalArgumentException("akka.remote.server.execution-pool-size is less than 1")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxChannelMemorySize = config.getInt("akka.remote.server.max-channel-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-channel-memory-size is less than 0")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
val MaxTotalMemorySize = config.getInt("akka.remote.server.max-total-memory-size") match {
|
||||
case sz if sz < 0 ⇒ throw new IllegalArgumentException("akka.remote.server.max-total-memory-size is less than 0")
|
||||
case sz ⇒ sz
|
||||
}
|
||||
|
||||
// TODO handle the system name right and move this to config file syntax
|
||||
val URI = "akka://sys@" + Hostname + ":" + Port
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,12 +16,12 @@ import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, Lengt
|
|||
import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder }
|
||||
import org.jboss.netty.handler.timeout.{ ReadTimeoutHandler, ReadTimeoutException }
|
||||
import org.jboss.netty.util.{ TimerTask, Timeout, HashedWheelTimer }
|
||||
import org.jboss.netty.handler.execution.{ OrderedMemoryAwareThreadPoolExecutor, ExecutionHandler }
|
||||
import scala.collection.mutable.HashMap
|
||||
import java.net.InetSocketAddress
|
||||
import java.util.concurrent._
|
||||
import java.util.concurrent.atomic._
|
||||
import akka.AkkaException
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging
|
||||
import locks.ReentrantReadWriteLock
|
||||
import org.jboss.netty.channel._
|
||||
|
|
@ -507,10 +507,18 @@ class NettyRemoteServer(
|
|||
|
||||
private val bootstrap = new ServerBootstrap(factory)
|
||||
|
||||
private val executor = new ExecutionHandler(
|
||||
new OrderedMemoryAwareThreadPoolExecutor(
|
||||
ExecutionPoolSize,
|
||||
MaxChannelMemorySize,
|
||||
MaxTotalMemorySize,
|
||||
ExecutionPoolKeepAlive.length,
|
||||
ExecutionPoolKeepAlive.unit))
|
||||
|
||||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, loader, remoteSupport)
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executor, loader, remoteSupport)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("backlog", Backlog)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
|
|
@ -538,6 +546,7 @@ class NettyRemoteServer(
|
|||
openChannels.disconnect
|
||||
openChannels.close.awaitUninterruptibly
|
||||
bootstrap.releaseExternalResources()
|
||||
executor.releaseExternalResources()
|
||||
remoteSupport.notifyListeners(RemoteServerShutdown(remoteSupport))
|
||||
} catch {
|
||||
case e: Exception ⇒ remoteSupport.notifyListeners(RemoteServerError(e, remoteSupport))
|
||||
|
|
@ -548,6 +557,7 @@ class NettyRemoteServer(
|
|||
class RemoteServerPipelineFactory(
|
||||
val name: String,
|
||||
val openChannels: ChannelGroup,
|
||||
val executor: ExecutionHandler,
|
||||
val loader: Option[ClassLoader],
|
||||
val remoteSupport: NettyRemoteSupport) extends ChannelPipelineFactory {
|
||||
|
||||
|
|
@ -561,7 +571,7 @@ class RemoteServerPipelineFactory(
|
|||
|
||||
val authenticator = if (RequireCookie) new RemoteServerAuthenticationHandler(SecureCookie) :: Nil else Nil
|
||||
val remoteServer = new RemoteServerHandler(name, openChannels, loader, remoteSupport)
|
||||
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: authenticator ::: remoteServer :: Nil
|
||||
val stages: List[ChannelHandler] = lenDec :: protobufDec :: lenPrep :: protobufEnc :: executor :: authenticator ::: remoteServer :: Nil
|
||||
new StaticChannelPipeline(stages: _*)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue