Updated after code review:

Removed logging
Slimmed down the Java code
Moved default scheduler
This commit is contained in:
Henrik Engstrom 2011-11-10 16:31:50 +01:00
parent d1ebc1ed06
commit 1577f8bcb3
16 changed files with 70 additions and 1072 deletions

View file

@ -17,9 +17,7 @@ package akka.actor
import java.util.concurrent._
import akka.util.Duration
import org.jboss.netty.akka.util.{ HashedWheelTimer, TimerTask }
import akka.AkkaException
import org.jboss.netty.akka.util.{ Timeout TimeOut }
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) {
def this(msg: String) = this(msg, null)
@ -49,60 +47,8 @@ abstract class Scheduler extends JScheduler {
scheduleOnce(f, delay.length, delay.unit)
}
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit))
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit))
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit))
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit))
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): Cancellable =
new DefaultCancellable(hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit))
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } }
private def createSingleTask(receiver: ActorRef, message: Any): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
receiver ! message
timeout.getTimer.newTimeout(this, delay, timeUnit)
}
}
}
private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
private def createContinuousTask(f: () Unit, delay: Long, timeUnit: TimeUnit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
f()
timeout.getTimer.newTimeout(this, delay, timeUnit)
}
}
}
private[akka] def stop() = hashedWheelTimer.stop()
}
trait Cancellable {
def cancel(): Unit
def isCancelled: Boolean
}
class DefaultCancellable(timeout: TimeOut) extends Cancellable {
def cancel() { timeout.cancel() }
def isCancelled: Boolean = { timeout.isCancelled }
}