2010-05-03 19:32:40 +02:00
/* *
2011-07-14 16:03:08 +02:00
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
2010-05-03 19:32:40 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.routing
2010-02-13 21:45:35 +01:00
2011-05-24 19:04:25 +02:00
import akka.AkkaException
2011-07-28 15:48:03 +03:00
import akka.actor._
import akka.event.EventHandler
2011-08-31 15:07:18 +02:00
import akka.config.ConfigurationException
2011-10-06 21:19:46 +02:00
import akka.dispatch. { Future , MessageDispatcher }
import akka.AkkaApplication
2011-08-27 08:10:25 +03:00
import akka.util.ReflectiveAccess
2011-08-30 14:31:59 +02:00
import java.net.InetSocketAddress
2011-08-31 15:07:18 +02:00
import java.lang.reflect.InvocationTargetException
2011-08-30 14:31:59 +02:00
import java.util.concurrent.atomic. { AtomicReference , AtomicInteger }
2011-08-27 08:10:25 +03:00
2011-08-30 14:31:59 +02:00
import scala.annotation.tailrec
2011-08-27 08:10:25 +03:00
2011-08-12 10:03:33 +03:00
/* *
* The Router is responsible for sending a message to one ( or more ) of its connections . Connections are stored in the
2011-08-30 14:31:59 +02:00
* { @link FailureDetector } and each Router should be linked to only one { @link FailureDetector } .
2011-08-12 10:03:33 +03:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
trait Router {

  /**
   * Initializes this Router with the ConnectionManager it routes over. The Router can use the manager to ask
   * for the current connections, to signal that one of the connections caused a problem, and to detect that
   * the set of connections has changed.
   *
   * This method is not threadsafe, and should only be called once.
   *
   * JMM Guarantees:
   * This method guarantees that all changes made in this method are visible before one of the routing
   * methods is called.
   *
   * @param connectionManager the source of the connections this Router selects from
   */
  def init(connectionManager: ConnectionManager): Unit

  /**
   * Routes the message to one of the connections (fire-and-forget).
   *
   * @param message the message to deliver
   * @param sender  the optional original sender of the message
   * @throws RoutingException if something goes wrong while routing the message
   */
  def route(message: Any)(implicit sender: Option[ActorRef]): Unit

  /**
   * Routes the message to one of the connections using a timeout, and returns a Future to synchronize on the
   * completion of the processing of the message.
   *
   * @param message the message to deliver
   * @param timeout the timeout handed to the selected connection
   * @param sender  the optional original sender of the message
   * @return a Future for the reply
   * @throws RoutingException if something goes wrong while routing the message
   */
  def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
}
/* *
2011-08-27 08:10:25 +03:00
* An { @link AkkaException } thrown when something goes wrong while routing a message
*/
class RoutingException(message: String) extends AkkaException(message)
/* *
* A Helper class to create actor references that use routing .
*/
2011-07-28 15:48:03 +03:00
object Routing {

  /** Common super type of the control messages defined in this object. */
  sealed trait RoutingMessage

  /**
   * Used to broadcast a message to all connections in a router. E.g. every connection gets the message
   * regardless of their routing algorithm.
   *
   * @param message the payload that is delivered to every connection
   */
  case class Broadcast(message: Any) extends RoutingMessage

  /**
   * Reflectively instantiates a user supplied Router through its no-argument constructor.
   *
   * @param implClass fully qualified class name of the Router implementation
   * @return the freshly created instance, cast to Router
   * @throws ConfigurationException if the instance could not be created; the exception carries the
   *                                underlying cause
   */
  def createCustomRouter(implClass: String): Router = {
    // An InvocationTargetException only wraps the real failure raised by the constructor, so report that one.
    def underlying(failure: Throwable): Throwable = failure match {
      case wrapped: InvocationTargetException ⇒ wrapped.getTargetException
      case _                                  ⇒ failure
    }

    val noParameterTypes = Array[Class[_]]()
    val noArguments = Array[AnyRef]()

    ReflectiveAccess.createInstance(implClass, noParameterTypes, noArguments) match {
      case Left(exception) ⇒
        val cause = underlying(exception)
        throw new ConfigurationException(
          "Could not instantiate custom Router of [" + implClass + "] due to: " + cause,
          cause)
      case Right(router) ⇒
        router.asInstanceOf[Router]
    }
  }
}
/* *
2011-08-27 08:10:25 +03:00
* An Abstract convenience implementation for building an ActorReference that uses a Router .
2011-05-24 19:04:25 +02:00
*/
2011-08-27 08:10:25 +03:00
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {

  /** The Router every message posted to this reference is delegated to. */
  val router = props.routerFactory()

  /**
   * Fire-and-forget delivery: hands the message to the router, passing the channel on as sender when it is
   * an ActorRef.
   */
  override def postMessageToMailbox(message: Any, channel: UntypedChannel) =
    router.route(message)(channelToSender(channel))

  /**
   * Request/reply delivery: hands the message and timeout to the router and returns the Future the router
   * produces.
   */
  override def postMessageToMailboxAndCreateFutureResultWithTimeout(
    message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] =
    router.route[Any](message, timeout)(channelToSender(channel))

  /** Only a channel that is an ActorRef can act as sender; every other channel yields None. */
  private def channelToSender(channel: UntypedChannel): Option[ActorRef] = channel match {
    case ref: ActorRef ⇒ Some(ref)
    case _             ⇒ None
  }
}
/* *
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
 * one (or more) of these actors.
*/
2011-09-19 15:20:52 +02:00
private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: String) extends AbstractRoutedActorRef(routedProps) {

  // Flipped to false exactly once, by the first call to stop().
  @volatile
  private var running: Boolean = true

  /** True once stop() has been called. */
  def isShutdown: Boolean = !running

  /**
   * Stops this reference. The first call broadcasts a PoisonPill to all connections of the router, with
   * this reference as sender; subsequent calls do nothing.
   */
  def stop(): Unit = synchronized {
    if (running) {
      running = false
      router.route(Routing.Broadcast(PoisonPill))(Some(this))
    }
  }

  // Constructor body: hand the connections to the router created by the super class.
  router.init(routedProps.connectionManager)
}
2010-03-04 19:02:23 +01:00
2011-07-28 15:48:03 +03:00
/* *
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method .
*
2011-08-12 14:35:45 +02:00
* FIXME : this is also the location where message buffering should be done in case of failure .
2011-07-28 15:48:03 +03:00
*/
trait BasicRouter extends Router {

  /** The connections this router selects from; assigned by init. */
  @volatile
  protected var connectionManager: ConnectionManager = _

  def init(connectionManager: ConnectionManager) = {
    this.connectionManager = connectionManager
  }

  /**
   * Fire-and-forget routing. A Routing.Broadcast is unwrapped and its payload is sent to every connection;
   * any other message is sent to the single connection chosen by next.
   *
   * A connection whose send throws an Exception is removed from the connection manager and the exception is
   * rethrown (for a broadcast this ends the iteration over the remaining connections).
   *
   * @throws RoutingException if next yields no connection
   */
  def route(message: Any)(implicit sender: Option[ActorRef]) = message match {
    case Routing.Broadcast(payload) ⇒
      //it is a broadcast message, we are going to send to message to all connections.
      connectionManager.connections.iterable foreach { connection ⇒
        removeOnFailure(connection) {
          connection.!(payload)(sender) // we use original sender, so this is essentially a 'forward'
        }
      }
    case _ ⇒
      //it no broadcast message, we are going to select an actor from the connections and send the message to him.
      next match {
        case None ⇒
          throwNoConnectionsError
        case Some(connection) ⇒
          removeOnFailure(connection) {
            connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
          }
      }
  }

