// Source path: pekko/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala
// (235 lines, 7.4 KiB, Scala)

package akka.actor
import language.postfixOps
// (blame timestamp from the web view: 2011-06-20 15:11:32 -06:00)
import org.scalatest.BeforeAndAfterEach
import scala.concurrent.duration._
import java.util.concurrent.{ CountDownLatch, ConcurrentLinkedQueue, TimeUnit }
import akka.testkit._
import scala.concurrent.Await
import akka.pattern.ask
import java.util.concurrent.atomic.AtomicInteger
/**
 * Exercises `system.scheduler`: repeated and one-off scheduling, cancellation,
 * behaviour when the receiving actor is stopped or restarted, and timing accuracy.
 *
 * Every `Cancellable` registered through `collectCancellable` is cancelled in
 * `afterEach`, so a repeating task started by one test does not keep firing
 * while the next test runs.
 */
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout with ImplicitSender {
  // Scheduled tasks created by the current test; drained and cancelled in afterEach.
  private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
  import system.dispatcher

  /**
   * Registers `c` for cancellation at the end of the current test.
   *
   * @param c the scheduled task to track
   * @return the same `c`, so the call can wrap a scheduling expression inline
   */
  def collectCancellable(c: Cancellable): Cancellable = {
    cancellables.add(c)
    c
  }

  /** Cancels every tracked task and checks that each one reports itself as cancelled. */
  override def afterEach(): Unit = {
    while (cancellables.peek() ne null) {
      // poll() may return null if the queue was emptied in between; Option(...) skips that case.
      for (c <- Option(cancellables.poll())) {
        c.cancel()
        c.isCancelled must be === true
      }
    }
  }

  "A Scheduler" must {

    "schedule more than once" in {
      case object Tick
      case object Tock

      // `val a, b = expr` evaluates `expr` once per name, so these are two separate actors,
      // each answering only its first three Ticks with a Tock.
      val tickActor, tickActor2 = system.actorOf(Props(new Actor {
        var ticks = 0
        def receive = {
          case Tick =>
            if (ticks < 3) {
              sender ! Tock
              ticks += 1
            }
        }
      }))

      // run every 50 milliseconds (message-sending variant)
      collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick))

      // after max 1 second it should be executed at least the 3 times already
      expectMsg(Tock)
      expectMsg(Tock)
      expectMsg(Tock)
      // the actor stops replying after three Ticks, so nothing more may arrive
      expectNoMsg(500 millis)

      // run every 50 milliseconds (function variant)
      collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(tickActor2 ! Tick))

      // after max 1 second it should be executed at least the 3 times already
      expectMsg(Tock)
      expectMsg(Tock)
      expectMsg(Tock)
      expectNoMsg(500 millis)
    }

    "stop continuous scheduling if the receiving actor has been terminated" taggedAs TimingTest in {
      // echo actor: replies to the sender with whatever it receives
      val actor = system.actorOf(Props(new Actor { def receive = { case x => sender ! x } }))

      // run immediately and then every 100 milliseconds
      collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, actor, "msg"))

      expectMsg("msg")

      // stop the actor and, hence, the continuous messaging from happening
      system stop actor
      expectNoMsg(500 millis)
    }

    "schedule once" in {
      case object Tick
      // Three permits but only two one-off tasks are scheduled, so the latch can never reach zero.
      val countDownLatch = new CountDownLatch(3)
      val tickActor = system.actorOf(Props(new Actor {
        def receive = { case Tick => countDownLatch.countDown() }
      }))

      // run after 300 millisec
      collectCancellable(system.scheduler.scheduleOnce(300 milliseconds, tickActor, Tick))
      collectCancellable(system.scheduler.scheduleOnce(300 milliseconds)(countDownLatch.countDown()))

      // should not be run immediately
      assert(!countDownLatch.await(100, TimeUnit.MILLISECONDS))
      countDownLatch.getCount must be(3)

      // after 2 seconds the wait should still fail: each task fires exactly once
      assert(!countDownLatch.await(2, TimeUnit.SECONDS))

      // should still be 1 left
      countDownLatch.getCount must be(1)
    }

    /**
     * ticket #372
     */
    "be cancellable" in {
      // schedule and immediately cancel; none of the "fail" messages may be delivered
      for (_ <- 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel()

      expectNoMsg(2 seconds)
    }

    "be cancellable during initial delay" taggedAs TimingTest in {
      val ticks = new AtomicInteger

      val initialDelay = 200.milliseconds.dilated
      val delay = 10.milliseconds.dilated
      // NOTE: this local `timeout` shadows the one inherited from DefaultTimeout within this test.
      val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) {
        ticks.incrementAndGet()
      })
      // cancel well before the initial delay has elapsed
      Thread.sleep(10.milliseconds.dilated.toMillis)
      timeout.cancel()
      // wait past the point where the first tick would have fired
      Thread.sleep((initialDelay + 100.milliseconds.dilated).toMillis)

      ticks.get must be(0)
    }

    "be cancellable after initial delay" taggedAs TimingTest in {
      val ticks = new AtomicInteger

      val initialDelay = 20.milliseconds.dilated
      val delay = 200.milliseconds.dilated
      val timeout = collectCancellable(system.scheduler.schedule(initialDelay, delay) {
        ticks.incrementAndGet()
      })
      // let the first tick fire, then cancel before the second one is due
      Thread.sleep((initialDelay + 100.milliseconds.dilated).toMillis)
      timeout.cancel()
      // wait past the point where the second tick would have fired
      Thread.sleep((delay + 100.milliseconds.dilated).toMillis)

      ticks.get must be(1)
    }

    /**
     * ticket #307
     */
    "pick up schedule after actor restart" in {
      case object Ping
      case object Crash

      val restartLatch = new TestLatch
      val pingLatch = new TestLatch(6)

      val supervisor = system.actorOf(Props(new Supervisor(AllForOneStrategy(3, 1 second)(List(classOf[Exception])))))
      val props = Props(new Actor {
        def receive = {
          case Ping  => pingLatch.countDown()
          case Crash => throw new Exception("CRASH")
        }

        override def postRestart(reason: Throwable) = restartLatch.open()
      })
      // NOTE(review): Supervisor is defined elsewhere; it presumably creates a child from the
      // Props it is asked with and replies with the child's ActorRef - confirm against its source.
      val actor = Await.result((supervisor ? props).mapTo[ActorRef], timeout.duration)

      collectCancellable(system.scheduler.schedule(500 milliseconds, 500 milliseconds, actor, Ping))

      // appx 2 pings before crash
      EventFilter[Exception]("CRASH", occurrences = 1) intercept {
        collectCancellable(system.scheduler.scheduleOnce(1000 milliseconds, actor, Crash))
      }

      Await.ready(restartLatch, 2 seconds)

      // should be enough time for the ping countdown to recover and reach 6 pings
      Await.ready(pingLatch, 5 seconds)
    }

    "never fire prematurely" in {
      val ticks = new TestLatch(300)

      // carries the System.nanoTime value taken when the message was scheduled
      case class Msg(ts: Long)

      val actor = system.actorOf(Props(new Actor {
        def receive = {
          case Msg(ts) =>
            val now = System.nanoTime
            // Make sure that no message has been dispatched before the scheduled time (10ms) has occurred
            if (now - ts < 10.millis.toNanos) throw new RuntimeException("Interval is too small: " + (now - ts))
            ticks.countDown()
        }
      }))

      (1 to 300).foreach { _ =>
        collectCancellable(system.scheduler.scheduleOnce(10 milliseconds, actor, Msg(System.nanoTime)))
        Thread.sleep(5)
      }

      Await.ready(ticks, 3 seconds)
    }

    "schedule with different initial delay and frequency" taggedAs TimingTest in {
      val ticks = new TestLatch(3)

      case object Msg

      val actor = system.actorOf(Props(new Actor {
        def receive = { case Msg => ticks.countDown() }
      }))

      val startTime = System.nanoTime()
      collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg))
      Await.ready(ticks, 3 seconds)

      // elapsed time until the third tick, checked against 1800 ms +/- 199 ms
      (System.nanoTime() - startTime).nanos.toMillis must be(1800L plusOrMinus 199)
    }

    "adjust for scheduler inaccuracy" taggedAs TimingTest in {
      val startTime = System.nanoTime
      val n = 33
      val latch = new TestLatch(n)
      // tracked so the repeating task is cancelled in afterEach instead of running into later tests
      collectCancellable(system.scheduler.schedule(150.millis, 150.millis) { latch.countDown() })
      Await.ready(latch, 6.seconds)
      // Rate: ticks per second; a 150 ms interval corresponds to roughly 6.66 ticks/s
      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
      rate must be(6.66 plusOrMinus 0.4)
    }

    "not be affected by long running task" taggedAs TimingTest in {
      val startTime = System.nanoTime
      val n = 22
      val latch = new TestLatch(n)
      // tracked so the sleeping task is cancelled in afterEach instead of running into later tests
      collectCancellable(system.scheduler.schedule(225.millis, 225.millis) {
        // each run takes 80 ms, which must not stretch the 225 ms interval
        Thread.sleep(80)
        latch.countDown()
      })
      Await.ready(latch, 6.seconds)
      // Rate: ticks per second; a 225 ms interval corresponds to roughly 4.4 ticks/s
      val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
      rate must be(4.4 plusOrMinus 0.3)
    }
  }
}