Improved event driven dispatcher by not scheduling a task for dispatching when another is already busy.

This commit is contained in:
Jan Van Besien 2010-03-04 12:36:57 +01:00
parent 259b6c21bf
commit 6825980822
3 changed files with 98 additions and 1 deletions

View file

@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._
import java.util.{Queue, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue
import java.net.InetSocketAddress
import java.util.concurrent.locks.{Lock, ReentrantLock}
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@ -219,6 +220,11 @@ trait Actor extends TransactionManagement {
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
/**
* This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor.
*/
private[akka] val _dispatcherLock:Lock = new ReentrantLock
// ====================================
// protected fields
// ====================================

View file

@ -62,7 +62,9 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche
def dispatch(invocation: MessageInvocation) = if (active) {
executor.execute(new Runnable() {
def run = {
invocation.receiver.synchronized {
val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock
if (lockedForDispatching) {
// Only dispatch if we got the lock. Otherwise another thread is already dispatching.
var messageInvocation = invocation.receiver._mailbox.poll
while (messageInvocation != null) {
messageInvocation.invoke

View file

@ -0,0 +1,89 @@
package se.scalablesolutions.akka.actor
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.dispatch.Dispatchers
import org.scalatest.matchers.MustMatchers
import java.util.concurrent.CountDownLatch
/**
* Tests the behaviour of the executor based event driven dispatcher when multiple actors are being dispatched on it.
*
* @author Jan Van Besien
*/
class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustMatchers with ActorTestUtil {
class SlowActor(finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "SlowActor"
def receive = {
case x: Int => {
Thread.sleep(50) // slow actor
finishedCounter.countDown
println("s processed " + x)
}
}
}
class FastActor(finishedCounter: CountDownLatch) extends Actor {
messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher
id = "FastActor"
def receive = {
case x: Int => {
finishedCounter.countDown
println("f processed " + x)
}
}
}
@Test def slowActorShouldntBlockFastActor = verify(new TestActor {
def test = {
val sFinished = new CountDownLatch(50)
val fFinished = new CountDownLatch(10)
val s = new SlowActor(sFinished)
val f = new FastActor(fFinished)
handle(s, f) {
// send a lot of stuff to s
for (i <- 1 to 50) {
s ! i
}
// send some messages to f
for (i <- 1 to 10) {
f ! i
}
// now assert that f is finished while s is still busy
fFinished.await
assert(sFinished.getCount > 0)
}
}
})
}
trait ActorTestUtil {
def handle[T](actors: Actor*)(test: => T): T = {
for (a <- actors) a.start
try
{
test
}
finally
{
for (a <- actors) a.stop
}
}
def verify(actor: TestActor): Unit = handle(actor)
{actor.test}
}
abstract class TestActor extends Actor with ActorTestUtil
{
def test: Unit
def receive = {case _ =>}
}