From c14c01a53ce1b99b03a85436f3128bf5575e069c Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Sat, 2 Apr 2011 15:22:38 +0200 Subject: [PATCH] Adding Pi2, fixing router shutdown in Pi, cleaning up the generation of workers in Pi --- .../src/main/scala/Pi.scala | 9 +- .../src/main/scala/Pi2.scala | 131 ++++++++++++++++++ 2 files changed, 135 insertions(+), 5 deletions(-) create mode 100644 akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi2.scala diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala index dd859ce0dc..ebd2e20ce0 100644 --- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala @@ -66,11 +66,7 @@ object Pi { var start: Long = _ // create the workers - val workers = { - var ws = Vector[ActorRef]() - for (i <- 0 until nrOfWorkers) ws = ws :+ actorOf[Worker].start - ws - } + val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start) // wrap them with a load-balancing router val router = Routing.loadBalancerActor(CyclicIterator(workers)).start @@ -84,6 +80,9 @@ object Pi { // send a PoisonPill to all workers telling them to shut down themselves router ! Broadcast(PoisonPill) + // send a PoisonPill to the router, telling him to shut himself down + router ! PoisonPill + case Result(value) => // handle result from the worker pi += value diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi2.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi2.scala new file mode 100644 index 0000000000..7bbcba96d5 --- /dev/null +++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi2.scala @@ -0,0 +1,131 @@ +/** + * Copyright (C) 2009-2011 Scalable Solutions AB + */ + +package akka.tutorial.sbt.pi + +import akka.actor.Actor._ +import akka.routing.{Routing, CyclicIterator} +import Routing._ +import akka.event.EventHandler +import System.{currentTimeMillis => now} +import akka.actor.{Channel, Actor, PoisonPill} +import akka.dispatch.Future + +/** + * Sample for Akka, SBT an Scala tutorial. + *

+ * Calculates Pi. + *

+ * Run it in SBT: + *

+ *   $ sbt
+ *   > update
+ *   > console
+ *   > akka.tutorial.sbt.pi.Pi.calculate
+ *   > ...
+ *   > :quit
+ * 
+ * + * @author Jonas Bonér + */ +object Pi2 { + + // ==================== + // ===== Messages ===== + // ==================== + sealed trait PiMessage + case object Calculate extends PiMessage + case class Work(arg: Int, nrOfElements: Int) extends PiMessage + case class Result(value: Double) extends PiMessage + + // ================== + // ===== Worker ===== + // ================== + class Worker() extends Actor { + // define the work + val calculatePiFor = (arg: Int, nrOfElements: Int) => { + val range = (arg * nrOfElements) to ((arg + 1) * nrOfElements - 1) + range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum + } + + def receive = { + case Work(arg, nrOfElements) => + self reply Result(calculatePiFor(arg, nrOfElements)) // perform the work + } + } + + // ================== + // ===== Master ===== + // ================== + case class Master(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) extends Actor { + var pi: Double = _ + var nrOfResults: Int = _ + + // create the workers + val workers = Vector.fill(nrOfWorkers)(actorOf[Worker].start) + + // wrap them with a load-balancing router + val router = Routing.loadBalancerActor(CyclicIterator(workers)).start + + // phase 1, can accept a Calculate message + def scatter: Receive = { + case Calculate => + // schedule work + for (arg <- 0 until nrOfMessages) router ! Work(arg, nrOfElements) + + //Assume the gathering behavior + this become gather(self.channel) + } + + // phase 2, aggregate the results of the Calculation + def gather(recipient: Channel[Any]): Receive = { + case Result(value) => + // handle result from the worker + pi += value + nrOfResults += 1 + if (nrOfResults == nrOfMessages) { + // send the pi result back to the guy who started the calculation + recipient ! pi + // shut ourselves down, we're done + self.stop + } + } + + // message handler starts at the scattering behavior + def receive = scatter + + // when we are stopped, stop our team of workers and our router + override def postStop { + // send a PoisonPill to all workers telling them to shut down themselves + router ! Broadcast(PoisonPill) + // send a PoisonPill to the router, telling him to shut himself down + router ! PoisonPill + } + } + + // ================== + // ===== Run it ===== + // ================== + def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { + // create the master + val master = actorOf(new Master(nrOfWorkers, nrOfElements, nrOfMessages)).start + + //start the calculation + val start = now + + //send calculate message + master.!!![Double](Calculate, timeout = 60000). + await.resultOrException match {//wait for the result, with a 60 seconds timeout + case Some(pi) => + EventHandler.info(this, "\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis".format(pi, (now - start))) + case None => + EventHandler.error(this, "Pi calculation did not complete within the timeout.") + } + } +} + +// To be able to run it as a main application +object Main extends App { + Pi2.calculate(nrOfWorkers = 4, nrOfElements = 10000, nrOfMessages = 10000) +}