2010-05-03 19:32:40 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2010-05-03 19:32:40 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.routing
|
2010-02-13 21:45:35 +01:00
|
|
|
|
2011-05-24 19:04:25 +02:00
|
|
|
import akka.AkkaException
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.actor._
|
|
|
|
|
import akka.event.EventHandler
|
2011-08-31 15:07:18 +02:00
|
|
|
import akka.config.ConfigurationException
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.actor.UntypedChannel._
|
2011-08-27 08:10:25 +03:00
|
|
|
import akka.dispatch.{ Future, Futures }
|
|
|
|
|
import akka.util.ReflectiveAccess
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
import java.net.InetSocketAddress
|
2011-08-31 15:07:18 +02:00
|
|
|
import java.lang.reflect.InvocationTargetException
|
2011-08-30 14:31:59 +02:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
import scala.annotation.tailrec
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
/**
|
|
|
|
|
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
|
2011-08-30 14:31:59 +02:00
|
|
|
* {@link FailureDetector} and each Router should be linked to only one {@link FailureDetector}.
|
2011-08-12 10:03:33 +03:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
|
|
|
|
*/
|
|
|
|
|
trait Router {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
|
|
|
|
|
* the current connections, signal that there were problems with one of the connections and see if there have
|
|
|
|
|
* been changes in the connections.
|
|
|
|
|
*
|
|
|
|
|
* This method is not threadsafe, and should only be called once
|
|
|
|
|
*
|
|
|
|
|
* JMM Guarantees:
|
2011-08-12 10:30:26 +03:00
|
|
|
* This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
|
2011-08-12 10:03:33 +03:00
|
|
|
*/
|
2011-08-30 14:31:59 +02:00
|
|
|
def init(connections: FailureDetector)
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Routes the message to one of the connections.
|
|
|
|
|
*
|
|
|
|
|
* @throws RoutingException if something goes wrong while routing the message
|
|
|
|
|
*/
|
2011-08-29 15:50:40 +02:00
|
|
|
def route(message: Any)(implicit sender: Option[ActorRef])
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
|
|
|
|
|
* completion of the processing of the message.
|
|
|
|
|
*
|
|
|
|
|
* @throws RoutingExceptionif something goes wrong while routing the message.
|
|
|
|
|
*/
|
|
|
|
|
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
/**
|
|
|
|
|
* An Iterable that also contains a version.
|
|
|
|
|
*/
|
2011-08-30 18:42:35 +02:00
|
|
|
trait VersionedIterable[A] {
|
|
|
|
|
val version: Long
|
|
|
|
|
|
|
|
|
|
def iterable: Iterable[A]
|
|
|
|
|
|
|
|
|
|
def apply(): Iterable[A] = iterable
|
2011-08-30 14:31:59 +02:00
|
|
|
}
|
|
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
/**
|
2011-08-27 08:10:25 +03:00
|
|
|
* An {@link AkkaException} thrown when something goes wrong while routing a message
|
|
|
|
|
*/
|
|
|
|
|
class RoutingException(message: String) extends AkkaException(message)
|
|
|
|
|
|
2011-08-31 15:07:18 +02:00
|
|
|
/**
|
|
|
|
|
* Misc helper and factory methods for failure detection.
|
|
|
|
|
*/
|
|
|
|
|
object FailureDetector {
|
|
|
|
|
|
|
|
|
|
def createCustomFailureDetector(implClass: String, connections: Map[InetSocketAddress, ActorRef]): FailureDetector = {
|
|
|
|
|
ReflectiveAccess.createInstance(implClass, Array[Class[_]](classOf[Map[InetSocketAddress, ActorRef]]), Array[AnyRef](connections)) match {
|
|
|
|
|
case Right(actor) ⇒ actor
|
|
|
|
|
case Left(exception) ⇒
|
|
|
|
|
val cause = exception match {
|
|
|
|
|
case i: InvocationTargetException ⇒ i.getTargetException
|
|
|
|
|
case _ ⇒ exception
|
|
|
|
|
}
|
|
|
|
|
throw new ConfigurationException("Could not instantiate custom FailureDetector of [" + implClass + "] due to: " + cause, cause)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
|
2011-08-30 14:31:59 +02:00
|
|
|
* The FailureDetector acts like a middleman between the Router and the actor reference that does the routing
|
|
|
|
|
* and can dectect and act upon failur.
|
|
|
|
|
*
|
|
|
|
|
* Through the FailureDetector:
|
2011-08-27 08:10:25 +03:00
|
|
|
* <ol>
|
|
|
|
|
* <li>
|
|
|
|
|
* the actor ref can signal that something has changed in the known set of connections. The Router can see
|
|
|
|
|
* when a changed happened (by checking the version) and update its internal datastructures.
|
|
|
|
|
* </li>
|
|
|
|
|
* <li>
|
|
|
|
|
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
|
|
|
|
|
* </li>
|
|
|
|
|
* </ol>
|
2011-08-12 10:03:33 +03:00
|
|
|
*/
|
2011-08-30 14:31:59 +02:00
|
|
|
trait FailureDetector {
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A version that is useful to see if there is any change in the connections. If there is a change, a router is
|
|
|
|
|
* able to update its internal datastructures.
|
|
|
|
|
*/
|
|
|
|
|
def version: Long
|
|
|
|
|
|
2011-08-18 11:35:14 +02:00
|
|
|
/**
|
2011-08-29 09:22:14 +03:00
|
|
|
* Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
|
|
|
|
|
* with an atomic read of and size and version.
|
2011-08-18 11:35:14 +02:00
|
|
|
*/
|
|
|
|
|
def size: Int
|
|
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
/**
|
|
|
|
|
* Stops all managed actors
|
|
|
|
|
*/
|
|
|
|
|
def stopAll()
|
|
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
/**
|
2011-08-27 08:10:25 +03:00
|
|
|
* Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
|
|
|
|
|
* the time element, also the version is included to be able to read the data (the connections) and the version
|
|
|
|
|
* in an atomic manner.
|
2011-08-12 10:03:33 +03:00
|
|
|
*
|
2011-08-27 08:10:25 +03:00
|
|
|
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
|
|
|
|
|
* view of some set of connections.
|
2011-08-12 10:03:33 +03:00
|
|
|
*/
|
2011-08-27 08:10:25 +03:00
|
|
|
def versionedIterable: VersionedIterable[ActorRef]
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A callback that can be used to indicate that a connected actorRef was dead.
|
|
|
|
|
* <p/>
|
|
|
|
|
* Implementations should make sure that this method can be called without the actorRef being part of the
|
|
|
|
|
* current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the
|
|
|
|
|
* reasons this can happen is that multiple thread could at the 'same' moment discover for the same ActorRef that
|
|
|
|
|
* not working.
|
|
|
|
|
*
|
2011-08-29 11:44:33 +02:00
|
|
|
* It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
|
2011-08-12 10:03:33 +03:00
|
|
|
* is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
|
2011-08-29 09:22:14 +03:00
|
|
|
* made how long this takes.
|
2011-08-12 10:03:33 +03:00
|
|
|
*
|
|
|
|
|
* @param ref the dead
|
|
|
|
|
*/
|
2011-08-29 15:50:40 +02:00
|
|
|
def remove(deadRef: ActorRef)
|
2011-08-30 14:31:59 +02:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Fails over connections from one address to another.
|
|
|
|
|
*/
|
|
|
|
|
def failOver(from: InetSocketAddress, to: InetSocketAddress)
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
|
2011-08-30 14:31:59 +02:00
|
|
|
* Default "local" failure detector. This failure detector removes an actor from the
|
|
|
|
|
* router if an exception occured in the router's thread (e.g. when trying to add
|
|
|
|
|
* the message to the receiver's mailbox).
|
2011-08-27 08:10:25 +03:00
|
|
|
*/
|
2011-08-31 15:07:18 +02:00
|
|
|
class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector {
|
2011-08-30 14:31:59 +02:00
|
|
|
|
2011-08-30 18:42:35 +02:00
|
|
|
case class State(val version: Long, val iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
|
|
|
|
|
|
|
|
|
|
private val state = new AtomicReference[State]
|
2011-08-30 14:31:59 +02:00
|
|
|
|
|
|
|
|
def this(connectionIterable: Iterable[ActorRef]) = {
|
|
|
|
|
this()
|
2011-08-30 18:42:35 +02:00
|
|
|
state.set(State(Long.MinValue, connectionIterable))
|
2011-08-30 14:31:59 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def version: Long = state.get.version
|
|
|
|
|
|
|
|
|
|
def size: Int = state.get.iterable.size
|
|
|
|
|
|
|
|
|
|
def versionedIterable = state.get
|
|
|
|
|
|
|
|
|
|
def stopAll() {
|
|
|
|
|
state.get.iterable foreach (_.stop())
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here
|
|
|
|
|
|
|
|
|
|
@tailrec
|
|
|
|
|
final def remove(ref: ActorRef) = {
|
|
|
|
|
val oldState = state.get
|
|
|
|
|
|
|
|
|
|
//remote the ref from the connections.
|
|
|
|
|
var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref)
|
|
|
|
|
|
|
|
|
|
if (newList.size != oldState.iterable.size) {
|
|
|
|
|
//one or more occurrences of the actorRef were removed, so we need to update the state.
|
|
|
|
|
|
2011-08-30 18:42:35 +02:00
|
|
|
val newState = State(oldState.version + 1, newList)
|
2011-08-30 14:31:59 +02:00
|
|
|
//if we are not able to update the state, we just try again.
|
|
|
|
|
if (!state.compareAndSet(oldState, newState)) remove(ref)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2011-08-27 08:10:25 +03:00
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A Helper class to create actor references that use routing.
|
|
|
|
|
*/
|
2011-07-28 15:48:03 +03:00
|
|
|
object Routing {
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
sealed trait RoutingMessage
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
case class Broadcast(message: Any) extends RoutingMessage
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
|
2011-08-29 15:50:40 +02:00
|
|
|
* FIXME: will very likely be moved to the ActorRef.
|
2011-08-27 08:10:25 +03:00
|
|
|
*/
|
|
|
|
|
def actorOf(props: RoutedProps): ActorRef = {
|
|
|
|
|
//TODO Implement support for configuring by deployment ID etc
|
|
|
|
|
//TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
|
|
|
|
|
//TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
|
|
|
|
|
//TODO If the actor deployed uses a different config, then ignore or throw exception?
|
|
|
|
|
|
|
|
|
|
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
|
|
|
|
|
val localOnly = props.localOnly
|
|
|
|
|
|
|
|
|
|
if (!localOnly && !clusteringEnabled)
|
|
|
|
|
throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled")
|
2011-08-30 18:42:35 +02:00
|
|
|
|
|
|
|
|
else if (clusteringEnabled && !props.localOnly)
|
2011-08-29 09:22:14 +03:00
|
|
|
ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start()
|
2011-08-30 18:42:35 +02:00
|
|
|
|
|
|
|
|
else {
|
2011-08-29 09:22:14 +03:00
|
|
|
if (props.connections.isEmpty)
|
|
|
|
|
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
|
|
|
|
|
|
|
|
|
|
new RoutedActorRef(props).start()
|
2011-08-27 08:10:25 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
/**
|
2011-07-28 15:48:03 +03:00
|
|
|
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
|
|
|
|
|
*
|
|
|
|
|
* @param actorAddress the address of the ActorRef.
|
|
|
|
|
* @param connections an Iterable pointing to all connected actor references.
|
|
|
|
|
* @param routerType the type of routing that should be used.
|
|
|
|
|
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
|
|
|
|
|
* how many connections it can handle.
|
|
|
|
|
*/
|
2011-08-30 14:31:59 +02:00
|
|
|
@deprecated("Use 'Routing.actorOf(props: RoutedProps)' instead.", "2.0")
|
2011-08-01 09:01:15 +03:00
|
|
|
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
|
2011-08-26 17:25:18 +02:00
|
|
|
val router = routerType match {
|
|
|
|
|
case RouterType.Direct if connections.size > 1 ⇒
|
|
|
|
|
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
|
2011-08-30 18:42:35 +02:00
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
case RouterType.Direct ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
new DirectRouter
|
2011-08-30 18:42:35 +02:00
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
case RouterType.Random ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
new RandomRouter
|
2011-08-30 18:42:35 +02:00
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
case RouterType.RoundRobin ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
new RoundRobinRouter
|
2011-08-30 18:42:35 +02:00
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
case r ⇒
|
|
|
|
|
throw new IllegalArgumentException("Unsupported routerType " + r)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-17 10:21:27 +03:00
|
|
|
if (connections.size == 0)
|
|
|
|
|
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
new RoutedActorRef(
|
2011-08-30 14:31:59 +02:00
|
|
|
new RoutedProps(
|
|
|
|
|
() ⇒ router,
|
|
|
|
|
RoutedProps.defaultFailureDetectorFactory,
|
|
|
|
|
actorAddress,
|
|
|
|
|
connections,
|
|
|
|
|
RoutedProps.defaultTimeout, true)).start()
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-05-24 19:04:25 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2011-08-27 08:10:25 +03:00
|
|
|
* An Abstract convenience implementation for building an ActorReference that uses a Router.
|
2011-05-24 19:04:25 +02:00
|
|
|
*/
|
2011-08-27 08:10:25 +03:00
|
|
|
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
val router = props.routerFactory()
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
def address = props.deployId
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
override def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
|
2011-07-28 15:48:03 +03:00
|
|
|
val sender = channel match {
|
|
|
|
|
case ref: ActorRef ⇒ Some(ref)
|
2011-07-28 16:56:35 +03:00
|
|
|
case _ ⇒ None
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
router.route(message)(sender)
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-08-18 11:35:14 +02:00
|
|
|
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
|
|
|
|
|
message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = {
|
2011-07-28 15:48:03 +03:00
|
|
|
val sender = channel match {
|
|
|
|
|
case ref: ActorRef ⇒ Some(ref)
|
2011-07-28 16:56:35 +03:00
|
|
|
case _ ⇒ None
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
router.route[Any](message, timeout)(sender)
|
2011-05-24 19:04:25 +02:00
|
|
|
}
|
2011-08-27 08:10:25 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
|
|
|
|
* on (or more) of these actors.
|
|
|
|
|
*/
|
2011-08-30 18:42:35 +02:00
|
|
|
private[akka] class RoutedActorRef(val routedProps: RoutedProps) extends AbstractRoutedActorRef(routedProps) {
|
2011-08-27 08:10:25 +03:00
|
|
|
|
2011-08-31 15:07:18 +02:00
|
|
|
router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections))
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
def start(): this.type = synchronized[this.type] {
|
2011-08-26 17:25:18 +02:00
|
|
|
if (_status == ActorRefInternals.UNSTARTED)
|
|
|
|
|
_status = ActorRefInternals.RUNNING
|
2011-07-28 15:48:03 +03:00
|
|
|
this
|
|
|
|
|
}
|
2011-05-24 19:04:25 +02:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
def stop() {
|
|
|
|
|
synchronized {
|
|
|
|
|
if (_status == ActorRefInternals.RUNNING) {
|
|
|
|
|
_status = ActorRefInternals.SHUTDOWN
|
|
|
|
|
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
2010-03-04 19:02:23 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
/**
|
|
|
|
|
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
|
|
|
|
|
* Router only needs to implement the next method.
|
|
|
|
|
*
|
2011-08-12 14:35:45 +02:00
|
|
|
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
|
|
|
|
|
* FIXME: this is also the location where message buffering should be done in case of failure.
|
2011-07-28 15:48:03 +03:00
|
|
|
*/
|
|
|
|
|
trait BasicRouter extends Router {
|
2010-02-13 21:45:35 +01:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@volatile
|
2011-08-30 14:31:59 +02:00
|
|
|
protected var connections: FailureDetector = _
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-30 14:31:59 +02:00
|
|
|
def init(connections: FailureDetector) = {
|
2011-08-12 10:03:33 +03:00
|
|
|
this.connections = connections
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
def route(message: Any)(implicit sender: Option[ActorRef]) = message match {
|
2011-07-28 15:48:03 +03:00
|
|
|
case Routing.Broadcast(message) ⇒
|
|
|
|
|
//it is a broadcast message, we are going to send to message to all connections.
|
2011-08-27 08:10:25 +03:00
|
|
|
connections.versionedIterable.iterable.foreach(actor ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
|
|
|
|
actor.!(message)(sender)
|
|
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-08-29 11:44:33 +02:00
|
|
|
connections.remove(actor)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
2011-07-28 16:56:35 +03:00
|
|
|
})
|
2011-07-28 15:48:03 +03:00
|
|
|
case _ ⇒
|
|
|
|
|
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
|
|
|
next match {
|
2011-07-28 16:56:35 +03:00
|
|
|
case Some(actor) ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
|
|
|
|
actor.!(message)(sender)
|
|
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-08-29 11:44:33 +02:00
|
|
|
connections.remove(actor)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
|
|
|
|
}
|
2011-07-28 16:56:35 +03:00
|
|
|
case None ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
throwNoConnectionsError
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-07-28 16:56:35 +03:00
|
|
|
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
|
2011-07-28 15:48:03 +03:00
|
|
|
case Routing.Broadcast(message) ⇒
|
2011-08-18 11:35:14 +02:00
|
|
|
throw new RoutingException("Broadcasting using '?' is for the time being is not supported. Use ScatterGatherRouter.")
|
2011-07-28 15:48:03 +03:00
|
|
|
case _ ⇒
|
|
|
|
|
//it no broadcast message, we are going to select an actor from the connections and send the message to him.
|
|
|
|
|
next match {
|
2011-07-28 16:56:35 +03:00
|
|
|
case Some(actor) ⇒
|
2011-07-28 15:48:03 +03:00
|
|
|
try {
|
|
|
|
|
actor.?(message, timeout)(sender).asInstanceOf[Future[T]]
|
|
|
|
|
} catch {
|
2011-07-28 16:56:35 +03:00
|
|
|
case e: Exception ⇒
|
2011-08-29 11:44:33 +02:00
|
|
|
connections.remove(actor)
|
2011-07-28 15:48:03 +03:00
|
|
|
throw e
|
|
|
|
|
}
|
2011-07-28 16:56:35 +03:00
|
|
|
case None ⇒
|
2011-08-29 15:50:40 +02:00
|
|
|
throwNoConnectionsError
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
protected def next: Option[ActorRef]
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private def throwNoConnectionsError = {
|
2011-07-28 15:48:03 +03:00
|
|
|
val error = new RoutingException("No replica connections for router")
|
|
|
|
|
EventHandler.error(error, this, error.toString)
|
|
|
|
|
throw error
|
|
|
|
|
}
|
2010-05-21 20:08:49 +02:00
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
|
2011-08-29 09:22:14 +03:00
|
|
|
* A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
|
|
|
|
|
*
|
2011-07-28 15:48:03 +03:00
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class DirectRouter extends BasicRouter {
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[DirectRouterState]
|
2011-07-28 15:48:03 +03:00
|
|
|
|
|
|
|
|
lazy val next: Option[ActorRef] = {
|
2011-08-29 15:50:40 +02:00
|
|
|
val currentState = getState
|
2011-08-12 10:03:33 +03:00
|
|
|
if (currentState.ref == null) None else Some(currentState.ref)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@tailrec
|
2011-08-29 15:50:40 +02:00
|
|
|
private def getState: DirectRouterState = {
|
|
|
|
|
val currentState = state.get
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
if (currentState != null && connections.version == currentState.version) {
|
|
|
|
|
//we are lucky since nothing has changed in the connections.
|
|
|
|
|
currentState
|
|
|
|
|
} else {
|
|
|
|
|
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
|
|
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
val versionedIterable = connections.versionedIterable
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
val connectionCount = versionedIterable.iterable.size
|
|
|
|
|
if (connectionCount > 1)
|
|
|
|
|
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version)
|
2011-08-29 09:22:14 +03:00
|
|
|
if (state.compareAndSet(currentState, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-08-29 15:50:40 +02:00
|
|
|
getState
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
|
2011-08-30 18:42:35 +02:00
|
|
|
private case class DirectRouterState(ref: ActorRef, version: Long)
|
2010-05-21 20:08:49 +02:00
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
|
2011-07-28 15:48:03 +03:00
|
|
|
* A Router that randomly selects one of the target connections to send a message to.
|
|
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class RandomRouter extends BasicRouter {
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[RandomRouterState]
|
2011-07-28 15:48:03 +03:00
|
|
|
|
2011-08-12 14:35:45 +02:00
|
|
|
//FIXME: threadlocal random?
|
2011-08-29 15:50:40 +02:00
|
|
|
private val random = new java.util.Random(System.nanoTime)
|
2011-07-28 15:48:03 +03:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
def next: Option[ActorRef] = getState.array match {
|
2011-08-26 17:25:18 +02:00
|
|
|
case a if a.isEmpty ⇒ None
|
|
|
|
|
case a ⇒ Some(a(random.nextInt(a.length)))
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@tailrec
|
2011-08-29 15:50:40 +02:00
|
|
|
private def getState: RandomRouterState = {
|
|
|
|
|
val currentState = state.get
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
if (currentState != null && currentState.version == connections.version) {
|
|
|
|
|
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
|
|
|
|
currentState
|
|
|
|
|
} else {
|
|
|
|
|
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
2011-08-18 11:35:14 +02:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
val versionedIterable = connections.versionedIterable
|
2011-08-29 09:22:14 +03:00
|
|
|
val newState = new RandomRouterState(versionedIterable.iterable.toIndexedSeq, versionedIterable.version)
|
|
|
|
|
if (state.compareAndSet(currentState, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-08-29 15:50:40 +02:00
|
|
|
getState
|
2011-07-28 15:48:03 +03:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
|
2011-05-17 21:15:27 +02:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
2011-08-12 10:03:33 +03:00
|
|
|
* A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
|
2011-07-28 15:48:03 +03:00
|
|
|
*
|
|
|
|
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
2011-05-17 21:15:27 +02:00
|
|
|
*/
|
2011-08-12 10:03:33 +03:00
|
|
|
class RoundRobinRouter extends BasicRouter {
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
private val state = new AtomicReference[RoundRobinState]
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
def next: Option[ActorRef] = getState.next
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@tailrec
|
2011-08-29 15:50:40 +02:00
|
|
|
private def getState: RoundRobinState = {
|
|
|
|
|
val currentState = state.get
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
if (currentState != null && currentState.version == connections.version) {
|
|
|
|
|
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
|
|
|
|
|
currentState
|
|
|
|
|
} else {
|
|
|
|
|
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
2011-08-18 11:35:14 +02:00
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
val versionedIterable = connections.versionedIterable
|
2011-08-29 09:22:14 +03:00
|
|
|
val newState = new RoundRobinState(versionedIterable.iterable.toIndexedSeq[ActorRef], versionedIterable.version)
|
|
|
|
|
if (state.compareAndSet(currentState, newState))
|
2011-08-12 10:03:33 +03:00
|
|
|
//we are lucky since we just updated the state, so we can send it back as the state to use
|
|
|
|
|
newState
|
2011-08-26 17:25:18 +02:00
|
|
|
else //we failed to update the state, lets try again... better luck next time.
|
2011-08-29 15:50:40 +02:00
|
|
|
getState
|
2011-05-21 15:37:09 +02:00
|
|
|
}
|
2011-08-12 10:03:33 +03:00
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
|
2011-08-12 10:03:33 +03:00
|
|
|
|
|
|
|
|
private val index = new AtomicInteger(0)
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
def next: Option[ActorRef] = if (array.isEmpty) None else Some(array(nextIndex))
|
2011-05-21 15:37:09 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
@tailrec
|
2011-08-29 15:50:40 +02:00
|
|
|
private def nextIndex: Int = {
|
|
|
|
|
val oldIndex = index.get
|
2011-08-12 10:03:33 +03:00
|
|
|
var newIndex = if (oldIndex == array.length - 1) 0 else oldIndex + 1
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
if (!index.compareAndSet(oldIndex, newIndex)) nextIndex
|
2011-08-12 10:03:33 +03:00
|
|
|
else oldIndex
|
|
|
|
|
}
|
2011-05-17 21:15:27 +02:00
|
|
|
}
|
|
|
|
|
}
|
2011-08-17 10:21:27 +03:00
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
2011-08-17 10:21:27 +03:00
|
|
|
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
|
|
|
|
|
* specified strategy (specific router needs to implement `gather` method).
|
2011-08-26 08:24:26 +02:00
|
|
|
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
2011-08-17 10:21:27 +03:00
|
|
|
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
|
2011-08-26 08:24:26 +02:00
|
|
|
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
|
|
|
|
|
*
|
2011-08-17 10:21:27 +03:00
|
|
|
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
|
|
|
|
|
* FIXME: this is also the location where message buffering should be done in case of failure.
|
|
|
|
|
*/
|
|
|
|
|
trait ScatterGatherRouter extends BasicRouter with Serializable {
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
|
|
|
|
* Aggregates the responses into a single Future
|
|
|
|
|
* @param results Futures of the responses from connections
|
|
|
|
|
*/
|
2011-08-17 10:21:27 +03:00
|
|
|
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
|
|
|
|
|
|
|
|
|
|
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
|
2011-08-27 08:10:25 +03:00
|
|
|
val responses = connections.versionedIterable.iterable.flatMap { actor ⇒
|
2011-08-17 10:21:27 +03:00
|
|
|
try {
|
|
|
|
|
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
|
|
|
|
|
} catch {
|
|
|
|
|
case e: Exception ⇒
|
2011-08-29 11:44:33 +02:00
|
|
|
connections.remove(actor)
|
2011-08-17 10:21:27 +03:00
|
|
|
None
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-26 17:25:18 +02:00
|
|
|
if (responses.isEmpty)
|
2011-08-17 10:21:27 +03:00
|
|
|
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
|
2011-08-29 15:50:40 +02:00
|
|
|
else gather(responses)
|
2011-08-17 10:21:27 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
|
|
|
|
|
case Routing.Broadcast(message) ⇒ scatterGather(message, timeout)
|
|
|
|
|
case message ⇒ super.route(message, timeout)(sender)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2011-08-29 15:50:40 +02:00
|
|
|
/**
|
2011-08-17 10:21:27 +03:00
|
|
|
* Simple router that broadcasts the message to all connections, and replies with the first response
|
2011-08-26 08:24:26 +02:00
|
|
|
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
2011-08-17 10:21:27 +03:00
|
|
|
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
|
2011-08-26 08:24:26 +02:00
|
|
|
* mode, the router would behave as {@link RoundRobinRouter}
|
2011-08-17 10:21:27 +03:00
|
|
|
*/
|
|
|
|
|
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {
|
|
|
|
|
|
|
|
|
|
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] = Futures.firstCompletedOf(results)
|
|
|
|
|
}
|