!pro #3809 Update to config 1.2.0
* Changed all duration reads to use implicit conversion and the new getDuration to reduce boilerplate
This commit is contained in:
parent
e25455e935
commit
e441d1b29b
31 changed files with 166 additions and 110 deletions
|
|
@ -12,6 +12,7 @@ import scala.concurrent.duration.{ Duration, FiniteDuration }
|
|||
import akka.ConfigurationException
|
||||
import com.typesafe.config.Config
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
object ActorMailboxSpec {
|
||||
val mailboxConf = ConfigFactory.parseString("""
|
||||
|
|
@ -174,7 +175,7 @@ object ActorMailboxSpec {
|
|||
extends MailboxType with ProducesMessageQueue[MCBoundedMessageQueueSemantics] {
|
||||
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
|
||||
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||
config.getNanosDuration("mailbox-push-timeout-time"))
|
||||
|
||||
final override def create(owner: Option[ActorRef], system: Option[ActorSystem]): MessageQueue =
|
||||
new BoundedMailbox.MessageQueue(capacity, pushTimeOut)
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import akka.dispatch._
|
|||
import com.typesafe.config.Config
|
||||
import java.util.concurrent.{ LinkedBlockingQueue, BlockingQueue, TimeUnit }
|
||||
import akka.util.Switch
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
class JavaExtensionSpec extends JavaExtension with JUnitSuiteLike
|
||||
|
||||
|
|
@ -87,9 +88,9 @@ object ActorSystemSpec {
|
|||
this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
|
||||
config.getMillisDuration("shutdown-timeout")) {
|
||||
val doneIt = new Switch
|
||||
override protected[akka] def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = {
|
||||
val ret = super.registerForExecution(mbox, hasMessageHint, hasSystemMessageHint)
|
||||
|
|
|
|||
|
|
@ -24,6 +24,7 @@ import java.lang.ref.WeakReference
|
|||
import akka.event.Logging
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
import java.lang.System.identityHashCode
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
object SupervisorHierarchySpec {
|
||||
class FireWorkerException(msg: String) extends Exception(msg)
|
||||
|
|
@ -82,9 +83,9 @@ object SupervisorHierarchySpec {
|
|||
new Dispatcher(this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) {
|
||||
config.getMillisDuration("shutdown-timeout")) {
|
||||
|
||||
override def suspend(cell: ActorCell): Unit = {
|
||||
cell.actor match {
|
||||
|
|
|
|||
|
|
@ -21,6 +21,7 @@ import akka.dispatch._
|
|||
import akka.event.Logging.Error
|
||||
import akka.pattern.ask
|
||||
import akka.testkit._
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.util.Switch
|
||||
import scala.concurrent.duration._
|
||||
import scala.concurrent.{ Await, Future, Promise }
|
||||
|
|
@ -528,13 +529,15 @@ object DispatcherModelSpec {
|
|||
class MessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
|
||||
extends MessageDispatcherConfigurator(config, prerequisites) {
|
||||
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
private val instance: MessageDispatcher =
|
||||
new Dispatcher(this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS)) with MessageDispatcherInterceptor
|
||||
config.getMillisDuration("shutdown-timeout")) with MessageDispatcherInterceptor
|
||||
|
||||
override def dispatcher(): MessageDispatcher = instance
|
||||
}
|
||||
|
|
@ -600,14 +603,16 @@ object BalancingDispatcherModelSpec {
|
|||
class BalancingMessageDispatcherInterceptorConfigurator(config: Config, prerequisites: DispatcherPrerequisites)
|
||||
extends BalancingDispatcherConfigurator(config, prerequisites) {
|
||||
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
override protected def create(mailboxType: MailboxType): BalancingDispatcher =
|
||||
new BalancingDispatcher(this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
mailboxType,
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
|
||||
config.getMillisDuration("shutdown-timeout"),
|
||||
config.getBoolean("attempt-teamwork")) with MessageDispatcherInterceptor
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ import scala.collection.JavaConverters._
|
|||
import scala.concurrent.duration._
|
||||
import akka.actor.ActorSystem
|
||||
import akka.event.Logging.DefaultLogger
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.findClassLoader())) {
|
||||
|
|
@ -35,7 +36,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
|||
settings.SerializeAllMessages should equal(true)
|
||||
|
||||
getInt("akka.scheduler.ticks-per-wheel") should equal(512)
|
||||
getMilliseconds("akka.scheduler.tick-duration") should equal(10)
|
||||
getDuration("akka.scheduler.tick-duration", TimeUnit.MILLISECONDS) should equal(10)
|
||||
getString("akka.scheduler.implementation") should equal("akka.actor.LightArrayRevolverScheduler")
|
||||
|
||||
getBoolean("akka.daemonic") should be(false)
|
||||
|
|
@ -47,14 +48,14 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
|||
getInt("akka.actor.deployment.default.virtual-nodes-factor") should be(10)
|
||||
settings.DefaultVirtualNodesFactor should be(10)
|
||||
|
||||
getMilliseconds("akka.actor.unstarted-push-timeout") should be(10.seconds.toMillis)
|
||||
getDuration("akka.actor.unstarted-push-timeout", TimeUnit.MILLISECONDS) should be(10.seconds.toMillis)
|
||||
settings.UnstartedPushTimeout.duration should be(10.seconds)
|
||||
|
||||
settings.Loggers.size should be(1)
|
||||
settings.Loggers.head should be(classOf[DefaultLogger].getName)
|
||||
getStringList("akka.loggers").get(0) should be(classOf[DefaultLogger].getName)
|
||||
|
||||
getMilliseconds("akka.logger-startup-timeout") should be(5.seconds.toMillis)
|
||||
getDuration("akka.logger-startup-timeout", TimeUnit.MILLISECONDS) should be(5.seconds.toMillis)
|
||||
settings.LoggerStartTimeout.duration should be(5.seconds)
|
||||
|
||||
getInt("akka.log-dead-letters") should be(10)
|
||||
|
|
@ -72,9 +73,9 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
|||
{
|
||||
c.getString("type") should equal("Dispatcher")
|
||||
c.getString("executor") should equal("fork-join-executor")
|
||||
c.getMilliseconds("shutdown-timeout") should equal(1 * 1000)
|
||||
c.getDuration("shutdown-timeout", TimeUnit.MILLISECONDS) should equal(1 * 1000)
|
||||
c.getInt("throughput") should equal(5)
|
||||
c.getMilliseconds("throughput-deadline-time") should equal(0)
|
||||
c.getDuration("throughput-deadline-time", TimeUnit.MILLISECONDS) should equal(0)
|
||||
c.getBoolean("attempt-teamwork") should equal(true)
|
||||
}
|
||||
|
||||
|
|
@ -92,7 +93,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
|||
{
|
||||
val pool = c.getConfig("thread-pool-executor")
|
||||
import pool._
|
||||
getMilliseconds("keep-alive-time") should equal(60 * 1000)
|
||||
getDuration("keep-alive-time", TimeUnit.MILLISECONDS) should equal(60 * 1000)
|
||||
getDouble("core-pool-size-factor") should equal(3.0)
|
||||
getDouble("max-pool-size-factor") should equal(3.0)
|
||||
getInt("task-queue-size") should equal(-1)
|
||||
|
|
@ -135,7 +136,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.defaultReference(ActorSystem.fin
|
|||
|
||||
{
|
||||
c.getInt("mailbox-capacity") should equal(1000)
|
||||
c.getMilliseconds("mailbox-push-timeout-time") should equal(10 * 1000)
|
||||
c.getDuration("mailbox-push-timeout-time", TimeUnit.MILLISECONDS) should equal(10 * 1000)
|
||||
c.getString("mailbox-type") should be("akka.dispatch.UnboundedMailbox")
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -10,15 +10,15 @@ import java.util.concurrent.TimeUnit
|
|||
import akka.event.Logging
|
||||
|
||||
abstract class PerformanceSpec(cfg: Config = BenchmarkConfig.config) extends AkkaSpec(cfg) with BeforeAndAfterEach {
|
||||
|
||||
import akka.util.Helpers.ConfigOps
|
||||
def config = system.settings.config
|
||||
def isLongRunningBenchmark() = config.getBoolean("benchmark.longRunning")
|
||||
def minClients() = config.getInt("benchmark.minClients")
|
||||
def maxClients() = config.getInt("benchmark.maxClients")
|
||||
def repeatFactor() = config.getInt("benchmark.repeatFactor")
|
||||
def timeDilation() = config.getLong("benchmark.timeDilation")
|
||||
def maxRunDuration() = Duration(config.getMilliseconds("benchmark.maxRunDuration"), TimeUnit.MILLISECONDS)
|
||||
def clientDelay = Duration(config.getNanoseconds("benchmark.clientDelay"), TimeUnit.NANOSECONDS)
|
||||
def maxRunDuration() = config.getMillisDuration("benchmark.maxRunDuration")
|
||||
def clientDelay = config.getNanosDuration("benchmark.clientDelay")
|
||||
|
||||
val resultRepository = BenchResultRepository()
|
||||
lazy val report = new Report(system, resultRepository, compareResultWith)
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import scala.concurrent.duration._
|
|||
import akka.pattern.ask
|
||||
import scala.concurrent.Await
|
||||
import akka.util.Timeout
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import scala.collection.immutable.TreeSet
|
||||
import java.util.concurrent.TimeoutException
|
||||
import java.util.concurrent.atomic.AtomicInteger
|
||||
|
|
@ -100,7 +101,7 @@ object ActorDSL extends dsl.Inbox with dsl.Creators {
|
|||
|
||||
lazy val config = system.settings.config.getConfig("akka.actor.dsl")
|
||||
|
||||
val DSLDefaultTimeout = Duration(config.getMilliseconds("default-timeout"), TimeUnit.MILLISECONDS)
|
||||
val DSLDefaultTimeout = config.getMillisDuration("default-timeout")
|
||||
|
||||
def mkChild(p: Props, name: String) = boss.underlying.asInstanceOf[ActorCell].attachChild(p, name, systemService = true)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -61,14 +61,14 @@ object ActorSystem {
|
|||
* then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader
|
||||
* associated with the ActorSystem class.
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
def create(name: String, config: Config): ActorSystem = apply(name, config)
|
||||
|
||||
/**
|
||||
* Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
def create(name: String, config: Config, classLoader: ClassLoader): ActorSystem = apply(name, config, classLoader)
|
||||
|
||||
|
|
@ -99,14 +99,14 @@ object ActorSystem {
|
|||
* then tries to walk the stack to find the callers class loader, then falls back to the ClassLoader
|
||||
* associated with the ActorSystem class.
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
def apply(name: String, config: Config): ActorSystem = apply(name, config, findClassLoader())
|
||||
|
||||
/**
|
||||
* Creates a new ActorSystem with the name "default", the specified Config, and specified ClassLoader
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
def apply(name: String, config: Config, classLoader: ClassLoader): ActorSystem = new ActorSystemImpl(name, config, classLoader).start()
|
||||
|
||||
|
|
@ -115,14 +115,14 @@ object ActorSystem {
|
|||
*
|
||||
* For more detailed information about the different possible configuration options, look in the Akka Documentation under "Configuration"
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
class Settings(classLoader: ClassLoader, cfg: Config, final val name: String) {
|
||||
|
||||
/**
|
||||
* The backing Config of this ActorSystem's Settings
|
||||
*
|
||||
* @see <a href="http://typesafehub.github.com/config/v0.4.1/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
* @see <a href="http://typesafehub.github.io/config/v1.2.0/" target="_blank">The Typesafe Config Library API Documentation</a>
|
||||
*/
|
||||
final val config: Config = {
|
||||
val config = cfg.withFallback(ConfigFactory.defaultReference(classLoader))
|
||||
|
|
@ -131,13 +131,14 @@ object ActorSystem {
|
|||
}
|
||||
|
||||
import scala.collection.JavaConverters._
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import config._
|
||||
|
||||
final val ConfigVersion: String = getString("akka.version")
|
||||
final val ProviderClass: String = getString("akka.actor.provider")
|
||||
final val SupervisorStrategyClass: String = getString("akka.actor.guardian-supervisor-strategy")
|
||||
final val CreationTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.actor.creation-timeout"), MILLISECONDS))
|
||||
final val UnstartedPushTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.actor.unstarted-push-timeout"), MILLISECONDS))
|
||||
final val CreationTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.creation-timeout"))
|
||||
final val UnstartedPushTimeout: Timeout = Timeout(config.getMillisDuration("akka.actor.unstarted-push-timeout"))
|
||||
|
||||
final val SerializeAllMessages: Boolean = getBoolean("akka.actor.serialize-messages")
|
||||
final val SerializeAllCreators: Boolean = getBoolean("akka.actor.serialize-creators")
|
||||
|
|
@ -145,7 +146,7 @@ object ActorSystem {
|
|||
final val LogLevel: String = getString("akka.loglevel")
|
||||
final val StdoutLogLevel: String = getString("akka.stdout-loglevel")
|
||||
final val Loggers: immutable.Seq[String] = immutableSeq(getStringList("akka.loggers"))
|
||||
final val LoggerStartTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.logger-startup-timeout"), MILLISECONDS))
|
||||
final val LoggerStartTimeout: Timeout = Timeout(config.getMillisDuration("akka.logger-startup-timeout"))
|
||||
final val LogConfigOnStart: Boolean = config.getBoolean("akka.log-config-on-start")
|
||||
final val LogDeadLetters: Int = config.getString("akka.log-dead-letters").toLowerCase match {
|
||||
case "off" | "false" ⇒ 0
|
||||
|
|
|
|||
|
|
@ -185,15 +185,16 @@ class LightArrayRevolverScheduler(config: Config,
|
|||
extends Scheduler with Closeable {
|
||||
|
||||
import Helpers.Requiring
|
||||
import Helpers.ConfigOps
|
||||
|
||||
val WheelSize =
|
||||
config.getInt("akka.scheduler.ticks-per-wheel")
|
||||
.requiring(ticks ⇒ (ticks & (ticks - 1)) == 0, "ticks-per-wheel must be a power of 2")
|
||||
val TickDuration =
|
||||
Duration(config.getMilliseconds("akka.scheduler.tick-duration"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.scheduler.tick-duration")
|
||||
.requiring(_ >= 10.millis || !Helpers.isWindows, "minimum supported akka.scheduler.tick-duration on Windows is 10ms")
|
||||
.requiring(_ >= 1.millis, "minimum supported akka.scheduler.tick-duration is 1ms")
|
||||
val ShutdownTimeout = Duration(config.getMilliseconds("akka.scheduler.shutdown-timeout"), MILLISECONDS)
|
||||
val ShutdownTimeout = config.getMillisDuration("akka.scheduler.shutdown-timeout")
|
||||
|
||||
import LightArrayRevolverScheduler._
|
||||
|
||||
|
|
|
|||
|
|
@ -637,11 +637,12 @@ class TypedActorExtension(val system: ExtendedActorSystem) extends TypedActorFac
|
|||
protected def typedActor = this
|
||||
|
||||
import system.settings
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
/**
|
||||
* Default timeout for typed actor methods with non-void return type
|
||||
*/
|
||||
final val DefaultReturnTimeout = Timeout(Duration(settings.config.getMilliseconds("akka.actor.typed.timeout"), MILLISECONDS))
|
||||
final val DefaultReturnTimeout = Timeout(settings.config.getMillisDuration("akka.actor.typed.timeout"))
|
||||
|
||||
/**
|
||||
* Retrieves the underlying ActorRef for the supplied TypedActor proxy, or null if none found
|
||||
|
|
|
|||
|
|
@ -339,8 +339,9 @@ class ThreadPoolExecutorConfigurator(config: Config, prerequisites: DispatcherPr
|
|||
val threadPoolConfig: ThreadPoolConfig = createThreadPoolConfigBuilder(config, prerequisites).config
|
||||
|
||||
protected def createThreadPoolConfigBuilder(config: Config, prerequisites: DispatcherPrerequisites): ThreadPoolConfigBuilder = {
|
||||
import akka.util.Helpers.ConfigOps
|
||||
ThreadPoolConfigBuilder(ThreadPoolConfig())
|
||||
.setKeepAliveTime(Duration(config getMilliseconds "keep-alive-time", TimeUnit.MILLISECONDS))
|
||||
.setKeepAliveTime(config.getMillisDuration("keep-alive-time"))
|
||||
.setAllowCoreThreadTimeout(config getBoolean "allow-core-timeout")
|
||||
.setCorePoolSizeFromFactor(config getInt "core-pool-size-min", config getDouble "core-pool-size-factor", config getInt "core-pool-size-max")
|
||||
.setMaxPoolSizeFromFactor(config getInt "max-pool-size-min", config getDouble "max-pool-size-factor", config getInt "max-pool-size-max")
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.dispatch
|
|||
import com.typesafe.config._
|
||||
import java.util.concurrent.ConcurrentHashMap
|
||||
import scala.util.{ Failure, Success, Try }
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -167,5 +168,15 @@ private[akka] class CachingConfig(_config: Config) extends Config {
|
|||
def atKey(key: String) = new CachingConfig(config.atKey(key))
|
||||
|
||||
def withValue(path: String, value: ConfigValue) = new CachingConfig(config.withValue(path, value))
|
||||
|
||||
def getDuration(path: String, unit: TimeUnit) = config.getDuration(path, unit)
|
||||
|
||||
def getDurationList(path: String, unit: TimeUnit) = config.getDurationList(path, unit)
|
||||
|
||||
def isResolved() = config.isResolved()
|
||||
|
||||
def resolveWith(source: Config, options: ConfigResolveOptions) = config.resolveWith(source, options)
|
||||
|
||||
def resolveWith(source: Config) = config.resolveWith(source)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.event.EventStream
|
|||
import scala.concurrent.duration.Duration
|
||||
import akka.ConfigurationException
|
||||
import akka.actor.Deploy
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
/**
|
||||
* DispatcherPrerequisites represents useful contextual pieces when constructing a MessageDispatcher
|
||||
|
|
@ -178,9 +179,9 @@ class DispatcherConfigurator(config: Config, prerequisites: DispatcherPrerequisi
|
|||
this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS))
|
||||
config.getMillisDuration("shutdown-timeout"))
|
||||
|
||||
/**
|
||||
* Returns the same dispatcher instance for each invocation
|
||||
|
|
@ -231,10 +232,10 @@ class BalancingDispatcherConfigurator(_config: Config, _prerequisites: Dispatche
|
|||
this,
|
||||
config.getString("id"),
|
||||
config.getInt("throughput"),
|
||||
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||
config.getNanosDuration("throughput-deadline-time"),
|
||||
mailboxType,
|
||||
configureExecutor(),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS),
|
||||
config.getMillisDuration("shutdown-timeout"),
|
||||
config.getBoolean("attempt-teamwork"))
|
||||
|
||||
/**
|
||||
|
|
@ -267,6 +268,6 @@ class PinnedDispatcherConfigurator(config: Config, prerequisites: DispatcherPrer
|
|||
override def dispatcher(): MessageDispatcher =
|
||||
new PinnedDispatcher(
|
||||
this, null, config.getString("id"),
|
||||
Duration(config.getMilliseconds("shutdown-timeout"), TimeUnit.MILLISECONDS), threadPoolConfig)
|
||||
config.getMillisDuration("shutdown-timeout"), threadPoolConfig)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.AkkaException
|
|||
import akka.dispatch.sysmsg._
|
||||
import akka.actor.{ ActorCell, ActorRef, Cell, ActorSystem, InternalActorRef, DeadLetter }
|
||||
import akka.util.{ Unsafe, BoundedBlockingQueue }
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.event.Logging.Error
|
||||
import scala.concurrent.duration.Duration
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
|
@ -579,7 +580,7 @@ case class BoundedMailbox(val capacity: Int, val pushTimeOut: FiniteDuration)
|
|||
extends MailboxType with ProducesMessageQueue[BoundedMailbox.MessageQueue] {
|
||||
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
|
||||
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||
config.getNanosDuration("mailbox-push-timeout-time"))
|
||||
|
||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedMailbox can not be null")
|
||||
|
|
@ -659,7 +660,7 @@ case class BoundedDequeBasedMailbox( final val capacity: Int, final val pushTime
|
|||
extends MailboxType with ProducesMessageQueue[BoundedDequeBasedMailbox.MessageQueue] {
|
||||
|
||||
def this(settings: ActorSystem.Settings, config: Config) = this(config.getInt("mailbox-capacity"),
|
||||
Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS))
|
||||
config.getNanosDuration("mailbox-push-timeout-time"))
|
||||
|
||||
if (capacity < 0) throw new IllegalArgumentException("The capacity for BoundedDequeBasedMailbox can not be negative")
|
||||
if (pushTimeOut eq null) throw new IllegalArgumentException("The push time-out for BoundedDequeBasedMailbox can not be null")
|
||||
|
|
|
|||
|
|
@ -514,6 +514,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
|
||||
val Settings = new Settings(system.settings.config.getConfig("akka.io.tcp"))
|
||||
class Settings private[TcpExt] (_config: Config) extends SelectionHandlerSettings(_config) {
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import _config._
|
||||
|
||||
val NrOfSelectors: Int = getInt("nr-of-selectors") requiring (_ > 0, "nr-of-selectors must be > 0")
|
||||
|
|
@ -523,7 +524,7 @@ class TcpExt(system: ExtendedActorSystem) extends IO.Extension {
|
|||
val MaxDirectBufferPoolSize: Int = getInt("direct-buffer-pool-limit")
|
||||
val RegisterTimeout: Duration = getString("register-timeout") match {
|
||||
case "infinite" ⇒ Duration.Undefined
|
||||
case x ⇒ Duration(getMilliseconds("register-timeout"), MILLISECONDS)
|
||||
case x ⇒ _config.getMillisDuration("register-timeout")
|
||||
}
|
||||
val ReceivedMessageSizeLimit: Int = getString("max-received-message-size") match {
|
||||
case "unlimited" ⇒ Int.MaxValue
|
||||
|
|
|
|||
|
|
@ -18,6 +18,7 @@ import akka.dispatch.ExecutionContexts
|
|||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.duration._
|
||||
import akka.util.Timeout
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import java.util.concurrent.TimeUnit
|
||||
import akka.actor.ActorSystem
|
||||
|
||||
|
|
@ -102,7 +103,7 @@ final case class ScatterGatherFirstCompletedPool(
|
|||
def this(config: Config) =
|
||||
this(
|
||||
nrOfInstances = config.getInt("nr-of-instances"),
|
||||
within = Duration(config.getMilliseconds("within"), TimeUnit.MILLISECONDS),
|
||||
within = config.getMillisDuration("within"),
|
||||
resizer = DefaultResizer.fromConfig(config),
|
||||
usePoolDispatcher = config.hasPath("pool-dispatcher"))
|
||||
|
||||
|
|
@ -167,7 +168,7 @@ final case class ScatterGatherFirstCompletedGroup(
|
|||
def this(config: Config) =
|
||||
this(
|
||||
paths = immutableSeq(config.getStringList("routees.paths")),
|
||||
within = Duration(config.getMilliseconds("within"), TimeUnit.MILLISECONDS))
|
||||
within = config.getMillisDuration("within"))
|
||||
|
||||
/**
|
||||
* Java API
|
||||
|
|
|
|||
|
|
@ -6,6 +6,10 @@ package akka.util
|
|||
import java.util.Comparator
|
||||
import scala.annotation.tailrec
|
||||
import java.util.regex.Pattern
|
||||
import com.typesafe.config.Config
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import scala.concurrent.duration.Duration
|
||||
import java.util.concurrent.TimeUnit
|
||||
|
||||
object Helpers {
|
||||
|
||||
|
|
@ -105,4 +109,17 @@ object Helpers {
|
|||
value
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
*/
|
||||
private[akka] final implicit class ConfigOps(val config: Config) extends AnyVal {
|
||||
def getMillisDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.MILLISECONDS)
|
||||
|
||||
def getNanosDuration(path: String): FiniteDuration = getDuration(path, TimeUnit.NANOSECONDS)
|
||||
|
||||
private def getDuration(path: String, unit: TimeUnit): FiniteDuration =
|
||||
Duration(config.getDuration(path, unit), unit)
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,17 +59,19 @@ trait Camel extends Extension with Activation {
|
|||
* @param config the config
|
||||
*/
|
||||
class CamelSettings private[camel] (config: Config, dynamicAccess: DynamicAccess) {
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
/**
|
||||
* Configured setting for how long the actor should wait for activation before it fails.
|
||||
*/
|
||||
final val ActivationTimeout: FiniteDuration = Duration(config.getMilliseconds("akka.camel.consumer.activation-timeout"), MILLISECONDS)
|
||||
final val ActivationTimeout: FiniteDuration = config.getMillisDuration("akka.camel.consumer.activation-timeout")
|
||||
|
||||
/**
|
||||
* Configured setting, when endpoint is out-capable (can produce responses) replyTimeout is the maximum time
|
||||
* the endpoint can take to send the response before the message exchange fails.
|
||||
* This setting is used for out-capable, in-only, manually acknowledged communication.
|
||||
*/
|
||||
final val ReplyTimeout: FiniteDuration = Duration(config.getMilliseconds("akka.camel.consumer.reply-timeout"), MILLISECONDS)
|
||||
final val ReplyTimeout: FiniteDuration = config.getMillisDuration("akka.camel.consumer.reply-timeout")
|
||||
|
||||
/**
|
||||
* Configured setting which determines whether one-way communications between an endpoint and this consumer actor
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.ActorSystem
|
|||
import scala.concurrent.duration.Duration
|
||||
import java.util.concurrent.TimeUnit._
|
||||
import akka.testkit.TestKit
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
class CamelConfigSpec extends WordSpec with Matchers {
|
||||
|
||||
|
|
@ -20,7 +21,7 @@ class CamelConfigSpec extends WordSpec with Matchers {
|
|||
}
|
||||
"CamelConfigSpec" must {
|
||||
"have correct activationTimeout config" in {
|
||||
settings.ActivationTimeout should equal(Duration(config.getMilliseconds("akka.camel.consumer.activation-timeout"), MILLISECONDS))
|
||||
settings.ActivationTimeout should equal(config.getMillisDuration("akka.camel.consumer.activation-timeout"))
|
||||
}
|
||||
|
||||
"have correct autoAck config" in {
|
||||
|
|
@ -28,7 +29,7 @@ class CamelConfigSpec extends WordSpec with Matchers {
|
|||
}
|
||||
|
||||
"have correct replyTimeout config" in {
|
||||
settings.ReplyTimeout should equal(Duration(config.getMilliseconds("akka.camel.consumer.reply-timeout"), MILLISECONDS))
|
||||
settings.ReplyTimeout should equal(config.getMillisDuration("akka.camel.consumer.reply-timeout"))
|
||||
}
|
||||
|
||||
"have correct streamingCache config" in {
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import akka.actor.Address
|
|||
import akka.actor.AddressFromURIString
|
||||
import akka.dispatch.Dispatchers
|
||||
import akka.util.Helpers.Requiring
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
import akka.japi.Util.immutableSeq
|
||||
|
||||
|
|
@ -23,10 +24,10 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
val FailureDetectorConfig: Config = cc.getConfig("failure-detector")
|
||||
val FailureDetectorImplementationClass: String = FailureDetectorConfig.getString("implementation-class")
|
||||
val HeartbeatInterval: FiniteDuration = {
|
||||
Duration(FailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
|
||||
FailureDetectorConfig.getMillisDuration("heartbeat-interval")
|
||||
} requiring (_ > Duration.Zero, "failure-detector.heartbeat-interval must be > 0")
|
||||
val HeartbeatExpectedResponseAfter: FiniteDuration = {
|
||||
Duration(FailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS)
|
||||
FailureDetectorConfig.getMillisDuration("expected-response-after")
|
||||
} requiring (_ > Duration.Zero, "failure-detector.expected-response-after > 0")
|
||||
val MonitoredByNrOfMembers: Int = {
|
||||
FailureDetectorConfig.getInt("monitored-by-nr-of-members")
|
||||
|
|
@ -34,26 +35,26 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
|
||||
val SeedNodes: immutable.IndexedSeq[Address] =
|
||||
immutableSeq(cc.getStringList("seed-nodes")).map { case AddressFromURIString(addr) ⇒ addr }.toVector
|
||||
val SeedNodeTimeout: FiniteDuration = Duration(cc.getMilliseconds("seed-node-timeout"), MILLISECONDS)
|
||||
val SeedNodeTimeout: FiniteDuration = cc.getMillisDuration("seed-node-timeout")
|
||||
val RetryUnsuccessfulJoinAfter: Duration = {
|
||||
val key = "retry-unsuccessful-join-after"
|
||||
cc.getString(key).toLowerCase match {
|
||||
case "off" ⇒ Duration.Undefined
|
||||
case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ > Duration.Zero, key + " > 0s, or off")
|
||||
case _ ⇒ cc.getMillisDuration(key) requiring (_ > Duration.Zero, key + " > 0s, or off")
|
||||
}
|
||||
}
|
||||
val PeriodicTasksInitialDelay: FiniteDuration = Duration(cc.getMilliseconds("periodic-tasks-initial-delay"), MILLISECONDS)
|
||||
val GossipInterval: FiniteDuration = Duration(cc.getMilliseconds("gossip-interval"), MILLISECONDS)
|
||||
val PeriodicTasksInitialDelay: FiniteDuration = cc.getMillisDuration("periodic-tasks-initial-delay")
|
||||
val GossipInterval: FiniteDuration = cc.getMillisDuration("gossip-interval")
|
||||
val GossipTimeToLive: FiniteDuration = {
|
||||
Duration(cc.getMilliseconds("gossip-time-to-live"), MILLISECONDS)
|
||||
cc.getMillisDuration("gossip-time-to-live")
|
||||
} requiring (_ > Duration.Zero, "gossip-time-to-live must be > 0")
|
||||
val LeaderActionsInterval: FiniteDuration = Duration(cc.getMilliseconds("leader-actions-interval"), MILLISECONDS)
|
||||
val UnreachableNodesReaperInterval: FiniteDuration = Duration(cc.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
|
||||
val LeaderActionsInterval: FiniteDuration = cc.getMillisDuration("leader-actions-interval")
|
||||
val UnreachableNodesReaperInterval: FiniteDuration = cc.getMillisDuration("unreachable-nodes-reaper-interval")
|
||||
val PublishStatsInterval: Duration = {
|
||||
val key = "publish-stats-interval"
|
||||
cc.getString(key).toLowerCase match {
|
||||
case "off" ⇒ Duration.Undefined
|
||||
case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ >= Duration.Zero, key + " >= 0s, or off")
|
||||
case _ ⇒ cc.getMillisDuration(key) requiring (_ >= Duration.Zero, key + " >= 0s, or off")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -63,7 +64,7 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
val key = "auto-down-unreachable-after"
|
||||
cc.getString(key).toLowerCase match {
|
||||
case "off" ⇒ if (AutoDown) Duration.Zero else Duration.Undefined
|
||||
case _ ⇒ Duration(cc.getMilliseconds(key), MILLISECONDS) requiring (_ >= Duration.Zero, key + " >= 0s, or off")
|
||||
case _ ⇒ cc.getMillisDuration(key) requiring (_ >= Duration.Zero, key + " >= 0s, or off")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -84,16 +85,16 @@ final class ClusterSettings(val config: Config, val systemName: String) {
|
|||
}
|
||||
val GossipDifferentViewProbability: Double = cc.getDouble("gossip-different-view-probability")
|
||||
val ReduceGossipDifferentViewProbability: Int = cc.getInt("reduce-gossip-different-view-probability")
|
||||
val SchedulerTickDuration: FiniteDuration = Duration(cc.getMilliseconds("scheduler.tick-duration"), MILLISECONDS)
|
||||
val SchedulerTickDuration: FiniteDuration = cc.getMillisDuration("scheduler.tick-duration")
|
||||
val SchedulerTicksPerWheel: Int = cc.getInt("scheduler.ticks-per-wheel")
|
||||
val MetricsEnabled: Boolean = cc.getBoolean("metrics.enabled")
|
||||
val MetricsCollectorClass: String = cc.getString("metrics.collector-class")
|
||||
val MetricsInterval: FiniteDuration = {
|
||||
Duration(cc.getMilliseconds("metrics.collect-interval"), MILLISECONDS)
|
||||
cc.getMillisDuration("metrics.collect-interval")
|
||||
} requiring (_ > Duration.Zero, "metrics.collect-interval must be > 0")
|
||||
val MetricsGossipInterval: FiniteDuration = Duration(cc.getMilliseconds("metrics.gossip-interval"), MILLISECONDS)
|
||||
val MetricsGossipInterval: FiniteDuration = cc.getMillisDuration("metrics.gossip-interval")
|
||||
val MetricsMovingAverageHalfLife: FiniteDuration = {
|
||||
Duration(cc.getMilliseconds("metrics.moving-average-half-life"), MILLISECONDS)
|
||||
cc.getMillisDuration("metrics.moving-average-half-life")
|
||||
} requiring (_ > Duration.Zero, "metrics.moving-average-half-life must be > 0")
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -39,6 +39,7 @@ import akka.testkit._
|
|||
import akka.testkit.TestEvent._
|
||||
import akka.actor.Identify
|
||||
import akka.actor.ActorIdentity
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.util.Helpers.Requiring
|
||||
import java.lang.management.ManagementFactory
|
||||
|
||||
|
|
@ -169,8 +170,6 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
private val testConfig = conf.getConfig("akka.test.cluster-stress-spec")
|
||||
import testConfig._
|
||||
|
||||
private def getDuration(name: String): FiniteDuration = Duration(getMilliseconds(name), MILLISECONDS)
|
||||
|
||||
val infolog = getBoolean("infolog")
|
||||
val nFactor = getInt("nr-of-nodes-factor")
|
||||
val numberOfSeedNodes = getInt("nr-of-seed-nodes") // not scaled by nodes factor
|
||||
|
|
@ -192,19 +191,19 @@ private[cluster] object StressMultiJvmSpec extends MultiNodeConfig {
|
|||
val numberOfNodesJoinRemove = getInt("nr-of-nodes-join-remove") // not scaled by nodes factor
|
||||
|
||||
val workBatchSize = getInt("work-batch-size")
|
||||
val workBatchInterval = Duration(getMilliseconds("work-batch-interval"), MILLISECONDS)
|
||||
val workBatchInterval = testConfig.getMillisDuration("work-batch-interval")
|
||||
val payloadSize = getInt("payload-size")
|
||||
val dFactor = getInt("duration-factor")
|
||||
val joinRemoveDuration = getDuration("join-remove-duration") * dFactor
|
||||
val normalThroughputDuration = getDuration("normal-throughput-duration") * dFactor
|
||||
val highThroughputDuration = getDuration("high-throughput-duration") * dFactor
|
||||
val supervisionDuration = getDuration("supervision-duration") * dFactor
|
||||
val supervisionOneIteration = getDuration("supervision-one-iteration") * dFactor
|
||||
val idleGossipDuration = getDuration("idle-gossip-duration") * dFactor
|
||||
val expectedTestDuration = getDuration("expected-test-duration") * dFactor
|
||||
val joinRemoveDuration = testConfig.getMillisDuration("join-remove-duration") * dFactor
|
||||
val normalThroughputDuration = testConfig.getMillisDuration("normal-throughput-duration") * dFactor
|
||||
val highThroughputDuration = testConfig.getMillisDuration("high-throughput-duration") * dFactor
|
||||
val supervisionDuration = testConfig.getMillisDuration("supervision-duration") * dFactor
|
||||
val supervisionOneIteration = testConfig.getMillisDuration("supervision-one-iteration") * dFactor
|
||||
val idleGossipDuration = testConfig.getMillisDuration("idle-gossip-duration") * dFactor
|
||||
val expectedTestDuration = testConfig.getMillisDuration("expected-test-duration") * dFactor
|
||||
val treeWidth = getInt("tree-width")
|
||||
val treeLevels = getInt("tree-levels")
|
||||
val reportMetricsInterval = getDuration("report-metrics-interval")
|
||||
val reportMetricsInterval = testConfig.getMillisDuration("report-metrics-interval")
|
||||
val convergenceWithinFactor = getDouble("convergence-within-factor")
|
||||
val exerciseActors = getBoolean("exercise-actors")
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.testkit.AkkaSpec
|
|||
import akka.dispatch.Dispatchers
|
||||
import scala.concurrent.duration._
|
||||
import akka.remote.PhiAccrualFailureDetector
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
class ClusterConfigSpec extends AkkaSpec {
|
||||
|
|
@ -21,8 +22,8 @@ class ClusterConfigSpec extends AkkaSpec {
|
|||
LogInfo should be(true)
|
||||
FailureDetectorConfig.getDouble("threshold") should be(8.0 +- 0.0001)
|
||||
FailureDetectorConfig.getInt("max-sample-size") should be(1000)
|
||||
Duration(FailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) should be(100 millis)
|
||||
Duration(FailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) should be(3 seconds)
|
||||
FailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
|
||||
FailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(3 seconds)
|
||||
FailureDetectorImplementationClass should be(classOf[PhiAccrualFailureDetector].getName)
|
||||
SeedNodes should be(Seq.empty[String])
|
||||
SeedNodeTimeout should be(5 seconds)
|
||||
|
|
|
|||
|
|
@ -19,7 +19,7 @@ Akka uses the `Typesafe Config Library
|
|||
for the configuration of your own application or library built with or without
|
||||
Akka. This library is implemented in Java with no external dependencies; you
|
||||
should have a look at its documentation (in particular about `ConfigFactory
|
||||
<http://typesafehub.github.com/config/latest/api/com/typesafe/config/ConfigFactory.html>`_),
|
||||
<http://typesafehub.github.io/config/v1.2.0/com/typesafe/config/ConfigFactory.html>`_),
|
||||
which is only summarized in the following.
|
||||
|
||||
.. warning::
|
||||
|
|
|
|||
|
|
@ -56,14 +56,15 @@ class TestConductorExt(val system: ExtendedActorSystem) extends Extension with C
|
|||
|
||||
object Settings {
|
||||
val config = system.settings.config.getConfig("akka.testconductor")
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
val ConnectTimeout = Duration(config.getMilliseconds("connect-timeout"), MILLISECONDS)
|
||||
val ConnectTimeout = config.getMillisDuration("connect-timeout")
|
||||
val ClientReconnects = config.getInt("client-reconnects")
|
||||
val ReconnectBackoff = Duration(config.getMilliseconds("reconnect-backoff"), MILLISECONDS)
|
||||
val ReconnectBackoff = config.getMillisDuration("reconnect-backoff")
|
||||
|
||||
implicit val BarrierTimeout = Timeout(Duration(config.getMilliseconds("barrier-timeout"), MILLISECONDS))
|
||||
implicit val QueryTimeout = Timeout(Duration(config.getMilliseconds("query-timeout"), MILLISECONDS))
|
||||
val PacketSplitThreshold = Duration(config.getMilliseconds("packet-split-threshold"), MILLISECONDS)
|
||||
implicit val BarrierTimeout = Timeout(config.getMillisDuration("barrier-timeout"))
|
||||
implicit val QueryTimeout = Timeout(config.getMillisDuration("query-timeout"))
|
||||
val PacketSplitThreshold = config.getMillisDuration("packet-split-threshold")
|
||||
|
||||
private def computeWPS(config: Config): Int =
|
||||
ThreadPoolConfig.scaledPoolSize(
|
||||
|
|
|
|||
|
|
@ -12,6 +12,7 @@ import scala.concurrent.duration.FiniteDuration
|
|||
import scala.collection.immutable
|
||||
import com.typesafe.config.Config
|
||||
import akka.event.EventStream
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
/**
|
||||
* Implementation of 'The Phi Accrual Failure Detector' by Hayashibara et al. as defined in their paper:
|
||||
|
|
@ -71,9 +72,9 @@ class PhiAccrualFailureDetector(
|
|||
this(
|
||||
threshold = config.getDouble("threshold"),
|
||||
maxSampleSize = config.getInt("max-sample-size"),
|
||||
minStdDeviation = Duration(config.getMilliseconds("min-std-deviation"), MILLISECONDS),
|
||||
acceptableHeartbeatPause = Duration(config.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS),
|
||||
firstHeartbeatEstimate = Duration(config.getMilliseconds("heartbeat-interval"), MILLISECONDS))
|
||||
minStdDeviation = config.getMillisDuration("min-std-deviation"),
|
||||
acceptableHeartbeatPause = config.getMillisDuration("acceptable-heartbeat-pause"),
|
||||
firstHeartbeatEstimate = config.getMillisDuration("heartbeat-interval"))
|
||||
|
||||
require(threshold > 0.0, "failure-detector.threshold must be > 0")
|
||||
require(maxSampleSize > 0, "failure-detector.max-sample-size must be > 0")
|
||||
|
|
|
|||
|
|
@ -8,6 +8,7 @@ import scala.concurrent.duration._
|
|||
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||
import akka.util.Timeout
|
||||
import scala.collection.immutable
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.util.Helpers.Requiring
|
||||
import akka.japi.Util._
|
||||
import akka.actor.Props
|
||||
|
|
@ -44,23 +45,23 @@ final class RemoteSettings(val config: Config) {
|
|||
def configureDispatcher(props: Props): Props = if (Dispatcher.isEmpty) props else props.withDispatcher(Dispatcher)
|
||||
|
||||
val ShutdownTimeout: Timeout = {
|
||||
Timeout(Duration(getMilliseconds("akka.remote.shutdown-timeout"), MILLISECONDS))
|
||||
Timeout(config.getMillisDuration("akka.remote.shutdown-timeout"))
|
||||
} requiring (_.duration > Duration.Zero, "shutdown-timeout must be > 0")
|
||||
|
||||
val FlushWait: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.flush-wait-on-shutdown"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.flush-wait-on-shutdown")
|
||||
} requiring (_ > Duration.Zero, "flush-wait-on-shutdown must be > 0")
|
||||
|
||||
val StartupTimeout: Timeout = {
|
||||
Timeout(Duration(getMilliseconds("akka.remote.startup-timeout"), MILLISECONDS))
|
||||
Timeout(config.getMillisDuration("akka.remote.startup-timeout"))
|
||||
} requiring (_.duration > Duration.Zero, "startup-timeout must be > 0")
|
||||
|
||||
val RetryGateClosedFor: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.retry-gate-closed-for"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.retry-gate-closed-for")
|
||||
} requiring (_ >= Duration.Zero, "retry-gate-closed-for must be >= 0")
|
||||
|
||||
val UnknownAddressGateClosedFor: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.gate-invalid-addresses-for"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.gate-invalid-addresses-for")
|
||||
} requiring (_ > Duration.Zero, "gate-invalid-addresses-for must be > 0")
|
||||
|
||||
val UsePassiveConnections: Boolean = getBoolean("akka.remote.use-passive-connections")
|
||||
|
|
@ -70,19 +71,19 @@ final class RemoteSettings(val config: Config) {
|
|||
} requiring (_ > 0, "maximum-retries-in-window must be > 0")
|
||||
|
||||
val RetryWindow: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.retry-window"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.retry-window")
|
||||
} requiring (_ > Duration.Zero, "retry-window must be > 0")
|
||||
|
||||
val BackoffPeriod: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.backoff-interval"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.backoff-interval")
|
||||
} requiring (_ > Duration.Zero, "backoff-interval must be > 0")
|
||||
|
||||
val SysMsgAckTimeout: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.system-message-ack-piggyback-timeout"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.system-message-ack-piggyback-timeout")
|
||||
} requiring (_ > Duration.Zero, "system-message-ack-piggyback-timeout must be > 0")
|
||||
|
||||
val SysResendTimeout: FiniteDuration = {
|
||||
Duration(getMilliseconds("akka.remote.resend-interval"), MILLISECONDS)
|
||||
config.getMillisDuration("akka.remote.resend-interval")
|
||||
} requiring (_ > Duration.Zero, "resend-interval must be > 0")
|
||||
|
||||
val SysMsgBufferSize: Int = {
|
||||
|
|
@ -91,24 +92,24 @@ final class RemoteSettings(val config: Config) {
|
|||
|
||||
val QuarantineDuration: Duration = {
|
||||
if (getString("akka.remote.quarantine-systems-for") == "off") Duration.Undefined
|
||||
else Duration(getMilliseconds("akka.remote.quarantine-systems-for"), MILLISECONDS).requiring(_ > Duration.Zero,
|
||||
else config.getMillisDuration("akka.remote.quarantine-systems-for").requiring(_ > Duration.Zero,
|
||||
"quarantine-systems-for must be > 0 or off")
|
||||
}
|
||||
|
||||
val CommandAckTimeout: Timeout = {
|
||||
Timeout(Duration(getMilliseconds("akka.remote.command-ack-timeout"), MILLISECONDS))
|
||||
Timeout(config.getMillisDuration("akka.remote.command-ack-timeout"))
|
||||
} requiring (_.duration > Duration.Zero, "command-ack-timeout must be > 0")
|
||||
|
||||
val WatchFailureDetectorConfig: Config = getConfig("akka.remote.watch-failure-detector")
|
||||
val WatchFailureDetectorImplementationClass: String = WatchFailureDetectorConfig.getString("implementation-class")
|
||||
val WatchHeartBeatInterval: FiniteDuration = {
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
|
||||
WatchFailureDetectorConfig.getMillisDuration("heartbeat-interval")
|
||||
} requiring (_ > Duration.Zero, "watch-failure-detector.heartbeat-interval must be > 0")
|
||||
val WatchUnreachableReaperInterval: FiniteDuration = {
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("unreachable-nodes-reaper-interval"), MILLISECONDS)
|
||||
WatchFailureDetectorConfig.getMillisDuration("unreachable-nodes-reaper-interval")
|
||||
} requiring (_ > Duration.Zero, "watch-failure-detector.unreachable-nodes-reaper-interval must be > 0")
|
||||
val WatchHeartbeatExpectedResponseAfter: FiniteDuration = {
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("expected-response-after"), MILLISECONDS)
|
||||
WatchFailureDetectorConfig.getMillisDuration("expected-response-after")
|
||||
} requiring (_ > Duration.Zero, "watch-failure-detector.expected-response-after > 0")
|
||||
|
||||
val Transports: immutable.Seq[(String, immutable.Seq[String], Config)] = transportNames.map { name ⇒
|
||||
|
|
|
|||
|
|
@ -31,12 +31,13 @@ class AkkaProtocolException(msg: String, cause: Throwable) extends AkkaException
|
|||
|
||||
private[remote] class AkkaProtocolSettings(config: Config) {
|
||||
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import config._
|
||||
|
||||
val TransportFailureDetectorConfig: Config = getConfig("akka.remote.transport-failure-detector")
|
||||
val TransportFailureDetectorImplementationClass: String = TransportFailureDetectorConfig.getString("implementation-class")
|
||||
val TransportHeartBeatInterval: FiniteDuration = {
|
||||
Duration(TransportFailureDetectorConfig.getMilliseconds("heartbeat-interval"), MILLISECONDS)
|
||||
TransportFailureDetectorConfig.getMillisDuration("heartbeat-interval")
|
||||
} requiring (_ > Duration.Zero, "transport-failure-detector.heartbeat-interval must be > 0")
|
||||
|
||||
val RequireCookie: Boolean = getBoolean("akka.remote.require-cookie")
|
||||
|
|
|
|||
|
|
@ -71,6 +71,7 @@ class NettyTransportException(msg: String, cause: Throwable) extends RuntimeExce
|
|||
|
||||
class NettyTransportSettings(config: Config) {
|
||||
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import config._
|
||||
|
||||
val TransportMode: Mode = getString("transport-protocol") match {
|
||||
|
|
@ -92,7 +93,7 @@ class NettyTransportSettings(config: Config) {
|
|||
case other ⇒ Some(other)
|
||||
}
|
||||
|
||||
val ConnectionTimeout: FiniteDuration = Duration(getMilliseconds("connection-timeout"), MILLISECONDS)
|
||||
val ConnectionTimeout: FiniteDuration = config.getMillisDuration("connection-timeout")
|
||||
|
||||
val WriteBufferHighWaterMark: Option[Int] = optionSize("write-buffer-high-water-mark")
|
||||
|
||||
|
|
|
|||
|
|
@ -9,6 +9,7 @@ import akka.actor.ExtendedActorSystem
|
|||
import scala.concurrent.duration._
|
||||
import akka.remote.transport.AkkaProtocolSettings
|
||||
import akka.util.{ Timeout, Helpers }
|
||||
import akka.util.Helpers.ConfigOps
|
||||
import akka.remote.transport.netty.{ NettyTransportSettings, SSLSettings }
|
||||
|
||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||
|
|
@ -57,8 +58,8 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
WatchUnreachableReaperInterval should be(1 second)
|
||||
WatchFailureDetectorConfig.getDouble("threshold") should be(10.0 +- 0.0001)
|
||||
WatchFailureDetectorConfig.getInt("max-sample-size") should be(200)
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) should be(4 seconds)
|
||||
Duration(WatchFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) should be(100 millis)
|
||||
WatchFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(4 seconds)
|
||||
WatchFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
|
||||
|
||||
remoteSettings.config.getString("akka.remote.log-frame-size-exceeding") should be("off")
|
||||
}
|
||||
|
|
@ -74,8 +75,8 @@ class RemoteConfigSpec extends AkkaSpec(
|
|||
TransportHeartBeatInterval should equal(1.seconds)
|
||||
TransportFailureDetectorConfig.getDouble("threshold") should be(7.0 +- 0.0001)
|
||||
TransportFailureDetectorConfig.getInt("max-sample-size") should be(100)
|
||||
Duration(TransportFailureDetectorConfig.getMilliseconds("acceptable-heartbeat-pause"), MILLISECONDS) should be(3 seconds)
|
||||
Duration(TransportFailureDetectorConfig.getMilliseconds("min-std-deviation"), MILLISECONDS) should be(100 millis)
|
||||
TransportFailureDetectorConfig.getMillisDuration("acceptable-heartbeat-pause") should be(3 seconds)
|
||||
TransportFailureDetectorConfig.getMillisDuration("min-std-deviation") should be(100 millis)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,10 +17,10 @@ object TestKitExtension extends ExtensionId[TestKitSettings] {
|
|||
|
||||
class TestKitSettings(val config: Config) extends Extension {
|
||||
|
||||
import config._
|
||||
import akka.util.Helpers.ConfigOps
|
||||
|
||||
val TestTimeFactor = getDouble("akka.test.timefactor")
|
||||
val SingleExpectDefaultTimeout: FiniteDuration = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
|
||||
val TestEventFilterLeeway: FiniteDuration = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
|
||||
val DefaultTimeout: Timeout = Timeout(Duration(getMilliseconds("akka.test.default-timeout"), MILLISECONDS))
|
||||
val TestTimeFactor = config.getDouble("akka.test.timefactor")
|
||||
val SingleExpectDefaultTimeout: FiniteDuration = config.getMillisDuration("akka.test.single-expect-default")
|
||||
val TestEventFilterLeeway: FiniteDuration = config.getMillisDuration("akka.test.filter-leeway")
|
||||
val DefaultTimeout: Timeout = Timeout(config.getMillisDuration("akka.test.default-timeout"))
|
||||
}
|
||||
|
|
|
|||
|
|
@ -1158,7 +1158,7 @@ object Dependencies {
|
|||
// Compile
|
||||
val camelCore = "org.apache.camel" % "camel-core" % "2.10.3" exclude("org.slf4j", "slf4j-api") // ApacheV2
|
||||
|
||||
val config = "com.typesafe" % "config" % "1.0.2" // ApacheV2
|
||||
val config = "com.typesafe" % "config" % "1.2.0" // ApacheV2
|
||||
val netty = "io.netty" % "netty" % "3.8.0.Final" // ApacheV2
|
||||
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.5.0" // New BSD
|
||||
val scalaStm = "org.scala-stm" %% "scala-stm" % scalaStmVersion // Modified BSD (Scala)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue