diff --git a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala index e1f879d99d..65dfe26de2 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/SchedulerSpec.scala @@ -1,24 +1,21 @@ package akka.actor import org.scalatest.BeforeAndAfterEach -import akka.testkit.TestEvent._ -import akka.testkit.EventFilter import org.multiverse.api.latches.StandardLatch import java.util.concurrent.{ ConcurrentLinkedQueue, CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec -import org.jboss.netty.akka.util.{ Timeout ⇒ TimeOut } @org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { - private val timeouts = new ConcurrentLinkedQueue[TimeOut]() + private val cancellables = new ConcurrentLinkedQueue[Cancellable]() - def collectTimeout(t: TimeOut): TimeOut = { - timeouts.add(t) - t + def collectCancellable(c: Cancellable): Cancellable = { + cancellables.add(c) + c } override def afterEach { - while (timeouts.peek() ne null) { Option(timeouts.poll()).foreach(_.cancel()) } + while (cancellables.peek() ne null) { Option(cancellables.poll()).foreach(_.cancel()) } } "A Scheduler" must { @@ -30,14 +27,14 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { def receive = { case Tick ⇒ countDownLatch.countDown() } }) // run every 50 millisec - collectTimeout(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(tickActor, Tick, 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch.await(1, TimeUnit.SECONDS)) val countDownLatch2 = new CountDownLatch(3) - collectTimeout(app.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(() ⇒ countDownLatch2.countDown(), 0, 50, TimeUnit.MILLISECONDS)) // after max 1 second it should be executed at least the 3 times already assert(countDownLatch2.await(2, TimeUnit.SECONDS)) @@ -51,8 +48,8 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) // run every 50 millisec - collectTimeout(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) - collectTimeout(app.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(tickActor, Tick, 50, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(() ⇒ countDownLatch.countDown(), 50, TimeUnit.MILLISECONDS)) // after 1 second the wait should fail assert(countDownLatch.await(2, TimeUnit.SECONDS) == false) @@ -88,7 +85,7 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) (1 to 10).foreach { i ⇒ - val timeout = collectTimeout(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) + val timeout = collectCancellable(app.scheduler.scheduleOnce(actor, Ping, 1, TimeUnit.SECONDS)) timeout.cancel() } @@ -116,9 +113,9 @@ class SchedulerSpec extends AkkaSpec with BeforeAndAfterEach { }) val actor = (supervisor ? props).as[ActorRef].get - collectTimeout(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.schedule(actor, Ping, 500, 500, TimeUnit.MILLISECONDS)) // appx 2 pings before crash - collectTimeout(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) + collectCancellable(app.scheduler.scheduleOnce(actor, Crash, 1000, TimeUnit.MILLISECONDS)) assert(restartLatch.tryAwait(2, TimeUnit.SECONDS)) // should be enough time for the ping countdown to recover and reach 6 pings diff --git a/akka-actor/src/main/scala/akka/actor/ActorCell.scala b/akka-actor/src/main/scala/akka/actor/ActorCell.scala index 71c9991c84..711816be5f 100644 --- a/akka-actor/src/main/scala/akka/actor/ActorCell.scala +++ b/akka-actor/src/main/scala/akka/actor/ActorCell.scala @@ -75,7 +75,7 @@ private[akka] class ActorCell( final def provider = app.provider - var futureTimeout: Option[org.jboss.netty.akka.util.Timeout] = None + var futureTimeout: Option[Cancellable] = None var _children = emptyChildren //Reuse same empty instance to avoid allocating new instance of the Ordering and the actual empty instance for every actor diff --git a/akka-actor/src/main/scala/akka/actor/FSM.scala b/akka-actor/src/main/scala/akka/actor/FSM.scala index a49a6605f3..682f5ae3c6 100644 --- a/akka-actor/src/main/scala/akka/actor/FSM.scala +++ b/akka-actor/src/main/scala/akka/actor/FSM.scala @@ -30,7 +30,7 @@ object FSM { case class TimeoutMarker(generation: Long) case class Timer(name: String, msg: Any, repeat: Boolean, generation: Int)(implicit app: AkkaApplication) { - private var ref: Option[org.jboss.netty.akka.util.Timeout] = _ + private var ref: Option[Cancellable] = _ def schedule(actor: ActorRef, timeout: Duration) { if (repeat) { @@ -392,7 +392,7 @@ trait FSM[S, D] extends ListenerManagement { * FSM State data and current timeout handling */ private var currentState: State = _ - private var timeoutFuture: Option[org.jboss.netty.akka.util.Timeout] = None + private var timeoutFuture: Option[Cancellable] = None private var generation: Long = 0L /* diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index be6c37cfe1..4a1fe5cb66 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -26,52 +26,52 @@ case class SchedulerException(msg: String, e: Throwable) extends AkkaException(m } trait JScheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable } abstract class Scheduler extends JScheduler { - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimeOut + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable - def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): TimeOut = + def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable = schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): TimeOut = + def schedule(f: () ⇒ Unit, initialDelay: Duration, delay: Duration): Cancellable = schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): TimeOut = + def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable = scheduleOnce(receiver, message, delay.length, delay.unit) - def scheduleOnce(f: () ⇒ Unit, delay: Duration): TimeOut = + def scheduleOnce(f: () ⇒ Unit, delay: Duration): Cancellable = scheduleOnce(f, delay.length, delay.unit) } class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { - def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut = - hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit) + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)) - def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut = - hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit) + def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)) - def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut = - hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit) + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)) - def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut = - hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit) + def schedule(f: () ⇒ Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)) - def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): TimeOut = - hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit) + def scheduleOnce(f: () ⇒ Unit, delay: Long, timeUnit: TimeUnit): Cancellable = + new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)) private def createSingleTask(runnable: Runnable): TimerTask = new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } } - private def createSingleTask(receiver: ActorRef, message: Any) = + private def createSingleTask(receiver: ActorRef, message: Any): TimerTask = new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } } - private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit) = { + private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = { new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message @@ -93,4 +93,16 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler { } private[akka] def stop() = hashedWheelTimer.stop() +} + +trait Cancellable { + def cancel(): Unit + + def isCancelled: Boolean +} + +class DefaultCancellable(timeout: TimeOut) extends Cancellable { + def cancel() { timeout.cancel() } + + def isCancelled: Boolean = { timeout.isCancelled } } \ No newline at end of file