pekko/akka-actor/src/main/scala/actor/Scheduler.scala

134 lines
5.1 KiB
Scala
Raw Normal View History

2009-08-17 20:46:05 +02:00
/*
* Copyright 2007 WorldWide Conferencing, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/
2009-09-07 18:41:45 +02:00
package se.scalablesolutions.akka.actor
2009-08-17 18:42:41 +02:00
import scala.collection.JavaConversions
2009-08-17 18:42:41 +02:00
import java.util.concurrent._
2009-09-07 18:41:45 +02:00
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.AkkaException
2009-08-17 18:42:41 +02:00
object Scheduler extends Logging {
  import Actor._

  /** Thrown when a task cannot be handed to the underlying executor; `e` is the original failure. */
  case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)

  // Swapped out by `restart`; @volatile so the scheduling methods, which do not lock,
  // observe the replacement executor.
  @volatile private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)

  log.info("Starting up Scheduler")

  /**
   * Evaluates `submit` and returns its result viewed as a `ScheduledFuture[AnyRef]`.
   * Any `Exception` raised while submitting is rethrown as a [[SchedulerException]]
   * carrying `failure` as its message; `failure` is only evaluated in that case.
   */
  private def guarded(failure: => String)(submit: => ScheduledFuture[_]): ScheduledFuture[AnyRef] =
    try {
      submit.asInstanceOf[ScheduledFuture[AnyRef]]
    } catch {
      case cause: Exception => throw SchedulerException(failure, cause)
    }

  /**
   * Sends `message` to `receiver` for the first time after `initialDelay`, and then
   * repeatedly at a fixed rate with period `delay` (both expressed in `timeUnit`).
   *
   * @return the future handle of the periodic task
   * @throws SchedulerException if the executor fails to accept the task
   */
  def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    log.trace(
      "Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
      message, receiver, initialDelay, delay, timeUnit)
    guarded(message + " could not be scheduled on " + receiver) {
      service.scheduleAtFixedRate(
        new Runnable { def run = receiver ! message },
        initialDelay, delay, timeUnit)
    }
  }

  /**
   * Runs the function `f` for the first time after `initialDelay`, and then repeatedly
   * at a fixed rate with period `delay`. Avoid blocking operations inside `f`: it is
   * executed on the scheduler's single thread.
   *
   * @throws SchedulerException if the executor fails to accept the task
   */
  def schedule(f: () => Unit, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    val task: Runnable = new Runnable { def run = f() }
    schedule(task, initialDelay, delay, timeUnit)
  }

  /**
   * Runs `runnable` for the first time after `initialDelay`, and then repeatedly at a
   * fixed rate with period `delay`. Avoid blocking operations inside the runnable: it is
   * executed on the scheduler's single thread.
   *
   * @throws SchedulerException if the executor fails to accept the task
   */
  def schedule(runnable: Runnable, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    log.trace(
      "Schedule scheduled event\n\trunnable = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
      runnable, initialDelay, delay, timeUnit)
    guarded("Failed to schedule a Runnable") {
      service.scheduleAtFixedRate(runnable, initialDelay, delay, timeUnit)
    }
  }

  /**
   * Sends `message` to `receiver` exactly once, after `delay` (expressed in `timeUnit`).
   *
   * @return the future handle of the one-shot task
   * @throws SchedulerException if the executor fails to accept the task
   */
  def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    log.trace(
      "Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
      message, receiver, delay, timeUnit)
    guarded(message + " could not be scheduleOnce'd on " + receiver) {
      service.schedule(
        new Runnable { def run = receiver ! message },
        delay, timeUnit)
    }
  }

  /**
   * Runs the function `f` exactly once, after `delay`. Avoid blocking operations inside
   * `f`: it is executed on the scheduler's single thread.
   *
   * @throws SchedulerException if the executor fails to accept the task
   */
  def scheduleOnce(f: () => Unit, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    val task: Runnable = new Runnable { def run = f() }
    scheduleOnce(task, delay, timeUnit)
  }

  /**
   * Runs `runnable` exactly once, after `delay`. Avoid blocking operations inside the
   * runnable: it is executed on the scheduler's single thread.
   *
   * @throws SchedulerException if the executor fails to accept the task
   */
  def scheduleOnce(runnable: Runnable, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
    log.trace(
      "Schedule one-time event\n\trunnable = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]",
      runnable, delay, timeUnit)
    guarded("Failed to scheduleOnce a Runnable") {
      service.schedule(runnable, delay, timeUnit)
    }
  }

  /** Shuts down the current executor. Holds this object's lock, like `restart`. */
  def shutdown: Unit = synchronized {
    log.info("Shutting down Scheduler")
    service.shutdown()
  }

  /**
   * Shuts down the current executor and installs a fresh single-threaded one.
   * Holds this object's lock for the whole swap.
   */
  def restart: Unit = synchronized {
    log.info("Restarting Scheduler")
    shutdown
    service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
  }
}
2009-08-17 18:42:41 +02:00
/**
 * Thread factory used by the scheduler's executor. Every thread it creates is a daemon
 * thread named "akka:scheduler-N", where N is the number of threads this factory had
 * already created (so the first thread is "akka:scheduler-0").
 */
private object SchedulerThreadFactory extends ThreadFactory {
  // Number of threads handed out so far. Previously this counter was never advanced, so
  // every thread got the same "-0" suffix; it is atomic because newThread is called by
  // the executor, not necessarily from a single thread.
  private val count = new java.util.concurrent.atomic.AtomicInteger(0)

  // Delegate that actually constructs the thread; only name and daemon flag are customised.
  val threadFactory = Executors.defaultThreadFactory()

  /**
   * Creates a new daemon thread for `r` with the next sequential scheduler name.
   *
   * @param r the task the new thread will run
   * @return the configured, not yet started, thread
   */
  def newThread(r: Runnable): Thread = {
    val thread = threadFactory.newThread(r)
    thread.setName("akka:scheduler-" + count.getAndIncrement())
    thread.setDaemon(true)
    thread
  }
}