#2721 - Adding implicit sender to Scheduler + tests

This commit is contained in:
Viktor Klang 2012-12-18 00:51:11 +01:00
parent 68f72459a3
commit 978c86339f
8 changed files with 42 additions and 51 deletions

View file

@ -11,7 +11,7 @@ import akka.pattern.ask
import java.util.concurrent.atomic.AtomicInteger
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout {
class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout with ImplicitSender {
private val cancellables = new ConcurrentLinkedQueue[Cancellable]()
import system.dispatcher
@ -33,39 +33,47 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
"schedule more than once" in {
case object Tick
val countDownLatch = new CountDownLatch(3)
val tickActor = system.actorOf(Props(new Actor {
def receive = { case Tick countDownLatch.countDown() }
case object Tock
val tickActor, tickActor2 = system.actorOf(Props(new Actor {
var ticks = 0
def receive = {
case Tick
if (ticks < 3) {
sender ! Tock
ticks += 1
}
}
}))
// run every 50 milliseconds
collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch.await(2, TimeUnit.SECONDS))
expectMsg(Tock)
expectMsg(Tock)
expectMsg(Tock)
expectNoMsg(500 millis)
val countDownLatch2 = new CountDownLatch(3)
collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown()))
collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(tickActor2 ! Tick))
// after max 1 second it should be executed at least the 3 times already
assert(countDownLatch2.await(2, TimeUnit.SECONDS))
expectMsg(Tock)
expectMsg(Tock)
expectMsg(Tock)
expectNoMsg(500 millis)
}
"stop continuous scheduling if the receiving actor has been terminated" taggedAs TimingTest in {
val actor = system.actorOf(Props(new Actor {
def receive = {
case x testActor ! x
}
}))
val actor = system.actorOf(Props(new Actor { def receive = { case x sender ! x } }))
// run immediately and then every 100 milliseconds
collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, actor, "msg"))
expectMsg("msg")
// stop the actor and, hence, the continuous messaging from happening
actor ! PoisonPill
system stop actor
expectNoMsg(1 second)
expectNoMsg(500 millis)
}
"schedule once" in {
@ -93,19 +101,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
* ticket #372
*/
"be cancellable" in {
object Ping
val ticks = new CountDownLatch(1)
for (_ 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel()
val actor = system.actorOf(Props(new Actor {
def receive = { case Ping ticks.countDown() }
}))
(1 to 10).foreach { i
val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping))
timeout.cancel()
}
assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made
expectNoMsg(2 seconds)
}
"be cancellable during initial delay" taggedAs TimingTest in {
@ -200,31 +198,24 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
case object Msg
val actor = system.actorOf(Props(new Actor {
def receive = {
case Msg ticks.countDown()
}
def receive = { case Msg ticks.countDown() }
}))
val startTime = System.nanoTime()
val cancellable = system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg)
collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg))
Await.ready(ticks, 3 seconds)
val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000
assert(elapsedTimeMs > 1600)
assert(elapsedTimeMs < 2000) // the precision is not ms exact
cancellable.cancel()
(System.nanoTime() - startTime).nanos.toMillis must be(1800L plusOrMinus 199)
}
"adjust for scheduler inaccuracy" taggedAs TimingTest in {
val startTime = System.nanoTime
val n = 33
val latch = new TestLatch(n)
system.scheduler.schedule(150.millis, 150.millis) {
latch.countDown()
}
system.scheduler.schedule(150.millis, 150.millis) { latch.countDown() }
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(6.66 plusOrMinus (0.4))
// Rate
n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(6.66 plusOrMinus 0.4)
}
"not be affected by long running task" taggedAs TimingTest in {
@ -236,8 +227,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout
latch.countDown()
}
Await.ready(latch, 6.seconds)
val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis
rate must be(4.4 plusOrMinus (0.3))
// Rate
n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3)
}
}
}

View file

@ -39,7 +39,7 @@ trait Scheduler {
initialDelay: FiniteDuration,
interval: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable
/**
* Schedules a function to be run repeatedly with an initial delay and a
@ -136,7 +136,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter)
override def schedule(initialDelay: FiniteDuration,
delay: FiniteDuration,
receiver: ActorRef,
message: Any)(implicit executor: ExecutionContext): Cancellable = {
message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = {
val continuousCancellable = new ContinuousCancellable
continuousCancellable.init(
hashedWheelTimer.newTimeout(

View file

@ -108,7 +108,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension {
override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration,
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable =
receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable =
systemScheduler.schedule(initialDelay, interval, receiver, message)
override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: Unit)(implicit executor: ExecutionContext): Cancellable =

View file

@ -79,7 +79,7 @@ public class SchedulerDocTestBase {
//to the tickActor after 0ms repeating every 50ms
Cancellable cancellable = system.scheduler().schedule(Duration.Zero(),
Duration.create(50, TimeUnit.MILLISECONDS), tickActor, "Tick",
system.dispatcher());
system.dispatcher(), null);
//This cancels further Ticks to be sent
cancellable.cancel();

View file

@ -148,7 +148,7 @@ public class FaultHandlingDocSample {
progressListener = getSender();
getContext().system().scheduler().schedule(
Duration.Zero(), Duration.create(1, "second"), getSelf(), Do,
getContext().dispatcher()
getContext().dispatcher(), null
);
} else if (msg.equals(Do)) {
counterService.tell(new Increment(1), getSelf());

View file

@ -35,7 +35,7 @@ public class SchedulerPatternTest {
private final Cancellable tick = getContext().system().scheduler().schedule(
Duration.create(500, TimeUnit.MILLISECONDS),
Duration.create(1000, TimeUnit.MILLISECONDS),
getSelf(), "tick", getContext().dispatcher());
getSelf(), "tick", getContext().dispatcher(), null);
//#schedule-constructor
// this variable and constructor is declared here to not show up in the docs
final ActorRef target;

View file

@ -198,7 +198,7 @@ public class ZeromqDocTestBase {
public void preStart() {
getContext().system().scheduler()
.schedule(Duration.create(1, "second"), Duration.create(1, "second"),
getSelf(), TICK, getContext().dispatcher());
getSelf(), TICK, getContext().dispatcher(), null);
}
@Override

View file

@ -38,7 +38,7 @@ public class StatsSampleClient extends UntypedActor {
.system()
.scheduler()
.schedule(interval, interval, getSelf(), "tick",
getContext().dispatcher());
getContext().dispatcher(), null);
}
//subscribe to cluster changes, MemberEvent