Implemented HashedWheelTimer as the default scheduling mechanism in Akka. Fixes #1291

This commit is contained in:
Henrik Engstrom 2011-11-09 15:25:14 +01:00
parent b2d548bd0e
commit 896c906d03
27 changed files with 3320 additions and 134 deletions

View file

@ -15,132 +15,82 @@
*/
package akka.actor
import akka.AkkaException
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent._
import akka.util.Duration
import org.jboss.netty.akka.util.{ HashedWheelTimer, TimerTask }
import akka.AkkaException
import org.jboss.netty.akka.util.{ Timeout TimeOut }
case class SchedulerException(msg: String, e: Throwable) extends AkkaException(msg, e) {
def this(msg: String) = this(msg, null)
}
trait JScheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut
}
abstract class Scheduler extends JScheduler {
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef]
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): TimeOut
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] =
def schedule(receiver: ActorRef, message: Any, initialDelay: Duration, delay: Duration): TimeOut =
schedule(receiver, message, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): ScheduledFuture[AnyRef] =
def schedule(f: () Unit, initialDelay: Duration, delay: Duration): TimeOut =
schedule(f, initialDelay.toNanos, delay.toNanos, TimeUnit.NANOSECONDS)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): ScheduledFuture[AnyRef] =
def scheduleOnce(receiver: ActorRef, message: Any, delay: Duration): TimeOut =
scheduleOnce(receiver, message, delay.length, delay.unit)
def scheduleOnce(f: () Unit, delay: Duration): ScheduledFuture[AnyRef] =
def scheduleOnce(f: () Unit, delay: Duration): TimeOut =
scheduleOnce(f, delay.length, delay.unit)
}
class DefaultScheduler extends Scheduler {
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = new Runnable {
def run = {
receiver ! message
if (throwWhenReceiverExpired && receiver.isShutdown) throw new ActorKilledException("Receiver was terminated")
class DefaultScheduler(hashedWheelTimer: HashedWheelTimer) extends Scheduler {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createContinuousTask(receiver, message, delay, timeUnit), initialDelay, timeUnit)
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(runnable), delay, timeUnit)
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(receiver, message), delay, timeUnit)
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createContinuousTask(f, delay, timeUnit), initialDelay, timeUnit)
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): TimeOut =
hashedWheelTimer.newTimeout(createSingleTask(f), delay, timeUnit)
private def createSingleTask(runnable: Runnable): TimerTask =
new TimerTask() { def run(timeout: org.jboss.netty.akka.util.Timeout) { runnable.run() } }
private def createSingleTask(receiver: ActorRef, message: Any) =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { receiver ! message } }
private def createContinuousTask(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit) = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
receiver ! message
timeout.getTimer.newTimeout(this, delay, timeUnit)
}
}
}
private[akka] val service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSingleTask(f: () Unit): TimerTask =
new TimerTask { def run(timeout: org.jboss.netty.akka.util.Timeout) { f() } }
/**
* Schedules to send the specified message to the receiver after initialDelay and then repeated after delay.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* send of the message.
*/
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.scheduleAtFixedRate(createSendRunnable(receiver, message, true), initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception throw SchedulerException(message + " could not be scheduled on " + receiver, e)
private def createContinuousTask(f: () Unit, delay: Long, timeUnit: TimeUnit): TimerTask = {
new TimerTask {
def run(timeout: org.jboss.netty.akka.util.Timeout) {
f()
timeout.getTimer.newTimeout(this, delay, timeUnit)
}
}
}
/**
* Schedules to run specified function to the receiver after initialDelay and then repeated after delay,
* avoid blocking operations since this is executed in the schedulers thread.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* execution of the function.
*/
def schedule(f: () Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
schedule(new Runnable { def run = f() }, initialDelay, delay, timeUnit)
/**
* Schedules to run specified runnable to the receiver after initialDelay and then repeated after delay,
* avoid blocking operations since this is executed in the schedulers thread.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* execution of the runnable.
*/
def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception throw SchedulerException("Failed to schedule a Runnable", e)
}
}
/**
* Schedules to send the specified message to the receiver after delay.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* send of the message.
*/
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.schedule(createSendRunnable(receiver, message, false), delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception throw SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e)
}
}
/**
* Schedules a function to be run after delay,
* avoid blocking operations since the runnable is executed in the schedulers thread.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* execution of the function.
*/
def scheduleOnce(f: () Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] =
scheduleOnce(new Runnable { def run = f() }, delay, timeUnit)
/**
* Schedules a runnable to be run after delay,
* avoid blocking operations since the runnable is executed in the schedulers thread.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* execution of the runnable.
*/
def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.schedule(runnable, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception throw SchedulerException("Failed to scheduleOnce a Runnable", e)
}
}
private[akka] def shutdown() { service.shutdownNow() }
}
private object SchedulerThreadFactory extends ThreadFactory {
private val count = new AtomicLong(0)
val threadFactory = Executors.defaultThreadFactory()
def newThread(r: Runnable): Thread = {
val thread = threadFactory.newThread(r)
thread.setName("akka:scheduler-" + count.incrementAndGet())
thread.setDaemon(true)
thread
}
}
private[akka] def stop() = hashedWheelTimer.stop()
}