diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index fcbba357ad..f97576fbb6 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -9,8 +9,10 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. + * + * Rework of David Pollak's ActorPing class in the Lift Project + * which is licensed under the Apache 2 License. */ - package se.scalablesolutions.akka.actor import java.util.concurrent._ @@ -19,59 +21,50 @@ import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy} import se.scalablesolutions.akka.util.Logging -case object UnSchedule -case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e) - -/** - * Rework of David Pollak's ActorPing class in the Lift Project - * which is licensed under the Apache 2 License. - */ -class ScheduleActor(val receiver: ActorRef, val future: ScheduledFuture[AnyRef]) extends Actor with Logging { - self.lifeCycle = Some(LifeCycle(Permanent)) - - def receive = { - case UnSchedule => - Scheduler.stopSupervising(self) - future.cancel(true) - exit - } -} - -object Scheduler extends Actor { +object Scheduler { import Actor._ + case object UnSchedule + case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e) + private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] - self.faultHandler = Some(OneForOneStrategy(5, 5000)) - self.trapExit = List(classOf[Throwable]) def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit) = { try { - self.startLink(actorOf(new ScheduleActor( - receiver, - service.scheduleAtFixedRate(new java.lang.Runnable { - def run = receiver ! message; - }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]))) + val future = service.scheduleAtFixedRate( + new Runnable { def run = receiver ! message }, + initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + val scheduler = actorOf(new ScheduleActor(future)).start + schedulers.put(scheduler, scheduler) } catch { case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) } } - def restart = service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - - def stopSupervising(actorRef: ActorRef) = { - self.unlink(actorRef) + def unschedule(actorRef: ActorRef) = { + actorRef ! UnSchedule schedulers.remove(actorRef) } - override def shutdown = { + def shutdown = { import scala.collection.JavaConversions._ schedulers.values.foreach(_ ! UnSchedule) + schedulers.clear service.shutdown } + def restart = { + shutdown + service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) + } +} + +private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor with Logging { def receive = { - case _ => {} // ignore all messages + case Scheduler.UnSchedule => + future.cancel(true) + exit } }