diff --git a/akka-actor/src/main/scala/dispatch/Dispatchers.scala b/akka-actor/src/main/scala/dispatch/Dispatchers.scala index d7592d49b7..cd06fd8857 100644 --- a/akka-actor/src/main/scala/dispatch/Dispatchers.scala +++ b/akka-actor/src/main/scala/dispatch/Dispatchers.scala @@ -77,11 +77,12 @@ object Dispatchers extends Logging { *

* E.g. each actor consumes its own thread. */ - def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor, BoundedMailbox(true)) + def newThreadBasedDispatcher(actor: ActorRef) = new ThreadBasedDispatcher(actor) /** * Creates an thread based dispatcher serving a single actor through the same single thread. * Uses the default timeout + * If capacity is negative, it's Integer.MAX_VALUE *

* E.g. each actor consumes its own thread. */ @@ -89,6 +90,7 @@ object Dispatchers extends Logging { /** * Creates an thread based dispatcher serving a single actor through the same single thread. + * If capacity is negative, it's Integer.MAX_VALUE *

* E.g. each actor consumes its own thread. */ diff --git a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 247399818f..8ec4421c3e 100644 --- a/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-actor/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -10,30 +10,40 @@ import se.scalablesolutions.akka.util.Duration import java.util.Queue import java.util.concurrent.{ConcurrentLinkedQueue, BlockingQueue, TimeUnit, LinkedBlockingQueue} +import se.scalablesolutions.akka.actor +import java.util.concurrent.atomic.AtomicReference /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * * @author Jonas Bonér */ -class ThreadBasedDispatcher(private val actor: ActorRef, _mailboxType: MailboxType) +class ThreadBasedDispatcher(_actor: ActorRef, _mailboxType: MailboxType) extends ExecutorBasedEventDrivenDispatcher( - actor.getClass.getName + ":" + actor.uuid, - Dispatchers.THROUGHPUT, - -1, - _mailboxType, - ThreadBasedDispatcher.oneThread) { + _actor.uuid.toString,Dispatchers.THROUGHPUT,-1,_mailboxType,ThreadBasedDispatcher.oneThread) { - def this(actor: ActorRef) = this(actor, BoundedMailbox(true)) // For Java API + private[akka] val owner = new AtomicReference[ActorRef](_actor) - def this(actor: ActorRef, capacity: Int) = this(actor, BoundedMailbox(true, capacity)) + def this(actor: ActorRef) = + this(actor, UnboundedMailbox(true)) // For Java API - def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = this(actor, BoundedMailbox(true, capacity, pushTimeOut)) + def this(actor: ActorRef, capacity: Int) = + this(actor, BoundedMailbox(true, capacity)) //For Java API + + def this(actor: ActorRef, capacity: Int, pushTimeOut: Duration) = //For Java API + this(actor, BoundedMailbox(true, capacity, pushTimeOut)) override def register(actorRef: ActorRef) = { - if (actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) + val actor = owner.get() + if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) + owner.compareAndSet(null,actorRef) //Register if unregistered super.register(actorRef) } + + override def unregister(actorRef: ActorRef) = { + super.unregister(actorRef) + owner.compareAndSet(actorRef,null) //Unregister (prevent memory leak) + } } object ThreadBasedDispatcher { diff --git a/akka-remote/src/main/scala/remote/RemoteServer.scala b/akka-remote/src/main/scala/remote/RemoteServer.scala index deb3c05b87..f3c29c62f9 100644 --- a/akka-remote/src/main/scala/remote/RemoteServer.scala +++ b/akka-remote/src/main/scala/remote/RemoteServer.scala @@ -618,7 +618,7 @@ class RemoteServerHandler( log.info("Creating a new remote actor [%s:%s]", name, uuid) val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) else Class.forName(name) - val actorRef = Actor.actorOf(clazz.newInstance.asInstanceOf[Actor]) + val actorRef = Actor.actorOf(clazz.asInstanceOf[Class[_ <: Actor]]) actorRef.uuid = uuidFrom(uuid.getHigh,uuid.getLow) actorRef.id = id actorRef.timeout = timeout