Improved java version of pi example also. See #1729

This commit is contained in:
Patrik Nordwall 2012-01-26 14:34:31 +01:00
parent c64b73004a
commit fa41cea897
5 changed files with 239 additions and 200 deletions

View file

@ -237,13 +237,24 @@ e.g. in ``$AKKA_HOME/tutorial/akka/tutorial/first/java/Pi.java``.
Creating the messages
---------------------
The design we are aiming for is to have one ``Master`` actor initiating the computation, creating a set of ``Worker`` actors. Then it splits up the work into discrete chunks, and sends these chunks to the different workers in a round-robin fashion. The master waits until all the workers have completed their work and sent back results for aggregation. When computation is completed the master prints out the result, shuts down all workers and then itself.
The design we are aiming for is to have one ``Master`` actor initiating the
computation, creating a set of ``Worker`` actors. Then it splits up the work
into discrete chunks, and sends these chunks to the different workers in a
round-robin fashion. The master waits until all the workers have completed their
work and sent back results for aggregation. When computation is completed the
master sends the result to the ``Listener``, which prints out the result.
With this in mind, let's now create the messages that we want to have flowing in the system. We need three different messages:
With this in mind, let's now create the messages that we want to have flowing in
the system. We need four different messages:
- ``Calculate`` -- sent to the ``Master`` actor to start the calculation
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing
the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor
containing the result from the worker's calculation
- ``PiApproximation`` -- sent from the ``Master`` actor to the
``Listener`` actor containing the the final pi result and how long time
the calculation took
Messages sent to actors should always be immutable to avoid sharing mutable state. So let's start by creating three messages as immutable POJOs. We also create a wrapper ``Pi`` class to hold our implementation:
@ -285,19 +296,8 @@ Here is the master actor:
A couple of things are worth explaining further.
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the
``Master`` actor. This latch is only used for plumbing (in this specific
tutorial), to have a simple way of letting the outside world knowing when the
master can deliver the result and shut down. In more idiomatic Akka code
we would not use a latch but other abstractions and functions like ``Future``
and ``ask()`` to achieve the same thing in a non-blocking way.
But for simplicity let's stick to a ``CountDownLatch`` for now.
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and
``postStop``. In the ``preStart`` callback we are recording the time when the
actor is started and in the ``postStop`` callback we are printing out the result
(the approximation of Pi) and the time it took to calculate it. In this call we
also invoke ``latch.countDown()`` to tell the outside world that we are done.
Note that we are passing in a ``ActorRef`` to the ``Master`` actor. This is used to
report the the final result to the outside world.
But we are not done yet. We are missing the message handler for the ``Master`` actor.
This message handler needs to be able to react to two different messages:
@ -310,15 +310,25 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` via its rout
The ``Result`` handler gets the value from the ``Result`` message and aggregates it to
our ``pi`` member variable. We also keep track of how many results we have received back,
and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and
invokes the ``self.stop()`` method to stop itself *and* all its supervised actors.
sends the final result to the ``listener``. When done it also invokes the ``getContext().stop(getSelf())``
method to stop itself *and* all its supervised actors.
In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors.
All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method
will propagate down to all its supervised 'children'.
Let's capture this in code:
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#master-receive
Creating the result listener
----------------------------
The listener is straightforward. When it receives the ``PiApproximation`` from the ``Master`` it
prints the result and shuts down the ``ActorSystem``.
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#result-listener
Bootstrap the calculation
-------------------------
@ -329,11 +339,11 @@ invoke method ``calculate`` in which we start up the ``Master`` actor and wait f
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#app
:exclude: actors-and-messages
As you can see the *calculate* method above it creates an ActorSystem and this is the Akka container which
As you can see the *calculate* method above it creates an ``ActorSystem`` and this is the Akka container which
will contain all actors created in that "context". An example of how to create actors in the container
is the *'system.actorOf(...)'* line in the calculate method. In this case we create a top level actor.
is the *'system.actorOf(...)'* line in the calculate method. In this case we create two top level actors.
If you instead where in an actor context, i.e. inside an actor creating other actors, you should use
*this.getContext.actorOf(...)*. This is illustrated in the Master code above.
*getContext().actorOf(...)*. This is illustrated in the Master code above.
That's it. Now we are done.
@ -365,8 +375,8 @@ we compiled ourselves::
-cp lib/scala-library.jar:lib/akka/akka-actor-2.0-SNAPSHOT.jar:. \
akka.tutorial.java.first.Pi
Pi estimate: 3.1435501812459323
Calculation time: 609 millis
Pi approximation: 3.1435501812459323
Calculation time: 359 millis
Yippee! It is working.
@ -382,8 +392,8 @@ When this in done we can run our application directly inside Maven::
$ mvn exec:java -Dexec.mainClass="akka.tutorial.first.java.Pi"
...
Pi estimate: 3.1435501812459323
Calculation time: 597 millis
Pi approximation: 3.1435501812459323
Calculation time: 359 millis
Yippee! It is working.

View file

@ -268,11 +268,16 @@ We start by creating case classes for each type of message in our application, s
call it ``PiMessage``. Right click on the package and choose ``New Scala Class``, and enter ``PiMessage`` as
the name of the class.
We need three different messages:
We need four different messages:
- ``Calculate`` -- sent to the ``Master`` actor to start the calculation
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing
the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor
containing the result from the worker's calculation
- ``PiApproximation`` -- sent from the ``Master`` actor to the
``Listener`` actor containing the the final pi result and how long time
the calculation took
Messages sent to actors should always be immutable to avoid sharing mutable state.
In Scala we have 'case classes' which make excellent messages. So let's start by creating three messages as case classes.
@ -343,19 +348,8 @@ Here is the master actor:
A couple of things are worth explaining further.
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the
``Master`` actor. This latch is only used for plumbing (in this specific
tutorial), to have a simple way of letting the outside world knowing when the
master can deliver the result and shut down. In more idiomatic Akka code
we would not use a latch but other abstractions and functions like ``Future``
and ``?`` to achieve the same thing in a non-blocking way.
But for simplicity let's stick to a ``CountDownLatch`` for now.
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and
``postStop``. In the ``preStart`` callback we are recording the time when the
actor is started and in the ``postStop`` callback we are printing out the result
(the approximation of Pi) and the time it took to calculate it. In this call we
also invoke ``latch.countDown()`` to tell the outside world that we are done.
Note that we are passing in a ``ActorRef`` to the ``Master`` actor. This is used to
report the the final result to the outside world.
But we are not done yet. We are missing the message handler for the ``Master``
actor. This message handler needs to be able to react to two different messages:
@ -368,7 +362,8 @@ The ``Calculate`` handler is sending out work to all the ``Worker`` via its rout
The ``Result`` handler gets the value from the ``Result`` message and aggregates it to
our ``pi`` member variable. We also keep track of how many results we have received back,
and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and
invokes the ``self.stop()`` method to stop itself *and* all its supervised actors.
sends the final result to the ``listener``. When done it also invokes the ``context.stop(self)``
method to stop itself *and* all its supervised actors.
In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors.
All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method
will propagate down to all its supervised 'children'.
@ -377,6 +372,13 @@ Let's capture this in code:
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#master-receive
Creating the result listener
----------------------------
The listener is straightforward. When it receives the ``PiApproximation`` from the ``Master`` it
prints the result and shuts down the ``ActorSystem``.
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#result-listener
Bootstrap the calculation
-------------------------
@ -391,6 +393,12 @@ We also create a method ``calculate`` in which we start up the ``Master`` actor
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#app
:exclude: actors-and-messages
As you can see the *calculate* method above it creates an ``ActorSystem`` and this is the Akka container which
will contain all actors created in that "context". An example of how to create actors in the container
is the *'system.actorOf(...)'* line in the calculate method. In this case we create two top level actors.
If you instead where in an actor context, i.e. inside an actor creating other actors, you should use
*context.actorOf(...)*. This is illustrated in the Master code above.
That's it. Now we are done.
Run it from Eclipse
@ -401,8 +409,8 @@ If not, bring you project up to date by clicking ``Project/Build Project``. If t
you can right-click in the editor where ``Pi`` is defined, and choose ``Run as.. /Scala application``.
If everything works fine, you should see::
Pi estimate: 3.1435501812459323
Calculation time: 632 millis
Pi approximation: 3.1435501812459323
Calculation time: 359 millis
You can also define a new Run configuration, by going to ``Run/Run Configurations``. Create a new ``Scala application``
and choose the tutorial project and the main class to be ``akkatutorial.Pi``. You can pass additional command line

View file

@ -265,14 +265,14 @@ work and sent back results for aggregation. When computation is completed the
master sends the result to the ``Listener``, which prints out the result.
With this in mind, let's now create the messages that we want to have flowing in
the system. We need three different messages:
the system. We need four different messages:
- ``Calculate`` -- sent to the ``Master`` actor to start the calculation
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing
the work assignment
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor
containing the result from the worker's calculation
- ``PiEstimate`` -- sent from the ``Master`` actor to the
- ``PiApproximation`` -- sent from the ``Master`` actor to the
``Listener`` actor containing the the final pi result and how long time
the calculation took
@ -361,7 +361,7 @@ Let's capture this in code:
Creating the result listener
============================
The listener is straightforward. When it receives the ``PiEstimate`` from the ``Master`` it
The listener is straightforward. When it receives the ``PiApproximation`` from the ``Master`` it
prints the result and shuts down the ``ActorSystem``.
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/scala/Pi.scala#result-listener
@ -418,8 +418,8 @@ compiled ourselves::
-cp lib/scala-library.jar:lib/akka/akka-actor-2.0-SNAPSHOT.jar:. \
akka.tutorial.first.scala.Pi
Pi estimate: 3.1435501812459323
Calculation time: 553 millis
Pi approximation: 3.1435501812459323
Calculation time: 359 millis
Yippee! It is working.
@ -437,8 +437,8 @@ When this in done we can run our application directly inside SBT::
> run
...
Pi estimate: 3.1435501812459323
Calculation time: 531 millis
Pi approximation: 3.1435501812459323
Calculation time: 359 millis
Yippee! It is working.

View file

@ -5,172 +5,193 @@
package akka.tutorial.first.java;
//#imports
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.InternalActorRef;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.actor.UntypedActorFactory;
import akka.japi.Creator;
import akka.routing.*;
import akka.util.Timeout;
import akka.routing.RoundRobinRouter;
import akka.util.Duration;
import java.util.concurrent.TimeUnit;
import java.util.LinkedList;
import java.util.concurrent.CountDownLatch;
//#imports
//#app
public class Pi {
public static void main(String[] args) throws Exception {
Pi pi = new Pi();
pi.calculate(4, 10000, 10000);
public static void main(String[] args) {
Pi pi = new Pi();
pi.calculate(4, 10000, 10000);
}
//#actors-and-messages
//#messages
static class Calculate {
}
static class Work {
private final int start;
private final int nrOfElements;
public Work(int start, int nrOfElements) {
this.start = start;
this.nrOfElements = nrOfElements;
}
//#actors-and-messages
//#messages
static class Calculate {
public int getStart() {
return start;
}
static class Work {
private final int start;
private final int nrOfElements;
public int getNrOfElements() {
return nrOfElements;
}
}
public Work(int start, int nrOfElements) {
this.start = start;
this.nrOfElements = nrOfElements;
}
static class Result {
private final double value;
public int getStart() {
return start;
}
public int getNrOfElements() {
return nrOfElements;
}
public Result(double value) {
this.value = value;
}
static class Result {
private final double value;
public Result(double value) {
this.value = value;
}
public double getValue() {
return value;
}
public double getValue() {
return value;
}
//#messages
}
//#worker
public static class Worker extends UntypedActor {
static class PiApproximation {
private final double pi;
private final Duration duration;
//#calculatePiFor
private double calculatePiFor(int start, int nrOfElements) {
double acc = 0.0;
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
}
return acc;
}
//#calculatePiFor
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
getSender().tell(new Result(result));
} else {
throw new IllegalArgumentException("Unknown message [" + message + "]");
}
}
public PiApproximation(double pi, Duration duration) {
this.pi = pi;
this.duration = duration;
}
//#worker
//#master
public static class Master extends UntypedActor {
private final int nrOfMessages;
private final int nrOfElements;
private final CountDownLatch latch;
private double pi;
private int nrOfResults;
private long start;
private ActorRef router;
public Master(final int nrOfWorkers, int nrOfMessages,
int nrOfElements, CountDownLatch latch) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.latch = latch;
//#create-router
router = this.getContext().actorOf(
new Props(Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)),
"pi");
//#create-router
}
//#master-receive
public void onReceive(Object message) {
//#handle-messages
if (message instanceof Calculate) {
for (int start = 0; start < nrOfMessages; start++) {
router.tell(new Work(start, nrOfElements), getSelf());
}
} else if (message instanceof Result) {
Result result = (Result) message;
pi += result.getValue();
nrOfResults += 1;
if (nrOfResults == nrOfMessages) getContext().stop(getSelf());
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
//#handle-messages
}
//#master-receive
@Override
public void preStart() {
start = System.currentTimeMillis();
}
@Override
public void postStop() {
System.out.println(String.format(
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
pi, (System.currentTimeMillis() - start)));
latch.countDown();
}
public double getPi() {
return pi;
}
//#master
//#actors-and-messages
public void calculate(final int nrOfWorkers,
final int nrOfElements,
final int nrOfMessages)
throws Exception {
// Create an Akka system
final ActorSystem system = ActorSystem.create();
// this latch is only plumbing to know when the calculation is completed
final CountDownLatch latch = new CountDownLatch(1);
// create the master
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
}
}));
// start the calculation
master.tell(new Calculate());
// wait for master to shut down
latch.await();
// Shut down the system
system.shutdown();
public Duration getDuration() {
return duration;
}
}
//#messages
//#worker
public static class Worker extends UntypedActor {
//#calculatePiFor
private double calculatePiFor(int start, int nrOfElements) {
double acc = 0.0;
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
}
return acc;
}
//#calculatePiFor
public void onReceive(Object message) {
if (message instanceof Work) {
Work work = (Work) message;
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
getSender().tell(new Result(result), getSelf());
} else {
unhandled(message);
}
}
}
//#worker
//#master
public static class Master extends UntypedActor {
private final int nrOfMessages;
private final int nrOfElements;
private double pi;
private int nrOfResults;
private final long start = System.currentTimeMillis();
private final ActorRef listener;
private final ActorRef workerRouter;
public Master(final int nrOfWorkers, int nrOfMessages, int nrOfElements, ActorRef listener) {
this.nrOfMessages = nrOfMessages;
this.nrOfElements = nrOfElements;
this.listener = listener;
//#create-router
workerRouter = this.getContext().actorOf(new Props(Worker.class).withRouter(new RoundRobinRouter(nrOfWorkers)),
"workerRouter");
//#create-router
}
//#master-receive
public void onReceive(Object message) {
//#handle-messages
if (message instanceof Calculate) {
for (int start = 0; start < nrOfMessages; start++) {
workerRouter.tell(new Work(start, nrOfElements), getSelf());
}
} else if (message instanceof Result) {
Result result = (Result) message;
pi += result.getValue();
nrOfResults += 1;
if (nrOfResults == nrOfMessages) {
// Send the result to the listener
Duration duration = Duration.create(System.currentTimeMillis() - start, TimeUnit.MILLISECONDS);
listener.tell(new PiApproximation(pi, duration), getSelf());
// Stops this actor and all its supervised children
getContext().stop(getSelf());
}
} else {
unhandled(message);
}
//#handle-messages
}
//#master-receive
}
//#master
//#result-listener
public static class Listener extends UntypedActor {
public void onReceive(Object message) {
if (message instanceof PiApproximation) {
PiApproximation approximation = (PiApproximation) message;
System.out.println(String.format("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s",
approximation.getPi(), approximation.getDuration()));
getContext().system().shutdown();
} else {
unhandled(message);
}
}
}
//#result-listener
//#actors-and-messages
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages) {
// Create an Akka system
ActorSystem system = ActorSystem.create("PiSystem");
// create the result listener, which will print the result and shutdown the system
final ActorRef listener = system.actorOf(new Props(Listener.class));
// create the master
ActorRef master = system.actorOf(new Props(new UntypedActorFactory() {
public UntypedActor create() {
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, listener);
}
}), "master");
// start the calculation
master.tell(new Calculate());
}
}
//#app

View file

@ -5,7 +5,7 @@ package akka.tutorial.first.scala
//#imports
import akka.actor._
import akka.routing._
import akka.routing.RoundRobinRouter
import akka.util.Duration
import akka.util.duration._
//#imports
@ -21,7 +21,7 @@ object Pi extends App {
case object Calculate extends PiMessage
case class Work(start: Int, nrOfElements: Int) extends PiMessage
case class Result(value: Double) extends PiMessage
case class PiEstimate(pi: Double, duration: Duration)
case class PiApproximation(pi: Double, duration: Duration)
//#messages
//#worker
@ -52,21 +52,21 @@ object Pi extends App {
val start: Long = System.currentTimeMillis
//#create-router
val router = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workers")
val workerRouter = context.actorOf(
Props[Worker].withRouter(RoundRobinRouter(nrOfWorkers)), name = "workerRouter")
//#create-router
//#master-receive
def receive = {
//#handle-messages
case Calculate
for (i 0 until nrOfMessages) router ! Work(i * nrOfElements, nrOfElements)
for (i 0 until nrOfMessages) workerRouter ! Work(i * nrOfElements, nrOfElements)
case Result(value)
pi += value
nrOfResults += 1
if (nrOfResults == nrOfMessages) {
// Send the result to the listener
listener ! PiEstimate(pi, duration = (System.currentTimeMillis - start).millis)
listener ! PiApproximation(pi, duration = (System.currentTimeMillis - start).millis)
// Stops this actor and all its supervised children
context.stop(self)
}
@ -80,8 +80,8 @@ object Pi extends App {
//#result-listener
class Listener extends Actor {
def receive = {
case PiEstimate(pi, duration)
println("\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s"
case PiApproximation(pi, duration)
println("\n\tPi approximation: \t\t%s\n\tCalculation time: \t%s"
.format(pi, duration))
context.system.shutdown()
}