pekko/akka-actor/src/main/scala/akka/routing/Routing.scala

390 lines
14 KiB
Scala
Raw Normal View History

/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.routing
2010-02-13 21:45:35 +01:00
import akka.AkkaException
2011-07-28 15:48:03 +03:00
import akka.actor._
import akka.event.EventHandler
import akka.config.ConfigurationException
2011-07-28 15:48:03 +03:00
import akka.actor.UntypedChannel._
import akka.dispatch.{ Future, Futures }
import akka.util.ReflectiveAccess
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
import scala.annotation.tailrec
/**
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
* {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Router {
/**
* Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
* the current connections, signal that there were problems with one of the connections and see if there have
* been changes in the connections.
*
* This method is not threadsafe, and should only be called once
*
* JMM Guarantees:
2011-08-12 10:30:26 +03:00
* This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
*/
def init(connectionManager: ConnectionManager)
/**
* Routes the message to one of the connections.
*
* @throws RoutingException if something goes wrong while routing the message
*/
def route(message: Any)(implicit sender: Option[ActorRef])
/**
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
* completion of the processing of the message.
*
* @throws RoutingExceptionif something goes wrong while routing the message.
*/
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
}
/**
* An {@link AkkaException} thrown when something goes wrong while routing a message
*/
class RoutingException(message: String) extends AkkaException(message)
/**
* A Helper class to create actor references that use routing.
*/
2011-07-28 15:48:03 +03:00
object Routing {
2011-07-28 15:48:03 +03:00
sealed trait RoutingMessage
2011-07-28 15:48:03 +03:00
case class Broadcast(message: Any) extends RoutingMessage
/**
* FIXME: will very likely be moved to the ActorRef.
*/
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
if (props.connectionManager.size == 0) throw new ConfigurationException("RoutedProps used for creating actor [" + address + "] has zero connections configured; can't create a router")
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
val localOnly = props.localOnly
if (clusteringEnabled && !props.localOnly) ReflectiveAccess.ClusterModule.newClusteredActorRef(props)
else new RoutedActorRef(props, address)
2011-07-28 15:48:03 +03:00
}
}
/**
* An Abstract convenience implementation for building an ActorReference that uses a Router.
*/
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {
val router = props.routerFactory()
override def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
2011-07-28 15:48:03 +03:00
val sender = channel match {
case ref: ActorRef Some(ref)
2011-07-28 16:56:35 +03:00
case _ None
2011-07-28 15:48:03 +03:00
}
router.route(message)(sender)
2011-07-28 15:48:03 +03:00
}
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = {
2011-07-28 15:48:03 +03:00
val sender = channel match {
case ref: ActorRef Some(ref)
2011-07-28 16:56:35 +03:00
case _ None
2011-07-28 15:48:03 +03:00
}
router.route[Any](message, timeout)(sender)
}
}
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: String) extends AbstractRoutedActorRef(routedProps) {
@volatile
private var running: Boolean = true
def isShutdown: Boolean = !running
2011-07-28 15:48:03 +03:00
def stop() {
synchronized {
if (running) {
running = false
router.route(Routing.Broadcast(PoisonPill))(Some(this))
2011-07-28 15:48:03 +03:00
}
}
}
2011-09-08 11:02:17 +02:00
router.init(routedProps.connectionManager)
2011-07-28 15:48:03 +03:00
}
2011-07-28 15:48:03 +03:00
/**
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* FIXME: this is also the location where message buffering should be done in case of failure.
2011-07-28 15:48:03 +03:00
*/
trait BasicRouter extends Router {
2010-02-13 21:45:35 +01:00
@volatile
protected var connectionManager: ConnectionManager = _
def init(connectionManager: ConnectionManager) = {
this.connectionManager = connectionManager
}
def route(message: Any)(implicit sender: Option[ActorRef]) = message match {
2011-07-28 15:48:03 +03:00
case Routing.Broadcast(message)
//it is a broadcast message, we are going to send to message to all connections.
connectionManager.connections.iterable foreach { connection
2011-07-28 15:48:03 +03:00
try {
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
2011-07-28 15:48:03 +03:00
} catch {
2011-07-28 16:56:35 +03:00
case e: Exception
connectionManager.remove(connection)
2011-07-28 15:48:03 +03:00
throw e
}
}
2011-07-28 15:48:03 +03:00
case _
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
case Some(connection)
2011-07-28 15:48:03 +03:00
try {
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
2011-07-28 15:48:03 +03:00
} catch {
2011-07-28 16:56:35 +03:00
case e: Exception
connectionManager.remove(connection)
2011-07-28 15:48:03 +03:00
throw e
}
2011-07-28 16:56:35 +03:00
case None
throwNoConnectionsError
2011-07-28 15:48:03 +03:00
}
}
2011-07-28 16:56:35 +03:00
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
2011-07-28 15:48:03 +03:00
case Routing.Broadcast(message)
throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
2011-07-28 15:48:03 +03:00
case _
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
case Some(connection)
2011-07-28 15:48:03 +03:00
try {
// FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
connection.?(message, timeout)(sender).asInstanceOf[Future[T]]
2011-07-28 15:48:03 +03:00
} catch {
2011-07-28 16:56:35 +03:00
case e: Exception
connectionManager.remove(connection)
2011-07-28 15:48:03 +03:00
throw e
}
2011-07-28 16:56:35 +03:00
case None
throwNoConnectionsError
2011-07-28 15:48:03 +03:00
}
}
protected def next: Option[ActorRef]
private def throwNoConnectionsError = {
2011-07-28 15:48:03 +03:00
val error = new RoutingException("No replica connections for router")
EventHandler.error(error, this, error.toString)
throw error
}
2010-05-21 20:08:49 +02:00
}
/**
* A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
*
2011-07-28 15:48:03 +03:00
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class DirectRouter extends BasicRouter {
private val state = new AtomicReference[DirectRouterState]
2011-07-28 15:48:03 +03:00
lazy val next: Option[ActorRef] = {
val current = currentState
if (current.ref == null) None else Some(current.ref)
}
@tailrec
private def currentState: DirectRouterState = {
val current = state.get
if (current != null && connectionManager.version == current.version) {
//we are lucky since nothing has changed in the connections.
current
} else {
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
val connections = connectionManager.connections
val connectionCount = connections.iterable.size
if (connectionCount > 1)
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
val newState = new DirectRouterState(connections.iterable.head, connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
currentState // recur
}
2011-07-28 15:48:03 +03:00
}
private case class DirectRouterState(ref: ActorRef, version: Long)
2010-05-21 20:08:49 +02:00
}
/**
2011-07-28 15:48:03 +03:00
* A Router that randomly selects one of the target connections to send a message to.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RandomRouter extends BasicRouter {
private val state = new AtomicReference[RandomRouterState]
2011-07-28 15:48:03 +03:00
//FIXME: threadlocal random?
private val random = new java.util.Random(System.nanoTime)
2011-07-28 15:48:03 +03:00
def next: Option[ActorRef] = currentState.array match {
2011-08-26 17:25:18 +02:00
case a if a.isEmpty None
case a Some(a(random.nextInt(a.length)))
}
@tailrec
private def currentState: RandomRouterState = {
val current = state.get
if (current != null && current.version == connectionManager.version) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
current
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val connections = connectionManager.connections
val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
currentState
2011-07-28 15:48:03 +03:00
}
}
2011-08-26 17:25:18 +02:00
private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
}
/**
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
2011-07-28 15:48:03 +03:00
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RoundRobinRouter extends BasicRouter {
private val state = new AtomicReference[RoundRobinState]
def next: Option[ActorRef] = currentState.next
@tailrec
private def currentState: RoundRobinState = {
val current = state.get
if (current != null && current.version == connectionManager.version) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
current
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val connections = connectionManager.connections
val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
currentState
}
}
2011-08-26 17:25:18 +02:00
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
private val index = new AtomicInteger(0)
def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex))
@tailrec
private def nextIndex: Int = {
val oldIndex = index.get
var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1
if (!index.compareAndSet(oldIndex, newIndex)) nextIndex
else oldIndex
}
}
}
/**
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
* specified strategy (specific router needs to implement `gather` method).
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
*
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
* FIXME: this is also the location where message buffering should be done in case of failure.
*/
trait ScatterGatherRouter extends BasicRouter with Serializable {
/**
* Aggregates the responses into a single Future.
*
* @param results Futures of the responses from connections
*/
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
val responses = connectionManager.connections.iterable.flatMap { actor
try {
if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first")
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
} catch {
case e: Exception
connectionManager.remove(actor)
None
}
}
2011-08-26 17:25:18 +02:00
if (responses.isEmpty)
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
else gather(responses)
}
override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
case Routing.Broadcast(message) scatterGather(message, timeout)
case message super.route(message, timeout)(sender)
}
}
/**
* Simple router that broadcasts the message to all connections, and replies with the first response
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
* mode, the router would behave as {@link RoundRobinRouter}
*/
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
}