Making ThreadBasedDispatcher Unbounded if no capacity specced and fix a possible mem leak in it

This commit is contained in:
Viktor Klang 2010-10-25 18:20:29 +02:00
parent b0001eafa9
commit f11d33953f
3 changed files with 24 additions and 12 deletions

View file

@ -77,11 +77,12 @@ object Dispatchers extends Logging {
* <p/> * <p/>
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor, BoundedMailbox(true)) def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor)
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
* Uses the default timeout * Uses the default timeout
* If capacity is negative, it's Integer.MAX_VALUE
* <p/> * <p/>
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */
@ -89,6 +90,7 @@ object Dispatchers extends Logging {
/** /**
* Creates an thread based dispatcher serving a single actor through the same single thread. * Creates an thread based dispatcher serving a single actor through the same single thread.
* If capacity is negative, it's Integer.MAX_VALUE
* <p/> * <p/>
* E.g. each actor consumes its own thread. * E.g. each actor consumes its own thread.
*/ */

View file

@ -10,30 +10,40 @@ import se.scalablesolutions.akka.util.Duration
import java.util.Queue import java.util.Queue
import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue} import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue}
import se.scalablesolutions.akka.actor
import java.util.concurrent.atomic.AtomicReference
/** /**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxType) class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType)
extends ExecutorBasedEventDrivenDispatcher( extends ExecutorBasedEventDrivenDispatcher(
actor.getClass.getName + ":" + actor.uuid, _actor.uuid.toString,Dispatchers.THROUGHPUT,-1,_mailboxType,ThreadBasedDispatcher.oneThread) {
Dispatchers.THROUGHPUT,
-1,
_mailboxType,
ThreadBasedDispatcher.oneThread) {
def this(actor: ActorRef) = this(actor, BoundedMailbox(true)) // For Java API private[akka] val owner = new AtomicReference[ActorRef](_actor)
def this(actor: ActorRef, capacity: Int) = this(actor, BoundedMailbox(true, capacity)) def this(actor: ActorRef) =
this(actor, UnboundedMailbox(true)) // For Java API
def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = this(actor, BoundedMailbox(true, capacity, pushTimeOut)) def this(actor: ActorRef, capacity: Int) =
this(actor, BoundedMailbox(true, capacity)) //For Java API
def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API
this(actor, BoundedMailbox(true, capacity, pushTimeOut))
override def register(actorRef: ActorRef) = { override def register(actorRef: ActorRef) = {
if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) val actor = owner.get()
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null,actorRef) //Register if unregistered
super.register(actorRef) super.register(actorRef)
} }
override def unregister(actorRef: ActorRef) = {
super.unregister(actorRef)
owner.compareAndSet(actorRef,null) //Unregister (prevent memory leak)
}
} }
object ThreadBasedDispatcher { object ThreadBasedDispatcher {

View file

@ -618,7 +618,7 @@ class RemoteServerHandler(
log.info("Creating a new remote actor [%s:%s]", name, uuid) log.info("Creating a new remote actor [%s:%s]", name, uuid)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name) else Class.forName(name)
val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor]) val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]])
actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow)
actorRef.id = id actorRef.id = id
actorRef.timeout = timeout actorRef.timeout = timeout