Split up Patterns.scala in different files
This commit is contained in:
parent
39a4f72ad1
commit
b93a893722
4 changed files with 84 additions and 65 deletions
27
akka-core/src/main/scala/routing/Iterators.scala
Normal file
27
akka-core/src/main/scala/routing/Iterators.scala
Normal file
|
|
@ -0,0 +1,27 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.patterns
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.actor.Actor
|
||||||
|
|
||||||
|
trait InfiniteIterator[T] extends Iterator[T]
|
||||||
|
|
||||||
|
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
|
||||||
|
@volatile private[this] var current: List[T] = items
|
||||||
|
|
||||||
|
def hasNext = items != Nil
|
||||||
|
|
||||||
|
def next = {
|
||||||
|
val nc = if (current == Nil) items else current
|
||||||
|
current = nc.tail
|
||||||
|
nc.head
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] {
|
||||||
|
def hasNext = items != Nil
|
||||||
|
|
||||||
|
def next = items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
|
||||||
|
}
|
||||||
25
akka-core/src/main/scala/routing/Listeners.scala
Normal file
25
akka-core/src/main/scala/routing/Listeners.scala
Normal file
|
|
@ -0,0 +1,25 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.patterns
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.actor.Actor
|
||||||
|
|
||||||
|
sealed trait ListenerMessage
|
||||||
|
case class Listen(listener : Actor) extends ListenerMessage
|
||||||
|
case class Deafen(listener : Actor) extends ListenerMessage
|
||||||
|
case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage
|
||||||
|
|
||||||
|
trait Listeners { self : Actor =>
|
||||||
|
import se.scalablesolutions.akka.actor.Agent
|
||||||
|
private lazy val listeners = Agent(Set[Actor]())
|
||||||
|
|
||||||
|
protected def listenerManagement : PartialFunction[Any,Unit] = {
|
||||||
|
case Listen(l) => listeners( _ + l)
|
||||||
|
case Deafen(l) => listeners( _ - l )
|
||||||
|
case WithListeners(f) => listeners foreach f
|
||||||
|
}
|
||||||
|
|
||||||
|
protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
|
||||||
|
}
|
||||||
|
|
@ -1,3 +1,7 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
package se.scalablesolutions.akka.patterns
|
package se.scalablesolutions.akka.patterns
|
||||||
|
|
||||||
import se.scalablesolutions.akka.actor.Actor
|
import se.scalablesolutions.akka.actor.Actor
|
||||||
|
|
@ -21,7 +25,6 @@ object Patterns {
|
||||||
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
|
def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
|
||||||
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
|
filter({case a if a.isInstanceOf[A] => interceptor(a)}, interceptee)
|
||||||
|
|
||||||
//FIXME 2.8, use default params with CyclicIterator
|
|
||||||
def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
|
def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer {
|
||||||
val seq = actors
|
val seq = actors
|
||||||
}
|
}
|
||||||
|
|
@ -39,67 +42,3 @@ object Patterns {
|
||||||
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor =
|
def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor =
|
||||||
dispatcherActor({case _ => actorToLog}, logger)
|
dispatcherActor({case _ => actorToLog}, logger)
|
||||||
}
|
}
|
||||||
|
|
||||||
trait Dispatcher { self: Actor =>
|
|
||||||
|
|
||||||
protected def transform(msg: Any): Any = msg
|
|
||||||
|
|
||||||
protected def routes: PartialFunction[Any, Actor]
|
|
||||||
|
|
||||||
protected def dispatch: PartialFunction[Any, Unit] = {
|
|
||||||
case a if routes.isDefinedAt(a) =>
|
|
||||||
if (self.replyTo.isDefined) routes(a) forward transform(a)
|
|
||||||
else routes(a) ! transform(a)
|
|
||||||
}
|
|
||||||
|
|
||||||
def receive = dispatch
|
|
||||||
}
|
|
||||||
|
|
||||||
trait LoadBalancer extends Dispatcher { self: Actor =>
|
|
||||||
protected def seq: InfiniteIterator[Actor]
|
|
||||||
|
|
||||||
protected def routes = { case x if seq.hasNext => seq.next }
|
|
||||||
}
|
|
||||||
|
|
||||||
trait InfiniteIterator[T] extends Iterator[T]
|
|
||||||
|
|
||||||
class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
|
|
||||||
@volatile private[this] var current: List[T] = items
|
|
||||||
|
|
||||||
def hasNext = items != Nil
|
|
||||||
|
|
||||||
def next = {
|
|
||||||
val nc = if (current == Nil) items else current
|
|
||||||
current = nc.tail
|
|
||||||
nc.head
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
class SmallestMailboxFirstIterator(items : List[Actor]) extends InfiniteIterator[Actor] {
|
|
||||||
def hasNext = items != Nil
|
|
||||||
|
|
||||||
def next = {
|
|
||||||
def actorWithSmallestMailbox(a1: Actor, a2: Actor) = {
|
|
||||||
if (a1.mailboxSize < a2.mailboxSize) a1 else a2
|
|
||||||
}
|
|
||||||
items.reduceLeft((actor1, actor2) => actorWithSmallestMailbox(actor1,actor2))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
sealed trait ListenerMessage
|
|
||||||
case class Listen(listener : Actor) extends ListenerMessage
|
|
||||||
case class Deafen(listener : Actor) extends ListenerMessage
|
|
||||||
case class WithListeners(f : Set[Actor] => Unit) extends ListenerMessage
|
|
||||||
|
|
||||||
trait Listeners { self : Actor =>
|
|
||||||
import se.scalablesolutions.akka.actor.Agent
|
|
||||||
private lazy val listeners = Agent(Set[Actor]())
|
|
||||||
|
|
||||||
protected def listenerManagement : PartialFunction[Any,Unit] = {
|
|
||||||
case Listen(l) => listeners( _ + l)
|
|
||||||
case Deafen(l) => listeners( _ - l )
|
|
||||||
case WithListeners(f) => listeners foreach f
|
|
||||||
}
|
|
||||||
|
|
||||||
protected def gossip(msg : Any) = listeners foreach ( _ foreach ( _ ! msg ) )
|
|
||||||
}
|
|
||||||
28
akka-core/src/main/scala/routing/Routers.scala
Normal file
28
akka-core/src/main/scala/routing/Routers.scala
Normal file
|
|
@ -0,0 +1,28 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.patterns
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.actor.Actor
|
||||||
|
|
||||||
|
trait Dispatcher { self: Actor =>
|
||||||
|
|
||||||
|
protected def transform(msg: Any): Any = msg
|
||||||
|
|
||||||
|
protected def routes: PartialFunction[Any, Actor]
|
||||||
|
|
||||||
|
protected def dispatch: PartialFunction[Any, Unit] = {
|
||||||
|
case a if routes.isDefinedAt(a) =>
|
||||||
|
if (self.replyTo.isDefined) routes(a) forward transform(a)
|
||||||
|
else routes(a) ! transform(a)
|
||||||
|
}
|
||||||
|
|
||||||
|
def receive = dispatch
|
||||||
|
}
|
||||||
|
|
||||||
|
trait LoadBalancer extends Dispatcher { self: Actor =>
|
||||||
|
protected def seq: InfiniteIterator[Actor]
|
||||||
|
|
||||||
|
protected def routes = { case x if seq.hasNext => seq.next }
|
||||||
|
}
|
||||||
Loading…
Add table
Add a link
Reference in a new issue