Fixing ticket #1005 by using WeakReference for LocalActorRefs

This commit is contained in:
Viktor Klang 2011-07-11 15:21:29 +02:00
parent 24250d0da6
commit c8c12ab56b

View file

@ -15,13 +15,12 @@
*/
package akka.actor
import scala.collection.JavaConversions
import java.util.concurrent._
import akka.event.EventHandler
import akka.AkkaException
import atomic.AtomicLong
import java.util.concurrent.atomic.AtomicLong
import java.lang.ref.WeakReference
import java.util.concurrent._
import java.lang.RuntimeException
object Scheduler {
import Actor._
@ -31,16 +30,28 @@ object Scheduler {
@volatile
private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory)
private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = {
receiver match {
case local: LocalActorRef =>
val ref = new WeakReference[ActorRef](local)
new Runnable {
def run = ref.get match {
case null => if(throwWhenReceiverExpired) throw new RuntimeException("Receiver not found: GC:ed")
case actor => actor ! message
}
}
case other => new Runnable { def run = other ! message }
}
}
/**
* Schedules to send the specified message to the receiver after initialDelay and then repeated after delay.
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* send of the message.
*/
def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.scheduleAtFixedRate(
new Runnable { def run = receiver ! message },
initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
service.scheduleAtFixedRate(createSendRunnable(receiver, message, true), initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception
val error = SchedulerException(message + " could not be scheduled on " + receiver, e)
@ -80,11 +91,9 @@ object Scheduler {
* The returned java.util.concurrent.ScheduledFuture can be used to cancel the
* send of the message.
*/
def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = {
try {
service.schedule(
new Runnable { def run = receiver ! message },
delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
service.schedule(createSendRunnable(receiver, message, false), delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]]
} catch {
case e: Exception
val error = SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e)