diff --git a/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala index 4d5cae60b4..e5c98ec5ff 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/ClusterSpec.scala @@ -16,10 +16,10 @@ class ClusterSpec extends AkkaSpec { getString("akka.cluster.name") must equal("test-cluster") getString("akka.cluster.zookeeper-server-addresses") must equal("localhost:2181") getInt("akka.remote.server.port") must equal(2552) - getInt("akka.cluster.max-time-to-wait-until-connected") must equal(30) - getInt("akka.cluster.session-timeout") must equal(60) - getInt("akka.cluster.connection-timeout") must equal(60) - getInt("akka.remote.remote-daemon-ack-timeout") must equal(30) + getMilliseconds("akka.cluster.max-time-to-wait-until-connected") must equal(30 * 1000) + getMilliseconds("akka.cluster.session-timeout") must equal(60 * 1000) + getMilliseconds("akka.cluster.connection-timeout") must equal(60 * 1000) + getMilliseconds("akka.remote.remote-daemon-ack-timeout") must equal(30 * 1000) getBoolean("akka.cluster.include-ref-node-in-replica-set") must equal(true) getString("akka.remote.layer") must equal("akka.cluster.netty.NettyRemoteSupport") getString("akka.remote.secure-cookie") must equal("") @@ -32,12 +32,12 @@ class ClusterSpec extends AkkaSpec { getInt("akka.cluster.replication.ensemble-size") must equal(3) getInt("akka.cluster.replication.quorum-size") must equal(2) getInt("akka.cluster.replication.snapshot-frequency") must equal(1000) - getInt("akka.cluster.replication.timeout") must equal(30) + getMilliseconds("akka.cluster.replication.timeout") must equal(30 * 1000) //akka.remote.server getInt("akka.remote.server.port") must equal(2552) getInt("akka.remote.server.message-frame-size") must equal(1048576) - getInt("akka.remote.server.connection-timeout") must equal(120) + getMilliseconds("akka.remote.server.connection-timeout") must equal(120 * 1000) getBoolean("akka.remote.server.require-cookie") must equal(false) getBoolean("akka.remote.server.untrusted-mode") must equal(false) getInt("akka.remote.server.backlog") must equal(4096) @@ -45,10 +45,10 @@ class ClusterSpec extends AkkaSpec { //akka.remote.client getBoolean("akka.remote.client.buffering.retry-message-send-on-failure") must equal(false) getInt("akka.remote.client.buffering.capacity") must equal(-1) - getInt("akka.remote.client.reconnect-delay") must equal(5) - getInt("akka.remote.client.read-timeout") must equal(3600) - getInt("akka.remote.client.reap-futures-delay") must equal(5) - getInt("akka.remote.client.reconnection-time-window") must equal(600) + getMilliseconds("akka.remote.client.reconnect-delay") must equal(5 * 1000) + getMilliseconds("akka.remote.client.read-timeout") must equal(3600 * 1000) + getMilliseconds("akka.remote.client.reap-futures-delay") must equal(5 * 1000) + getMilliseconds("akka.remote.client.reconnection-time-window") must equal(600 * 1000) } } } diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 40013a1c80..9ff7ee86fb 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -142,7 +142,7 @@ object ActorModelSpec { def assertDispatcher(dispatcher: MessageDispatcherInterceptor)( stops: Long = dispatcher.stops.get())(implicit system: ActorSystem) { - val deadline = System.currentTimeMillis + dispatcher.timeoutMs * 5 + val deadline = System.currentTimeMillis + dispatcher.shutdownTimeout.toMillis * 5 try { await(deadline)(stops == dispatcher.stops.get) } catch { @@ -421,8 +421,8 @@ class DispatcherModelSpec extends ActorModelSpec { def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, "foo", system.settings.DispatcherThroughput, - system.dispatcherFactory.ThroughputDeadlineTimeMillis, system.dispatcherFactory.MailboxType, - config, system.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, + system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, + config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor, ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Dispatcher" @@ -458,8 +458,8 @@ class BalancingDispatcherModelSpec extends ActorModelSpec { def newInterceptedDispatcher = ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(system.dispatcherFactory.prerequisites, "foo", 1, // TODO check why 1 here? (came from old test) - system.dispatcherFactory.ThroughputDeadlineTimeMillis, system.dispatcherFactory.MailboxType, - config, system.dispatcherFactory.DispatcherShutdownMillis) with MessageDispatcherInterceptor, + system.settings.DispatcherThroughputDeadlineTime, system.dispatcherFactory.MailboxType, + config, system.settings.DispatcherDefaultShutdown) with MessageDispatcherInterceptor, ThreadPoolConfig()).build.asInstanceOf[MessageDispatcherInterceptor] def dispatcherType = "Balancing Dispatcher" diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala index d2bd4e9c2d..0084cc0ae5 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/DispatcherActorSpec.scala @@ -5,6 +5,8 @@ import java.util.concurrent.atomic.{ AtomicBoolean, AtomicInteger } import akka.testkit.{ filterEvents, EventFilter, AkkaSpec } import akka.dispatch.{ PinnedDispatcher, Dispatchers, Dispatcher } import akka.actor.{ Props, Actor } +import akka.util.Duration +import akka.util.duration._ object DispatcherActorSpec { class TestActor extends Actor { @@ -48,7 +50,7 @@ class DispatcherActorSpec extends AkkaSpec { "respect the throughput setting" in { val throughputDispatcher = system.dispatcherFactory. - newDispatcher("THROUGHPUT", 101, 0, system.dispatcherFactory.MailboxType). + newDispatcher("THROUGHPUT", 101, Duration.Zero, system.dispatcherFactory.MailboxType). setCorePoolSize(1). build @@ -75,9 +77,9 @@ class DispatcherActorSpec extends AkkaSpec { } "respect throughput deadline" in { - val deadlineMs = 100 + val deadline = 100 millis val throughputDispatcher = system.dispatcherFactory. - newDispatcher("THROUGHPUT", 2, deadlineMs, system.dispatcherFactory.MailboxType). + newDispatcher("THROUGHPUT", 2, deadline, system.dispatcherFactory.MailboxType). setCorePoolSize(1). build val works = new AtomicBoolean(true) @@ -100,7 +102,7 @@ class DispatcherActorSpec extends AkkaSpec { slowOne ! "ping" fastOne ! "ping" assert(ready.await(2, TimeUnit.SECONDS) === true) - Thread.sleep(deadlineMs + 10) // wait just a bit more than the deadline + Thread.sleep(deadline.toMillis + 10) // wait just a bit more than the deadline start.countDown() assert(latch.await(2, TimeUnit.SECONDS) === true) } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index aa82316aec..17ed0112f0 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -24,26 +24,24 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec import config._ getList("akka.boot").asScala.toSeq must equal(Nil) - getString("akka.time-unit") must equal("seconds") - settings.DefaultTimeUnit must equal(TimeUnit.SECONDS) getString("akka.version") must equal("2.0-SNAPSHOT") settings.ConfigVersion must equal("2.0-SNAPSHOT") getString("akka.actor.default-dispatcher.type") must equal("Dispatcher") - getInt("akka.actor.default-dispatcher.keep-alive-time") must equal(60) + getMilliseconds("akka.actor.default-dispatcher.keep-alive-time") must equal(60 * 1000) getDouble("akka.actor.default-dispatcher.core-pool-size-factor") must equal(8.0) getDouble("akka.actor.default-dispatcher.max-pool-size-factor") must equal(8.0) getInt("akka.actor.default-dispatcher.task-queue-size") must equal(-1) getString("akka.actor.default-dispatcher.task-queue-type") must equal("linked") getBoolean("akka.actor.default-dispatcher.allow-core-timeout") must equal(true) getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) - getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10) - getLong("akka.actor.dispatcher-shutdown-timeout") must equal(1) + getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) + getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS)) getInt("akka.actor.default-dispatcher.throughput") must equal(5) settings.DispatcherThroughput must equal(5) - getInt("akka.actor.default-dispatcher.throughput-deadline-time") must equal(-1) - settings.DispatcherThroughputDeadlineTime must equal(Duration(-1, TimeUnit.SECONDS)) + getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) + settings.DispatcherThroughputDeadlineTime must equal(Duration.Zero) getBoolean("akka.actor.serialize-messages") must equal(false) settings.SerializeAllMessages must equal(false) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala index d73009377a..282835e6fc 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/PriorityDispatcherSpec.scala @@ -2,6 +2,7 @@ package akka.dispatch import akka.actor.{ Props, LocalActorRef, Actor } import akka.testkit.AkkaSpec +import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class PriorityDispatcherSpec extends AkkaSpec { @@ -23,7 +24,7 @@ class PriorityDispatcherSpec extends AkkaSpec { } def testOrdering(mboxType: MailboxType) { - val dispatcher = system.dispatcherFactory.newDispatcher("Test", 1, -1, mboxType).build + val dispatcher = system.dispatcherFactory.newDispatcher("Test", 1, Duration.Zero, mboxType).build val actor = actorOf(Props(new Actor { var acc: List[Int] = Nil diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index f6674fa4bf..75aef7708f 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -6,6 +6,7 @@ import akka.actor._ import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } import akka.dispatch._ import java.util.concurrent.ThreadPoolExecutor.AbortPolicy +import akka.util.Duration // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -14,7 +15,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - 0, UnboundedMailbox(), config, 60000), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index b189b737e5..4e1e2eff6c 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -12,8 +12,6 @@ akka { enabled-modules = [] # Comma separated list of the enabled modules. Options: ["cluster", "camel", "http"] - time-unit = "seconds" # Time unit for all timeout properties throughout the config - event-handlers = ["akka.event.Logging$DefaultLogger"] # Event handlers to register at boot time (Logging$DefaultLogger logs to STDOUT) loglevel = "WARNING" # Options: ERROR, WARNING, INFO, DEBUG # this level is used by the configured loggers (see "event-handlers") as soon @@ -25,7 +23,7 @@ akka { # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor name = "EventHandlerDispatcher" # Optional, will be a generated UUID if omitted - keep-alive-time = 60 # Keep alive time for threads + keep-alive-time = 60s # Keep alive time for threads core-pool-size = 1 # No of core threads max-pool-size = 8 # Max no of threads executor-bounds = -1 # Makes the Executor bounded, -1 is unbounded @@ -33,13 +31,12 @@ akka { task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness - throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline + throughput-deadline-time = 0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) # If positive then a bounded mailbox is used and the capacity is set using the property # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care # The following are only used for Dispatcher and only if mailbox-capacity > 0 - mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout - # (in unit defined by the time-unit property) + mailbox-push-timeout-time = 10s # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout } # These boot classes are loaded (and created) automatically when the Akka Microkernel boots up @@ -53,12 +50,12 @@ akka { actor { provider = "akka.actor.LocalActorRefProvider" - timeout = 5 # Default timeout for Future based invocations - # - Actor: ask && ? - # - UntypedActor: ask - # - TypedActor: methods with non-void return type - serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability - dispatcher-shutdown-timeout = 1 # Using the akka.time-unit, how long dispatchers by default will wait for new actors until they shut down + timeout = 5s # Default timeout for Future based invocations + # - Actor: ask && ? + # - UntypedActor: ask + # - TypedActor: methods with non-void return type + serialize-messages = off # Does a deep clone of (non-primitive) messages to ensure immutability + dispatcher-shutdown-timeout = 1s # How long dispatchers by default will wait for new actors until they shut down deployment { @@ -120,20 +117,19 @@ akka { # Dispatcher, (BalancingDispatcher, only valid when all actors using it are of the same type), # A FQCN to a class inheriting MessageDispatcherConfigurator with a no-arg visible constructor name = "DefaultDispatcher" # Optional, will be a generated UUID if omitted - keep-alive-time = 60 # Keep alive time for threads + keep-alive-time = 60s # Keep alive time for threads core-pool-size-factor = 8.0 # No of core threads ... ceil(available processors * factor) max-pool-size-factor = 8.0 # Max no of threads ... ceil(available processors * factor) task-queue-size = -1 # Specifies the bounded capacity of the task queue (< 1 == unbounded) task-queue-type = "linked" # Specifies which type of task queue will be used, can be "array" or "linked" (default) allow-core-timeout = on # Allow core threads to time out throughput = 5 # Throughput for Dispatcher, set to 1 for complete fairness - throughput-deadline-time = -1 # Throughput deadline for Dispatcher, set to 0 or negative for no deadline + throughput-deadline-time = -0ms # Throughput deadline for Dispatcher, set to 0 or negative for no deadline mailbox-capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) # If positive then a bounded mailbox is used and the capacity is set using the property # NOTE: setting a mailbox to 'blocking' can be a bit dangerous, could lead to deadlock, use with care # The following are only used for Dispatcher and only if mailbox-capacity > 0 - mailbox-push-timeout-time = 10 # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout - # (in unit defined by the time-unit property) + mailbox-push-timeout-time = 10s # Specifies the timeout to add a new message to a mailbox that is full - negative number means infinite timeout } debug { @@ -152,7 +148,7 @@ akka { max-size = 2147483647 bytes max-items = 2147483647 max-item-size = 2147483647 bytes - max-age = 0 + max-age = 0s max-journal-size = 16 megabytes max-memory-size = 128 megabytes max-journal-overflow = 10 @@ -173,25 +169,25 @@ akka { # Configurable timeouts for certain ops timeout { - read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future - write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future + read = 3000ms # time to wait for a read to succeed before timing out the future + write = 3000ms # time to wait for a write to succeed before timing out the future } } zookeeper { server-addresses = "127.0.0.1:2181" - session-timeout = 60 - connection-timeout = 60 + session-timeout = 60s + connection-timeout = 60s blocking-queue = on } beanstalk { hostname = "127.0.0.1" port = 11300 - reconnect-window = 5 - message-submit-delay = 0 - message-submit-timeout = 5 - message-time-to-live = 120 + reconnect-window = 5s + message-submit-delay = 0s + message-submit-timeout = 5s + message-time-to-live = 120s } } @@ -225,7 +221,7 @@ akka { secure-cookie = "" # Generate your own with '$AKKA_HOME/scripts/generate_config_with_secure_cookie.sh' # or using 'akka.util.Crypt.generateSecureCookie' - remote-daemon-ack-timeout = 30 # Timeout for ACK of cluster operations, lik checking actor out etc. + remote-daemon-ack-timeout = 30s # Timeout for ACK of cluster operations, lik checking actor out etc. use-passive-connections = on # Reuse inbound connections for outbound messages @@ -240,7 +236,7 @@ akka { server { port = 2552 # The default remote server port clients should connect to. Default is 2552 (AKKA) message-frame-size = 1048576 # Increase this if you want to be able to send messages with large payloads - connection-timeout = 120 # Length in time-unit + connection-timeout = 120s # Timeout duration require-cookie = off # Should the remote server require that it peers share the same secure-cookie (defined in the 'remote' section)? untrusted-mode = off # Enable untrusted mode for full security of server managed actors, allows untrusted clients to connect. backlog = 4096 # Sets the size of the connection backlog @@ -252,20 +248,20 @@ akka { capacity = -1 # If negative (or zero) then an unbounded mailbox is used (default) # If positive then a bounded mailbox is used and the capacity is set using the property } - reconnect-delay = 5 - read-timeout = 3600 + reconnect-delay = 5s + read-timeout = 3600s message-frame-size = 1048576 - reap-futures-delay = 5 - reconnection-time-window = 600 # Maximum time window that a client should try to reconnect for + reap-futures-delay = 5s # FIXME: This is not used anywhere (except in ClusterSpec), remove? + reconnection-time-window = 600s # Maximum time window that a client should try to reconnect for } } cluster { name = "test-cluster" zookeeper-server-addresses = "localhost:2181" # comma-separated list of ':' elements - max-time-to-wait-until-connected = 30 - session-timeout = 60 - connection-timeout = 60 + max-time-to-wait-until-connected = 30s + session-timeout = 60s + connection-timeout = 60s include-ref-node-in-replica-set = on # Can a replica be instantiated on the same node as the cluster reference to the actor # Default: on log-directory = "_akka_cluster" # Where ZooKeeper should store the logs and data files @@ -276,15 +272,15 @@ akka { ensemble-size = 3 quorum-size = 2 snapshot-frequency = 1000 # The number of messages that should be logged between every actor snapshot - timeout = 30 # Timeout for asyncronous (write-behind) operations + timeout = 30s # Timeout for asyncronous (write-behind) operations } } # TODO move to testkit-reference test { - timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load - filter-leeway = 3 # time-units EventFilter.intercept waits after the block is finished until all required messages are received - single-expect-default = 3 # time-units to wait in expectMsg and friends outside of within() block by default + timefactor = "1.0" # factor by which to scale timeouts during tests, e.g. to account for shared build system load + filter-leeway = 3s # duration of EventFilter.intercept waits after the block is finished until all required messages are received + single-expect-default = 3s # duration to wait in expectMsg and friends outside of within() block by default } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index 47ae9a9593..c05fd01638 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -15,7 +15,9 @@ import akka.util.ReflectiveAccess import akka.serialization.Serialization import akka.remote.RemoteAddress import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.{ Executors, TimeUnit } +import java.util.concurrent.Executors +import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.TimeUnit.NANOSECONDS import java.io.File import com.typesafe.config.Config import com.typesafe.config.ConfigParseOptions @@ -60,14 +62,12 @@ object ActorSystem { val ProviderClass = getString("akka.actor.provider") - val DefaultTimeUnit = Duration.timeUnit(getString("akka.time-unit")) - val ActorTimeout = Timeout(Duration(getInt("akka.actor.timeout"), DefaultTimeUnit)) - val ActorTimeoutMillis = ActorTimeout.duration.toMillis + val ActorTimeout = Timeout(Duration(getMilliseconds("akka.actor.timeout"), MILLISECONDS)) val SerializeAllMessages = getBoolean("akka.actor.serialize-messages") val TestTimeFactor = getDouble("akka.test.timefactor") - val SingleExpectDefaultTimeout = Duration(getDouble("akka.test.single-expect-default"), DefaultTimeUnit) - val TestEventFilterLeeway = Duration(getDouble("akka.test.filter-leeway"), DefaultTimeUnit) + val SingleExpectDefaultTimeout = Duration(getMilliseconds("akka.test.single-expect-default"), MILLISECONDS) + val TestEventFilterLeeway = Duration(getMilliseconds("akka.test.filter-leeway"), MILLISECONDS) val LogLevel = getString("akka.loglevel") val StdoutLogLevel = getString("akka.stdout-loglevel") @@ -79,10 +79,10 @@ object ActorSystem { val DebugEventStream = getBoolean("akka.actor.debug.event-stream") val DispatcherThroughput = getInt("akka.actor.default-dispatcher.throughput") - val DispatcherDefaultShutdown = Duration(getLong("akka.actor.dispatcher-shutdown-timeout"), DefaultTimeUnit) + val DispatcherDefaultShutdown = Duration(getMilliseconds("akka.actor.dispatcher-shutdown-timeout"), MILLISECONDS) val MailboxCapacity = getInt("akka.actor.default-dispatcher.mailbox-capacity") - val MailboxPushTimeout = Duration(getInt("akka.actor.default-dispatcher.mailbox-push-timeout-time"), DefaultTimeUnit) - val DispatcherThroughputDeadlineTime = Duration(getInt("akka.actor.default-dispatcher.throughput-deadline-time"), DefaultTimeUnit) + val MailboxPushTimeout = Duration(getNanoseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time"), NANOSECONDS) + val DispatcherThroughputDeadlineTime = Duration(getNanoseconds("akka.actor.default-dispatcher.throughput-deadline-time"), NANOSECONDS) val Home = config.getString("akka.home") match { case "" ⇒ None @@ -229,7 +229,7 @@ class ActorSystemImpl(val name: String, config: Config) extends ActorSystem { } // FIXME make this configurable - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, TimeUnit.MILLISECONDS, 512)) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512)) // TODO correctly pull its config from the config val dispatcherFactory = new Dispatchers(settings, DefaultDispatcherPrerequisites(eventStream, deadLetterMailbox, scheduler)) diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 06ca082f15..7998a514f8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, timeoutMs, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -210,17 +210,18 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, timeoutMs, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) else run() } } } /** - * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms - * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second + * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, + * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or default specified in + * akka-actor-reference.conf */ - protected[akka] def timeoutMs: Long + protected[akka] def shutdownTimeout: Duration /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference @@ -257,10 +258,10 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext // TODO check whether this should not actually be a property of the mailbox protected[akka] def throughput: Int - protected[akka] def throughputDeadlineTime: Int + protected[akka] def throughputDeadlineTime: Duration @inline - protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime > 0 + protected[akka] final val isThroughputDeadlineTimeDefined = throughputDeadlineTime.toMillis > 0 @inline protected[akka] final val isThroughputDefined = throughput > 1 @@ -296,7 +297,7 @@ abstract class MessageDispatcherConfigurator() { val capacity = config.getInt("mailbox-capacity") if (capacity < 1) UnboundedMailbox() else { - val duration = Duration(config.getInt("mailbox-push-timeout-time"), settings.DefaultTimeUnit) + val duration = Duration(config.getNanoseconds("mailbox-push-timeout-time"), TimeUnit.NANOSECONDS) BoundedMailbox(capacity, duration) } } @@ -309,7 +310,7 @@ abstract class MessageDispatcherConfigurator() { //Apply the following options to the config if they are present in the config ThreadPoolConfigDispatcherBuilder(createDispatcher, ThreadPoolConfig()).configure( - conf_?(Some(config getInt "keep-alive-time"))(time ⇒ _.setKeepAliveTime(Duration(time, settings.DefaultTimeUnit))), + conf_?(Some(config getMilliseconds "keep-alive-time"))(time ⇒ _.setKeepAliveTime(Duration(time, TimeUnit.MILLISECONDS))), conf_?(Some(config getDouble "core-pool-size-factor"))(factor ⇒ _.setCorePoolSizeFromFactor(factor)), conf_?(Some(config getDouble "max-pool-size-factor"))(factor ⇒ _.setMaxPoolSizeFromFactor(factor)), conf_?(Some(config getBoolean "allow-core-timeout"))(allow ⇒ _.setAllowCoreThreadTimeout(allow)), diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index b30de4a102..6f45d8629c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -13,6 +13,7 @@ import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler import java.util.concurrent.atomic.AtomicBoolean +import akka.util.Duration /** * An executor based event driven dispatcher which will try to redistribute work from busy actors to idle actors. It is assumed @@ -34,11 +35,11 @@ class BalancingDispatcher( _prerequisites: DispatcherPrerequisites, _name: String, throughput: Int, - throughputDeadlineTime: Int, + throughputDeadlineTime: Duration, mailboxType: MailboxType, config: ThreadPoolConfig, - _timeoutMs: Long) - extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _timeoutMs) { + _shutdownTimeout: Duration) + extends Dispatcher(_prerequisites, _name, throughput, throughputDeadlineTime, mailboxType, config, _shutdownTimeout) { val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator) val rebalance = new AtomicBoolean(false) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 5bda68850e..81a432768d 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -11,6 +11,7 @@ import akka.actor.{ ActorCell, ActorKilledException } import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler +import akka.util.Duration /** * Default settings are: @@ -67,10 +68,10 @@ class Dispatcher( _prerequisites: DispatcherPrerequisites, val name: String, val throughput: Int, - val throughputDeadlineTime: Int, + val throughputDeadlineTime: Duration, val mailboxType: MailboxType, executorServiceFactoryProvider: ExecutorServiceFactoryProvider, - val timeoutMs: Long) + val shutdownTimeout: Duration) extends MessageDispatcher(_prerequisites) { protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala index f5550802db..29039c46a6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatchers.scala @@ -58,11 +58,9 @@ case class DefaultDispatcherPrerequisites( */ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: DispatcherPrerequisites) { - val ThroughputDeadlineTimeMillis = settings.DispatcherThroughputDeadlineTime.toMillis.toInt val MailboxType: MailboxType = if (settings.MailboxCapacity < 1) UnboundedMailbox() else BoundedMailbox(settings.MailboxCapacity, settings.MailboxPushTimeout) - val DispatcherShutdownMillis = settings.DispatcherDefaultShutdown.toMillis val defaultDispatcherConfig = settings.config.getConfig("akka.actor.default-dispatcher") @@ -77,8 +75,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef) = actor match { - case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", MailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, MailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", MailboxType, settings.DispatcherDefaultShutdown) + case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, MailboxType, settings.DispatcherDefaultShutdown) } /** @@ -88,8 +86,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(actor: LocalActorRef, mailboxType: MailboxType) = actor match { - case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", mailboxType, DispatcherShutdownMillis) - case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, mailboxType, DispatcherShutdownMillis) + case null ⇒ new PinnedDispatcher(prerequisites, null, "anon", mailboxType, settings.DispatcherDefaultShutdown) + case some ⇒ new PinnedDispatcher(prerequisites, some.underlying, some.address, mailboxType, settings.DispatcherDefaultShutdown) } /** @@ -98,7 +96,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String, mailboxType: MailboxType) = - new PinnedDispatcher(prerequisites, null, name, mailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(prerequisites, null, name, mailboxType, settings.DispatcherDefaultShutdown) /** * Creates an thread based dispatcher serving a single actor through the same single thread. @@ -106,7 +104,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc * E.g. each actor consumes its own thread. */ def newPinnedDispatcher(name: String) = - new PinnedDispatcher(prerequisites, null, name, MailboxType, DispatcherShutdownMillis) + new PinnedDispatcher(prerequisites, null, name, MailboxType, settings.DispatcherDefaultShutdown) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -115,7 +113,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def newDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(prerequisites, name, settings.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. @@ -124,16 +122,17 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def newDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new Dispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType, + config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher serving multiple (millions) of actors through a thread pool. *

* Has a fluent builder interface for configuring its semantics. */ - def newDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + def newDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new Dispatcher(prerequisites, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new Dispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -142,7 +141,7 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def newBalancingDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new BalancingDispatcher(prerequisites, name, settings.DispatcherThroughput, - ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + settings.DispatcherThroughputDeadlineTime, MailboxType, config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -151,7 +150,8 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def newBalancingDispatcher(name: String, throughput: Int) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, MailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, MailboxType, + config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. @@ -160,16 +160,18 @@ class Dispatchers(val settings: ActorSystem.Settings, val prerequisites: Dispatc */ def newBalancingDispatcher(name: String, throughput: Int, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(prerequisites, name, throughput, ThroughputDeadlineTimeMillis, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(prerequisites, name, throughput, settings.DispatcherThroughputDeadlineTime, mailboxType, + config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Creates a executor-based event-driven dispatcher, with work-stealing, serving multiple (millions) of actors through a thread pool. *

* Has a fluent builder interface for configuring its semantics. */ - def newBalancingDispatcher(name: String, throughput: Int, throughputDeadlineMs: Int, mailboxType: MailboxType) = + def newBalancingDispatcher(name: String, throughput: Int, throughputDeadline: Duration, mailboxType: MailboxType) = ThreadPoolConfigDispatcherBuilder(config ⇒ - new BalancingDispatcher(prerequisites, name, throughput, throughputDeadlineMs, mailboxType, config, DispatcherShutdownMillis), ThreadPoolConfig()) + new BalancingDispatcher(prerequisites, name, throughput, throughputDeadline, mailboxType, + config, settings.DispatcherDefaultShutdown), ThreadPoolConfig()) /** * Utility function that tries to load the specified dispatcher config from the akka.conf * or else use the supplied default dispatcher @@ -238,10 +240,10 @@ class DispatcherConfigurator() extends MessageDispatcherConfigurator() { threadPoolConfig ⇒ new Dispatcher(prerequisites, config.getString("name"), config.getInt("throughput"), - config.getInt("throughput-deadline-time"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType(config, settings), threadPoolConfig, - settings.DispatcherDefaultShutdown.toMillis)).build + settings.DispatcherDefaultShutdown)).build } } @@ -252,9 +254,9 @@ class BalancingDispatcherConfigurator() extends MessageDispatcherConfigurator() threadPoolConfig ⇒ new BalancingDispatcher(prerequisites, config.getString("name"), config.getInt("throughput"), - config.getInt("throughput-deadline-time"), + Duration(config.getNanoseconds("throughput-deadline-time"), TimeUnit.NANOSECONDS), mailboxType(config, settings), threadPoolConfig, - settings.DispatcherDefaultShutdown.toMillis)).build + settings.DispatcherDefaultShutdown)).build } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala index 6f268f7e9a..95fbffe81c 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Mailbox.scala @@ -157,7 +157,7 @@ abstract class Mailbox(val actor: ActorCell) extends AbstractMailbox with Messag if (nextMessage ne null) { //If we have a message if (dispatcher.isThroughputDefined) { //If we're using throughput, we need to do some book-keeping var processedMessages = 0 - val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + TimeUnit.MILLISECONDS.toNanos(dispatcher.throughputDeadlineTime) else 0 + val deadlineNs = if (dispatcher.isThroughputDeadlineTimeDefined) System.nanoTime + dispatcher.throughputDeadlineTime.toNanos else 0 do { if (debug) println(actor.self + " processing message " + nextMessage) actor invoke nextMessage diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index ec99f59e0f..ed0b3cde99 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -9,6 +9,8 @@ import akka.actor.ActorCell import akka.actor.ActorSystem import akka.event.EventStream import akka.actor.Scheduler +import akka.util.Duration +import java.util.concurrent.TimeUnit /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -20,14 +22,14 @@ class PinnedDispatcher( _actor: ActorCell, _name: String, _mailboxType: MailboxType, - _timeoutMs: Long) + _shutdownTimeout: Duration) extends Dispatcher(_prerequisites, _name, Int.MaxValue, - -1, + Duration.Zero, _mailboxType, ThreadPoolConfig(allowCorePoolTimeout = true, corePoolSize = 1, maxPoolSize = 1), - _timeoutMs) { + _shutdownTimeout) { @volatile protected[akka] var owner: ActorCell = _actor diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/akka-beanstalk-mailbox-reference.conf b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/akka-beanstalk-mailbox-reference.conf index 4dce641550..7cf6b29957 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/akka-beanstalk-mailbox-reference.conf +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/resources/akka-beanstalk-mailbox-reference.conf @@ -10,10 +10,10 @@ akka { beanstalk { hostname = "127.0.0.1" port = 11300 - reconnect-window = 5 - message-submit-delay = 0 - message-submit-timeout = 5 - message-time-to-live = 120 + reconnect-window = 5s + message-submit-delay = 0s + message-submit-timeout = 5s + message-time-to-live = 120s } } diff --git a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala index 164dd2e1bd..29696300f9 100644 --- a/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-beanstalk-mailbox/src/main/scala/akka/actor/mailbox/BeanstalkBasedMailbox.scala @@ -5,6 +5,7 @@ package akka.actor.mailbox import com.surftools.BeanstalkClient._ import com.surftools.BeanstalkClientImpl._ +import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException @@ -22,11 +23,10 @@ class BeanstalkBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) val hostname = system.settings.config.getString("akka.actor.mailbox.beanstalk.hostname") val port = system.settings.config.getInt("akka.actor.mailbox.beanstalk.port") - def defaultTimeUnit = system.settings.DefaultTimeUnit - val reconnectWindow = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.reconnect-window"), defaultTimeUnit).toSeconds.toInt - val messageSubmitDelay = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-delay"), defaultTimeUnit).toSeconds.toInt - val messageSubmitTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-submit-timeout"), defaultTimeUnit).toSeconds.toInt - val messageTimeToLive = Duration(system.settings.config.getInt("akka.actor.mailbox.beanstalk.message-time-to-live"), defaultTimeUnit).toSeconds.toInt + val reconnectWindow = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.reconnect-window"), MILLISECONDS).toSeconds.toInt + val messageSubmitDelay = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-delay"), MILLISECONDS).toSeconds.toInt + val messageSubmitTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-submit-timeout"), MILLISECONDS).toSeconds.toInt + val messageTimeToLive = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.beanstalk.message-time-to-live"), MILLISECONDS).toSeconds.toInt val log = Logging(system, "BeanstalkBasedMailbox") diff --git a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala index 8af72aafb4..162bfd9f53 100644 --- a/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala +++ b/akka-durable-mailboxes/akka-file-mailbox/src/main/scala/akka/actor/mailbox/filequeue/PersistentQueue.scala @@ -21,6 +21,8 @@ import java.io._ import scala.collection.mutable import akka.event.LoggingAdapter import com.typesafe.config.Config +import akka.util.Duration +import java.util.concurrent.TimeUnit // a config value that's backed by a global setting but may be locally overridden class OverlaySetting[T](base: ⇒ T) { @@ -131,7 +133,7 @@ class PersistentQueue(persistencePath: String, val name: String, val config: Con maxItems set Some(config.getInt("akka.actor.mailbox.file-based.max-items")) maxSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-size")) maxItemSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-item-size")) - maxAge set Some(config.getInt("akka.actor.mailbox.file-based.max-age")) + maxAge set Some(Duration(config.getMilliseconds("akka.actor.mailbox.file-based.max-age"), TimeUnit.MILLISECONDS).toSeconds.toInt) maxJournalSize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-journal-size")) maxMemorySize set Some(config.getMemorySizeInBytes("akka.actor.mailbox.file-based.max-memory-size")) maxJournalOverflow set Some(config.getInt("akka.actor.mailbox.file-based.max-journal-overflow")) diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/akka-mongo-mailbox-reference.conf b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/akka-mongo-mailbox-reference.conf index 8b5730212b..cd5283599d 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/akka-mongo-mailbox-reference.conf +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/resources/akka-mongo-mailbox-reference.conf @@ -13,8 +13,8 @@ akka { # Configurable timeouts for certain ops timeout { - read = 3000 # number of milliseconds to wait for a read to succeed before timing out the future - write = 3000 # number of milliseconds to wait for a write to succeed before timing out the future + read = 3000ms # time to wait for a read to succeed before timing out the future + write = 3000ms # time to wait for a write to succeed before timing out the future } } } diff --git a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala index f5a4b6312b..9484aafcdc 100644 --- a/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-mongo-mailbox/src/main/scala/akka/actor/mailbox/MongoBasedMailbox.scala @@ -12,6 +12,8 @@ import akka.dispatch.Envelope import akka.event.Logging import akka.dispatch.DefaultPromise import akka.actor.ActorRef +import akka.util.Duration +import java.util.concurrent.TimeUnit class MongoBasedMailboxException(message: String) extends AkkaException(message) @@ -36,8 +38,8 @@ class MongoBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) { val WRITE_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.write" val READ_TIMEOUT_KEY = "akka.actor.mailbox.mongodb.timeout.read" val mongoURI = if (config.hasPath(URI_CONFIG_KEY)) Some(config.getString(URI_CONFIG_KEY)) else None - val writeTimeout = config.getInt(WRITE_TIMEOUT_KEY) - val readTimeout = config.getInt(READ_TIMEOUT_KEY) + val writeTimeout = Duration(config.getMilliseconds(WRITE_TIMEOUT_KEY), TimeUnit.MILLISECONDS) + val readTimeout = Duration(config.getInt(READ_TIMEOUT_KEY), TimeUnit.MILLISECONDS) val log = Logging(system, "MongoBasedMailbox") diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/akka-zookeeper-mailbox-reference.conf b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/akka-zookeeper-mailbox-reference.conf index 07f9ef2639..b745ca23b4 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/akka-zookeeper-mailbox-reference.conf +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/resources/akka-zookeeper-mailbox-reference.conf @@ -9,8 +9,8 @@ akka { mailbox { zookeeper { server-addresses = "127.0.0.1:2181" - session-timeout = 60 - connection-timeout = 60 + session-timeout = 60s + connection-timeout = 60s blocking-queue = on } } diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala index 2cb4f199b2..8ef047c471 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/actor/mailbox/ZooKeeperBasedMailbox.scala @@ -3,6 +3,7 @@ */ package akka.actor.mailbox +import java.util.concurrent.TimeUnit.MILLISECONDS import akka.actor.LocalActorRef import akka.util.Duration import akka.AkkaException @@ -22,9 +23,8 @@ class ZooKeeperBasedMailboxException(message: String) extends AkkaException(mess class ZooKeeperBasedMailbox(val owner: ActorCell) extends DurableMailbox(owner) with DurableMessageSerialization { val zkServerAddresses = system.settings.config.getString("akka.actor.mailbox.zookeeper.server-addresses") - def defaultTimeUnit = system.settings.DefaultTimeUnit - val sessionTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.zookeeper.session-timeout"), defaultTimeUnit).toMillis.toInt - val connectionTimeout = Duration(system.settings.config.getInt("akka.actor.mailbox.zookeeper.connection-timeout"), defaultTimeUnit).toMillis.toInt + val sessionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.session-timeout"), MILLISECONDS) + val connectionTimeout = Duration(system.settings.config.getMilliseconds("akka.actor.mailbox.zookeeper.connection-timeout"), MILLISECONDS) val blockingQueue = system.settings.config.getBoolean("akka.actor.mailbox.zookeeper.blocking-queue") val queueNode = "/queues" diff --git a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala index fd27d894bf..4dbafdf6f1 100644 --- a/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala +++ b/akka-durable-mailboxes/akka-zookeeper-mailbox/src/main/scala/akka/cluster/zookeeper/AkkaZkClient.scala @@ -6,15 +6,16 @@ package akka.cluster.zookeeper import org.I0Itec.zkclient._ import org.I0Itec.zkclient.serialize._ import org.I0Itec.zkclient.exception._ +import akka.util.Duration /** * ZooKeeper client. Holds the ZooKeeper connection and manages its session. */ class AkkaZkClient(zkServers: String, - sessionTimeout: Int, - connectionTimeout: Int, + sessionTimeout: Duration, + connectionTimeout: Duration, zkSerializer: ZkSerializer = new SerializableSerializer) - extends ZkClient(zkServers, sessionTimeout, connectionTimeout, zkSerializer) { + extends ZkClient(zkServers, sessionTimeout.toMillis.toInt, connectionTimeout.toMillis.toInt, zkSerializer) { def connection: ZkConnection = _connection.asInstanceOf[ZkConnection] diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 0cf805e239..3e2cc979ab 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -20,6 +20,7 @@ import com.eaio.uuid.UUID import akka.serialization.{ JavaSerializer, Serialization, Serializer, Compression } import akka.dispatch.{ Terminate, Dispatchers, Future, PinnedDispatcher } import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.TimeUnit.MILLISECONDS /** * Remote module - contains remote client and server config, remote server instance, remote daemon, remote dispatchers etc. @@ -35,7 +36,7 @@ class Remote(val system: ActorSystemImpl, val nodename: String) { // TODO move to settings? val shouldCompressData = config.getBoolean("akka.remote.use-compression") - val remoteSystemDaemonAckTimeout = Duration(config.getInt("akka.remote.remote-daemon-ack-timeout"), DefaultTimeUnit).toMillis.toInt + val remoteSystemDaemonAckTimeout = Duration(config.getMilliseconds("akka.remote.remote-daemon-ack-timeout"), MILLISECONDS) val failureDetector = new AccrualFailureDetector(system) diff --git a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala index 53c32e5179..999ed72c65 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteConfig.scala @@ -6,22 +6,22 @@ package akka.remote import akka.util.Duration import akka.config.ConfigurationException -import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeUnit.MILLISECONDS import com.typesafe.config.Config -class RemoteClientSettings(config: Config, defaultTimeUnit: TimeUnit) { +class RemoteClientSettings(config: Config) { val SECURE_COOKIE: Option[String] = config.getString("akka.remote.secure-cookie") match { case "" ⇒ None case cookie ⇒ Some(cookie) } - val RECONNECTION_TIME_WINDOW = Duration(config.getInt("akka.remote.client.reconnection-time-window"), defaultTimeUnit).toMillis - val READ_TIMEOUT = Duration(config.getInt("akka.remote.client.read-timeout"), defaultTimeUnit) - val RECONNECT_DELAY = Duration(config.getInt("akka.remote.client.reconnect-delay"), defaultTimeUnit) + val RECONNECTION_TIME_WINDOW = Duration(config.getMilliseconds("akka.remote.client.reconnection-time-window"), MILLISECONDS).toMillis + val READ_TIMEOUT = Duration(config.getMilliseconds("akka.remote.client.read-timeout"), MILLISECONDS) + val RECONNECT_DELAY = Duration(config.getMilliseconds("akka.remote.client.reconnect-delay"), MILLISECONDS) val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.client.message-frame-size") } -class RemoteServerSettings(config: Config, defaultTimeUnit: TimeUnit) { +class RemoteServerSettings(config: Config) { import scala.collection.JavaConverters._ val isRemotingEnabled = config.getStringList("akka.enabled-modules").asScala.exists(_ == "cluster") //TODO FIXME Shouldn't this be "remote"? val MESSAGE_FRAME_SIZE = config.getInt("akka.remote.server.message-frame-size") @@ -40,7 +40,7 @@ class RemoteServerSettings(config: Config, defaultTimeUnit: TimeUnit) { val UNTRUSTED_MODE = config.getBoolean("akka.remote.server.untrusted-mode") val PORT = config.getInt("akka.remote.server.port") - val CONNECTION_TIMEOUT = Duration(config.getInt("akka.remote.server.connection-timeout"), defaultTimeUnit) + val CONNECTION_TIMEOUT = Duration(config.getMilliseconds("akka.remote.server.connection-timeout"), MILLISECONDS) val BACKLOG = config.getInt("akka.remote.server.backlog") } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 38e03a968f..914a1e4a71 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -353,8 +353,8 @@ class ActiveRemoteClientHandler( class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) with RemoteMarshallingOps { val log = Logging(system, "NettyRemoteSupport") - val serverSettings = new RemoteServerSettings(system.settings.config, system.settings.DefaultTimeUnit) - val clientSettings = new RemoteClientSettings(system.settings.config, system.settings.DefaultTimeUnit) + val serverSettings = new RemoteServerSettings(system.settings.config) + val clientSettings = new RemoteClientSettings(system.settings.config) private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock diff --git a/akka-stm/src/main/resources/akka-stm-reference.conf b/akka-stm/src/main/resources/akka-stm-reference.conf index 5843dc24e8..98a3e70d5d 100644 --- a/akka-stm/src/main/resources/akka-stm-reference.conf +++ b/akka-stm/src/main/resources/akka-stm-reference.conf @@ -10,8 +10,7 @@ akka { stm { fair = on # Should global transactions be fair or non-fair (non fair yield better performance) max-retries = 1000 - timeout = 5 # Default timeout for blocking transactions and transaction set (in unit defined by - # the time-unit property) + timeout = 5s # Default timeout for blocking transactions and transaction set write-skew = on blocking-allowed = off interruptible = off diff --git a/akka-stm/src/main/scala/akka/agent/Agent.scala b/akka-stm/src/main/scala/akka/agent/Agent.scala index bd7e899db2..cfe618ce47 100644 --- a/akka-stm/src/main/scala/akka/agent/Agent.scala +++ b/akka-stm/src/main/scala/akka/agent/Agent.scala @@ -151,7 +151,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { def sendOff(f: T ⇒ T): Unit = { send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-send-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-send-off", UnboundedMailbox(), system.settings.ActorTimeout.duration) val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) threadBased ! Update(f) value @@ -169,7 +169,7 @@ class Agent[T](initialValue: T, system: ActorSystem) { val result = new DefaultPromise[T](timeout)(system.dispatcher) send((value: T) ⇒ { suspend() - val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeoutMillis) + val pinnedDispatcher = new PinnedDispatcher(system.dispatcherFactory.prerequisites, null, "agent-alter-off", UnboundedMailbox(), system.settings.ActorTimeout.duration) val threadBased = system.actorOf(Props(new ThreadBasedAgentUpdater(this)).withDispatcher(pinnedDispatcher)) result completeWith threadBased.?(Update(f), timeout).asInstanceOf[Future[T]] value diff --git a/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala b/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala index 7204f01468..19a4450cf3 100644 --- a/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala +++ b/akka-stm/src/test/scala/akka/stm/test/ConfigSpec.scala @@ -30,7 +30,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec getString("akka.stm.propagation") must equal("requires") getBoolean("akka.stm.quick-release") must equal(true) getBoolean("akka.stm.speculative") must equal(true) - getLong("akka.stm.timeout") must equal(5) + getMilliseconds("akka.stm.timeout") must equal(5 * 1000) getString("akka.stm.trace-level") must equal("none") getBoolean("akka.stm.write-skew") must equal(true) } diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index fe3b406575..382c25523d 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -14,6 +14,8 @@ import akka.actor.{ ActorCell, ActorRef, ActorSystem } import akka.dispatch._ import akka.actor.Scheduler import akka.event.EventStream +import akka.util.Duration +import java.util.concurrent.TimeUnit /* * Locking rules: @@ -122,10 +124,10 @@ class CallingThreadDispatcher( protected[akka] override def shutdown() {} protected[akka] override def throughput = 0 - protected[akka] override def throughputDeadlineTime = 0 + protected[akka] override def throughputDeadlineTime = Duration.Zero protected[akka] override def registerForExecution(mbox: Mailbox, hasMessageHint: Boolean, hasSystemMessageHint: Boolean): Boolean = false - protected[akka] override def timeoutMs = 100L + protected[akka] override def shutdownTimeout = Duration(100L, TimeUnit.MILLISECONDS) override def suspend(actor: ActorCell) { getMailbox(actor) foreach (_.suspendSwitch.switchOn)