Update to new Java API for Scala STM

This commit is contained in:
Peter Vlugter 2012-01-18 15:59:59 +13:00
parent 158bbabb58
commit 2757869c62
12 changed files with 61 additions and 129 deletions

View file

@ -7,15 +7,11 @@ package akka.docs.transactor;
//#class //#class
import akka.actor.*; import akka.actor.*;
import akka.transactor.*; import akka.transactor.*;
import scala.concurrent.stm.*; import scala.concurrent.stm.Ref;
import static scala.concurrent.stm.JavaAPI.*;
public class CoordinatedCounter extends UntypedActor { public class CoordinatedCounter extends UntypedActor {
private Ref<Integer> count = Stm.ref(0); private Ref.View<Integer> count = newRef(0);
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
public void onReceive(Object incoming) throws Exception { public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) { if (incoming instanceof Coordinated) {
@ -26,14 +22,14 @@ public class CoordinatedCounter extends UntypedActor {
if (increment.hasFriend()) { if (increment.hasFriend()) {
increment.getFriend().tell(coordinated.coordinate(new Increment())); increment.getFriend().tell(coordinated.coordinate(new Increment()));
} }
coordinated.atomic(new Atomically() { coordinated.atomic(new Runnable() {
public void atomically(InTxn txn) { public void run() {
increment(txn); increment(count, 1);
} }
}); });
} }
} else if ("GetCount".equals(incoming)) { } else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get()); getSender().tell(count.get());
} else { } else {
unhandled(incoming); unhandled(incoming);
} }

View file

@ -6,7 +6,6 @@ package akka.docs.transactor;
import akka.actor.*; import akka.actor.*;
import akka.transactor.*; import akka.transactor.*;
import scala.concurrent.stm.*;
public class Coordinator extends UntypedActor { public class Coordinator extends UntypedActor {
public void onReceive(Object incoming) throws Exception { public void onReceive(Object incoming) throws Exception {
@ -15,8 +14,8 @@ public class Coordinator extends UntypedActor {
Object message = coordinated.getMessage(); Object message = coordinated.getMessage();
if (message instanceof Message) { if (message instanceof Message) {
//#coordinated-atomic //#coordinated-atomic
coordinated.atomic(new Atomically() { coordinated.atomic(new Runnable() {
public void atomically(InTxn txn) { public void run() {
// do something in the coordinated transaction ... // do something in the coordinated transaction ...
} }
}); });

View file

@ -6,21 +6,21 @@ package akka.docs.transactor;
//#class //#class
import akka.transactor.*; import akka.transactor.*;
import scala.concurrent.stm.*; import scala.concurrent.stm.Ref;
import static scala.concurrent.stm.JavaAPI.*;
public class Counter extends UntypedTransactor { public class Counter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0); Ref.View<Integer> count = newRef(0);
public void atomically(InTxn txn, Object message) { public void atomically(Object message) {
if (message instanceof Increment) { if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1; increment(count, 1);
count.set(newValue, txn);
} }
} }
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.single().get()); getSender().tell(count.get());
return true; return true;
} else return false; } else return false;
} }

View file

@ -8,10 +8,11 @@ package akka.docs.transactor;
import akka.actor.*; import akka.actor.*;
import akka.transactor.*; import akka.transactor.*;
import java.util.Set; import java.util.Set;
import scala.concurrent.stm.*; import scala.concurrent.stm.Ref;
import static scala.concurrent.stm.JavaAPI.*;
public class FriendlyCounter extends UntypedTransactor { public class FriendlyCounter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0); Ref.View<Integer> count = newRef(0);
@Override public Set<SendTo> coordinate(Object message) { @Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) { if (message instanceof Increment) {
@ -22,16 +23,15 @@ public class FriendlyCounter extends UntypedTransactor {
return nobody(); return nobody();
} }
public void atomically(InTxn txn, Object message) { public void atomically(Object message) {
if (message instanceof Increment) { if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1; increment(count, 1);
count.set(newValue, txn);
} }
} }
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.single().get()); getSender().tell(count.get());
return true; return true;
} else return false; } else return false;
} }

View file

@ -102,7 +102,7 @@ be sent.
:language: java :language: java
To enter the coordinated transaction use the atomic method of the coordinated To enter the coordinated transaction use the atomic method of the coordinated
object, passing in an ``akka.transactor.Atomically`` object. object, passing in a ``java.lang.Runnable``.
.. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic .. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic
:language: java :language: java

View file

@ -1,67 +0,0 @@
/**
* Copyright (C) 2009-2011 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import scala.concurrent.stm._
/**
* Java API.
*
* For creating Java-friendly coordinated atomic blocks.
*
* @see [[akka.transactor.Coordinated]]
*/
trait Atomically {
def atomically(txn: InTxn): Unit
}
/**
* Java API.
*
* For creating completion handlers.
*/
trait CompletionHandler {
def handle(status: Txn.Status): Unit
}
/**
* Java API.
*
* To ease some of the pain of using Scala STM from Java until
* the proper Java API is created.
*/
object Stm {
/**
* Create an STM Ref with an initial value.
*/
def ref[A](initialValue: A): Ref[A] = Ref(initialValue)
/**
* Add a CompletionHandler to run after the current transaction
* has committed.
*/
def afterCommit(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has rolled back.
*/
def afterRollback(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has committed or rolled back.
*/
def afterCompletion(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status handler.handle(status))(txn.get)
}
}

View file

@ -6,7 +6,8 @@ package akka.transactor
import akka.AkkaException import akka.AkkaException
import akka.util.Timeout import akka.util.Timeout
import scala.concurrent.stm._ import scala.concurrent.stm.{ CommitBarrier, InTxn }
import java.util.concurrent.Callable
/** /**
* Akka-specific exception for coordinated transactions. * Akka-specific exception for coordinated transactions.
@ -125,7 +126,7 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
* *
* @throws CoordinatedTransactionException if the coordinated transaction fails. * @throws CoordinatedTransactionException if the coordinated transaction fails.
*/ */
def atomic[T](body: InTxn T): T = { def atomic[A](body: InTxn A): A = {
member.atomic(body) match { member.atomic(body) match {
case Right(result) result case Right(result) result
case Left(CommitBarrier.MemberUncaughtExceptionCause(x)) case Left(CommitBarrier.MemberUncaughtExceptionCause(x))
@ -136,13 +137,22 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
} }
/** /**
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. * Java API: coordinated atomic method that accepts a `java.lang.Runnable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions * Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated. * in this coordination before committing. The timeout is specified when creating the Coordinated.
* *
* @throws CoordinatedTransactionException if the coordinated transaction fails. * @throws CoordinatedTransactionException if the coordinated transaction fails.
*/ */
def atomic(atomically: Atomically): Unit = atomic(txn atomically.atomically(txn)) def atomic(runnable: Runnable): Unit = atomic { _ runnable.run }
/**
* Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[A](callable: Callable[A]): A = atomic { _ callable.call }
/** /**
* An empty coordinated atomic block. Can be used to complete the number of members involved * An empty coordinated atomic block. Can be used to complete the number of members involved

View file

@ -25,7 +25,7 @@ abstract class UntypedTransactor extends UntypedActor {
sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message))) sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message)))
} }
before(message) before(message)
coordinated.atomic { txn atomically(txn, message) } coordinated.atomic { txn atomically(message) }
after(message) after(message)
} }
case message { case message {
@ -84,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor {
* The Receive block to run inside the coordinated transaction. * The Receive block to run inside the coordinated transaction.
*/ */
@throws(classOf[Exception]) @throws(classOf[Exception])
def atomically(txn: InTxn, message: Any) {} def atomically(message: Any)
/** /**
* A Receive block that runs after the coordinated transaction. * A Receive block that runs after the coordinated transaction.

View file

@ -7,24 +7,20 @@ package akka.transactor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.actor.Actors; import akka.actor.Actors;
import akka.actor.UntypedActor; import akka.actor.UntypedActor;
import scala.concurrent.stm.*; import static scala.concurrent.stm.JavaAPI.*;
import scala.concurrent.stm.Ref;
import java.util.List; import java.util.List;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedCounter extends UntypedActor { public class UntypedCoordinatedCounter extends UntypedActor {
private String name; private String name;
private Ref<Integer> count = Stm.ref(0); private Ref.View<Integer> count = newRef(0);
public UntypedCoordinatedCounter(String name) { public UntypedCoordinatedCounter(String name) {
this.name = name; this.name = name;
} }
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
public void onReceive(Object incoming) throws Exception { public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) { if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming; Coordinated coordinated = (Coordinated) incoming;
@ -33,8 +29,8 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment increment = (Increment) message; Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends(); List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.getLatch(); final CountDownLatch latch = increment.getLatch();
final CompletionHandler countDown = new CompletionHandler() { final Runnable countDown = new Runnable() {
public void handle(Txn.Status status) { public void run() {
latch.countDown(); latch.countDown();
} }
}; };
@ -42,15 +38,16 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage)); friends.get(0).tell(coordinated.coordinate(coordMessage));
} }
coordinated.atomic(new Atomically() { coordinated.atomic(new Runnable() {
public void atomically(InTxn txn) { public void run() {
increment(txn); increment(count, 1);
Stm.afterCompletion(countDown); afterRollback(countDown);
afterCommit(countDown);
} }
}); });
} }
} else if ("GetCount".equals(incoming)) { } else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get()); getSender().tell(count.get());
} }
} }
} }

View file

@ -7,7 +7,8 @@ package akka.transactor;
import akka.actor.ActorRef; import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor; import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo; import akka.transactor.SendTo;
import scala.concurrent.stm.*; import static scala.concurrent.stm.JavaAPI.*;
import scala.concurrent.stm.Ref;
import java.util.List; import java.util.List;
import java.util.Set; import java.util.Set;
import java.util.concurrent.CountDownLatch; import java.util.concurrent.CountDownLatch;
@ -15,17 +16,12 @@ import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor { public class UntypedCounter extends UntypedTransactor {
private String name; private String name;
private Ref<Integer> count = Stm.ref(0); private Ref.View<Integer> count = newRef(0);
public UntypedCounter(String name) { public UntypedCounter(String name) {
this.name = name; this.name = name;
} }
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
@Override public Set<SendTo> coordinate(Object message) { @Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) { if (message instanceof Increment) {
Increment increment = (Increment) message; Increment increment = (Increment) message;
@ -41,22 +37,23 @@ public class UntypedCounter extends UntypedTransactor {
} }
} }
public void atomically(InTxn txn, Object message) { public void atomically(Object message) {
if (message instanceof Increment) { if (message instanceof Increment) {
increment(txn); increment(count, 1);
final Increment increment = (Increment) message; final Increment increment = (Increment) message;
CompletionHandler countDown = new CompletionHandler() { Runnable countDown = new Runnable() {
public void handle(Txn.Status status) { public void run() {
increment.getLatch().countDown(); increment.getLatch().countDown();
} }
}; };
Stm.afterCompletion(countDown); afterRollback(countDown);
afterCommit(countDown);
} }
} }
@Override public boolean normally(Object message) { @Override public boolean normally(Object message) {
if ("GetCount".equals(message)) { if ("GetCount".equals(message)) {
getSender().tell(count.single().get()); getSender().tell(count.get());
return true; return true;
} else return false; } else return false;
} }

View file

@ -7,7 +7,7 @@ package akka.transactor;
import scala.concurrent.stm.InTxn; import scala.concurrent.stm.InTxn;
public class UntypedFailer extends UntypedTransactor { public class UntypedFailer extends UntypedTransactor {
public void atomically(InTxn txn, Object message) throws Exception { public void atomically(Object message) throws Exception {
throw new ExpectedFailureException(); throw new ExpectedFailureException();
} }
} }

View file

@ -450,7 +450,7 @@ object Dependency {
val Netty = "3.2.5.Final" val Netty = "3.2.5.Final"
val Protobuf = "2.4.1" val Protobuf = "2.4.1"
val Rabbit = "2.3.1" val Rabbit = "2.3.1"
val ScalaStm = "0.4" val ScalaStm = "0.5.0-SNAPSHOT"
val Scalatest = "1.6.1" val Scalatest = "1.6.1"
val Slf4j = "1.6.4" val Slf4j = "1.6.4"
val Spring = "3.0.5.RELEASE" val Spring = "3.0.5.RELEASE"