Merge pull request #223 from jboner/java-api-for-scala-stm

Java API for ScalaSTM
This commit is contained in:
Peter Vlugter 2012-01-19 14:47:08 -08:00
commit b70feea2e2
15 changed files with 375 additions and 126 deletions

View file

@ -7,15 +7,11 @@ package akka.docs.transactor;
//#class
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class CoordinatedCounter extends UntypedActor {
private Ref<Integer> count = Stm.ref(0);
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
private Ref.View<Integer> count = Stm.newRef(0);
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
@ -26,14 +22,14 @@ public class CoordinatedCounter extends UntypedActor {
if (increment.hasFriend()) {
increment.getFriend().tell(coordinated.coordinate(new Increment()));
}
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
increment(txn);
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
} else {
unhandled(incoming);
}

View file

@ -6,7 +6,6 @@ package akka.docs.transactor;
import akka.actor.*;
import akka.transactor.*;
import scala.concurrent.stm.*;
public class Coordinator extends UntypedActor {
public void onReceive(Object incoming) throws Exception {
@ -15,8 +14,8 @@ public class Coordinator extends UntypedActor {
Object message = coordinated.getMessage();
if (message instanceof Message) {
//#coordinated-atomic
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
coordinated.atomic(new Runnable() {
public void run() {
// do something in the coordinated transaction ...
}
});

View file

@ -6,21 +6,21 @@ package akka.docs.transactor;
//#class
import akka.transactor.*;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class Counter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0);
Ref.View<Integer> count = Stm.newRef(0);
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
Stm.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -8,10 +8,11 @@ package akka.docs.transactor;
import akka.actor.*;
import akka.transactor.*;
import java.util.Set;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
public class FriendlyCounter extends UntypedTransactor {
Ref<Integer> count = Stm.ref(0);
Ref.View<Integer> count = Stm.newRef(0);
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
@ -22,16 +23,15 @@ public class FriendlyCounter extends UntypedTransactor {
return nobody();
}
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
Stm.increment(count, 1);
}
}
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -102,7 +102,7 @@ be sent.
:language: java
To enter the coordinated transaction use the atomic method of the coordinated
object, passing in an ``akka.transactor.Atomically`` object.
object, passing in a ``java.lang.Runnable``.
.. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic
:language: java

View file

@ -1,67 +0,0 @@
/**
* Copyright (C) 2009-2012 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.transactor
import scala.concurrent.stm._
/**
* Java API.
*
* For creating Java-friendly coordinated atomic blocks.
*
* @see [[akka.transactor.Coordinated]]
*/
trait Atomically {
def atomically(txn: InTxn): Unit
}
/**
* Java API.
*
* For creating completion handlers.
*/
trait CompletionHandler {
def handle(status: Txn.Status): Unit
}
/**
* Java API.
*
* To ease some of the pain of using Scala STM from Java until
* the proper Java API is created.
*/
object Stm {
/**
* Create an STM Ref with an initial value.
*/
def ref[A](initialValue: A): Ref[A] = Ref(initialValue)
/**
* Add a CompletionHandler to run after the current transaction
* has committed.
*/
def afterCommit(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has rolled back.
*/
def afterRollback(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status handler.handle(status))(txn.get)
}
/**
* Add a CompletionHandler to run after the current transaction
* has committed or rolled back.
*/
def afterCompletion(handler: CompletionHandler): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status handler.handle(status))(txn.get)
}
}

View file

@ -6,7 +6,8 @@ package akka.transactor
import akka.AkkaException
import akka.util.Timeout
import scala.concurrent.stm._
import scala.concurrent.stm.{ CommitBarrier, InTxn }
import java.util.concurrent.Callable
/**
* Akka-specific exception for coordinated transactions.
@ -125,7 +126,7 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[T](body: InTxn T): T = {
def atomic[A](body: InTxn A): A = {
member.atomic(body) match {
case Right(result) result
case Left(CommitBarrier.MemberUncaughtExceptionCause(x))
@ -136,13 +137,22 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) {
}
/**
* Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]].
* Java API: coordinated atomic method that accepts a `java.lang.Runnable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic(atomically: Atomically): Unit = atomic(txn atomically.atomically(txn))
def atomic(runnable: Runnable): Unit = atomic { _ runnable.run }
/**
* Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`.
* Delimits the coordinated transaction. The transaction will wait for all other transactions
* in this coordination before committing. The timeout is specified when creating the Coordinated.
*
* @throws CoordinatedTransactionException if the coordinated transaction fails.
*/
def atomic[A](callable: Callable[A]): A = atomic { _ callable.call }
/**
* An empty coordinated atomic block. Can be used to complete the number of members involved

View file

@ -25,7 +25,7 @@ abstract class UntypedTransactor extends UntypedActor {
sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message)))
}
before(message)
coordinated.atomic { txn atomically(txn, message) }
coordinated.atomic { txn atomically(message) }
after(message)
}
case message {
@ -84,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor {
* The Receive block to run inside the coordinated transaction.
*/
@throws(classOf[Exception])
def atomically(txn: InTxn, message: Any) {}
def atomically(message: Any)
/**
* A Receive block that runs after the coordinated transaction.

View file

@ -0,0 +1,147 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm.japi
import java.util.concurrent.Callable
import java.util.{ List JList, Map JMap, Set JSet }
import scala.collection.JavaConversions
import scala.concurrent.stm
import scala.concurrent.stm._
import scala.runtime.AbstractFunction1
/**
* Java-friendly API for ScalaSTM.
* These methods can also be statically imported.
*/
object Stm {
/**
* Create a Ref with an initial value. Return a `Ref.View`, which does not
* require implicit transactions.
* @param initialValue the initial value for the newly created `Ref.View`
* @return a new `Ref.View`
*/
def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single
/**
* Create an empty TMap. Return a `TMap.View`, which does not require
* implicit transactions. See newMap for included java conversion.
* @return a new, empty `TMap.View`
*/
def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single
/**
* Create an empty TMap. Return a `java.util.Map` view of this TMap.
* @return a new, empty `TMap.View` wrapped as a `java.util.Map`.
*/
def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B])
/**
* Create an empty TSet. Return a `TSet.View`, which does not require
* implicit transactions. See newSet for included java conversion.
* @return a new, empty `TSet.View`
*/
def newTSet[A](): TSet.View[A] = TSet.empty[A].single
/**
* Create an empty TSet. Return a `java.util.Set` view of this TSet.
* @return a new, empty `TSet.View` wrapped as a `java.util.Set`.
*/
def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A])
/**
* Create a TArray containing `length` elements. Return a `TArray.View`,
* which does not require implicit transactions. See newList for included
* java conversion.
* @param length the length of the `TArray.View` to be created
* @return a new `TArray.View` containing `length` elements (initially null)
*/
def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single
/**
* Create an empty TArray. Return a `java.util.List` view of this Array.
* @param length the length of the `TArray.View` to be created
* @return a new, empty `TArray.View` wrapped as a `java.util.List`.
*/
def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length))
/**
* Atomic block that takes a `Runnable`.
* @param runnable the `Runnable` to run within a transaction
*/
def atomic(runnable: Runnable): Unit = stm.atomic { txn runnable.run }
/**
* Atomic block that takes a `Callable`.
* @param callable the `Callable` to run within a transaction
* @return the value returned by the `Callable`
*/
def atomic[A](callable: Callable[A]): A = stm.atomic { txn callable.call }
/**
* Transform the value stored by `ref` by applying the function `f`.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
*/
def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the old value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the old value of `ref`
*/
def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f)
/**
* Transform the value stored by `ref` by applying the function `f` and
* return the new value.
* @param ref the `Ref.View` to be transformed
* @param f the function to be applied
* @return the new value of `ref`
*/
def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f)
/**
* Increment the `java.lang.Integer` value of a `Ref.View`.
* @param ref the `Ref.View<Integer>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v v.intValue + delta }
/**
* Increment the `java.lang.Long` value of a `Ref.View`.
* @param ref the `Ref.View<Long>` to be incremented
* @param delta the amount to increment
*/
def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v v.longValue + delta }
/**
* Add a task to run after the current transaction has committed.
* @param task the `Runnable` task to run after transaction commit
*/
def afterCommit(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCommit(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has rolled back.
* @param task the `Runnable` task to run after transaction rollback
*/
def afterRollback(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterRollback(status task.run)(txn.get)
}
/**
* Add a task to run after the current transaction has either rolled back
* or committed.
* @param task the `Runnable` task to run after transaction completion
*/
def afterCompletion(task: Runnable): Unit = {
val txn = Txn.findCurrent
if (txn.isDefined) Txn.afterCompletion(status task.run)(txn.get)
}
}

View file

@ -7,24 +7,20 @@ package akka.transactor;
import akka.actor.ActorRef;
import akka.actor.Actors;
import akka.actor.UntypedActor;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
public class UntypedCoordinatedCounter extends UntypedActor {
private String name;
private Ref<Integer> count = Stm.ref(0);
private Ref.View<Integer> count = Stm.newRef(0);
public UntypedCoordinatedCounter(String name) {
this.name = name;
}
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
public void onReceive(Object incoming) throws Exception {
if (incoming instanceof Coordinated) {
Coordinated coordinated = (Coordinated) incoming;
@ -33,8 +29,8 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment increment = (Increment) message;
List<ActorRef> friends = increment.getFriends();
final CountDownLatch latch = increment.getLatch();
final CompletionHandler countDown = new CompletionHandler() {
public void handle(Txn.Status status) {
final Runnable countDown = new Runnable() {
public void run() {
latch.countDown();
}
};
@ -42,15 +38,15 @@ public class UntypedCoordinatedCounter extends UntypedActor {
Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch);
friends.get(0).tell(coordinated.coordinate(coordMessage));
}
coordinated.atomic(new Atomically() {
public void atomically(InTxn txn) {
increment(txn);
coordinated.atomic(new Runnable() {
public void run() {
Stm.increment(count, 1);
Stm.afterCompletion(countDown);
}
});
}
} else if ("GetCount".equals(incoming)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
}
}
}

View file

@ -7,7 +7,8 @@ package akka.transactor;
import akka.actor.ActorRef;
import akka.transactor.UntypedTransactor;
import akka.transactor.SendTo;
import scala.concurrent.stm.*;
import scala.concurrent.stm.Ref;
import scala.concurrent.stm.japi.Stm;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
@ -15,17 +16,12 @@ import java.util.concurrent.TimeUnit;
public class UntypedCounter extends UntypedTransactor {
private String name;
private Ref<Integer> count = Stm.ref(0);
private Ref.View<Integer> count = Stm.newRef(0);
public UntypedCounter(String name) {
this.name = name;
}
private void increment(InTxn txn) {
Integer newValue = count.get(txn) + 1;
count.set(newValue, txn);
}
@Override public Set<SendTo> coordinate(Object message) {
if (message instanceof Increment) {
Increment increment = (Increment) message;
@ -41,12 +37,12 @@ public class UntypedCounter extends UntypedTransactor {
}
}
public void atomically(InTxn txn, Object message) {
public void atomically(Object message) {
if (message instanceof Increment) {
increment(txn);
Stm.increment(count, 1);
final Increment increment = (Increment) message;
CompletionHandler countDown = new CompletionHandler() {
public void handle(Txn.Status status) {
Runnable countDown = new Runnable() {
public void run() {
increment.getLatch().countDown();
}
};
@ -56,7 +52,7 @@ public class UntypedCounter extends UntypedTransactor {
@Override public boolean normally(Object message) {
if ("GetCount".equals(message)) {
getSender().tell(count.single().get());
getSender().tell(count.get());
return true;
} else return false;
}

View file

@ -7,7 +7,7 @@ package akka.transactor;
import scala.concurrent.stm.InTxn;
public class UntypedFailer extends UntypedTransactor {
public void atomically(InTxn txn, Object message) throws Exception {
public void atomically(Object message) throws Exception {
throw new ExpectedFailureException();
}
}

View file

@ -0,0 +1,156 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
import static org.junit.Assert.*;
import org.junit.Test;
import scala.concurrent.stm.japi.Stm;
import static scala.concurrent.stm.japi.Stm.*;
import scala.runtime.AbstractFunction1;
import java.util.concurrent.Callable;
import java.util.Map;
import java.util.Set;
import java.util.List;
public class JavaAPITests {
@Test
public void createIntegerRef() {
Ref.View<Integer> ref = newRef(0);
int unboxed = ref.get();
assertEquals(0, unboxed);
}
@Test
public void atomicWithRunnable() {
final Ref.View<Integer> ref = newRef(0);
atomic(new Runnable() {
public void run() {
ref.set(10);
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void atomicWithCallable() {
final Ref.View<Integer> ref = newRef(0);
int oldValue = atomic(new Callable<Integer>() {
public Integer call() {
return ref.swap(10);
}
});
assertEquals(0, oldValue);
int newValue = ref.get();
assertEquals(10, newValue);
}
@Test(expected = TestException.class)
public void failingTransaction() {
final Ref.View<Integer> ref = newRef(0);
try {
atomic(new Runnable() {
public void run() {
ref.set(10);
throw new TestException();
}
});
} catch (TestException e) {
int value = ref.get();
assertEquals(0, value);
throw e;
}
}
@Test
public void transformInteger() {
Ref.View<Integer> ref = newRef(0);
transform(ref, new AbstractFunction1<Integer, Integer>() {
public Integer apply(Integer i) {
return i + 10;
}
});
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementInteger() {
Ref.View<Integer> ref = newRef(0);
increment(ref, 10);
int value = ref.get();
assertEquals(10, value);
}
@Test
public void incrementLong() {
Ref.View<Long> ref = newRef(0L);
increment(ref, 10L);
long value = ref.get();
assertEquals(10L, value);
}
@Test
public void createAndUseTMap() {
Map<Integer, String> map = newMap();
map.put(1, "one");
map.put(2, "two");
assertEquals("one", map.get(1));
assertEquals("two", map.get(2));
assertTrue(map.containsKey(2));
map.remove(2);
assertFalse(map.containsKey(2));
}
@Test(expected = TestException.class)
public void failingTMapTransaction() {
final Map<Integer, String> map = newMap();
try {
atomic(new Runnable() {
public void run() {
map.put(1, "one");
map.put(2, "two");
assertTrue(map.containsKey(1));
assertTrue(map.containsKey(2));
throw new TestException();
}
});
} catch (TestException e) {
assertFalse(map.containsKey(1));
assertFalse(map.containsKey(2));
throw e;
}
}
@Test
public void createAndUseTSet() {
Set<String> set = newSet();
set.add("one");
set.add("two");
assertTrue(set.contains("one"));
assertTrue(set.contains("two"));
assertEquals(2, set.size());
set.add("one");
assertEquals(2, set.size());
set.remove("two");
assertFalse(set.contains("two"));
assertEquals(1, set.size());
}
@Test
public void createAndUseTArray() {
List<String> list = newList(3);
assertEquals(null, list.get(0));
assertEquals(null, list.get(1));
assertEquals(null, list.get(2));
list.set(0, "zero");
list.set(1, "one");
list.set(2, "two");
assertEquals("zero", list.get(0));
assertEquals("one", list.get(1));
assertEquals("two", list.get(2));
}
}

View file

@ -0,0 +1,9 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm;
public class TestException extends RuntimeException {
public TestException() {
super("Expected failure");
}
}

View file

@ -0,0 +1,7 @@
/* scala-stm - (c) 2009-2011, Stanford University, PPL */
package scala.concurrent.stm
import org.scalatest.junit.JUnitWrapperSuite
class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader)