Merge branch 'master' into wip-1705-awaitTermination-patriknw
This commit is contained in:
commit
a4e2b5a511
14 changed files with 78 additions and 71 deletions
|
|
@ -23,6 +23,8 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
getString("akka.version") must equal("2.0-SNAPSHOT")
|
||||
settings.ConfigVersion must equal("2.0-SNAPSHOT")
|
||||
|
||||
getBoolean("akka.daemonic") must equal(false)
|
||||
|
||||
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
|
||||
getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000)
|
||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(3.0)
|
||||
|
|
@ -45,9 +47,6 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference) {
|
|||
|
||||
getMilliseconds("akka.scheduler.tickDuration") must equal(100)
|
||||
settings.SchedulerTickDuration must equal(100 millis)
|
||||
|
||||
getBoolean("akka.scheduler.daemonic") must equal(true)
|
||||
settings.SchedulerDaemonicity must equal(true)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -31,10 +31,7 @@ object RoutingSpec {
|
|||
"""
|
||||
|
||||
class TestActor extends Actor {
|
||||
def receive = {
|
||||
case _ ⇒
|
||||
println("Hello")
|
||||
}
|
||||
def receive = { case _ ⇒ }
|
||||
}
|
||||
|
||||
class Echo extends Actor {
|
||||
|
|
|
|||
|
|
@ -33,6 +33,9 @@ akka {
|
|||
# See the Akka Documentation for more info about Extensions
|
||||
extensions = []
|
||||
|
||||
# Toggles whether the threads created by this ActorSystem should be daemons or not
|
||||
daemonic = off
|
||||
|
||||
actor {
|
||||
|
||||
provider = "akka.actor.LocalActorRefProvider"
|
||||
|
|
@ -155,9 +158,6 @@ akka {
|
|||
# parameters
|
||||
type = "Dispatcher"
|
||||
|
||||
# Toggles whether the threads created by this dispatcher should be daemons or not
|
||||
daemonic = off
|
||||
|
||||
# Keep alive time for threads
|
||||
keep-alive-time = 60s
|
||||
|
||||
|
|
@ -271,6 +271,5 @@ akka {
|
|||
# For more information see: http://www.jboss.org/netty/
|
||||
tickDuration = 100ms
|
||||
ticksPerWheel = 512
|
||||
daemonic = on
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -91,7 +91,7 @@ object ActorSystem {
|
|||
|
||||
final val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS)
|
||||
final val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel")
|
||||
final val SchedulerDaemonicity = getBoolean("akka.scheduler.daemonic")
|
||||
final val Daemonicity = getBoolean("akka.daemonic")
|
||||
|
||||
if (ConfigVersion != Version)
|
||||
throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]")
|
||||
|
|
@ -293,6 +293,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
|||
import ActorSystem._
|
||||
|
||||
final val settings = new Settings(applicationConfig, name)
|
||||
final val threadFactory = new MonitorableThreadFactory(name, settings.Daemonicity)
|
||||
|
||||
def logConfiguration(): Unit = log.info(settings.toString)
|
||||
|
||||
|
|
@ -379,7 +380,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
|||
}
|
||||
}
|
||||
|
||||
val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
|
||||
val dispatchers = new Dispatchers(settings, DefaultDispatcherPrerequisites(threadFactory, eventStream, deadLetterMailbox, scheduler))
|
||||
val dispatcher = dispatchers.defaultGlobalDispatcher
|
||||
|
||||
def terminationFuture: Future[Unit] = provider.terminationFuture
|
||||
|
|
@ -435,7 +436,7 @@ class ActorSystemImpl(val name: String, applicationConfig: Config) extends Actor
|
|||
*/
|
||||
protected def createScheduler(): Scheduler = {
|
||||
val hwt = new HashedWheelTimer(log,
|
||||
new MonitorableThreadFactory("DefaultScheduler", settings.SchedulerDaemonicity),
|
||||
threadFactory.copy(threadFactory.name + "-scheduler"),
|
||||
settings.SchedulerTickDuration,
|
||||
settings.SchedulerTicksPerWheel)
|
||||
// note that dispatcher is by-name parameter in DefaultScheduler constructor,
|
||||
|
|
|
|||
|
|
@ -342,7 +342,7 @@ abstract class MessageDispatcherConfigurator(val config: Config, val prerequisit
|
|||
|
||||
//Apply the following options to the config if they are present in the config
|
||||
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig(daemonic = config getBoolean "daemonic"))
|
||||
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig())
|
||||
.setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
|
||||
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
|
||||
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
|
||||
|
|
|
|||
|
|
@ -32,7 +32,14 @@ class Dispatcher(
|
|||
val shutdownTimeout: Duration)
|
||||
extends MessageDispatcher(_prerequisites) {
|
||||
|
||||
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(id)
|
||||
protected[akka] val executorServiceFactory: ExecutorServiceFactory =
|
||||
executorServiceFactoryProvider.createExecutorServiceFactory(
|
||||
id,
|
||||
prerequisites.threadFactory match {
|
||||
case m: MonitorableThreadFactory ⇒ m.copy(m.name + "-" + id)
|
||||
case other ⇒ other
|
||||
})
|
||||
|
||||
protected[akka] val executorService = new AtomicReference[ExecutorService](new ExecutorServiceDelegate {
|
||||
lazy val executor = executorServiceFactory.createExecutorService
|
||||
})
|
||||
|
|
|
|||
|
|
@ -4,8 +4,6 @@
|
|||
|
||||
package akka.dispatch
|
||||
|
||||
import java.util.concurrent.TimeUnit
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import akka.actor.newUuid
|
||||
import akka.util.{ Duration, ReflectiveAccess }
|
||||
import akka.actor.ActorSystem
|
||||
|
|
@ -17,14 +15,17 @@ import com.typesafe.config.ConfigFactory
|
|||
import akka.config.ConfigurationException
|
||||
import akka.event.Logging.Warning
|
||||
import akka.actor.Props
|
||||
import java.util.concurrent.{ ThreadFactory, TimeUnit, ConcurrentHashMap }
|
||||
|
||||
trait DispatcherPrerequisites {
|
||||
def threadFactory: ThreadFactory
|
||||
def eventStream: EventStream
|
||||
def deadLetterMailbox: Mailbox
|
||||
def scheduler: Scheduler
|
||||
}
|
||||
|
||||
case class DefaultDispatcherPrerequisites(
|
||||
val threadFactory: ThreadFactory,
|
||||
val eventStream: EventStream,
|
||||
val deadLetterMailbox: Mailbox,
|
||||
val scheduler: Scheduler) extends DispatcherPrerequisites
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ object ThreadPoolConfig {
|
|||
val defaultCorePoolSize: Int = 16
|
||||
val defaultMaxPoolSize: Int = 128
|
||||
val defaultTimeout: Duration = Duration(60000L, TimeUnit.MILLISECONDS)
|
||||
val defaultRejectionPolicy: RejectedExecutionHandler = new SaneRejectedExecutionHandler()
|
||||
|
||||
def scaledPoolSize(floor: Int, multiplier: Double, ceiling: Int): Int = {
|
||||
import scala.math.{ min, max }
|
||||
|
|
@ -54,7 +55,7 @@ trait ExecutorServiceFactory {
|
|||
* Generic way to specify an ExecutorService to a Dispatcher, create it with the given name if desired
|
||||
*/
|
||||
trait ExecutorServiceFactoryProvider {
|
||||
def createExecutorServiceFactory(name: String): ExecutorServiceFactory
|
||||
def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -65,16 +66,24 @@ case class ThreadPoolConfig(allowCorePoolTimeout: Boolean = ThreadPoolConfig.def
|
|||
maxPoolSize: Int = ThreadPoolConfig.defaultMaxPoolSize,
|
||||
threadTimeout: Duration = ThreadPoolConfig.defaultTimeout,
|
||||
queueFactory: ThreadPoolConfig.QueueFactory = ThreadPoolConfig.linkedBlockingQueue(),
|
||||
daemonic: Boolean = false)
|
||||
rejectionPolicy: RejectedExecutionHandler = ThreadPoolConfig.defaultRejectionPolicy)
|
||||
extends ExecutorServiceFactoryProvider {
|
||||
class ThreadPoolExecutorServiceFactory(val threadFactory: ThreadFactory) extends ExecutorServiceFactory {
|
||||
def createExecutorService: ExecutorService = {
|
||||
val service = new ThreadPoolExecutor(corePoolSize, maxPoolSize, threadTimeout.length, threadTimeout.unit, queueFactory(), threadFactory, new SaneRejectedExecutionHandler)
|
||||
val service = new ThreadPoolExecutor(
|
||||
corePoolSize,
|
||||
maxPoolSize,
|
||||
threadTimeout.length,
|
||||
threadTimeout.unit,
|
||||
queueFactory(),
|
||||
threadFactory,
|
||||
rejectionPolicy)
|
||||
service.allowCoreThreadTimeOut(allowCorePoolTimeout)
|
||||
service
|
||||
}
|
||||
}
|
||||
final def createExecutorServiceFactory(name: String): ExecutorServiceFactory = new ThreadPoolExecutorServiceFactory(new MonitorableThreadFactory(name, daemonic))
|
||||
final def createExecutorServiceFactory(name: String, threadFactory: ThreadFactory): ExecutorServiceFactory =
|
||||
new ThreadPoolExecutorServiceFactory(threadFactory)
|
||||
}
|
||||
|
||||
trait DispatcherBuilder {
|
||||
|
|
@ -143,16 +152,20 @@ case class ThreadPoolConfigDispatcherBuilder(dispatcherFactory: (ThreadPoolConfi
|
|||
def configure(fs: Option[Function[ThreadPoolConfigDispatcherBuilder, ThreadPoolConfigDispatcherBuilder]]*): ThreadPoolConfigDispatcherBuilder = fs.foldLeft(this)((c, f) ⇒ f.map(_(c)).getOrElse(c))
|
||||
}
|
||||
|
||||
class MonitorableThreadFactory(val name: String, val daemonic: Boolean = false) extends ThreadFactory {
|
||||
object MonitorableThreadFactory {
|
||||
val doNothing: Thread.UncaughtExceptionHandler =
|
||||
new Thread.UncaughtExceptionHandler() { def uncaughtException(thread: Thread, cause: Throwable) = () }
|
||||
}
|
||||
|
||||
case class MonitorableThreadFactory(name: String,
|
||||
daemonic: Boolean,
|
||||
exceptionHandler: Thread.UncaughtExceptionHandler = MonitorableThreadFactory.doNothing)
|
||||
extends ThreadFactory {
|
||||
protected val counter = new AtomicLong
|
||||
protected val doNothing: Thread.UncaughtExceptionHandler =
|
||||
new Thread.UncaughtExceptionHandler() {
|
||||
def uncaughtException(thread: Thread, cause: Throwable) = {}
|
||||
}
|
||||
|
||||
def newThread(runnable: Runnable) = {
|
||||
val t = new Thread(runnable, name + counter.incrementAndGet())
|
||||
t.setUncaughtExceptionHandler(doNothing)
|
||||
t.setUncaughtExceptionHandler(exceptionHandler)
|
||||
t.setDaemon(daemonic)
|
||||
t
|
||||
}
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ class SquarerImpl(val name: String) extends Squarer {
|
|||
//#typed-actor-impl-methods
|
||||
}
|
||||
//#typed-actor-impl
|
||||
|
||||
import java.lang.Integer.{ parseInt ⇒ println } //Mr funny man avoids printing to stdout AND keeping docs alright
|
||||
//#typed-actor-supercharge
|
||||
trait Foo {
|
||||
def doFoo(times: Int): Unit = println("doFoo(" + times + ")")
|
||||
|
|
|
|||
|
|
@ -22,8 +22,6 @@ object DispatcherDocSpec {
|
|||
my-dispatcher {
|
||||
# Dispatcher is the name of the event-based dispatcher
|
||||
type = Dispatcher
|
||||
# Toggles whether the threads created by this dispatcher should be daemons or not
|
||||
daemonic = off
|
||||
# minimum number of threads to cap factor-based core number to
|
||||
core-pool-size-min = 2
|
||||
# No of core threads ... ceil(available processors * factor)
|
||||
|
|
|
|||
|
|
@ -59,9 +59,6 @@ akka {
|
|||
# Reuse inbound connections for outbound messages
|
||||
use-passive-connections = on
|
||||
|
||||
# Whether any Threds created by the remoting should be daemons or not
|
||||
daemonic = on
|
||||
|
||||
# accrual failure detection config
|
||||
failure-detector {
|
||||
|
||||
|
|
@ -84,13 +81,11 @@ akka {
|
|||
compute-grid-dispatcher {
|
||||
# defaults to same settings as default-dispatcher
|
||||
name = ComputeGridDispatcher
|
||||
daemonic = on
|
||||
}
|
||||
|
||||
# The dispatcher used for the system actor "network-event-sender"
|
||||
network-event-sender-dispatcher {
|
||||
type = PinnedDispatcher
|
||||
daemonic = on
|
||||
}
|
||||
|
||||
server {
|
||||
|
|
@ -105,9 +100,6 @@ akka {
|
|||
# Increase this if you want to be able to send messages with large payloads
|
||||
message-frame-size = 1 MiB
|
||||
|
||||
# Timeout duration
|
||||
connection-timeout = 120s
|
||||
|
||||
# Should the remote server require that it peers share the same secure-cookie
|
||||
# (defined in the 'remote' section)?
|
||||
require-cookie = off
|
||||
|
|
@ -133,11 +125,20 @@ akka {
|
|||
}
|
||||
|
||||
client {
|
||||
# Time before an attempted connection is considered failed
|
||||
connection-timeout = 10s
|
||||
|
||||
#Time between each reconnection attempt
|
||||
reconnect-delay = 5s
|
||||
read-timeout = 3600s
|
||||
message-frame-size = 1 MiB
|
||||
|
||||
# Maximum time window that a client should try to reconnect for
|
||||
reconnection-time-window = 600s
|
||||
|
||||
#Period of time of connection inactivity to be tolerated before hanging up
|
||||
read-timeout = 3600s
|
||||
|
||||
#Max size per message
|
||||
message-frame-size = 1 MiB
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -16,7 +16,6 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
import config._
|
||||
|
||||
val RemoteTransport = getString("akka.remote.transport")
|
||||
val Daemonic = getBoolean("akka.remote.daemonic")
|
||||
val FailureDetectorThreshold = getInt("akka.remote.failure-detector.threshold")
|
||||
val FailureDetectorMaxSampleSize = getInt("akka.remote.failure-detector.max-sample-size")
|
||||
val ShouldCompressData = getBoolean("akka.remote.use-compression")
|
||||
|
|
@ -39,6 +38,7 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
case cookie ⇒ Some(cookie)
|
||||
}
|
||||
|
||||
val ConnectionTimeout = Duration(getMilliseconds("akka.remote.client.connection-timeout"), MILLISECONDS)
|
||||
val ReconnectionTimeWindow = Duration(getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS)
|
||||
val ReadTimeout = Duration(getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
|
||||
val ReconnectDelay = Duration(getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
|
||||
|
|
@ -66,8 +66,13 @@ class RemoteSettings(val config: Config, val systemName: String) {
|
|||
case "" ⇒ InetAddress.getLocalHost.getHostAddress
|
||||
case value ⇒ value
|
||||
}
|
||||
val Port = getInt("akka.remote.server.port")
|
||||
val ConnectionTimeout = Duration(getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
|
||||
val Port = getInt("akka.remote.server.port") match {
|
||||
case 0 ⇒ try {
|
||||
val s = new java.net.ServerSocket(0)
|
||||
try s.getLocalPort finally s.close()
|
||||
} catch { case e ⇒ throw new ConfigurationException("Unable to obtain random port", e) }
|
||||
case other ⇒ other
|
||||
}
|
||||
|
||||
val Backlog = getInt("akka.remote.server.backlog")
|
||||
|
||||
|
|
|
|||
|
|
@ -184,6 +184,7 @@ class ActiveRemoteClient private[akka] (
|
|||
bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this))
|
||||
bootstrap.setOption("tcpNoDelay", true)
|
||||
bootstrap.setOption("keepAlive", true)
|
||||
bootstrap.setOption("connectTimeoutMillis", ConnectionTimeout.toMillis)
|
||||
|
||||
log.debug("Starting remote client connection to [{}]", remoteAddress)
|
||||
|
||||
|
|
@ -372,8 +373,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre
|
|||
|
||||
val serverSettings = remote.remoteSettings.serverSettings
|
||||
val clientSettings = remote.remoteSettings.clientSettings
|
||||
|
||||
val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic)
|
||||
val threadFactory = _system.threadFactory.copy(_system.threadFactory.name + "-remote")
|
||||
val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory)
|
||||
|
||||
val executor = new OrderedMemoryAwareThreadPoolExecutor(
|
||||
|
|
@ -548,23 +548,23 @@ class NettyRemoteServer(
|
|||
Executors.newCachedThreadPool(remoteSupport.threadFactory),
|
||||
Executors.newCachedThreadPool(remoteSupport.threadFactory))
|
||||
|
||||
private val bootstrap = new ServerBootstrap(factory)
|
||||
|
||||
private val executionHandler = new ExecutionHandler(remoteSupport.executor)
|
||||
|
||||
// group of open channels, used for clean-up
|
||||
private val openChannels: ChannelGroup = new DefaultDisposableChannelGroup("akka-remote-server")
|
||||
|
||||
val pipelineFactory = new RemoteServerPipelineFactory(name, openChannels, executionHandler, loader, remoteSupport)
|
||||
bootstrap.setPipelineFactory(pipelineFactory)
|
||||
bootstrap.setOption("backlog", Backlog)
|
||||
bootstrap.setOption("child.tcpNoDelay", true)
|
||||
bootstrap.setOption("child.keepAlive", true)
|
||||
bootstrap.setOption("child.reuseAddress", true)
|
||||
bootstrap.setOption("child.connectTimeoutMillis", ConnectionTimeout.toMillis)
|
||||
private val bootstrap: ServerBootstrap = {
|
||||
val b = new ServerBootstrap(factory)
|
||||
b.setPipelineFactory(pipelineFactory)
|
||||
b.setOption("backlog", Backlog)
|
||||
b.setOption("child.tcpNoDelay", true)
|
||||
b.setOption("child.keepAlive", true)
|
||||
b.setOption("child.reuseAddress", true)
|
||||
b
|
||||
}
|
||||
|
||||
openChannels.add(bootstrap.bind(new InetSocketAddress(address.transport.ip.get, address.transport.port)))
|
||||
remoteSupport.notifyListeners(RemoteServerStarted(remoteSupport))
|
||||
|
||||
def shutdown() {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -16,13 +16,12 @@ class RemoteConfigSpec extends AkkaSpec("") {
|
|||
getString("akka.remote.secure-cookie") must equal("")
|
||||
getBoolean("akka.remote.use-passive-connections") must equal(true)
|
||||
getMilliseconds("akka.remote.backoff-timeout") must equal(0)
|
||||
getBoolean("akka.remote.daemonic") must equal(true)
|
||||
// getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
|
||||
|
||||
//akka.remote.server
|
||||
getInt("akka.remote.server.port") must equal(2552)
|
||||
getBytes("akka.remote.server.message-frame-size") must equal(1048576L)
|
||||
getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000)
|
||||
|
||||
getBoolean("akka.remote.server.require-cookie") must equal(false)
|
||||
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
|
||||
getInt("akka.remote.server.backlog") must equal(4096)
|
||||
|
|
@ -38,24 +37,11 @@ class RemoteConfigSpec extends AkkaSpec("") {
|
|||
getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000)
|
||||
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
|
||||
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
|
||||
getMilliseconds("akka.remote.client.connection-timeout") must equal(10000)
|
||||
|
||||
// TODO cluster config will go into akka-cluster/reference.conf when we enable that module
|
||||
//akka.cluster
|
||||
getStringList("akka.cluster.seed-nodes") must equal(new java.util.ArrayList[String])
|
||||
|
||||
// getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000)
|
||||
// getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000)
|
||||
// getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000)
|
||||
// getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true)
|
||||
// getString("akka.cluster.log-directory") must equal("_akka_cluster")
|
||||
|
||||
// //akka.cluster.replication
|
||||
// getString("akka.cluster.replication.digest-type") must equal("MAC")
|
||||
// getString("akka.cluster.replication.password") must equal("secret")
|
||||
// getInt("akka.cluster.replication.ensemble-size") must equal(3)
|
||||
// getInt("akka.cluster.replication.quorum-size") must equal(2)
|
||||
// getInt("akka.cluster.replication.snapshot-frequency") must equal(1000)
|
||||
// getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue