From fb2d77762011dee3413a5cbb257a4f365074d2aa Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 19 Aug 2010 10:55:22 +0200 Subject: [PATCH] Changing Listeners backing store to ConcurrentSkipListSet and changing signature of WithListeners(f) to (ActorRef) => Unit --- akka-core/src/main/scala/routing/Listeners.scala | 14 ++++++-------- akka-core/src/test/scala/routing/RoutingSpec.scala | 10 +++++++--- 2 files changed, 13 insertions(+), 11 deletions(-) diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala index 47ab2e442f..6531aee422 100644 --- a/akka-core/src/main/scala/routing/Listeners.scala +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -5,13 +5,13 @@ package se.scalablesolutions.akka.routing import se.scalablesolutions.akka.actor.{Actor, ActorRef} - -import java.util.concurrent.CopyOnWriteArraySet +import java.util.concurrent.ConcurrentSkipListSet +import scala.collection.JavaConversions._ sealed trait ListenerMessage case class Listen(listener: ActorRef) extends ListenerMessage case class Deafen(listener: ActorRef) extends ListenerMessage -case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage +case class WithListeners(f: (ActorRef) => Unit) extends ListenerMessage /** * Listeners is a generic trait to implement listening capability on an Actor. @@ -25,15 +25,13 @@ case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage * Send WithListeners(fun) to traverse the current listeners. */ trait Listeners { self: Actor => - private val listeners = new CopyOnWriteArraySet[ActorRef] + private val listeners = new ConcurrentSkipListSet[ActorRef] protected def listenerManagement: Receive = { case Listen(l) => listeners add l case Deafen(l) => listeners remove l - case WithListeners(f) => f(listenersAsList) + case WithListeners(f) => listeners foreach f } - protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg) - - private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]] + protected def gossip(msg: Any) = listeners foreach (_ ! msg) } diff --git a/akka-core/src/test/scala/routing/RoutingSpec.scala b/akka-core/src/test/scala/routing/RoutingSpec.scala index 747363efe6..b51fa11a0e 100644 --- a/akka-core/src/test/scala/routing/RoutingSpec.scala +++ b/akka-core/src/test/scala/routing/RoutingSpec.scala @@ -94,6 +94,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers @Test def testListener = { val latch = new CountDownLatch(2) + val foreachListener = new CountDownLatch(2) val num = new AtomicInteger(0) val i = actorOf(new Actor with Listeners { def receive = listenerManagement orElse { @@ -104,8 +105,9 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers def newListener = actor { case "bar" => - num.incrementAndGet - latch.countDown + num.incrementAndGet + latch.countDown + case "foo" => foreachListener.countDown } val a1 = newListener @@ -116,12 +118,14 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers i ! Listen(a2) i ! Listen(a3) i ! Deafen(a3) - + i ! WithListeners(_ ! "foo") i ! "foo" val done = latch.await(5,TimeUnit.SECONDS) done must be (true) num.get must be (2) + val withListeners = foreachListener.await(5,TimeUnit.SECONDS) + withListeners must be (true) for(a <- List(i,a1,a2,a3)) a.stop }