diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index b0991936fb..02ed7894d0 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -22,6 +22,7 @@ import org.multiverse.commitbarriers.CountDownCommitBarrier import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue import java.net.InetSocketAddress +import java.util.concurrent.locks.{Lock, ReentrantLock} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -218,6 +219,10 @@ trait Actor extends TransactionManagement { private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] + /** + * This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor. + */ + private[akka] val _dispatcherLock:Lock = new ReentrantLock // ==================================== // protected fields diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index e115800d4b..b48e7717cf 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -57,18 +57,29 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche @volatile private var active: Boolean = false val name: String = "event-driven:executor:dispatcher:" + _name - init - + init + def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { - var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + var lockAcquiredOnce = false + // this do-wile loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + if (invocation.receiver._dispatcherLock.tryLock) { + lockAcquiredOnce = true + try { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } finally { + invocation.receiver._dispatcherLock.unlock + } } - } + } while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty)) } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") @@ -88,4 +99,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche "Can't build a new thread pool for a dispatcher that is already up and running") private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool -} \ No newline at end of file +} diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala new file mode 100644 index 0000000000..b3e04f3244 --- /dev/null +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala @@ -0,0 +1,85 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.dispatch.Dispatchers +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch + +/** + * Tests the behaviour of the executor based event driven dispatcher when multiple actors are being dispatched on it. + * + * @author Jan Van Besien + */ +class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustMatchers with ActorTestUtil { + class SlowActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "SlowActor" + + def receive = { + case x: Int => { + Thread.sleep(50) // slow actor + finishedCounter.countDown + } + } + } + + class FastActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "FastActor" + + def receive = { + case x: Int => { + finishedCounter.countDown + } + } + } + + @Test def slowActorShouldntBlockFastActor = verify(new TestActor { + def test = { + val sFinished = new CountDownLatch(50) + val fFinished = new CountDownLatch(10) + val s = new SlowActor(sFinished) + val f = new FastActor(fFinished) + + handle(s, f) { + // send a lot of stuff to s + for (i <- 1 to 50) { + s ! i + } + + // send some messages to f + for (i <- 1 to 10) { + f ! i + } + + // now assert that f is finished while s is still busy + fFinished.await + assert(sFinished.getCount > 0) + } + } + }) + +} + +trait ActorTestUtil { + def handle[T](actors: Actor*)(test: => T): T = { + for (a <- actors) a.start + try { + test + } + finally { + for (a <- actors) a.stop + } + } + + def verify(actor: TestActor): Unit = handle(actor) { + actor.test + } +} + +abstract class TestActor extends Actor with ActorTestUtil { + def test: Unit + + def receive = {case _ =>} +} diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 891126f22e..72b51d4f4d 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -127,9 +127,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { Credentials(Path.userHome / ".akka_publish_credentials", log) override def managedStyle = ManagedStyle.Maven - val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/" + val publishTo = "Scalable Solutions Maven Repository" at "~/tmp/akka" +// val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/" val sourceArtifact = Artifact(artifactID, "src", "jar", Some("sources"), Nil, None) - val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None) +// val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None) override def packageDocsJar = defaultJarPath("-javadoc.jar") override def packageSrcJar= defaultJarPath("-sources.jar")