diff --git a/akka-stm/src/main/scala/akka/stm/Stm.scala b/akka-stm/src/main/scala/akka/stm/Stm.scala index 17be90ff52..be511f0e2c 100644 --- a/akka-stm/src/main/scala/akka/stm/Stm.scala +++ b/akka-stm/src/main/scala/akka/stm/Stm.scala @@ -145,5 +145,36 @@ trait StmUtil { } } +/** + * Stm utility methods for using from Java. + */ +object StmUtils { + /** + * Schedule a deferred task on the thread local transaction (use within an atomic). + * This is executed when the transaction commits. + */ + def deferred(runnable: Runnable): Unit = MultiverseStmUtils.scheduleDeferredTask(runnable) + /** + * Schedule a compensating task on the thread local transaction (use within an atomic). + * This is executed when the transaction aborts. + */ + def compensating(runnable: Runnable): Unit = MultiverseStmUtils.scheduleCompensatingTask(runnable) + /** + * STM retry for blocking transactions (use within an atomic). + * Can be used to wait for a condition. + */ + def retry = MultiverseStmUtils.retry +} + +/** + * Use EitherOrElse to combine two blocking transactions (from Java). + */ +abstract class EitherOrElse[T] extends OrElseTemplate[T] { + def either(mtx: MultiverseTransaction) = either + def orelse(mtx: MultiverseTransaction) = orElse + + def either: T + def orElse: T +} diff --git a/akka-stm/src/test/java/akka/stm/example/Branch.java b/akka-stm/src/test/java/akka/stm/example/Branch.java new file mode 100644 index 0000000000..d62a6ecd16 --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/Branch.java @@ -0,0 +1,15 @@ +package akka.stm.example; + +import akka.stm.*; + +public class Branch { + public Ref left; + public Ref right; + public int amount; + + public Branch(Ref left, Ref right, int amount) { + this.left = left; + this.right = right; + this.amount = amount; + } +} diff --git a/akka-stm/src/test/java/akka/stm/example/Brancher.java b/akka-stm/src/test/java/akka/stm/example/Brancher.java new file mode 100644 index 0000000000..5b26c58526 --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/Brancher.java @@ -0,0 +1,46 @@ +package akka.stm.example; + +import akka.stm.*; +import static akka.stm.StmUtils.retry; +import akka.actor.*; +import akka.util.FiniteDuration; +import java.util.concurrent.TimeUnit; + +public class Brancher extends UntypedActor { + TransactionFactory txFactory = new TransactionFactoryBuilder() + .setBlockingAllowed(true) + .setTrackReads(true) + .setTimeout(new FiniteDuration(60, TimeUnit.SECONDS)) + .build(); + + public void onReceive(Object message) throws Exception { + if (message instanceof Branch) { + Branch branch = (Branch) message; + final Ref left = branch.left; + final Ref right = branch.right; + final double amount = branch.amount; + new Atomic(txFactory) { + public Integer atomically() { + return new EitherOrElse() { + public Integer either() { + if (left.get() < amount) { + System.out.println("not enough on left - retrying"); + retry(); + } + System.out.println("going left"); + return left.get(); + } + public Integer orElse() { + if (right.get() < amount) { + System.out.println("not enough on right - retrying"); + retry(); + } + System.out.println("going right"); + return right.get(); + } + }.execute(); + } + }.execute(); + } + } +} diff --git a/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java new file mode 100644 index 0000000000..0e0345b6af --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/EitherOrElseExample.java @@ -0,0 +1,27 @@ +package akka.stm.example; + +import akka.stm.*; +import akka.actor.*; + +public class EitherOrElseExample { + public static void main(String[] args) { + System.out.println(); + System.out.println("EitherOrElse example"); + System.out.println(); + + final Ref left = new Ref(100); + final Ref right = new Ref(100); + + ActorRef brancher = Actors.actorOf(Brancher.class).start(); + + brancher.sendOneWay(new Branch(left, right, 500)); + + new Atomic() { + public Object atomically() { + return right.set(right.get() + 1000); + } + }.execute(); + + brancher.stop(); + } +} diff --git a/akka-stm/src/test/java/akka/stm/example/RetryExample.java b/akka-stm/src/test/java/akka/stm/example/RetryExample.java new file mode 100644 index 0000000000..6fe3e3f535 --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/RetryExample.java @@ -0,0 +1,47 @@ +package akka.stm.example; + +import akka.stm.*; +import akka.actor.*; + +public class RetryExample { + public static void main(String[] args) { + System.out.println(); + System.out.println("Retry example"); + System.out.println(); + + final Ref account1 = new Ref(100.0); + final Ref account2 = new Ref(100.0); + + ActorRef transferer = Actors.actorOf(Transferer.class).start(); + + transferer.sendOneWay(new Transfer(account1, account2, 500.0)); + // Transferer: not enough money - retrying + + new Atomic() { + public Object atomically() { + return account1.set(account1.get() + 2000); + } + }.execute(); + // Transferer: transferring + + Double acc1 = new Atomic() { + public Double atomically() { + return account1.get(); + } + }.execute(); + + Double acc2 = new Atomic() { + public Double atomically() { + return account2.get(); + } + }.execute(); + + System.out.println("Account 1: " + acc1); + // Account 1: 1600.0 + + System.out.println("Account 2: " + acc2); + // Account 2: 600.0 + + transferer.stop(); + } +} diff --git a/akka-stm/src/test/java/akka/stm/example/Transfer.java b/akka-stm/src/test/java/akka/stm/example/Transfer.java new file mode 100644 index 0000000000..ad6cdbdd09 --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/Transfer.java @@ -0,0 +1,15 @@ +package akka.stm.example; + +import akka.stm.*; + +public class Transfer { + public Ref from; + public Ref to; + public double amount; + + public Transfer(Ref from, Ref to, double amount) { + this.from = from; + this.to = to; + this.amount = amount; + } +} diff --git a/akka-stm/src/test/java/akka/stm/example/Transferer.java b/akka-stm/src/test/java/akka/stm/example/Transferer.java new file mode 100644 index 0000000000..a8c8b50ff8 --- /dev/null +++ b/akka-stm/src/test/java/akka/stm/example/Transferer.java @@ -0,0 +1,36 @@ +package akka.stm.example; + +import akka.stm.*; +import static akka.stm.StmUtils.retry; +import akka.actor.*; +import akka.util.FiniteDuration; +import java.util.concurrent.TimeUnit; + +public class Transferer extends UntypedActor { + TransactionFactory txFactory = new TransactionFactoryBuilder() + .setBlockingAllowed(true) + .setTrackReads(true) + .setTimeout(new FiniteDuration(60, TimeUnit.SECONDS)) + .build(); + + public void onReceive(Object message) throws Exception { + if (message instanceof Transfer) { + Transfer transfer = (Transfer) message; + final Ref from = transfer.from; + final Ref to = transfer.to; + final double amount = transfer.amount; + new Atomic(txFactory) { + public Object atomically() { + if (from.get() < amount) { + System.out.println("Transferer: not enough money - retrying"); + retry(); + } + System.out.println("Transferer: transferring"); + from.set(from.get() - amount); + to.set(to.get() + amount); + return null; + } + }.execute(); + } + } +} diff --git a/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java b/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java index 79547c1bb6..fecb80389c 100644 --- a/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java +++ b/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java @@ -5,11 +5,14 @@ import org.junit.Test; import org.junit.Before; import akka.stm.*; +import static akka.stm.StmUtils.deferred; +import static akka.stm.StmUtils.compensating; import org.multiverse.api.ThreadLocalTransaction; import org.multiverse.api.TransactionConfiguration; import org.multiverse.api.exceptions.ReadonlyException; + public class JavaStmTests { private Ref ref; @@ -87,4 +90,37 @@ public class JavaStmTests { } }.execute(); } + + @Test public void deferredTask() { + final Ref enteredDeferred = new Ref(false); + new Atomic() { + public Object atomically() { + deferred(new Runnable() { + public void run() { + enteredDeferred.set(true); + } + }); + return ref.set(3); + } + }.execute(); + assertEquals(true, enteredDeferred.get()); + } + + @Test public void compensatingTask() { + final Ref enteredCompensating = new Ref(false); + try { + new Atomic() { + public Object atomically() { + compensating(new Runnable() { + public void run() { + enteredCompensating.set(true); + } + }); + ref.set(3); + throw new RuntimeException(); + } + }.execute(); + } catch(RuntimeException e) {} + assertEquals(true, enteredCompensating.get()); + } }