Updated the scheduler implementation after feedback; changed Duration(x, timeunit) to more fluent 'x timeunit' and added ScalaDoc

This commit is contained in:
Henrik Engstrom 2011-11-23 15:15:44 +01:00
parent 7ca5a4161b
commit e2ad1088b0
8 changed files with 88 additions and 41 deletions

View file

@ -2,10 +2,10 @@ package akka.actor
import org.scalatest.BeforeAndAfterEach
import org.multiverse.api.latches.StandardLatch
import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit }
import akka.testkit.AkkaSpec
import akka.testkit.EventFilter
import akka.util.Duration
import akka.util.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
@ -29,14 +29,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
def receive = { case Tick countDownLatch.countDown() }
})
// run every 50 millisec
collectCancellable(system.scheduler.schedule(tickActor, Tick, Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.schedule(tickActor, Tick, 0 milliseconds, 50 milliseconds))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(1, TimeUnit.SECONDS))
val countDownLatch2 = new CountDownLatch(3)
collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), Duration(0, TimeUnit.MILLISECONDS), Duration(50, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.schedule(() countDownLatch2.countDown(), 0 milliseconds, 50 milliseconds))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS))
@ -50,8 +50,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
// run every 50 millisec
collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, Duration(50, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), Duration(50, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.scheduleOnce(tickActor, Tick, 50 milliseconds))
collectCancellable(system.scheduler.scheduleOnce(() countDownLatch.countDown(), 50 milliseconds))
// after 1 second the wait should fail
assert(countDownLatch.await(2, TimeUnit.SECONDS) == false)
@ -87,7 +87,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
(1 to 10).foreach { i
val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, Duration(1, TimeUnit.SECONDS)))
val timeout = collectCancellable(system.scheduler.scheduleOnce(actor, Ping, 1 second))
timeout.cancel()
}
@ -115,10 +115,10 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
val actor = (supervisor ? props).as[ActorRef].get
collectCancellable(system.scheduler.schedule(actor, Ping, Duration(500, TimeUnit.MILLISECONDS), Duration(500, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.schedule(actor, Ping, 500 milliseconds, 500 milliseconds))
// appx 2 pings before crash
EventFilter[Exception]("CRASH", occurrences = 1) intercept {
collectCancellable(system.scheduler.scheduleOnce(actor, Crash, Duration(1000, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.scheduleOnce(actor, Crash, 1000 milliseconds))
}
assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
@ -142,11 +142,32 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach {
})
(1 to 300).foreach { i
collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), Duration(10, TimeUnit.MILLISECONDS)))
collectCancellable(system.scheduler.scheduleOnce(actor, Msg(System.nanoTime()), 10 milliseconds))
Thread.sleep(5)
}
assert(ticks.await(3, TimeUnit.SECONDS) == true)
}
"schedule with different initial delay and frequency" in {
val ticks = new CountDownLatch(3)
case object Msg
val actor = actorOf(new Actor {
def receive = {
case Msg ticks.countDown()
}
})
val startTime = System.nanoTime()
val cancellable = system.scheduler.schedule(actor, Msg, 1 second, 100 milliseconds)
ticks.await(3, TimeUnit.SECONDS)
val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000
assert(elapsedTimeMs > 1200)
assert(elapsedTimeMs < 1500) // the precision is not ms exact
cancellable.cancel()
}
}
}

View file

@ -5,12 +5,10 @@
package akka.config
import akka.testkit.AkkaSpec
import akka.actor.ActorSystem
import java.io.File
import com.typesafe.config.ConfigFactory
import com.typesafe.config.ConfigParseOptions
import scala.collection.JavaConverters._
import java.util.concurrent.TimeUnit
import akka.util.duration._
import akka.util.Duration
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -37,7 +35,7 @@ class ConfigSpec extends AkkaSpec(ConfigFactory.parseResource(classOf[ConfigSpec
getInt("akka.actor.default-dispatcher.mailbox-capacity") must equal(-1)
getMilliseconds("akka.actor.default-dispatcher.mailbox-push-timeout-time") must equal(10 * 1000)
getMilliseconds("akka.actor.dispatcher-shutdown-timeout") must equal(1 * 1000)
settings.DispatcherDefaultShutdown must equal(Duration(1, TimeUnit.SECONDS))
settings.DispatcherDefaultShutdown must equal(1 second)
getInt("akka.actor.default-dispatcher.throughput") must equal(5)
settings.DispatcherThroughput must equal(5)
getMilliseconds("akka.actor.default-dispatcher.throughput-deadline-time") must equal(0)

View file

@ -1,14 +1,8 @@
package akka.dispatch
import org.scalatest.WordSpec
import org.scalatest.matchers.MustMatchers
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfterEach }
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
import java.util.concurrent.{ TimeUnit, CountDownLatch, BlockingQueue }
import java.util.{ Queue }
import java.util.concurrent.{ TimeUnit, BlockingQueue }
import akka.util._
import akka.util.Duration._
import akka.actor.{ LocalActorRef, Actor }
import akka.util.duration._
import akka.testkit.AkkaSpec
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -23,7 +17,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
val q = factory(config)
ensureInitialMailboxState(config, q)
implicit val within = Duration(1, TimeUnit.SECONDS)
implicit val within = 1 second
val f = spawn {
q.dequeue
@ -33,7 +27,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
}
"create a bounded mailbox with 10 capacity and with push timeout" in {
val config = BoundedMailbox(10, Duration(10, TimeUnit.MILLISECONDS))
val config = BoundedMailbox(10, 10 milliseconds)
val q = factory(config)
ensureInitialMailboxState(config, q)
@ -58,11 +52,11 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
}
"dequeue what was enqueued properly for bounded mailboxes" in {
testEnqueueDequeue(BoundedMailbox(10000, Duration(-1, TimeUnit.MILLISECONDS)))
testEnqueueDequeue(BoundedMailbox(10000, -1 millisecond))
}
"dequeue what was enqueued properly for bounded mailboxes with pushTimeout" in {
testEnqueueDequeue(BoundedMailbox(10000, Duration(100, TimeUnit.MILLISECONDS)))
testEnqueueDequeue(BoundedMailbox(10000, 100 milliseconds))
}
}
@ -97,7 +91,7 @@ abstract class MailboxSpec extends AkkaSpec with BeforeAndAfterAll with BeforeAn
}
def testEnqueueDequeue(config: MailboxType) {
implicit val within = Duration(10, TimeUnit.SECONDS)
implicit val within = 10 seconds
val q = factory(config)
ensureInitialMailboxState(config, q)

View file

@ -1,12 +1,11 @@
package akka.performance.microbench
import akka.performance.workbench.PerformanceSpec
import org.apache.commons.math.stat.descriptive.DescriptiveStatistics
import akka.actor._
import java.util.concurrent.{ ThreadPoolExecutor, CountDownLatch, TimeUnit }
import akka.dispatch._
import java.util.concurrent.ThreadPoolExecutor.AbortPolicy
import akka.util.Duration
import akka.util.duration._
// -server -Xms512M -Xmx1024M -XX:+UseParallelGC -Dbenchmark=true -Dbenchmark.repeatFactor=500
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
@ -15,7 +14,7 @@ class TellThroughputPerformanceSpec extends PerformanceSpec {
def createDispatcher(name: String) = ThreadPoolConfigDispatcherBuilder(config
new Dispatcher(system.dispatcherFactory.prerequisites, name, 5,
Duration.Zero, UnboundedMailbox(), config, Duration(60, TimeUnit.SECONDS)), ThreadPoolConfig())
Duration.Zero, UnboundedMailbox(), config, 60 seconds), ThreadPoolConfig())
.withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity
.setCorePoolSize(maxClients)
.build

