Merge with Peter's work (i.e. merging master into tame-globals)

This commit is contained in:
Roland 2011-10-13 14:17:07 +02:00
commit 44b94643e5
60 changed files with 2078 additions and 1700 deletions

View file

@ -10,7 +10,7 @@ import akka.event.EventHandler
import akka.config.ConfigurationException
import akka.dispatch.{ Future, MessageDispatcher }
import akka.AkkaApplication
import akka.util.ReflectiveAccess
import java.net.InetSocketAddress
import java.lang.reflect.InvocationTargetException
import java.util.concurrent.atomic.{ AtomicReference, AtomicInteger }
@ -35,7 +35,7 @@ trait Router {
* JMM Guarantees:
* This method guarantees that all changes made in this method, are visible before one of the routing methods is called.
*/
def init(connections: FailureDetector)
def init(connectionManager: ConnectionManager)
/**
* Routes the message to one of the connections.
@ -53,150 +53,40 @@ trait Router {
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
}
/**
* An Iterable that also contains a version.
*/
trait VersionedIterable[A] {
val version: Long
def iterable: Iterable[A]
def apply(): Iterable[A] = iterable
}
/**
* An {@link AkkaException} thrown when something goes wrong while routing a message
*/
class RoutingException(message: String) extends AkkaException(message)
/**
* Default "local" failure detector. This failure detector removes an actor from the
* router if an exception occured in the router's thread (e.g. when trying to add
* the message to the receiver's mailbox).
* A Helper class to create actor references that use routing.
*/
class RemoveConnectionOnFirstFailureLocalFailureDetector extends FailureDetector {
case class State(version: Long, iterable: Iterable[ActorRef]) extends VersionedIterable[ActorRef]
private val state = new AtomicReference[State]
def this(connectionIterable: Iterable[ActorRef]) = {
this()
state.set(State(Long.MinValue, connectionIterable))
}
def isAvailable(connection: InetSocketAddress): Boolean =
state.get.iterable.find(c connection == c).isDefined
def recordSuccess(connection: InetSocketAddress, timestamp: Long) {}
def recordFailure(connection: InetSocketAddress, timestamp: Long) {}
def version: Long = state.get.version
def size: Int = state.get.iterable.size
def versionedIterable = state.get
def stopAll() {
state.get.iterable foreach (_.stop())
}
@tailrec
final def remove(ref: ActorRef) = {
val oldState = state.get
//remote the ref from the connections.
var newList = oldState.iterable.filter(currentActorRef currentActorRef ne ref)
if (newList.size != oldState.iterable.size) {
//one or more occurrences of the actorRef were removed, so we need to update the state.
val newState = State(oldState.version + 1, newList)
//if we are not able to update the state, we just try again.
if (!state.compareAndSet(oldState, newState)) remove(ref)
}
}
def failOver(from: InetSocketAddress, to: InetSocketAddress) {} // do nothing here
def putIfAbsent(address: InetSocketAddress, newConnectionFactory: () ActorRef): ActorRef = {
throw new UnsupportedOperationException("Not supported")
}
}
object Routing {
sealed trait RoutingMessage
/**
* Used to broadcast a message to all connections in a router. E.g. every connection gets the message
* regardless of their routing algorithm.
*/
case class Broadcast(message: Any) extends RoutingMessage
}
/**
* A Helper class to create actor references that use routing.
*/
class Routing(val application: AkkaApplication) {
/**
* FIXME: will very likely be moved to the ActorRef.
*/
def actorOf(props: RoutedProps, address: String = newUuid().toString): ActorRef = {
//TODO Implement support for configuring by deployment ID etc
//TODO If address matches an already created actor (Ahead-of-time deployed) return that actor
//TODO If address exists in config, it will override the specified Props (should we attempt to merge?)
//TODO If the actor deployed uses a different config, then ignore or throw exception?
val clusteringEnabled = application.reflective.ClusterModule.isEnabled
if (clusteringEnabled && !props.localOnly)
application.reflective.ClusterModule.newClusteredActorRef(props)
else {
if (props.connections.isEmpty) //FIXME Shouldn't this be checked when instance is created so that it works with linking instead of barfing?
throw new IllegalArgumentException("A routed actorRef can't have an empty connection set")
new RoutedActorRef(props, address)
}
}
/**
* Creates a new started RoutedActorRef that uses routing to deliver a message to one of its connected actors.
*
* @param actorAddress the address of the ActorRef.
* @param connections an Iterable pointing to all connected actor references.
* @param routerType the type of routing that should be used.
* @throws IllegalArgumentException if the number of connections is zero, or if it depends on the actual router implementation
* how many connections it can handle.
*/
@deprecated("Use 'Routing.actorOf(props: RoutedProps)' instead.", "2.0")
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
val router = routerType match {
case RouterType.Direct if connections.size > 1
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
case RouterType.Direct
new DirectRouter
case RouterType.Random
new RandomRouter
case RouterType.RoundRobin
new RoundRobinRouter
case r
throw new IllegalArgumentException("Unsupported routerType " + r)
}
if (connections.size == 0)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
new RoutedActorRef(
new RoutedProps(
() router,
connections,
RoutedProps.defaultFailureDetectorFactory,
RoutedProps.defaultTimeout, true),
actorAddress)
def createCustomRouter(implClass: String): Router = {
ReflectiveAccess.createInstance(
implClass,
Array[Class[_]](),
Array[AnyRef]()) match {
case Right(router) router.asInstanceOf[Router]
case Left(exception)
val cause = exception match {
case i: InvocationTargetException i.getTargetException
case _ exception
}
throw new ConfigurationException(
"Could not instantiate custom Router of [" +
implClass + "] due to: " +
cause, cause)
}
}
}
@ -245,7 +135,7 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: St
}
}
router.init(new RemoveConnectionOnFirstFailureLocalFailureDetector(routedProps.connections))
router.init(routedProps.connectionManager)
}
/**
@ -257,21 +147,21 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, val address: St
trait BasicRouter extends Router {
@volatile
protected var connections: FailureDetector = _
protected var connectionManager: ConnectionManager = _
def init(connections: FailureDetector) = {
this.connections = connections
def init(connectionManager: ConnectionManager) = {
this.connectionManager = connectionManager
}
def route(message: Any)(implicit sender: Option[ActorRef]) = message match {
case Routing.Broadcast(message)
//it is a broadcast message, we are going to send to message to all connections.
connections.versionedIterable.iterable foreach { connection
connectionManager.connections.iterable foreach { connection
try {
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(connection)
connectionManager.remove(connection)
throw e
}
}
@ -283,7 +173,7 @@ trait BasicRouter extends Router {
connection.!(message)(sender) // we use original sender, so this is essentially a 'forward'
} catch {
case e: Exception
connections.remove(connection)
connectionManager.remove(connection)
throw e
}
case None
@ -303,7 +193,7 @@ trait BasicRouter extends Router {
connection.?(message, timeout)(sender).asInstanceOf[Future[T]]
} catch {
case e: Exception
connections.remove(connection)
connectionManager.remove(connection)
throw e
}
case None
@ -329,33 +219,32 @@ class DirectRouter extends BasicRouter {
private val state = new AtomicReference[DirectRouterState]
lazy val next: Option[ActorRef] = {
val currentState = getState
if (currentState.ref == null) None else Some(currentState.ref)
val current = currentState
if (current.ref == null) None else Some(current.ref)
}
// FIXME rename all 'getState' methods to 'currentState', non-scala
@tailrec
private def getState: DirectRouterState = {
val currentState = state.get
private def currentState: DirectRouterState = {
val current = state.get
if (currentState != null && connections.version == currentState.version) {
if (current != null && connectionManager.version == current.version) {
//we are lucky since nothing has changed in the connections.
currentState
current
} else {
//there has been a change in the connections, or this is the first time this method is called. So we are going to do some updating.
val versionedIterable = connections.versionedIterable
val connections = connectionManager.connections
val connectionCount = versionedIterable.iterable.size
val connectionCount = connections.iterable.size
if (connectionCount > 1)
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionCount))
val newState = new DirectRouterState(versionedIterable.iterable.head, versionedIterable.version)
if (state.compareAndSet(currentState, newState))
val newState = new DirectRouterState(connections.iterable.head, connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
else //we failed to update the state, lets try again... better luck next time.
getState
currentState // recur
}
}
@ -374,28 +263,28 @@ class RandomRouter extends BasicRouter {
//FIXME: threadlocal random?
private val random = new java.util.Random(System.nanoTime)
def next: Option[ActorRef] = getState.array match {
def next: Option[ActorRef] = currentState.array match {
case a if a.isEmpty None
case a Some(a(random.nextInt(a.length)))
}
@tailrec
private def getState: RandomRouterState = {
val currentState = state.get
private def currentState: RandomRouterState = {
val current = state.get
if (currentState != null && currentState.version == connections.version) {
if (current != null && current.version == connectionManager.version) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
currentState
current
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val versionedIterable = connections.versionedIterable
val newState = new RandomRouterState(versionedIterable.iterable.toIndexedSeq, versionedIterable.version)
if (state.compareAndSet(currentState, newState))
val connections = connectionManager.connections
val newState = new RandomRouterState(connections.iterable.toIndexedSeq, connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
else //we failed to update the state, lets try again... better luck next time.
getState
currentState
}
}
@ -411,25 +300,25 @@ class RoundRobinRouter extends BasicRouter {
private val state = new AtomicReference[RoundRobinState]
def next: Option[ActorRef] = getState.next
def next: Option[ActorRef] = currentState.next
@tailrec
private def getState: RoundRobinState = {
val currentState = state.get
private def currentState: RoundRobinState = {
val current = state.get
if (currentState != null && currentState.version == connections.version) {
if (current != null && current.version == connectionManager.version) {
//we are lucky, since there has not been any change in the connections. So therefor we can use the existing state.
currentState
current
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val versionedIterable = connections.versionedIterable
val newState = new RoundRobinState(versionedIterable.iterable.toIndexedSeq[ActorRef], versionedIterable.version)
if (state.compareAndSet(currentState, newState))
val connections = connectionManager.connections
val newState = new RoundRobinState(connections.iterable.toIndexedSeq[ActorRef], connections.version)
if (state.compareAndSet(current, newState))
//we are lucky since we just updated the state, so we can send it back as the state to use
newState
else //we failed to update the state, lets try again... better luck next time.
getState
currentState
}
}
@ -463,19 +352,20 @@ class RoundRobinRouter extends BasicRouter {
trait ScatterGatherRouter extends BasicRouter with Serializable {
/**
* Aggregates the responses into a single Future
* Aggregates the responses into a single Future.
*
* @param results Futures of the responses from connections
*/
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
val responses = connections.versionedIterable.iterable.flatMap { actor
val responses = connectionManager.connections.iterable.flatMap { actor
try {
if (actor.isShutdown) throw new ActorInitializationException("For compatability - check death first")
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
} catch {
case e: Exception
connections.remove(actor)
connectionManager.remove(actor)
None
}
}