diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index b7c0ba6c8a..d440968d33 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -2,9 +2,10 @@ package akka.actor import org.scalatest.BeforeAndAfterEach import org.multiverse.api.latches.StandardLatch -import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec import akka.testkit.EventFilter +import akka.util.duration._ +import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { @@ -28,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectCancellable(system.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -49,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds)) + collectCancellable(system.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50 milliseconds)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -58,22 +59,6 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { assert(countDownLatch.getCount == 1) } - /** - * ticket #372 - * FIXME rewrite the test so that registry is not used - */ - // "not create actors" in { - // object Ping - // val ticks = new CountDownLatch(1000) - // val actor = actorOf(new Actor { - // def receive = { case Ping ⇒ ticks.countDown } - // }) - // val numActors = system.registry.local.actors.length - // (1 to 1000).foreach(_ ⇒ collectFuture(Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))) - // assert(ticks.await(10, TimeUnit.SECONDS)) - // assert(system.registry.local.actors.length === numActors) - // } - /** * ticket #372 */ @@ -86,7 +71,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second)) timeout.cancel() } @@ -114,15 +99,59 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectCancellable(system.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds)) // appx 2 pings before crash EventFilter[Exception]("CRASH", occurrences = 1) intercept { - collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds)) } assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) // should be enough time for the ping countdown to recover and reach 6 pings assert(pingLatch.await(4, TimeUnit.SECONDS)) } + + "never fire prematurely" in { + val ticks = new CountDownLatch(300) + + case class Msg(ts: Long) + + val actor = actorOf(new Actor { + def receive = { + case Msg(ts) ⇒ + val now = System.currentTimeMillis + // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred + if (now - ts < 10) throw new RuntimeException("Interval is too small: " + (now - ts)) + ticks.countDown() + } + }) + + (1 to 300).foreach { i ⇒ + collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.currentTimeMillis), 10 milliseconds)) + Thread.sleep(5) + } + + assert(ticks.await(3, TimeUnit.SECONDS) == true) + } + + "schedule with different initial delay and frequency" in { + val ticks = new CountDownLatch(3) + + case object Msg + + val actor = actorOf(new Actor { + def receive = { + case Msg ⇒ ticks.countDown() + } + }) + + val startTime = System.nanoTime() + val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds) + ticks.await(3, TimeUnit.SECONDS) + val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 + + assert(elapsedTimeMs > 1200) + assert(elapsedTimeMs < 1500) // the precision is not ms exact + cancellable.cancel() + } } } diff --git a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala index c58a24be7c..b594ea0f19 100644 --- a/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/config/ConfigSpec.scala @@ -5,12 +5,10 @@ package akka.config import akka.testkit.AkkaSpec -import akka.actor.ActorSystem -import java.io.File import com.typesafe.config.ConfigFactory import com.typesafe.config.ConfigParseOptions import scala.collection.JavaConverters._ -import java.util.concurrent.TimeUnit +import akka.util.duration._ import akka.util.Duration @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -37,7 +35,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1) getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000) getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000) - settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS)) + settings.DispatcherDefaultShutdown must equal(1 second) getInt("akka.actor.default-dispatcher.throughput") must equal(5) settings.DispatcherThroughput must equal(5) getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0) diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala index 49fec62639..7af8f057d8 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/MailboxConfigSpec.scala @@ -1,14 +1,8 @@ package akka.dispatch -import org.scalatest.WordSpec -import org.scalatest.matchers.MustMatchers import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach } -import org.scalatest.junit.JUnitRunner -import org.junit.runner.RunWith -import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue } -import java.util.{ Queue } +import java.util.concurrent.{ TimeUnit, BlockingQueue } import akka.util._ -import akka.util.Duration._ -import akka.actor.{ LocalActorRef, Actor } +import akka.util.duration._ import akka.testkit.AkkaSpec @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -23,7 +17,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn val q = factory(config) ensureInitialMailboxState(config, q) - implicit val within = Duration(1, TimeUnit.SECONDS) + implicit val within = 1 second val f = spawn { q.dequeue @@ -33,7 +27,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "create a bounded mailbox with 10 capacity and with push timeout" in { - val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS)) + val config = BoundedMailbox(10, 10 milliseconds) val q = factory(config) ensureInitialMailboxState(config, q) @@ -58,11 +52,11 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } "dequeue what was enqueued properly for bounded mailboxes" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond)) } "dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in { - testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS))) + testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds)) } } @@ -97,7 +91,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn } def testEnqueueDequeue(config: MailboxType) { - implicit val within = Duration(10, TimeUnit.SECONDS) + implicit val within = 10 seconds val q = factory(config) ensureInitialMailboxState(config, q) diff --git a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala index 75aef7708f..a49e837ac4 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/microbench/TellThroughputPerformanceSpec.scala @@ -1,12 +1,11 @@ package akka.performance.microbench import akka.performance.workbench.PerformanceSpec -import org.apache.commons.math.stat.descriptive.DescriptiveStatistics import akka.actor._ import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit } import akka.dispatch._ -import java.util.concurrent.ThreadPoolExecutor.AbortPolicy import akka.util.Duration +import akka.util.duration._ // -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500 @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) @@ -15,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec { def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config ⇒ new Dispatcher(system.dispatcherFactory.prerequisites, name, 5, - Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig()) + Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig()) .withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity .setCorePoolSize(maxClients) .build diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java index 328d2dc39f..5dc9a064f6 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/HashedWheelTimer.java @@ -16,6 +16,7 @@ package org.jboss.netty.akka.util; import akka.event.LoggingAdapter; +import akka.util.Duration; import org.jboss.netty.akka.util.internal.ConcurrentIdentityHashMap; import org.jboss.netty.akka.util.internal.ReusableIterator; @@ -96,24 +97,24 @@ public class HashedWheelTimer implements Timer { * @param threadFactory a {@link java.util.concurrent.ThreadFactory} that creates a * background {@link Thread} which is dedicated to * {@link TimerTask} execution. - * @param tickDuration the duration between tick - * @param unit the time unit of the {@code tickDuration} + * @param duration the duration between ticks * @param ticksPerWheel the size of the wheel */ public HashedWheelTimer( LoggingAdapter logger, ThreadFactory threadFactory, - long tickDuration, TimeUnit unit, int ticksPerWheel) { + Duration duration, + int ticksPerWheel) { if (threadFactory == null) { throw new NullPointerException("threadFactory"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (duration == null) { + throw new NullPointerException("duration"); } - if (tickDuration <= 0) { + if (duration.toNanos() <= 0) { throw new IllegalArgumentException( - "tickDuration must be greater than 0: " + tickDuration); + "duration must be greater than 0 ns: " + duration.toNanos()); } if (ticksPerWheel <= 0) { throw new IllegalArgumentException( @@ -128,14 +129,14 @@ public class HashedWheelTimer implements Timer { mask = wheel.length - 1; // Convert tickDuration to milliseconds. - this.tickDuration = tickDuration = unit.toMillis(tickDuration); + this.tickDuration = duration.toMillis(); // Prevent overflow. if (tickDuration == Long.MAX_VALUE || tickDuration >= Long.MAX_VALUE / wheel.length) { throw new IllegalArgumentException( "tickDuration is too long: " + - tickDuration + ' ' + unit); + tickDuration + ' ' + duration.unit()); } roundDuration = tickDuration * wheel.length; @@ -231,23 +232,22 @@ public class HashedWheelTimer implements Timer { return Collections.unmodifiableSet(unprocessedTimeouts); } - public Timeout newTimeout(TimerTask task, long delay, TimeUnit unit) { + public Timeout newTimeout(TimerTask task, Duration delay) { final long currentTime = System.currentTimeMillis(); if (task == null) { throw new NullPointerException("task"); } - if (unit == null) { - throw new NullPointerException("unit"); + if (delay == null) { + throw new NullPointerException("delay"); } if (!workerThread.isAlive()) { start(); } - delay = unit.toMillis(delay); - HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay); - scheduleTimeout(timeout, delay); + HashedWheelTimeout timeout = new HashedWheelTimeout(task, currentTime + delay.toMillis()); + scheduleTimeout(timeout, delay.toMillis()); return timeout; } diff --git a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java index 43ddec9604..b5bd8c6a7c 100644 --- a/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java +++ b/akka-actor/src/main/java/org/jboss/netty/akka/util/Timer.java @@ -15,6 +15,7 @@ */ package org.jboss.netty.akka.util; +import akka.util.Duration; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -41,7 +42,7 @@ public interface Timer { * @throws IllegalStateException if this timer has been * {@linkplain #stop() stopped} already */ - Timeout newTimeout(TimerTask task, long delay, TimeUnit unit); + Timeout newTimeout(TimerTask task, Duration delay); /** * Releases all resources acquired by this {@link org.jboss.netty.akka.util.Timer} and cancels all diff --git a/akka-actor/src/main/resources/akka-actor-reference.conf b/akka-actor/src/main/resources/akka-actor-reference.conf index d8c2746fcf..a4b74ce474 100644 --- a/akka-actor/src/main/resources/akka-actor-reference.conf +++ b/akka-actor/src/main/resources/akka-actor-reference.conf @@ -123,5 +123,20 @@ akka { } + # Used to set the behavior of the scheduler. + # Changing the default values may change the system behavior drastically so make sure you know what you're doing! + # + scheduler { + # The HashedWheelTimer (HWT) implementation from Netty is used as the default scheduler in the system. + # + # HWT does not execute the scheduled tasks on exact time. + # It will, on every tick, check if there are any tasks behind the schedule and execute them. + # You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration. + # If you are scheduling a lot of tasks you should consider increasing the ticks per wheel. + # For more information see: http://www.jboss.org/netty/ + tickDuration = 100ms + ticksPerWheel = 512 + } + } diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 62066cea00..330824290f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -9,7 +9,7 @@ import scala.annotation.tailrec import scala.collection.immutable.{ Stack, TreeMap } import java.util.concurrent.TimeUnit import akka.event.Logging.{ Debug, Warning, Error } -import akka.util.Helpers +import akka.util.{ Duration, Helpers } /** * The actor context - the view of the actor cell from the actor. @@ -400,7 +400,7 @@ private[akka] class ActorCell( if (recvtimeout._1 > 0 && dispatcher.mailboxIsEmpty(this)) { recvtimeout._2.cancel() //Cancel any ongoing future //Only reschedule if desired and there are currently no more messages to be processed - receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, recvtimeout._1, TimeUnit.MILLISECONDS)) + receiveTimeoutData = (recvtimeout._1, system.scheduler.scheduleOnce(self, ReceiveTimeout, Duration(recvtimeout._1, TimeUnit.MILLISECONDS))) } else cancelReceiveTimeout() } diff --git a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala index 837739cddb..28fc383adf 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorRefProvider.scala @@ -15,7 +15,7 @@ import akka.event.{ Logging, DeathWatch, ActorClassification, EventStream } import akka.routing.{ ScatterGatherFirstCompletedRouter, Routing, RouterType, Router, RoutedProps, RoutedActorRef, RoundRobinRouter, RandomRouter, LocalConnectionManager, DirectRouter } import akka.AkkaException import com.eaio.uuid.UUID -import akka.util.{ Switch, Helpers } +import akka.util.{ Duration, Switch, Helpers } import akka.remote.RemoteAddress import akka.remote.LocalOnly @@ -394,20 +394,20 @@ class LocalDeathWatch extends DeathWatch with ActorClassification { class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay), initialDelay)) - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay), initialDelay)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay)) - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay)) - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = - new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } @@ -415,23 +415,23 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createSingleTask(f: () ⇒ Unit): TimerTask = + new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } + + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } - private def createSingleTask(f: () ⇒ Unit): TimerTask = - new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } } - - private def createContinuousTask(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimerTask = { + private def createContinuousTask(f: () ⇒ Unit, delay: Duration): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() - timeout.getTimer.newTimeout(this, delay, timeUnit) + timeout.getTimer.newTimeout(this, delay) } } } diff --git a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala index d15d237629..1bf1ea2bd1 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorSystem.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorSystem.scala @@ -13,7 +13,6 @@ import com.eaio.uuid.UUID import akka.serialization.Serialization import akka.remote.RemoteAddress import org.jboss.netty.akka.util.HashedWheelTimer -import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.SECONDS import java.util.concurrent.TimeUnit.MILLISECONDS import java.util.concurrent.TimeUnit.NANOSECONDS @@ -23,10 +22,11 @@ import com.typesafe.config.ConfigParseOptions import com.typesafe.config.ConfigRoot import com.typesafe.config.ConfigFactory import java.lang.reflect.InvocationTargetException -import java.util.concurrent.ConcurrentHashMap import akka.util.{ Helpers, Duration, ReflectiveAccess } import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.CountDownLatch +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.Executors import scala.annotation.tailrec import akka.serialization.SerializationExtension @@ -94,6 +94,9 @@ object ActorSystem { val EnabledModules: Seq[String] = getStringList("akka.enabled-modules").asScala + val SchedulerTickDuration = Duration(getMilliseconds("akka.scheduler.tickDuration"), MILLISECONDS) + val SchedulerTicksPerWheel = getInt("akka.scheduler.ticksPerWheel") + if (ConfigVersion != Version) throw new ConfigurationException("Akka JAR version [" + Version + "] does not match the provided config version [" + ConfigVersion + "]") @@ -297,8 +300,7 @@ class ActorSystemImpl(val name: String, val applicationConfig: Config) extends A eventStream.startStdoutLogger(settings) val log = new BusLogging(eventStream, "ActorSystem") // “this” used only for .getClass in tagging messages - // FIXME make this configurable - val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, 100, MILLISECONDS, 512)) + val scheduler = new DefaultScheduler(new HashedWheelTimer(log, Executors.defaultThreadFactory, settings.SchedulerTickDuration, settings.SchedulerTicksPerWheel)) val provider: ActorRefProvider = { val providerClass = ReflectiveAccess.getClassFor(ProviderClass) match { diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index cfd9d511ed..76495843fc 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -7,6 +7,7 @@ import akka.util._ import scala.collection.mutable import akka.event.Logging +import akka.util.Duration._ object FSM { @@ -33,9 +34,9 @@ object FSM { def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { - ref = Some(system.scheduler.schedule(actor, this, timeout.length, timeout.length, timeout.unit)) + ref = Some(system.scheduler.schedule(actor, this, timeout, timeout)) } else { - ref = Some(system.scheduler.scheduleOnce(actor, this, timeout.length, timeout.unit)) + ref = Some(system.scheduler.scheduleOnce(actor, this, timeout)) } } @@ -522,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement { if (timeout.isDefined) { val t = timeout.get if (t.finite_? && t.length >= 0) { - timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t.length, t.unit)) + timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t)) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index d12dcb6329..52a63f1730 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -9,46 +9,50 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. - * - * Rework of David Pollak's ActorPing class in the Lift Project - * which is licensed under the Apache 2 License. */ package akka.actor -import java.util.concurrent._ import akka.util.Duration -import akka.AkkaException -case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) { - def this(msg: String) = this(msg, null) -} +trait Scheduler { + /** + * Schedules a message to be sent repeatedly with an initial delay and frequency. + * E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set + * delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS) + */ + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable -trait JScheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable -} + /** + * Schedules a function to be run repeatedly with an initial delay and a frequency. + * E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set + * delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS) + */ + def schedule(f: () ⇒ Unit, initialDelay: Duration, frequency: Duration): Cancellable -abstract class Scheduler extends JScheduler { - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable + /** + * Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed. + */ + def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable + /** + * Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent. + */ + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = - schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = - schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = - scheduleOnce(receiver, message, delay.length, delay.unit) - - def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = - scheduleOnce(f, delay.length, delay.unit) + /** + * Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run. + */ + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable } trait Cancellable { + /** + * Cancels the underlying scheduled task. + */ def cancel(): Unit + /** + * Checks if the underlying scheduled task has been cancelled. + */ def isCancelled: Boolean } \ No newline at end of file diff --git a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala index 7998a514f8..208eae51ca 100644 --- a/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/AbstractDispatcher.scala @@ -135,7 +135,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext shutdownScheduleUpdater.get(this) match { case UNSCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(this, UNSCHEDULED, SCHEDULED)) { - scheduler.scheduleOnce(shutdownAction, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(shutdownAction, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) () } else ifSensibleToDoSoThenScheduleShutdown() case SCHEDULED ⇒ @@ -210,7 +210,7 @@ abstract class MessageDispatcher(val prerequisites: DispatcherPrerequisites) ext } case RESCHEDULED ⇒ if (shutdownScheduleUpdater.compareAndSet(MessageDispatcher.this, RESCHEDULED, SCHEDULED)) - scheduler.scheduleOnce(this, shutdownTimeout.toMillis, TimeUnit.MILLISECONDS) + scheduler.scheduleOnce(this, Duration(shutdownTimeout.toMillis, TimeUnit.MILLISECONDS)) else run() } } diff --git a/akka-actor/src/main/scala/akka/dispatch/Future.scala b/akka-actor/src/main/scala/akka/dispatch/Future.scala index b57ff39512..e9a3035ea8 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Future.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Future.scala @@ -956,12 +956,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else func(DefaultPromise.this) } } } - val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + val timeoutFuture = dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) onComplete(_ ⇒ timeoutFuture.cancel()) false } else true @@ -983,12 +983,12 @@ class DefaultPromise[T](val timeout: Timeout)(implicit val dispatcher: MessageDi val runnable = new Runnable { def run() { if (!isCompleted) { - if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, timeLeftNoinline(), NANOS) + if (!isExpired) dispatcher.prerequisites.scheduler.scheduleOnce(this, Duration(timeLeftNoinline(), TimeUnit.NANOSECONDS)) else promise complete (try { Right(fallback) } catch { case e ⇒ Left(e) }) } } } - dispatcher.prerequisites.scheduler.scheduleOnce(runnable, timeLeft(), NANOS) + dispatcher.prerequisites.scheduler.scheduleOnce(runnable, Duration(timeLeft(), TimeUnit.NANOSECONDS)) promise } } else this diff --git a/akka-remote/src/main/scala/akka/remote/Gossiper.scala b/akka-remote/src/main/scala/akka/remote/Gossiper.scala index 5fca79753f..3735f6ceaf 100644 --- a/akka-remote/src/main/scala/akka/remote/Gossiper.scala +++ b/akka-remote/src/main/scala/akka/remote/Gossiper.scala @@ -8,6 +8,7 @@ import akka.actor._ import akka.actor.Status._ import akka.event.Logging import akka.util.duration._ +import akka.util.Duration import akka.remote.RemoteProtocol._ import akka.remote.RemoteProtocol.RemoteSystemDaemonMessageType._ import java.util.concurrent.atomic.AtomicReference @@ -122,8 +123,8 @@ class Gossiper(remote: Remote) { { // start periodic gossip and cluster scrutinization - default is run them every second with 1/2 second in between - system.scheduler schedule (() ⇒ initateGossip(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) - system.scheduler schedule (() ⇒ scrutinize(), initalDelayForGossip.toSeconds, gossipFrequency.toSeconds, timeUnit) + system.scheduler schedule (() ⇒ initateGossip(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) + system.scheduler schedule (() ⇒ scrutinize(), Duration(initalDelayForGossip.toSeconds, timeUnit), Duration(gossipFrequency.toSeconds, timeUnit)) } /** diff --git a/akka-remote/src/main/scala/akka/remote/Remote.scala b/akka-remote/src/main/scala/akka/remote/Remote.scala index 18df5d1bfc..123304c314 100644 --- a/akka-remote/src/main/scala/akka/remote/Remote.scala +++ b/akka-remote/src/main/scala/akka/remote/Remote.scala @@ -281,6 +281,8 @@ trait RemoteMarshallingOps { def system: ActorSystem + protected def useUntrustedMode: Boolean + def createMessageSendEnvelope(rmp: RemoteMessageProtocol): AkkaRemoteProtocol = { val arp = AkkaRemoteProtocol.newBuilder arp.setMessage(rmp) @@ -323,15 +325,15 @@ trait RemoteMarshallingOps { messageBuilder } - def receiveMessage(remoteMessage: RemoteMessage, untrustedMode: Boolean) { + def receiveMessage(remoteMessage: RemoteMessage) { val recipient = remoteMessage.recipient remoteMessage.payload match { case Left(t) ⇒ throw t case Right(r) ⇒ r match { - case _: Terminate ⇒ if (untrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() - case _: AutoReceivedMessage if (untrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") - case m ⇒ recipient.!(m)(remoteMessage.sender) + case _: Terminate ⇒ if (useUntrustedMode) throw new SecurityException("RemoteModule server is operating is untrusted mode, can not stop the actor") else recipient.stop() + case _: AutoReceivedMessage if (useUntrustedMode) ⇒ throw new SecurityException("RemoteModule server is operating is untrusted mode, can not pass on a AutoReceivedMessage to the remote actor") + case m ⇒ recipient.!(m)(remoteMessage.sender) } } } diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index 4536fa3d09..eedf58fd9e 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -95,8 +95,8 @@ abstract class RemoteClient private[akka] ( } class PassiveRemoteClient(val currentChannel: Channel, - remoteSupport: NettyRemoteSupport, - remoteAddress: RemoteAddress) + remoteSupport: NettyRemoteSupport, + remoteAddress: RemoteAddress) extends RemoteClient(remoteSupport, remoteAddress) { def connect(reconnectIfAlreadyConnected: Boolean = false): Boolean = runSwitch switchOn { @@ -297,7 +297,7 @@ class ActiveRemoteClientHandler( } case arp: AkkaRemoteProtocol if arp.hasMessage ⇒ - client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader), untrustedMode = false) //TODO FIXME Sensible or not? + client.remoteSupport.receiveMessage(new RemoteMessage(arp.getMessage, client.remoteSupport, client.loader)) case other ⇒ throw new RemoteClientException("Unknown message received in remote client handler: " + other, client.remoteSupport, client.remoteAddress) @@ -364,11 +364,15 @@ class NettyRemoteSupport(_system: ActorSystem) extends RemoteSupport(_system) wi private val remoteClients = new HashMap[RemoteAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock - protected[akka] def send(message: Any, - senderOption: Option[ActorRef], - recipientAddress: RemoteAddress, - recipient: ActorRef, - loader: Option[ClassLoader]): Unit = { + override protected def useUntrustedMode = serverSettings.UntrustedMode + + protected[akka] def send( + message: Any, + senderOption: Option[ActorRef], + recipientAddress: RemoteAddress, + recipient: ActorRef, + loader: Option[ClassLoader]): Unit = { + clientsLock.readLock.lock try { val client = remoteClients.get(recipientAddress) match { @@ -634,7 +638,7 @@ class RemoteServerHandler( override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) = try { event.getMessage match { case remote: AkkaRemoteProtocol if remote.hasMessage ⇒ - remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader), UntrustedMode) + remoteSupport.receiveMessage(new RemoteMessage(remote.getMessage, remoteSupport, applicationLoader)) case remote: AkkaRemoteProtocol if remote.hasInstruction ⇒ val instruction = remote.getInstruction diff --git a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala index 1c86015df4..2c23940c9f 100644 --- a/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala +++ b/akka-samples/akka-sample-fsm/src/main/scala/DiningHakkersOnBecome.scala @@ -4,7 +4,7 @@ package sample.fsm.dining.become //http://www.dalnefre.com/wp/2010/08/dining-philosophers-in-humus/ import akka.actor.{ ActorRef, Actor, ActorSystem } -import java.util.concurrent.TimeUnit +import akka.util.duration._ /* * First we define our messages, they basically speak for themselves @@ -77,7 +77,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Taken(`chopstickToWaitFor`) ⇒ println("%s has picked up %s and %s, and starts to eat", name, left.address, right.address) become(eating) - system.scheduler.scheduleOnce(self, Think, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Think, 5 seconds) case Busy(chopstick) ⇒ become(thinking) @@ -106,7 +106,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { left ! Put(self) right ! Put(self) println("%s puts down his chopsticks and starts to think", name) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, 5 seconds) } //All hakkers start in a non-eating state @@ -114,7 +114,7 @@ class Hakker(name: String, left: ActorRef, right: ActorRef) extends Actor { case Think ⇒ println("%s starts to think", name) become(thinking) - system.scheduler.scheduleOnce(self, Eat, 5, TimeUnit.SECONDS) + system.scheduler.scheduleOnce(self, Eat, 5 seconds) } }