Merge remote branch 'origin/master' into logging

BalancingDispatcherModelSpec is currently broken, leaves count==1 in
wavesOfActors test, committed anyway in order to check out where it
broke before this merge
This commit is contained in:
Roland 2011-11-09 14:56:05 +01:00
commit a747ef7856
190 changed files with 3076 additions and 9153 deletions

View file

@ -41,7 +41,7 @@ trait Router {
*
* @throws RoutingException if something goes wrong while routing the message
*/
def route(message: Any)(implicit sender: Option[ActorRef])
def route(message: Any)(implicit sender: ActorRef)
/**
* Routes the message using a timeout to one of the connections and returns a Future to synchronize on the
@ -49,7 +49,7 @@ trait Router {
*
* @throws RoutingExceptionif something goes wrong while routing the message.
*/
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T]
def route[T](message: Any, timeout: Timeout): Future[T]
}
/**
@ -92,34 +92,19 @@ object Routing {
/**
* An Abstract convenience implementation for building an ActorReference that uses a Router.
*/
abstract private[akka] class AbstractRoutedActorRef(val props: RoutedProps) extends UnsupportedActorRef {
private[akka] override val uuid: Uuid = newUuid
abstract private[akka] class AbstractRoutedActorRef(val app: AkkaApplication, val props: RoutedProps) extends UnsupportedActorRef {
val router = props.routerFactory()
override def postMessageToMailbox(message: Any, channel: UntypedChannel) = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
router.route(message)(sender)
}
override def postMessageToMailbox(message: Any, sender: ActorRef) = router.route(message)(sender)
override def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any, timeout: Timeout, channel: UntypedChannel): Future[Any] = {
val sender = channel match {
case ref: ActorRef Some(ref)
case _ None
}
router.route[Any](message, timeout)(sender)
}
override def ?(message: Any)(implicit timeout: Timeout): Future[Any] = router.route(message, timeout)
}
/**
* A RoutedActorRef is an ActorRef that has a set of connected ActorRef and it uses a Router to send a message to
* on (or more) of these actors.
*/
private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(routedProps) {
private[akka] class RoutedActorRef(app: AkkaApplication, val routedProps: RoutedProps, override val address: String) extends AbstractRoutedActorRef(app, routedProps) {
@volatile
private var running: Boolean = true
@ -130,7 +115,7 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val ad
synchronized {
if (running) {
running = false
router.route(Routing.Broadcast(PoisonPill))(Some(this))
router.route(Routing.Broadcast(PoisonPill))(this)
}
}
}
@ -141,8 +126,6 @@ private[akka] class RoutedActorRef(val routedProps: RoutedProps, override val ad
/**
* An Abstract Router implementation that already provides the basic infrastructure so that a concrete
* Router only needs to implement the next method.
*
* FIXME: this is also the location where message buffering should be done in case of failure.
*/
trait BasicRouter extends Router {
@ -153,8 +136,9 @@ trait BasicRouter extends Router {
this.connectionManager = connectionManager
}
def route(message: Any)(implicit sender: Option[ActorRef]) = message match {
def route(message: Any)(implicit sender: ActorRef) = message match {
case Routing.Broadcast(message)
//it is a broadcast message, we are going to send to message to all connections.
connectionManager.connections.iterable foreach { connection
try {
@ -181,7 +165,7 @@ trait BasicRouter extends Router {
}
}
def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
def route[T](message: Any, timeout: Timeout): Future[T] = message match {
case Routing.Broadcast(message)
throw new RoutingException("Broadcasting using '?'/'ask' is for the time being is not supported. Use ScatterGatherRouter.")
case _
@ -189,8 +173,7 @@ trait BasicRouter extends Router {
next match {
case Some(connection)
try {
// FIXME is this not wrong? it will not pass on and use the original Future but create a new one. Should reuse 'channel: UntypedChannel' in the AbstractRoutedActorRef
connection.?(message, timeout)(sender).asInstanceOf[Future[T]]
connection.?(message, timeout).asInstanceOf[Future[T]] //FIXME this does not preserve the original sender, shouldn't it?
} catch {
case e: Exception
connectionManager.remove(connection)
@ -203,10 +186,7 @@ trait BasicRouter extends Router {
protected def next: Option[ActorRef]
private def throwNoConnectionsError = {
val error = new RoutingException("No replica connections for router")
throw error
}
private def throwNoConnectionsError = throw new RoutingException("No replica connections for router")
}
/**
@ -257,15 +237,17 @@ class DirectRouter extends BasicRouter {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RandomRouter extends BasicRouter {
import java.security.SecureRandom
private val state = new AtomicReference[RandomRouterState]
//FIXME: threadlocal random?
private val random = new java.util.Random(System.nanoTime)
private val random = new ThreadLocal[SecureRandom] {
override def initialValue = SecureRandom.getInstance("SHA1PRNG")
}
def next: Option[ActorRef] = currentState.array match {
case a if a.isEmpty None
case a Some(a(random.nextInt(a.length)))
case a Some(a(random.get.nextInt(a.length)))
}
@tailrec
@ -358,11 +340,11 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
*/
protected def gather[S, G >: S](results: Iterable[Future[S]]): Future[G]
private def scatterGather[S, G >: S](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[G] = {
private def scatterGather[S, G >: S](message: Any, timeout: Timeout): Future[G] = {
val responses = connectionManager.connections.iterable.flatMap { actor
try {
if (actor.isShutdown) throw ActorInitializationException(actor, "For compatability - check death first", new Exception) // for stack trace
Some(actor.?(message, timeout)(sender).asInstanceOf[Future[S]])
Some(actor.?(message, timeout).asInstanceOf[Future[S]])
} catch {
case e: Exception
connectionManager.remove(actor)
@ -375,9 +357,9 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
else gather(responses)
}
override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {
override def route[T](message: Any, timeout: Timeout): Future[T] = message match {
case Routing.Broadcast(message) scatterGather(message, timeout)
case message super.route(message, timeout)(sender)
case message super.route(message, timeout)
}
}