Added ScalaDoc for akka-patterns

This commit is contained in:
Viktor Klang 2010-05-06 22:27:59 +02:00
parent 3ce843eff7
commit a56ac7b664
4 changed files with 31 additions and 6 deletions

View file

@ -6,8 +6,12 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.ActorID import se.scalablesolutions.akka.actor.ActorID
/** An Iterator that is either always empty or yields an infinite number of Ts
*/
trait InfiniteIterator[T] extends Iterator[T] trait InfiniteIterator[T] extends Iterator[T]
/** CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List
*/
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
@volatile private[this] var current: List[T] = items @volatile private[this] var current: List[T] = items
@ -20,6 +24,9 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
} }
} }
/** This InfiniteIterator always returns the Actor that has the currently smallest mailbox
* useful for work-stealing.
*/
class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] {
def hasNext = items != Nil def hasNext = items != Nil

View file

@ -11,6 +11,12 @@ case class Listen(listener: ActorID) extends ListenerMessage
case class Deafen(listener: ActorID) extends ListenerMessage case class Deafen(listener: ActorID) extends ListenerMessage
case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage
/** Listeners is a generic trait to implement listening capability on an Actor
* Use the <code>gossip(msg)</code> method to have it sent to the listenees
* Send <code>Listen(self)</code> to start listening
* Send <code>Deafen(self)</code> to stop listening
* Send <code>WithListeners(fun)</code> to traverse the current listeners
*/
trait Listeners { self : Actor => trait Listeners { self : Actor =>
import se.scalablesolutions.akka.actor.Agent import se.scalablesolutions.akka.actor.Agent
private lazy val listeners = Agent(Set[ActorID]()) private lazy val listeners = Agent(Set[ActorID]())

View file

@ -10,8 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._
object Patterns { object Patterns {
type PF[A, B] = PartialFunction[A, B] type PF[A, B] = PartialFunction[A, B]
/** /** Creates a new PartialFunction whose isDefinedAt is a combination
* Creates a new PartialFunction whose isDefinedAt is a combination
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
*/ */
def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
@ -20,19 +19,21 @@ object Patterns {
filtered(a) filtered(a)
} }
/** /** Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
*/ */
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
//FIXME 2.8, use default params with CyclicIterator /** Creates a LoadBalancer from the thunk-supplied InfiniteIterator
*/
def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID =
newActor(() => new Actor with LoadBalancer { newActor(() => new Actor with LoadBalancer {
start start
val seq = actors val seq = actors
}) })
/** Creates a Dispatcher given a routing and a message-transforming function
*/
def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID =
newActor(() => new Actor with Dispatcher { newActor(() => new Actor with Dispatcher {
start start
@ -40,11 +41,16 @@ object Patterns {
def routes = routing def routes = routing
}) })
/** Creates a Dispatcher given a routing
*/
def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher {
start start
def routes = routing def routes = routing
}) })
/** Creates an actor that pipes all incoming messages to
* both another actor and through the supplied function
*/
def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID =
dispatcherActor({case _ => actorToLog}, logger) dispatcherActor({case _ => actorToLog}, logger)
} }

View file

@ -6,6 +6,8 @@ package se.scalablesolutions.akka.patterns
import se.scalablesolutions.akka.actor.{Actor, ActorID} import se.scalablesolutions.akka.actor.{Actor, ActorID}
/** A Dispatcher is a trait whose purpose is to route incoming messages to actors
*/
trait Dispatcher { self: Actor => trait Dispatcher { self: Actor =>
protected def transform(msg: Any): Any = msg protected def transform(msg: Any): Any = msg
@ -15,12 +17,16 @@ trait Dispatcher { self: Actor =>
protected def dispatch: PartialFunction[Any, Unit] = { protected def dispatch: PartialFunction[Any, Unit] = {
case a if routes.isDefinedAt(a) => case a if routes.isDefinedAt(a) =>
if (self.replyTo.isDefined) routes(a) forward transform(a) if (self.replyTo.isDefined) routes(a) forward transform(a)
else routes(a) ! transform(a) else routes(a).!(transform(a))(None)
} }
def receive = dispatch def receive = dispatch
} }
/** A LoadBalancer is a specialized kind of Dispatcher,
* that is supplied an InfiniteIterator of targets
* to dispatch incoming messages to
*/
trait LoadBalancer extends Dispatcher { self: Actor => trait LoadBalancer extends Dispatcher { self: Actor =>
protected def seq: InfiniteIterator[ActorID] protected def seq: InfiniteIterator[ActorID]