diff --git a/akka-core/src/main/scala/routing/Iterators.scala b/akka-core/src/main/scala/routing/Iterators.scala index 77159767af..8d9b0b4f9d 100644 --- a/akka-core/src/main/scala/routing/Iterators.scala +++ b/akka-core/src/main/scala/routing/Iterators.scala @@ -6,8 +6,12 @@ package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.ActorID +/** An Iterator that is either always empty or yields an infinite number of Ts + */ trait InfiniteIterator[T] extends Iterator[T] +/** CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied List + */ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { @volatile private[this] var current: List[T] = items @@ -20,6 +24,9 @@ class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { } } +/** This InfiniteIterator always returns the Actor that has the currently smallest mailbox + * useful for work-stealing. + */ class SmallestMailboxFirstIterator(items : List[ActorID]) extends InfiniteIterator[ActorID] { def hasNext = items != Nil diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala index d2fcc1cc73..652c8126be 100644 --- a/akka-core/src/main/scala/routing/Listeners.scala +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -11,6 +11,12 @@ case class Listen(listener: ActorID) extends ListenerMessage case class Deafen(listener: ActorID) extends ListenerMessage case class WithListeners(f: Set[ActorID] => Unit) extends ListenerMessage +/** Listeners is a generic trait to implement listening capability on an Actor + * Use the gossip(msg) method to have it sent to the listenees + * Send Listen(self) to start listening + * Send Deafen(self) to stop listening + * Send WithListeners(fun) to traverse the current listeners + */ trait Listeners { self : Actor => import se.scalablesolutions.akka.actor.Agent private lazy val listeners = Agent(Set[ActorID]()) diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index c8ac39fc72..7612cfa28f 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -10,8 +10,7 @@ import se.scalablesolutions.akka.actor.Actor._ object Patterns { type PF[A, B] = PartialFunction[A, B] - /** - * Creates a new PartialFunction whose isDefinedAt is a combination + /** Creates a new PartialFunction whose isDefinedAt is a combination * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply */ def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { @@ -20,19 +19,21 @@ object Patterns { filtered(a) } - /** - * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true + /** Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true */ def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) - //FIXME 2.8, use default params with CyclicIterator + /** Creates a LoadBalancer from the thunk-supplied InfiniteIterator + */ def loadBalancerActor(actors: => InfiniteIterator[ActorID]): ActorID = newActor(() => new Actor with LoadBalancer { start val seq = actors }) + /** Creates a Dispatcher given a routing and a message-transforming function + */ def dispatcherActor(routing: PF[Any, ActorID], msgTransformer: (Any) => Any): ActorID = newActor(() => new Actor with Dispatcher { start @@ -40,11 +41,16 @@ object Patterns { def routes = routing }) + /** Creates a Dispatcher given a routing + */ def dispatcherActor(routing: PF[Any, ActorID]): ActorID = newActor(() => new Actor with Dispatcher { start def routes = routing }) + /** Creates an actor that pipes all incoming messages to + * both another actor and through the supplied function + */ def loggerActor(actorToLog: ActorID, logger: (Any) => Unit): ActorID = dispatcherActor({case _ => actorToLog}, logger) } \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala index ce3c7c311c..62cccfb882 100644 --- a/akka-core/src/main/scala/routing/Routers.scala +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -6,6 +6,8 @@ package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.{Actor, ActorID} +/** A Dispatcher is a trait whose purpose is to route incoming messages to actors + */ trait Dispatcher { self: Actor => protected def transform(msg: Any): Any = msg @@ -15,12 +17,16 @@ trait Dispatcher { self: Actor => protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => if (self.replyTo.isDefined) routes(a) forward transform(a) - else routes(a) ! transform(a) + else routes(a).!(transform(a))(None) } def receive = dispatch } +/** A LoadBalancer is a specialized kind of Dispatcher, + * that is supplied an InfiniteIterator of targets + * to dispatch incoming messages to + */ trait LoadBalancer extends Dispatcher { self: Actor => protected def seq: InfiniteIterator[ActorID]