Changing Listeners backing store to ConcurrentSkipListSet and changing signature of WithListeners(f) to (ActorRef) => Unit
This commit is contained in:
parent
2cf35a27a3
commit
fb2d777620
2 changed files with 13 additions and 11 deletions
|
|
@ -5,13 +5,13 @@
|
|||
package se.scalablesolutions.akka.routing
|
||||
|
||||
import se.scalablesolutions.akka.actor.{Actor, ActorRef}
|
||||
|
||||
import java.util.concurrent.CopyOnWriteArraySet
|
||||
import java.util.concurrent.ConcurrentSkipListSet
|
||||
import scala.collection.JavaConversions._
|
||||
|
||||
sealed trait ListenerMessage
|
||||
case class Listen(listener: ActorRef) extends ListenerMessage
|
||||
case class Deafen(listener: ActorRef) extends ListenerMessage
|
||||
case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
|
||||
case class WithListeners(f: (ActorRef) => Unit) extends ListenerMessage
|
||||
|
||||
/**
|
||||
* Listeners is a generic trait to implement listening capability on an Actor.
|
||||
|
|
@ -25,15 +25,13 @@ case class WithListeners(f: List[ActorRef] => Unit) extends ListenerMessage
|
|||
* Send <code>WithListeners(fun)</code> to traverse the current listeners.
|
||||
*/
|
||||
trait Listeners { self: Actor =>
|
||||
private val listeners = new CopyOnWriteArraySet[ActorRef]
|
||||
private val listeners = new ConcurrentSkipListSet[ActorRef]
|
||||
|
||||
protected def listenerManagement: Receive = {
|
||||
case Listen(l) => listeners add l
|
||||
case Deafen(l) => listeners remove l
|
||||
case WithListeners(f) => f(listenersAsList)
|
||||
case WithListeners(f) => listeners foreach f
|
||||
}
|
||||
|
||||
protected def gossip(msg: Any) = listenersAsList foreach (_ ! msg)
|
||||
|
||||
private def listenersAsList: List[ActorRef] = listeners.toArray.toList.asInstanceOf[List[ActorRef]]
|
||||
protected def gossip(msg: Any) = listeners foreach (_ ! msg)
|
||||
}
|
||||
|
|
|
|||
|
|
@ -94,6 +94,7 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
|
||||
@Test def testListener = {
|
||||
val latch = new CountDownLatch(2)
|
||||
val foreachListener = new CountDownLatch(2)
|
||||
val num = new AtomicInteger(0)
|
||||
val i = actorOf(new Actor with Listeners {
|
||||
def receive = listenerManagement orElse {
|
||||
|
|
@ -104,8 +105,9 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
|
||||
def newListener = actor {
|
||||
case "bar" =>
|
||||
num.incrementAndGet
|
||||
latch.countDown
|
||||
num.incrementAndGet
|
||||
latch.countDown
|
||||
case "foo" => foreachListener.countDown
|
||||
}
|
||||
|
||||
val a1 = newListener
|
||||
|
|
@ -116,12 +118,14 @@ class RoutingSpec extends junit.framework.TestCase with Suite with MustMatchers
|
|||
i ! Listen(a2)
|
||||
i ! Listen(a3)
|
||||
i ! Deafen(a3)
|
||||
|
||||
i ! WithListeners(_ ! "foo")
|
||||
i ! "foo"
|
||||
|
||||
val done = latch.await(5,TimeUnit.SECONDS)
|
||||
done must be (true)
|
||||
num.get must be (2)
|
||||
val withListeners = foreachListener.await(5,TimeUnit.SECONDS)
|
||||
withListeners must be (true)
|
||||
for(a <- List(i,a1,a2,a3)) a.stop
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue