Update Java STM API to include STM utils
This commit is contained in:
parent
ec84822675
commit
f8082436dd
8 changed files with 253 additions and 0 deletions
|
|
@ -145,5 +145,36 @@ trait StmUtil {
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Stm utility methods for using from Java.
|
||||
*/
|
||||
object StmUtils {
|
||||
/**
|
||||
* Schedule a deferred task on the thread local transaction (use within an atomic).
|
||||
* This is executed when the transaction commits.
|
||||
*/
|
||||
def deferred(runnable: Runnable): Unit = MultiverseStmUtils.scheduleDeferredTask(runnable)
|
||||
|
||||
/**
|
||||
* Schedule a compensating task on the thread local transaction (use within an atomic).
|
||||
* This is executed when the transaction aborts.
|
||||
*/
|
||||
def compensating(runnable: Runnable): Unit = MultiverseStmUtils.scheduleCompensatingTask(runnable)
|
||||
|
||||
/**
|
||||
* STM retry for blocking transactions (use within an atomic).
|
||||
* Can be used to wait for a condition.
|
||||
*/
|
||||
def retry = MultiverseStmUtils.retry
|
||||
}
|
||||
|
||||
/**
|
||||
* Use EitherOrElse to combine two blocking transactions (from Java).
|
||||
*/
|
||||
abstract class EitherOrElse[T] extends OrElseTemplate[T] {
|
||||
def either(mtx: MultiverseTransaction) = either
|
||||
def orelse(mtx: MultiverseTransaction) = orElse
|
||||
|
||||
def either: T
|
||||
def orElse: T
|
||||
}
|
||||
|
|
|
|||
15
akka-stm/src/test/java/akka/stm/example/Branch.java
Normal file
15
akka-stm/src/test/java/akka/stm/example/Branch.java
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
|
||||
public class Branch {
|
||||
public Ref<Integer> left;
|
||||
public Ref<Integer> right;
|
||||
public int amount;
|
||||
|
||||
public Branch(Ref<Integer> left, Ref<Integer> right, int amount) {
|
||||
this.left = left;
|
||||
this.right = right;
|
||||
this.amount = amount;
|
||||
}
|
||||
}
|
||||
46
akka-stm/src/test/java/akka/stm/example/Brancher.java
Normal file
46
akka-stm/src/test/java/akka/stm/example/Brancher.java
Normal file
|
|
@ -0,0 +1,46 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
import static akka.stm.StmUtils.retry;
|
||||
import akka.actor.*;
|
||||
import akka.util.FiniteDuration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Brancher extends UntypedActor {
|
||||
TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||
.setBlockingAllowed(true)
|
||||
.setTrackReads(true)
|
||||
.setTimeout(new FiniteDuration(60, TimeUnit.SECONDS))
|
||||
.build();
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Branch) {
|
||||
Branch branch = (Branch) message;
|
||||
final Ref<Integer> left = branch.left;
|
||||
final Ref<Integer> right = branch.right;
|
||||
final double amount = branch.amount;
|
||||
new Atomic<Integer>(txFactory) {
|
||||
public Integer atomically() {
|
||||
return new EitherOrElse<Integer>() {
|
||||
public Integer either() {
|
||||
if (left.get() < amount) {
|
||||
System.out.println("not enough on left - retrying");
|
||||
retry();
|
||||
}
|
||||
System.out.println("going left");
|
||||
return left.get();
|
||||
}
|
||||
public Integer orElse() {
|
||||
if (right.get() < amount) {
|
||||
System.out.println("not enough on right - retrying");
|
||||
retry();
|
||||
}
|
||||
System.out.println("going right");
|
||||
return right.get();
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -0,0 +1,27 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
import akka.actor.*;
|
||||
|
||||
public class EitherOrElseExample {
|
||||
public static void main(String[] args) {
|
||||
System.out.println();
|
||||
System.out.println("EitherOrElse example");
|
||||
System.out.println();
|
||||
|
||||
final Ref<Integer> left = new Ref<Integer>(100);
|
||||
final Ref<Integer> right = new Ref<Integer>(100);
|
||||
|
||||
ActorRef brancher = Actors.actorOf(Brancher.class).start();
|
||||
|
||||
brancher.sendOneWay(new Branch(left, right, 500));
|
||||
|
||||
new Atomic() {
|
||||
public Object atomically() {
|
||||
return right.set(right.get() + 1000);
|
||||
}
|
||||
}.execute();
|
||||
|
||||
brancher.stop();
|
||||
}
|
||||
}
|
||||
47
akka-stm/src/test/java/akka/stm/example/RetryExample.java
Normal file
47
akka-stm/src/test/java/akka/stm/example/RetryExample.java
Normal file
|
|
@ -0,0 +1,47 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
import akka.actor.*;
|
||||
|
||||
public class RetryExample {
|
||||
public static void main(String[] args) {
|
||||
System.out.println();
|
||||
System.out.println("Retry example");
|
||||
System.out.println();
|
||||
|
||||
final Ref<Double> account1 = new Ref<Double>(100.0);
|
||||
final Ref<Double> account2 = new Ref<Double>(100.0);
|
||||
|
||||
ActorRef transferer = Actors.actorOf(Transferer.class).start();
|
||||
|
||||
transferer.sendOneWay(new Transfer(account1, account2, 500.0));
|
||||
// Transferer: not enough money - retrying
|
||||
|
||||
new Atomic() {
|
||||
public Object atomically() {
|
||||
return account1.set(account1.get() + 2000);
|
||||
}
|
||||
}.execute();
|
||||
// Transferer: transferring
|
||||
|
||||
Double acc1 = new Atomic<Double>() {
|
||||
public Double atomically() {
|
||||
return account1.get();
|
||||
}
|
||||
}.execute();
|
||||
|
||||
Double acc2 = new Atomic<Double>() {
|
||||
public Double atomically() {
|
||||
return account2.get();
|
||||
}
|
||||
}.execute();
|
||||
|
||||
System.out.println("Account 1: " + acc1);
|
||||
// Account 1: 1600.0
|
||||
|
||||
System.out.println("Account 2: " + acc2);
|
||||
// Account 2: 600.0
|
||||
|
||||
transferer.stop();
|
||||
}
|
||||
}
|
||||
15
akka-stm/src/test/java/akka/stm/example/Transfer.java
Normal file
15
akka-stm/src/test/java/akka/stm/example/Transfer.java
Normal file
|
|
@ -0,0 +1,15 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
|
||||
public class Transfer {
|
||||
public Ref<Double> from;
|
||||
public Ref<Double> to;
|
||||
public double amount;
|
||||
|
||||
public Transfer(Ref<Double> from, Ref<Double> to, double amount) {
|
||||
this.from = from;
|
||||
this.to = to;
|
||||
this.amount = amount;
|
||||
}
|
||||
}
|
||||
36
akka-stm/src/test/java/akka/stm/example/Transferer.java
Normal file
36
akka-stm/src/test/java/akka/stm/example/Transferer.java
Normal file
|
|
@ -0,0 +1,36 @@
|
|||
package akka.stm.example;
|
||||
|
||||
import akka.stm.*;
|
||||
import static akka.stm.StmUtils.retry;
|
||||
import akka.actor.*;
|
||||
import akka.util.FiniteDuration;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class Transferer extends UntypedActor {
|
||||
TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||
.setBlockingAllowed(true)
|
||||
.setTrackReads(true)
|
||||
.setTimeout(new FiniteDuration(60, TimeUnit.SECONDS))
|
||||
.build();
|
||||
|
||||
public void onReceive(Object message) throws Exception {
|
||||
if (message instanceof Transfer) {
|
||||
Transfer transfer = (Transfer) message;
|
||||
final Ref<Double> from = transfer.from;
|
||||
final Ref<Double> to = transfer.to;
|
||||
final double amount = transfer.amount;
|
||||
new Atomic(txFactory) {
|
||||
public Object atomically() {
|
||||
if (from.get() < amount) {
|
||||
System.out.println("Transferer: not enough money - retrying");
|
||||
retry();
|
||||
}
|
||||
System.out.println("Transferer: transferring");
|
||||
from.set(from.get() - amount);
|
||||
to.set(to.get() + amount);
|
||||
return null;
|
||||
}
|
||||
}.execute();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -5,11 +5,14 @@ import org.junit.Test;
|
|||
import org.junit.Before;
|
||||
|
||||
import akka.stm.*;
|
||||
import static akka.stm.StmUtils.deferred;
|
||||
import static akka.stm.StmUtils.compensating;
|
||||
|
||||
import org.multiverse.api.ThreadLocalTransaction;
|
||||
import org.multiverse.api.TransactionConfiguration;
|
||||
import org.multiverse.api.exceptions.ReadonlyException;
|
||||
|
||||
|
||||
public class JavaStmTests {
|
||||
|
||||
private Ref<Integer> ref;
|
||||
|
|
@ -87,4 +90,37 @@ public class JavaStmTests {
|
|||
}
|
||||
}.execute();
|
||||
}
|
||||
|
||||
@Test public void deferredTask() {
|
||||
final Ref<Boolean> enteredDeferred = new Ref<Boolean>(false);
|
||||
new Atomic() {
|
||||
public Object atomically() {
|
||||
deferred(new Runnable() {
|
||||
public void run() {
|
||||
enteredDeferred.set(true);
|
||||
}
|
||||
});
|
||||
return ref.set(3);
|
||||
}
|
||||
}.execute();
|
||||
assertEquals(true, enteredDeferred.get());
|
||||
}
|
||||
|
||||
@Test public void compensatingTask() {
|
||||
final Ref<Boolean> enteredCompensating = new Ref<Boolean>(false);
|
||||
try {
|
||||
new Atomic() {
|
||||
public Object atomically() {
|
||||
compensating(new Runnable() {
|
||||
public void run() {
|
||||
enteredCompensating.set(true);
|
||||
}
|
||||
});
|
||||
ref.set(3);
|
||||
throw new RuntimeException();
|
||||
}
|
||||
}.execute();
|
||||
} catch(RuntimeException e) {}
|
||||
assertEquals(true, enteredCompensating.get());
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue