2011-04-02 15:22:38 +02:00
|
|
|
/**
|
2011-07-14 16:03:08 +02:00
|
|
|
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
|
2011-04-02 15:22:38 +02:00
|
|
|
*/
|
|
|
|
|
|
2011-04-05 17:28:16 +02:00
|
|
|
package akka.tutorial.second
|
2011-04-02 15:22:38 +02:00
|
|
|
|
|
|
|
|
import akka.actor.Actor._
|
2011-10-27 12:23:01 +02:00
|
|
|
import akka.event.Logging
|
2011-07-26 18:33:59 +12:00
|
|
|
import System.{ currentTimeMillis ⇒ now }
|
2011-07-28 15:48:03 +03:00
|
|
|
import akka.routing.Routing.Broadcast
|
2011-10-07 15:42:55 +02:00
|
|
|
import akka.routing._
|
2011-11-10 20:08:00 +01:00
|
|
|
import akka.actor.{ ActorRef, Timeout, Actor, PoisonPill, ActorSystem }
|
2011-04-05 17:28:16 +02:00
|
|
|
|
|
|
|
|
/**
 * Tutorial application that approximates Pi with a master/worker actor setup.
 *
 * The series `sum(4 * (-1)^i / (2 * i + 1))` is cut into `nrOfMessages` chunks of
 * `nrOfElements` terms each. A [[Pi.Master]] hands the chunks to a pool of
 * [[Pi.Worker]] actors through a round-robin router, adds up the partial sums and
 * replies with the total to whoever sent `Calculate`.
 */
object Pi extends App {

  // Actor system hosting every actor created by this program.
  val app = ActorSystem()

  // Logger used by `calculate` to report the outcome.
  val log = Logging(app, this)

  // Kick off the computation as part of the application body.
  calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000)

  // ====================
  // ===== Messages =====
  // ====================

  /** Base type of all messages exchanged in this tutorial. */
  sealed trait PiMessage

  /** Sent to the master to start the calculation. */
  case object Calculate extends PiMessage

  /** One chunk of work: chunk index `arg`, `nrOfElements` series terms per chunk. */
  case class Work(arg: Int, nrOfElements: Int) extends PiMessage

  /** Partial sum computed by a worker for a single `Work` chunk. */
  case class Result(value: Double) extends PiMessage

  // ==================
  // ===== Worker =====
  // ==================

  /** Actor that answers every `Work` message with the `Result` for that chunk. */
  class Worker() extends Actor {

    /**
     * Sums the series terms with indices
     * `arg * nrOfElements` to `(arg + 1) * nrOfElements - 1` (inclusive).
     *
     * Indices are tracked as `Long` so that neither `arg * nrOfElements` nor
     * `2 * i + 1` can overflow `Int` for large workloads. The sign of each term
     * comes from the parity of the index rather than from `math.pow(-1, i)`.
     * Terms are added in ascending index order.
     */
    val calculatePiFor = (arg: Int, nrOfElements: Int) ⇒ {
      val first = arg.toLong * nrOfElements
      val end = first + nrOfElements // exclusive upper bound
      var acc = 0.0D
      var i = first
      while (i < end) {
        val term = 4.0D / (2 * i + 1)
        // even index => positive term, odd index => negative term
        acc += (if (i % 2 == 0) term else -term)
        i += 1
      }
      acc
    }

    def receive = {
      // perform the work and reply to whoever sent it
      case Work(arg, nrOfElements) ⇒ sender ! Result(calculatePiFor(arg, nrOfElements))
    }
  }

  // ==================
  // ===== Master =====
  // ==================

  /**
   * Actor that distributes the work and aggregates the results.
   *
   * It starts in the `scatter` behavior (waiting for `Calculate`) and switches
   * to `gather` once all `Work` messages have been sent.
   *
   * @param nrOfWorkers  number of `Worker` actors to create
   * @param nrOfElements number of series terms per `Work` message
   * @param nrOfMessages number of `Work` messages to send, i.e. number of
   *                     `Result`s expected back
   */
  case class Master(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) extends Actor {

    // running sum of all partial results received so far
    var pi: Double = _

    // number of `Result` messages received so far
    var nrOfResults: Int = _

    // create the workers
    val workers = Vector.fill(nrOfWorkers)(app.actorOf[Worker])

    // wrap them with a round-robin router
    val router = app.actorOf(RoutedProps(
      routerFactory = () ⇒ new RoundRobinRouter,
      connectionManager = new LocalConnectionManager(workers)), "pi")

    // phase 1, can accept a Calculate message
    def scatter: Receive = {
      case Calculate ⇒
        if (nrOfMessages <= 0) {
          // Nothing to schedule means no Result will ever arrive; reply with the
          // (empty) sum right away instead of leaving the requester waiting.
          sender ! pi
          self.stop()
        } else {
          // schedule work
          for (arg ← 0 until nrOfMessages) router ! Work(arg, nrOfElements)

          // assume the gathering behavior, remembering who asked
          this become gather(sender)
        }
    }

    // phase 2, aggregate the results of the Calculation
    def gather(recipient: ActorRef): Receive = {
      case Result(value) ⇒
        // handle result from the worker
        pi += value
        nrOfResults += 1
        if (nrOfResults == nrOfMessages) {
          // send the pi result back to the actor that started the calculation
          recipient ! pi
          // shut ourselves down, we're done
          self.stop()
        }
    }

    // message handler starts at the scattering behavior
    def receive = scatter

    // when we are stopped, stop our team of workers and our router
    override def postStop(): Unit = {
      // send a PoisonPill to all workers telling them to shut themselves down
      router ! Broadcast(PoisonPill)
      // send a PoisonPill to the router, telling it to shut itself down
      router ! PoisonPill
    }
  }

  // ==================
  // ===== Run it =====
  // ==================

  /**
   * Creates a `Master`, sends it `Calculate` and logs the outcome.
   *
   * @param nrOfWorkers  number of workers the master creates
   * @param nrOfElements number of series terms per work chunk
   * @param nrOfMessages number of work chunks
   */
  def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int): Unit = {
    // create the master
    val master = app.actorOf(new Master(nrOfWorkers, nrOfElements, nrOfMessages))

    // start the clock just before the request goes out
    val start = now

    // send calculate message and wait for the result, with a 60 seconds timeout
    master.?(Calculate, Timeout(60000)).
      await.resultOrException match {
        case Some(pi) ⇒
          log.info("\n\tPi estimate: \t\t{}\n\tCalculation time: \t{} millis", pi, now - start)
        case None ⇒
          log.error("Pi calculation did not complete within the timeout.")
      }
  }
}
|