diff --git a/akka-core/src/main/scala/routing/Iterators.scala b/akka-core/src/main/scala/routing/Iterators.scala new file mode 100644 index 0000000000..7d06bd74ee --- /dev/null +++ b/akka-core/src/main/scala/routing/Iterators.scala @@ -0,0 +1,27 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +trait InfiniteIterator[T] extends Iterator[T] + +class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current: List[T] = items + + def hasNext = items != Nil + + def next = { + val nc = if (current == Nil) items else current + current = nc.tail + nc.head + } +} + +class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { + def hasNext = items != Nil + + def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Listeners.scala b/akka-core/src/main/scala/routing/Listeners.scala new file mode 100644 index 0000000000..495aab9ee3 --- /dev/null +++ b/akka-core/src/main/scala/routing/Listeners.scala @@ -0,0 +1,25 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +sealed trait ListenerMessage +case class Listen(listener : Actor) extends ListenerMessage +case class Deafen(listener : Actor) extends ListenerMessage +case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage + +trait Listeners { self : Actor => + import se.scalablesolutions.akka.actor.Agent + private lazy val listeners = Agent(Set[Actor]()) + + protected def listenerManagement : PartialFunction[Any,Unit] = { + case Listen(l) => listeners( _ + l) + case Deafen(l) => listeners( _ - l ) + case WithListeners(f) => listeners foreach f + } + + protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) +} \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Patterns.scala b/akka-core/src/main/scala/routing/Patterns.scala index d8a49c74e3..a27c69b846 100644 --- a/akka-core/src/main/scala/routing/Patterns.scala +++ b/akka-core/src/main/scala/routing/Patterns.scala @@ -1,3 +1,7 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + package se.scalablesolutions.akka.patterns import se.scalablesolutions.akka.actor.Actor @@ -21,7 +25,6 @@ object Patterns { def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee) - //FIXME 2.8, use default params with CyclicIterator def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { val seq = actors } @@ -38,68 +41,4 @@ object Patterns { def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor({case _ => actorToLog}, logger) -} - -trait Dispatcher { self: Actor => - - protected def transform(msg: Any): Any = msg - - protected def routes: PartialFunction[Any, Actor] - - protected def dispatch: PartialFunction[Any, Unit] = { - case a if routes.isDefinedAt(a) => - if (self.replyTo.isDefined) routes(a) forward transform(a) - else routes(a) ! transform(a) - } - - def receive = dispatch -} - -trait LoadBalancer extends Dispatcher { self: Actor => - protected def seq: InfiniteIterator[Actor] - - protected def routes = { case x if seq.hasNext => seq.next } -} - -trait InfiniteIterator[T] extends Iterator[T] - -class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { - @volatile private[this] var current: List[T] = items - - def hasNext = items != Nil - - def next = { - val nc = if (current == Nil) items else current - current = nc.tail - nc.head - } -} - -class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] { - def hasNext = items != Nil - - def next = { - def actorWithSmallestMailbox(a1: Actor, a2: Actor) = { - if (a1.mailboxSize < a2.mailboxSize) a1 else a2 - } - items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2)) - } -} - -sealed trait ListenerMessage -case class Listen(listener : Actor) extends ListenerMessage -case class Deafen(listener : Actor) extends ListenerMessage -case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage - -trait Listeners { self : Actor => - import se.scalablesolutions.akka.actor.Agent - private lazy val listeners = Agent(Set[Actor]()) - - protected def listenerManagement : PartialFunction[Any,Unit] = { - case Listen(l) => listeners( _ + l) - case Deafen(l) => listeners( _ - l ) - case WithListeners(f) => listeners foreach f - } - - protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) ) } \ No newline at end of file diff --git a/akka-core/src/main/scala/routing/Routers.scala b/akka-core/src/main/scala/routing/Routers.scala new file mode 100644 index 0000000000..3749f94437 --- /dev/null +++ b/akka-core/src/main/scala/routing/Routers.scala @@ -0,0 +1,28 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package se.scalablesolutions.akka.patterns + +import se.scalablesolutions.akka.actor.Actor + +trait Dispatcher { self: Actor => + + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, Actor] + + protected def dispatch: PartialFunction[Any, Unit] = { + case a if routes.isDefinedAt(a) => + if (self.replyTo.isDefined) routes(a) forward transform(a) + else routes(a) ! transform(a) + } + + def receive = dispatch +} + +trait LoadBalancer extends Dispatcher { self: Actor => + protected def seq: InfiniteIterator[Actor] + + protected def routes = { case x if seq.hasNext => seq.next } +} \ No newline at end of file