Remove default time unit in config. All durations explicit. See #1363
* Also changed in dispatcher to use explicit Duration instead of Int/Long
This commit is contained in:
parent
1543594c78
commit
e5f8a41cb8
29 changed files with 166 additions and 154 deletions
|
|
@ -16,10 +16,10 @@ class ClusterSpec extends AkkaSpec {
|
||||||
getString("akka.cluster.name") must equal("test-cluster")
|
getString("akka.cluster.name") must equal("test-cluster")
|
||||||
getString("akka.cluster.zookeeper-server-addresses") must equal("localhost:2181")
|
getString("akka.cluster.zookeeper-server-addresses") must equal("localhost:2181")
|
||||||
getInt("akka.remote.server.port") must equal(2552)
|
getInt("akka.remote.server.port") must equal(2552)
|
||||||
getInt("akka.cluster.max-time-to-wait-until-connected") must equal(30)
|
getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000)
|
||||||
getInt("akka.cluster.session-timeout") must equal(60)
|
getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000)
|
||||||
getInt("akka.cluster.connection-timeout") must equal(60)
|
getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000)
|
||||||
getInt("akka.remote.remote-daemon-ack-timeout") must equal(30)
|
getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000)
|
||||||
getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true)
|
getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true)
|
||||||
getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport")
|
getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport")
|
||||||
getString("akka.remote.secure-cookie") must equal("")
|
getString("akka.remote.secure-cookie") must equal("")
|
||||||
|
|
@ -32,12 +32,12 @@ class ClusterSpec extends AkkaSpec {
|
||||||
getInt("akka.cluster.replication.ensemble-size") must equal(3)
|
getInt("akka.cluster.replication.ensemble-size") must equal(3)
|
||||||
getInt("akka.cluster.replication.quorum-size") must equal(2)
|
getInt("akka.cluster.replication.quorum-size") must equal(2)
|
||||||
getInt("akka.cluster.replication.snapshot-frequency") must equal(1000)
|
getInt("akka.cluster.replication.snapshot-frequency") must equal(1000)
|
||||||
getInt("akka.cluster.replication.timeout") must equal(30)
|
getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000)
|
||||||
|
|
||||||
//akka.remote.server
|
//akka.remote.server
|
||||||
getInt("akka.remote.server.port") must equal(2552)
|
getInt("akka.remote.server.port") must equal(2552)
|
||||||
getInt("akka.remote.server.message-frame-size") must equal(1048576)
|
getInt("akka.remote.server.message-frame-size") must equal(1048576)
|
||||||
getInt("akka.remote.server.connection-timeout") must equal(120)
|
getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000)
|
||||||
getBoolean("akka.remote.server.require-cookie") must equal(false)
|
getBoolean("akka.remote.server.require-cookie") must equal(false)
|
||||||
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
|
getBoolean("akka.remote.server.untrusted-mode") must equal(false)
|
||||||
getInt("akka.remote.server.backlog") must equal(4096)
|
getInt("akka.remote.server.backlog") must equal(4096)
|
||||||
|
|
@ -45,10 +45,10 @@ class ClusterSpec extends AkkaSpec {
|
||||||
//akka.remote.client
|
//akka.remote.client
|
||||||
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
|
getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false)
|
||||||
getInt("akka.remote.client.buffering.capacity") must equal(-1)
|
getInt("akka.remote.client.buffering.capacity") must equal(-1)
|
||||||
getInt("akka.remote.client.reconnect-delay") must equal(5)
|
getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000)
|
||||||
getInt("akka.remote.client.read-timeout") must equal(3600)
|
getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000)
|
||||||
getInt("akka.remote.client.reap-futures-delay") must equal(5)
|
getMilliseconds("akka.remote.client.reap-futures-delay") must equal(5 * 1000)
|
||||||
getInt("akka.remote.client.reconnection-time-window") must equal(600)
|
getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -142,7 +142,7 @@ object ActorModelSpec {
|
||||||
|
|
||||||
def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
|
def assertDispatcher(dispatcher: MessageDispatcherInterceptor)(
|
||||||
stops: Long = dispatcher.stops.get())(implicit system: ActorSystem) {
|
stops: Long = dispatcher.stops.get())(implicit system: ActorSystem) {
|
||||||
val deadline = System.currentTimeMillis + dispatcher.timeoutMs * 5
|
val deadline = System.currentTimeMillis + dispatcher.shutdownTimeout.toMillis * 5
|
||||||
try {
|
try {
|
||||||
await(deadline)(stops == dispatcher.stops.get)
|
await(deadline)(stops == dispatcher.stops.get)
|
||||||
} catch {
|
} catch {
|
||||||
|
|
@ -421,8 +421,8 @@ class DispatcherModelSpec extends ActorModelSpec {
|
||||||
|
|
||||||
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒
|
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput,
|
new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput,
|
||||||
system.dispatcherFactory.ThroughputDeadlineTimeMillis, system.dispatcherFactory.MailboxType,
|
system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType,
|
||||||
config, system.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
|
config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor,
|
||||||
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
|
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
|
||||||
|
|
||||||
def dispatcherType = "Dispatcher"
|
def dispatcherType = "Dispatcher"
|
||||||
|
|
@ -458,8 +458,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec {
|
||||||
|
|
||||||
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒
|
def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new BalancingDispatcher(system.dispatcherFactory.prerequisites, "foo", 1, // TODO check why 1 here? (came from old test)
|
new BalancingDispatcher(system.dispatcherFactory.prerequisites, "foo", 1, // TODO check why 1 here? (came from old test)
|
||||||
system.dispatcherFactory.ThroughputDeadlineTimeMillis, system.dispatcherFactory.MailboxType,
|
system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType,
|
||||||
config, system.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor,
|
config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor,
|
||||||
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
|
ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor]
|
||||||
|
|
||||||
def dispatcherType = "Balancing Dispatcher"
|
def dispatcherType = "Balancing Dispatcher"
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,8 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger }
|
||||||
import akka.testkit.{ filterEvents, EventFilter, AkkaSpec }
|
import akka.testkit.{ filterEvents, EventFilter, AkkaSpec }
|
||||||
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher }
|
import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher }
|
||||||
import akka.actor.{ Props, Actor }
|
import akka.actor.{ Props, Actor }
|
||||||
|
import akka.util.Duration
|
||||||
|
import akka.util.duration._
|
||||||
|
|
||||||
object DispatcherActorSpec {
|
object DispatcherActorSpec {
|
||||||
class TestActor extends Actor {
|
class TestActor extends Actor {
|
||||||
|
|
@ -48,7 +50,7 @@ class DispatcherActorSpec extends AkkaSpec {
|
||||||
|
|
||||||
"respect the throughput setting" in {
|
"respect the throughput setting" in {
|
||||||
val throughputDispatcher = system.dispatcherFactory.
|
val throughputDispatcher = system.dispatcherFactory.
|
||||||
newDispatcher("THROUGHPUT", 101, 0, system.dispatcherFactory.MailboxType).
|
newDispatcher("THROUGHPUT", 101, Duration.Zero, system.dispatcherFactory.MailboxType).
|
||||||
setCorePoolSize(1).
|
setCorePoolSize(1).
|
||||||
build
|
build
|
||||||
|
|
||||||
|
|
@ -75,9 +77,9 @@ class DispatcherActorSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
"respect throughput deadline" in {
|
"respect throughput deadline" in {
|
||||||
val deadlineMs = 100
|
val deadline = 100 millis
|
||||||
val throughputDispatcher = system.dispatcherFactory.
|
val throughputDispatcher = system.dispatcherFactory.
|
||||||
newDispatcher("THROUGHPUT", 2, deadlineMs, system.dispatcherFactory.MailboxType).
|
newDispatcher("THROUGHPUT", 2, deadline, system.dispatcherFactory.MailboxType).
|
||||||
setCorePoolSize(1).
|
setCorePoolSize(1).
|
||||||
build
|
build
|
||||||
val works = new AtomicBoolean(true)
|
val works = new AtomicBoolean(true)
|
||||||
|
|
@ -100,7 +102,7 @@ class DispatcherActorSpec extends AkkaSpec {
|
||||||
slowOne ! "ping"
|
slowOne ! "ping"
|
||||||
fastOne ! "ping"
|
fastOne ! "ping"
|
||||||
assert(ready.await(2, TimeUnit.SECONDS) === true)
|
assert(ready.await(2, TimeUnit.SECONDS) === true)
|
||||||
Thread.sleep(deadlineMs + 10) // wait just a bit more than the deadline
|
Thread.sleep(deadline.toMillis + 10) // wait just a bit more than the deadline
|
||||||
start.countDown()
|
start.countDown()
|
||||||
assert(latch.await(2, TimeUnit.SECONDS) === true)
|
assert(latch.await(2, TimeUnit.SECONDS) === true)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -24,26 +24,24 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec
|
||||||
import config._
|
import config._
|
||||||
|
|
||||||
getList("akka.boot").asScala.toSeq must equal(Nil)
|
getList("akka.boot").asScala.toSeq must equal(Nil)
|
||||||
getString("akka.time-unit") must equal("seconds")
|
|
||||||
settings.DefaultTimeUnit must equal(TimeUnit.SECONDS)
|
|
||||||
getString("akka.version") must equal("2.0-SNAPSHOT")
|
getString("akka.version") must equal("2.0-SNAPSHOT")
|
||||||
settings.ConfigVersion must equal("2.0-SNAPSHOT")
|
settings.ConfigVersion must equal("2.0-SNAPSHOT")
|
||||||
|
|
||||||
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
|
getString("akka.actor.default-dispatcher.type") must equal("Dispatcher")
|
||||||
getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(60)
|
getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000)
|
||||||
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(8.0)
|
getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(8.0)
|
||||||
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(8.0)
|
getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(8.0)
|
||||||
getInt("akka.actor.default-dispatcher.task-queue-size") must equal(-1)
|
getInt("akka.actor.default-dispatcher.task-queue-size") must equal(-1)
|
||||||
getString("akka.actor.default-dispatcher.task-queue-type") must equal("linked")
|
getString("akka.actor.default-dispatcher.task-queue-type") must equal("linked")
|
||||||
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
|
getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true)
|
||||||
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
|
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
|
||||||
getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10)
|
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
|
||||||
getLong("akka.actor.dispatcher-shutdown-timeout") must equal(1)
|
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
|
||||||
settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS))
|
settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS))
|
||||||
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
|
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
|
||||||
settings.DispatcherThroughput must equal(5)
|
settings.DispatcherThroughput must equal(5)
|
||||||
getInt("akka.actor.default-dispatcher.throughput-deadline-time") must equal(-1)
|
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)
|
||||||
settings.DispatcherThroughputDeadlineTime must equal(Duration(-1, TimeUnit.SECONDS))
|
settings.DispatcherThroughputDeadlineTime must equal(Duration.Zero)
|
||||||
getBoolean("akka.actor.serialize-messages") must equal(false)
|
getBoolean("akka.actor.serialize-messages") must equal(false)
|
||||||
settings.SerializeAllMessages must equal(false)
|
settings.SerializeAllMessages must equal(false)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -2,6 +2,7 @@ package akka.dispatch
|
||||||
|
|
||||||
import akka.actor.{ Props, LocalActorRef, Actor }
|
import akka.actor.{ Props, LocalActorRef, Actor }
|
||||||
import akka.testkit.AkkaSpec
|
import akka.testkit.AkkaSpec
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
class PriorityDispatcherSpec extends AkkaSpec {
|
class PriorityDispatcherSpec extends AkkaSpec {
|
||||||
|
|
@ -23,7 +24,7 @@ class PriorityDispatcherSpec extends AkkaSpec {
|
||||||
}
|
}
|
||||||
|
|
||||||
def testOrdering(mboxType: MailboxType) {
|
def testOrdering(mboxType: MailboxType) {
|
||||||
val dispatcher = system.dispatcherFactory.newDispatcher("Test", 1, -1, mboxType).build
|
val dispatcher = system.dispatcherFactory.newDispatcher("Test", 1, Duration.Zero, mboxType).build
|
||||||
|
|
||||||
val actor = actorOf(Props(new Actor {
|
val actor = actorOf(Props(new Actor {
|
||||||
var acc: List[Int] = Nil
|
var acc: List[Int] = Nil
|
||||||
|
|
|
||||||
|
|
@ -6,6 +6,7 @@ import akka.actor._
|
||||||
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
|
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
|
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
|
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
|
||||||
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
|
||||||
|
|
@ -14,7 +15,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
|
||||||
|
|
||||||
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒
|
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
|
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
|
||||||
0, UnboundedMailbox(), config, 60000), ThreadPoolConfig())
|
Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig())
|
||||||
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
|
||||||
.setCorePoolSize(maxClients)
|
.setCorePoolSize(maxClients)
|
||||||
.build
|
.build
|
||||||
|
|
|
||||||
|
|
@ -12,8 +12,6 @@ akka {
|
||||||
|
|
||||||
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["cluster", "camel", "http"]
|
enabled-modules = [] # Comma separated list of the enabled modules. Options: ["cluster", "camel", "http"]
|
||||||
|
|
||||||
time-unit = "seconds" # Time unit for all timeout properties throughout the config
|
|
||||||
|
|
||||||
event-handlers = ["akka.event.Logging$DefaultLogger"] # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
|
event-handlers = ["akka.event.Logging$DefaultLogger"] # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT)
|
||||||
loglevel = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
|
loglevel = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG
|
||||||
# this level is used by the configured loggers (see "event-handlers") as soon
|
# this level is used by the configured loggers (see "event-handlers") as soon
|
||||||
|
|
@ -25,7 +23,7 @@ akka {
|
||||||
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
||||||
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||||
name = "EventHandlerDispatcher" # Optional, will be a generated UUID if omitted
|
name = "EventHandlerDispatcher" # Optional, will be a generated UUID if omitted
|
||||||
keep-alive-time = 60 # Keep alive time for threads
|
keep-alive-time = 60s # Keep alive time for threads
|
||||||
core-pool-size = 1 # No of core threads
|
core-pool-size = 1 # No of core threads
|
||||||
max-pool-size = 8 # Max no of threads
|
max-pool-size = 8 # Max no of threads
|
||||||
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded
|
||||||
|
|
@ -33,13 +31,12 @@ akka {
|
||||||
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
||||||
allow-core-timeout = on # Allow core threads to time out
|
allow-core-timeout = on # Allow core threads to time out
|
||||||
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||||
throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
||||||
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||||
# If positive then a bounded mailbox is used and the capacity is set using the property
|
# If positive then a bounded mailbox is used and the capacity is set using the property
|
||||||
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
||||||
# The following are only used for Dispatcher and only if mailbox-capacity > 0
|
# The following are only used for Dispatcher and only if mailbox-capacity > 0
|
||||||
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
mailbox-push-timeout-time = 10s # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
||||||
# (in unit defined by the time-unit property)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
# These boot classes are loaded (and created) automatically when the Akka Microkernel boots up
|
||||||
|
|
@ -53,12 +50,12 @@ akka {
|
||||||
|
|
||||||
actor {
|
actor {
|
||||||
provider = "akka.actor.LocalActorRefProvider"
|
provider = "akka.actor.LocalActorRefProvider"
|
||||||
timeout = 5 # Default timeout for Future based invocations
|
timeout = 5s # Default timeout for Future based invocations
|
||||||
# - Actor: ask && ?
|
# - Actor: ask && ?
|
||||||
# - UntypedActor: ask
|
# - UntypedActor: ask
|
||||||
# - TypedActor: methods with non-void return type
|
# - TypedActor: methods with non-void return type
|
||||||
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability
|
||||||
dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down
|
dispatcher-shutdown-timeout = 1s # How long dispatchers by default will wait for new actors until they shut down
|
||||||
|
|
||||||
deployment {
|
deployment {
|
||||||
|
|
||||||
|
|
@ -120,20 +117,19 @@ akka {
|
||||||
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
# Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type),
|
||||||
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
# A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor
|
||||||
name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted
|
name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted
|
||||||
keep-alive-time = 60 # Keep alive time for threads
|
keep-alive-time = 60s # Keep alive time for threads
|
||||||
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor)
|
||||||
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
|
max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor)
|
||||||
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
|
task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded)
|
||||||
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default)
|
||||||
allow-core-timeout = on # Allow core threads to time out
|
allow-core-timeout = on # Allow core threads to time out
|
||||||
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness
|
||||||
throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
throughput-deadline-time = -0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline
|
||||||
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||||
# If positive then a bounded mailbox is used and the capacity is set using the property
|
# If positive then a bounded mailbox is used and the capacity is set using the property
|
||||||
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
# NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care
|
||||||
# The following are only used for Dispatcher and only if mailbox-capacity > 0
|
# The following are only used for Dispatcher and only if mailbox-capacity > 0
|
||||||
mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
mailbox-push-timeout-time = 10s # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout
|
||||||
# (in unit defined by the time-unit property)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
debug {
|
debug {
|
||||||
|
|
@ -152,7 +148,7 @@ akka {
|
||||||
max-size = 2147483647 bytes
|
max-size = 2147483647 bytes
|
||||||
max-items = 2147483647
|
max-items = 2147483647
|
||||||
max-item-size = 2147483647 bytes
|
max-item-size = 2147483647 bytes
|
||||||
max-age = 0
|
max-age = 0s
|
||||||
max-journal-size = 16 megabytes
|
max-journal-size = 16 megabytes
|
||||||
max-memory-size = 128 megabytes
|
max-memory-size = 128 megabytes
|
||||||
max-journal-overflow = 10
|
max-journal-overflow = 10
|
||||||
|
|
@ -173,25 +169,25 @@ akka {
|
||||||
|
|
||||||
# Configurable timeouts for certain ops
|
# Configurable timeouts for certain ops
|
||||||
timeout {
|
timeout {
|
||||||
read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future
|
read = 3000ms # time to wait for a read to succeed before timing out the future
|
||||||
write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future
|
write = 3000ms # time to wait for a write to succeed before timing out the future
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
zookeeper {
|
zookeeper {
|
||||||
server-addresses = "127.0.0.1:2181"
|
server-addresses = "127.0.0.1:2181"
|
||||||
session-timeout = 60
|
session-timeout = 60s
|
||||||
connection-timeout = 60
|
connection-timeout = 60s
|
||||||
blocking-queue = on
|
blocking-queue = on
|
||||||
}
|
}
|
||||||
|
|
||||||
beanstalk {
|
beanstalk {
|
||||||
hostname = "127.0.0.1"
|
hostname = "127.0.0.1"
|
||||||
port = 11300
|
port = 11300
|
||||||
reconnect-window = 5
|
reconnect-window = 5s
|
||||||
message-submit-delay = 0
|
message-submit-delay = 0s
|
||||||
message-submit-timeout = 5
|
message-submit-timeout = 5s
|
||||||
message-time-to-live = 120
|
message-time-to-live = 120s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -225,7 +221,7 @@ akka {
|
||||||
secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
|
secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh'
|
||||||
# or using 'akka.util.Crypt.generateSecureCookie'
|
# or using 'akka.util.Crypt.generateSecureCookie'
|
||||||
|
|
||||||
remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc.
|
remote-daemon-ack-timeout = 30s # Timeout for ACK of cluster operations, lik checking actor out etc.
|
||||||
|
|
||||||
use-passive-connections = on # Reuse inbound connections for outbound messages
|
use-passive-connections = on # Reuse inbound connections for outbound messages
|
||||||
|
|
||||||
|
|
@ -240,7 +236,7 @@ akka {
|
||||||
server {
|
server {
|
||||||
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
|
port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA)
|
||||||
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
|
message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads
|
||||||
connection-timeout = 120 # Length in time-unit
|
connection-timeout = 120s # Timeout duration
|
||||||
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
|
require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)?
|
||||||
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
|
untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect.
|
||||||
backlog = 4096 # Sets the size of the connection backlog
|
backlog = 4096 # Sets the size of the connection backlog
|
||||||
|
|
@ -252,20 +248,20 @@ akka {
|
||||||
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default)
|
||||||
# If positive then a bounded mailbox is used and the capacity is set using the property
|
# If positive then a bounded mailbox is used and the capacity is set using the property
|
||||||
}
|
}
|
||||||
reconnect-delay = 5
|
reconnect-delay = 5s
|
||||||
read-timeout = 3600
|
read-timeout = 3600s
|
||||||
message-frame-size = 1048576
|
message-frame-size = 1048576
|
||||||
reap-futures-delay = 5
|
reap-futures-delay = 5s # FIXME: This is not used anywhere (except in ClusterSpec), remove?
|
||||||
reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for
|
reconnection-time-window = 600s # Maximum time window that a client should try to reconnect for
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
cluster {
|
cluster {
|
||||||
name = "test-cluster"
|
name = "test-cluster"
|
||||||
zookeeper-server-addresses = "localhost:2181" # comma-separated list of '<hostname>:<port>' elements
|
zookeeper-server-addresses = "localhost:2181" # comma-separated list of '<hostname>:<port>' elements
|
||||||
max-time-to-wait-until-connected = 30
|
max-time-to-wait-until-connected = 30s
|
||||||
session-timeout = 60
|
session-timeout = 60s
|
||||||
connection-timeout = 60
|
connection-timeout = 60s
|
||||||
include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor
|
include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor
|
||||||
# Default: on
|
# Default: on
|
||||||
log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files
|
log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files
|
||||||
|
|
@ -276,15 +272,15 @@ akka {
|
||||||
ensemble-size = 3
|
ensemble-size = 3
|
||||||
quorum-size = 2
|
quorum-size = 2
|
||||||
snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot
|
snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot
|
||||||
timeout = 30 # Timeout for asyncronous (write-behind) operations
|
timeout = 30s # Timeout for asyncronous (write-behind) operations
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
# TODO move to testkit-reference
|
# TODO move to testkit-reference
|
||||||
test {
|
test {
|
||||||
timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load
|
timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load
|
||||||
filter-leeway = 3 # time-units EventFilter.intercept waits after the block is finished until all required messages are received
|
filter-leeway = 3s # duration of EventFilter.intercept waits after the block is finished until all required messages are received
|
||||||
single-expect-default = 3 # time-units to wait in expectMsg and friends outside of within() block by default
|
single-expect-default = 3s # duration to wait in expectMsg and friends outside of within() block by default
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -15,7 +15,9 @@ import akka.util.ReflectiveAccess
|
||||||
import akka.serialization.Serialization
|
import akka.serialization.Serialization
|
||||||
import akka.remote.RemoteAddress
|
import akka.remote.RemoteAddress
|
||||||
import org.jboss.netty.akka.util.HashedWheelTimer
|
import org.jboss.netty.akka.util.HashedWheelTimer
|
||||||
import java.util.concurrent.{ Executors, TimeUnit }
|
import java.util.concurrent.Executors
|
||||||
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
|
import java.util.concurrent.TimeUnit.NANOSECONDS
|
||||||
import java.io.File
|
import java.io.File
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
import com.typesafe.config.ConfigParseOptions
|
import com.typesafe.config.ConfigParseOptions
|
||||||
|
|
@ -60,14 +62,12 @@ object ActorSystem {
|
||||||
|
|
||||||
val ProviderClass = getString("akka.actor.provider")
|
val ProviderClass = getString("akka.actor.provider")
|
||||||
|
|
||||||
val DefaultTimeUnit = Duration.timeUnit(getString("akka.time-unit"))
|
val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS))
|
||||||
val ActorTimeout = Timeout(Duration(getInt("akka.actor.timeout"), DefaultTimeUnit))
|
|
||||||
val ActorTimeoutMillis = ActorTimeout.duration.toMillis
|
|
||||||
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
|
val SerializeAllMessages = getBoolean("akka.actor.serialize-messages")
|
||||||
|
|
||||||
val TestTimeFactor = getDouble("akka.test.timefactor")
|
val TestTimeFactor = getDouble("akka.test.timefactor")
|
||||||
val SingleExpectDefaultTimeout = Duration(getDouble("akka.test.single-expect-default"), DefaultTimeUnit)
|
val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS)
|
||||||
val TestEventFilterLeeway = Duration(getDouble("akka.test.filter-leeway"), DefaultTimeUnit)
|
val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS)
|
||||||
|
|
||||||
val LogLevel = getString("akka.loglevel")
|
val LogLevel = getString("akka.loglevel")
|
||||||
val StdoutLogLevel = getString("akka.stdout-loglevel")
|
val StdoutLogLevel = getString("akka.stdout-loglevel")
|
||||||
|
|
@ -79,10 +79,10 @@ object ActorSystem {
|
||||||
val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
|
val DebugEventStream = getBoolean("akka.actor.debug.event-stream")
|
||||||
|
|
||||||
val DispatcherThroughput = getInt("akka.actor.default-dispatcher.throughput")
|
val DispatcherThroughput = getInt("akka.actor.default-dispatcher.throughput")
|
||||||
val DispatcherDefaultShutdown = Duration(getLong("akka.actor.dispatcher-shutdown-timeout"), DefaultTimeUnit)
|
val DispatcherDefaultShutdown = Duration(getMilliseconds("akka.actor.dispatcher-shutdown-timeout"), MILLISECONDS)
|
||||||
val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity")
|
val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity")
|
||||||
val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time"), DefaultTimeUnit)
|
val MailboxPushTimeout = Duration(getNanoseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time"), NANOSECONDS)
|
||||||
val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.default-dispatcher.throughput-deadline-time"), DefaultTimeUnit)
|
val DispatcherThroughputDeadlineTime = Duration(getNanoseconds("akka.actor.default-dispatcher.throughput-deadline-time"), NANOSECONDS)
|
||||||
|
|
||||||
val Home = config.getString("akka.home") match {
|
val Home = config.getString("akka.home") match {
|
||||||
case "" ⇒ None
|
case "" ⇒ None
|
||||||
|
|
@ -229,7 +229,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem {
|
||||||
}
|
}
|
||||||
|
|
||||||
// FIXME make this configurable
|
// FIXME make this configurable
|
||||||
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512))
|
val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512))
|
||||||
|
|
||||||
// TODO correctly pull its config from the config
|
// TODO correctly pull its config from the config
|
||||||
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
|
val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler))
|
||||||
|
|
|
||||||
|
|
@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
shutdownScheduleUpdater.get(this) match {
|
shutdownScheduleUpdater.get(this) match {
|
||||||
case UNSCHEDULED ⇒
|
case UNSCHEDULED ⇒
|
||||||
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) {
|
||||||
scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS)
|
scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)
|
||||||
()
|
()
|
||||||
} else ifSensibleToDoSoThenScheduleShutdown()
|
} else ifSensibleToDoSoThenScheduleShutdown()
|
||||||
case SCHEDULED ⇒
|
case SCHEDULED ⇒
|
||||||
|
|
@ -210,17 +210,18 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
}
|
}
|
||||||
case RESCHEDULED ⇒
|
case RESCHEDULED ⇒
|
||||||
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED))
|
||||||
scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS)
|
scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)
|
||||||
else run()
|
else run()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
|
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down,
|
||||||
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
|
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or default specified in
|
||||||
|
* akka-actor-reference.conf
|
||||||
*/
|
*/
|
||||||
protected[akka] def timeoutMs: Long
|
protected[akka] def shutdownTimeout: Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
|
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
|
||||||
|
|
@ -257,10 +258,10 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext
|
||||||
|
|
||||||
// TODO check whether this should not actually be a property of the mailbox
|
// TODO check whether this should not actually be a property of the mailbox
|
||||||
protected[akka] def throughput: Int
|
protected[akka] def throughput: Int
|
||||||
protected[akka] def throughputDeadlineTime: Int
|
protected[akka] def throughputDeadlineTime: Duration
|
||||||
|
|
||||||
@inline
|
@inline
|
||||||
protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime > 0
|
protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0
|
||||||
@inline
|
@inline
|
||||||
protected[akka] final val isThroughputDefined = throughput > 1
|
protected[akka] final val isThroughputDefined = throughput > 1
|
||||||
|
|
||||||
|
|
@ -296,7 +297,7 @@ abstract class MessageDispatcherConfigurator() {
|
||||||
val capacity = config.getInt("mailbox-capacity")
|
val capacity = config.getInt("mailbox-capacity")
|
||||||
if (capacity < 1) UnboundedMailbox()
|
if (capacity < 1) UnboundedMailbox()
|
||||||
else {
|
else {
|
||||||
val duration = Duration(config.getInt("mailbox-push-timeout-time"), settings.DefaultTimeUnit)
|
val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS)
|
||||||
BoundedMailbox(capacity, duration)
|
BoundedMailbox(capacity, duration)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -309,7 +310,7 @@ abstract class MessageDispatcherConfigurator() {
|
||||||
//Apply the following options to the config if they are present in the config
|
//Apply the following options to the config if they are present in the config
|
||||||
|
|
||||||
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
|
ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure(
|
||||||
conf_?(Some(config getInt "keep-alive-time"))(time ⇒ _.setKeepAliveTime(Duration(time, settings.DefaultTimeUnit))),
|
conf_?(Some(config getMilliseconds "keep-alive-time"))(time ⇒ _.setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))),
|
||||||
conf_?(Some(config getDouble "core-pool-size-factor"))(factor ⇒ _.setCorePoolSizeFromFactor(factor)),
|
conf_?(Some(config getDouble "core-pool-size-factor"))(factor ⇒ _.setCorePoolSizeFromFactor(factor)),
|
||||||
conf_?(Some(config getDouble "max-pool-size-factor"))(factor ⇒ _.setMaxPoolSizeFromFactor(factor)),
|
conf_?(Some(config getDouble "max-pool-size-factor"))(factor ⇒ _.setMaxPoolSizeFromFactor(factor)),
|
||||||
conf_?(Some(config getBoolean "allow-core-timeout"))(allow ⇒ _.setAllowCoreThreadTimeout(allow)),
|
conf_?(Some(config getBoolean "allow-core-timeout"))(allow ⇒ _.setAllowCoreThreadTimeout(allow)),
|
||||||
|
|
|
||||||
|
|
@ -13,6 +13,7 @@ import akka.actor.ActorSystem
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
* An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed
|
||||||
|
|
@ -34,11 +35,11 @@ class BalancingDispatcher(
|
||||||
_prerequisites: DispatcherPrerequisites,
|
_prerequisites: DispatcherPrerequisites,
|
||||||
_name: String,
|
_name: String,
|
||||||
throughput: Int,
|
throughput: Int,
|
||||||
throughputDeadlineTime: Int,
|
throughputDeadlineTime: Duration,
|
||||||
mailboxType: MailboxType,
|
mailboxType: MailboxType,
|
||||||
config: ThreadPoolConfig,
|
config: ThreadPoolConfig,
|
||||||
_timeoutMs: Long)
|
_shutdownTimeout: Duration)
|
||||||
extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) {
|
extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _shutdownTimeout) {
|
||||||
|
|
||||||
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
|
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
|
||||||
val rebalance = new AtomicBoolean(false)
|
val rebalance = new AtomicBoolean(false)
|
||||||
|
|
|
||||||
|
|
@ -11,6 +11,7 @@ import akka.actor.{ ActorCell, ActorKilledException }
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Default settings are:
|
* Default settings are:
|
||||||
|
|
@ -67,10 +68,10 @@ class Dispatcher(
|
||||||
_prerequisites: DispatcherPrerequisites,
|
_prerequisites: DispatcherPrerequisites,
|
||||||
val name: String,
|
val name: String,
|
||||||
val throughput: Int,
|
val throughput: Int,
|
||||||
val throughputDeadlineTime: Int,
|
val throughputDeadlineTime: Duration,
|
||||||
val mailboxType: MailboxType,
|
val mailboxType: MailboxType,
|
||||||
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
|
executorServiceFactoryProvider: ExecutorServiceFactoryProvider,
|
||||||
val timeoutMs: Long)
|
val shutdownTimeout: Duration)
|
||||||
extends MessageDispatcher(_prerequisites) {
|
extends MessageDispatcher(_prerequisites) {
|
||||||
|
|
||||||
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
|
protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
|
||||||
|
|
|
||||||
|
|
@ -58,11 +58,9 @@ case class DefaultDispatcherPrerequisites(
|
||||||
*/
|
*/
|
||||||
class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites) {
|
class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites) {
|
||||||
|
|
||||||
val ThroughputDeadlineTimeMillis = settings.DispatcherThroughputDeadlineTime.toMillis.toInt
|
|
||||||
val MailboxType: MailboxType =
|
val MailboxType: MailboxType =
|
||||||
if (settings.MailboxCapacity < 1) UnboundedMailbox()
|
if (settings.MailboxCapacity < 1) UnboundedMailbox()
|
||||||
else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout)
|
else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout)
|
||||||
val DispatcherShutdownMillis = settings.DispatcherDefaultShutdown.toMillis
|
|
||||||
|
|
||||||
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher")
|
||||||
|
|
||||||
|
|
@ -77,8 +75,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
|
def newPinnedDispatcher(actor: LocalActorRef) = actor match {
|
||||||
case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", MailboxType, DispatcherShutdownMillis)
|
case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", MailboxType, settings.DispatcherDefaultShutdown)
|
||||||
case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, MailboxType, DispatcherShutdownMillis)
|
case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, MailboxType, settings.DispatcherDefaultShutdown)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -88,8 +86,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
|
def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match {
|
||||||
case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", mailboxType, DispatcherShutdownMillis)
|
case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", mailboxType, settings.DispatcherDefaultShutdown)
|
||||||
case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, mailboxType, DispatcherShutdownMillis)
|
case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, mailboxType, settings.DispatcherDefaultShutdown)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -98,7 +96,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
|
def newPinnedDispatcher(name: String, mailboxType: MailboxType) =
|
||||||
new PinnedDispatcher(prerequisites, null, name, mailboxType, DispatcherShutdownMillis)
|
new PinnedDispatcher(prerequisites, null, name, mailboxType, settings.DispatcherDefaultShutdown)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
* Creates an thread based dispatcher serving a single actor through the same single thread.
|
||||||
|
|
@ -106,7 +104,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
* E.g. each actor consumes its own thread.
|
* E.g. each actor consumes its own thread.
|
||||||
*/
|
*/
|
||||||
def newPinnedDispatcher(name: String) =
|
def newPinnedDispatcher(name: String) =
|
||||||
new PinnedDispatcher(prerequisites, null, name, MailboxType, DispatcherShutdownMillis)
|
new PinnedDispatcher(prerequisites, null, name, MailboxType, settings.DispatcherDefaultShutdown)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -115,7 +113,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
*/
|
*/
|
||||||
def newDispatcher(name: String) =
|
def newDispatcher(name: String) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, settings.DispatcherThroughput,
|
ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, settings.DispatcherThroughput,
|
||||||
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -124,16 +122,17 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
*/
|
*/
|
||||||
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new Dispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
new Dispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType,
|
||||||
|
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
def newDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new Dispatcher(prerequisites, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
new Dispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -142,7 +141,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
*/
|
*/
|
||||||
def newBalancingDispatcher(name: String) =
|
def newBalancingDispatcher(name: String) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(prerequisites, name, settings.DispatcherThroughput,
|
ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(prerequisites, name, settings.DispatcherThroughput,
|
||||||
ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -151,7 +150,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
*/
|
*/
|
||||||
def newBalancingDispatcher(name: String, throughput: Int) =
|
def newBalancingDispatcher(name: String, throughput: Int) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new BalancingDispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
new BalancingDispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, MailboxType,
|
||||||
|
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
|
|
@ -160,16 +160,18 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc
|
||||||
*/
|
*/
|
||||||
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new BalancingDispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
new BalancingDispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType,
|
||||||
|
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
* Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool.
|
||||||
* <p/>
|
* <p/>
|
||||||
* Has a fluent builder interface for configuring its semantics.
|
* Has a fluent builder interface for configuring its semantics.
|
||||||
*/
|
*/
|
||||||
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) =
|
def newBalancingDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) =
|
||||||
ThreadPoolConfigDispatcherBuilder(config ⇒
|
ThreadPoolConfigDispatcherBuilder(config ⇒
|
||||||
new BalancingDispatcher(prerequisites, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig())
|
new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType,
|
||||||
|
config, settings.DispatcherDefaultShutdown), ThreadPoolConfig())
|
||||||
/**
|
/**
|
||||||
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
* Utility function that tries to load the specified dispatcher config from the akka.conf
|
||||||
* or else use the supplied default dispatcher
|
* or else use the supplied default dispatcher
|
||||||
|
|
@ -238,10 +240,10 @@ class DispatcherConfigurator() extends MessageDispatcherConfigurator() {
|
||||||
threadPoolConfig ⇒ new Dispatcher(prerequisites,
|
threadPoolConfig ⇒ new Dispatcher(prerequisites,
|
||||||
config.getString("name"),
|
config.getString("name"),
|
||||||
config.getInt("throughput"),
|
config.getInt("throughput"),
|
||||||
config.getInt("throughput-deadline-time"),
|
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||||
mailboxType(config, settings),
|
mailboxType(config, settings),
|
||||||
threadPoolConfig,
|
threadPoolConfig,
|
||||||
settings.DispatcherDefaultShutdown.toMillis)).build
|
settings.DispatcherDefaultShutdown)).build
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
@ -252,9 +254,9 @@ class BalancingDispatcherConfigurator() extends MessageDispatcherConfigurator()
|
||||||
threadPoolConfig ⇒ new BalancingDispatcher(prerequisites,
|
threadPoolConfig ⇒ new BalancingDispatcher(prerequisites,
|
||||||
config.getString("name"),
|
config.getString("name"),
|
||||||
config.getInt("throughput"),
|
config.getInt("throughput"),
|
||||||
config.getInt("throughput-deadline-time"),
|
Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS),
|
||||||
mailboxType(config, settings),
|
mailboxType(config, settings),
|
||||||
threadPoolConfig,
|
threadPoolConfig,
|
||||||
settings.DispatcherDefaultShutdown.toMillis)).build
|
settings.DispatcherDefaultShutdown)).build
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -157,7 +157,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag
|
||||||
if (nextMessage ne null) { //If we have a message
|
if (nextMessage ne null) { //If we have a message
|
||||||
if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping
|
if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping
|
||||||
var processedMessages = 0
|
var processedMessages = 0
|
||||||
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0
|
val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0
|
||||||
do {
|
do {
|
||||||
if (debug) println(actor.self + " processing message " + nextMessage)
|
if (debug) println(actor.self + " processing message " + nextMessage)
|
||||||
actor invoke nextMessage
|
actor invoke nextMessage
|
||||||
|
|
|
||||||
|
|
@ -9,6 +9,8 @@ import akka.actor.ActorCell
|
||||||
import akka.actor.ActorSystem
|
import akka.actor.ActorSystem
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
|
import akka.util.Duration
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
|
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
|
||||||
|
|
@ -20,14 +22,14 @@ class PinnedDispatcher(
|
||||||
_actor: ActorCell,
|
_actor: ActorCell,
|
||||||
_name: String,
|
_name: String,
|
||||||
_mailboxType: MailboxType,
|
_mailboxType: MailboxType,
|
||||||
_timeoutMs: Long)
|
_shutdownTimeout: Duration)
|
||||||
extends Dispatcher(_prerequisites,
|
extends Dispatcher(_prerequisites,
|
||||||
_name,
|
_name,
|
||||||
Int.MaxValue,
|
Int.MaxValue,
|
||||||
-1,
|
Duration.Zero,
|
||||||
_mailboxType,
|
_mailboxType,
|
||||||
ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1),
|
ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1),
|
||||||
_timeoutMs) {
|
_shutdownTimeout) {
|
||||||
|
|
||||||
@volatile
|
@volatile
|
||||||
protected[akka] var owner: ActorCell = _actor
|
protected[akka] var owner: ActorCell = _actor
|
||||||
|
|
|
||||||
|
|
@ -10,10 +10,10 @@ akka {
|
||||||
beanstalk {
|
beanstalk {
|
||||||
hostname = "127.0.0.1"
|
hostname = "127.0.0.1"
|
||||||
port = 11300
|
port = 11300
|
||||||
reconnect-window = 5
|
reconnect-window = 5s
|
||||||
message-submit-delay = 0
|
message-submit-delay = 0s
|
||||||
message-submit-timeout = 5
|
message-submit-timeout = 5s
|
||||||
message-time-to-live = 120
|
message-time-to-live = 120s
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,7 @@ package akka.actor.mailbox
|
||||||
|
|
||||||
import com.surftools.BeanstalkClient._
|
import com.surftools.BeanstalkClient._
|
||||||
import com.surftools.BeanstalkClientImpl._
|
import com.surftools.BeanstalkClientImpl._
|
||||||
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.LocalActorRef
|
import akka.actor.LocalActorRef
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
|
@ -22,11 +23,10 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner)
|
||||||
|
|
||||||
val hostname = system.settings.config.getString("akka.actor.mailbox.beanstalk.hostname")
|
val hostname = system.settings.config.getString("akka.actor.mailbox.beanstalk.hostname")
|
||||||
val port = system.settings.config.getInt("akka.actor.mailbox.beanstalk.port")
|
val port = system.settings.config.getInt("akka.actor.mailbox.beanstalk.port")
|
||||||
def defaultTimeUnit = system.settings.DefaultTimeUnit
|
val reconnectWindow = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS).toSeconds.toInt
|
||||||
val reconnectWindow = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.reconnect-window"), defaultTimeUnit).toSeconds.toInt
|
val messageSubmitDelay = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS).toSeconds.toInt
|
||||||
val messageSubmitDelay = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-delay"), defaultTimeUnit).toSeconds.toInt
|
val messageSubmitTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS).toSeconds.toInt
|
||||||
val messageSubmitTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout"), defaultTimeUnit).toSeconds.toInt
|
val messageTimeToLive = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS).toSeconds.toInt
|
||||||
val messageTimeToLive = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live"), defaultTimeUnit).toSeconds.toInt
|
|
||||||
|
|
||||||
val log = Logging(system, "BeanstalkBasedMailbox")
|
val log = Logging(system, "BeanstalkBasedMailbox")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -21,6 +21,8 @@ import java.io._
|
||||||
import scala.collection.mutable
|
import scala.collection.mutable
|
||||||
import akka.event.LoggingAdapter
|
import akka.event.LoggingAdapter
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
import akka.util.Duration
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
// a config value that's backed by a global setting but may be locally overridden
|
// a config value that's backed by a global setting but may be locally overridden
|
||||||
class OverlaySetting[T](base: ⇒ T) {
|
class OverlaySetting[T](base: ⇒ T) {
|
||||||
|
|
@ -131,7 +133,7 @@ class PersistentQueue(persistencePath: String, val name: String, val config: Con
|
||||||
maxItems set Some(config.getInt("akka.actor.mailbox.file-based.max-items"))
|
maxItems set Some(config.getInt("akka.actor.mailbox.file-based.max-items"))
|
||||||
maxSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size"))
|
maxSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size"))
|
||||||
maxItemSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size"))
|
maxItemSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size"))
|
||||||
maxAge set Some(config.getInt("akka.actor.mailbox.file-based.max-age"))
|
maxAge set Some(Duration(config.getMilliseconds("akka.actor.mailbox.file-based.max-age"), TimeUnit.MILLISECONDS).toSeconds.toInt)
|
||||||
maxJournalSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size"))
|
maxJournalSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size"))
|
||||||
maxMemorySize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size"))
|
maxMemorySize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size"))
|
||||||
maxJournalOverflow set Some(config.getInt("akka.actor.mailbox.file-based.max-journal-overflow"))
|
maxJournalOverflow set Some(config.getInt("akka.actor.mailbox.file-based.max-journal-overflow"))
|
||||||
|
|
|
||||||
|
|
@ -13,8 +13,8 @@ akka {
|
||||||
|
|
||||||
# Configurable timeouts for certain ops
|
# Configurable timeouts for certain ops
|
||||||
timeout {
|
timeout {
|
||||||
read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future
|
read = 3000ms # time to wait for a read to succeed before timing out the future
|
||||||
write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future
|
write = 3000ms # time to wait for a write to succeed before timing out the future
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,6 +12,8 @@ import akka.dispatch.Envelope
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.dispatch.DefaultPromise
|
import akka.dispatch.DefaultPromise
|
||||||
import akka.actor.ActorRef
|
import akka.actor.ActorRef
|
||||||
|
import akka.util.Duration
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
class MongoBasedMailboxException(message: String) extends AkkaException(message)
|
class MongoBasedMailboxException(message: String) extends AkkaException(message)
|
||||||
|
|
||||||
|
|
@ -36,8 +38,8 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) {
|
||||||
val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write"
|
val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write"
|
||||||
val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read"
|
val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read"
|
||||||
val mongoURI = if (config.hasPath(URI_CONFIG_KEY)) Some(config.getString(URI_CONFIG_KEY)) else None
|
val mongoURI = if (config.hasPath(URI_CONFIG_KEY)) Some(config.getString(URI_CONFIG_KEY)) else None
|
||||||
val writeTimeout = config.getInt(WRITE_TIMEOUT_KEY)
|
val writeTimeout = Duration(config.getMilliseconds(WRITE_TIMEOUT_KEY), TimeUnit.MILLISECONDS)
|
||||||
val readTimeout = config.getInt(READ_TIMEOUT_KEY)
|
val readTimeout = Duration(config.getInt(READ_TIMEOUT_KEY), TimeUnit.MILLISECONDS)
|
||||||
|
|
||||||
val log = Logging(system, "MongoBasedMailbox")
|
val log = Logging(system, "MongoBasedMailbox")
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -9,8 +9,8 @@ akka {
|
||||||
mailbox {
|
mailbox {
|
||||||
zookeeper {
|
zookeeper {
|
||||||
server-addresses = "127.0.0.1:2181"
|
server-addresses = "127.0.0.1:2181"
|
||||||
session-timeout = 60
|
session-timeout = 60s
|
||||||
connection-timeout = 60
|
connection-timeout = 60s
|
||||||
blocking-queue = on
|
blocking-queue = on
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -3,6 +3,7 @@
|
||||||
*/
|
*/
|
||||||
package akka.actor.mailbox
|
package akka.actor.mailbox
|
||||||
|
|
||||||
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import akka.actor.LocalActorRef
|
import akka.actor.LocalActorRef
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.AkkaException
|
import akka.AkkaException
|
||||||
|
|
@ -22,9 +23,8 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess
|
||||||
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
|
class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization {
|
||||||
|
|
||||||
val zkServerAddresses = system.settings.config.getString("akka.actor.mailbox.zookeeper.server-addresses")
|
val zkServerAddresses = system.settings.config.getString("akka.actor.mailbox.zookeeper.server-addresses")
|
||||||
def defaultTimeUnit = system.settings.DefaultTimeUnit
|
val sessionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS)
|
||||||
val sessionTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.zookeeper.session-timeout"), defaultTimeUnit).toMillis.toInt
|
val connectionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS)
|
||||||
val connectionTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.zookeeper.connection-timeout"), defaultTimeUnit).toMillis.toInt
|
|
||||||
val blockingQueue = system.settings.config.getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
|
val blockingQueue = system.settings.config.getBoolean("akka.actor.mailbox.zookeeper.blocking-queue")
|
||||||
|
|
||||||
val queueNode = "/queues"
|
val queueNode = "/queues"
|
||||||
|
|
|
||||||
|
|
@ -6,15 +6,16 @@ package akka.cluster.zookeeper
|
||||||
import org.I0Itec.zkclient._
|
import org.I0Itec.zkclient._
|
||||||
import org.I0Itec.zkclient.serialize._
|
import org.I0Itec.zkclient.serialize._
|
||||||
import org.I0Itec.zkclient.exception._
|
import org.I0Itec.zkclient.exception._
|
||||||
|
import akka.util.Duration
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* ZooKeeper client. Holds the ZooKeeper connection and manages its session.
|
* ZooKeeper client. Holds the ZooKeeper connection and manages its session.
|
||||||
*/
|
*/
|
||||||
class AkkaZkClient(zkServers: String,
|
class AkkaZkClient(zkServers: String,
|
||||||
sessionTimeout: Int,
|
sessionTimeout: Duration,
|
||||||
connectionTimeout: Int,
|
connectionTimeout: Duration,
|
||||||
zkSerializer: ZkSerializer = new SerializableSerializer)
|
zkSerializer: ZkSerializer = new SerializableSerializer)
|
||||||
extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) {
|
extends ZkClient(zkServers, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, zkSerializer) {
|
||||||
|
|
||||||
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
|
def connection: ZkConnection = _connection.asInstanceOf[ZkConnection]
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -20,6 +20,7 @@ import com.eaio.uuid.UUID
|
||||||
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
|
import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression }
|
||||||
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
|
import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher }
|
||||||
import java.util.concurrent.atomic.AtomicLong
|
import java.util.concurrent.atomic.AtomicLong
|
||||||
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
|
* Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc.
|
||||||
|
|
@ -35,7 +36,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) {
|
||||||
|
|
||||||
// TODO move to settings?
|
// TODO move to settings?
|
||||||
val shouldCompressData = config.getBoolean("akka.remote.use-compression")
|
val shouldCompressData = config.getBoolean("akka.remote.use-compression")
|
||||||
val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout"), DefaultTimeUnit).toMillis.toInt
|
val remoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS)
|
||||||
|
|
||||||
val failureDetector = new AccrualFailureDetector(system)
|
val failureDetector = new AccrualFailureDetector(system)
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,22 +6,22 @@ package akka.remote
|
||||||
|
|
||||||
import akka.util.Duration
|
import akka.util.Duration
|
||||||
import akka.config.ConfigurationException
|
import akka.config.ConfigurationException
|
||||||
import java.util.concurrent.TimeUnit
|
import java.util.concurrent.TimeUnit.MILLISECONDS
|
||||||
import com.typesafe.config.Config
|
import com.typesafe.config.Config
|
||||||
|
|
||||||
class RemoteClientSettings(config: Config, defaultTimeUnit: TimeUnit) {
|
class RemoteClientSettings(config: Config) {
|
||||||
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") match {
|
val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") match {
|
||||||
case "" ⇒ None
|
case "" ⇒ None
|
||||||
case cookie ⇒ Some(cookie)
|
case cookie ⇒ Some(cookie)
|
||||||
}
|
}
|
||||||
|
|
||||||
val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window"), defaultTimeUnit).toMillis
|
val RECONNECTION_TIME_WINDOW = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS).toMillis
|
||||||
val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout"), defaultTimeUnit)
|
val READ_TIMEOUT = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS)
|
||||||
val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay"), defaultTimeUnit)
|
val RECONNECT_DELAY = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS)
|
||||||
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size")
|
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size")
|
||||||
}
|
}
|
||||||
|
|
||||||
class RemoteServerSettings(config: Config, defaultTimeUnit: TimeUnit) {
|
class RemoteServerSettings(config: Config) {
|
||||||
import scala.collection.JavaConverters._
|
import scala.collection.JavaConverters._
|
||||||
val isRemotingEnabled = config.getStringList("akka.enabled-modules").asScala.exists(_ == "cluster") //TODO FIXME Shouldn't this be "remote"?
|
val isRemotingEnabled = config.getStringList("akka.enabled-modules").asScala.exists(_ == "cluster") //TODO FIXME Shouldn't this be "remote"?
|
||||||
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size")
|
val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size")
|
||||||
|
|
@ -40,7 +40,7 @@ class RemoteServerSettings(config: Config, defaultTimeUnit: TimeUnit) {
|
||||||
|
|
||||||
val UNTRUSTED_MODE = config.getBoolean("akka.remote.server.untrusted-mode")
|
val UNTRUSTED_MODE = config.getBoolean("akka.remote.server.untrusted-mode")
|
||||||
val PORT = config.getInt("akka.remote.server.port")
|
val PORT = config.getInt("akka.remote.server.port")
|
||||||
val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout"), defaultTimeUnit)
|
val CONNECTION_TIMEOUT = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS)
|
||||||
|
|
||||||
val BACKLOG = config.getInt("akka.remote.server.backlog")
|
val BACKLOG = config.getInt("akka.remote.server.backlog")
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -353,8 +353,8 @@ class ActiveRemoteClientHandler(
|
||||||
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
|
class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps {
|
||||||
val log = Logging(system, "NettyRemoteSupport")
|
val log = Logging(system, "NettyRemoteSupport")
|
||||||
|
|
||||||
val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit)
|
val serverSettings = new RemoteServerSettings(system.settings.config)
|
||||||
val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit)
|
val clientSettings = new RemoteClientSettings(system.settings.config)
|
||||||
|
|
||||||
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
|
private val remoteClients = new HashMap[RemoteAddress, RemoteClient]
|
||||||
private val clientsLock = new ReentrantReadWriteLock
|
private val clientsLock = new ReentrantReadWriteLock
|
||||||
|
|
|
||||||
|
|
@ -10,8 +10,7 @@ akka {
|
||||||
stm {
|
stm {
|
||||||
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
|
fair = on # Should global transactions be fair or non-fair (non fair yield better performance)
|
||||||
max-retries = 1000
|
max-retries = 1000
|
||||||
timeout = 5 # Default timeout for blocking transactions and transaction set (in unit defined by
|
timeout = 5s # Default timeout for blocking transactions and transaction set
|
||||||
# the time-unit property)
|
|
||||||
write-skew = on
|
write-skew = on
|
||||||
blocking-allowed = off
|
blocking-allowed = off
|
||||||
interruptible = off
|
interruptible = off
|
||||||
|
|
|
||||||
|
|
@ -151,7 +151,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
|
||||||
def sendOff(f: T ⇒ T): Unit = {
|
def sendOff(f: T ⇒ T): Unit = {
|
||||||
send((value: T) ⇒ {
|
send((value: T) ⇒ {
|
||||||
suspend()
|
suspend()
|
||||||
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-send-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis)
|
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-send-off", UnboundedMailbox(), system.settings.ActorTimeout.duration)
|
||||||
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||||
threadBased ! Update(f)
|
threadBased ! Update(f)
|
||||||
value
|
value
|
||||||
|
|
@ -169,7 +169,7 @@ class Agent[T](initialValue: T, system: ActorSystem) {
|
||||||
val result = new DefaultPromise[T](timeout)(system.dispatcher)
|
val result = new DefaultPromise[T](timeout)(system.dispatcher)
|
||||||
send((value: T) ⇒ {
|
send((value: T) ⇒ {
|
||||||
suspend()
|
suspend()
|
||||||
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis)
|
val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration)
|
||||||
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher))
|
||||||
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
|
result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]]
|
||||||
value
|
value
|
||||||
|
|
|
||||||
|
|
@ -30,7 +30,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec
|
||||||
getString("akka.stm.propagation") must equal("requires")
|
getString("akka.stm.propagation") must equal("requires")
|
||||||
getBoolean("akka.stm.quick-release") must equal(true)
|
getBoolean("akka.stm.quick-release") must equal(true)
|
||||||
getBoolean("akka.stm.speculative") must equal(true)
|
getBoolean("akka.stm.speculative") must equal(true)
|
||||||
getLong("akka.stm.timeout") must equal(5)
|
getMilliseconds("akka.stm.timeout") must equal(5 * 1000)
|
||||||
getString("akka.stm.trace-level") must equal("none")
|
getString("akka.stm.trace-level") must equal("none")
|
||||||
getBoolean("akka.stm.write-skew") must equal(true)
|
getBoolean("akka.stm.write-skew") must equal(true)
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -14,6 +14,8 @@ import akka.actor.{ ActorCell, ActorRef, ActorSystem }
|
||||||
import akka.dispatch._
|
import akka.dispatch._
|
||||||
import akka.actor.Scheduler
|
import akka.actor.Scheduler
|
||||||
import akka.event.EventStream
|
import akka.event.EventStream
|
||||||
|
import akka.util.Duration
|
||||||
|
import java.util.concurrent.TimeUnit
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Locking rules:
|
* Locking rules:
|
||||||
|
|
@ -122,10 +124,10 @@ class CallingThreadDispatcher(
|
||||||
protected[akka] override def shutdown() {}
|
protected[akka] override def shutdown() {}
|
||||||
|
|
||||||
protected[akka] override def throughput = 0
|
protected[akka] override def throughput = 0
|
||||||
protected[akka] override def throughputDeadlineTime = 0
|
protected[akka] override def throughputDeadlineTime = Duration.Zero
|
||||||
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false
|
protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false
|
||||||
|
|
||||||
protected[akka] override def timeoutMs = 100L
|
protected[akka] override def shutdownTimeout = Duration(100L, TimeUnit.MILLISECONDS)
|
||||||
|
|
||||||
override def suspend(actor: ActorCell) {
|
override def suspend(actor: ActorCell) {
|
||||||
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
getMailbox(actor) foreach (_.suspendSwitch.switchOn)
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue