Removing the old work redistribution

This commit is contained in:
Viktor Klang 2012-02-10 10:36:35 +01:00
parent 51a218b87f
commit f85645e46d

View file

@ -36,7 +36,6 @@ class BalancingDispatcher(
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {
val buddies = new ConcurrentSkipListSet[ActorCell](akka.util.Helpers.IdentityHashComparator)
val rebalance = new AtomicBoolean(false)
val messageQueue: MessageQueue = mailboxType match {
case u: UnboundedMailbox new QueueBasedMessageQueue with UnboundedMessageQueueSemantics {
@ -84,30 +83,11 @@ class BalancingDispatcher(
protected[akka] override def unregister(actor: ActorCell) = {
buddies.remove(actor)
super.unregister(actor)
intoTheFray(except = actor) //When someone leaves, he tosses a friend into the fray
}
def intoTheFray(except: ActorCell): Unit =
if (rebalance.compareAndSet(false, true)) {
try {
val i = buddies.iterator()
@tailrec
def throwIn(): Unit = {
val n = if (i.hasNext) i.next() else null
if (n eq null) ()
else if ((n ne except) && registerForExecution(n.mailbox, false, false)) ()
else throwIn()
}
throwIn()
} finally {
rebalance.set(false)
}
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
registerForExecution(receiver.mailbox, false, false)
intoTheFray(except = receiver)
//Somewhere around here we have to make sure that not only the intended actor is kept busy
}
}