Merged patterns code into module
This commit is contained in:
parent
653281bb2a
commit
1355dd2411
5 changed files with 370 additions and 0 deletions
101
akka-patterns/src/main/scala/Patterns.scala
Normal file
101
akka-patterns/src/main/scala/Patterns.scala
Normal file
|
|
@ -0,0 +1,101 @@
|
|||
package se.scalablesolutions.akka.actor.patterns
|
||||
|
||||
import se.scalablesolutions.akka.actor.Actor
|
||||
|
||||
object Patterns {
|
||||
type PF[A,B] = PartialFunction[A,B]
|
||||
|
||||
/**
|
||||
* Creates a new PartialFunction whose isDefinedAt is a combination
|
||||
* of the two parameters, and whose apply is first to call filter.apply and then filtered.apply
|
||||
*/
|
||||
def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = {
|
||||
case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
|
||||
filter(a)
|
||||
filtered(a)
|
||||
}
|
||||
|
||||
/**
|
||||
* Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true
|
||||
*/
|
||||
def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter(
|
||||
{ case a if a.isInstanceOf[A] => interceptor(a) },
|
||||
interceptee
|
||||
)
|
||||
|
||||
//FIXME 2.8, use default params with CyclicIterator
|
||||
def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer {
|
||||
val seq = actors
|
||||
}
|
||||
|
||||
//FIXME 2.8, use default params with CyclicIterator
|
||||
/*def loadBalancerActor(actors : () => List[Actor]) : Actor = loadBalancerActor(
|
||||
new CyclicIterator(actors())
|
||||
) */
|
||||
|
||||
def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher {
|
||||
override def transform(msg : Any) = msgTransformer(msg)
|
||||
def routes = routing
|
||||
}
|
||||
|
||||
def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher {
|
||||
def routes = routing
|
||||
}
|
||||
|
||||
def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor (
|
||||
{ case _ => actorToLog },
|
||||
logger
|
||||
)
|
||||
}
|
||||
|
||||
trait Dispatcher { self : Actor =>
|
||||
|
||||
protected def transform(msg : Any) : Any = msg
|
||||
protected def routes : PartialFunction[Any,Actor]
|
||||
|
||||
protected def dispatch : PartialFunction[Any,Unit] = {
|
||||
case a if routes.isDefinedAt(a) => {
|
||||
if(self.sender.isDefined)
|
||||
routes(a) forward transform(a)
|
||||
else
|
||||
routes(a) send transform(a)
|
||||
}
|
||||
}
|
||||
|
||||
def receive = dispatch
|
||||
}
|
||||
|
||||
trait LoadBalancer extends Dispatcher { self : Actor =>
|
||||
protected def seq : InfiniteIterator[Actor]
|
||||
|
||||
protected def routes = { case x if seq.hasNext => seq.next }
|
||||
}
|
||||
|
||||
trait InfiniteIterator[T] extends Iterator[T]
|
||||
|
||||
class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] {
|
||||
@volatile private[this] var current : List[T] = items
|
||||
def hasNext = items != Nil
|
||||
def next = {
|
||||
val nc = if(current == Nil) items else current
|
||||
current = nc.tail
|
||||
nc.head
|
||||
}
|
||||
}
|
||||
|
||||
//Agent
|
||||
/*
|
||||
val a = agent(startValue)
|
||||
a.set(_ + 5)
|
||||
a.get
|
||||
a.foreach println(_)
|
||||
*/
|
||||
object Agent {
|
||||
sealed trait AgentMessage
|
||||
case class FunMessage[T](f : (T) => T) extends AgentMessage
|
||||
case class ProcMessage[T](f : (T) => Unit) extends AgentMessage
|
||||
case class ValMessage[T](t : T) extends AgentMessage
|
||||
}
|
||||
sealed private[akka] class Agent[T] {
|
||||
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue