From c8c12ab56b8303918e54c80713561a5b4f44a6cb Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 11 Jul 2011 15:21:29 +0200 Subject: [PATCH] Fixing ticket #1005 by using WeakReference for LocalActorRefs --- .../src/main/scala/akka/actor/Scheduler.scala | 35 ++++++++++++------- 1 file changed, 22 insertions(+), 13 deletions(-) diff --git a/akka-actor/src/main/scala/akka/actor/Scheduler.scala b/akka-actor/src/main/scala/akka/actor/Scheduler.scala index c6c978275f..4096188a88 100644 --- a/akka-actor/src/main/scala/akka/actor/Scheduler.scala +++ b/akka-actor/src/main/scala/akka/actor/Scheduler.scala @@ -15,13 +15,12 @@ */ package akka.actor -import scala.collection.JavaConversions - -import java.util.concurrent._ - import akka.event.EventHandler import akka.AkkaException -import atomic.AtomicLong +import java.util.concurrent.atomic.AtomicLong +import java.lang.ref.WeakReference +import java.util.concurrent._ +import java.lang.RuntimeException object Scheduler { import Actor._ @@ -31,16 +30,28 @@ object Scheduler { @volatile private var service = Executors.newSingleThreadScheduledExecutor(SchedulerThreadFactory) + private def createSendRunnable(receiver: ActorRef, message: Any, throwWhenReceiverExpired: Boolean): Runnable = { + receiver match { + case local: LocalActorRef => + val ref = new WeakReference[ActorRef](local) + new Runnable { + def run = ref.get match { + case null => if(throwWhenReceiverExpired) throw new RuntimeException("Receiver not found: GC:ed") + case actor => actor ! message + } + } + case other => new Runnable { def run = other ! message } + } + } + /** * Schedules to send the specified message to the receiver after initialDelay and then repeated after delay. * The returned java.util.concurrent.ScheduledFuture can be used to cancel the * send of the message. */ - def schedule(receiver: ActorRef, message: AnyRef, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { + def schedule(receiver: ActorRef, message: Any, initialDelay: Long, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { try { - service.scheduleAtFixedRate( - new Runnable { def run = receiver ! message }, - initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + service.scheduleAtFixedRate(createSendRunnable(receiver, message, true), initialDelay, delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception ⇒ val error = SchedulerException(message + " could not be scheduled on " + receiver, e) @@ -80,11 +91,9 @@ object Scheduler { * The returned java.util.concurrent.ScheduledFuture can be used to cancel the * send of the message. */ - def scheduleOnce(receiver: ActorRef, message: AnyRef, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { + def scheduleOnce(receiver: ActorRef, message: Any, delay: Long, timeUnit: TimeUnit): ScheduledFuture[AnyRef] = { try { - service.schedule( - new Runnable { def run = receiver ! message }, - delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] + service.schedule(createSendRunnable(receiver, message, false), delay, timeUnit).asInstanceOf[ScheduledFuture[AnyRef]] } catch { case e: Exception ⇒ val error = SchedulerException(message + " could not be scheduleOnce'd on " + receiver, e)