  /**
   * Request/reply routing to the single connection chosen by next. Broadcasts are rejected.
   *
   * A connection whose send throws an Exception is removed from the connection manager and the exception is
   * rethrown.
   *
   * @throws RoutingException if the message is a Routing.Broadcast, or if next yields no connection
   */
  def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
    case Routing.Broadcast(_) ⇒
      throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
    case _ ⇒
      //it no broadcast message, we are going to select an actor from the connections and send the message to him.
      next match {
        case None ⇒
          throwNoConnectionsError
        case Some(connection) ⇒
          removeOnFailure(connection) {
            // FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
            connection.?(message, timeout)(sender).asInstanceOf[Future[T]]
          }
      }
  }

  /** Selects the connection the next non-broadcast message goes to; None when there is nothing to select. */
  protected def next: Option[ActorRef]

  /** Runs `send`; when it throws an Exception the connection is removed before the exception is rethrown. */
  private def removeOnFailure[R](connection: ActorRef)(send: ⇒ R): R =
    try {
      send
    } catch {
      case e: Exception ⇒
        connectionManager.remove(connection)
        throw e
    }

  private def throwNoConnectionsError: Nothing =
    throw new RoutingException("No replica connections for router")
}
2011-05-17 21:15:27 +02:00
/* *
2011-08-29 09:22:14 +03:00
 * A DirectRouter is a Router that only has a single connected actorRef and forwards all requests to that actorRef.
*
2011-07-28 15:48:03 +03:00
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class DirectRouter extends BasicRouter {

  // Last observed connection together with the connection version it was read at; null until first use.
  private val state = new AtomicReference[DirectRouterState]

  /**
   * The single connected actor, or None when there is no connection.
   *
   * This is deliberately a `def`: a `lazy val` would freeze the first answer and the version check in
   * currentState would never be consulted again, so a removed connection would be used forever.
   *
   * @throws RoutingException if more than one connection is found
   */
  def next: Option[ActorRef] = currentState.ref

  /**
   * Returns the cached state when its version still matches the connection manager, otherwise rebuilds it
   * from the current connections and publishes it with a compare-and-set (retrying when another thread
   * published first).
   */
  @tailrec
  private def currentState: DirectRouterState = {
    val current = state.get

    if (current != null && connectionManager.version == current.version) {
      //we are lucky since nothing has changed in the connections.
      current
    } else {
      //there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
      val connections = connectionManager.connections

      val connectionCount = connections.iterable.size
      if (connectionCount > 1)
        throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))

      // headOption instead of head: an empty connection set must surface as None, not NoSuchElementException.
      val newState = new DirectRouterState(connections.iterable.headOption, connections.version)
      if (state.compareAndSet(current, newState))
        //we are lucky since we just updated the state, so we can send it back as the state to use
        newState
      else //we failed to update the state, lets try again... better luck next time.
        currentState // recur
    }
  }

  private case class DirectRouterState(ref: Option[ActorRef], version: Long)
}
2011-05-17 21:15:27 +02:00
/* *
2011-07-28 15:48:03 +03:00
* A Router that randomly selects one of the target connections to send a message to .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class RandomRouter extends BasicRouter {

  // Snapshot of the connections together with the version it was taken at; null until first use.
  private val state = new AtomicReference[RandomRouterState]

  //FIXME: threadlocal random?
  private val random = new java.util.Random(System.nanoTime)

  /** Picks one connection of the current snapshot using `random`; None when the snapshot is empty. */
  def next: Option[ActorRef] = {
    val candidates = currentState.array
    if (candidates.isEmpty) None
    else Some(candidates(random.nextInt(candidates.length)))
  }

  /**
   * Returns the cached snapshot while its version matches the connection manager; otherwise takes a new
   * snapshot and publishes it with a compare-and-set, retrying when the publication fails.
   */
  @tailrec
  private def currentState: RandomRouterState = {
    val cached = state.get
    val upToDate = cached != null && cached.version == connectionManager.version

    if (upToDate) {
      // no change in the connections since the snapshot was taken, so it can be reused.
      cached
    } else {
      // the connections changed, or this is the first call: rebuild the snapshot.
      val connections = connectionManager.connections
      val refreshed = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)

      // publish the snapshot; if the compare-and-set fails, start over.
      if (state.compareAndSet(cached, refreshed)) refreshed
      else currentState
    }
  }

  private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
}
/* *
2011-08-12 10:03:33 +03:00
* A Router that uses round - robin to select a connection . For concurrent calls , round robin is just a best effort .
2011-07-28 15:48:03 +03:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class RoundRobinRouter extends BasicRouter {

  // Snapshot of the connections (plus its own cursor) and the version it was taken at; null until first use.
  private val state = new AtomicReference[RoundRobinState]

  /** The connection at the cursor of the current snapshot; None when the snapshot is empty. */
  def next: Option[ActorRef] = currentState.next

  /**
   * Returns the cached snapshot while its version matches the connection manager; otherwise takes a new
   * snapshot (whose cursor starts at 0) and publishes it with a compare-and-set, retrying when the
   * publication fails.
   */
  @tailrec
  private def currentState: RoundRobinState = {
    val cached = state.get
    val upToDate = cached != null && cached.version == connectionManager.version

    if (upToDate) {
      // no change in the connections since the snapshot was taken, so it can be reused.
      cached
    } else {
      // the connections changed, or this is the first call: rebuild the snapshot.
      val connections = connectionManager.connections
      val refreshed = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)

      // publish the snapshot; if the compare-and-set fails, start over.
      if (state.compareAndSet(cached, refreshed)) refreshed
      else currentState
    }
  }

  private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {

    // Position of the element handed out by the next call to `next`.
    private val index = new AtomicInteger(0)

    def next: Option[ActorRef] =
      if (array.isEmpty) None
      else Some(array(nextIndex))

    /** Returns the current position and advances the cursor by one, wrapping to 0 after the last element. */
    @tailrec
    private def nextIndex: Int = {
      val claimed = index.get
      val following = if (claimed != array.length - 1) claimed + 1 else 0

      if (index.compareAndSet(claimed, following)) claimed
      else nextIndex
    }
  }
}
2011-08-17 10:21:27 +03:00
2011-08-29 15:50:40 +02:00
/* *
2011-08-17 10:21:27 +03:00
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
* specified strategy ( specific router needs to implement `gather` method ) .
2011-08-26 08:24:26 +02:00
* Scatter - gather pattern will be applied only to the messages broadcasted using Future
2011-08-17 10:21:27 +03:00
* ( wrapped into { @link Routing . Broadcast } and sent with "?" method ) . For the messages , sent in a fire - forget
2011-08-26 08:24:26 +02:00
* mode , the router would behave as { @link BasicRouter } , unless it 's mixed in with other router type
*
2011-08-17 10:21:27 +03:00
* FIXME : This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected .
* FIXME : this is also the location where message buffering should be done in case of failure .
*/
trait ScatterGatherRouter extends BasicRouter with Serializable {

