diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index e5423e7bd1..077ea02a14 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._ import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue import java.net.InetSocketAddress +import java.util.concurrent.locks.{Lock, ReentrantLock} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -219,6 +220,11 @@ trait Actor extends TransactionManagement { private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] + /** + * This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor. + */ + private[akka] val _dispatcherLock:Lock = new ReentrantLock + // ==================================== // protected fields // ==================================== diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index e115800d4b..6dc1dd03b2 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,7 +62,9 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { + val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock + if (lockedForDispatching) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. var messageInvocation = invocation.receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala new file mode 100644 index 0000000000..92dc9ebf13 --- /dev/null +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala @@ -0,0 +1,89 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.dispatch.Dispatchers +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch + +/** + * Tests the behaviour of the executor based event driven dispatcher when multiple actors are being dispatched on it. + * + * @author Jan Van Besien + */ +class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustMatchers with ActorTestUtil { + class SlowActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "SlowActor" + + def receive = { + case x: Int => { + Thread.sleep(50) // slow actor + finishedCounter.countDown + println("s processed " + x) + } + } + } + + class FastActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "FastActor" + + def receive = { + case x: Int => { + finishedCounter.countDown + println("f processed " + x) + } + } + } + + @Test def slowActorShouldntBlockFastActor = verify(new TestActor { + def test = { + val sFinished = new CountDownLatch(50) + val fFinished = new CountDownLatch(10) + val s = new SlowActor(sFinished) + val f = new FastActor(fFinished) + + handle(s, f) { + // send a lot of stuff to s + for (i <- 1 to 50) { + s ! i + } + + // send some messages to f + for (i <- 1 to 10) { + f ! i + } + + // now assert that f is finished while s is still busy + fFinished.await + assert(sFinished.getCount > 0) + } + } + }) + +} + +trait ActorTestUtil { + def handle[T](actors: Actor*)(test: => T): T = { + for (a <- actors) a.start + try + { + test + } + finally + { + for (a <- actors) a.stop + } + } + + def verify(actor: TestActor): Unit = handle(actor) + {actor.test} +} + +abstract class TestActor extends Actor with ActorTestUtil +{ + def test: Unit + + def receive = {case _ =>} +}