diff --git a/akka-docs/intro/getting-started-first-scala.rst b/akka-docs/intro/getting-started-first-scala.rst index 8759343a2e..aebd410e2d 100644 --- a/akka-docs/intro/getting-started-first-scala.rst +++ b/akka-docs/intro/getting-started-first-scala.rst @@ -262,7 +262,7 @@ computation, creating a set of ``Worker`` actors. Then it splits up the work into discrete chunks, and sends these chunks to the different workers in a round-robin fashion. The master waits until all the workers have completed their work and sent back results for aggregation. When computation is completed the -master prints out the result, shuts down all workers and then itself. +master sends the result to the ``Listener``, which prints out the result. With this in mind, let's now create the messages that we want to have flowing in the system. We need three different messages: @@ -272,6 +272,9 @@ the system. We need three different messages: the work assignment - ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation +- ``PiEstimate`` -- sent from the ``Master`` actor to the + ``Listener`` actor containing the the final pi result and how long time + the calculation took Messages sent to actors should always be immutable to avoid sharing mutable state. In scala we have 'case classes' which make excellent messages. So let's @@ -330,19 +333,8 @@ Here is the master actor: A couple of things are worth explaining further. -First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the -``Master`` actor. This latch is only used for plumbing (in this specific -tutorial), to have a simple way of letting the outside world knowing when the -master can deliver the result and shut down. In more idiomatic Akka code -we would not use a latch but other abstractions and functions like ``Future`` -and ``?`` to achieve the same thing in a non-blocking way. -But for simplicity let's stick to a ``CountDownLatch`` for now. - -Second, we are adding a couple of life-cycle callback methods; ``preStart`` and -``postStop``. In the ``preStart`` callback we are recording the time when the -actor is started and in the ``postStop`` callback we are printing out the result -(the approximation of Pi) and the time it took to calculate it. In this call we -also invoke ``latch.countDown()`` to tell the outside world that we are done. +Note that we are passing in a ``ActorRef`` to the ``Master`` actor. This is used to +report the the final result to the outside world. But we are not done yet. We are missing the message handler for the ``Master`` actor. This message handler needs to be able to react to two different messages: @@ -355,7 +347,8 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` via its rout The ``Result`` handler gets the value from the ``Result`` message and aggregates it to our ``pi`` member variable. We also keep track of how many results we have received back, and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and -invokes the ``self.stop()`` method to stop itself *and* all its supervised actors. +sends the final result to the ``listener``. When done it also invokes the ``context.stop(self)`` +method to stop itself *and* all its supervised actors. In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors. All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method will propagate down to all its supervised 'children'. @@ -365,6 +358,14 @@ Let's capture this in code: .. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#master-receive +Creating the result listener +============================ + +The listener is straightforward. When it receives the ``PiEstimate`` from the ``Master`` it +prints the result and shuts down the ``ActorSystem``. + +.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#result-listener + Bootstrap the calculation ========================= @@ -380,9 +381,9 @@ start up the ``Master`` actor and wait for it to finish: .. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#app :exclude: actors-and-messages -As you can see the *calculate* method above it creates an ActorSystem and this is the Akka container which +As you can see the *calculate* method above it creates an ``ActorSystem`` and this is the Akka container which will contain all actors created in that "context". An example of how to create actors in the container -is the *'system.actorOf(...)'* line in the calculate method. In this case we create a top level actor. +is the *'system.actorOf(...)'* line in the calculate method. In this case we create two top level actors. If you instead where in an actor context, i.e. inside an actor creating other actors, you should use *context.actorOf(...)*. This is illustrated in the Master code above. diff --git a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala index 9b73b7620d..9e1130915a 100644 --- a/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala +++ b/akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala @@ -4,9 +4,10 @@ package akka.tutorial.first.scala //#imports -import java.util.concurrent.CountDownLatch import akka.actor._ import akka.routing._ +import akka.util.Duration +import akka.util.duration._ //#imports //#app @@ -20,6 +21,7 @@ object Pi extends App { case object Calculate extends PiMessage case class Work(start: Int, nrOfElements: Int) extends PiMessage case class Result(value: Double) extends PiMessage + case class PiEstimate(pi: Double, duration: Duration) //#messages //#worker @@ -42,17 +44,16 @@ object Pi extends App { //#worker //#master - class Master( - nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch) + class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef) extends Actor { var pi: Double = _ var nrOfResults: Int = _ - var start: Long = _ + val start: Long = System.currentTimeMillis //#create-router val router = context.actorOf( - Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), "pi") + Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workers") //#create-router //#master-receive @@ -63,45 +64,47 @@ object Pi extends App { case Result(value) ⇒ pi += value nrOfResults += 1 - // Stops this actor and all its supervised children - if (nrOfResults == nrOfMessages) context.stop(self) + if (nrOfResults == nrOfMessages) { + // Send the result to the listener + listener ! PiEstimate(pi, duration = (System.currentTimeMillis - start).millis) + // Stops this actor and all its supervised children + context.stop(self) + } //#handle-messages } //#master-receive - override def preStart() { - start = System.currentTimeMillis - } - - override def postStop() { - println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis" - .format(pi, (System.currentTimeMillis - start))) - latch.countDown() - } } //#master + + //#result-listener + class Listener extends Actor { + def receive = { + case PiEstimate(pi, duration) ⇒ + println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s" + .format(pi, duration)) + context.system.shutdown() + } + } + //#result-listener + //#actors-and-messages def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) { // Create an Akka system val system = ActorSystem("PiSystem") - // this latch is only plumbing to know when the calculation is completed - val latch = new CountDownLatch(1) + // create the result listener, which will print the result and shutdown the system + val listener = system.actorOf(Props[Listener]) // create the master val master = system.actorOf(Props(new Master( - nrOfWorkers, nrOfMessages, nrOfElements, latch)), - "master") + nrOfWorkers, nrOfMessages, nrOfElements, listener)), + name = "master") // start the calculation master ! Calculate - // wait for master to shut down - latch.await() - - // Shut down the system - system.shutdown() } } //#app