Merge branch 'master' of github.com:jboner/akka
This commit is contained in:
commit
0cf28f586d
1 changed files with 5 additions and 15 deletions
|
|
@ -8,17 +8,12 @@ import static akka.actor.Actors.actorOf;
|
||||||
import static akka.actor.Actors.poisonPill;
|
import static akka.actor.Actors.poisonPill;
|
||||||
import static java.lang.System.currentTimeMillis;
|
import static java.lang.System.currentTimeMillis;
|
||||||
import static java.util.Arrays.asList;
|
import static java.util.Arrays.asList;
|
||||||
|
|
||||||
import java.util.concurrent.CountDownLatch;
|
|
||||||
|
|
||||||
import scala.Option;
|
import scala.Option;
|
||||||
|
|
||||||
import akka.actor.ActorRef;
|
import akka.actor.ActorRef;
|
||||||
|
import akka.actor.Channel;
|
||||||
import akka.actor.UntypedActor;
|
import akka.actor.UntypedActor;
|
||||||
import akka.actor.UntypedActorFactory;
|
import akka.actor.UntypedActorFactory;
|
||||||
import akka.dispatch.CompletableFuture;
|
|
||||||
import akka.dispatch.Future;
|
import akka.dispatch.Future;
|
||||||
import akka.event.EventHandler;
|
|
||||||
import akka.japi.Procedure;
|
import akka.japi.Procedure;
|
||||||
import akka.routing.CyclicIterator;
|
import akka.routing.CyclicIterator;
|
||||||
import akka.routing.InfiniteIterator;
|
import akka.routing.InfiniteIterator;
|
||||||
|
|
@ -157,7 +152,7 @@ public class Pi {
|
||||||
|
|
||||||
// message handler
|
// message handler
|
||||||
public void onReceive(Object message) {
|
public void onReceive(Object message) {
|
||||||
throw new IllegalStateException("Should be gatter or scatter");
|
throw new IllegalStateException("Should be gather or scatter");
|
||||||
}
|
}
|
||||||
|
|
||||||
private final Procedure<Object> scatter = new Procedure<Object>() {
|
private final Procedure<Object> scatter = new Procedure<Object>() {
|
||||||
|
|
@ -166,16 +161,12 @@ public class Pi {
|
||||||
for (int arg = 0; arg < nrOfMessages; arg++) {
|
for (int arg = 0; arg < nrOfMessages; arg++) {
|
||||||
router.sendOneWay(new Work(arg, nrOfElements), getContext());
|
router.sendOneWay(new Work(arg, nrOfElements), getContext());
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO would like to use channel instead, wrong docs, channel() not there
|
|
||||||
// getContext().channel()
|
|
||||||
CompletableFuture<Object> resultFuture = getContext().getSenderFuture().get();
|
|
||||||
// Assume the gathering behavior
|
// Assume the gathering behavior
|
||||||
become(gatter(resultFuture));
|
become(gather(getContext().getChannel()));
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
private Procedure<Object> gatter(final CompletableFuture<Object> resultFuture) {
|
private Procedure<Object> gather(final Channel<Object> recipient) {
|
||||||
return new Procedure<Object>() {
|
return new Procedure<Object>() {
|
||||||
public void apply(Object msg) {
|
public void apply(Object msg) {
|
||||||
// handle result from the worker
|
// handle result from the worker
|
||||||
|
|
@ -183,9 +174,8 @@ public class Pi {
|
||||||
pi += result.getValue();
|
pi += result.getValue();
|
||||||
nrOfResults += 1;
|
nrOfResults += 1;
|
||||||
if (nrOfResults == nrOfMessages) {
|
if (nrOfResults == nrOfMessages) {
|
||||||
System.out.println("# DONE");
|
|
||||||
// send the pi result back to the guy who started the calculation
|
// send the pi result back to the guy who started the calculation
|
||||||
resultFuture.completeWithResult(pi);
|
recipient.sendOneWay(pi);
|
||||||
// shut ourselves down, we're done
|
// shut ourselves down, we're done
|
||||||
getContext().stop();
|
getContext().stop();
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue