diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 2fd205edeb..39a1ff774c 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -24,13 +24,13 @@ import org.multiverse.api.exceptions.DeadTransactionException import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ConcurrentHashMap, TimeUnit} import java.util.{Map => JMap} import java.lang.reflect.Field import jsr166x.{Deque, ConcurrentLinkedDeque} import com.google.protobuf.ByteString +import java.util.concurrent.{ScheduledFuture, ConcurrentHashMap, TimeUnit} /** * ActorRef is an immutable and serializable handle to an Actor. @@ -72,7 +72,7 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] @volatile protected[this] var _isShutDown = false @volatile protected[akka] var _isBeingRestarted = false @volatile protected[akka] var _homeAddress = new InetSocketAddress(RemoteServer.HOSTNAME, RemoteServer.PORT) - @volatile protected[akka] var _timeoutActor: Option[ActorRef] = None + @volatile protected[akka] var _futureTimeout: Option[ScheduledFuture[AnyRef]] = None @volatile protected[akka] var startOnCreation = false @volatile protected[akka] var registeredInRemoteNodeDuringSerialization = false protected[this] val guard = new ReentrantGuard @@ -559,14 +559,16 @@ trait ActorRef extends TransactionManagement with java.lang.Comparable[ActorRef] cancelReceiveTimeout receiveTimeout.foreach { time => log.debug("Scheduling timeout for %s", this) - _timeoutActor = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS)) + _futureTimeout = Some(Scheduler.scheduleOnce(this, ReceiveTimeout, time, TimeUnit.MILLISECONDS)) } } - protected[akka] def cancelReceiveTimeout = _timeoutActor.foreach { timeoutActor => - if (timeoutActor.isRunning) Scheduler.unschedule(timeoutActor) - _timeoutActor = None - log.debug("Timeout canceled for %s", this) + protected[akka] def cancelReceiveTimeout = { + if(_futureTimeout.isDefined) { + _futureTimeout.get.cancel(true) + _futureTimeout = None + log.debug("Timeout canceled for %s", this) + } } } diff --git a/akka-core/src/main/scala/actor/Scheduler.scala b/akka-core/src/main/scala/actor/Scheduler.scala index 6f4f099bb2..07486521ec 100644 --- a/akka-core/src/main/scala/actor/Scheduler.scala +++ b/akka-core/src/main/scala/actor/Scheduler.scala @@ -23,59 +23,39 @@ import se.scalablesolutions.akka.util.Logging object Scheduler extends Logging { import Actor._ - case object UnSchedule case class SchedulerException(msg: String, e: Throwable) extends RuntimeException(msg, e) private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) - private val schedulers = new ConcurrentHashMap[ActorRef, ActorRef] + log.info("Starting up Scheduler") - def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ActorRef = { + def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { log.trace( "Schedule scheduled event\n\tevent = [%s]\n\treceiver = [%s]\n\tinitialDelay = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", message, receiver, initialDelay, delay, timeUnit) try { - val future = service.scheduleAtFixedRate( + service.scheduleAtFixedRate( new Runnable { def run = receiver ! message }, initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - createAndStoreScheduleActorForFuture(future) - val scheduler = actorOf(new ScheduleActor(future)).start - schedulers.put(scheduler, scheduler) - scheduler } catch { case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) } } - def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ActorRef = { + def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { log.trace( "Schedule one-time event\n\tevent = [%s]\n\treceiver = [%s]\n\tdelay = [%s]\n\ttimeUnit = [%s]", message, receiver, delay, timeUnit) try { - val future = service.schedule( + service.schedule( new Runnable { def run = receiver ! message }, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] - createAndStoreScheduleActorForFuture(future) } catch { case e => throw SchedulerException(message + " could not be scheduled on " + receiver, e) } } - private def createAndStoreScheduleActorForFuture(future: ScheduledFuture[AnyRef]): ActorRef = { - val scheduler = actorOf(new ScheduleActor(future)).start - schedulers.put(scheduler, scheduler) - scheduler - } - - def unschedule(scheduleActor: ActorRef) = { - scheduleActor ! UnSchedule - schedulers.remove(scheduleActor) - } - def shutdown = { log.info("Shutting down Scheduler") - import scala.collection.JavaConversions._ - schedulers.values.foreach(_ ! UnSchedule) - schedulers.clear service.shutdown } @@ -86,15 +66,6 @@ object Scheduler extends Logging { } } -private class ScheduleActor(future: ScheduledFuture[AnyRef]) extends Actor { - def receive = { - case Scheduler.UnSchedule => - Scheduler.log.trace("Unschedule event handled by scheduleActor\n\tactorRef = [%s]", self.toString) - future.cancel(true) - self.stop - } -} - private object SchedulerThreadFactory extends ThreadFactory { private var count = 0 val threadFactory = Executors.defaultThreadFactory()