diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index f1d2ff5655..c9a8a60138 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -2,10 +2,10 @@ package akka.actor import org.scalatest.BeforeAndAfterEach import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.testkit.EventFilter -import akka.util.Duration +import akka.util.duration._ +import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { @@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -50,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS))) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -87,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS))) + val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) timeout.cancel() } @@ -115,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) @@ -142,11 +142,32 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 300).foreach { i ⇒ - collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS))) + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), 10 milliseconds)) Thread.sleep(5) } assert(ticks.await(3, TimeUnit.SECONDS) == true) } + + "schedule with different initial delay and frequency" in { + val ticks = new CountDownLatch(3) + + case object Msg + + val actor = actorOf(new Actor { + def receive = { + case Msg ⇒ ticks.countDown() + } + }) + + val startTime = System.nanoTime() + val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + ticks.await(3, TimeUnit.SECONDS) + val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 + + assert(elapsedTimeMs > 1200) + assert(elapsedTimeMs < 1500) // the precision is not ms exact + cancellable.cancel() + } } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index 17ed0112f0..8744e8a6d9 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -5,12 +5,10 @@ package akka.config import akka.testkit.AkkaSpec -import akka.actor.ActorSystem -import java.io.File import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import scala.collection.JavaConverters._ -import java.util.concurrent.TimeUnit +import akka.util.duration._ import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -37,7 +35,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) - settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS)) + settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) settings.DispatcherThroughput must equal(5) getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 49fec62639..7af8f057d8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,14 +1,8 @@ package akka.dispatch -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith -import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } -import java.util.{ Queue } +import java.util.concurrent.{ TimeUnit, BlockingQueue } import akka.util._ -import akka.util.Duration._ -import akka.actor.{ LocalActorRef, Actor } +import akka.util.duration._ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -23,7 +17,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val q = factory(config) ensureInitialMailboxState(config, q) - implicit val within = Duration(1, TimeUnit.SECONDS) + implicit val within = 1 second val f = spawn { q.dequeue @@ -33,7 +27,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "create a bounded mailbox with 10 capacity and with push timeout" in { - val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS)) + val config = BoundedMailbox(10, 10 milliseconds) val q = factory(config) ensureInitialMailboxState(config, q) @@ -58,11 +52,11 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "dequeue what was enqueued properly for bounded mailboxes" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond)) } "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds)) } } @@ -97,7 +91,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } def testEnqueueDequeue(config: MailboxType) { - implicit val within = Duration(10, TimeUnit.SECONDS) + implicit val within = 10 seconds val q = factory(config) ensureInitialMailboxState(config, q) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 75aef7708f..a49e837ac4 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -1,12 +1,11 @@ package akka.performance.microbench import akka.performance.workbench.PerformanceSpec -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics import akka.actor._ import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } import akka.dispatch._ -import java.util.concurrent.ThreadPoolExecutor.AbortPolicy import akka.util.Duration +import akka.util.duration._ // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -15,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index 9bc6fd77ce..b82ab3e4b3 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -218,9 +218,14 @@ akka { # Changing the default values may change the system behavior drastically so make sure you know what you're doing! # scheduler { - # The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system. - # See http://www.jboss.org/netty/ - tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds) + # The HashedWheelTimer (HWT) implementation from Jetty is used as the default scheduler in the system. + # + # HWT does not execute the scheduled tasks on exact time. + # It will, on every tick, check if there are any tasks behind the schedule and execute them. + # You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration. + # If you are scheduling a lot of tasks you should consider increasing the ticks per wheel. + # For more information see: http://www.jboss.org/netty/ + tickDuration = 100ms ticksPerWheel = 512 } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 2b93450a44..a69c08c4e2 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -411,6 +411,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } + private def createSingleTask(f: () ⇒ Unit): TimerTask = + new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { @@ -420,9 +423,6 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = - new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } - private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index d94bf45707..76495843fc 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -34,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit))) + ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit))) + ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) } } @@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit))) + timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 3263f7a5c9..52a63f1730 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -15,14 +15,44 @@ package akka.actor import akka.util.Duration trait Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable + /** + * Schedules a message to be sent repeatedly with an initial delay and frequency. + * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set + * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) + */ + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable + + /** + * Schedules a function to be run repeatedly with an initial delay and a frequency. + * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set + * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + */ + def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable + + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. + */ def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable + + /** + * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. + */ def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable + + /** + * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. + */ def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable } trait Cancellable { + /** + * Cancels the underlying scheduled task. + */ def cancel(): Unit + + /** + * Checks if the underlying scheduled task has been cancelled. + */ def isCancelled: Boolean } \ No newline at end of file