  /**
   * Aggregates the responses into a single Future.
   *
   * @param results Futures of the responses from connections
   * @return the Future handed back to the caller of route
   */
  protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]

  /**
   * Sends the message with the given timeout to every connection and passes the resulting Futures to gather.
   *
   * A connection that reports isShutdown, or whose send throws an Exception, is removed from the connection
   * manager and contributes no Future.
   *
   * @throws RoutingException if no connection produced a Future
   */
  private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
    val pending = connectionManager.connections.iterable.flatMap { target ⇒
      try {
        // a dead target is routed through the catch clause below, so it is removed like a failing one
        if (target.isShutdown) throw new ActorInitializationException("For compatability - check death first")
        Some(target.?(message, timeout)(sender).asInstanceOf[Future[S]])
      } catch {
        case e: Exception ⇒
          connectionManager.remove(target)
          None
      }
    }

    if (!pending.isEmpty) gather(pending)
    else throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
  }

  /**
   * A Routing.Broadcast sent with a timeout is scattered to all connections and gathered; every other
   * message is routed by the super implementation.
   */
  override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
    case Routing.Broadcast(payload) ⇒ scatterGather(payload, timeout)
    case other                      ⇒ super.route(other, timeout)(sender)
  }
}
2011-08-29 15:50:40 +02:00
/* *
2011-08-17 10:21:27 +03:00
* Simple router that broadcasts the message to all connections , and replies with the first response
2011-08-26 08:24:26 +02:00
* Scatter - gather pattern will be applied only to the messages broadcasted using Future
2011-08-17 10:21:27 +03:00
* ( wrapped into { @link Routing . Broadcast } and sent with "?" method ) . For the messages sent in a fire - forget
2011-08-26 08:24:26 +02:00
* mode , the router would behave as { @link RoundRobinRouter }
2011-08-17 10:21:27 +03:00
*/
2011-10-06 21:19:46 +02:00
class ScatterGatherFirstCompletedRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter {

  /**
   * Gathers by delegating to Future.firstCompletedOf, which picks up the implicit dispatcher and timeout of
   * this router.
   *
   * @param results Futures of the responses from connections
   */
  protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = {
    Future.firstCompletedOf(results)
  }
}