#1145 - Changing private[akka] to protected[akka] in MessageDispatcher so that inheriting classes can access those methods

This commit is contained in:
Viktor Klang 2011-08-30 15:50:52 +02:00
parent 11b2e10fa5
commit 548ba08cb5
7 changed files with 40 additions and 42 deletions

View file

@ -113,27 +113,27 @@ object ActorModelSpec {
getStats(actorRef).resumes.incrementAndGet() getStats(actorRef).resumes.incrementAndGet()
} }
private[akka] abstract override def register(actorRef: LocalActorRef) { protected[akka] abstract override def register(actorRef: LocalActorRef) {
super.register(actorRef) super.register(actorRef)
getStats(actorRef).registers.incrementAndGet() getStats(actorRef).registers.incrementAndGet()
} }
private[akka] abstract override def unregister(actorRef: LocalActorRef) { protected[akka] abstract override def unregister(actorRef: LocalActorRef) {
super.unregister(actorRef) super.unregister(actorRef)
getStats(actorRef).unregisters.incrementAndGet() getStats(actorRef).unregisters.incrementAndGet()
} }
private[akka] abstract override def dispatch(invocation: MessageInvocation) { protected[akka] abstract override def dispatch(invocation: MessageInvocation) {
getStats(invocation.receiver).msgsReceived.incrementAndGet() getStats(invocation.receiver).msgsReceived.incrementAndGet()
super.dispatch(invocation) super.dispatch(invocation)
} }
private[akka] abstract override def start() { protected[akka] abstract override def start() {
super.start() super.start()
starts.incrementAndGet() starts.incrementAndGet()
} }
private[akka] abstract override def shutdown() { protected[akka] abstract override def shutdown() {
super.shutdown() super.shutdown()
stops.incrementAndGet() stops.incrementAndGet()
} }

View file

