Merge branch 'master' of github.com:jboner/akka

This commit is contained in:
Roland 2012-02-13 22:05:51 +01:00
commit 6fbcfe62af
2 changed files with 22 additions and 23 deletions

View file

@ -366,14 +366,12 @@ abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with Defa
case e
dispatcher match {
case dispatcher: BalancingDispatcher
val buddies = dispatcher.buddies
val team = dispatcher.team
val mq = dispatcher.messageQueue
System.err.println("Buddies left: " + buddies.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
buddies.toArray sorted new Ordering[AnyRef] {
def compare(l: AnyRef, r: AnyRef) = (l, r) match {
case (ll: ActorCell, rr: ActorCell) ll.self.path.toString.compareTo(rr.self.path.toString)
}
System.err.println("Teammates left: " + team.size + " stopLatch: " + stopLatch.getCount + " inhab:" + dispatcher.inhabitants)
team.toArray sorted new Ordering[AnyRef] {
def compare(l: AnyRef, r: AnyRef) = (l, r) match { case (ll: ActorCell, rr: ActorCell) ll.self.path compareTo rr.self.path }
} foreach {
case cell: ActorCell
System.err.println(" - " + cell.self.path + " " + cell.isTerminated + " " + cell.mailbox.status + " " + cell.mailbox.numberOfMessages + " " + SystemMessage.size(cell.mailbox.systemDrain()))

View file

@ -35,7 +35,7 @@ class BalancingDispatcher(
attemptTeamWork: Boolean)
extends Dispatcher(_prerequisites, _id, throughput, throughputDeadlineTime, mailboxType, _executorServiceFactoryProvider, _shutdownTimeout) {
val buddies = new ConcurrentSkipListSet[ActorCell](
val team = new ConcurrentSkipListSet[ActorCell](
Helpers.identityHashComparator(new Comparator[ActorCell] {
def compare(l: ActorCell, r: ActorCell) = l.self.path compareTo r.self.path
}))
@ -82,31 +82,32 @@ class BalancingDispatcher(
}
}
protected[akka] override def register(actor: ActorCell) = {
protected[akka] override def register(actor: ActorCell): Unit = {
super.register(actor)
buddies.add(actor)
team.add(actor)
}
protected[akka] override def unregister(actor: ActorCell) = {
buddies.remove(actor)
protected[akka] override def unregister(actor: ActorCell): Unit = {
team.remove(actor)
super.unregister(actor)
scheduleOne()
teamWork()
}
override protected[akka] def dispatch(receiver: ActorCell, invocation: Envelope) = {
messageQueue.enqueue(receiver.self, invocation)
registerForExecution(receiver.mailbox, false, false)
scheduleOne()
teamWork()
}
@tailrec private def scheduleOne(i: Iterator[ActorCell] = buddies.iterator): Unit =
if (attemptTeamWork
&& messageQueue.hasMessages
&& i.hasNext
&& (executorService.get().executor match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})
&& !registerForExecution(i.next.mailbox, false, false))
scheduleOne(i)
protected def teamWork(): Unit = if (attemptTeamWork) {
@tailrec def scheduleOne(i: Iterator[ActorCell] = team.iterator): Unit =
if (messageQueue.hasMessages
&& i.hasNext
&& (executorService.get().executor match {
case lm: LoadMetrics lm.atFullThrottle == false
case other true
})
&& !registerForExecution(i.next.mailbox, false, false))
scheduleOne(i)
}
}