Fixed regression in Scheduler

This commit is contained in:
Jonas Bonér 2010-05-21 08:18:56 +02:00
parent c34f084645
commit 86a8fd0d3b

View file

@ -9,8 +9,10 @@
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/
package se.scalablesolutions.akka.actor
import java.util.concurrent._
@ -19,59 +21,50 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.util.Logging
case object UnSchedule
case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
/**
* Rework of David Pollak's ActorPing class in the Lift Project
* which is licensed under the Apache 2 License.
*/
class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging {
self.lifeCycle = Some(LifeCycle(Permanent))
def receive = {
case UnSchedule =>
Scheduler.stopSupervising(self)
future.cancel(true)
exit
}
}
object Scheduler extends Actor {
object Scheduler {
import Actor._
case object UnSchedule
case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e)
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef]
self.faultHandler = Some(OneForOneStrategy(5, 5000))
self.trapExit = List(classOf[Throwable])
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = {
try {
self.startLink(actorOf(new ScheduleActor(
receiver,
service.scheduleAtFixedRate(new java.lang.Runnable {
def run = receiver ! message;
}, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]])))
val future = service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
val scheduler = actorOf(new ScheduleActor(future)).start
schedulers.put(scheduler, scheduler)
} catch {
case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e)
}
}
def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
def stopSupervising(actorRef: ActorRef) = {
self.unlink(actorRef)
def unschedule(actorRef: ActorRef) = {
actorRef ! UnSchedule
schedulers.remove(actorRef)
}
override def shutdown = {
def shutdown = {
import scala.collection.JavaConversions._
schedulers.values.foreach(_ ! UnSchedule)
schedulers.clear
service.shutdown
}
def restart = {
shutdown
service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
}
}
private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with Logging {
def receive = {
case _ => {} // ignore all messages
case Scheduler.UnSchedule =>
future.cancel(true)
exit
}
}