From 2e70b4a1b5ad25e4acc78b3bd0fa31ebffe215a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bone=CC=81r?= Date: Sat, 2 Apr 2011 14:01:36 +0200 Subject: [PATCH] Simplified the Master/Worker interaction in first tutorial --- .../src/main/scala/Pi.scala | 43 +++++++++---------- 1 file changed, 20 insertions(+), 23 deletions(-) diff --git a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala index 658658545c..dd859ce0dc 100644 --- a/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-pi-sbt/src/main/scala/Pi.scala @@ -32,31 +32,35 @@ import java.util.concurrent.CountDownLatch * @author Jonas Bonér */ object Pi { - val nrOfWorkers = 4 - val nrOfMessages = 10000 - val nrOfElements = 10000 // ==================== // ===== Messages ===== // ==================== sealed trait PiMessage - case class Calculate(nrOfMessages: Int, nrOfElements: Int) extends PiMessage - case class Work(arg: Int, fun: (Int) => Double) extends PiMessage + case object Calculate extends PiMessage + case class Work(arg: Int, nrOfElements: Int) extends PiMessage case class Result(value: Double) extends PiMessage // ================== // ===== Worker ===== // ================== - class Worker extends Actor { + class Worker() extends Actor { + // define the work + val calculatePiFor = (arg: Int, nrOfElements: Int) => { + val range = (arg * nrOfElements) to ((arg + 1) * nrOfElements - 1) + range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum + } + def receive = { - case Work(arg, fun) => self reply Result(fun(arg)) + case Work(arg, nrOfElements) => + self reply Result(calculatePiFor(arg, nrOfElements)) // perform the work } } // ================== // ===== Master ===== // ================== - class Master(latch: CountDownLatch) extends Actor { + class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) extends Actor { var pi: Double = _ var nrOfResults: Int = _ var start: Long = _ @@ -71,17 +75,11 @@ object Pi { // wrap them with a load-balancing router val router = Routing.loadBalancerActor(CyclicIterator(workers)).start - // define the work - val algorithm = (i: Int) => { - val range = (i * nrOfElements) to ((i + 1) * nrOfElements - 1) - range map (j => 4 * math.pow(-1, j) / (2 * j + 1)) sum - } - // message handler def receive = { - case Calculate(nrOfMessages, nrOfElements) => + case Calculate => // schedule work - for (arg <- 0 until nrOfMessages) router ! Work(arg, algorithm) + for (arg <- 0 until nrOfMessages) router ! Work(arg, nrOfElements) // send a PoisonPill to all workers telling them to shut down themselves router ! Broadcast(PoisonPill) @@ -106,21 +104,20 @@ object Pi { // ===== Run it ===== // ================== def calculate = { + val nrOfWorkers = 4 + val nrOfMessages = 10000 + val nrOfElements = 10000 + // this latch is only plumbing to know when the calculation is completed val latch = new CountDownLatch(1) // create the master - val master = actorOf(new Master(latch)).start + val master = actorOf(new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch)).start // start the calculation - master ! Calculate(nrOfMessages, nrOfElements) + master ! Calculate // wait for master to shut down latch.await } } - -// To be able to run it as a main application -object Main extends App { - Pi.calculate -}