Initial take on new routing implementation. Please note that this is work in progress!

This commit is contained in:
Henrik Engstrom 2011-12-08 14:30:57 +01:00
parent bf3ce9bb87
commit 90b6833978
13 changed files with 475 additions and 398 deletions

View file

@ -4,14 +4,14 @@
package akka.tutorial.first.scala
import java.util.concurrent.CountDownLatch
import akka.routing.{ RoutedActorRef, LocalConnectionManager, RoundRobinRouter, RoutedProps }
import akka.actor.{ ActorSystemImpl, Actor, ActorSystem }
import akka.actor.InternalActorRef
import akka.actor._
import akka.routing._
import com.typesafe.config.ConfigFactory
object Pi extends App {
// Initiate the calculation
calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)
calculate(nrOfWorkers = 4, nrOfElements = 10, nrOfMessages = 10)
// ====================
// ===== Messages =====
@ -38,7 +38,9 @@ object Pi extends App {
}
def receive = {
case Work(start, nrOfElements) sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
case Work(start, nrOfElements)
println("*** RECEIVED MESSAGE IN: " + self.path)
sender ! Result(calculatePiFor(start, nrOfElements)) // perform the work
}
}
@ -53,14 +55,26 @@ object Pi extends App {
var start: Long = _
// create the workers
val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
var workers = Vector.empty[ActorRef]
for (i 1 to 2) {
workers = context.actorOf[Worker] +: workers
}
// TODO (HE) : use this way of creating actors
//val workers = Vector.fill(nrOfWorkers)(context.actorOf[Worker])
/*
// wrap them with a load-balancing router
// FIXME routers are intended to be used like this
implicit val timout = context.system.settings.ActorTimeout
implicit val dispatcher = context.dispatcher
val props = RoutedProps(routerFactory = () new RoundRobinRouter, connectionManager = new LocalConnectionManager(workers))
val router = new RoutedActorRef(context.system, props, self.asInstanceOf[InternalActorRef], "pi")
*/
//val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 5)), "pi")
val router = context.actorOf(Props(new Worker).withRouting(RoundRobinRouter(nrOfInstances = 3, targets = Seq(workers.head, workers.tail.head))), "pi")
// message handler
def receive = {
@ -93,13 +107,13 @@ object Pi extends App {
// ===== Run it =====
// ==================
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
val system = ActorSystem()
val system = ActorSystem("x", ConfigFactory.parseString("akka.actor.debug.lifecycle=true\nakka.loglevel=DEBUG"))
// this latch is only plumbing to know when the calculation is completed
val latch = new CountDownLatch(1)
// create the master
val master = system.actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch))
val master = system.actorOf(Props(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)), "master")
// start the calculation
master ! Calculate