From 68259808221f9651bb2e90c511df5e2adb273d5c Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Thu, 4 Mar 2010 12:36:57 +0100 Subject: [PATCH 1/7] Improved event driven dispatcher by not scheduling a task for dispatching when another is already busy. --- akka-core/src/main/scala/actor/Actor.scala | 6 ++ .../ExecutorBasedEventDrivenDispatcher.scala | 4 +- ...BasedEventDrivenDispatcherActorsTest.scala | 89 +++++++++++++++++++ 3 files changed, 98 insertions(+), 1 deletion(-) create mode 100644 akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index e5423e7bd1..077ea02a14 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._ import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue import java.net.InetSocketAddress +import java.util.concurrent.locks.{Lock, ReentrantLock} /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -219,6 +220,11 @@ trait Actor extends TransactionManagement { private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] + /** + * This lock ensures thread safety in the dispatching: only one message can be dispatched at once on the actor. + */ + private[akka] val _dispatcherLock:Lock = new ReentrantLock + // ==================================== // protected fields // ==================================== diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index e115800d4b..6dc1dd03b2 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,7 +62,9 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { + val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock + if (lockedForDispatching) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. var messageInvocation = invocation.receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala new file mode 100644 index 0000000000..92dc9ebf13 --- /dev/null +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala @@ -0,0 +1,89 @@ +package se.scalablesolutions.akka.actor + +import org.scalatest.junit.JUnitSuite +import org.junit.Test +import se.scalablesolutions.akka.dispatch.Dispatchers +import org.scalatest.matchers.MustMatchers +import java.util.concurrent.CountDownLatch + +/** + * Tests the behaviour of the executor based event driven dispatcher when multiple actors are being dispatched on it. + * + * @author Jan Van Besien + */ +class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustMatchers with ActorTestUtil { + class SlowActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "SlowActor" + + def receive = { + case x: Int => { + Thread.sleep(50) // slow actor + finishedCounter.countDown + println("s processed " + x) + } + } + } + + class FastActor(finishedCounter: CountDownLatch) extends Actor { + messageDispatcher = Dispatchers.globalExecutorBasedEventDrivenDispatcher + id = "FastActor" + + def receive = { + case x: Int => { + finishedCounter.countDown + println("f processed " + x) + } + } + } + + @Test def slowActorShouldntBlockFastActor = verify(new TestActor { + def test = { + val sFinished = new CountDownLatch(50) + val fFinished = new CountDownLatch(10) + val s = new SlowActor(sFinished) + val f = new FastActor(fFinished) + + handle(s, f) { + // send a lot of stuff to s + for (i <- 1 to 50) { + s ! i + } + + // send some messages to f + for (i <- 1 to 10) { + f ! i + } + + // now assert that f is finished while s is still busy + fFinished.await + assert(sFinished.getCount > 0) + } + } + }) + +} + +trait ActorTestUtil { + def handle[T](actors: Actor*)(test: => T): T = { + for (a <- actors) a.start + try + { + test + } + finally + { + for (a <- actors) a.stop + } + } + + def verify(actor: TestActor): Unit = handle(actor) + {actor.test} +} + +abstract class TestActor extends Actor with ActorTestUtil +{ + def test: Unit + + def receive = {case _ =>} +} From a14b10416010885ee3bca4d6fa1aa77096d688aa Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Thu, 4 Mar 2010 13:00:04 +0100 Subject: [PATCH 2/7] Release the lock when done dispatching. --- .../ExecutorBasedEventDrivenDispatcher.scala | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 6dc1dd03b2..b4f6c3425c 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -63,13 +63,17 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche executor.execute(new Runnable() { def run = { val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock - if (lockedForDispatching) { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + try { + if (lockedForDispatching) { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } } + } finally { + invocation.receiver._dispatcherLock.unlock } } }) From 77b44551fd5743164deb7087e159ddb18bf63127 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Thu, 4 Mar 2010 14:50:11 +0100 Subject: [PATCH 3/7] remove println's in test --- .../scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala | 2 -- 1 file changed, 2 deletions(-) diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala index 92dc9ebf13..55e30fec5d 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala @@ -20,7 +20,6 @@ class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustM case x: Int => { Thread.sleep(50) // slow actor finishedCounter.countDown - println("s processed " + x) } } } @@ -32,7 +31,6 @@ class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustM def receive = { case x: Int => { finishedCounter.countDown - println("f processed " + x) } } } From a36411cfb6a0498a64c01f08c399a980503d4ce2 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Thu, 4 Mar 2010 15:20:52 +0100 Subject: [PATCH 4/7] only unlock if locked. --- .../ExecutorBasedEventDrivenDispatcher.scala | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index b4f6c3425c..77879e0e3b 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -57,23 +57,23 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche @volatile private var active: Boolean = false val name: String = "event-driven:executor:dispatcher:" + _name - init - + init + def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock - try { - if (lockedForDispatching) { + if (lockedForDispatching) { + try { // Only dispatch if we got the lock. Otherwise another thread is already dispatching. var messageInvocation = invocation.receiver._mailbox.poll while (messageInvocation != null) { messageInvocation.invoke messageInvocation = invocation.receiver._mailbox.poll } + } finally { + invocation.receiver._dispatcherLock.unlock } - } finally { - invocation.receiver._dispatcherLock.unlock } } }) From 2bafccb1ce7e27c0f8a977830407d13d8b24bbea Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Wed, 10 Mar 2010 19:35:05 +0100 Subject: [PATCH 5/7] fixed layout --- ...rBasedEventDrivenDispatcherActorsTest.scala | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala index 55e30fec5d..b3e04f3244 100644 --- a/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala +++ b/akka-core/src/test/scala/ExecutorBasedEventDrivenDispatcherActorsTest.scala @@ -65,22 +65,20 @@ class ExecutorBasedEventDrivenDispatcherActorsTest extends JUnitSuite with MustM trait ActorTestUtil { def handle[T](actors: Actor*)(test: => T): T = { for (a <- actors) a.start - try - { + try { test } - finally - { - for (a <- actors) a.stop - } + finally { + for (a <- actors) a.stop + } } - def verify(actor: TestActor): Unit = handle(actor) - {actor.test} + def verify(actor: TestActor): Unit = handle(actor) { + actor.test + } } -abstract class TestActor extends Actor with ActorTestUtil -{ +abstract class TestActor extends Actor with ActorTestUtil { def test: Unit def receive = {case _ =>} From 34973c8d1cba2ed1f3b8302087b383d5050d5607 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Sun, 14 Mar 2010 18:01:49 +0100 Subject: [PATCH 6/7] dispatcher speed improvements --- .../ExecutorBasedEventDrivenDispatcher.scala | 13 +++++++------ project/build/AkkaProject.scala | 5 +++-- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index e115800d4b..68e537ae82 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,13 +62,14 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - invocation.receiver.synchronized { +// invocation.receiver.synchronized { var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll - } - } + if (messageInvocation != null) messageInvocation.invoke +// while (messageInvocation != null) { +// messageInvocation.invoke +// messageInvocation = invocation.receiver._mailbox.poll +// } +// } } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index 2df647a5bd..d6f2a1d79d 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -127,9 +127,10 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) { Credentials(Path.userHome / ".akka_publish_credentials", log) override def managedStyle = ManagedStyle.Maven - val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/" + val publishTo = "Scalable Solutions Maven Repository" at "~/tmp/akka" +// val publishTo = "Scalable Solutions Maven Repository" at "http://scalablesolutions.se/akka/repository/" val sourceArtifact = Artifact(artifactID, "src", "jar", Some("sources"), Nil, None) - val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None) +// val docsArtifact = Artifact(artifactID, "docs", "jar", Some("javadoc"), Nil, None) override def packageDocsJar = defaultJarPath("-javadoc.jar") override def packageSrcJar= defaultJarPath("-sources.jar") From a269dc4022f8c068b8d4213d361a695155ee1b78 Mon Sep 17 00:00:00 2001 From: Jan Van Besien Date: Tue, 16 Mar 2010 12:25:58 +0100 Subject: [PATCH 7/7] Fixed bug which allowed messages to be "missed" if they arrived after looping through the mailbox, but before releasing the lock. --- .../ExecutorBasedEventDrivenDispatcher.scala | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 77879e0e3b..b48e7717cf 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -62,19 +62,24 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche def dispatch(invocation: MessageInvocation) = if (active) { executor.execute(new Runnable() { def run = { - val lockedForDispatching = invocation.receiver._dispatcherLock.tryLock - if (lockedForDispatching) { - try { - // Only dispatch if we got the lock. Otherwise another thread is already dispatching. - var messageInvocation = invocation.receiver._mailbox.poll - while (messageInvocation != null) { - messageInvocation.invoke - messageInvocation = invocation.receiver._mailbox.poll + var lockAcquiredOnce = false + // this do-wile loop is required to prevent missing new messages between the end of the inner while + // loop and releasing the lock + do { + if (invocation.receiver._dispatcherLock.tryLock) { + lockAcquiredOnce = true + try { + // Only dispatch if we got the lock. Otherwise another thread is already dispatching. + var messageInvocation = invocation.receiver._mailbox.poll + while (messageInvocation != null) { + messageInvocation.invoke + messageInvocation = invocation.receiver._mailbox.poll + } + } finally { + invocation.receiver._dispatcherLock.unlock } - } finally { - invocation.receiver._dispatcherLock.unlock } - } + } while ((lockAcquiredOnce && !invocation.receiver._mailbox.isEmpty)) } }) } else throw new IllegalStateException("Can't submit invocations to dispatcher since it's not started") @@ -94,4 +99,4 @@ class ExecutorBasedEventDrivenDispatcher(_name: String) extends MessageDispatche "Can't build a new thread pool for a dispatcher that is already up and running") private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool -} \ No newline at end of file +}