2010-05-03 19:32:40 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2010-05-03 19:32:40 +02:00
|
|
|
*/
|
|
|
|
|
|
2010-10-26 12:49:25 +02:00
|
|
|
package akka.routing
|
2010-02-13 21:45:35 +01:00
|
|
|
|
2011-05-21 15:37:09 +02:00
|
|
|
import annotation.tailrec
|
2011-05-17 21:15:27 +02:00
|
|
|
|
2011-05-24 19:04:25 +02:00
|
|
|
import akka.AkkaException
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.actor._
|
|
|
|
|
import akka.event.EventHandler
|
|
|
|
|
import akka.actor.UntypedChannel._
|
2011-08-18 11:35:14 +02:00
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
|
2011-08-27 08:10:25 +03:00
|
|
|
import akka.dispatch.{ Future, Futures }
|
|
|
|
|
import akka.util.ReflectiveAccess
|
|
|
|
|
import collection.JavaConversions.iterableAsScalaIterable
|
2011-05-24 19:04:25 +02:00
|
|
|
|
|
|
|
|
/**
 * Closed set of routing strategies that can be named in declarative configuration.
 * All instances are the singleton objects defined in the `RouterType` companion.
 */
sealed trait RouterType

/**
 * Used for declarative configuration of Routing.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RouterType {

  /** The "direct" strategy. */
  object Direct extends RouterType

  /** A RouterType that randomly selects a connection to send a message to. */
  object Random extends RouterType

  /** A RouterType that selects the connection by using round robin. */
  object RoundRobin extends RouterType

  /** A RouterType that selects the connection based on the least amount of cpu usage. */
  object LeastCPU extends RouterType

  /**
   * A RouterType that selects the connection based on the least amount of ram used.
   *
   * FIXME: this is extremely vague currently since there are so many ways to define least amount of ram.
   */
  object LeastRAM extends RouterType

  /** A RouterType that selects the connection where the actor has the least amount of messages in its mailbox. */
  object LeastMessages extends RouterType

  /** A user-defined custom RouterType. */
  object Custom extends RouterType
}
|
|
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
 * Companion of the RoutedProps configuration class: holds the default settings and a
 * shared default instance built from them.
 */
object RoutedProps {

  /** Timeout used when none is configured; taken from `Actor.TIMEOUT`. */
  final val defaultTimeout = Actor.TIMEOUT

  /** Router factory used when none is configured; every call creates a new RoundRobinRouter. */
  final val defaultRouterFactory = () ⇒ new RoundRobinRouter()

  /** Deploy id used when none is configured (the empty string). */
  final val defaultDeployId = ""

  /** Defaults to local-only exactly when the cluster module is not enabled. */
  final val defaultLocalOnly = !ReflectiveAccess.ClusterModule.isEnabled

  /**
   * The default RoutedProps instance, uses the settings from the RoutedProps object starting with default*.
   * Must stay below the default* values, since it reads them during initialisation.
   */
  final val default = new RoutedProps()

  /** Returns the shared [[default]] instance. */
  def apply(): RoutedProps = default
}
|
|
|
|
|
|
|
|
|
|
/**
 * Contains the configuration to create local and clustered routed actor references.
 *
 * Routed ActorRef configuration object, this is thread safe and fully sharable.
 *
 * Because the Routers are stateful, a new Router instance needs to be created for every ActorRef that relies on
 * routing (currently the ClusterActorRef and the RoutedActorRef). That is why a Router factory is used (a function
 * that returns a new Router instance) instead of a single Router instance. This makes sharing the same RoutedProps
 * between multiple threads safe.
 *
 * Every `with*` method leaves this instance untouched and returns a modified copy.
 *
 * @param routerFactory function creating the Router for a routed ActorRef
 * @param deployId      the deploy id
 * @param connections   the actor references messages are routed to
 * @param timeout       the timeout
 * @param localOnly     when true a local routed reference is requested even if clustering is available
 */
case class RoutedProps(
  routerFactory: () ⇒ Router,
  deployId: String,
  connections: Iterable[ActorRef],
  timeout: Timeout,
  localOnly: Boolean) {

  /** Creates an instance from the default* settings of the companion object and an empty connection list. */
  def this() = this(
    RoutedProps.defaultRouterFactory,
    RoutedProps.defaultDeployId,
    List(),
    RoutedProps.defaultTimeout,
    RoutedProps.defaultLocalOnly)

  /**
   * Returns a new RoutedProps with the specified deployId set; a null id is replaced by the empty string.
   *
   * Java and Scala API
   */
  def withDeployId(id: String): RoutedProps = {
    val safeId = Option(id).getOrElse("")
    copy(deployId = safeId)
  }

  /**
   * Returns a new RoutedProps configured with a random router.
   *
   * Java and Scala API.
   */
  def withRandomRouter(): RoutedProps = withRouter(() ⇒ new RandomRouter())

  /**
   * Returns a new RoutedProps configured with a round robin router.
   *
   * Java and Scala API.
   */
  def withRoundRobinRouter(): RoutedProps = withRouter(() ⇒ new RoundRobinRouter())

  /**
   * Returns a new RoutedProps configured with a direct router.
   *
   * Java and Scala API.
   */
  def withDirectRouter(): RoutedProps = withRouter(() ⇒ new DirectRouter())

  /**
   * Makes it possible to change the default behavior in a clustered environment that a clustered actor ref is
   * created. In some cases you just want to have local actor references, even though the Cluster Module is up and
   * running.
   *
   * Java and Scala API.
   */
  def withLocalOnly(l: Boolean = true): RoutedProps = copy(localOnly = l)

  /**
   * Sets the Router factory method to use. Since a Router instance contains state, and should be linked to a single
   * 'routed' ActorRef, a new Router instance is needed for every 'routed' ActorRef. That is why a 'factory'
   * function is used to create new instances.
   *
   * Scala API.
   */
  def withRouter(f: () ⇒ Router): RoutedProps = copy(routerFactory = f)

  /**
   * Sets the RouterFactory to use. Since a Router instance contains state, and should be linked to a single
   * 'routed' ActorRef, a new Router instance is needed for every 'routed' ActorRef. That is why a RouterFactory
   * interface is used to create new instances.
   *
   * Java API.
   */
  def withRouter(f: RouterFactory): RoutedProps = copy(routerFactory = () ⇒ f.newRouter())

  /** Returns a new RoutedProps with the given timeout. */
  def withTimeout(t: Timeout): RoutedProps = copy(timeout = t)

  /**
   * Sets the connections to use.
   *
   * Scala API.
   */
  def withConnections(c: Iterable[ActorRef]): RoutedProps = copy(connections = c)

  /**
   * Sets the connections to use; the Java iterable is wrapped as a Scala Iterable.
   *
   * Java API.
   */
  def withConnections(c: java.lang.Iterable[ActorRef]): RoutedProps = {
    val wrapped: Iterable[ActorRef] = iterableAsScalaIterable(c)
    copy(connections = wrapped)
  }
}
|
|
|
|
|
|
2011-08-12 10:03:33 +03:00
|
|
|
/**
 * The Router is responsible for sending a message to one (or more) of its connections. Connections are stored in
 * the {@link RouterConnections} and each Router should be linked to only one {@link RouterConnections}.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
trait Router {

  /**
   * Initializes this Router with a given set of Connections. The Router can use this datastructure to ask for
   * the current connections, signal that there were problems with one of the connections and see if there have
   * been changes in the connections.
   *
   * This method is not threadsafe, and should only be called once.
   *
   * JMM Guarantees:
   * This method guarantees that all changes made in this method are visible before one of the routing methods
   * is called.
   *
   * @param connections the connections this Router routes to
   */
  def init(connections: RouterConnections): Unit

  /**
   * Routes the message to one of the connections.
   *
   * @param message the message to route
   * @param sender  the optional sender of the message
   * @throws RoutingException if something goes wrong while routing the message
   */
  def route(message: Any)(implicit sender: Option[ActorRef]): Unit

  /**
   * Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
   * completion of the processing of the message.
   *
   * @param message the message to route
   * @param timeout the timeout to use
   * @param sender  the optional sender of the message
   * @throws RoutingException if something goes wrong while routing the message.
   */
  def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
}
|
|
|
|
|
|
|
|
|
|
/**
 * An {@link AkkaException} thrown when something goes wrong while routing a message.
 *
 * @param message description of the routing failure
 */
class RoutingException(message: String)
  extends AkkaException(message)
|
|
|
|
|
|
|
|
|
|
/**
 * The RouterConnections acts like a middleman between the Router and the actor reference that does the routing.
 * Through the RouterConnections:
 * <ol>
 *   <li>
 *     the actor ref can signal that something has changed in the known set of connections. The Router can see
 *     when a change happened (by checking the version) and update its internal datastructures.
 *   </li>
 *   <li>
 *     the Router can indicate that something happened with an actor ref, e.g. the actor ref dying.
 *   </li>
 * </ol>
 *
 * It is very likely that the implementation of the RouterConnections will be part of the ActorRef itself.
 */
trait RouterConnections {

  /**
   * A version that is useful to see if there is any change in the connections. If there is a change, a router is
   * able to update its internal datastructures.
   */
  def version: Long

  /**
   * Returns the number of connections. Value could be stale as soon as received, and this method can't be
   * combined (easily) with an atomic read of both size and version.
   */
  def size: Int

  /**
   * Returns a VersionedIterable containing all connected ActorRefs at some moment in time. Since there is
   * the time element, also the version is included to be able to read the data (the connections) and the version
   * in an atomic manner.
   *
   * This Iterable is 'persistent'. So it can be handed out to different threads and they see a stable (immutable)
   * view of some set of connections.
   */
  def versionedIterable: VersionedIterable[ActorRef]

  /**
   * A callback that can be used to indicate that a connected actorRef was dead.
   * <p/>
   * Implementations should make sure that this method can be called without the actorRef being part of the
   * current set of connections. The most logical way to deal with this situation, is just to ignore it. One of the
   * reasons this can happen is that multiple threads could at the 'same' moment discover for the same ActorRef
   * that it is not working.
   *
   * It could be that even after a remove has been called for a specific ActorRef, that the ActorRef
   * is still being used. A good behaving Router will eventually discard this reference, but no guarantees are
   * made how long this takes.
   *
   * @param deadRef the dead ActorRef
   */
  def remove(deadRef: ActorRef): Unit
}
|
|
|
|
|
|
2011-08-27 08:10:25 +03:00
|
|
|
/**
 * An Iterable that also contains a version.
 *
 * @param version  the version belonging to this snapshot of elements
 * @param iterable the elements
 */
case class VersionedIterable[A](
  version: Long,
  iterable: Iterable[A])
|
|
|
|
|
|
|
|
|
|
/**
 * A Helper class to create actor references that use routing.
 */
object Routing {

  /** Base type of the control messages understood by routers. */
  sealed trait RoutingMessage

  /** Wraps a message that should be delivered to all connections of a router instead of a single one. */
  case class Broadcast(message: Any) extends RoutingMessage

  /**
   * Creates and starts a routed actor reference described by the given props: a clustered reference when the
   * props are not local-only, a RoutedActorRef otherwise.
   *
   * todo: will very likely be moved to the ActorRef.
   *
   * @param props the routing configuration
   * @throws IllegalArgumentException if a clustered reference is requested while the ClusterModule is not enabled,
   *                                  or if a local reference is requested with an empty connection set
   */
  def actorOf(props: RoutedProps): ActorRef = {
    //TODO Implement support for configuring by deployment ID etc
    //TODO If deployId matches an already created actor (Ahead-of-time deployed) return that actor
    //TODO If deployId exists in config, it will override the specified Props (should we attempt to merge?)
    //TODO If the actor deployed uses a different config, then ignore or throw exception?

    val clusteringEnabled = ReflectiveAccess.ClusterModule.isEnabled
    val localOnly = props.localOnly

    if (localOnly) {
      // local routed reference, needs at least one connection to route to
      if (props.connections.isEmpty)
        throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")

      new RoutedActorRef(props).start()
    } else if (clusteringEnabled) {
      ReflectiveAccess.ClusterModule.newClusteredActorRef(props).start()
    } else {
      throw new IllegalArgumentException("Can't have clustered actor reference without the ClusterModule being enabled")
    }
  }

  /**
   * Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
   *
   * @param actorAddress the address of the ActorRef.
   * @param connections an Iterable pointing to all connected actor references.
   * @param routerType the type of routing that should be used.
   * @throws IllegalArgumentException if the number of connections is zero, if the router type is not supported,
   *                                  or if the chosen router can't handle the number of connections
   *                                  (a direct router accepts at most one).
   */
  @deprecated
  def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
    val router: Router = routerType match {
      case RouterType.Direct ⇒
        if (connections.size > 1)
          throw new IllegalArgumentException("A direct router can't have more than 1 connection")
        new DirectRouter()
      case RouterType.Random     ⇒ new RandomRouter()
      case RouterType.RoundRobin ⇒ new RoundRobinRouter()
      case unsupported ⇒
        throw new IllegalArgumentException("Unsupported routerType " + unsupported)
    }

    // checked after the router selection, so router related errors are reported first
    if (connections.isEmpty)
      throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")

    val props = new RoutedProps(
      routerFactory = () ⇒ router,
      deployId = actorAddress,
      connections = connections,
      timeout = RoutedProps.defaultTimeout,
      localOnly = true)

    new RoutedActorRef(props).start()
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * An Abstract convenience implementation for building an ActorReference that uses a Router.
 * Both message posting methods hand the message over to the router created from the props.
 *
 * @param props the routing configuration; supplies the router factory and the address
 */
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {

  /** The router of this reference, created once from the factory in the props. */
  val router: Router = props.routerFactory()

  /** The address of this reference is the deploy id from the props. */
  def address: String = props.deployId

  /** Routes the message through the router (fire-and-forget). */
  override def postMessageToMailbox(message: Any, channel: UntypedChannel): Unit =
    router.route(message)(senderOf(channel))

  /** Routes the message through the router and returns the Future the router produces. */
  override def postMessageToMailboxAndCreateFutureResultWithTimeout(
    message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] =
    router.route[Any](message, timeout)(senderOf(channel))

  /** The channel is passed on as sender only when it is an ActorRef itself. */
  private def senderOf(channel: UntypedChannel): Option[ActorRef] = channel match {
    case ref: ActorRef ⇒ Some(ref)
    case _             ⇒ None
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * A RoutedActorRef is an ActorRef that has a set of connected ActorRefs and it uses a Router to send a message to
 * one (or more) of these actors.
 *
 * @param routedProps the routing configuration; its connections are handed to the router on construction
 */
private[akka] class RoutedActorRef(val routedProps: RoutedProps)
  extends AbstractRoutedActorRef(routedProps) {

  router.init(new RoutedActorRefConnections(routedProps.connections))

  /** Moves the status from UNSTARTED to RUNNING; any other status is left alone. Returns this reference. */
  def start(): this.type = synchronized[this.type] {
    if (_status == ActorRefInternals.UNSTARTED) {
      _status = ActorRefInternals.RUNNING
    }
    this
  }

  /** When RUNNING: moves the status to SHUTDOWN and posts a Stop message through the router. */
  def stop(): Unit = synchronized {
    if (_status == ActorRefInternals.RUNNING) {
      _status = ActorRefInternals.SHUTDOWN
      postMessageToMailbox(RemoteActorSystemMessage.Stop, None)
    }
  }

  /**
   * RouterConnections backed by an atomically replaced immutable snapshot. The first snapshot has version
   * Long.MinValue; every effective removal installs a snapshot with the version increased by one.
   */
  private class RoutedActorRefConnections(initialConnections: Iterable[ActorRef]) extends RouterConnections {

    private val state = new AtomicReference[VersionedIterable[ActorRef]](
      new VersionedIterable[ActorRef](Long.MinValue, initialConnections))

    def version: Long = state.get().version

    def size: Int = state.get().iterable.size

    def versionedIterable: VersionedIterable[ActorRef] = state.get()

    /**
     * Removes every occurrence (by reference identity) of the given ActorRef. Nothing is changed when the
     * ActorRef is not part of the connections; a lost compare-and-set race is retried against the fresh state.
     */
    @tailrec
    final def remove(ref: ActorRef): Unit = {
      val snapshot = state.get()
      val survivors = snapshot.iterable.filter(candidate ⇒ candidate ne ref)
      val somethingRemoved = survivors.size != snapshot.iterable.size

      if (somethingRemoved &&
        !state.compareAndSet(snapshot, new VersionedIterable[ActorRef](snapshot.version + 1, survivors)))
        remove(ref)
    }
  }
}
|
2010-03-04 19:02:23 +01:00
|
|
|
|
2011-07-28 15:48:03 +03:00
|
|
|
/**
 * An Abstract Router implementation that already provides the basic infrastructure so that a concrete
 * Router only needs to implement the next method.
 *
 * FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
 * FIXME: this is also the location where message buffering should be done in case of failure.
 */
trait BasicRouter extends Router {

  /** The connections handed over in init; volatile so the routing methods see the assignment. */
  @volatile
  protected var connections: RouterConnections = _

  /** Stores the connections to route to. */
  def init(connections: RouterConnections): Unit = {
    this.connections = connections
  }

  /**
   * Sends a Routing.Broadcast payload to every current connection, any other message to the connection
   * selected by next. A connection whose send throws an Exception is removed and the Exception is rethrown
   * (for a broadcast this ends the iteration over the connections).
   *
   * @throws RoutingException when next selects no connection
   */
  def route(message: Any)(implicit sender: Option[ActorRef]): Unit = message match {
    case Routing.Broadcast(payload) ⇒
      //a broadcast: the wrapped payload goes to all connections.
      for (actor <- connections.versionedIterable.iterable)
        removeOnFailure(actor) { actor.!(payload)(sender) }

    case _ ⇒
      //no broadcast: select a single connection and send the message to it.
      next match {
        case None        ⇒ throwNoConnectionsError()
        case Some(actor) ⇒ removeOnFailure(actor) { actor.!(message)(sender) }
      }
  }

  /**
   * Asks the connection selected by next and returns its Future. A connection whose ask throws an Exception is
   * removed and the Exception is rethrown.
   *
   * @throws RoutingException for a Routing.Broadcast message, or when next selects no connection
   */
  def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
    case Routing.Broadcast(_) ⇒
      throw new RoutingException("Broadcasting using '?' is for the time being is not supported. Use ScatterGatherRouter.")

    case _ ⇒
      //no broadcast: select a single connection and ask it.
      next match {
        case None ⇒ throwNoConnectionsError()
        case Some(actor) ⇒
          removeOnFailure(actor) { actor.?(message, timeout)(sender).asInstanceOf[Future[T]] }
      }
  }

  /** Selects the connection for the next message, or None when there is nothing to select. */
  protected def next: Option[ActorRef]

  /** Runs the send; on an Exception the actor is removed from the connections before rethrowing. */
  private def removeOnFailure[R](actor: ActorRef)(send: ⇒ R): R =
    try {
      send
    } catch {
      case e: Exception ⇒
        connections.remove(actor)
        throw e
    }

  /** Reports the missing connections to the EventHandler and throws the RoutingException. */
  private def throwNoConnectionsError(): Nothing = {
    val error = new RoutingException("No replica connections for router")
    EventHandler.error(error, this, error.toString)
    throw error
  }
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
 * A DirectRouter is a Router that only has a single connected actorRef and forwards all requests to that actorRef.
 *
 * The selected connection is re-read whenever the version of the connections changes, so a connection that was
 * removed is no longer selected. Without any connection, next is None.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class DirectRouter extends BasicRouter {

  private val state = new AtomicReference[DirectRouterState]()

  /**
   * The single connection, if there is one.
   *
   * This is a def on purpose: a lazy val would freeze the first answer, which makes the version check in
   * getState pointless and keeps a removed connection in use forever.
   *
   * @throws RoutingException if there is more than one connection
   */
  def next: Option[ActorRef] = getState().ref

  /**
   * Returns the cached state while its version equals the version of the connections; otherwise builds a new
   * state from a fresh snapshot and installs it with compare-and-set, retrying when another thread was faster.
   */
  @tailrec
  private def getState(): DirectRouterState = {
    val currentState = state.get()

    if ((currentState ne null) && connections.version == currentState.version) {
      //we are lucky since nothing has changed in the connections.
      currentState
    } else {
      //there has been a change in the connections, or this is the first time this method is called.
      val versionedIterable = connections.versionedIterable

      val connectionCount = versionedIterable.iterable.size
      if (connectionCount > 1)
        throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))

      //headOption instead of head: an empty connection set has to result in None, not in a NoSuchElementException.
      //A null element is treated as 'no connection', as before.
      val onlyConnection = versionedIterable.iterable.headOption.filter(_ ne null)
      val newState = new DirectRouterState(onlyConnection, versionedIterable.version)

      if (state.compareAndSet(currentState, newState))
        //we just updated the state, so it can be used as is
        newState
      else //we failed to update the state, lets try again... better luck next time.
        getState()
    }
  }

  /** The selected connection (if any) together with the connections version it was read at. */
  private case class DirectRouterState(ref: Option[ActorRef], version: Long)
}
|
2011-05-17 21:15:27 +02:00
|
|
|
|
|
|
|
|
/**
 * A Router that randomly selects one of the target connections to send a message to.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RandomRouter extends BasicRouter {

  private val state = new AtomicReference[RandomRouterState]()

  //FIXME: threadlocal random?
  private val random = new java.util.Random(System.nanoTime())

  /** A randomly picked connection, or None when there are no connections. */
  def next: Option[ActorRef] = {
    val candidates = getState().array
    if (candidates.isEmpty) None
    else Some(candidates(random.nextInt(candidates.length)))
  }