@ -55,7 +55,7 @@ class BalancingDispatcher(
private var members = Vector[LocalActorRef]() private var members = Vector[LocalActorRef]()
private val donationInProgress = new DynamicVariable(false) private val donationInProgress = new DynamicVariable(false)
private[akka] override def register(actorRef: LocalActorRef) = { protected[akka] override def register(actorRef: LocalActorRef) = {
//Verify actor type conformity //Verify actor type conformity
actorType match { actorType match {
case None actorType = Some(actorRef.actorInstance.get().getClass) case None actorType = Some(actorRef.actorInstance.get().getClass)
@ -70,12 +70,12 @@ class BalancingDispatcher(
super.register(actorRef) super.register(actorRef)
} }
private[akka] override def unregister(actorRef: LocalActorRef) = { protected[akka] override def unregister(actorRef: LocalActorRef) = {
synchronized { members = members.filterNot(actorRef eq) } //Update members synchronized { members = members.filterNot(actorRef eq) } //Update members
super.unregister(actorRef) super.unregister(actorRef)
} }
override private[akka] def dispatch(invocation: MessageInvocation) = { override protected[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver) val mbox = getMailbox(invocation.receiver)
if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) {
//We were busy and we got to donate the message to some other lucky guy, we're done here //We were busy and we got to donate the message to some other lucky guy, we're done here
@ -85,7 +85,7 @@ class BalancingDispatcher(
} }
} }
override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { override protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
try { try {
donationInProgress.value = true donationInProgress.value = true
while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor

View file

@ -87,16 +87,16 @@ class Dispatcher(
val name = "akka:event-driven:dispatcher:" + _name val name = "akka:event-driven:dispatcher:" + _name
private[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name)
private[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
private[akka] def dispatch(invocation: MessageInvocation) = { protected[akka] def dispatch(invocation: MessageInvocation) = {
val mbox = getMailbox(invocation.receiver) val mbox = getMailbox(invocation.receiver)
mbox enqueue invocation mbox enqueue invocation
registerForExecution(mbox) registerForExecution(mbox)
} }
private[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) { protected[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) {
try executorService.get() execute invocation try executorService.get() execute invocation
catch { catch {
case e: RejectedExecutionException case e: RejectedExecutionException
@ -131,16 +131,16 @@ class Dispatcher(
} }
} }
private[akka] def start {} protected[akka] def start {}
private[akka] def shutdown { protected[akka] def shutdown {
val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService))
if (old ne null) { if (old ne null) {
old.shutdownNow() old.shutdownNow()
} }
} }
private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = {
if (mbox.dispatcherLock.tryLock()) { if (mbox.dispatcherLock.tryLock()) {
if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended
try { try {
@ -157,7 +157,7 @@ class Dispatcher(
} }
} }
private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit =
registerForExecution(mbox) registerForExecution(mbox)
protected override def cleanUpMailboxFor(actorRef: LocalActorRef) { protected override def cleanUpMailboxFor(actorRef: LocalActorRef) {

View file

@ -63,7 +63,7 @@ abstract class MessageDispatcher {
/** /**
* Creates and returns a mailbox for the given actor. * Creates and returns a mailbox for the given actor.
*/ */
private[akka] def createMailbox(actorRef: LocalActorRef): AnyRef protected[akka] def createMailbox(actorRef: LocalActorRef): AnyRef
/** /**
* Name of this dispatcher. * Name of this dispatcher.
@ -88,11 +88,9 @@ abstract class MessageDispatcher {
} }
} }
private[akka] final def dispatchMessage(invocation: MessageInvocation) { protected[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation)
dispatch(invocation)
}
private[akka] final def dispatchTask(block: () Unit): Unit = { protected[akka] final def dispatchTask(block: () Unit): Unit = {
_tasks.getAndIncrement() _tasks.getAndIncrement()
try { try {
if (active.isOff) if (active.isOff)
@ -129,7 +127,7 @@ abstract class MessageDispatcher {
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "attach" for the only invocation * and only call it under the dispatcher-guard, see "attach" for the only invocation
*/ */
private[akka] def register(actorRef: LocalActorRef) { protected[akka] def register(actorRef: LocalActorRef) {
if (actorRef.mailbox eq null) if (actorRef.mailbox eq null)
actorRef.mailbox = createMailbox(actorRef) actorRef.mailbox = createMailbox(actorRef)
@ -145,7 +143,7 @@ abstract class MessageDispatcher {
* Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER,
* and only call it under the dispatcher-guard, see "detach" for the only invocation * and only call it under the dispatcher-guard, see "detach" for the only invocation
*/ */
private[akka] def unregister(actorRef: LocalActorRef) = { protected[akka] def unregister(actorRef: LocalActorRef) = {
if (uuids remove actorRef.uuid) { if (uuids remove actorRef.uuid) {
cleanUpMailboxFor(actorRef) cleanUpMailboxFor(actorRef)
actorRef.mailbox = null actorRef.mailbox = null
@ -206,7 +204,7 @@ abstract class MessageDispatcher {
* When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms
* defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second
*/ */
private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis protected[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis
/** /**
* After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference
@ -221,19 +219,19 @@ abstract class MessageDispatcher {
/** /**
* Will be called when the dispatcher is to queue an invocation for execution * Will be called when the dispatcher is to queue an invocation for execution
*/ */
private[akka] def dispatch(invocation: MessageInvocation) protected[akka] def dispatch(invocation: MessageInvocation)
private[akka] def executeTask(invocation: TaskInvocation) protected[akka] def executeTask(invocation: TaskInvocation)
/** /**
* Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown
*/ */
private[akka] def start() protected[akka] def start(): Unit
/** /**
* Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached
*/ */
private[akka] def shutdown() protected[akka] def shutdown(): Unit
/** /**
* Returns the size of the mailbox for the specified actor * Returns the size of the mailbox for the specified actor

View file

@ -30,17 +30,17 @@ class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxTyp
def this() = this(Dispatchers.MAILBOX_TYPE) def this() = this(Dispatchers.MAILBOX_TYPE)
private[akka] val owner = new AtomicReference[ActorRef](_actor) protected[akka] val owner = new AtomicReference[ActorRef](_actor)
//Relies on an external lock provided by MessageDispatcher.attach //Relies on an external lock provided by MessageDispatcher.attach
private[akka] override def register(actorRef: LocalActorRef) = { protected[akka] override def register(actorRef: LocalActorRef) = {
val actor = owner.get() val actor = owner.get()
if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor)
owner.compareAndSet(null, actorRef) //Register if unregistered owner.compareAndSet(null, actorRef) //Register if unregistered
super.register(actorRef) super.register(actorRef)
} }
//Relies on an external lock provided by MessageDispatcher.detach //Relies on an external lock provided by MessageDispatcher.detach
private[akka] override def unregister(actorRef: LocalActorRef) = { protected[akka] override def unregister(actorRef: LocalActorRef) = {
super.unregister(actorRef) super.unregister(actorRef)
owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak) owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak)
} }

View file

@ -90,7 +90,7 @@ case class DurableDispatcher(
override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef) override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = { protected[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.channel.isInstanceOf[ActorPromise]) if (invocation.channel.isInstanceOf[ActorPromise])
throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from ?") throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from ?")
super.dispatch(invocation) super.dispatch(invocation)
@ -127,7 +127,7 @@ case class DurablePinnedDispatcher(
override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef) override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef)
private[akka] override def dispatch(invocation: MessageInvocation): Unit = { protected[akka] override def dispatch(invocation: MessageInvocation): Unit = {
if (invocation.channel.isInstanceOf[ActorPromise]) if (invocation.channel.isInstanceOf[ActorPromise])
throw new IllegalArgumentException("Actor has a durable mailbox that does not support ?") throw new IllegalArgumentException("Actor has a durable mailbox that does not support ?")
super.dispatch(invocation) super.dispatch(invocation)

View file

@ -43,7 +43,7 @@ object CallingThreadDispatcher {
queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty)
} }
private[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized { protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized {
if (queues contains mbox) { if (queues contains mbox) {
val newSet = queues(mbox) + new WeakReference(q) val newSet = queues(mbox) + new WeakReference(q)
queues += mbox -> newSet queues += mbox -> newSet
@ -58,7 +58,7 @@ object CallingThreadDispatcher {
* given mailbox. When this method returns, the queue will be entered * given mailbox. When this method returns, the queue will be entered
* (active). * (active).
*/ */
private[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { protected[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized {
if (!own.isActive) own.enter if (!own.isActive) own.enter
if (queues contains mbox) { if (queues contains mbox) {
for { for {
@ -107,15 +107,15 @@ object CallingThreadDispatcher {
class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher { class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher {
import CallingThreadDispatcher._ import CallingThreadDispatcher._
private[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox protected[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox
private def getMailbox(actor: LocalActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] private def getMailbox(actor: LocalActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox]
private[akka] override def start() {} protected[akka] override def start() {}
private[akka] override def shutdown() {} protected[akka] override def shutdown() {}
private[akka] override def timeoutMs = 100L protected[akka] override def timeoutMs = 100L
override def suspend(actor: LocalActorRef) { override def suspend(actor: LocalActorRef) {
getMailbox(actor).suspended.switchOn getMailbox(actor).suspended.switchOn
@ -137,7 +137,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).queue.isEmpty override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).queue.isEmpty
private[akka] override def dispatch(handle: MessageInvocation) { protected[akka] override def dispatch(handle: MessageInvocation) {
val mbox = getMailbox(handle.receiver) val mbox = getMailbox(handle.receiver)
val queue = mbox.queue val queue = mbox.queue
val execute = mbox.suspended.fold { val execute = mbox.suspended.fold {
@ -161,7 +161,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings:
if (execute) runQueue(mbox, queue) if (execute) runQueue(mbox, queue)
} }
private[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run }
/* /*
* This method must be called with this thread's queue, which must already * This method must be called with this thread's queue, which must already