Added a Cancellable trait to encapsulate any specific scheduler implementations from leaking. Fixes #1286

This commit is contained in:
Henrik Engstrom 2011-11-10 11:53:36 +01:00
parent 896c906d03
commit d1ebc1ed06
4 changed files with 48 additions and 39 deletions

View file

@ -26,52 +26,52 @@ case class SchedulerException(msg: String, e: Throwable) extends AkkaException(m
}
trait JScheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable
}
abstract class Scheduler extends JScheduler {
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): TimeOut
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): TimeOut =
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): Cancellable =
schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): TimeOut =
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): Cancellable =
schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): TimeOut =
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): Cancellable =
scheduleOnce(receiver, message, delay.length, delay.unit)
def scheduleOnce(f: () Unit, delay: Duration): TimeOut =
def scheduleOnce(f: () Unit, delay: Duration): Cancellable =
scheduleOnce(f, delay.length, delay.unit)
}
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit))
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit))
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit))
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit))
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } }
private def createSingleTask(receiver: ActorRef, message: Any) =
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit) = {
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
receiver ! message
@ -93,4 +93,16 @@ class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
}
private[akka] def stop() = hashedWheelTimer.stop()
}
trait Cancellable {
def cancel(): Unit
def isCancelled: Boolean
}
class DefaultCancellable(timeout: TimeOut) extends Cancellable {
def cancel() { timeout.cancel() }
def isCancelled: Boolean = { timeout.isCancelled }
}