  /**
   * Returns the cached state while its version equals the version of the connections; otherwise builds a new
   * state from a fresh snapshot and installs it with compare-and-set, retrying when another thread was faster.
   */
  @tailrec
  private def getState(): RandomRouterState = {
    val cached = state.get()
    val upToDate = (cached ne null) && cached.version == connections.version

    if (upToDate) {
      //no change in the connections, so the existing state can be used.
      cached
    } else {
      //the connections changed, or this is the first call: rebuild the internal state
      val snapshot = connections.versionedIterable
      val rebuilt = new RandomRouterState(snapshot.iterable.toIndexedSeq, snapshot.version)

      //when the update is lost to another thread, start over
      if (state.compareAndSet(cached, rebuilt)) rebuilt else getState()
    }
  }

  /** Indexed copy of the connections together with the connections version it was read at. */
  private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
}
|
|
|
|
|
|
|
|
|
|
/**
 * A Router that uses round-robin to select a connection. For concurrent calls, round robin is just a best effort.
 *
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RoundRobinRouter extends BasicRouter {

  private val state = new AtomicReference[RoundRobinState]()

  /** The connection that is next in line, or None when there are no connections. */
  def next: Option[ActorRef] = getState().next()

  /**
   * Returns the cached state while its version equals the version of the connections; otherwise builds a new
   * state (starting again at index 0) from a fresh snapshot and installs it with compare-and-set, retrying when
   * another thread was faster.
   */
  @tailrec
  private def getState(): RoundRobinState = {
    val cached = state.get()
    val upToDate = (cached ne null) && cached.version == connections.version

    if (upToDate) {
      //no change in the connections, so the existing state can be used.
      cached
    } else {
      //the connections changed, or this is the first call: rebuild the internal state
      val snapshot = connections.versionedIterable
      val rebuilt = new RoundRobinState(snapshot.iterable.toIndexedSeq[ActorRef], snapshot.version)

      //when the update is lost to another thread, start over
      if (state.compareAndSet(cached, rebuilt)) rebuilt else getState()
    }
  }

  /** Indexed copy of the connections, the version it was read at, and the position of the next selection. */
  private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {

    private val index = new AtomicInteger(0)

    def next(): Option[ActorRef] =
      if (array.isEmpty) None
      else Some(array(nextIndex()))

    /** Claims the current index and advances the counter, wrapping to 0 after the last element. */
    @tailrec
    private def nextIndex(): Int = {
      val claimed = index.get()
      val lastIndex = array.length - 1
      val following = if (claimed == lastIndex) 0 else claimed + 1

      if (index.compareAndSet(claimed, following)) claimed
      else nextIndex()
    }
  }
}
|
2011-08-17 10:21:27 +03:00
|
|
|
|
|
|
|
|
/**
 * ScatterGatherRouter broadcasts the message to all connections and gathers results according to the
 * specified strategy (specific router needs to implement `gather` method).
 * Scatter-gather pattern will be applied only to the messages broadcasted using Future
 * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
 * mode, the router would behave as {@link BasicRouter}, unless it's mixed in with other router type.
 *
 * FIXME: This also is the location where a failover is done in the future if an ActorRef fails and a different one needs to be selected.
 * FIXME: this is also the location where message buffering should be done in case of failure.
 */
trait ScatterGatherRouter extends BasicRouter with Serializable {

  /**
   * Aggregates the responses into a single Future.
   *
   * @param results Futures of the responses from connections
   */
  protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]

  /**
   * Asks every current connection and gathers the resulting Futures. A connection whose ask throws an Exception
   * is removed from the connections and left out of the gathering.
   *
   * @throws RoutingException when not a single connection accepted the message
   */
  private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
    val responses = connections.versionedIterable.iterable.flatMap { actor ⇒
      try {
        Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
      } catch {
        case e: Exception ⇒
          connections.remove(actor)
          None
      }
    }

    if (!responses.isEmpty)
      gather(responses)
    else
      throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router".format(message))
  }

  /** A Routing.Broadcast payload is scattered to all connections; everything else is routed by BasicRouter. */
  override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
    case Routing.Broadcast(payload) ⇒ scatterGather(payload, timeout)
    case other                      ⇒ super.route(other, timeout)(sender)
  }
}
|
|
|
|
|
|
|
|
|
|
/**
 * Simple router that broadcasts the message to all connections, and replies with the first response.
 * Scatter-gather pattern will be applied only to the messages broadcasted using Future
 * (wrapped into {@link Routing.Broadcast} and sent with "?" method). For the messages sent in a fire-forget
 * mode, the router would behave as {@link RoundRobinRouter}.
 */
class ScatterGatherFirstCompletedRouter extends RoundRobinRouter with ScatterGatherRouter {

  /** Gathers by handing all response Futures to Futures.firstCompletedOf. */
  protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G] =
    Futures.firstCompletedOf(results)
}
|