Merge ClusterActoRef & RoutedActorRef: After merge with master
This commit is contained in:
commit
56d4fc7d7c
14 changed files with 398 additions and 163 deletions
|
|
@ -7,18 +7,14 @@ package akka.routing
|
|||
import annotation.tailrec
|
||||
|
||||
import akka.AkkaException
|
||||
import akka.dispatch.Future
|
||||
import akka.actor._
|
||||
import akka.dispatch.Futures
|
||||
import akka.event.EventHandler
|
||||
import akka.actor.UntypedChannel._
|
||||
|
||||
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
|
||||
|
||||
/**
|
||||
* An {@link AkkaException} thrown when something goes wrong while routing a message
|
||||
*/
|
||||
class RoutingException(message: String) extends AkkaException(message)
|
||||
import akka.dispatch.{ Future, Futures }
|
||||
import akka.util.ReflectiveAccess
|
||||
import collection.JavaConversions.iterableAsScalaIterable
|
||||
|
||||
sealed trait RouterType
|
||||
|
||||
|
|
@ -65,6 +61,116 @@ object RouterType {
|
|||
|
||||
}
|
||||
|
||||
object RoutedProps {
|
||||
|
||||
final val defaultTimeout = Actor.TIMEOUT
|
||||
final val defaultRouterFactory = () ⇒ new RoundRobinRouter
|
||||
final val defaultDeployId = ""
|
||||
final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled
|
||||
|
||||
/**
|
||||
* The default RoutedProps instance, uses the settings from the RoutedProps object starting with default*
|
||||
*/
|
||||
final val default = new RoutedProps()
|
||||
|
||||
def apply(): RoutedProps = default
|
||||
}
|
||||
|
||||
/**
|
||||
* Contains the configuration to create local and clustered routed actor references.
|
||||
*
|
||||
* Routed ActorRef configuration object, this is thread safe and fully sharable.
|
||||
*
|
||||
* Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on routing
|
||||
* (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function that returns
|
||||
* a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps between multiple
|
||||
* threads safe.
|
||||
*
|
||||
* This configuration object makes it possible to either
|
||||
*/
|
||||
case class RoutedProps(routerFactory: () ⇒ Router, deployId: String, connections: Iterable[ActorRef], timeout: Timeout, localOnly: Boolean) {
|
||||
|
||||
def this() = this(
|
||||
routerFactory = RoutedProps.defaultRouterFactory,
|
||||
deployId = RoutedProps.defaultDeployId,
|
||||
connections = List(),
|
||||
timeout = RoutedProps.defaultTimeout,
|
||||
localOnly = RoutedProps.defaultLocalOnly)
|
||||
|
||||
/**
|
||||
* Returns a new RoutedProps with the specified deployId set
|
||||
*
|
||||
* Java and Scala API
|
||||
*/
|
||||
def withDeployId(id: String): RoutedProps = copy(deployId = if (id eq null) "" else id)
|
||||
|
||||
/**
|
||||
* Returns a new RoutedProps configured with a random router.
|
||||
*
|
||||
* Java and Scala API.
|
||||
*/
|
||||
def withRandomRouter(): RoutedProps = copy(routerFactory = () ⇒ new RandomRouter())
|
||||
|
||||
/**
|
||||
* Returns a new RoutedProps configured with a round robin router.
|
||||
*
|
||||
* Java and Scala API.
|
||||
*/
|
||||
def withRoundRobinRouter(): RoutedProps = copy(routerFactory = () ⇒ new RoundRobinRouter())
|
||||
|
||||
/**
|
||||
* Returns a new RoutedProps configured with a direct router.
|
||||
*
|
||||
* Java and Scala API.
|
||||
*/
|
||||
def withDirectRouter(): RoutedProps = copy(routerFactory = () ⇒ new DirectRouter())
|
||||
|
||||
/**
|
||||
* Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is created.
|
||||
* In some cases you just want to have local actor references, even though the Cluster Module is up and running.
|
||||
*
|
||||
* Java and Scala API.
|
||||
*/
|
||||
def withLocalOnly(l: Boolean = true) = copy(localOnly = l)
|
||||
|
||||
/**
|
||||
* Sets the Router factory method to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new
|
||||
* Router instance is needed for every 'routed' ActorRef. That is why a 'factory' function is used to create new
|
||||
* instances.
|
||||
*
|
||||
* Scala API.
|
||||
*/
|
||||
def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f)
|
||||
|
||||
/**
|
||||
* Sets the RouterFactory to use. Since Router instance contain state, and should be linked to a single 'routed' ActorRef, a new
|
||||
* Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory interface is used to create new
|
||||
* instances.
|
||||
*
|
||||
* Java API.
|
||||
*/
|
||||
def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter())
|
||||
|
||||
/**
|
||||
*
|
||||
*/
|
||||
def withTimeout(t: Timeout): RoutedProps = copy(timeout = t)
|
||||
|
||||
/**
|
||||
* Sets the connections to use.
|
||||
*
|
||||
* Scala API.
|
||||
*/
|
||||
def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c)
|
||||
|
||||
/**
|
||||
* Sets the connections to use.
|
||||
*
|
||||
* Java API.
|
||||
*/
|
||||
def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = copy(connections = iterableAsScalaIterable(c))
|
||||
}
|
||||
|
||||
/**
|
||||
* The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in the
|
||||
* {@link RouterConnections} and each Router should be linked to only one {@link RouterConnections}.
|
||||
|
|
@ -102,7 +208,24 @@ trait Router {
|
|||
}
|
||||
|
||||
/**
|
||||
* An {@link AkkaException} thrown when something goes wrong while routing a message
|
||||
*/
|
||||
class RoutingException(message: String) extends AkkaException(message)
|
||||
|
||||
/**
|
||||
* The RouterConnection acts like a middleman between the Router and the actor reference that does the routing.
|
||||
* Through the RouterConnection:
|
||||
* <ol>
|
||||
* <li>
|
||||
* the actor ref can signal that something has changed in the known set of connections. The Router can see
|
||||
* when a changed happened (by checking the version) and update its internal datastructures.
|
||||
* </li>
|
||||
* <li>
|
||||
* the Router can indicate that some happened happened with a actor ref, e.g. the actor ref dying.
|
||||
* </li>
|
||||
* </ol>
|
||||
*
|
||||
* It is very likely that the implementation of the RouterConnection will be part of the ActorRef itself.
|
||||
*/
|
||||
trait RouterConnections {
|
||||
|
||||
|
|
@ -113,17 +236,20 @@ trait RouterConnections {
|
|||
def version: Long
|
||||
|
||||
/**
|
||||
* Returns the number of connections.
|
||||
* Returns the number of connections. Value could be stale as soon as received, and this method can't be combined (easily)
|
||||
* with an atomic read of and size and version.
|
||||
*/
|
||||
def size: Int
|
||||
|
||||
/**
|
||||
* Returns a tuple containing the version and Iterable of all connected ActorRefs this Router uses to send messages to.
|
||||
* Returns a VersionedIterator containing all connectected ActorRefs at some moment in time. Since there is
|
||||
* the time element, also the version is included to be able to read the data (the connections) and the version
|
||||
* in an atomic manner.
|
||||
*
|
||||
* This iterator should be 'persistent'. So it can be handed out to other threads so that they are working on
|
||||
* a stable (immutable) view of some set of connections.
|
||||
* This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
|
||||
* view of some set of connections.
|
||||
*/
|
||||
def versionedIterator: (Long, Iterable[ActorRef])
|
||||
def versionedIterable: VersionedIterable[ActorRef]
|
||||
|
||||
/**
|
||||
* A callback that can be used to indicate that a connected actorRef was dead.
|
||||
|
|
@ -135,19 +261,51 @@ trait RouterConnections {
|
|||
*
|
||||
* It could be that even after a signalDeadActor has been called for a specific ActorRef, that the ActorRef
|
||||
* is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
|
||||
* made how long this takes place.
|
||||
* made how long this takes.
|
||||
*
|
||||
* @param ref the dead
|
||||
*/
|
||||
def signalDeadActor(deadRef: ActorRef): Unit
|
||||
}
|
||||
|
||||
/**
|
||||
* An Iterable that also contains a version.
|
||||
*/
|
||||
case class VersionedIterable[A](version: Long, val iterable: Iterable[A])
|
||||
|
||||
/**
|
||||
* A Helper class to create actor references that use routing.
|
||||
*/
|
||||
object Routing {
|
||||
|
||||
sealed trait RoutingMessage
|
||||
|
||||
case class Broadcast(message: Any) extends RoutingMessage
|
||||
|
||||
/**
|
||||
* todo: will very likely be moved to the ActorRef.
|
||||
*/
|
||||
def actorOf(props: RoutedProps): ActorRef = {
|
||||
//TODO Implement support for configuring by deployment ID etc
|
||||
//TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
|
||||
//TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
|
||||
//TODO If the actor deployed uses a different config, then ignore or throw exception?
|
||||
|
||||
val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
|
||||
val localOnly = props.localOnly
|
||||
|
||||
if (!localOnly && !clusteringEnabled)
|
||||
throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled")
|
||||
else if (clusteringEnabled && !props.localOnly) {
|
||||
ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start()
|
||||
} else {
|
||||
if (props.connections.isEmpty)
|
||||
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
|
||||
|
||||
new RoutedActorRef(props).start()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
|
||||
*
|
||||
|
|
@ -157,6 +315,7 @@ object Routing {
|
|||
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
|
||||
* how many connections it can handle.
|
||||
*/
|
||||
@deprecated
|
||||
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
|
||||
val router = routerType match {
|
||||
case RouterType.Direct if connections.size > 1 ⇒
|
||||
|
|
@ -171,26 +330,22 @@ object Routing {
|
|||
throw new IllegalArgumentException("Unsupported routerType " + r)
|
||||
}
|
||||
|
||||
actorOf(actorAddress, connections, router).start()
|
||||
}
|
||||
|
||||
def actorOf(actorAddress: String, connections: Iterable[ActorRef], router: Router): ActorRef =
|
||||
if (connections.isEmpty)
|
||||
if (connections.size == 0)
|
||||
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
|
||||
else
|
||||
new RoutedActorRef(actorAddress, router, connections)
|
||||
|
||||
def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef =
|
||||
actorOf(actorAddress, connections, akka.routing.RouterType.RoundRobin)
|
||||
val props = new RoutedProps(() ⇒ router, actorAddress, connections, RoutedProps.defaultTimeout, true)
|
||||
new RoutedActorRef(props).start()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
||||
* on (or more) of these actors.
|
||||
* An Abstract convenience implementation for building an ActorReference that uses a Router.
|
||||
*/
|
||||
class RoutedActorRef(val address: String, val router: Router, val connectionIterator: Iterable[ActorRef]) extends UnsupportedActorRef {
|
||||
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {
|
||||
|
||||
router.init(new RoutedActorRefConnections(connectionIterator))
|
||||
val router = props.routerFactory.apply()
|
||||
|
||||
def address = props.deployId
|
||||
|
||||
override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit = {
|
||||
val sender = channel match {
|
||||
|
|
@ -208,6 +363,16 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
|
|||
}
|
||||
router.route[Any](message, timeout)(sender)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
|
||||
* on (or more) of these actors.
|
||||
*/
|
||||
private[akka] class RoutedActorRef(val routedProps: RoutedProps)
|
||||
extends AbstractRoutedActorRef(routedProps) {
|
||||
|
||||
router.init(new RoutedActorRefConnections(routedProps.connections))
|
||||
|
||||
def start(): this.type = synchronized[this.type] {
|
||||
if (_status == ActorRefInternals.UNSTARTED)
|
||||
|
|
@ -220,49 +385,40 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
|
|||
if (_status == ActorRefInternals.RUNNING) {
|
||||
_status = ActorRefInternals.SHUTDOWN
|
||||
postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
|
||||
|
||||
// FIXME here we need to fire off Actor.cluster.remove(address) (which needs to be properly implemented first, see ticket)
|
||||
|
||||
//inetSocketAddressToActorRefMap.get.values foreach (_.stop()) // shut down all remote connections
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class RoutedActorRefConnections() extends RouterConnections {
|
||||
|
||||
private val state = new AtomicReference[State]()
|
||||
private val state = new AtomicReference[VersionedIterable[ActorRef]]()
|
||||
|
||||
def this(connectionIterable: Iterable[ActorRef]) = {
|
||||
this()
|
||||
state.set(new State(Long.MinValue, connectionIterable))
|
||||
state.set(new VersionedIterable[ActorRef](Long.MinValue, connectionIterable))
|
||||
}
|
||||
|
||||
def version: Long = state.get().version
|
||||
|
||||
def size: Int = state.get().connections.size
|
||||
def size: Int = state.get().iterable.size
|
||||
|
||||
def versionedIterator = {
|
||||
val s = state.get
|
||||
(s.version, s.connections)
|
||||
}
|
||||
def versionedIterable = state.get
|
||||
|
||||
@tailrec
|
||||
final def signalDeadActor(ref: ActorRef) = {
|
||||
val oldState = state.get()
|
||||
|
||||
//remote the ref from the connections.
|
||||
var newList = oldState.connections.filter(currentActorRef ⇒ currentActorRef ne ref)
|
||||
var newList = oldState.iterable.filter(currentActorRef ⇒ currentActorRef ne ref)
|
||||
|
||||
if (newList.size != oldState.connections.size) {
|
||||
if (newList.size != oldState.iterable.size) {
|
||||
//one or more occurrences of the actorRef were removed, so we need to update the state.
|
||||
|
||||
val newState = new State(oldState.version + 1, newList)
|
||||
val newState = new VersionedIterable[ActorRef](oldState.version + 1, newList)
|
||||
//if we are not able to update the state, we just try again.
|
||||
if (!state.compareAndSet(oldState, newState)) signalDeadActor(ref)
|
||||
}
|
||||
}
|
||||
|
||||
case class State(val version: Long, val connections: Iterable[ActorRef])
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -285,7 +441,7 @@ trait BasicRouter extends Router {
|
|||
def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match {
|
||||
case Routing.Broadcast(message) ⇒
|
||||
//it is a broadcast message, we are going to send to message to all connections.
|
||||
connections.versionedIterator._2.foreach(actor ⇒
|
||||
connections.versionedIterable.iterable.foreach(actor ⇒
|
||||
try {
|
||||
actor.!(message)(sender)
|
||||
} catch {
|
||||
|
|
@ -338,7 +494,8 @@ trait BasicRouter extends Router {
|
|||
}
|
||||
|
||||
/**
|
||||
* A DirectRouter is FIXME
|
||||
* A DirectRouter a Router that only has a single connected actorRef and forwards all request to that actorRef.
|
||||
*
|
||||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
|
|
@ -361,13 +518,15 @@ class DirectRouter extends BasicRouter {
|
|||
} else {
|
||||
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
|
||||
|
||||
val (version, connectionIterable) = connections.versionedIterator
|
||||
val versionedIterable = connections.versionedIterable
|
||||
|
||||
if (connectionIterable.size > 1)
|
||||
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionIterable.size))
|
||||
val connectionCount = versionedIterable.iterable.size
|
||||
if (connectionCount > 1)
|
||||
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
|
||||
|
||||
val newState = new DirectRouterState(connectionIterable.head, version)
|
||||
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version)
|
||||
if (state.compareAndSet(currentState, newState))
|
||||
//we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
newState
|
||||
else //we failed to update the state, lets try again... better luck next time.
|
||||
getState()
|
||||
|
|
@ -404,10 +563,11 @@ class RandomRouter extends BasicRouter {
|
|||
currentState
|
||||
} else {
|
||||
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
||||
val (version, connectionIterable) = connections.versionedIterator
|
||||
val newState = new RandomRouterState(connectionIterable.toIndexedSeq, version)
|
||||
|
||||
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
val versionedIterable = connections.versionedIterable
|
||||
val newState = new RandomRouterState(versionedIterable.iterable.toIndexedSeq, versionedIterable.version)
|
||||
if (state.compareAndSet(currentState, newState))
|
||||
//we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
newState
|
||||
else //we failed to update the state, lets try again... better luck next time.
|
||||
getState()
|
||||
|
|
@ -437,10 +597,11 @@ class RoundRobinRouter extends BasicRouter {
|
|||
currentState
|
||||
} else {
|
||||
//there has been a change in connections, or it was the first try, so we need to update the internal state
|
||||
val (version, connectionIterable) = connections.versionedIterator
|
||||
val newState = new RoundRobinState(connectionIterable.toIndexedSeq, version)
|
||||
|
||||
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
val versionedIterable = connections.versionedIterable
|
||||
val newState = new RoundRobinState(versionedIterable.iterable.toIndexedSeq[ActorRef], versionedIterable.version)
|
||||
if (state.compareAndSet(currentState, newState))
|
||||
//we are lucky since we just updated the state, so we can send it back as the state to use
|
||||
newState
|
||||
else //we failed to update the state, lets try again... better luck next time.
|
||||
getState()
|
||||
|
|
@ -467,10 +628,10 @@ class RoundRobinRouter extends BasicRouter {
|
|||
/*
|
||||
* ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
|
||||
* specified strategy (specific router needs to implement `gather` method).
|
||||
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
||||
* Scatter-gather pattern will be applied only to the messages broadcasted using Future
|
||||
* (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages, sent in a fire-forget
|
||||
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
|
||||
*
|
||||
* mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type
|
||||
*
|
||||
* FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
|
||||
* FIXME: this is also the location where message buffering should be done in case of failure.
|
||||
*/
|
||||
|
|
@ -483,7 +644,7 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
|
|||
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
|
||||
|
||||
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
|
||||
val responses = connections.versionedIterator._2.flatMap { actor ⇒
|
||||
val responses = connections.versionedIterable.iterable.flatMap { actor ⇒
|
||||
try {
|
||||
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
|
||||
} catch {
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue