pekko/akka-actor/src/main/scala/akka/routing/Routing.scala

214 lines
6 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2011 Scalable Solutions AB <http://scalablesolutions.se>
*/
package akka.routing
2010-02-13 21:45:35 +01:00
import akka.actor.{ UntypedActor, Actor, ActorRef }
import akka.actor.Actor._
2010-02-13 21:45:35 +01:00
import akka.actor.ActorRef
import scala.collection.JavaConversions._
import scala.collection.immutable.Seq
import java.util.concurrent.atomic.AtomicReference
import annotation.tailrec
import akka.AkkaException
/**
 * Exception type of the routing package.
 *
 * It adds nothing of its own: the message is handed unchanged to `AkkaException`.
 *
 * @param message description of the failure
 */
class RoutingException(message: String)
  extends AkkaException(message)
/**
 * Closed family of router kinds.
 *
 * The trait is sealed, so the only instances are the objects nested in the companion.
 */
sealed trait RouterType

/**
 * The available [[RouterType]] values.
 *
 * NOTE(review): the strategy behind each value is implied by its name only; nothing in the
 * visible code interprets these objects.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object RouterType {

  object Direct extends RouterType

  object Random extends RouterType

  object RoundRobin extends RouterType

  object LeastCPU extends RouterType

  object LeastRAM extends RouterType

  object LeastMessages extends RouterType

}
/**
 * A Router is a trait whose purpose is to route incoming messages to actors.
 *
 * Implementors supply `routes`. A message is handled only when `routes` is defined for it;
 * `Routing.Broadcast` envelopes are unwrapped and handed to `broadcast`, which does nothing
 * unless overridden.
 */
trait Router { this: Actor =>

  /** Hook for rewriting a message before it is passed on; the default returns it unchanged. */
  protected def transform(msg: Any): Any = msg

  /** Maps an incoming message to the actor that should receive it. */
  protected def routes: PartialFunction[Any, ActorRef]

  /** Called with the payload of a `Routing.Broadcast`; a no-op by default. */
  protected def broadcast(message: Any): Unit = ()

  /**
   * The routing behaviour that `receive` installs.
   *
   * `routes` is evaluated twice per routed message (once in the guard, once to pick the
   * target) and always sees the untransformed message; only the outgoing message goes
   * through `transform`.
   */
  protected def dispatch: Receive = {
    case Routing.Broadcast(message) =>
      broadcast(message)
    case a if routes.isDefinedAt(a) =>
      // With a sender (or sender future) available the message is forwarded;
      // otherwise it is sent with an explicit `None` sender.
      if (isSenderDefined) routes(a).forward(transform(a))(someSelf)
      else routes(a).!(transform(a))(None)
  }

  def receive: Receive = dispatch

  /** True when the current message carries either a sender future or a sender reference. */
  private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined
}
/**
 * An UntypedRouter is an abstract class whose purpose is to route incoming messages to actors.
 *
 * Subclasses implement `route`; returning `null` from it means "no route" and makes
 * `onReceive` fail with an `IllegalStateException`.
 */
abstract class UntypedRouter extends UntypedActor {

  /** Hook for rewriting a message before it is passed on; the default returns it unchanged. */
  protected def transform(msg: Any): Any = msg

  /** Picks the target for `msg`, or returns `null` when there is none. */
  protected def route(msg: Any): ActorRef

  /** Called with the payload of a `Routing.Broadcast`; a no-op by default. */
  protected def broadcast(message: Any): Unit = ()

  /** True when the current message carries either a sender future or a sender reference. */
  private def isSenderDefined = self.senderFuture.isDefined || self.sender.isDefined

  /**
   * Hands `Routing.Broadcast` payloads to `broadcast` and routes everything else.
   *
   * `route` sees the untransformed message; only the outgoing message goes through `transform`.
   *
   * @throws IllegalStateException if `route` returns `null` for the message
   */
  @throws(classOf[Exception])
  def onReceive(msg: Any): Unit = msg match {
    case m: Routing.Broadcast => broadcast(m.message)
    case _ =>
      val r = route(msg)
      if (r eq null) throw new IllegalStateException("No route for " + msg + " defined!")
      // With a sender (or sender future) available the message is forwarded;
      // otherwise it is sent with an explicit `None` sender.
      if (isSenderDefined) r.forward(transform(msg))(someSelf)
      else r.!(transform(msg))(None)
  }
}
/**
 * A LoadBalancer is a specialized kind of Router, that is supplied an InfiniteIterator of targets
 * to dispatch incoming messages to.
 */
trait LoadBalancer extends Router { self: Actor =>

  /** Source of targets; asked for the next one on every routed message. */
  protected def seq: InfiniteIterator[ActorRef]

  /**
   * Defined for any message as long as `seq` has a next element, and yields that element.
   * The message itself is not inspected.
   */
  protected def routes: PartialFunction[Any, ActorRef] = {
    case x if seq.hasNext => seq.next
  }

  /** Sends `message` to every actor in `seq.items`. */
  override def broadcast(message: Any): Unit = seq.items.foreach(_ ! message)
}
/**
 * The UntypedRouter counterpart of a load balancer: targets come from an InfiniteIterator
 * provided by the subclass.
 */
abstract class UntypedLoadBalancer extends UntypedRouter {

  /** Source of targets; asked for the next one on every routed message. */
  protected def seq: InfiniteIterator[ActorRef]

  /** Yields the next target from `seq`, or `null` when `seq` has none; `msg` is not inspected. */
  protected def route(msg: Any) = {
    if (!seq.hasNext) null
    else seq.next
  }

  /** Sends `message` to every actor in `seq.items`. */
  override def broadcast(message: Any) = {
    for (target <- seq.items) target ! message
  }
}
/**
 * Factory methods and partial-function combinators for building routing actors.
 */
object Routing {

  /** Marker for control messages addressed to a router itself. */
  sealed trait RoutingMessage

  /** Envelope whose payload a router hands to its `broadcast` hook instead of routing it. */
  case class Broadcast(message: Any) extends RoutingMessage

  type PF[A, B] = PartialFunction[A, B]

  /**
   * Creates a new PartialFunction whose isDefinedAt is a combination
   * of the two parameters, and whose apply is first to call filter.apply
   * and then filtered.apply.
   *
   * @param filter   side-effecting function run first; its result is discarded
   * @param filtered function whose result becomes the result of the combined function
   */
  def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = {
    case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) =>
      filter(a)
      filtered(a)
  }

  /**
   * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true.
   *
   * @param interceptor function run on every value before `interceptee`
   * @param interceptee function that decides definedness and produces the result
   */
  def intercept[A: Manifest, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] =
    filter({ case a => interceptor(a) }, interceptee)

  /**
   * Creates and starts a LoadBalancer actor that takes its targets from the supplied
   * InfiniteIterator.
   */
  def loadBalancerActor(actors: InfiniteIterator[ActorRef]): ActorRef =
    actorOf(new Actor with LoadBalancer {
      val seq = actors
    }).start()

  /**
   * Creates and starts a Router given a routing and a message-transforming function.
   *
   * @param routing        picks the target for a message
   * @param msgTransformer applied to each message before it is passed to the target
   */
  def routerActor(routing: PF[Any, ActorRef], msgTransformer: (Any) => Any): ActorRef =
    actorOf(new Actor with Router {
      override def transform(msg: Any) = msgTransformer(msg)
      def routes = routing
    }).start()

  /**
   * Creates and starts a Router given a routing; messages are passed on unchanged.
   */
  def routerActor(routing: PF[Any, ActorRef]): ActorRef =
    actorOf(new Actor with Router {
      def routes = routing
    }).start()

  /**
   * Creates an actor that pipes all incoming messages to
   * both another actor and through the supplied function.
   *
   * @param actorToLog receives every message
   * @param logger     called with every message before it is passed on
   */
  def loggerActor(actorToLog: ActorRef, logger: (Any) => Unit): ActorRef = {
    // `logger` returns Unit; using it directly as the transformer would replace every
    // message with `()`. Run it for its side effect and pass the original message on.
    val logThenPass: (Any) => Any = { (msg: Any) =>
      logger(msg)
      msg
    }
    routerActor({ case _ => actorToLog }, logThenPass)
  }
}
/**
 * An Iterator meant to be either permanently empty or never exhausted.
 *
 * The trait itself only adds access to the elements the iterator draws from; the
 * iteration behaviour is left entirely to implementations.
 */
trait InfiniteIterator[T] extends Iterator[T] {

  /** The elements this iterator draws from. */
  val items: Seq[T]

}
/**
 * CyclicIterator is a round-robin style InfiniteIterator that cycles the supplied sequence.
 *
 * The read position is kept in an AtomicReference and advanced with compare-and-set, so a
 * call to `next()` that loses a race against another caller simply retries.
 *
 * @param items the elements to cycle through, in order
 */
case class CyclicIterator[T](items: Seq[T]) extends InfiniteIterator[T] {

  /** Java-friendly constructor; copies the list into an immutable sequence. */
  def this(items: java.util.List[T]) = this(items.toList)

  // Remaining elements of the current pass; empty means "start over from `items`".
  private[this] val current: AtomicReference[Seq[T]] = new AtomicReference(items)

  /** True whenever there is at least one item; the iterator never runs out otherwise. */
  def hasNext: Boolean = items.nonEmpty

  /**
   * Returns the next element, wrapping around to the first one after the last.
   *
   * @throws NoSuchElementException if `items` is empty
   */
  def next(): T = {
    // Follow the Iterator convention instead of leaking the error from taking the tail of Nil.
    if (items.isEmpty) throw new NoSuchElementException("next on empty CyclicIterator")

    @tailrec
    def findNext: T = {
      val currentItems = current.get
      val newItems = currentItems match {
        case Nil => items
        case xs  => xs
      }
      // Only the caller that swaps in the tail gets to return the head; others retry.
      if (current.compareAndSet(currentItems, newItems.tail)) newItems.head
      else findNext
    }

    findNext
  }

  /** Tests the backing items directly rather than consuming this never-ending iterator. */
  override def exists(f: T => Boolean): Boolean = items exists f
}
/**
 * This InfiniteIterator always returns the Actor that has the currently smallest mailbox
 * useful for work-stealing.
 *
 * @param items the candidate actors
 */
case class SmallestMailboxFirstIterator(items: Seq[ActorRef]) extends InfiniteIterator[ActorRef] {

  /** Java-friendly constructor; copies the list into an immutable sequence. */
  def this(items: java.util.List[ActorRef]) = this(items.toList)

  /** True whenever there is at least one candidate actor. */
  def hasNext: Boolean = items.nonEmpty

  /**
   * Returns the actor whose `mailboxSize` is smallest at the time of the call. When several
   * actors share the smallest size, the one latest in `items` is returned.
   *
   * @throws NoSuchElementException if `items` is empty
   */
  def next(): ActorRef = {
    // Follow the Iterator convention instead of leaking the error from reducing an empty sequence.
    if (items.isEmpty) throw new NoSuchElementException("next on empty SmallestMailboxFirstIterator")
    items.reduceLeft((a1, a2) => if (a1.mailboxSize < a2.mailboxSize) a1 else a2)
  }

  /** Tests the backing items directly rather than consuming this never-ending iterator. */
  override def exists(f: ActorRef => Boolean): Boolean = items.exists(f)
}