Updated introduction documents to Akka 2.0. Fixes #1480
This commit is contained in:
parent
66e7155ef1
commit
49e350a815
9 changed files with 327 additions and 744 deletions
|
|
@ -167,7 +167,8 @@ It should now look something like this:
|
|||
<?xml version="1.0" encoding="UTF-8"?>
|
||||
<project xmlns="http://maven.apache.org/POM/4.0.0"
|
||||
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
|
||||
http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<modelVersion>4.0.0</modelVersion>
|
||||
|
||||
<name>akka-tutorial-first-java</name>
|
||||
|
|
@ -213,28 +214,15 @@ Start writing the code
|
|||
|
||||
Now it's about time to start hacking.
|
||||
|
||||
We start by creating a ``Pi.java`` file and adding these import statements at the top of the file::
|
||||
We start by creating a ``Pi.java`` file and adding these import statements at the top of the file:
|
||||
|
||||
package akka.tutorial.first.java;
|
||||
|
||||
import static akka.actor.Actors.actorOf;
|
||||
import static akka.actor.Actors.poisonPill;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.routing.CyclicIterator;
|
||||
import akka.routing.InfiniteIterator;
|
||||
import akka.routing.Routing.Broadcast;
|
||||
import akka.routing.UntypedLoadBalancer;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#imports
|
||||
|
||||
If you are using Maven in this tutorial then create the file in the ``src/main/java/akka/tutorial/first/java`` directory.
|
||||
|
||||
If you are using the command line tools then create the file wherever you want. I will create it in a directory called ``tutorial`` at the root of the Akka distribution, e.g. in ``$AKKA_HOME/tutorial/akka/tutorial/first/java/Pi.java``.
|
||||
If you are using the command line tools then create the file wherever you want.
|
||||
We will create it in a directory called ``tutorial`` at the root of the Akka distribution,
|
||||
e.g. in ``$AKKA_HOME/tutorial/akka/tutorial/first/java/Pi.java``.
|
||||
|
||||
Creating the messages
|
||||
---------------------
|
||||
|
|
@ -247,466 +235,101 @@ With this in mind, let's now create the messages that we want to have flowing in
|
|||
- ``Work`` -- sent from the ``Master`` actor to the ``Worker`` actors containing the work assignment
|
||||
- ``Result`` -- sent from the ``Worker`` actors to the ``Master`` actor containing the result from the worker's calculation
|
||||
|
||||
Messages sent to actors should always be immutable to avoid sharing mutable state. So let's start by creating three messages as immutable POJOs. We also create a wrapper ``Pi`` class to hold our implementation::
|
||||
Messages sent to actors should always be immutable to avoid sharing mutable state. So let's start by creating three messages as immutable POJOs. We also create a wrapper ``Pi`` class to hold our implementation:
|
||||
|
||||
public class Pi {
|
||||
|
||||
static class Calculate {}
|
||||
|
||||
static class Work {
|
||||
private final int start;
|
||||
private final int nrOfElements;
|
||||
|
||||
public Work(int start, int nrOfElements) {
|
||||
this.start = start;
|
||||
this.nrOfElements = nrOfElements;
|
||||
}
|
||||
|
||||
public int getStart() { return start; }
|
||||
public int getNrOfElements() { return nrOfElements; }
|
||||
}
|
||||
|
||||
static class Result {
|
||||
private final double value;
|
||||
|
||||
public Result(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public double getValue() { return value; }
|
||||
}
|
||||
}
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#messages
|
||||
|
||||
Creating the worker
|
||||
-------------------
|
||||
|
||||
Now we can create the worker actor. This is done by extending in the ``UntypedActor`` base class and defining the ``onReceive`` method. The ``onReceive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message::
|
||||
Now we can create the worker actor. This is done by extending in the ``UntypedActor`` base class and defining the ``onReceive`` method. The ``onReceive`` method defines our message handler. We expect it to be able to handle the ``Work`` message so we need to add a handler for this message:
|
||||
|
||||
static class Worker extends UntypedActor {
|
||||
|
||||
// message handler
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof Work) {
|
||||
Work work = (Work) message;
|
||||
|
||||
// perform the work
|
||||
double result = calculatePiFor(work.getStart(), work.getNrOfElements());
|
||||
|
||||
// reply with the result
|
||||
getContext().reply(new Result(result));
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
|
||||
}
|
||||
}
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#worker
|
||||
:exclude: calculatePiFor
|
||||
|
||||
As you can see we have now created an ``UntypedActor`` with a ``onReceive`` method as a handler for the ``Work`` message. In this handler we invoke the ``calculatePiFor(..)`` method, wrap the result in a ``Result`` message and send it back to the original sender using ``getContext().reply(..)``. In Akka the sender reference is implicitly passed along with the message so that the receiver can always reply or store away the sender reference for future use.
|
||||
|
||||
The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method::
|
||||
The only thing missing in our ``Worker`` actor is the implementation on the ``calculatePiFor(..)`` method:
|
||||
|
||||
// define the work
|
||||
private double calculatePiFor(int start, int nrOfElements) {
|
||||
double acc = 0.0;
|
||||
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
|
||||
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
|
||||
}
|
||||
return acc;
|
||||
}
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#calculatePiFor
|
||||
|
||||
Creating the master
|
||||
-------------------
|
||||
|
||||
The master actor is a little bit more involved. In its constructor we need to create the workers (the ``Worker`` actors) and start them. We will also wrap them in a load-balancing router to make it easier to spread out the work evenly between the workers. Let's do that first::
|
||||
The master actor is a little bit more involved. In its constructor we create a round-robin router
|
||||
to make it easier to spread out the work evenly between the workers. Let's do that first:
|
||||
|
||||
static class Master extends UntypedActor {
|
||||
...
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#create-router
|
||||
|
||||
static class PiRouter extends UntypedLoadBalancer {
|
||||
private final InfiniteIterator<ActorRef> workers;
|
||||
|
||||
public PiRouter(ActorRef[] workers) {
|
||||
this.workers = new CyclicIterator<ActorRef>(asList(workers));
|
||||
}
|
||||
|
||||
public InfiniteIterator<ActorRef> seq() {
|
||||
return workers;
|
||||
}
|
||||
}
|
||||
|
||||
public Master(...) {
|
||||
...
|
||||
|
||||
// create the workers
|
||||
final ActorRef[] workers = new ActorRef[nrOfWorkers];
|
||||
for (int i = 0; i < nrOfWorkers; i++) {
|
||||
workers[i] = actorOf(new Props(Worker.class));
|
||||
}
|
||||
|
||||
// wrap them with a load-balancing router
|
||||
ActorRef router = actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new PiRouter(workers);
|
||||
}
|
||||
}));
|
||||
}
|
||||
}
|
||||
|
||||
As you can see we are using the ``actorOf`` factory method to create actors, this method returns as an ``ActorRef`` which is a reference to our newly created actor. This method is available in the ``Actors`` object but is usually imported::
|
||||
|
||||
import static akka.actor.Actors.actorOf;
|
||||
|
||||
One thing to note is that we used two different versions of the ``actorOf`` method. For creating the ``Worker`` actor we just pass in the class but to create the ``PiRouter`` actor we can't do that since the constructor in the ``PiRouter`` class takes arguments, instead we need to use the ``UntypedActorFactory`` which unfortunately is a bit more verbose.
|
||||
|
||||
``actorOf`` is the only way to create an instance of an Actor, this is enforced by Akka runtime. The ``actorOf`` method instantiates the actor and returns, not an instance to the actor, but an instance to an ``ActorRef``. This reference is the handle through which you communicate with the actor. It is immutable, serializable and location-aware meaning that it "remembers" its original actor even if it is sent to other nodes across the network and can be seen as the equivalent to the Erlang actor's PID.
|
||||
|
||||
The actor's life-cycle is:
|
||||
|
||||
- Created & Started -- ``Actor.actorOf(Props[MyActor]`` -- can receive messages
|
||||
- Stopped -- ``actorRef.stop()`` -- can **not** receive messages
|
||||
|
||||
Once the actor has been stopped it is dead and can not be started again.
|
||||
|
||||
Now we have a router that is representing all our workers in a single abstraction. If you paid attention to the code above, you saw that we were using the ``nrOfWorkers`` variable. This variable and others we have to pass to the ``Master`` actor in its constructor. So now let's create the master actor. We have to pass in three integer variables:
|
||||
Now we have a router that is representing all our workers in a single
|
||||
abstraction. So now let's create the master actor. We pass it three integer variables:
|
||||
|
||||
- ``nrOfWorkers`` -- defining how many workers we should start up
|
||||
- ``nrOfMessages`` -- defining how many number chunks to send out to the workers
|
||||
- ``nrOfElements`` -- defining how big the number chunks sent to each worker should be
|
||||
|
||||
Here is the master actor::
|
||||
Here is the master actor:
|
||||
|
||||
static class Master extends UntypedActor {
|
||||
private final int nrOfMessages;
|
||||
private final int nrOfElements;
|
||||
private final CountDownLatch latch;
|
||||
|
||||
private double pi;
|
||||
private int nrOfResults;
|
||||
private long start;
|
||||
|
||||
private ActorRef router;
|
||||
|
||||
static class PiRouter extends UntypedLoadBalancer {
|
||||
private final InfiniteIterator<ActorRef> workers;
|
||||
|
||||
public PiRouter(ActorRef[] workers) {
|
||||
this.workers = new CyclicIterator<ActorRef>(asList(workers));
|
||||
}
|
||||
|
||||
public InfiniteIterator<ActorRef> seq() {
|
||||
return workers;
|
||||
}
|
||||
}
|
||||
|
||||
public Master(
|
||||
int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
|
||||
this.nrOfMessages = nrOfMessages;
|
||||
this.nrOfElements = nrOfElements;
|
||||
this.latch = latch;
|
||||
|
||||
// create the workers
|
||||
final ActorRef[] workers = new ActorRef[nrOfWorkers];
|
||||
for (int i = 0; i < nrOfWorkers; i++) {
|
||||
workers[i] = actorOf(new Props(Worker.class));
|
||||
}
|
||||
|
||||
// wrap them with a load-balancing router
|
||||
router = actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new PiRouter(workers);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// message handler
|
||||
public void onReceive(Object message) { ... }
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
start = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
// tell the world that the calculation is complete
|
||||
System.out.println(String.format(
|
||||
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
|
||||
pi, (System.currentTimeMillis() - start)));
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#master
|
||||
:exclude: handle-messages
|
||||
|
||||
A couple of things are worth explaining further.
|
||||
|
||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the ``Master`` actor. This latch is only used for plumbing (in this specific tutorial), to have a simple way of letting the outside world knowing when the master can deliver the result and shut down. In more idiomatic Akka code, as we will see in part two of this tutorial series, we would not use a latch but other abstractions and functions like ``Channel``, ``Future`` and ``ask()`` to achieve the same thing in a non-blocking way. But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||
First, we are passing in a ``java.util.concurrent.CountDownLatch`` to the
|
||||
``Master`` actor. This latch is only used for plumbing (in this specific
|
||||
tutorial), to have a simple way of letting the outside world knowing when the
|
||||
master can deliver the result and shut down. In more idiomatic Akka code
|
||||
we would not use a latch but other abstractions and functions like ``Future``
|
||||
and ``ask()`` to achieve the same thing in a non-blocking way.
|
||||
But for simplicity let's stick to a ``CountDownLatch`` for now.
|
||||
|
||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and ``postStop``. In the ``preStart`` callback we are recording the time when the actor is started and in the ``postStop`` callback we are printing out the result (the approximation of Pi) and the time it took to calculate it. In this call we also invoke ``latch.countDown()`` to tell the outside world that we are done.
|
||||
Second, we are adding a couple of life-cycle callback methods; ``preStart`` and
|
||||
``postStop``. In the ``preStart`` callback we are recording the time when the
|
||||
actor is started and in the ``postStop`` callback we are printing out the result
|
||||
(the approximation of Pi) and the time it took to calculate it. In this call we
|
||||
also invoke ``latch.countDown()`` to tell the outside world that we are done.
|
||||
|
||||
But we are not done yet. We are missing the message handler for the ``Master`` actor. This message handler needs to be able to react to two different messages:
|
||||
But we are not done yet. We are missing the message handler for the ``Master`` actor.
|
||||
This message handler needs to be able to react to two different messages:
|
||||
|
||||
- ``Calculate`` -- which should start the calculation
|
||||
- ``Result`` -- which should aggregate the different results
|
||||
|
||||
The ``Calculate`` handler is sending out work to all the ``Worker`` actors and after doing that it also sends a ``new Broadcast(poisonPill())`` message to the router, which will send out the ``PoisonPill`` message to all the actors it is representing (in our case all the ``Worker`` actors). ``PoisonPill`` is a special kind of message that tells the receiver to shut itself down using the normal shutdown method; ``getContext().stop()``, and is created through the ``poisonPill()`` method. We also send a ``PoisonPill`` to the router itself (since it's also an actor that we want to shut down).
|
||||
The ``Calculate`` handler is sending out work to all the ``Worker`` via its router.
|
||||
|
||||
The ``Result`` handler is simpler, here we get the value from the ``Result`` message and aggregate it to our ``pi`` member variable. We also keep track of how many results we have received back, and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and shuts down.
|
||||
The ``Result`` handler gets the value from the ``Result`` message and aggregates it to
|
||||
our ``pi`` member variable. We also keep track of how many results we have received back,
|
||||
and if that matches the number of tasks sent out, the ``Master`` actor considers itself done and
|
||||
invokes the ``self.stop()`` method to stop itself *and* all its supervised actors.
|
||||
In this case it has one supervised actor, the router, and this in turn has ``nrOfWorkers`` supervised actors.
|
||||
All of them will be stopped automatically as the invocation of any supervisor's ``stop`` method
|
||||
will propagate down to all its supervised 'children'.
|
||||
|
||||
Let's capture this in code::
|
||||
Let's capture this in code:
|
||||
|
||||
// message handler
|
||||
public void onReceive(Object message) {
|
||||
|
||||
if (message instanceof Calculate) {
|
||||
// schedule work
|
||||
for (int start = 0; start < nrOfMessages; start++) {
|
||||
router.tell(new Work(start, nrOfElements), getContext());
|
||||
}
|
||||
|
||||
// send a PoisonPill to all workers telling them to shut down themselves
|
||||
router.tell(new Broadcast(poisonPill()));
|
||||
|
||||
// send a PoisonPill to the router, telling him to shut himself down
|
||||
router.tell(poisonPill());
|
||||
|
||||
} else if (message instanceof Result) {
|
||||
|
||||
// handle result from the worker
|
||||
Result result = (Result) message;
|
||||
pi += result.getValue();
|
||||
nrOfResults += 1;
|
||||
if (nrOfResults == nrOfMessages) getContext().stop();
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
|
||||
}
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#master-receive
|
||||
|
||||
Bootstrap the calculation
|
||||
-------------------------
|
||||
|
||||
Now the only thing that is left to implement is the runner that should bootstrap and run the calculation for us. We do that by adding a ``main`` method to the enclosing ``Pi`` class in which we create a new instance of ``Pi`` and invoke method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish::
|
||||
Now the only thing that is left to implement is the runner that should bootstrap and run the calculation for us.
|
||||
We do that by adding a ``main`` method to the enclosing ``Pi`` class in which we create a new instance of ``Pi`` and
|
||||
invoke method ``calculate`` in which we start up the ``Master`` actor and wait for it to finish:
|
||||
|
||||
public class Pi {
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java#app
|
||||
:exclude: actors-and-messages
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Pi pi = new Pi();
|
||||
pi.calculate(4, 10000, 10000);
|
||||
}
|
||||
|
||||
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
|
||||
throws Exception {
|
||||
|
||||
// this latch is only plumbing to know when the calculation is completed
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// create the master
|
||||
ActorRef master = actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
||||
}
|
||||
}));
|
||||
|
||||
// start the calculation
|
||||
master.tell(new Calculate());
|
||||
|
||||
// wait for master to shut down
|
||||
latch.await();
|
||||
}
|
||||
}
|
||||
As you can see the *calculate* method above it creates an ActorSystem and this is the Akka container which
|
||||
will contain all actors created in that "context". An example of how to create actors in the container
|
||||
is the *'system.actorOf(...)'* line in the calculate method. In this case we create a top level actor.
|
||||
If you instead where in an actor context, i.e. inside an actor creating other actors, you should use
|
||||
*this.getContext.actorOf(...)*. This is illustrated in the Master code above.
|
||||
|
||||
That's it. Now we are done.
|
||||
|
||||
Before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all::
|
||||
|
||||
package akka.tutorial.first.java;
|
||||
|
||||
import static akka.actor.Actors.actorOf;
|
||||
import static akka.actor.Actors.poisonPill;
|
||||
import static java.util.Arrays.asList;
|
||||
|
||||
import akka.actor.Props;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.actor.UntypedActorFactory;
|
||||
import akka.routing.CyclicIterator;
|
||||
import akka.routing.InfiniteIterator;
|
||||
import akka.routing.Routing.Broadcast;
|
||||
import akka.routing.UntypedLoadBalancer;
|
||||
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
|
||||
public class Pi {
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
Pi pi = new Pi();
|
||||
pi.calculate(4, 10000, 10000);
|
||||
}
|
||||
|
||||
// ====================
|
||||
// ===== Messages =====
|
||||
// ====================
|
||||
static class Calculate {}
|
||||
|
||||
static class Work {
|
||||
private final int start;
|
||||
private final int nrOfElements;
|
||||
|
||||
public Work(int start, int nrOfElements) {
|
||||
this.start = start;
|
||||
this.nrOfElements = nrOfElements;
|
||||
}
|
||||
|
||||
public int getStart() { return start; }
|
||||
public int getNrOfElements() { return nrOfElements; }
|
||||
}
|
||||
|
||||
static class Result {
|
||||
private final double value;
|
||||
|
||||
public Result(double value) {
|
||||
this.value = value;
|
||||
}
|
||||
|
||||
public double getValue() { return value; }
|
||||
}
|
||||
|
||||
// ==================
|
||||
// ===== Worker =====
|
||||
// ==================
|
||||
static class Worker extends UntypedActor {
|
||||
|
||||
// define the work
|
||||
private double calculatePiFor(int start, int nrOfElements) {
|
||||
double acc = 0.0;
|
||||
for (int i = start * nrOfElements; i <= ((start + 1) * nrOfElements - 1); i++) {
|
||||
acc += 4.0 * (1 - (i % 2) * 2) / (2 * i + 1);
|
||||
}
|
||||
return acc;
|
||||
}
|
||||
|
||||
// message handler
|
||||
public void onReceive(Object message) {
|
||||
if (message instanceof Work) {
|
||||
Work work = (Work) message;
|
||||
|
||||
// perform the work
|
||||
double result = calculatePiFor(work.getStart(), work.getNrOfElements())
|
||||
|
||||
// reply with the result
|
||||
getContext().reply(new Result(result));
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
|
||||
}
|
||||
}
|
||||
|
||||
// ==================
|
||||
// ===== Master =====
|
||||
// ==================
|
||||
static class Master extends UntypedActor {
|
||||
private final int nrOfMessages;
|
||||
private final int nrOfElements;
|
||||
private final CountDownLatch latch;
|
||||
|
||||
private double pi;
|
||||
private int nrOfResults;
|
||||
private long start;
|
||||
|
||||
private ActorRef router;
|
||||
|
||||
static class PiRouter extends UntypedLoadBalancer {
|
||||
private final InfiniteIterator<ActorRef> workers;
|
||||
|
||||
public PiRouter(ActorRef[] workers) {
|
||||
this.workers = new CyclicIterator<ActorRef>(asList(workers));
|
||||
}
|
||||
|
||||
public InfiniteIterator<ActorRef> seq() {
|
||||
return workers;
|
||||
}
|
||||
}
|
||||
|
||||
public Master(
|
||||
int nrOfWorkers, int nrOfMessages, int nrOfElements, CountDownLatch latch) {
|
||||
|
||||
this.nrOfMessages = nrOfMessages;
|
||||
this.nrOfElements = nrOfElements;
|
||||
this.latch = latch;
|
||||
|
||||
// create the workers
|
||||
final ActorRef[] workers = new ActorRef[nrOfWorkers];
|
||||
for (int i = 0; i < nrOfWorkers; i++) {
|
||||
workers[i] = actorOf(new Props(Worker.class));
|
||||
}
|
||||
|
||||
// wrap them with a load-balancing router
|
||||
router = actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new PiRouter(workers);
|
||||
}
|
||||
}));
|
||||
}
|
||||
|
||||
// message handler
|
||||
public void onReceive(Object message) {
|
||||
|
||||
if (message instanceof Calculate) {
|
||||
// schedule work
|
||||
for (int start = 0; start < nrOfMessages; start++) {
|
||||
router.tell(new Work(start, nrOfElements), getContext());
|
||||
}
|
||||
|
||||
// send a PoisonPill to all workers telling them to shut down themselves
|
||||
router.tell(new Broadcast(poisonPill()));
|
||||
|
||||
// send a PoisonPill to the router, telling him to shut himself down
|
||||
router.tell(poisonPill());
|
||||
|
||||
} else if (message instanceof Result) {
|
||||
|
||||
// handle result from the worker
|
||||
Result result = (Result) message;
|
||||
pi += result.getValue();
|
||||
nrOfResults += 1;
|
||||
if (nrOfResults == nrOfMessages) getContext().stop();
|
||||
|
||||
} else throw new IllegalArgumentException("Unknown message [" + message + "]");
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preStart() {
|
||||
start = System.currentTimeMillis();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void postStop() {
|
||||
// tell the world that the calculation is complete
|
||||
System.out.println(String.format(
|
||||
"\n\tPi estimate: \t\t%s\n\tCalculation time: \t%s millis",
|
||||
pi, (System.currentTimeMillis() - start)));
|
||||
latch.countDown();
|
||||
}
|
||||
}
|
||||
|
||||
// ==================
|
||||
// ===== Run it =====
|
||||
// ==================
|
||||
public void calculate(final int nrOfWorkers, final int nrOfElements, final int nrOfMessages)
|
||||
throws Exception {
|
||||
|
||||
// this latch is only plumbing to know when the calculation is completed
|
||||
final CountDownLatch latch = new CountDownLatch(1);
|
||||
|
||||
// create the master
|
||||
ActorRef master = actorOf(new Props(new UntypedActorFactory() {
|
||||
public UntypedActor create() {
|
||||
return new Master(nrOfWorkers, nrOfMessages, nrOfElements, latch);
|
||||
}
|
||||
}));
|
||||
|
||||
// start the calculation
|
||||
master.tell(new Calculate());
|
||||
|
||||
// wait for master to shut down
|
||||
latch.await();
|
||||
}
|
||||
}
|
||||
Before we package it up and run it, let's take a look at the full code now, with package declaration, imports and all:
|
||||
|
||||
.. includecode:: ../../akka-tutorials/akka-tutorial-first/src/main/java/akka/tutorial/first/java/Pi.java
|
||||
|
||||
Run it as a command line application
|
||||
------------------------------------
|
||||
|
|
@ -729,11 +352,11 @@ and the ``scala-library.jar`` JAR files to the classpath as well as the classes
|
|||
we compiled ourselves::
|
||||
|
||||
$ java \
|
||||
-cp lib/scala-library.jar:lib/akka/akka-actor-2.0-SNAPSHOT.jar:tutorial \
|
||||
-cp lib/scala-library.jar:lib/akka/akka-actor-2.0-SNAPSHOT.jar:. \
|
||||
akka.tutorial.java.first.Pi
|
||||
|
||||
Pi estimate: 3.1435501812459323
|
||||
Calculation time: 822 millis
|
||||
Calculation time: 609 millis
|
||||
|
||||
Yippee! It is working.
|
||||
|
||||
|
|
@ -750,7 +373,7 @@ When this in done we can run our application directly inside Maven::
|
|||
$ mvn exec:java -Dexec.mainClass="akka.tutorial.first.java.Pi"
|
||||
...
|
||||
Pi estimate: 3.1435501812459323
|
||||
Calculation time: 939 millis
|
||||
Calculation time: 597 millis
|
||||
|
||||
Yippee! It is working.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue