2010-05-03 19:32:40 +02:00
/* *
2011-07-14 16:03:08 +02:00
* Copyright ( C ) 2009 - 2011 Typesafe Inc . < http : //www.typesafe.com>
2010-05-03 19:32:40 +02:00
*/
2010-10-26 12:49:25 +02:00
package akka.routing
2010-02-13 21:45:35 +01:00
2011-05-24 19:04:25 +02:00
import akka.AkkaException
2011-07-28 15:48:03 +03:00
import akka.actor._
import akka.event.EventHandler
2011-08-31 15:07:18 +02:00
import akka.config.ConfigurationException
2011-07-28 15:48:03 +03:00
import akka.actor.UntypedChannel._
2011-08-27 08:10:25 +03:00
import akka.dispatch. { Future , Futures }
import akka.util.ReflectiveAccess
2011-08-30 14:31:59 +02:00
import java.net.InetSocketAddress
2011-08-31 15:07:18 +02:00
import java.lang.reflect.InvocationTargetException
2011-08-30 14:31:59 +02:00
import java.util.concurrent.atomic. { AtomicReference , AtomicInteger }
2011-08-27 08:10:25 +03:00
2011-08-30 14:31:59 +02:00
import scala.annotation.tailrec
2011-08-27 08:10:25 +03:00
2011-08-12 10:03:33 +03:00
/* *
* The Router is responsible for sending a message to one ( or more ) of its connections . Connections are stored in the
2011-08-30 14:31:59 +02:00
* { @link FailureDetector } and each Router should be linked to only one { @link FailureDetector } .
2011-08-12 10:03:33 +03:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
*/
trait Router {
/* *
* Initializes this Router with a given set of Connections . The Router can use this datastructure to ask for
* the current connections , signal that there were problems with one of the connections and see if there have
* been changes in the connections .
*
* This method is not threadsafe , and should only be called once
*
* JMM Guarantees :
2011-08-12 10:30:26 +03:00
* This method guarantees that all changes made in this method , are visible before one of the routing methods is called .
2011-08-12 10:03:33 +03:00
*/
2011-08-30 14:31:59 +02:00
def init ( connections : FailureDetector )
2011-08-12 10:03:33 +03:00
/* *
* Routes the message to one of the connections .
*
* @throws RoutingException if something goes wrong while routing the message
*/
2011-08-29 15:50:40 +02:00
def route ( message : Any ) ( implicit sender : Option [ ActorRef ] )
2011-08-12 10:03:33 +03:00
/* *
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
* completion of the processing of the message .
*
* @throws RoutingExceptionif something goes wrong while routing the message .
*/
def route [ T ] ( message : Any , timeout : Timeout ) ( implicit sender : Option [ ActorRef ] ) : Future [ T ]
}
2011-08-30 14:31:59 +02:00
/* *
* An Iterable that also contains a version .
*/
2011-08-30 18:42:35 +02:00
trait VersionedIterable [ A ] {
val version : Long
def iterable : Iterable [ A ]
def apply ( ) : Iterable [ A ] = iterable
2011-08-30 14:31:59 +02:00
}
2011-08-12 10:03:33 +03:00
/* *
2011-08-27 08:10:25 +03:00
* An { @link AkkaException } thrown when something goes wrong while routing a message
*/
class RoutingException ( message : String ) extends AkkaException ( message )
/* *
2011-08-30 14:31:59 +02:00
* Default "local" failure detector . This failure detector removes an actor from the
* router if an exception occured in the router 's thread ( e . g . when trying to add
* the message to the receiver 's mailbox ) .
2011-08-27 08:10:25 +03:00
*/
2011-08-31 15:07:18 +02:00
class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector {
2011-08-30 14:31:59 +02:00
2011-09-08 19:46:05 +02:00
case class State ( version : Long , iterable : Iterable [ ActorRef ] ) extends VersionedIterable [ ActorRef ]
2011-08-30 18:42:35 +02:00
private val state = new AtomicReference [ State ]
2011-08-30 14:31:59 +02:00
def this ( connectionIterable : Iterable [ ActorRef ] ) = {
this ( )
2011-08-30 18:42:35 +02:00
state . set ( State ( Long . MinValue , connectionIterable ) )
2011-08-30 14:31:59 +02:00
}
2011-09-08 19:46:05 +02:00
def isAvailable ( connection : InetSocketAddress ) : Boolean =
state . get . iterable . find ( c ⇒ connection == c ) . isDefined
def recordSuccess ( connection : InetSocketAddress , timestamp : Long ) { }
def recordFailure ( connection : InetSocketAddress , timestamp : Long ) { }
2011-08-30 14:31:59 +02:00
def version : Long = state . get . version
def size : Int = state . get . iterable . size
def versionedIterable = state . get
def stopAll ( ) {
state . get . iterable foreach ( _ . stop ( ) )
}
def failOver ( from : InetSocketAddress , to : InetSocketAddress ) { } // do nothing here
@tailrec
final def remove ( ref : ActorRef ) = {
val oldState = state . get
//remote the ref from the connections.
var newList = oldState . iterable . filter ( currentActorRef ⇒ currentActorRef ne ref )
if ( newList . size != oldState . iterable . size ) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
2011-08-30 18:42:35 +02:00
val newState = State ( oldState . version + 1 , newList )
2011-08-30 14:31:59 +02:00
//if we are not able to update the state, we just try again.
if ( ! state . compareAndSet ( oldState , newState ) ) remove ( ref )
}
}
}
2011-08-27 08:10:25 +03:00
/* *
* A Helper class to create actor references that use routing .
*/
2011-07-28 15:48:03 +03:00
object Routing {
2011-05-24 19:04:25 +02:00
2011-07-28 15:48:03 +03:00
sealed trait RoutingMessage
2011-05-24 19:04:25 +02:00
2011-07-28 15:48:03 +03:00
case class Broadcast ( message : Any ) extends RoutingMessage
2011-05-24 19:04:25 +02:00
2011-08-27 08:10:25 +03:00
/* *
2011-08-29 15:50:40 +02:00
* FIXME : will very likely be moved to the ActorRef .
2011-08-27 08:10:25 +03:00
*/
2011-09-19 15:20:52 +02:00
def actorOf ( props : RoutedProps , address : String = newUuid ( ) . toString ) : ActorRef = {
2011-08-27 08:10:25 +03:00
//TODO Implement support for configuring by deployment ID etc
2011-09-19 15:20:52 +02:00
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
2011-08-27 08:10:25 +03:00
//TODO If the actor deployed uses a different config, then ignore or throw exception?
val clusteringEnabled = ReflectiveAccess . ClusterModule . isEnabled
val localOnly = props . localOnly
2011-09-08 11:02:17 +02:00
if ( clusteringEnabled && ! props . localOnly )
ReflectiveAccess . ClusterModule . newClusteredActorRef ( props )
2011-08-30 18:42:35 +02:00
else {
2011-09-19 15:20:52 +02:00
if ( props . connections . isEmpty ) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing?
2011-08-29 09:22:14 +03:00
throw new IllegalArgumentException ( "A routed actorRef can't have an empty connection set" )
2011-09-19 15:20:52 +02:00
new RoutedActorRef ( props , address )
2011-08-27 08:10:25 +03:00
}
}
2011-07-28 16:56:35 +03:00
/* *
2011-07-28 15:48:03 +03:00
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors .
*
* @param actorAddress the address of the ActorRef .
* @param connections an Iterable pointing to all connected actor references .
* @param routerType the type of routing that should be used .
* @throws IllegalArgumentException if the number of connections is zero , or if it depends on the actual router implementation
* how many connections it can handle .
*/
2011-08-30 14:31:59 +02:00
@deprecated ( "Use 'Routing.actorOf(props: RoutedProps)' instead." , "2.0" )
2011-08-01 09:01:15 +03:00
def actorOf ( actorAddress : String , connections : Iterable [ ActorRef ] , routerType : RouterType ) : ActorRef = {
2011-08-26 17:25:18 +02:00
val router = routerType match {
case RouterType . Direct if connections . size > 1 ⇒
throw new IllegalArgumentException ( "A direct router can't have more than 1 connection" )
2011-08-30 18:42:35 +02:00
2011-07-28 16:56:35 +03:00
case RouterType . Direct ⇒
2011-08-29 15:50:40 +02:00
new DirectRouter
2011-08-30 18:42:35 +02:00
2011-07-28 16:56:35 +03:00
case RouterType . Random ⇒
2011-08-29 15:50:40 +02:00
new RandomRouter
2011-08-30 18:42:35 +02:00
2011-07-28 16:56:35 +03:00
case RouterType . RoundRobin ⇒
2011-08-29 15:50:40 +02:00
new RoundRobinRouter
2011-08-30 18:42:35 +02:00
2011-08-26 17:25:18 +02:00
case r ⇒
throw new IllegalArgumentException ( "Unsupported routerType " + r )
2011-07-28 15:48:03 +03:00
}
2011-05-24 19:04:25 +02:00
2011-08-17 10:21:27 +03:00
if ( connections . size == 0 )
throw new IllegalArgumentException ( "To create a routed actor ref, at least one connection is required" )
2011-08-29 15:50:40 +02:00
new RoutedActorRef (
2011-08-30 14:31:59 +02:00
new RoutedProps (
( ) ⇒ router ,
RoutedProps . defaultFailureDetectorFactory ,
connections ,
2011-09-19 15:20:52 +02:00
RoutedProps . defaultTimeout , true ) ,
actorAddress )
2011-07-28 15:48:03 +03:00
}
2011-05-24 19:04:25 +02:00
}
/* *
2011-08-27 08:10:25 +03:00
* An Abstract convenience implementation for building an ActorReference that uses a Router .
2011-05-24 19:04:25 +02:00
*/
2011-08-27 08:10:25 +03:00
abstract private [ akka ] class AbstractRoutedActorRef ( val props : RoutedProps ) extends UnsupportedActorRef {
2011-05-24 19:04:25 +02:00
2011-08-29 15:50:40 +02:00
val router = props . routerFactory ( )
2011-05-24 19:04:25 +02:00
2011-08-29 15:50:40 +02:00
override def postMessageToMailbox ( message : Any , channel : UntypedChannel ) = {
2011-07-28 15:48:03 +03:00
val sender = channel match {
case ref : ActorRef ⇒ Some ( ref )
2011-07-28 16:56:35 +03:00
case _ ⇒ None
2011-07-28 15:48:03 +03:00
}
2011-08-12 10:03:33 +03:00
router . route ( message ) ( sender )
2011-07-28 15:48:03 +03:00
}
2011-05-24 19:04:25 +02:00
2011-08-18 11:35:14 +02:00
override def postMessageToMailboxAndCreateFutureResultWithTimeout (
message : Any , timeout : Timeout , channel : UntypedChannel ) : Future [ Any ] = {
2011-07-28 15:48:03 +03:00
val sender = channel match {
case ref : ActorRef ⇒ Some ( ref )
2011-07-28 16:56:35 +03:00
case _ ⇒ None
2011-07-28 15:48:03 +03:00
}
2011-08-12 10:03:33 +03:00
router . route [ Any ] ( message , timeout ) ( sender )
2011-05-24 19:04:25 +02:00
}
2011-08-27 08:10:25 +03:00
}
/* *
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on ( or more ) of these actors .
*/
2011-09-19 15:20:52 +02:00
private [ akka ] class RoutedActorRef ( val routedProps : RoutedProps , val address : String ) extends AbstractRoutedActorRef ( routedProps ) {
2011-08-27 08:10:25 +03:00
2011-09-15 08:12:07 +02:00
@volatile
private var running : Boolean = true
def isRunning : Boolean = running
def isShutdown : Boolean = ! running
2011-05-24 19:04:25 +02:00
2011-07-28 15:48:03 +03:00
def stop ( ) {
synchronized {
2011-09-15 08:12:07 +02:00
if ( running ) {
running = false
2011-07-28 15:48:03 +03:00
postMessageToMailbox ( RemoteActorSystemMessage . Stop , None )
}
}
}
2011-09-08 11:02:17 +02:00
2011-09-15 08:12:07 +02:00
router . init ( new RemoveConnectionOnFirstFailureLocalFailureDetector ( routedProps . connections ) )
2011-07-28 15:48:03 +03:00
}
2010-03-04 19:02:23 +01:00
2011-07-28 15:48:03 +03:00
/* *
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method .
*
2011-08-12 14:35:45 +02:00
* FIXME : this is also the location where message buffering should be done in case of failure .
2011-07-28 15:48:03 +03:00
*/
trait BasicRouter extends Router {
2010-02-13 21:45:35 +01:00
2011-08-12 10:03:33 +03:00
@volatile
2011-08-30 14:31:59 +02:00
protected var connections : FailureDetector = _
2011-08-12 10:03:33 +03:00
2011-08-30 14:31:59 +02:00
def init ( connections : FailureDetector ) = {
2011-08-12 10:03:33 +03:00
this . connections = connections
}
2011-08-29 15:50:40 +02:00
def route ( message : Any ) ( implicit sender : Option [ ActorRef ] ) = message match {
2011-07-28 15:48:03 +03:00
case Routing . Broadcast ( message ) ⇒
//it is a broadcast message, we are going to send to message to all connections.
2011-08-27 08:10:25 +03:00
connections . versionedIterable . iterable . foreach ( actor ⇒
2011-07-28 15:48:03 +03:00
try {
2011-09-08 19:46:05 +02:00
actor . ! ( message ) ( sender ) // we use original sender, so this is essentially a 'forward'
2011-07-28 15:48:03 +03:00
} catch {
2011-07-28 16:56:35 +03:00
case e : Exception ⇒
2011-08-29 11:44:33 +02:00
connections . remove ( actor )
2011-07-28 15:48:03 +03:00
throw e
2011-07-28 16:56:35 +03:00
} )
2011-07-28 15:48:03 +03:00
case _ ⇒
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
2011-07-28 16:56:35 +03:00
case Some ( actor ) ⇒
2011-07-28 15:48:03 +03:00
try {
2011-09-08 19:46:05 +02:00
actor . ! ( message ) ( sender ) // we use original sender, so this is essentially a 'forward'
2011-07-28 15:48:03 +03:00
} catch {
2011-07-28 16:56:35 +03:00
case e : Exception ⇒
2011-08-29 11:44:33 +02:00
connections . remove ( actor )
2011-07-28 15:48:03 +03:00
throw e
}
2011-07-28 16:56:35 +03:00
case None ⇒
2011-08-29 15:50:40 +02:00
throwNoConnectionsError
2011-07-28 15:48:03 +03:00
}
}
2011-07-28 16:56:35 +03:00
def route [ T ] ( message : Any , timeout : Timeout ) ( implicit sender : Option [ ActorRef ] ) : Future [ T ] = message match {
2011-07-28 15:48:03 +03:00
case Routing . Broadcast ( message ) ⇒
2011-09-01 14:58:18 +02:00
throw new RoutingException ( "Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter." )
2011-07-28 15:48:03 +03:00
case _ ⇒
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
next match {
2011-07-28 16:56:35 +03:00
case Some ( actor ) ⇒
2011-07-28 15:48:03 +03:00
try {
2011-09-08 19:46:05 +02:00
// FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
2011-07-28 15:48:03 +03:00
actor . ? ( message , timeout ) ( sender ) . asInstanceOf [ Future [ T ] ]
} catch {
2011-07-28 16:56:35 +03:00
case e : Exception ⇒
2011-08-29 11:44:33 +02:00
connections . remove ( actor )
2011-07-28 15:48:03 +03:00
throw e
}
2011-07-28 16:56:35 +03:00
case None ⇒
2011-08-29 15:50:40 +02:00
throwNoConnectionsError
2011-07-28 15:48:03 +03:00
}
}
protected def next : Option [ ActorRef ]
2011-08-29 15:50:40 +02:00
private def throwNoConnectionsError = {
2011-07-28 15:48:03 +03:00
val error = new RoutingException ( "No replica connections for router" )
EventHandler . error ( error , this , error . toString )
throw error
}
2010-05-21 20:08:49 +02:00
}
2011-05-17 21:15:27 +02:00
/* *
2011-08-29 09:22:14 +03:00
* A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef .
*
2011-07-28 15:48:03 +03:00
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class DirectRouter extends BasicRouter {
2011-08-29 15:50:40 +02:00
private val state = new AtomicReference [ DirectRouterState ]
2011-07-28 15:48:03 +03:00
lazy val next : Option [ ActorRef ] = {
2011-08-29 15:50:40 +02:00
val currentState = getState
2011-08-12 10:03:33 +03:00
if ( currentState . ref == null ) None else Some ( currentState . ref )
}
2011-09-08 19:46:05 +02:00
// FIXME rename all 'getState' methods to 'currentState', non-scala
2011-08-12 10:03:33 +03:00
@tailrec
2011-08-29 15:50:40 +02:00
private def getState : DirectRouterState = {
val currentState = state . get
2011-08-12 10:03:33 +03:00
if ( currentState != null && connections . version == currentState . version ) {
//we are lucky since nothing has changed in the connections.
currentState
} else {
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
2011-08-27 08:10:25 +03:00
val versionedIterable = connections . versionedIterable
2011-08-12 10:03:33 +03:00
2011-08-27 08:10:25 +03:00
val connectionCount = versionedIterable . iterable . size
if ( connectionCount > 1 )
throw new RoutingException ( "A DirectRouter can't have more than 1 connected Actor, but found [%s]" . format ( connectionCount ) )
2011-08-12 10:03:33 +03:00
2011-08-27 08:10:25 +03:00
val newState = new DirectRouterState ( versionedIterable . iterable . head , versionedIterable . version )
2011-08-29 09:22:14 +03:00
if ( state . compareAndSet ( currentState , newState ) )
2011-08-12 10:03:33 +03:00
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
2011-08-29 15:50:40 +02:00
getState
2011-08-12 10:03:33 +03:00
}
2011-07-28 15:48:03 +03:00
}
2011-08-12 10:03:33 +03:00
2011-08-30 18:42:35 +02:00
private case class DirectRouterState ( ref : ActorRef , version : Long )
2010-05-21 20:08:49 +02:00
}
2011-05-17 21:15:27 +02:00
/* *
2011-07-28 15:48:03 +03:00
* A Router that randomly selects one of the target connections to send a message to .
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class RandomRouter extends BasicRouter {
2011-08-29 15:50:40 +02:00
private val state = new AtomicReference [ RandomRouterState ]
2011-07-28 15:48:03 +03:00
2011-08-12 14:35:45 +02:00
//FIXME: threadlocal random?
2011-08-29 15:50:40 +02:00
private val random = new java . util . Random ( System . nanoTime )
2011-07-28 15:48:03 +03:00
2011-08-29 15:50:40 +02:00
def next : Option [ ActorRef ] = getState . array match {
2011-08-26 17:25:18 +02:00
case a if a . isEmpty ⇒ None
case a ⇒ Some ( a ( random . nextInt ( a . length ) ) )
2011-08-12 10:03:33 +03:00
}
@tailrec
2011-08-29 15:50:40 +02:00
private def getState : RandomRouterState = {
val currentState = state . get
2011-08-12 10:03:33 +03:00
if ( currentState != null && currentState . version == connections . version ) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
2011-08-18 11:35:14 +02:00
2011-08-27 08:10:25 +03:00
val versionedIterable = connections . versionedIterable
2011-08-29 09:22:14 +03:00
val newState = new RandomRouterState ( versionedIterable . iterable . toIndexedSeq , versionedIterable . version )
if ( state . compareAndSet ( currentState , newState ) )
2011-08-12 10:03:33 +03:00
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
2011-08-29 15:50:40 +02:00
getState
2011-07-28 15:48:03 +03:00
}
2011-08-12 10:03:33 +03:00
}
2011-08-26 17:25:18 +02:00
private case class RandomRouterState ( array : IndexedSeq [ ActorRef ] , version : Long )
2011-05-17 21:15:27 +02:00
}
/* *
2011-08-12 10:03:33 +03:00
* A Router that uses round - robin to select a connection . For concurrent calls , round robin is just a best effort .
2011-07-28 15:48:03 +03:00
*
* @author < a href = "http://jonasboner.com" > Jonas Bon & # 233 ; r </ a >
2011-05-17 21:15:27 +02:00
*/
2011-08-12 10:03:33 +03:00
class RoundRobinRouter extends BasicRouter {
2011-05-17 21:15:27 +02:00
2011-08-29 15:50:40 +02:00
private val state = new AtomicReference [ RoundRobinState ]
2011-05-17 21:15:27 +02:00
2011-08-29 15:50:40 +02:00
def next : Option [ ActorRef ] = getState . next
2011-05-17 21:15:27 +02:00
2011-08-12 10:03:33 +03:00
@tailrec
2011-08-29 15:50:40 +02:00
private def getState : RoundRobinState = {
val currentState = state . get
2011-05-17 21:15:27 +02:00
2011-08-12 10:03:33 +03:00
if ( currentState != null && currentState . version == connections . version ) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
currentState
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
2011-08-18 11:35:14 +02:00
2011-08-27 08:10:25 +03:00
val versionedIterable = connections . versionedIterable
2011-08-29 09:22:14 +03:00
val newState = new RoundRobinState ( versionedIterable . iterable . toIndexedSeq [ ActorRef ] , versionedIterable . version )
if ( state . compareAndSet ( currentState , newState ) )
2011-08-12 10:03:33 +03:00
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
2011-08-26 17:25:18 +02:00
else //we failed to update the state, lets try again... better luck next time.
2011-08-29 15:50:40 +02:00
getState
2011-05-21 15:37:09 +02:00
}
2011-08-12 10:03:33 +03:00
}
2011-08-26 17:25:18 +02:00
private case class RoundRobinState ( array : IndexedSeq [ ActorRef ] , version : Long ) {
2011-08-12 10:03:33 +03:00
private val index = new AtomicInteger ( 0 )
2011-08-29 15:50:40 +02:00
def next : Option [ ActorRef ] = if ( array . isEmpty ) None else Some ( array ( nextIndex ) )
2011-05-21 15:37:09 +02:00
2011-08-12 10:03:33 +03:00
@tailrec
2011-08-29 15:50:40 +02:00
private def nextIndex : Int = {
val oldIndex = index . get
2011-08-12 10:03:33 +03:00
var newIndex = if ( oldIndex == array . length - 1 ) 0 else oldIndex + 1
2011-08-29 15:50:40 +02:00
if ( ! index . compareAndSet ( oldIndex , newIndex ) ) nextIndex
2011-08-12 10:03:33 +03:00
else oldIndex
}
2011-05-17 21:15:27 +02:00
}
}
2011-08-17 10:21:27 +03:00
2011-08-29 15:50:40 +02:00
/* *
2011-08-17 10:21:27 +03:00
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
* specified strategy ( specific router needs to implement `gather` method ) .
2011-08-26 08:24:26 +02:00
* Scatter - gather pattern will be applied only to the messages broadcasted using Future
2011-08-17 10:21:27 +03:00
* ( wrapped into { @link Routing . Broadcast } and sent with "?" method ) . For the messages , sent in a fire - forget
2011-08-26 08:24:26 +02:00
* mode , the router would behave as { @link BasicRouter } , unless it 's mixed in with other router type
*
2011-08-17 10:21:27 +03:00
* FIXME : This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected .
* FIXME : this is also the location where message buffering should be done in case of failure .
*/
trait ScatterGatherRouter extends BasicRouter with Serializable {
2011-08-29 15:50:40 +02:00
/* *
* Aggregates the responses into a single Future
* @param results Futures of the responses from connections
*/
2011-08-17 10:21:27 +03:00
protected def gather [ S , G >: S ] ( results : Iterable [ Future [ S ] ] ) : Future [ G ]
private def scatterGather [ S , G >: S ] ( message : Any , timeout : Timeout ) ( implicit sender : Option [ ActorRef ] ) : Future [ G ] = {
2011-08-27 08:10:25 +03:00
val responses = connections . versionedIterable . iterable . flatMap { actor ⇒
2011-08-17 10:21:27 +03:00
try {
Some ( actor . ? ( message , timeout ) ( sender ) . asInstanceOf [ Future [ S ] ] )
} catch {
case e : Exception ⇒
2011-08-29 11:44:33 +02:00
connections . remove ( actor )
2011-08-17 10:21:27 +03:00
None
}
}
2011-08-26 17:25:18 +02:00
if ( responses . isEmpty )
2011-08-17 10:21:27 +03:00
throw new RoutingException ( "No connections can process the message [%s] sent to scatter-gather router" format ( message ) )
2011-08-29 15:50:40 +02:00
else gather ( responses )
2011-08-17 10:21:27 +03:00
}
override def route [ T ] ( message : Any , timeout : Timeout ) ( implicit sender : Option [ ActorRef ] ) : Future [ T ] = message match {
case Routing . Broadcast ( message ) ⇒ scatterGather ( message , timeout )
case message ⇒ super . route ( message , timeout ) ( sender )
}
}
2011-08-29 15:50:40 +02:00
/* *
2011-08-17 10:21:27 +03:00
* Simple router that broadcasts the message to all connections , and replies with the first response
2011-08-26 08:24:26 +02:00
* Scatter - gather pattern will be applied only to the messages broadcasted using Future
2011-08-17 10:21:27 +03:00
* ( wrapped into { @link Routing . Broadcast } and sent with "?" method ) . For the messages sent in a fire - forget
2011-08-26 08:24:26 +02:00
* mode , the router would behave as { @link RoundRobinRouter }
2011-08-17 10:21:27 +03:00
*/
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
2011-09-08 15:54:06 +02:00
protected def gather [ S , G >: S ] ( results : Iterable [ Future [ S ] ] ) : Future [ G ] = Future . firstCompletedOf ( results )
2011-08-17 10:21:27 +03:00
}