2010-10-26 12:49:25 +02:00
|
|
|
package akka.actor
|
2010-08-24 23:21:28 +02:00
|
|
|
|
|
|
|
|
import org.scalatest.junit.JUnitSuite
|
|
|
|
|
import Actor._
|
2011-05-18 17:25:30 +02:00
|
|
|
import java.util.concurrent.{ CountDownLatch, TimeUnit }
|
2010-10-26 12:49:25 +02:00
|
|
|
import akka.config.Supervision._
|
2010-08-24 23:21:28 +02:00
|
|
|
import org.multiverse.api.latches.StandardLatch
|
|
|
|
|
import org.junit.Test
|
|
|
|
|
|
|
|
|
|
/**
 * Exercises the global `Scheduler`: repeated and one-shot scheduling, cancellation,
 * not creating additional actors per scheduled message, and continued delivery to
 * a supervised actor after it has been restarted.
 *
 * Every test body is wrapped in `withCleanEndState` so that the scheduler and the
 * local actor registry are reset before the next test runs.
 */
class SchedulerSpec extends JUnitSuite {

  /**
   * Runs `action`, then restarts the scheduler and shuts down all locally
   * registered actors.
   *
   * The cleanup sits in a `finally` block so that it also runs when `action`
   * throws (for example on a failed assertion); otherwise scheduled tasks and
   * actors from a failing test would leak into the tests that follow.
   *
   * @param action the test body to execute
   */
  def withCleanEndState(action: ⇒ Unit): Unit = {
    try {
      action
    } finally {
      // Still shut the actors down if restarting the scheduler itself fails.
      try Scheduler.restart
      finally Actor.registry.local.shutdownAll
    }
  }

  /**
   * A repeating schedule must fire more than once, both when the target is an
   * actor and when it is a plain function.
   */
  @Test
  def schedulerShouldScheduleMoreThanOnce: Unit = withCleanEndState {
    case object Tick

    val countDownLatch = new CountDownLatch(3)
    val tickActor = actorOf(new Actor {
      def receive = { case Tick ⇒ countDownLatch.countDown() }
    }).start()

    // run every 50 millisec
    Scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)

    // after max 1 second it should be executed at least the 3 times already
    assert(countDownLatch.await(1, TimeUnit.SECONDS))

    val countDownLatch2 = new CountDownLatch(3)

    // same schedule, but with a function as the target instead of an actor
    Scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)

    // after max 1 second it should be executed at least the 3 times already
    assert(countDownLatch2.await(1, TimeUnit.SECONDS))
  }

  /**
   * `scheduleOnce` must fire exactly one time per call: two calls against a
   * latch of 3 have to leave a count of 1.
   */
  @Test
  def schedulerShouldScheduleOnce: Unit = withCleanEndState {
    case object Tick

    val countDownLatch = new CountDownLatch(3)
    val tickActor = actorOf(new Actor {
      def receive = { case Tick ⇒ countDownLatch.countDown() }
    }).start()

    // two one-shot schedules with a 50 millisec delay: one to the actor, one to a function
    Scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)
    Scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)

    // after 1 second the wait should fail
    assert(!countDownLatch.await(1, TimeUnit.SECONDS))
    // should still be 1 left
    assert(countDownLatch.getCount === 1L)
  }

  /**
   * ticket #372
   *
   * Scheduling 1000 one-shot messages must not change the number of actors in
   * the local registry.
   */
  @Test
  def schedulerShouldntCreateActors: Unit = withCleanEndState {
    object Ping

    val ticks = new CountDownLatch(1000)
    val actor = actorOf(new Actor {
      def receive = { case Ping ⇒ ticks.countDown() }
    }).start()

    // snapshot taken after the receiver is started, so it is included in the baseline
    val numActors = Actor.registry.local.actors.length

    (1 to 1000).foreach(_ ⇒ Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.MILLISECONDS))

    assert(ticks.await(10, TimeUnit.SECONDS))
    assert(Actor.registry.local.actors.length === numActors)
  }

  /**
   * ticket #372
   *
   * Cancelling the handle returned by `scheduleOnce` must prevent the message
   * from being delivered.
   */
  @Test
  def schedulerShouldBeCancellable: Unit = withCleanEndState {
    object Ping

    val ticks = new CountDownLatch(1)

    val actor = actorOf(new Actor {
      def receive = { case Ping ⇒ ticks.countDown() }
    }).start()

    // schedule with a 1 second delay and cancel right away, ten times over
    (1 to 10).foreach { _ ⇒
      val future = Scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)
      future.cancel(true)
    }

    assert(!ticks.await(3, TimeUnit.SECONDS)) //No counting down should've been made
  }

  /**
   * ticket #307
   *
   * A repeating schedule must keep delivering to a supervised actor after that
   * actor has crashed and been restarted.
   */
  @Test
  def actorRestartShouldPickUpScheduleAgain: Unit = withCleanEndState {
    object Ping
    object Crash

    val restartLatch = new StandardLatch
    val pingLatch = new CountDownLatch(6)

    val actor = actorOf(new Actor {
      self.lifeCycle = Permanent

      def receive = {
        case Ping  ⇒ pingLatch.countDown()
        case Crash ⇒ throw new Exception("CRASH")
      }

      // signals the test that the restart has happened
      override def postRestart(reason: Throwable): Unit = restartLatch.open()
    })

    // the supervisor is what restarts `actor` when the Crash message makes it throw
    Supervisor(
      SupervisorConfig(
        AllForOneStrategy(List(classOf[Exception]), 3, 1000),
        Supervise(actor, Permanent) :: Nil)).start

    Scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)
    // appx 2 pings before crash
    Scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)

    assert(restartLatch.tryAwait(2, TimeUnit.SECONDS))
    // should be enough time for the ping countdown to recover and reach 6 pings
    assert(pingLatch.await(4, TimeUnit.SECONDS))
  }
}
|