Adding initial support for Props

This commit is contained in:
Viktor Klang 2011-08-26 17:25:18 +02:00
parent 4bc0cfe0bf
commit c7d58c600b
102 changed files with 1141 additions and 1524 deletions

View file

@ -158,34 +158,30 @@ object Routing {
* how many connections it can handle.
*/
def actorOf(actorAddress: String, connections: Iterable[ActorRef], routerType: RouterType): ActorRef = {
val ref = routerType match {
val router = routerType match {
case RouterType.Direct if connections.size > 1
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
case RouterType.Direct
if (connections.size > 1)
throw new IllegalArgumentException("A direct router can't have more than 1 connection")
actorOf(actorAddress, connections, new DirectRouter())
new DirectRouter()
case RouterType.Random
actorOf(actorAddress, connections, new RandomRouter())
new RandomRouter()
case RouterType.RoundRobin
actorOf(actorAddress, connections, new RoundRobinRouter())
case _ throw new IllegalArgumentException("Unsupported routerType " + routerType)
new RoundRobinRouter()
case r
throw new IllegalArgumentException("Unsupported routerType " + r)
}
ref.start()
actorOf(actorAddress, connections, router).start()
}
def actorOf(actorAddress: String, connections: Iterable[ActorRef], router: Router): ActorRef = {
if (connections.size == 0)
def actorOf(actorAddress: String, connections: Iterable[ActorRef], router: Router): ActorRef =
if (connections.isEmpty)
throw new IllegalArgumentException("To create a routed actor ref, at least one connection is required")
else
new RoutedActorRef(actorAddress, router, connections)
new RoutedActorRef(actorAddress, router, connections)
}
def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef = {
def actorOfWithRoundRobin(actorAddress: String, connections: Iterable[ActorRef]): ActorRef =
actorOf(actorAddress, connections, akka.routing.RouterType.RoundRobin)
}
}
/**
@ -214,7 +210,8 @@ class RoutedActorRef(val address: String, val router: Router, val connectionIter
}
def start(): this.type = synchronized[this.type] {
_status = ActorRefInternals.RUNNING
if (_status == ActorRefInternals.UNSTARTED)
_status = ActorRefInternals.RUNNING
this
}
@ -370,13 +367,10 @@ class DirectRouter extends BasicRouter {
throw new RoutingException("A DirectRouter can't have more than 1 connected Actor, but found [%s]".format(connectionIterable.size))
val newState = new DirectRouterState(connectionIterable.head, version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
newState
} else {
//we failed to update the state, lets try again... better luck next time.
else //we failed to update the state, lets try again... better luck next time.
getState()
}
}
}
@ -394,16 +388,11 @@ class RandomRouter extends BasicRouter {
private val state = new AtomicReference[RandomRouterState]()
//FIXME: threadlocal random?
private val random = new java.util.Random(System.currentTimeMillis)
private val random = new java.util.Random(System.nanoTime())
def next: Option[ActorRef] = {
val state = getState()
if (state.array.isEmpty) {
None
} else {
val index = random.nextInt(state.array.length)
Some(state.array(index))
}
def next: Option[ActorRef] = getState().array match {
case a if a.isEmpty None
case a Some(a(random.nextInt(a.length)))
}
@tailrec
@ -416,19 +405,16 @@ class RandomRouter extends BasicRouter {
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val (version, connectionIterable) = connections.versionedIterator
val newState = new RandomRouterState(connectionIterable.toArray[ActorRef], version)
val newState = new RandomRouterState(connectionIterable.toIndexedSeq, version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
newState
} else {
//we failed to update the state, lets try again... better luck next time.
else //we failed to update the state, lets try again... better luck next time.
getState()
}
}
}
private case class RandomRouterState(val array: Array[ActorRef], val version: Long)
private case class RandomRouterState(array: IndexedSeq[ActorRef], version: Long)
}
/**
@ -452,19 +438,16 @@ class RoundRobinRouter extends BasicRouter {
} else {
//there has been a change in connections, or it was the first try, so we need to update the internal state
val (version, connectionIterable) = connections.versionedIterator
val newState = new RoundRobinState(connectionIterable.toArray[ActorRef], version)
val newState = new RoundRobinState(connectionIterable.toIndexedSeq, version)
if (state.compareAndSet(currentState, newState)) {
//we are lucky since we just updated the state, so we can send it back as the state to use
if (state.compareAndSet(currentState, newState)) //we are lucky since we just updated the state, so we can send it back as the state to use
newState
} else {
//we failed to update the state, lets try again... better luck next time.
else //we failed to update the state, lets try again... better luck next time.
getState()
}
}
}
private case class RoundRobinState(val array: Array[ActorRef], val version: Long) {
private case class RoundRobinState(array: IndexedSeq[ActorRef], version: Long) {
private val index = new AtomicInteger(0)
@ -510,10 +493,10 @@ trait ScatterGatherRouter extends BasicRouter with Serializable {
}
}
if (responses.size == 0)
if (responses.isEmpty)
throw new RoutingException("No connections can process the message [%s] sent to scatter-gather router" format (message))
gather(responses)
else
gather(responses)
}
override def route[T](message: Any, timeout: Timeout)(implicit sender: Option[ActorRef]): Future[T] = message match {