Merge branch '0deps', remote branch 'origin' into 0deps

This commit is contained in:
Jonas Bonér 2011-03-04 16:15:07 +01:00
commit c909bb28ea
8 changed files with 253 additions and 0 deletions

View file

@ -145,5 +145,36 @@ trait StmUtil {
}
}
/**
* Stm utility methods for using from Java.
*/
object StmUtils {
/**
* Schedule a deferred task on the thread local transaction (use within an atomic).
* This is executed when the transaction commits.
*/
def deferred(runnable: Runnable): Unit = MultiverseStmUtils.scheduleDeferredTask(runnable)
/**
* Schedule a compensating task on the thread local transaction (use within an atomic).
* This is executed when the transaction aborts.
*/
def compensating(runnable: Runnable): Unit = MultiverseStmUtils.scheduleCompensatingTask(runnable)
/**
* STM retry for blocking transactions (use within an atomic).
* Can be used to wait for a condition.
*/
def retry = MultiverseStmUtils.retry
}
/**
* Use EitherOrElse to combine two blocking transactions (from Java).
*/
abstract class EitherOrElse[T] extends OrElseTemplate[T] {
def either(mtx: MultiverseTransaction) = either
def orelse(mtx: MultiverseTransaction) = orElse
def either: T
def orElse: T
}

View file

@ -0,0 +1,15 @@
package akka.stm.example;
import akka.stm.*;
public class Branch {
public Ref<Integer> left;
public Ref<Integer> right;
public int amount;
public Branch(Ref<Integer> left, Ref<Integer> right, int amount) {
this.left = left;
this.right = right;
this.amount = amount;
}
}

View file

@ -0,0 +1,46 @@
package akka.stm.example;
import akka.stm.*;
import static akka.stm.StmUtils.retry;
import akka.actor.*;
import akka.util.FiniteDuration;
import java.util.concurrent.TimeUnit;
public class Brancher extends UntypedActor {
TransactionFactory txFactory = new TransactionFactoryBuilder()
.setBlockingAllowed(true)
.setTrackReads(true)
.setTimeout(new FiniteDuration(60, TimeUnit.SECONDS))
.build();
public void onReceive(Object message) throws Exception {
if (message instanceof Branch) {
Branch branch = (Branch) message;
final Ref<Integer> left = branch.left;
final Ref<Integer> right = branch.right;
final double amount = branch.amount;
new Atomic<Integer>(txFactory) {
public Integer atomically() {
return new EitherOrElse<Integer>() {
public Integer either() {
if (left.get() < amount) {
System.out.println("not enough on left - retrying");
retry();
}
System.out.println("going left");
return left.get();
}
public Integer orElse() {
if (right.get() < amount) {
System.out.println("not enough on right - retrying");
retry();
}
System.out.println("going right");
return right.get();
}
}.execute();
}
}.execute();
}
}
}

View file

@ -0,0 +1,27 @@
package akka.stm.example;
import akka.stm.*;
import akka.actor.*;
public class EitherOrElseExample {
public static void main(String[] args) {
System.out.println();
System.out.println("EitherOrElse example");
System.out.println();
final Ref<Integer> left = new Ref<Integer>(100);
final Ref<Integer> right = new Ref<Integer>(100);
ActorRef brancher = Actors.actorOf(Brancher.class).start();
brancher.sendOneWay(new Branch(left, right, 500));
new Atomic() {
public Object atomically() {
return right.set(right.get() + 1000);
}
}.execute();
brancher.stop();
}
}

View file

@ -0,0 +1,47 @@
package akka.stm.example;
import akka.stm.*;
import akka.actor.*;
public class RetryExample {
public static void main(String[] args) {
System.out.println();
System.out.println("Retry example");
System.out.println();
final Ref<Double> account1 = new Ref<Double>(100.0);
final Ref<Double> account2 = new Ref<Double>(100.0);
ActorRef transferer = Actors.actorOf(Transferer.class).start();
transferer.sendOneWay(new Transfer(account1, account2, 500.0));
// Transferer: not enough money - retrying
new Atomic() {
public Object atomically() {
return account1.set(account1.get() + 2000);
}
}.execute();
// Transferer: transferring
Double acc1 = new Atomic<Double>() {
public Double atomically() {
return account1.get();
}
}.execute();
Double acc2 = new Atomic<Double>() {
public Double atomically() {
return account2.get();
}
}.execute();
System.out.println("Account 1: " + acc1);
// Account 1: 1600.0
System.out.println("Account 2: " + acc2);
// Account 2: 600.0
transferer.stop();
}
}

View file

@ -0,0 +1,15 @@
package akka.stm.example;
import akka.stm.*;
public class Transfer {
public Ref<Double> from;
public Ref<Double> to;
public double amount;
public Transfer(Ref<Double> from, Ref<Double> to, double amount) {
this.from = from;
this.to = to;
this.amount = amount;
}
}

View file

@ -0,0 +1,36 @@
package akka.stm.example;
import akka.stm.*;
import static akka.stm.StmUtils.retry;
import akka.actor.*;
import akka.util.FiniteDuration;
import java.util.concurrent.TimeUnit;
public class Transferer extends UntypedActor {
TransactionFactory txFactory = new TransactionFactoryBuilder()
.setBlockingAllowed(true)
.setTrackReads(true)
.setTimeout(new FiniteDuration(60, TimeUnit.SECONDS))
.build();
public void onReceive(Object message) throws Exception {
if (message instanceof Transfer) {
Transfer transfer = (Transfer) message;
final Ref<Double> from = transfer.from;
final Ref<Double> to = transfer.to;
final double amount = transfer.amount;
new Atomic(txFactory) {
public Object atomically() {
if (from.get() < amount) {
System.out.println("Transferer: not enough money - retrying");
retry();
}
System.out.println("Transferer: transferring");
from.set(from.get() - amount);
to.set(to.get() + amount);
return null;
}
}.execute();
}
}
}

View file

@ -5,11 +5,14 @@ import org.junit.Test;
import org.junit.Before;
import akka.stm.*;
import static akka.stm.StmUtils.deferred;
import static akka.stm.StmUtils.compensating;
import org.multiverse.api.ThreadLocalTransaction;
import org.multiverse.api.TransactionConfiguration;
import org.multiverse.api.exceptions.ReadonlyException;
public class JavaStmTests {
private Ref<Integer> ref;
@ -87,4 +90,37 @@ public class JavaStmTests {
}
}.execute();
}
@Test public void deferredTask() {
final Ref<Boolean> enteredDeferred = new Ref<Boolean>(false);
new Atomic() {
public Object atomically() {
deferred(new Runnable() {
public void run() {
enteredDeferred.set(true);
}
});
return ref.set(3);
}
}.execute();
assertEquals(true, enteredDeferred.get());
}
@Test public void compensatingTask() {
final Ref<Boolean> enteredCompensating = new Ref<Boolean>(false);
try {
new Atomic() {
public Object atomically() {
compensating(new Runnable() {
public void run() {
enteredCompensating.set(true);
}
});
ref.set(3);
throw new RuntimeException();
}
}.execute();
} catch(RuntimeException e) {}
assertEquals(true, enteredCompensating.get());
}
}