diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index 3932df4ea3..ba34987c9c 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -11,7 +11,7 @@ import akka.pattern.ask import java.util.concurrent.atomic.AtomicInteger @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout { +class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout with ImplicitSender { private val cancellables = new ConcurrentLinkedQueue[Cancellable]() import system.dispatcher @@ -33,39 +33,47 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout "schedule more than once" in { case object Tick - val countDownLatch = new CountDownLatch(3) - val tickActor = system.actorOf(Props(new Actor { - def receive = { case Tick ⇒ countDownLatch.countDown() } + case object Tock + + val tickActor, tickActor2 = system.actorOf(Props(new Actor { + var ticks = 0 + def receive = { + case Tick ⇒ + if (ticks < 3) { + sender ! Tock + ticks += 1 + } + } })) // run every 50 milliseconds collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds, tickActor, Tick)) // after max 1 second it should be executed at least the 3 times already - assert(countDownLatch.await(2, TimeUnit.SECONDS)) + expectMsg(Tock) + expectMsg(Tock) + expectMsg(Tock) + expectNoMsg(500 millis) - val countDownLatch2 = new CountDownLatch(3) - - collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(countDownLatch2.countDown())) + collectCancellable(system.scheduler.schedule(0 milliseconds, 50 milliseconds)(tickActor2 ! Tick)) // after max 1 second it should be executed at least the 3 times already - assert(countDownLatch2.await(2, TimeUnit.SECONDS)) + expectMsg(Tock) + expectMsg(Tock) + expectMsg(Tock) + expectNoMsg(500 millis) } "stop continuous scheduling if the receiving actor has been terminated" taggedAs TimingTest in { - val actor = system.actorOf(Props(new Actor { - def receive = { - case x ⇒ testActor ! x - } - })) + val actor = system.actorOf(Props(new Actor { def receive = { case x ⇒ sender ! x } })) // run immediately and then every 100 milliseconds collectCancellable(system.scheduler.schedule(0 milliseconds, 100 milliseconds, actor, "msg")) expectMsg("msg") // stop the actor and, hence, the continuous messaging from happening - actor ! PoisonPill + system stop actor - expectNoMsg(1 second) + expectNoMsg(500 millis) } "schedule once" in { @@ -93,19 +101,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout * ticket #372 */ "be cancellable" in { - object Ping - val ticks = new CountDownLatch(1) + for (_ ← 1 to 10) system.scheduler.scheduleOnce(1 second, testActor, "fail").cancel() - val actor = system.actorOf(Props(new Actor { - def receive = { case Ping ⇒ ticks.countDown() } - })) - - (1 to 10).foreach { i ⇒ - val timeout = collectCancellable(system.scheduler.scheduleOnce(1 second, actor, Ping)) - timeout.cancel() - } - - assert(ticks.await(3, TimeUnit.SECONDS) == false) //No counting down should've been made + expectNoMsg(2 seconds) } "be cancellable during initial delay" taggedAs TimingTest in { @@ -200,31 +198,24 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout case object Msg val actor = system.actorOf(Props(new Actor { - def receive = { - case Msg ⇒ ticks.countDown() - } + def receive = { case Msg ⇒ ticks.countDown() } })) val startTime = System.nanoTime() - val cancellable = system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg) + collectCancellable(system.scheduler.schedule(1 second, 300 milliseconds, actor, Msg)) Await.ready(ticks, 3 seconds) - val elapsedTimeMs = (System.nanoTime() - startTime) / 1000000 - assert(elapsedTimeMs > 1600) - assert(elapsedTimeMs < 2000) // the precision is not ms exact - cancellable.cancel() + (System.nanoTime() - startTime).nanos.toMillis must be(1800L plusOrMinus 199) } "adjust for scheduler inaccuracy" taggedAs TimingTest in { val startTime = System.nanoTime val n = 33 val latch = new TestLatch(n) - system.scheduler.schedule(150.millis, 150.millis) { - latch.countDown() - } + system.scheduler.schedule(150.millis, 150.millis) { latch.countDown() } Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(6.66 plusOrMinus (0.4)) + // Rate + n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(6.66 plusOrMinus 0.4) } "not be affected by long running task" taggedAs TimingTest in { @@ -236,8 +227,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach with DefaultTimeout latch.countDown() } Await.ready(latch, 6.seconds) - val rate = n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis - rate must be(4.4 plusOrMinus (0.3)) + // Rate + n * 1000.0 / (System.nanoTime - startTime).nanos.toMillis must be(4.4 plusOrMinus 0.3) } } } diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index 4aa91f916f..bbb830110d 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -39,7 +39,7 @@ trait Scheduler { initialDelay: FiniteDuration, interval: FiniteDuration, receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext): Cancellable + message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable /** * Schedules a function to be run repeatedly with an initial delay and a @@ -136,7 +136,7 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer, log: LoggingAdapter) override def schedule(initialDelay: FiniteDuration, delay: FiniteDuration, receiver: ActorRef, - message: Any)(implicit executor: ExecutionContext): Cancellable = { + message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = { val continuousCancellable = new ContinuousCancellable continuousCancellable.init( hashedWheelTimer.newTimeout( diff --git a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala index f7cbe5e788..553566e9af 100644 --- a/akka-cluster/src/main/scala/akka/cluster/Cluster.scala +++ b/akka-cluster/src/main/scala/akka/cluster/Cluster.scala @@ -108,7 +108,7 @@ class Cluster(val system: ExtendedActorSystem) extends Extension { override def close(): Unit = () // we are using system.scheduler, which we are not responsible for closing override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration, - receiver: ActorRef, message: Any)(implicit executor: ExecutionContext): Cancellable = + receiver: ActorRef, message: Any)(implicit executor: ExecutionContext, sender: ActorRef = Actor.noSender): Cancellable = systemScheduler.schedule(initialDelay, interval, receiver, message) override def schedule(initialDelay: FiniteDuration, interval: FiniteDuration)(f: ⇒ Unit)(implicit executor: ExecutionContext): Cancellable = diff --git a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java index 0b3d55f33f..66b5181ba8 100644 --- a/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java +++ b/akka-docs/rst/java/code/docs/actor/SchedulerDocTestBase.java @@ -79,7 +79,7 @@ public class SchedulerDocTestBase { //to the tickActor after 0ms repeating every 50ms Cancellable cancellable = system.scheduler().schedule(Duration.Zero(), Duration.create(50, TimeUnit.MILLISECONDS), tickActor, "Tick", - system.dispatcher()); + system.dispatcher(), null); //This cancels further Ticks to be sent cancellable.cancel(); diff --git a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java index 5b7a3073c3..37d1da703a 100644 --- a/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java +++ b/akka-docs/rst/java/code/docs/actor/japi/FaultHandlingDocSample.java @@ -148,7 +148,7 @@ public class FaultHandlingDocSample { progressListener = getSender(); getContext().system().scheduler().schedule( Duration.Zero(), Duration.create(1, "second"), getSelf(), Do, - getContext().dispatcher() + getContext().dispatcher(), null ); } else if (msg.equals(Do)) { counterService.tell(new Increment(1), getSelf()); diff --git a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java index e712eee146..bb3eb8b777 100644 --- a/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java +++ b/akka-docs/rst/java/code/docs/pattern/SchedulerPatternTest.java @@ -35,7 +35,7 @@ public class SchedulerPatternTest { private final Cancellable tick = getContext().system().scheduler().schedule( Duration.create(500, TimeUnit.MILLISECONDS), Duration.create(1000, TimeUnit.MILLISECONDS), - getSelf(), "tick", getContext().dispatcher()); + getSelf(), "tick", getContext().dispatcher(), null); //#schedule-constructor // this variable and constructor is declared here to not show up in the docs final ActorRef target; diff --git a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java index 8c54ed1058..d0020c0a5a 100644 --- a/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java +++ b/akka-docs/rst/java/code/docs/zeromq/ZeromqDocTestBase.java @@ -198,7 +198,7 @@ public class ZeromqDocTestBase { public void preStart() { getContext().system().scheduler() .schedule(Duration.create(1, "second"), Duration.create(1, "second"), - getSelf(), TICK, getContext().dispatcher()); + getSelf(), TICK, getContext().dispatcher(), null); } @Override diff --git a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java index bb3f52e248..2a89390677 100644 --- a/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java +++ b/akka-samples/akka-sample-cluster/src/main/java/sample/cluster/stats/japi/StatsSampleClient.java @@ -38,7 +38,7 @@ public class StatsSampleClient extends UntypedActor { .system() .scheduler() .schedule(interval, interval, getSelf(), "tick", - getContext().dispatcher()); + getContext().dispatcher(), null); } //subscribe to cluster changes, MemberEvent