Add untyped coordinated example to be used in docs
This commit is contained in:
parent
2cc572e395
commit
baa181fd0d
3 changed files with 85 additions and 2 deletions
|
|
@ -22,7 +22,7 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
|
|||
* Transactors can also accept explicitly sent `Coordinated` messages.
|
||||
* <br/><br/>
|
||||
*
|
||||
* Simple transactors will just implement the `atomically` method which similar to
|
||||
* Simple transactors will just implement the `atomically` method which is similar to
|
||||
* the actor `receive` method but runs within a coordinated transaction.
|
||||
*
|
||||
* Example of a simple transactor that will join a coordinated transaction:
|
||||
|
|
@ -45,7 +45,7 @@ case class SendTo(actor: ActorRef, message: Option[Any] = None)
|
|||
* The `include` method will send on the same message that was received to other transactors.
|
||||
* The `sendTo` method allows you to specify both the actor to send to, and message to send.
|
||||
*
|
||||
* Example of using coordinating an increment:
|
||||
* Example of coordinating an increment:
|
||||
*
|
||||
* {{{
|
||||
* class FriendlyCounter(friend: ActorRef) extends Transactor {
|
||||
|
|
|
|||
|
|
@ -0,0 +1,39 @@
|
|||
package akka.transactor.example;
|
||||
|
||||
import akka.transactor.Coordinated;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.stm.*;
|
||||
|
||||
public class UntypedCoordinatedCounter extends UntypedActor {
|
||||
private Ref<Integer> count = new Ref(0);
|
||||
|
||||
private void increment() {
|
||||
System.out.println("incrementing");
|
||||
count.set(count.get() + 1);
|
||||
}
|
||||
|
||||
public void onReceive(Object incoming) throws Exception {
|
||||
if (incoming instanceof Coordinated) {
|
||||
Coordinated coordinated = (Coordinated) incoming;
|
||||
Object message = coordinated.getMessage();
|
||||
if (message instanceof Increment) {
|
||||
Increment increment = (Increment) message;
|
||||
if (increment.hasFriend()) {
|
||||
increment.getFriend().sendOneWay(coordinated.coordinate(new Increment()));
|
||||
}
|
||||
coordinated.atomic(new Atomic() {
|
||||
public Object atomically() {
|
||||
increment();
|
||||
return null;
|
||||
}
|
||||
});
|
||||
}
|
||||
} else if (incoming instanceof String) {
|
||||
String message = (String) incoming;
|
||||
if (message.equals("GetCount")) {
|
||||
getContext().replyUnsafe(count.get());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,44 @@
|
|||
package akka.transactor.example;
|
||||
|
||||
import akka.transactor.Coordinated;
|
||||
import akka.actor.ActorRef;
|
||||
import akka.actor.UntypedActor;
|
||||
import akka.dispatch.Future;
|
||||
import akka.dispatch.Futures;
|
||||
|
||||
public class UntypedCoordinatedExample {
|
||||
public static void main(String[] args) throws InterruptedException {
|
||||
System.out.println();
|
||||
System.out.println("Untyped transactor example");
|
||||
System.out.println();
|
||||
|
||||
ActorRef counter1 = UntypedActor.actorOf(UntypedCoordinatedCounter.class).start();
|
||||
ActorRef counter2 = UntypedActor.actorOf(UntypedCoordinatedCounter.class).start();
|
||||
|
||||
counter1.sendOneWay(new Coordinated(new Increment(counter2)));
|
||||
|
||||
Thread.sleep(3000);
|
||||
|
||||
Future future1 = counter1.sendRequestReplyFuture("GetCount");
|
||||
Future future2 = counter2.sendRequestReplyFuture("GetCount");
|
||||
|
||||
future1.await();
|
||||
if (future1.isCompleted()) {
|
||||
if (future1.result().isDefined()) {
|
||||
int result = (Integer) future1.result().get();
|
||||
System.out.println("counter 1: " + result);
|
||||
}
|
||||
}
|
||||
|
||||
future2.await();
|
||||
if (future2.isCompleted()) {
|
||||
if (future2.result().isDefined()) {
|
||||
int result = (Integer) future2.result().get();
|
||||
System.out.println("counter 2: " + result);
|
||||
}
|
||||
}
|
||||
|
||||
counter1.stop();
|
||||
counter2.stop();
|
||||
}
|
||||
}
|
||||
Loading…
Add table
Add a link
Reference in a new issue