diff --git a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala index 1849f7d143..6e585e2ef0 100644 --- a/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/dispatch/ActorModelSpec.scala @@ -113,27 +113,27 @@ object ActorModelSpec { getStats(actorRef).resumes.incrementAndGet() } - private[akka] abstract override def register(actorRef: LocalActorRef) { + protected[akka] abstract override def register(actorRef: LocalActorRef) { super.register(actorRef) getStats(actorRef).registers.incrementAndGet() } - private[akka] abstract override def unregister(actorRef: LocalActorRef) { + protected[akka] abstract override def unregister(actorRef: LocalActorRef) { super.unregister(actorRef) getStats(actorRef).unregisters.incrementAndGet() } - private[akka] abstract override def dispatch(invocation: MessageInvocation) { + protected[akka] abstract override def dispatch(invocation: MessageInvocation) { getStats(invocation.receiver).msgsReceived.incrementAndGet() super.dispatch(invocation) } - private[akka] abstract override def start() { + protected[akka] abstract override def start() { super.start() starts.incrementAndGet() } - private[akka] abstract override def shutdown() { + protected[akka] abstract override def shutdown() { super.shutdown() stops.incrementAndGet() } diff --git a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala index ffff55422d..04afac83e6 100644 --- a/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/BalancingDispatcher.scala @@ -55,7 +55,7 @@ class BalancingDispatcher( private var members = Vector[LocalActorRef]() private val donationInProgress = new DynamicVariable(false) - private[akka] override def register(actorRef: LocalActorRef) = { + protected[akka] override def register(actorRef: LocalActorRef) = { //Verify actor type conformity actorType match { case None ⇒ actorType = Some(actorRef.actorInstance.get().getClass) @@ -70,12 +70,12 @@ class BalancingDispatcher( super.register(actorRef) } - private[akka] override def unregister(actorRef: LocalActorRef) = { + protected[akka] override def unregister(actorRef: LocalActorRef) = { synchronized { members = members.filterNot(actorRef eq) } //Update members super.unregister(actorRef) } - override private[akka] def dispatch(invocation: MessageInvocation) = { + override protected[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) if (donationInProgress.value == false && (!mbox.isEmpty || mbox.dispatcherLock.locked) && attemptDonationOf(invocation, mbox)) { //We were busy and we got to donate the message to some other lucky guy, we're done here @@ -85,7 +85,7 @@ class BalancingDispatcher( } } - override private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + override protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { try { donationInProgress.value = true while (donateFrom(mbox)) {} //When we reregister, first donate messages to another actor diff --git a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala index 410e3120cc..eaa5fc613b 100644 --- a/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/Dispatcher.scala @@ -87,16 +87,16 @@ class Dispatcher( val name = "akka:event-driven:dispatcher:" + _name - private[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) - private[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) + protected[akka] val executorServiceFactory = executorServiceFactoryProvider.createExecutorServiceFactory(name) + protected[akka] val executorService = new AtomicReference[ExecutorService](new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) - private[akka] def dispatch(invocation: MessageInvocation) = { + protected[akka] def dispatch(invocation: MessageInvocation) = { val mbox = getMailbox(invocation.receiver) mbox enqueue invocation registerForExecution(mbox) } - private[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) { + protected[akka] def executeTask(invocation: TaskInvocation): Unit = if (active.isOn) { try executorService.get() execute invocation catch { case e: RejectedExecutionException ⇒ @@ -131,16 +131,16 @@ class Dispatcher( } } - private[akka] def start {} + protected[akka] def start {} - private[akka] def shutdown { + protected[akka] def shutdown { val old = executorService.getAndSet(new LazyExecutorServiceWrapper(executorServiceFactory.createExecutorService)) if (old ne null) { old.shutdownNow() } } - private[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { + protected[akka] def registerForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = { if (mbox.dispatcherLock.tryLock()) { if (active.isOn && !mbox.suspended.locked) { //If the dispatcher is active and the actor not suspended try { @@ -157,7 +157,7 @@ class Dispatcher( } } - private[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = + protected[akka] def reRegisterForExecution(mbox: MessageQueue with ExecutableMailbox): Unit = registerForExecution(mbox) protected override def cleanUpMailboxFor(actorRef: LocalActorRef) { diff --git a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala index 9eefc29e53..f57d374d4a 100644 --- a/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala +++ b/akka-actor/src/main/scala/akka/dispatch/MessageHandling.scala @@ -63,7 +63,7 @@ abstract class MessageDispatcher { /** * Creates and returns a mailbox for the given actor. */ - private[akka] def createMailbox(actorRef: LocalActorRef): AnyRef + protected[akka] def createMailbox(actorRef: LocalActorRef): AnyRef /** * Name of this dispatcher. @@ -88,11 +88,9 @@ abstract class MessageDispatcher { } } - private[akka] final def dispatchMessage(invocation: MessageInvocation) { - dispatch(invocation) - } + protected[akka] final def dispatchMessage(invocation: MessageInvocation): Unit = dispatch(invocation) - private[akka] final def dispatchTask(block: () ⇒ Unit): Unit = { + protected[akka] final def dispatchTask(block: () ⇒ Unit): Unit = { _tasks.getAndIncrement() try { if (active.isOff) @@ -129,7 +127,7 @@ abstract class MessageDispatcher { * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * and only call it under the dispatcher-guard, see "attach" for the only invocation */ - private[akka] def register(actorRef: LocalActorRef) { + protected[akka] def register(actorRef: LocalActorRef) { if (actorRef.mailbox eq null) actorRef.mailbox = createMailbox(actorRef) @@ -145,7 +143,7 @@ abstract class MessageDispatcher { * Only "private[akka] for the sake of intercepting calls, DO NOT CALL THIS OUTSIDE OF THE DISPATCHER, * and only call it under the dispatcher-guard, see "detach" for the only invocation */ - private[akka] def unregister(actorRef: LocalActorRef) = { + protected[akka] def unregister(actorRef: LocalActorRef) = { if (uuids remove actorRef.uuid) { cleanUpMailboxFor(actorRef) actorRef.mailbox = null @@ -206,7 +204,7 @@ abstract class MessageDispatcher { * When the dispatcher no longer has any actors registered, how long will it wait until it shuts itself down, in Ms * defaulting to your akka configs "akka.actor.dispatcher-shutdown-timeout" or otherwise, 1 Second */ - private[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis + protected[akka] def timeoutMs: Long = Dispatchers.DEFAULT_SHUTDOWN_TIMEOUT.toMillis /** * After the call to this method, the dispatcher mustn't begin any new message processing for the specified reference @@ -221,19 +219,19 @@ abstract class MessageDispatcher { /** * Will be called when the dispatcher is to queue an invocation for execution */ - private[akka] def dispatch(invocation: MessageInvocation) + protected[akka] def dispatch(invocation: MessageInvocation) - private[akka] def executeTask(invocation: TaskInvocation) + protected[akka] def executeTask(invocation: TaskInvocation) /** * Called one time every time an actor is attached to this dispatcher and this dispatcher was previously shutdown */ - private[akka] def start() + protected[akka] def start(): Unit /** * Called one time every time an actor is detached from this dispatcher and this dispatcher has no actors left attached */ - private[akka] def shutdown() + protected[akka] def shutdown(): Unit /** * Returns the size of the mailbox for the specified actor diff --git a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala index 7d3e539c1c..b2922469a3 100644 --- a/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala +++ b/akka-actor/src/main/scala/akka/dispatch/PinnedDispatcher.scala @@ -30,17 +30,17 @@ class PinnedDispatcher(_actor: ActorRef, _name: String, _mailboxType: MailboxTyp def this() = this(Dispatchers.MAILBOX_TYPE) - private[akka] val owner = new AtomicReference[ActorRef](_actor) + protected[akka] val owner = new AtomicReference[ActorRef](_actor) //Relies on an external lock provided by MessageDispatcher.attach - private[akka] override def register(actorRef: LocalActorRef) = { + protected[akka] override def register(actorRef: LocalActorRef) = { val actor = owner.get() if ((actor ne null) && actorRef != actor) throw new IllegalArgumentException("Cannot register to anyone but " + actor) owner.compareAndSet(null, actorRef) //Register if unregistered super.register(actorRef) } //Relies on an external lock provided by MessageDispatcher.detach - private[akka] override def unregister(actorRef: LocalActorRef) = { + protected[akka] override def unregister(actorRef: LocalActorRef) = { super.unregister(actorRef) owner.compareAndSet(actorRef, null) //Unregister (prevent memory leak) } diff --git a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala index 7262992759..dc5b0637b7 100644 --- a/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala +++ b/akka-durable-mailboxes/akka-mailboxes-common/src/main/scala/akka/actor/mailbox/DurableDispatcher.scala @@ -90,7 +90,7 @@ case class DurableDispatcher( override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef) - private[akka] override def dispatch(invocation: MessageInvocation): Unit = { + protected[akka] override def dispatch(invocation: MessageInvocation): Unit = { if (invocation.channel.isInstanceOf[ActorPromise]) throw new IllegalArgumentException("Durable mailboxes do not support Future-based messages from ?") super.dispatch(invocation) @@ -127,7 +127,7 @@ case class DurablePinnedDispatcher( override def createMailbox(actorRef: LocalActorRef): AnyRef = _storage.createFor(actorRef) - private[akka] override def dispatch(invocation: MessageInvocation): Unit = { + protected[akka] override def dispatch(invocation: MessageInvocation): Unit = { if (invocation.channel.isInstanceOf[ActorPromise]) throw new IllegalArgumentException("Actor has a durable mailbox that does not support ?") super.dispatch(invocation) diff --git a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala index 51fba43376..2657faf868 100644 --- a/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala +++ b/akka-testkit/src/main/scala/akka/testkit/CallingThreadDispatcher.scala @@ -43,7 +43,7 @@ object CallingThreadDispatcher { queues = queues mapValues (_ filter (_.get ne null)) filter (!_._2.isEmpty) } - private[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized { + protected[akka] def registerQueue(mbox: CallingThreadMailbox, q: NestingQueue): Unit = synchronized { if (queues contains mbox) { val newSet = queues(mbox) + new WeakReference(q) queues += mbox -> newSet @@ -58,7 +58,7 @@ object CallingThreadDispatcher { * given mailbox. When this method returns, the queue will be entered * (active). */ - private[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { + protected[akka] def gatherFromAllInactiveQueues(mbox: CallingThreadMailbox, own: NestingQueue): Unit = synchronized { if (!own.isActive) own.enter if (queues contains mbox) { for { @@ -107,15 +107,15 @@ object CallingThreadDispatcher { class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: Boolean = true) extends MessageDispatcher { import CallingThreadDispatcher._ - private[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox + protected[akka] override def createMailbox(actor: LocalActorRef) = new CallingThreadMailbox private def getMailbox(actor: LocalActorRef) = actor.mailbox.asInstanceOf[CallingThreadMailbox] - private[akka] override def start() {} + protected[akka] override def start() {} - private[akka] override def shutdown() {} + protected[akka] override def shutdown() {} - private[akka] override def timeoutMs = 100L + protected[akka] override def timeoutMs = 100L override def suspend(actor: LocalActorRef) { getMailbox(actor).suspended.switchOn @@ -137,7 +137,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: override def mailboxIsEmpty(actorRef: LocalActorRef): Boolean = getMailbox(actorRef).queue.isEmpty - private[akka] override def dispatch(handle: MessageInvocation) { + protected[akka] override def dispatch(handle: MessageInvocation) { val mbox = getMailbox(handle.receiver) val queue = mbox.queue val execute = mbox.suspended.fold { @@ -161,7 +161,7 @@ class CallingThreadDispatcher(val name: String = "calling-thread", val warnings: if (execute) runQueue(mbox, queue) } - private[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } + protected[akka] override def executeTask(invocation: TaskInvocation) { invocation.run } /* * This method must be called with this thread's queue, which must already