View file

@ -218,9 +218,14 @@ akka {
# Changing the default values may change the system behavior drastically so make sure you know what you're doing!
#
scheduler {
# The HashedWheelTimer implementation from Jetty is used as the default scheduler in the system.
# See http://www.jboss.org/netty/
tickDuration = 100ms # tick duration in milliseconds (should always be defined in milliseconds)
# The HashedWheelTimer (HWT) implementation from Jetty is used as the default scheduler in the system.
#
# HWT does not execute the scheduled tasks on exact time.
# It will, on every tick, check if there are any tasks behind the schedule and execute them.
# You can increase or decrease the accuracy of the execution timing by specifying smaller or larger tick duration.
# If you are scheduling a lot of tasks you should consider increasing the ticks per wheel.
# For more information see: http://www.jboss.org/netty/
tickDuration = 100ms
ticksPerWheel = 512
}

View file

@ -411,6 +411,9 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Duration): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
@ -420,9 +423,6 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
}
}
private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(f: () Unit, delay: Duration): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {

View file

@ -34,9 +34,9 @@ object FSM {
def schedule(actor: ActorRef, timeout: Duration) {
if (repeat) {
ref = Some(system.scheduler.schedule(actor, this, Duration(timeout.length, timeout.unit), Duration(timeout.length, timeout.unit)))
ref = Some(system.scheduler.schedule(actor, this, timeout, timeout))
} else {
ref = Some(system.scheduler.scheduleOnce(actor, this, Duration(timeout.length, timeout.unit)))
ref = Some(system.scheduler.scheduleOnce(actor, this, timeout))
}
}
@ -523,7 +523,7 @@ trait FSM[S, D] extends ListenerManagement {
if (timeout.isDefined) {
val t = timeout.get
if (t.finite_? && t.length >= 0) {
timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), Duration(t.length, t.unit)))
timeoutFuture = Some(system.scheduler.scheduleOnce(self, TimeoutMarker(generation), t))
}
}
}

View file

@ -15,14 +15,44 @@ package akka.actor
import akka.util.Duration
trait Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable
/**
* Schedules a message to be sent repeatedly with an initial delay and frequency.
* E.g. if you would like a message to be sent immediately and thereafter every 500ms you would set
* delay = Duration.Zero and frequency = Duration(500, TimeUnit.MILLISECONDS)
*/
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, frequency: Duration): Cancellable
/**
* Schedules a function to be run repeatedly with an initial delay and a frequency.
* E.g. if you would like the function to be run after 2 seconds and thereafter every 100ms you would set
* delay = Duration(2, TimeUnit.SECONDS) and frequency = Duration(100, TimeUnit.MILLISECONDS)
*/
def schedule(f: () Unit, initialDelay: Duration, frequency: Duration): Cancellable
/**
* Schedules a Runnable to be run once with a delay, i.e. a time period that has to pass before the runnable is executed.
*/
def scheduleOnce(runnable: Runnable, delay: Duration): Cancellable
/**
* Schedules a message to be sent once with a delay, i.e. a time period that has to pass before the message is sent.
*/
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable
/**
* Schedules a function to be run once with a delay, i.e. a time period that has to pass before the function is run.
*/
def scheduleOnce(f: () Unit, delay: Duration): Cancellable
}
trait Cancellable {
/**
* Cancels the underlying scheduled task.
*/
def cancel(): Unit
/**
* Checks if the underlying scheduled task has been cancelled.
*/
def isCancelled: Boolean
}