DOC: Improved Pi example. Removed latch and return result to listener actor. See #1729
This commit is contained in:
parent
5f4d566a8c
commit
c64b73004a
2 changed files with 46 additions and 42 deletions
|
|
@ -262,7 +262,7 @@ computation, creating a set of ``Worker`` actors. Then it splits up the work
|
||||||
into discrete chunks, and sends these chunks to the different workers in a
|
into discrete chunks, and sends these chunks to the different workers in a
|
||||||
round-robin fashion. The master waits until all the workers have completed their
|
round-robin fashion. The master waits until all the workers have completed their
|
||||||
work and sent back results for aggregation. When computation is completed the
|
work and sent back results for aggregation. When computation is completed the
|
||||||
master prints out the result, shuts down all workers and then itself.
|
master sends the result to the ``Listener``, which prints out the result.
|
||||||
|
|
||||||
With this in mind, let's now create the messages that we want to have flowing in
|
With this in mind, let's now create the messages that we want to have flowing in
|
||||||
the system. We need three different messages:
|
the system. We need three different messages:
|
||||||
|
|
@ -272,6 +272,9 @@ the system. We need three different messages:
|
||||||
the work assignment
|
the work assignment
|
||||||
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor
|
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor
|
||||||
containing the result from the worker's calculation
|
containing the result from the worker's calculation
|
||||||
|
- ``PiEstimate`` -- sent from the ``Master`` actor to the
|
||||||
|
``Listener`` actor containing the the final pi result and how long time
|
||||||
|
the calculation took
|
||||||
|
|
||||||
Messages sent to actors should always be immutable to avoid sharing mutable
|
Messages sent to actors should always be immutable to avoid sharing mutable
|
||||||
state. In scala we have 'case classes' which make excellent messages. So let's
|
state. In scala we have 'case classes' which make excellent messages. So let's
|
||||||
|
|
@ -330,19 +333,8 @@ Here is the master actor:
|
||||||
|
|
||||||
A couple of things are worth explaining further.
|
A couple of things are worth explaining further.
|
||||||
|
|
||||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the
|
Note that we are passing in a ``ActorRef`` to the ``Master`` actor. This is used to
|
||||||
``Master`` actor. This latch is only used for plumbing (in this specific
|
report the the final result to the outside world.
|
||||||
tutorial), to have a simple way of letting the outside world knowing when the
|
|
||||||
master can deliver the result and shut down. In more idiomatic Akka code
|
|
||||||
we would not use a latch but other abstractions and functions like ``Future``
|
|
||||||
and ``?`` to achieve the same thing in a non-blocking way.
|
|
||||||
But for simplicity let's stick to a ``CountDownLatch`` for now.
|
|
||||||
|
|
||||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and
|
|
||||||
``postStop``. In the ``preStart`` callback we are recording the time when the
|
|
||||||
actor is started and in the ``postStop`` callback we are printing out the result
|
|
||||||
(the approximation of Pi) and the time it took to calculate it. In this call we
|
|
||||||
also invoke ``latch.countDown()`` to tell the outside world that we are done.
|
|
||||||
|
|
||||||
But we are not done yet. We are missing the message handler for the ``Master``
|
But we are not done yet. We are missing the message handler for the ``Master``
|
||||||
actor. This message handler needs to be able to react to two different messages:
|
actor. This message handler needs to be able to react to two different messages:
|
||||||
|
|
@ -355,7 +347,8 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` via its rout
|
||||||
The ``Result`` handler gets the value from the ``Result`` message and aggregates it to
|
The ``Result`` handler gets the value from the ``Result`` message and aggregates it to
|
||||||
our ``pi`` member variable. We also keep track of how many results we have received back,
|
our ``pi`` member variable. We also keep track of how many results we have received back,
|
||||||
and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and
|
and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and
|
||||||
invokes the ``self.stop()`` method to stop itself *and* all its supervised actors.
|
sends the final result to the ``listener``. When done it also invokes the ``context.stop(self)``
|
||||||
|
method to stop itself *and* all its supervised actors.
|
||||||
In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors.
|
In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors.
|
||||||
All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method
|
All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method
|
||||||
will propagate down to all its supervised 'children'.
|
will propagate down to all its supervised 'children'.
|
||||||
|
|
@ -365,6 +358,14 @@ Let's capture this in code:
|
||||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#master-receive
|
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#master-receive
|
||||||
|
|
||||||
|
|
||||||
|
Creating the result listener
|
||||||
|
============================
|
||||||
|
|
||||||
|
The listener is straightforward. When it receives the ``PiEstimate`` from the ``Master`` it
|
||||||
|
prints the result and shuts down the ``ActorSystem``.
|
||||||
|
|
||||||
|
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#result-listener
|
||||||
|
|
||||||
Bootstrap the calculation
|
Bootstrap the calculation
|
||||||
=========================
|
=========================
|
||||||
|
|
||||||
|
|
@ -380,9 +381,9 @@ start up the ``Master`` actor and wait for it to finish:
|
||||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#app
|
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#app
|
||||||
:exclude: actors-and-messages
|
:exclude: actors-and-messages
|
||||||
|
|
||||||
As you can see the *calculate* method above it creates an ActorSystem and this is the Akka container which
|
As you can see the *calculate* method above it creates an ``ActorSystem`` and this is the Akka container which
|
||||||
will contain all actors created in that "context". An example of how to create actors in the container
|
will contain all actors created in that "context". An example of how to create actors in the container
|
||||||
is the *'system.actorOf(...)'* line in the calculate method. In this case we create a top level actor.
|
is the *'system.actorOf(...)'* line in the calculate method. In this case we create two top level actors.
|
||||||
If you instead where in an actor context, i.e. inside an actor creating other actors, you should use
|
If you instead where in an actor context, i.e. inside an actor creating other actors, you should use
|
||||||
*context.actorOf(...)*. This is illustrated in the Master code above.
|
*context.actorOf(...)*. This is illustrated in the Master code above.
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -4,9 +4,10 @@
|
||||||
package akka.tutorial.first.scala
|
package akka.tutorial.first.scala
|
||||||
|
|
||||||
//#imports
|
//#imports
|
||||||
import java.util.concurrent.CountDownLatch
|
|
||||||
import akka.actor._
|
import akka.actor._
|
||||||
import akka.routing._
|
import akka.routing._
|
||||||
|
import akka.util.Duration
|
||||||
|
import akka.util.duration._
|
||||||
//#imports
|
//#imports
|
||||||
|
|
||||||
//#app
|
//#app
|
||||||
|
|
@ -20,6 +21,7 @@ object Pi extends App {
|
||||||
case object Calculate extends PiMessage
|
case object Calculate extends PiMessage
|
||||||
case class Work(start: Int, nrOfElements: Int) extends PiMessage
|
case class Work(start: Int, nrOfElements: Int) extends PiMessage
|
||||||
case class Result(value: Double) extends PiMessage
|
case class Result(value: Double) extends PiMessage
|
||||||
|
case class PiEstimate(pi: Double, duration: Duration)
|
||||||
//#messages
|
//#messages
|
||||||
|
|
||||||
//#worker
|
//#worker
|
||||||
|
|
@ -42,17 +44,16 @@ object Pi extends App {
|
||||||
//#worker
|
//#worker
|
||||||
|
|
||||||
//#master
|
//#master
|
||||||
class Master(
|
class Master(nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, listener: ActorRef)
|
||||||
nrOfWorkers: Int, nrOfMessages: Int, nrOfElements: Int, latch: CountDownLatch)
|
|
||||||
extends Actor {
|
extends Actor {
|
||||||
|
|
||||||
var pi: Double = _
|
var pi: Double = _
|
||||||
var nrOfResults: Int = _
|
var nrOfResults: Int = _
|
||||||
var start: Long = _
|
val start: Long = System.currentTimeMillis
|
||||||
|
|
||||||
//#create-router
|
//#create-router
|
||||||
val router = context.actorOf(
|
val router = context.actorOf(
|
||||||
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), "pi")
|
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workers")
|
||||||
//#create-router
|
//#create-router
|
||||||
|
|
||||||
//#master-receive
|
//#master-receive
|
||||||
|
|
@ -63,45 +64,47 @@ object Pi extends App {
|
||||||
case Result(value) ⇒
|
case Result(value) ⇒
|
||||||
pi += value
|
pi += value
|
||||||
nrOfResults += 1
|
nrOfResults += 1
|
||||||
// Stops this actor and all its supervised children
|
if (nrOfResults == nrOfMessages) {
|
||||||
if (nrOfResults == nrOfMessages) context.stop(self)
|
// Send the result to the listener
|
||||||
|
listener ! PiEstimate(pi, duration = (System.currentTimeMillis - start).millis)
|
||||||
|
// Stops this actor and all its supervised children
|
||||||
|
context.stop(self)
|
||||||
|
}
|
||||||
//#handle-messages
|
//#handle-messages
|
||||||
}
|
}
|
||||||
//#master-receive
|
//#master-receive
|
||||||
|
|
||||||
override def preStart() {
|
|
||||||
start = System.currentTimeMillis
|
|
||||||
}
|
|
||||||
|
|
||||||
override def postStop() {
|
|
||||||
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis"
|
|
||||||
.format(pi, (System.currentTimeMillis - start)))
|
|
||||||
latch.countDown()
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
//#master
|
//#master
|
||||||
|
|
||||||
|
//#result-listener
|
||||||
|
class Listener extends Actor {
|
||||||
|
def receive = {
|
||||||
|
case PiEstimate(pi, duration) ⇒
|
||||||
|
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s"
|
||||||
|
.format(pi, duration))
|
||||||
|
context.system.shutdown()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//#result-listener
|
||||||
|
|
||||||
//#actors-and-messages
|
//#actors-and-messages
|
||||||
|
|
||||||
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
|
def calculate(nrOfWorkers: Int, nrOfElements: Int, nrOfMessages: Int) {
|
||||||
// Create an Akka system
|
// Create an Akka system
|
||||||
val system = ActorSystem("PiSystem")
|
val system = ActorSystem("PiSystem")
|
||||||
|
|
||||||
// this latch is only plumbing to know when the calculation is completed
|
// create the result listener, which will print the result and shutdown the system
|
||||||
val latch = new CountDownLatch(1)
|
val listener = system.actorOf(Props[Listener])
|
||||||
|
|
||||||
// create the master
|
// create the master
|
||||||
val master = system.actorOf(Props(new Master(
|
val master = system.actorOf(Props(new Master(
|
||||||
nrOfWorkers, nrOfMessages, nrOfElements, latch)),
|
nrOfWorkers, nrOfMessages, nrOfElements, listener)),
|
||||||
"master")
|
name = "master")
|
||||||
|
|
||||||
// start the calculation
|
// start the calculation
|
||||||
master ! Calculate
|
master ! Calculate
|
||||||
|
|
||||||
// wait for master to shut down
|
|
||||||
latch.await()
|
|
||||||
|
|
||||||
// Shut down the system
|
|
||||||
system.shutdown()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
//#app
|
//#app
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue