From 3f2c9a2be9b799594501af2b77744c9b5b928b32 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Wed, 17 Mar 2010 22:10:49 +0100 Subject: [PATCH] Added load balancer which prefers actors with small mailboxes (discussed on mailing list a while ago). --- akka-core/src/main/scala/actor/Actor.scala | 2 ++ akka-patterns/src/main/scala/Patterns.scala | 13 +++++++- .../src/test/scala/ActorPatternsTest.scala | 30 +++++++++++++++++++ 3 files changed, 44 insertions(+), 1 deletion(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index da0502f000..f28f994044 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -463,6 +463,8 @@ trait Actor extends TransactionManagement with Logging { def isRunning = _isRunning + def mailboxSize = _mailbox.size + /** * Sends a one-way asynchronous message. E.g. fire-and-forget semantics. *

diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index 4ac40bfaba..b477adb10e 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -73,4 +73,15 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { current = nc.tail nc.head } -} \ No newline at end of file +} + +class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { + def hasNext = items != Nil + + def next = { + def actorWithSmallestMailbox(a1: Actor, a2: Actor) = { + if (a1.mailboxSize < a2.mailboxSize) a1 else a2 + } + items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2)) + } +} diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala index 3019af0436..0ce999add0 100644 --- a/akka-patterns/src/test/scala/ActorPatternsTest.scala +++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala @@ -1,5 +1,6 @@ package se.scalablesolutions.akka.patterns +import java.util.concurrent.atomic.AtomicInteger import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.actor.Actor import se.scalablesolutions.akka.actor.Actor._ @@ -61,6 +62,35 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat } } }) + + @Test def testSmallestMailboxFirstDispatcher = verify(new TestActor { + def test = { + val t1ProcessedCount = new AtomicInteger(0) + val t1: Actor = actor { + case x => { + Thread.sleep(50) // slow actor + t1ProcessedCount.incrementAndGet + } + } + + val t2ProcessedCount = new AtomicInteger(0) + val t2: Actor = actor { + case x => { + t2ProcessedCount.incrementAndGet + } + } + + val d = loadBalancerActor(new SmallestMailboxFirstIterator(t1 :: t2 :: Nil)) + + handle(d, t1, t2) { + for (i <- 1 to 500) + d ! i + Thread.sleep(6000) + t1ProcessedCount.get must be < (t2ProcessedCount.get) // because t1 is much slower and thus has a bigger mailbox all the time + } + } + }) + } trait ActorTestUtil {