2010-05-03 19:32:40 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2010-05-03 19:32:40 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.routing
|
2010-02-13 21:45:35 +01:00
|
|
|
|
2011-05-24 19:04:25 +02:00
|
|
|
import akka.AkkaException
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.actor._
|
|
|
|
|
import akka.event.EventHandler
|
2011-08-31 15:07:18 +02:00
|
|
|
import akka.config.ConfigurationException
|
2011-10-06 21:19:46 +02:00
|
|
|
import akka.dispatch.{ Future, MessageDispatcher }
|
|
|
|
|
import akka.AkkaApplication
|
2011-08-27 08:10:25 +03:00
|
|
|
import akka.util.ReflectiveAccess
|
2011-08-30 14:31:59 +02:00
|
|
|
import java.net.InetSocketAddress
|
2011-08-31 15:07:18 +02:00
|
|
|
import java.lang.reflect.InvocationTargetException
|
2011-08-30 14:31:59 +02:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
import scala.annotation.tailrec
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
/**
|
|
|
|
|
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
|
2011-08-30 14:31:59 +02:00
|
|
|
* {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
|
2011-08-12 10:03:33 +03:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
trait Router {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
|
|
|
|
|
* the current connections, signal that there were problems with one of the connections and see if there have
|
|
|
|
|
* been changes in the connections.
|
|
|
|
|
*
|
|
|
|
|
* This method is not threadsafe, and should only be called once
|
|
|
|
|
*
|
|
|
|
|
* JMM Guarantees:
|
2011-08-12 10:30:26 +03:00
|
|
|
* This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
|
2011-08-12 10:03:33 +03:00
|
|
|
*/
|
2011-10-07 15:42:55 +02:00
|
|
|
def init(connectionManager: ConnectionManager)
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Routes the message to one of the connections.
|
|
|
|
|
*
|
|
|
|
|
* @throws RoutingException if something goes wrong while routing the message
|
|
|
|
|
*/
|
2011-10-22 16:06:20 +02:00
|
|
|
def route(message: Any)(implicit sender: ActorRef)
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
|
|
|
|
|
* completion of the processing of the message.
|
|
|
|
|
*
|
|
|
|
|
* @throws RoutingExceptionif something goes wrong while routing the message.
|
|
|
|
|
*/
|
2011-10-22 16:06:20 +02:00
|
|
|
def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T]
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2011-08-27 08:10:25 +03:00
|
|
|
* An {@link AkkaException} thrown when something goes wrong while routing a message
|
|
|
|
|
*/
|
|
|
|
|
class RoutingException(message: String) extends AkkaException(message)
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A Helper class to create actor references that use routing.
|
|
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
object Routing {
|
2011-10-07 15:22:36 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
sealed trait RoutingMessage
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
|
2011-10-07 19:42:10 +02:00
|
|
|
* Used to broadcast a message to all connections in a router. E.g. every connection gets the message
|
|
|
|
|
* regardless of their routing algorithm.
|
2011-08-27 08:10:25 +03:00
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
case class Broadcast(message: Any) extends RoutingMessage
|
2011-08-17 10:21:27 +03:00
|
|
|
|
2011-10-11 11:18:47 +02:00
|
|
|
def createCustomRouter(implClass: String): Router = {
|
|
|
|
|
ReflectiveAccess.createInstance(
|
|
|
|
|
implClass,
|
|
|
|
|
Array[Class[_]](),
|
|
|
|
|
Array[AnyRef]()) match {
|
|
|
|
|
case Right(router) ⇒ router.asInstanceOf[Router]
|
|
|
|
|
case Left(exception) ⇒
|
|
|
|
|
val cause = exception match {
|
|
|
|
|
case i: InvocationTargetException ⇒ i.getTargetException
|
|
|
|
|
case _ ⇒ exception
|
|
|
|
|
}
|
|
|
|
|
throw new ConfigurationException(
|
|
|
|
|
"Could not instantiate custom Router of [" +
|
|
|
|
|
implClass + "] due to: " +
|
|
|
|
|
cause, cause)
|
|
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-05-24 19:04:25 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2011-08-27 08:10:25 +03:00
|
|
|
* An Abstract convenience implementation for building an ActorReference that uses a Router.
|
2011-05-24 19:04:25 +02:00
|
|
|
*/
|
2011-10-22 16:06:20 +02:00
|
|
|
abstract private[akka] class AbstractRoutedActorRef(val app: AkkaApplication, val props: RoutedProps) extends UnsupportedActorRef {
|
2011-08-29 15:50:40 +02:00
|
|
|
val router = props.routerFactory()
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
override def postMessageToMailbox(message: Any, sender: ActorRef) = router.route(message)(sender)
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = app.provider.ask(message, this, timeout)
|
2011-08-27 08:10:25 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
|
|
|
|
* on (or more) of these actors.
|
|
|
|
|
*/
|
2011-10-22 16:06:20 +02:00
|
|
|
private[akka] class RoutedActorRef(app: AkkaApplication, val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(app, routedProps) {
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-09-15 08:12:07 +02:00
|
|
|
@volatile
|
|
|
|
|
private var running: Boolean = true
|
|
|
|
|
|
2011-10-18 15:39:26 +02:00
|
|
|
override def isShutdown: Boolean = !running
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-10-18 15:39:26 +02:00
|
|
|
override def stop() {
|
2011-07-28 15:48:03 +03:00
|
|
|
synchronized {
|
2011-09-15 08:12:07 +02:00
|
|
|
if (running) {
|
|
|
|
|
running = false
|
2011-10-22 16:06:20 +02:00
|
|
|
router.route(Routing.Broadcast(PoisonPill))(this)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-09-08 11:02:17 +02:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
router.init(routedProps.connectionManager)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2010-03-04 19:02:23 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
/**
|
|
|
|
|
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
|
|
|
|
|
* Router only needs to implement the next method.
|
|
|
|
|
*/
|
|
|
|
|
trait BasicRouter extends Router {
|
2010-02-13 21:45:35 +01:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@volatile
|
2011-10-07 15:42:55 +02:00
|
|
|
protected var connectionManager: ConnectionManager = _
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
def init(connectionManager: ConnectionManager) = {
|
|
|
|
|
this.connectionManager = connectionManager
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
def route(message: Any)(implicit sender: ActorRef) = message match {
|
2011-07-28 15:48:03 +03:00
|
|
|
case Routing.Broadcast(message) ⇒
|
|
|
|
|
//it is a broadcast message, we are going to send to message to all connections.
|
2011-10-07 15:42:55 +02:00
|
|
|
connectionManager.connections.iterable foreach { connection ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
2011-09-20 21:44:50 +02:00
|
|
|
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
|
2011-07-28 15:48:03 +03:00
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-10-07 15:42:55 +02:00
|
|
|
connectionManager.remove(connection)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
2011-09-20 21:44:50 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
case _ ⇒
|
|
|
|
|
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
|
|
|
next match {
|
2011-09-20 21:44:50 +02:00
|
|
|
case Some(connection) ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
2011-09-20 21:44:50 +02:00
|
|
|
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
|
2011-07-28 15:48:03 +03:00
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-10-07 15:42:55 +02:00
|
|
|
connectionManager.remove(connection)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
|
|
|
|
}
|
2011-07-28 16:56:35 +03:00
|
|
|
case None ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
throwNoConnectionsError
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match {
|
2011-07-28 15:48:03 +03:00
|
|
|
case Routing.Broadcast(message) ⇒
|
2011-09-01 14:58:18 +02:00
|
|
|
throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
|
2011-07-28 15:48:03 +03:00
|
|
|
case _ ⇒
|
|
|
|
|
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
|
|
|
next match {
|
2011-09-20 21:44:50 +02:00
|
|
|
case Some(connection) ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
2011-10-22 16:06:20 +02:00
|
|
|
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?
|
2011-07-28 15:48:03 +03:00
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-10-07 15:42:55 +02:00
|
|
|
connectionManager.remove(connection)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
|
|
|
|
}
|
2011-07-28 16:56:35 +03:00
|
|
|
case None ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
throwNoConnectionsError
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def next: Option[ActorRef]
|
|
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
private def throwNoConnectionsError = throw new RoutingException("No replica connections for router")
|
2010-05-21 20:08:49 +02:00
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
|
2011-08-29 09:22:14 +03:00
|
|
|
* A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
|
|
|
|
|
*
|
2011-07-28 15:48:03 +03:00
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class DirectRouter extends BasicRouter {
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[DirectRouterState]
|
2011-07-28 15:48:03 +03:00
|
|
|
|
|
|
|
|
lazy val next: Option[ActorRef] = {
|
2011-10-07 15:42:55 +02:00
|
|
|
val current = currentState
|
|
|
|
|
if (current.ref == null) None else Some(current.ref)
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@tailrec
|
2011-10-07 15:42:55 +02:00
|
|
|
private def currentState: DirectRouterState = {
|
|
|
|
|
val current = state.get
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
if (current != null && connectionManager.version == current.version) {
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since nothing has changed in the connections.
|
2011-10-07 15:42:55 +02:00
|
|
|
current
|
2011-08-12 10:03:33 +03:00
|
|
|
} else {
|
|
|
|
|
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
|
|
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
val connections = connectionManager.connections
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
val connectionCount = connections.iterable.size
|
2011-08-27 08:10:25 +03:00
|
|
|
if (connectionCount > 1)
|
|
|
|
|
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
val newState = new DirectRouterState(connections.iterable.head, connections.version)
|
|
|
|
|
if (state.compareAndSet(current, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-10-07 15:42:55 +02:00
|
|
|
currentState // recur
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-30 18:42:35 +02:00
|
|
|
private case class DirectRouterState(ref: ActorRef, version: Long)
|
2010-05-21 20:08:49 +02:00
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
|
2011-07-28 15:48:03 +03:00
|
|
|
* A Router that randomly selects one of the target connections to send a message to.
|
|
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class RandomRouter extends BasicRouter {
|
2011-10-28 12:00:06 +02:00
|
|
|
import java.security.SecureRandom
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[RandomRouterState]
|
2011-07-28 15:48:03 +03:00
|
|
|
|
2011-10-28 12:00:06 +02:00
|
|
|
private val random = new ThreadLocal[SecureRandom] {
|
|
|
|
|
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
|
|
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
def next: Option[ActorRef] = currentState.array match {
|
2011-08-26 17:25:18 +02:00
|
|
|
case a if a.isEmpty ⇒ None
|
2011-10-28 12:00:06 +02:00
|
|
|
case a ⇒ Some(a(random.get.nextInt(a.length)))
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@tailrec
|
2011-10-07 15:42:55 +02:00
|
|
|
private def currentState: RandomRouterState = {
|
|
|
|
|
val current = state.get
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
if (current != null && current.version == connectionManager.version) {
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
2011-10-07 15:42:55 +02:00
|
|
|
current
|
2011-08-12 10:03:33 +03:00
|
|
|
} else {
|
|
|
|
|
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
2011-08-18 11:35:14 +02:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
val connections = connectionManager.connections
|
|
|
|
|
val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)
|
|
|
|
|
if (state.compareAndSet(current, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-10-07 15:42:55 +02:00
|
|
|
currentState
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
|
2011-05-17 21:15:27 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2011-08-12 10:03:33 +03:00
|
|
|
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
|
2011-07-28 15:48:03 +03:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class RoundRobinRouter extends BasicRouter {
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[RoundRobinState]
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
def next: Option[ActorRef] = currentState.next
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@tailrec
|
2011-10-07 15:42:55 +02:00
|
|
|
private def currentState: RoundRobinState = {
|
|
|
|
|
val current = state.get
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
if (current != null && current.version == connectionManager.version) {
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
2011-10-07 15:42:55 +02:00
|
|
|
current
|
2011-08-12 10:03:33 +03:00
|
|
|
} else {
|
|
|
|
|
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
2011-08-18 11:35:14 +02:00
|
|
|
|
2011-10-07 15:42:55 +02:00
|
|
|
val connections = connectionManager.connections
|
|
|
|
|
val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)
|
|
|
|
|
if (state.compareAndSet(current, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-10-07 15:42:55 +02:00
|
|
|
currentState
|
2011-05-21 15:37:09 +02:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
private val index = new AtomicInteger(0)
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex))
|
2011-05-21 15:37:09 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@tailrec
|
2011-08-29 15:50:40 +02:00
|
|
|
private def nextIndex: Int = {
|
|
|
|
|
val oldIndex = index.get
|
2011-08-12 10:03:33 +03:00
|
|
|
var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
if (!index.compareAndSet(oldIndex, newIndex)) nextIndex
|
2011-08-12 10:03:33 +03:00
|
|
|
else oldIndex
|
|
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-08-17 10:21:27 +03:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
2011-08-17 10:21:27 +03:00
|
|
|
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
|
|
|
|
|
* specified strategy (specific router needs to implement `gather` method).
|
2011-08-26 08:24:26 +02:00
|
|
|
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
2011-08-17 10:21:27 +03:00
|
|
|
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
|
2011-08-26 08:24:26 +02:00
|
|
|
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
|
|
|
|
|
*
|
2011-08-17 10:21:27 +03:00
|
|
|
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
|
|
|
|
|
* FIXME: this is also the location where message buffering should be done in case of failure.
|
|
|
|
|
*/
|
|
|
|
|
trait ScatterGatherRouter extends BasicRouter with Serializable {
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
2011-10-07 15:42:55 +02:00
|
|
|
* Aggregates the responses into a single Future.
|
|
|
|
|
*
|
2011-08-29 15:50:40 +02:00
|
|
|
* @param results Futures of the responses from connections
|
|
|
|
|
*/
|
2011-08-17 10:21:27 +03:00
|
|
|
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
|
|
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[G] = {
|
2011-10-07 15:42:55 +02:00
|
|
|
val responses = connectionManager.connections.iterable.flatMap { actor ⇒
|
2011-08-17 10:21:27 +03:00
|
|
|
try {
|
2011-10-20 23:37:54 +02:00
|
|
|
if (actor.isShutdown) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace
|
2011-10-22 16:06:20 +02:00
|
|
|
Some(actor.?(message, timeout).asInstanceOf[Future[S]])
|
2011-08-17 10:21:27 +03:00
|
|
|
} catch {
|
|
|
|
|
case e: Exception ⇒
|
2011-10-07 15:42:55 +02:00
|
|
|
connectionManager.remove(actor)
|
2011-08-17 10:21:27 +03:00
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
if (responses.isEmpty)
|
2011-08-17 10:21:27 +03:00
|
|
|
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
|
2011-08-29 15:50:40 +02:00
|
|
|
else gather(responses)
|
2011-08-17 10:21:27 +03:00
|
|
|
}
|
|
|
|
|
|
2011-10-22 16:06:20 +02:00
|
|
|
override def route[T](message: Any, timeout: Timeout)(implicit sender: ActorRef): Future[T] = message match {
|
2011-08-17 10:21:27 +03:00
|
|
|
case Routing.Broadcast(message) ⇒ scatterGather(message, timeout)
|
|
|
|
|
case message ⇒ super.route(message, timeout)(sender)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
2011-08-17 10:21:27 +03:00
|
|
|
* Simple router that broadcasts the message to all connections, and replies with the first response
|
2011-08-26 08:24:26 +02:00
|
|
|
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
2011-08-17 10:21:27 +03:00
|
|
|
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
|
2011-08-26 08:24:26 +02:00
|
|
|
* mode, the router would behave as {@link RoundRobinRouter}
|
2011-08-17 10:21:27 +03:00
|
|
|
*/
|
2011-10-06 21:19:46 +02:00
|
|
|
class ScatterGatherFirstCompletedRouter(implicit val dispatcher: MessageDispatcher, timeout: Timeout) extends RoundRobinRouter with ScatterGatherRouter {
|
2011-08-17 10:21:27 +03:00
|
|
|
|
2011-09-08 15:54:06 +02:00
|
|
|
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Future.firstCompletedOf(results)
|
2011-08-17 10:21:27 +03:00
|
|
|
}
|