From 2757869c62d1bd879f23dae1facd942e106aee7b Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 18 Jan 2012 15:59:59 +1300 Subject: [PATCH 1/6] Update to new Java API for Scala STM --- .../docs/transactor/CoordinatedCounter.java | 18 ++--- .../akka/docs/transactor/Coordinator.java | 5 +- .../code/akka/docs/transactor/Counter.java | 12 ++-- .../akka/docs/transactor/FriendlyCounter.java | 12 ++-- akka-docs/java/transactors.rst | 2 +- .../scala/akka/transactor/Atomically.scala | 67 ------------------- .../scala/akka/transactor/Coordinated.scala | 18 +++-- .../akka/transactor/UntypedTransactor.scala | 4 +- .../transactor/UntypedCoordinatedCounter.java | 25 +++---- .../java/akka/transactor/UntypedCounter.java | 23 +++---- .../java/akka/transactor/UntypedFailer.java | 2 +- project/AkkaBuild.scala | 2 +- 12 files changed, 61 insertions(+), 129 deletions(-) delete mode 100644 akka-transactor/src/main/scala/akka/transactor/Atomically.scala diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java index dca10b8984..f17e86ade0 100644 --- a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -7,15 +7,11 @@ package akka.docs.transactor; //#class import akka.actor.*; import akka.transactor.*; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import static scala.concurrent.stm.JavaAPI.*; public class CoordinatedCounter extends UntypedActor { - private Ref count = Stm.ref(0); - - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } + private Ref.View count = newRef(0); public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { @@ -26,14 +22,14 @@ public class CoordinatedCounter extends UntypedActor { if (increment.hasFriend()) { increment.getFriend().tell(coordinated.coordinate(new Increment())); } - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { - increment(txn); + coordinated.atomic(new Runnable() { + public void run() { + increment(count, 1); } }); } } else if ("GetCount".equals(incoming)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); } else { unhandled(incoming); } diff --git a/akka-docs/java/code/akka/docs/transactor/Coordinator.java b/akka-docs/java/code/akka/docs/transactor/Coordinator.java index 6854ed99f6..195906f5f6 100644 --- a/akka-docs/java/code/akka/docs/transactor/Coordinator.java +++ b/akka-docs/java/code/akka/docs/transactor/Coordinator.java @@ -6,7 +6,6 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; -import scala.concurrent.stm.*; public class Coordinator extends UntypedActor { public void onReceive(Object incoming) throws Exception { @@ -15,8 +14,8 @@ public class Coordinator extends UntypedActor { Object message = coordinated.getMessage(); if (message instanceof Message) { //#coordinated-atomic - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { + coordinated.atomic(new Runnable() { + public void run() { // do something in the coordinated transaction ... } }); diff --git a/akka-docs/java/code/akka/docs/transactor/Counter.java b/akka-docs/java/code/akka/docs/transactor/Counter.java index 0a6b7b2219..efe2aaed72 100644 --- a/akka-docs/java/code/akka/docs/transactor/Counter.java +++ b/akka-docs/java/code/akka/docs/transactor/Counter.java @@ -6,21 +6,21 @@ package akka.docs.transactor; //#class import akka.transactor.*; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import static scala.concurrent.stm.JavaAPI.*; public class Counter extends UntypedTransactor { - Ref count = Stm.ref(0); + Ref.View count = newRef(0); - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); + increment(count, 1); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java index d70c653063..7ef31c5bea 100644 --- a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java @@ -8,10 +8,11 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; import java.util.Set; -import scala.concurrent.stm.*; +import scala.concurrent.stm.Ref; +import static scala.concurrent.stm.JavaAPI.*; public class FriendlyCounter extends UntypedTransactor { - Ref count = Stm.ref(0); + Ref.View count = newRef(0); @Override public Set coordinate(Object message) { if (message instanceof Increment) { @@ -22,16 +23,15 @@ public class FriendlyCounter extends UntypedTransactor { return nobody(); } - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); + increment(count, 1); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-docs/java/transactors.rst b/akka-docs/java/transactors.rst index f7471412a9..9dd69664b6 100644 --- a/akka-docs/java/transactors.rst +++ b/akka-docs/java/transactors.rst @@ -102,7 +102,7 @@ be sent. :language: java To enter the coordinated transaction use the atomic method of the coordinated -object, passing in an ``akka.transactor.Atomically`` object. +object, passing in a ``java.lang.Runnable``. .. includecode:: code/akka/docs/transactor/Coordinator.java#coordinated-atomic :language: java diff --git a/akka-transactor/src/main/scala/akka/transactor/Atomically.scala b/akka-transactor/src/main/scala/akka/transactor/Atomically.scala deleted file mode 100644 index 4995a6b8bd..0000000000 --- a/akka-transactor/src/main/scala/akka/transactor/Atomically.scala +++ /dev/null @@ -1,67 +0,0 @@ -/** - * Copyright (C) 2009-2011 Typesafe Inc. - */ - -package akka.transactor - -import scala.concurrent.stm._ - -/** - * Java API. - * - * For creating Java-friendly coordinated atomic blocks. - * - * @see [[akka.transactor.Coordinated]] - */ -trait Atomically { - def atomically(txn: InTxn): Unit -} - -/** - * Java API. - * - * For creating completion handlers. - */ -trait CompletionHandler { - def handle(status: Txn.Status): Unit -} - -/** - * Java API. - * - * To ease some of the pain of using Scala STM from Java until - * the proper Java API is created. - */ -object Stm { - /** - * Create an STM Ref with an initial value. - */ - def ref[A](initialValue: A): Ref[A] = Ref(initialValue) - - /** - * Add a CompletionHandler to run after the current transaction - * has committed. - */ - def afterCommit(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCommit(status ⇒ handler.handle(status))(txn.get) - } - - /** - * Add a CompletionHandler to run after the current transaction - * has rolled back. - */ - def afterRollback(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterRollback(status ⇒ handler.handle(status))(txn.get) - } - - /** - * Add a CompletionHandler to run after the current transaction - * has committed or rolled back. - */ - def afterCompletion(handler: CompletionHandler): Unit = { - val txn = Txn.findCurrent - if (txn.isDefined) Txn.afterCompletion(status ⇒ handler.handle(status))(txn.get) - } -} diff --git a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala index f9ef8538be..a7c709b9fe 100644 --- a/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala +++ b/akka-transactor/src/main/scala/akka/transactor/Coordinated.scala @@ -6,7 +6,8 @@ package akka.transactor import akka.AkkaException import akka.util.Timeout -import scala.concurrent.stm._ +import scala.concurrent.stm.{ CommitBarrier, InTxn } +import java.util.concurrent.Callable /** * Akka-specific exception for coordinated transactions. @@ -125,7 +126,7 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) { * * @throws CoordinatedTransactionException if the coordinated transaction fails. */ - def atomic[T](body: InTxn ⇒ T): T = { + def atomic[A](body: InTxn ⇒ A): A = { member.atomic(body) match { case Right(result) ⇒ result case Left(CommitBarrier.MemberUncaughtExceptionCause(x)) ⇒ @@ -136,13 +137,22 @@ class Coordinated(val message: Any, member: CommitBarrier.Member) { } /** - * Java API: coordinated atomic method that accepts an [[akka.transactor.Atomically]]. + * Java API: coordinated atomic method that accepts a `java.lang.Runnable`. * Delimits the coordinated transaction. The transaction will wait for all other transactions * in this coordination before committing. The timeout is specified when creating the Coordinated. * * @throws CoordinatedTransactionException if the coordinated transaction fails. */ - def atomic(atomically: Atomically): Unit = atomic(txn ⇒ atomically.atomically(txn)) + def atomic(runnable: Runnable): Unit = atomic { _ ⇒ runnable.run } + + /** + * Java API: coordinated atomic method that accepts a `java.util.concurrent.Callable`. + * Delimits the coordinated transaction. The transaction will wait for all other transactions + * in this coordination before committing. The timeout is specified when creating the Coordinated. + * + * @throws CoordinatedTransactionException if the coordinated transaction fails. + */ + def atomic[A](callable: Callable[A]): A = atomic { _ ⇒ callable.call } /** * An empty coordinated atomic block. Can be used to complete the number of members involved diff --git a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala index 9a37f81915..59dc8f049d 100644 --- a/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala +++ b/akka-transactor/src/main/scala/akka/transactor/UntypedTransactor.scala @@ -25,7 +25,7 @@ abstract class UntypedTransactor extends UntypedActor { sendTo.actor.tell(coordinated(sendTo.message.getOrElse(message))) } before(message) - coordinated.atomic { txn ⇒ atomically(txn, message) } + coordinated.atomic { txn ⇒ atomically(message) } after(message) } case message ⇒ { @@ -84,7 +84,7 @@ abstract class UntypedTransactor extends UntypedActor { * The Receive block to run inside the coordinated transaction. */ @throws(classOf[Exception]) - def atomically(txn: InTxn, message: Any) {} + def atomically(message: Any) /** * A Receive block that runs after the coordinated transaction. diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index 694a675d8e..7c92930e02 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -7,24 +7,20 @@ package akka.transactor; import akka.actor.ActorRef; import akka.actor.Actors; import akka.actor.UntypedActor; -import scala.concurrent.stm.*; +import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.Ref; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class UntypedCoordinatedCounter extends UntypedActor { private String name; - private Ref count = Stm.ref(0); + private Ref.View count = newRef(0); public UntypedCoordinatedCounter(String name) { this.name = name; } - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } - public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { Coordinated coordinated = (Coordinated) incoming; @@ -33,8 +29,8 @@ public class UntypedCoordinatedCounter extends UntypedActor { Increment increment = (Increment) message; List friends = increment.getFriends(); final CountDownLatch latch = increment.getLatch(); - final CompletionHandler countDown = new CompletionHandler() { - public void handle(Txn.Status status) { + final Runnable countDown = new Runnable() { + public void run() { latch.countDown(); } }; @@ -42,15 +38,16 @@ public class UntypedCoordinatedCounter extends UntypedActor { Increment coordMessage = new Increment(friends.subList(1, friends.size()), latch); friends.get(0).tell(coordinated.coordinate(coordMessage)); } - coordinated.atomic(new Atomically() { - public void atomically(InTxn txn) { - increment(txn); - Stm.afterCompletion(countDown); + coordinated.atomic(new Runnable() { + public void run() { + increment(count, 1); + afterRollback(countDown); + afterCommit(countDown); } }); } } else if ("GetCount".equals(incoming)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); } } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java index f03f74b10f..392bfbca42 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java @@ -7,7 +7,8 @@ package akka.transactor; import akka.actor.ActorRef; import akka.transactor.UntypedTransactor; import akka.transactor.SendTo; -import scala.concurrent.stm.*; +import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.Ref; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -15,17 +16,12 @@ import java.util.concurrent.TimeUnit; public class UntypedCounter extends UntypedTransactor { private String name; - private Ref count = Stm.ref(0); + private Ref.View count = newRef(0); public UntypedCounter(String name) { this.name = name; } - private void increment(InTxn txn) { - Integer newValue = count.get(txn) + 1; - count.set(newValue, txn); - } - @Override public Set coordinate(Object message) { if (message instanceof Increment) { Increment increment = (Increment) message; @@ -41,22 +37,23 @@ public class UntypedCounter extends UntypedTransactor { } } - public void atomically(InTxn txn, Object message) { + public void atomically(Object message) { if (message instanceof Increment) { - increment(txn); + increment(count, 1); final Increment increment = (Increment) message; - CompletionHandler countDown = new CompletionHandler() { - public void handle(Txn.Status status) { + Runnable countDown = new Runnable() { + public void run() { increment.getLatch().countDown(); } }; - Stm.afterCompletion(countDown); + afterRollback(countDown); + afterCommit(countDown); } } @Override public boolean normally(Object message) { if ("GetCount".equals(message)) { - getSender().tell(count.single().get()); + getSender().tell(count.get()); return true; } else return false; } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java index 1f9e6ff41c..8ead9ae2ea 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedFailer.java @@ -7,7 +7,7 @@ package akka.transactor; import scala.concurrent.stm.InTxn; public class UntypedFailer extends UntypedTransactor { - public void atomically(InTxn txn, Object message) throws Exception { + public void atomically(Object message) throws Exception { throw new ExpectedFailureException(); } } diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 091346de34..626ede1834 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -450,7 +450,7 @@ object Dependency { val Netty = "3.2.5.Final" val Protobuf = "2.4.1" val Rabbit = "2.3.1" - val ScalaStm = "0.4" + val ScalaStm = "0.5.0-SNAPSHOT" val Scalatest = "1.6.1" val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" From 20587654852144aadc824ea25cbd202081c6e9db Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Thu, 19 Jan 2012 11:09:03 +1300 Subject: [PATCH 2/6] Add basic java api for scala stm to transactor module Note: this commit will be reverted once a java api is published for scala stm. Adding to transactor module for M3 release. --- .../scala/scala/concurrent/stm/JavaAPI.scala | 112 ++++++++++++ .../scala/concurrent/stm/JavaAPITests.java | 161 ++++++++++++++++++ .../scala/concurrent/stm/TestException.java | 9 + .../scala/concurrent/stm/JavaAPISuite.scala | 7 + project/AkkaBuild.scala | 2 +- 5 files changed, 290 insertions(+), 1 deletion(-) create mode 100644 akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala create mode 100644 akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java create mode 100644 akka-transactor/src/test/java/scala/concurrent/stm/TestException.java create mode 100644 akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala diff --git a/akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala b/akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala new file mode 100644 index 0000000000..964664fe55 --- /dev/null +++ b/akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala @@ -0,0 +1,112 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm + +import java.util.concurrent.Callable +import scala.runtime.AbstractFunction1 + +/** + * Java-friendly API. + */ +object JavaAPI { + + /** + * Create a Ref with an initial value. Return a `Ref.View`, which does not + * require implicit transactions. + * @param initialValue the initial value for the newly created `Ref.View` + * @return a new `Ref.View` + */ + def newRef[A](initialValue: A): Ref.View[A] = Ref(initialValue).single + + /** + * Create an empty TMap. Return a `TMap.View`, which does not require + * implicit transactions. + * @return a new, empty `TMap.View` + */ + def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single + + /** + * Create an empty TSet. Return a `TSet.View`, which does not require + * implicit transactions. + * @return a new, empty `TSet.View` + */ + def newTSet[A](): TSet.View[A] = TSet.empty[A].single + + /** + * Create a TArray containing `length` elements. Return a `TArray.View`, + * which does not require implicit transactions. + * @param length the length of the `TArray.View` to be created + * @return a new `TArray.View` containing `length` elements (initially null) + */ + def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single + + /** + * Atomic block that takes a `Runnable`. + * @param runnable the `Runnable` to run within a transaction + */ + def atomic(runnable: Runnable): Unit = scala.concurrent.stm.atomic { txn ⇒ runnable.run } + + /** + * Atomic block that takes a `Callable`. + * @param callable the `Callable` to run within a transaction + * @return the value returned by the `Callable` + */ + def atomic[A](callable: Callable[A]): A = scala.concurrent.stm.atomic { txn ⇒ callable.call } + + /** + * Transform the value stored by `ref` by applying the function `f`. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + */ + def transform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): Unit = ref.transform(f) + + /** + * Transform the value stored by `ref` by applying the function `f` and + * return the old value. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + * @return the old value of `ref` + */ + def getAndTransform[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.getAndTransform(f) + + /** + * Transform the value stored by `ref` by applying the function `f` and + * return the new value. + * @param ref the `Ref.View` to be transformed + * @param f the function to be applied + * @return the new value of `ref` + */ + def transformAndGet[A](ref: Ref.View[A], f: AbstractFunction1[A, A]): A = ref.transformAndGet(f) + + /** + * Increment the `java.lang.Integer` value of a `Ref.View`. + * @param ref the `Ref.View` to be incremented + * @param delta the amount to increment + */ + def increment(ref: Ref.View[java.lang.Integer], delta: Int): Unit = ref.transform { v ⇒ v.intValue + delta } + + /** + * Increment the `java.lang.Long` value of a `Ref.View`. + * @param ref the `Ref.View` to be incremented + * @param delta the amount to increment + */ + def increment(ref: Ref.View[java.lang.Long], delta: Long): Unit = ref.transform { v ⇒ v.longValue + delta } + + /** + * Add a task to run after the current transaction has committed. + * @param task the `Runnable` task to run after transaction commit + */ + def afterCommit(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterCommit(status ⇒ task.run)(txn.get) + } + + /** + * Add a task to run after the current transaction has rolled back. + * @param task the `Runnable` task to run after transaction rollback + */ + def afterRollback(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterRollback(status ⇒ task.run)(txn.get) + } +} diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java new file mode 100644 index 0000000000..e2d0631590 --- /dev/null +++ b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java @@ -0,0 +1,161 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm; + +import static org.junit.Assert.*; +import org.junit.Test; + +import scala.concurrent.stm.Ref; +import static scala.concurrent.stm.JavaAPI.*; + +import scala.runtime.AbstractFunction1; +import java.util.concurrent.Callable; + +import static scala.collection.JavaConversions.*; +import java.util.Map; +import java.util.Set; +import java.util.List; + +public class JavaAPITests { + @Test + public void createIntegerRef() { + Ref.View ref = newRef(0); + int unboxed = ref.get(); + assertEquals(0, unboxed); + } + + @Test + public void atomicWithRunnable() { + final Ref.View ref = newRef(0); + atomic(new Runnable() { + public void run() { + ref.set(10); + } + }); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void atomicWithCallable() { + final Ref.View ref = newRef(0); + int oldValue = atomic(new Callable() { + public Integer call() { + return ref.swap(10); + } + }); + assertEquals(0, oldValue); + int newValue = ref.get(); + assertEquals(10, newValue); + } + + @Test(expected = TestException.class) + public void failingTransaction() { + final Ref.View ref = newRef(0); + try { + atomic(new Runnable() { + public void run() { + ref.set(10); + throw new TestException(); + } + }); + } catch (TestException e) { + int value = ref.get(); + assertEquals(0, value); + throw e; + } + } + + @Test + public void transformInteger() { + Ref.View ref = newRef(0); + transform(ref, new AbstractFunction1() { + public Integer apply(Integer i) { + return i + 10; + } + }); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void incrementInteger() { + Ref.View ref = newRef(0); + increment(ref, 10); + int value = ref.get(); + assertEquals(10, value); + } + + @Test + public void incrementLong() { + Ref.View ref = newRef(0L); + increment(ref, 10L); + long value = ref.get(); + assertEquals(10L, value); + } + + @Test + public void createAndUseTMap() { + TMap.View tmap = newTMap(); + Map map = mutableMapAsJavaMap(tmap); + map.put(1, "one"); + map.put(2, "two"); + assertEquals("one", map.get(1)); + assertEquals("two", map.get(2)); + assertTrue(map.containsKey(2)); + map.remove(2); + assertFalse(map.containsKey(2)); + } + + @Test(expected = TestException.class) + public void failingTMapTransaction() { + TMap.View tmap = newTMap(); + final Map map = mutableMapAsJavaMap(tmap); + try { + atomic(new Runnable() { + public void run() { + map.put(1, "one"); + map.put(2, "two"); + assertTrue(map.containsKey(1)); + assertTrue(map.containsKey(2)); + throw new TestException(); + } + }); + } catch (TestException e) { + assertFalse(map.containsKey(1)); + assertFalse(map.containsKey(2)); + throw e; + } + } + + @Test + public void createAndUseTSet() { + TSet.View tset = newTSet(); + Set set = mutableSetAsJavaSet(tset); + set.add("one"); + set.add("two"); + assertTrue(set.contains("one")); + assertTrue(set.contains("two")); + assertEquals(2, set.size()); + set.add("one"); + assertEquals(2, set.size()); + set.remove("two"); + assertFalse(set.contains("two")); + assertEquals(1, set.size()); + } + + @Test + public void createAndUseTArray() { + TArray.View tarray = newTArray(3); + List seq = mutableSeqAsJavaList(tarray); + assertEquals(null, seq.get(0)); + assertEquals(null, seq.get(1)); + assertEquals(null, seq.get(2)); + seq.set(0, "zero"); + seq.set(1, "one"); + seq.set(2, "two"); + assertEquals("zero", seq.get(0)); + assertEquals("one", seq.get(1)); + assertEquals("two", seq.get(2)); + } +} diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java b/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java new file mode 100644 index 0000000000..cc810761d4 --- /dev/null +++ b/akka-transactor/src/test/java/scala/concurrent/stm/TestException.java @@ -0,0 +1,9 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm; + +public class TestException extends RuntimeException { + public TestException() { + super("Expected failure"); + } +} diff --git a/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala b/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala new file mode 100644 index 0000000000..3d0c48e90f --- /dev/null +++ b/akka-transactor/src/test/scala/scala/concurrent/stm/JavaAPISuite.scala @@ -0,0 +1,7 @@ +/* scala-stm - (c) 2009-2011, Stanford University, PPL */ + +package scala.concurrent.stm + +import org.scalatest.junit.JUnitWrapperSuite + +class JavaAPISuite extends JUnitWrapperSuite("scala.concurrent.stm.JavaAPITests", Thread.currentThread.getContextClassLoader) diff --git a/project/AkkaBuild.scala b/project/AkkaBuild.scala index 626ede1834..091346de34 100644 --- a/project/AkkaBuild.scala +++ b/project/AkkaBuild.scala @@ -450,7 +450,7 @@ object Dependency { val Netty = "3.2.5.Final" val Protobuf = "2.4.1" val Rabbit = "2.3.1" - val ScalaStm = "0.5.0-SNAPSHOT" + val ScalaStm = "0.4" val Scalatest = "1.6.1" val Slf4j = "1.6.4" val Spring = "3.0.5.RELEASE" From b9bbb0744a06a7529046fa10f41f743f03a6dbb4 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Wed, 18 Jan 2012 19:01:46 +1300 Subject: [PATCH 3/6] Add some migration guidance for stm --- .../project/migration-guide-1.3.x-2.0.x.rst | 280 +++++++++++++++++- 1 file changed, 266 insertions(+), 14 deletions(-) diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 353e0c0ddb..2749f0107c 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -6,7 +6,9 @@ .. sidebar:: Contents - .. contents:: :local: + .. contents:: + :local: + :depth: 3 Actors ====== @@ -77,8 +79,11 @@ Last task of the migration would be to create your own ``ActorSystem``. Unordered Collection of Migration Items ======================================= +Actors +------ + Creating and starting actors ----------------------------- +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ Actors are created by passing in a ``Props`` instance into the actorOf factory method in a ``ActorRefProvider``, which is the ``ActorSystem`` or ``ActorContext``. @@ -111,7 +116,7 @@ Documentation: * :ref:`untyped-actors-java` Stopping actors ---------------- +^^^^^^^^^^^^^^^ ``ActorRef.stop()`` has been moved. Use ``ActorSystem`` or ``ActorContext`` to stop actors. @@ -144,7 +149,7 @@ Documentation: * :ref:`untyped-actors-java` Identifying Actors ------------------- +^^^^^^^^^^^^^^^^^^ In v1.3 actors have ``uuid`` and ``id`` field. In v2.0 each actor has a unique logical ``path``. @@ -167,7 +172,7 @@ Documentation: * :ref:`untyped-actors-java` Reply to messages ------------------ +^^^^^^^^^^^^^^^^^ ``self.channel`` has been replaced with unified reply mechanism using ``sender`` (Scala) or ``getSender()`` (Java). This works for both tell (!) and ask (?). @@ -189,7 +194,7 @@ Documentation: * :ref:`untyped-actors-java` ``ActorRef.ask()`` ------------------- +^^^^^^^^^^^^^^^^^^ The mechanism for collecting an actor’s reply in a :class:`Future` has been reworked for better location transparency: it uses an actor under the hood. @@ -206,7 +211,7 @@ Documentation: * :ref:`untyped-actors-java` ActorPool ---------- +^^^^^^^^^ The ActorPool has been replaced by dynamically resizable routers. @@ -216,7 +221,7 @@ Documentation: * :ref:`routing-java` ``UntypedActor.getContext()`` (Java API only) ---------------------------------------------- +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ ``getContext()`` in the Java API for UntypedActor is renamed to ``getSelf()``. @@ -234,7 +239,7 @@ Documentation: * :ref:`untyped-actors-java` Logging -------- +^^^^^^^ EventHandler API has been replaced by LoggingAdapter, which publish log messages to the event bus. You can still plugin your own actor as event listener with the @@ -267,7 +272,7 @@ Documentation: * :ref:`event-bus-java` Supervision ------------ +^^^^^^^^^^^ Akka v2.0 implements parental supervision. Actors can only be created by other actors — where the top-level actor is provided by the library — and each created actor is supervised by its parent. @@ -343,7 +348,7 @@ Documentation: * :ref:`untyped-actors-java` Spawn ------ +^^^^^ ``spawn`` has been removed and can be implemented like this, if needed. Be careful to not access any shared mutable state closed over by the body. @@ -359,7 +364,7 @@ Documentation: * :ref:`jmm` HotSwap -------- +^^^^^^^ In v2.0 ``become`` and ``unbecome`` metods are located in ``ActorContext``, i.e. ``context.become`` and ``context.unbecome``. @@ -370,15 +375,262 @@ in the actor receiving the message. * :ref:`actors-scala` * :ref:`untyped-actors-java` +STM +--- + +In Akka v2.0 `ScalaSTM`_ is used rather than Multiverse. + +.. _ScalaSTM: http://nbronson.github.com/scala-stm/ + +Agent and Transactor have been ported to ScalaSTM. The API's for Agent and +Transactor are basically the same, other than integration with ScalaSTM. See: + + * :ref:`agents-scala` + * :ref:`agents-java` + * :ref:`transactors-scala` + * :ref:`transactors-java` + +Imports +^^^^^^^ + +Scala +~~~~~ + +To use ScalaSTM the import from Scala is:: + + import scala.concurrent.stm._ + +Java +~~~~ + +For Java there is a special JavaAPI helper object that can be statically +imported, along with any other imports that might be needed:: + + import scala.concurrent.stm.Ref; + import static scala.concurrent.stm.JavaAPI.*; + +Transactions +^^^^^^^^^^^^ + +Scala +~~~~~ + +Both v1.3 and v2.0 provide an ``atomic`` block, however, the ScalaSTM ``atomic`` +is a function from ``InTxn`` to return type. + +v1.3:: + + atomic { + // do something in transaction + } + +v2.0:: + + atomic { implicit txn => + // do something in transaction + } + +Note that in ScalaSTM the ``InTxn`` in the atomic function is usually marked as +implicit as transactional references require an implicit ``InTxn`` on all +methods. That is, the transaction is statically required and it is a +compile-time warning to use a reference without a transaction. There is also a +``Ref.View`` for operations without requiring an ``InTxn`` statically. See below +for more information. + +Java +~~~~ + +In the ScalaSTM JavaAPI helpers there are atomic methods which accept +``java.lang.Runnable`` and ``java.util.concurrent.Callable``. + +v1.3:: + + new Atomic() { + public Object atomically() { + // in transaction + return null; + } + }.execute(); + + SomeObject result = new Atomic() { + public SomeObject atomically() { + // in transaction + return ...; + } + }.execute(); + +v2.0:: + + import static scala.concurrent.stm.JavaAPI.*; + import java.util.concurrent.Callable; + + atomic(new Runnable() { + public void run() { + // in transaction + } + }); + + SomeObject result = atomic(new Callable() { + public SomeObject call() { + // in transaction + return ...; + } + }); + +Ref +^^^ + +Scala +~~~~~ + +Other than the import, creating a Ref is basically identical between Akka STM in +v1.3 and ScalaSTM used in v2.0. + +v1.3:: + + val ref = Ref(0) + +v2.0:: + + val ref = Ref(0) + +The API for Ref is similar. For example: + +v1.3:: + + ref.get // get current value + ref() // same as get + + ref.set(1) // set to new value, return old value + ref() = 1 // same as set + ref.swap(2) // same as set + + ref alter { _ + 1 } // apply a function, return new value + +v2.0:: + + ref.get // get current value + ref() // same as get + + ref.set(1) // set to new value, return nothing + ref() = 1 // same as set + ref.swap(2) // set and return old value + + ref transform { _ + 1 } // apply function, return nothing + + ref transformIfDefined { case 1 => 2 } // apply partial function if defined + +Ref.View +^^^^^^^^ + +In v1.3 using a ``Ref`` method outside of a transaction would automatically +create a single-operation transaction. In v2.0 (in ScalaSTM) there is a +``Ref.View`` which provides methods without requiring a current +transaction. + +Scala +~~~~~ + +The ``Ref.View`` can be accessed with the ``single`` method:: + + ref.single() // returns current value + ref.single() = 1 // set new value + + // with atomic this would be: + + atomic { implicit t => ref() } + atomic { implicit t => ref() = 1 } + +Java +~~~~ + +As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more +easily used from Java. ``Ref`` could be used, but requires explicit threading of +transactions. There are helper methods in ``JavaAPI`` for creating ``Ref.View`` +references. + +v1.3:: + + Ref ref = new Ref(0); + +v2.0:: + + Ref.View ref = newRef(0); + +The ``set`` and ``get`` methods work the same way for both versions. + +v1.3:: + + ref.get(); // get current value + ref.set(1); // set new value + +v2.0:: + + ref.get(); // get current value + ref.set(1); // set new value + +There are also ``transform``, ``getAndTransform``, and ``transformAndGet`` +methods in ``JavaAPI`` which accept ``scala.runtime.AbstractFunction1``. + +There are ``increment`` helper methods for ``Ref.View`` and +``Ref.View`` references. + +Transaction lifecycle callbacks +^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +Scala +~~~~~ + +It is also possible to hook into the transaction lifecycle in ScalaSTM. See the +ScalaSTM documentation for the full range of possibilities. + +v1.3:: + + atomic { + deferred { + // executes when transaction commits + } + compensating { + // executes when transaction aborts + } + } + +v2.0:: + + atomic { implicit txn => + txn.afterCommit { txnStatus => + // executes when transaction commits + } + txn.afterRollback { txnStatus => + // executes when transaction rolls back + } + } + +Java +~~~~ + +Rather than using the ``deferred`` and ``compensating`` methods in +``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in +``scala.concurrent.stm.JavaAPI``, which behave in the same way and accept +``Runnable``. + +Transactional Datastructures +^^^^^^^^^^^^^^^^^^^^^^^^^^^^ + +In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional +datastructures. There are helper methods for creating these in +``JavaAPI``. These datastructure implement the ``scala.collection`` interfaces +and can also be used from Java with Scala's ``JavaConversions``. + + More to be written ------------------ * Futures * Dispatchers -* STM * TypedActors * Routing * Remoting * Scheduler * Configuration -* ...? \ No newline at end of file +* ...? From 6db3e59ce1e8f034083994f5e5549e2822633a50 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 19 Jan 2012 15:06:58 +0100 Subject: [PATCH 4/6] Restructuring ActiveRemoteClient, moving to a shared NioClientChannelFactory, making the timer optionally a daemon, switching to channel groups --- .../remote/netty/NettyRemoteSupport.scala | 89 +++++++++++-------- 1 file changed, 51 insertions(+), 38 deletions(-) diff --git a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala index ea240858d9..db58b6b23d 100644 --- a/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala +++ b/akka-remote/src/main/scala/akka/remote/netty/NettyRemoteSupport.scala @@ -9,8 +9,7 @@ import akka.remote._ import RemoteProtocol._ import akka.util._ import org.jboss.netty.channel.group.{ DefaultChannelGroup, ChannelGroup, ChannelGroupFuture } -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory -import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.channel.socket.nio.{ NioServerSocketChannelFactory, NioClientSocketChannelFactory } import org.jboss.netty.bootstrap.{ ServerBootstrap, ClientBootstrap } import org.jboss.netty.handler.codec.frame.{ LengthFieldBasedFrameDecoder, LengthFieldPrepender } import org.jboss.netty.handler.codec.protobuf.{ ProtobufDecoder, ProtobufEncoder } @@ -121,6 +120,7 @@ class PassiveRemoteClient(val currentChannel: Channel, class ActiveRemoteClient private[akka] ( remoteSupport: NettyRemoteSupport, remoteAddress: RemoteNettyAddress, + localAddress: RemoteSystemAddress[ParsedTransportAddress], val loader: Option[ClassLoader] = None) extends RemoteClient(remoteSupport, remoteAddress) { @@ -132,7 +132,11 @@ class ActiveRemoteClient private[akka] ( @volatile private var bootstrap: ClientBootstrap = _ @volatile - private[remote] var connection: ChannelFuture = _ + private var connection: ChannelFuture = _ + @volatile + private[remote] var openChannels: DefaultChannelGroup = _ + @volatile + private var executionHandler: ExecutionHandler = _ @volatile private var reconnectionTimeWindowStart = 0L @@ -141,10 +145,6 @@ class ActiveRemoteClient private[akka] ( def currentChannel = connection.getChannel - private val senderRemoteAddress = remoteSupport.remote.remoteAddress - @volatile - private var executionHandler: ExecutionHandler = _ - /** * Connect to remote server. */ @@ -154,9 +154,9 @@ class ActiveRemoteClient private[akka] ( val handshake = RemoteControlProtocol.newBuilder.setCommandType(CommandType.CONNECT) if (SecureCookie.nonEmpty) handshake.setCookie(SecureCookie.get) handshake.setOrigin(RemoteProtocol.AddressProtocol.newBuilder - .setSystem(senderRemoteAddress.system) - .setHostname(senderRemoteAddress.transport.host) - .setPort(senderRemoteAddress.transport.port) + .setSystem(localAddress.system) + .setHostname(localAddress.transport.host) + .setPort(localAddress.transport.port) .build) connection.getChannel.write(remoteSupport.createControlEnvelope(handshake.build)) } @@ -164,7 +164,7 @@ class ActiveRemoteClient private[akka] ( def attemptReconnect(): Boolean = { log.debug("Remote client reconnecting to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -176,11 +176,11 @@ class ActiveRemoteClient private[akka] ( } runSwitch switchOn { + openChannels = new DefaultDisposableChannelGroup(classOf[RemoteClient].getName) + executionHandler = new ExecutionHandler(remoteSupport.executor) - bootstrap = new ClientBootstrap(new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(remoteSupport.threadFactory), - Executors.newCachedThreadPool(remoteSupport.threadFactory))) + bootstrap = new ClientBootstrap(remoteSupport.clientChannelFactory) bootstrap.setPipelineFactory(new ActiveRemoteClientPipelineFactory(name, bootstrap, executionHandler, remoteAddress, this)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -188,7 +188,8 @@ class ActiveRemoteClient private[akka] ( log.debug("Starting remote client connection to [{}]", remoteAddress) connection = bootstrap.connect(new InetSocketAddress(remoteAddress.ip.get, remoteAddress.port)) - connection.awaitUninterruptibly.getChannel // Wait until the connection attempt succeeds or fails. + + openChannels.add(connection.awaitUninterruptibly.getChannel) // Wait until the connection attempt succeeds or fails. if (!connection.isSuccess) { notifyListeners(RemoteClientError(connection.getCause, remoteSupport, remoteAddress)) @@ -202,6 +203,7 @@ class ActiveRemoteClient private[akka] ( case true ⇒ true case false if reconnectIfAlreadyConnected ⇒ connection.getChannel.close() + openChannels.remove(connection.getChannel) log.debug("Remote client reconnecting to [{}]", remoteAddress) attemptReconnect() @@ -219,13 +221,11 @@ class ActiveRemoteClient private[akka] ( if ((connection ne null) && (connection.getChannel ne null)) connection.getChannel.close() } finally { - connection = null - executionHandler = null - //Do not do this: executionHandler.releaseExternalResources(), since it's shutting down the shared threadpool try { - bootstrap.releaseExternalResources() + if (openChannels ne null) openChannels.close.awaitUninterruptibly() } finally { - bootstrap = null + connection = null + executionHandler = null } } @@ -324,7 +324,10 @@ class ActiveRemoteClientHandler( if (client.isWithinReconnectionTimeWindow) { timer.newTimeout(new TimerTask() { def run(timeout: Timeout) = - if (client.isRunning) client.connect(reconnectIfAlreadyConnected = true) + if (client.isRunning) { + client.openChannels.remove(event.getChannel) + client.connect(reconnectIfAlreadyConnected = true) + } }, client.remoteSupport.clientSettings.ReconnectDelay.toMillis, TimeUnit.MILLISECONDS) } else runOnceNow { client.remoteSupport.shutdownClientConnection(remoteAddress) // spawn in another thread @@ -369,8 +372,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre val serverSettings = remote.remoteSettings.serverSettings val clientSettings = remote.remoteSettings.clientSettings + val threadFactory = new MonitorableThreadFactory("NettyRemoteSupport", remote.remoteSettings.Daemonic) - val timer: HashedWheelTimer = new HashedWheelTimer + val timer: HashedWheelTimer = new HashedWheelTimer(threadFactory) + val executor = new OrderedMemoryAwareThreadPoolExecutor( serverSettings.ExecutionPoolSize, serverSettings.MaxChannelMemorySize, @@ -379,6 +384,10 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre serverSettings.ExecutionPoolKeepAlive.unit, threadFactory) + val clientChannelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool(threadFactory), + Executors.newCachedThreadPool(threadFactory)) + private val remoteClients = new HashMap[RemoteNettyAddress, RemoteClient] private val clientsLock = new ReentrantReadWriteLock @@ -411,7 +420,7 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre //Recheck for addition, race between upgrades case Some(client) ⇒ client //If already populated by other writer case None ⇒ //Populate map - val client = new ActiveRemoteClient(this, recipientAddress, loader) + val client = new ActiveRemoteClient(this, recipientAddress, remote.remoteAddress, loader) client.connect() remoteClients += recipientAddress -> client client @@ -479,26 +488,20 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre /** * Server section */ - private[akka] val currentServer = new AtomicReference[Option[NettyRemoteServer]](None) + @volatile + private var currentServer: NettyRemoteServer = _ - def name = currentServer.get match { - case Some(server) ⇒ server.name - case None ⇒ remote.remoteAddress.toString + def name = currentServer match { + case null ⇒ remote.remoteAddress.toString + case server ⇒ server.name } private val _isRunning = new Switch(false) def isRunning = _isRunning.isOn - def start(loader: Option[ClassLoader] = None): Unit = { - _isRunning switchOn { - try { - currentServer.set(Some(new NettyRemoteServer(this, loader, address))) - } catch { - case e: Exception ⇒ notifyListeners(RemoteServerError(e, this)) - } - } - } + def start(loader: Option[ClassLoader] = None): Unit = + _isRunning switchOn { currentServer = new NettyRemoteServer(this, loader, address) } /** * Common section @@ -512,9 +515,19 @@ class NettyRemoteSupport(_system: ActorSystemImpl, val remote: Remote, val addre } finally { clientsLock.writeLock().unlock() try { - currentServer.getAndSet(None) foreach { _.shutdown() } + val s = currentServer + currentServer = null + s.shutdown() } finally { - try { timer.stop() } finally { executor.shutdown() } + try { + timer.stop() + } finally { + try { + clientChannelFactory.releaseExternalResources() + } finally { + executor.shutdown() + } + } } } } From 27da7c4d128243ca41025fad24bce5ab19e8e87c Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 20 Jan 2012 11:31:28 +1300 Subject: [PATCH 5/6] Update java-friendly api for scala stm - move to japi.Stm - add newMap, newSet, newList methods with java conversions - add afterCompletion lifecycle callback --- .../docs/transactor/CoordinatedCounter.java | 6 +-- .../code/akka/docs/transactor/Counter.java | 6 +-- .../akka/docs/transactor/FriendlyCounter.java | 6 +-- .../stm/{JavaAPI.scala => japi/Stm.scala} | 51 ++++++++++++++++--- .../transactor/UntypedCoordinatedCounter.java | 9 ++-- .../java/akka/transactor/UntypedCounter.java | 9 ++-- .../scala/concurrent/stm/JavaAPITests.java | 35 ++++++------- 7 files changed, 75 insertions(+), 47 deletions(-) rename akka-transactor/src/main/scala/scala/concurrent/stm/{JavaAPI.scala => japi/Stm.scala} (66%) diff --git a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java index f17e86ade0..a00d26ed88 100644 --- a/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/CoordinatedCounter.java @@ -8,10 +8,10 @@ package akka.docs.transactor; import akka.actor.*; import akka.transactor.*; import scala.concurrent.stm.Ref; -import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.japi.Stm; public class CoordinatedCounter extends UntypedActor { - private Ref.View count = newRef(0); + private Ref.View count = Stm.newRef(0); public void onReceive(Object incoming) throws Exception { if (incoming instanceof Coordinated) { @@ -24,7 +24,7 @@ public class CoordinatedCounter extends UntypedActor { } coordinated.atomic(new Runnable() { public void run() { - increment(count, 1); + Stm.increment(count, 1); } }); } diff --git a/akka-docs/java/code/akka/docs/transactor/Counter.java b/akka-docs/java/code/akka/docs/transactor/Counter.java index efe2aaed72..acd0d8f516 100644 --- a/akka-docs/java/code/akka/docs/transactor/Counter.java +++ b/akka-docs/java/code/akka/docs/transactor/Counter.java @@ -7,14 +7,14 @@ package akka.docs.transactor; //#class import akka.transactor.*; import scala.concurrent.stm.Ref; -import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.japi.Stm; public class Counter extends UntypedTransactor { - Ref.View count = newRef(0); + Ref.View count = Stm.newRef(0); public void atomically(Object message) { if (message instanceof Increment) { - increment(count, 1); + Stm.increment(count, 1); } } diff --git a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java index 7ef31c5bea..fe3d759539 100644 --- a/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java +++ b/akka-docs/java/code/akka/docs/transactor/FriendlyCounter.java @@ -9,10 +9,10 @@ import akka.actor.*; import akka.transactor.*; import java.util.Set; import scala.concurrent.stm.Ref; -import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.japi.Stm; public class FriendlyCounter extends UntypedTransactor { - Ref.View count = newRef(0); + Ref.View count = Stm.newRef(0); @Override public Set coordinate(Object message) { if (message instanceof Increment) { @@ -25,7 +25,7 @@ public class FriendlyCounter extends UntypedTransactor { public void atomically(Object message) { if (message instanceof Increment) { - increment(count, 1); + Stm.increment(count, 1); } } diff --git a/akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala b/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala similarity index 66% rename from akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala rename to akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala index 964664fe55..d9ed5a8330 100644 --- a/akka-transactor/src/main/scala/scala/concurrent/stm/JavaAPI.scala +++ b/akka-transactor/src/main/scala/scala/concurrent/stm/japi/Stm.scala @@ -1,14 +1,19 @@ /* scala-stm - (c) 2009-2011, Stanford University, PPL */ -package scala.concurrent.stm +package scala.concurrent.stm.japi import java.util.concurrent.Callable +import java.util.{ List ⇒ JList, Map ⇒ JMap, Set ⇒ JSet } +import scala.collection.JavaConversions +import scala.concurrent.stm +import scala.concurrent.stm._ import scala.runtime.AbstractFunction1 /** - * Java-friendly API. + * Java-friendly API for ScalaSTM. + * These methods can also be statically imported. */ -object JavaAPI { +object Stm { /** * Create a Ref with an initial value. Return a `Ref.View`, which does not @@ -20,38 +25,58 @@ object JavaAPI { /** * Create an empty TMap. Return a `TMap.View`, which does not require - * implicit transactions. + * implicit transactions. See newMap for included java conversion. * @return a new, empty `TMap.View` */ def newTMap[A, B](): TMap.View[A, B] = TMap.empty[A, B].single + /** + * Create an empty TMap. Return a `java.util.Map` view of this TMap. + * @return a new, empty `TMap.View` wrapped as a `java.util.Map`. + */ + def newMap[A, B](): JMap[A, B] = JavaConversions.mutableMapAsJavaMap(newTMap[A, B]) + /** * Create an empty TSet. Return a `TSet.View`, which does not require - * implicit transactions. + * implicit transactions. See newSet for included java conversion. * @return a new, empty `TSet.View` */ def newTSet[A](): TSet.View[A] = TSet.empty[A].single + /** + * Create an empty TSet. Return a `java.util.Set` view of this TSet. + * @return a new, empty `TSet.View` wrapped as a `java.util.Set`. + */ + def newSet[A](): JSet[A] = JavaConversions.mutableSetAsJavaSet(newTSet[A]) + /** * Create a TArray containing `length` elements. Return a `TArray.View`, - * which does not require implicit transactions. + * which does not require implicit transactions. See newList for included + * java conversion. * @param length the length of the `TArray.View` to be created * @return a new `TArray.View` containing `length` elements (initially null) */ def newTArray[A <: AnyRef](length: Int): TArray.View[A] = TArray.ofDim[A](length)(ClassManifest.classType(AnyRef.getClass)).single + /** + * Create an empty TArray. Return a `java.util.List` view of this Array. + * @param length the length of the `TArray.View` to be created + * @return a new, empty `TArray.View` wrapped as a `java.util.List`. + */ + def newList[A <: AnyRef](length: Int): JList[A] = JavaConversions.mutableSeqAsJavaList(newTArray[A](length)) + /** * Atomic block that takes a `Runnable`. * @param runnable the `Runnable` to run within a transaction */ - def atomic(runnable: Runnable): Unit = scala.concurrent.stm.atomic { txn ⇒ runnable.run } + def atomic(runnable: Runnable): Unit = stm.atomic { txn ⇒ runnable.run } /** * Atomic block that takes a `Callable`. * @param callable the `Callable` to run within a transaction * @return the value returned by the `Callable` */ - def atomic[A](callable: Callable[A]): A = scala.concurrent.stm.atomic { txn ⇒ callable.call } + def atomic[A](callable: Callable[A]): A = stm.atomic { txn ⇒ callable.call } /** * Transform the value stored by `ref` by applying the function `f`. @@ -109,4 +134,14 @@ object JavaAPI { val txn = Txn.findCurrent if (txn.isDefined) Txn.afterRollback(status ⇒ task.run)(txn.get) } + + /** + * Add a task to run after the current transaction has either rolled back + * or committed. + * @param task the `Runnable` task to run after transaction completion + */ + def afterCompletion(task: Runnable): Unit = { + val txn = Txn.findCurrent + if (txn.isDefined) Txn.afterCompletion(status ⇒ task.run)(txn.get) + } } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java index 7c92930e02..435fb0df54 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCoordinatedCounter.java @@ -7,15 +7,15 @@ package akka.transactor; import akka.actor.ActorRef; import akka.actor.Actors; import akka.actor.UntypedActor; -import static scala.concurrent.stm.JavaAPI.*; import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; import java.util.List; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; public class UntypedCoordinatedCounter extends UntypedActor { private String name; - private Ref.View count = newRef(0); + private Ref.View count = Stm.newRef(0); public UntypedCoordinatedCounter(String name) { this.name = name; @@ -40,9 +40,8 @@ public class UntypedCoordinatedCounter extends UntypedActor { } coordinated.atomic(new Runnable() { public void run() { - increment(count, 1); - afterRollback(countDown); - afterCommit(countDown); + Stm.increment(count, 1); + Stm.afterCompletion(countDown); } }); } diff --git a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java index 392bfbca42..e4e680f74b 100644 --- a/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java +++ b/akka-transactor/src/test/java/akka/transactor/UntypedCounter.java @@ -7,8 +7,8 @@ package akka.transactor; import akka.actor.ActorRef; import akka.transactor.UntypedTransactor; import akka.transactor.SendTo; -import static scala.concurrent.stm.JavaAPI.*; import scala.concurrent.stm.Ref; +import scala.concurrent.stm.japi.Stm; import java.util.List; import java.util.Set; import java.util.concurrent.CountDownLatch; @@ -16,7 +16,7 @@ import java.util.concurrent.TimeUnit; public class UntypedCounter extends UntypedTransactor { private String name; - private Ref.View count = newRef(0); + private Ref.View count = Stm.newRef(0); public UntypedCounter(String name) { this.name = name; @@ -39,15 +39,14 @@ public class UntypedCounter extends UntypedTransactor { public void atomically(Object message) { if (message instanceof Increment) { - increment(count, 1); + Stm.increment(count, 1); final Increment increment = (Increment) message; Runnable countDown = new Runnable() { public void run() { increment.getLatch().countDown(); } }; - afterRollback(countDown); - afterCommit(countDown); + Stm.afterCompletion(countDown); } } diff --git a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java index e2d0631590..63fb6abb74 100644 --- a/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java +++ b/akka-transactor/src/test/java/scala/concurrent/stm/JavaAPITests.java @@ -5,13 +5,12 @@ package scala.concurrent.stm; import static org.junit.Assert.*; import org.junit.Test; -import scala.concurrent.stm.Ref; -import static scala.concurrent.stm.JavaAPI.*; +import scala.concurrent.stm.japi.Stm; +import static scala.concurrent.stm.japi.Stm.*; import scala.runtime.AbstractFunction1; import java.util.concurrent.Callable; -import static scala.collection.JavaConversions.*; import java.util.Map; import java.util.Set; import java.util.List; @@ -96,8 +95,7 @@ public class JavaAPITests { @Test public void createAndUseTMap() { - TMap.View tmap = newTMap(); - Map map = mutableMapAsJavaMap(tmap); + Map map = newMap(); map.put(1, "one"); map.put(2, "two"); assertEquals("one", map.get(1)); @@ -109,8 +107,7 @@ public class JavaAPITests { @Test(expected = TestException.class) public void failingTMapTransaction() { - TMap.View tmap = newTMap(); - final Map map = mutableMapAsJavaMap(tmap); + final Map map = newMap(); try { atomic(new Runnable() { public void run() { @@ -130,8 +127,7 @@ public class JavaAPITests { @Test public void createAndUseTSet() { - TSet.View tset = newTSet(); - Set set = mutableSetAsJavaSet(tset); + Set set = newSet(); set.add("one"); set.add("two"); assertTrue(set.contains("one")); @@ -146,16 +142,15 @@ public class JavaAPITests { @Test public void createAndUseTArray() { - TArray.View tarray = newTArray(3); - List seq = mutableSeqAsJavaList(tarray); - assertEquals(null, seq.get(0)); - assertEquals(null, seq.get(1)); - assertEquals(null, seq.get(2)); - seq.set(0, "zero"); - seq.set(1, "one"); - seq.set(2, "two"); - assertEquals("zero", seq.get(0)); - assertEquals("one", seq.get(1)); - assertEquals("two", seq.get(2)); + List list = newList(3); + assertEquals(null, list.get(0)); + assertEquals(null, list.get(1)); + assertEquals(null, list.get(2)); + list.set(0, "zero"); + list.set(1, "one"); + list.set(2, "two"); + assertEquals("zero", list.get(0)); + assertEquals("one", list.get(1)); + assertEquals("two", list.get(2)); } } From b2bfc8bec02ddc35c4b7718d7bb4388bcab83bcd Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Fri, 20 Jan 2012 12:01:36 +1300 Subject: [PATCH 6/6] Update stm migration with changes to the java api --- .../project/migration-guide-1.3.x-2.0.x.rst | 35 ++++++++++++------- 1 file changed, 23 insertions(+), 12 deletions(-) diff --git a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst index 2749f0107c..e00c116cd0 100644 --- a/akka-docs/project/migration-guide-1.3.x-2.0.x.rst +++ b/akka-docs/project/migration-guide-1.3.x-2.0.x.rst @@ -403,11 +403,17 @@ To use ScalaSTM the import from Scala is:: Java ~~~~ -For Java there is a special JavaAPI helper object that can be statically -imported, along with any other imports that might be needed:: +For Java there is a special helper object with Java-friendly methods:: + + import scala.concurrent.stm.japi.Stm; + +These methods can also be statically imported:: + + import static scala.concurrent.stm.japi.Stm.*; + +Other imports that are needed are in the stm package, particularly ``Ref``:: import scala.concurrent.stm.Ref; - import static scala.concurrent.stm.JavaAPI.*; Transactions ^^^^^^^^^^^^ @@ -440,7 +446,7 @@ for more information. Java ~~~~ -In the ScalaSTM JavaAPI helpers there are atomic methods which accept +In the ScalaSTM Java API helpers there are atomic methods which accept ``java.lang.Runnable`` and ``java.util.concurrent.Callable``. v1.3:: @@ -461,7 +467,7 @@ v1.3:: v2.0:: - import static scala.concurrent.stm.JavaAPI.*; + import static scala.concurrent.stm.japi.Stm.atomic; import java.util.concurrent.Callable; atomic(new Runnable() { @@ -546,7 +552,7 @@ Java As ``Ref.View`` in ScalaSTM does not require implicit transactions, this is more easily used from Java. ``Ref`` could be used, but requires explicit threading of -transactions. There are helper methods in ``JavaAPI`` for creating ``Ref.View`` +transactions. There are helper methods in ``japi.Stm`` for creating ``Ref.View`` references. v1.3:: @@ -555,7 +561,7 @@ v1.3:: v2.0:: - Ref.View ref = newRef(0); + Ref.View ref = Stm.newRef(0); The ``set`` and ``get`` methods work the same way for both versions. @@ -570,7 +576,7 @@ v2.0:: ref.set(1); // set new value There are also ``transform``, ``getAndTransform``, and ``transformAndGet`` -methods in ``JavaAPI`` which accept ``scala.runtime.AbstractFunction1``. +methods in ``japi.Stm`` which accept ``scala.runtime.AbstractFunction1``. There are ``increment`` helper methods for ``Ref.View`` and ``Ref.View`` references. @@ -611,16 +617,21 @@ Java Rather than using the ``deferred`` and ``compensating`` methods in ``akka.stm.StmUtils``, use the ``afterCommit`` and ``afterRollback`` methods in -``scala.concurrent.stm.JavaAPI``, which behave in the same way and accept +``scala.concurrent.stm.japi.Stm``, which behave in the same way and accept ``Runnable``. Transactional Datastructures ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ In ScalaSTM see ``TMap``, ``TSet``, and ``TArray`` for transactional -datastructures. There are helper methods for creating these in -``JavaAPI``. These datastructure implement the ``scala.collection`` interfaces -and can also be used from Java with Scala's ``JavaConversions``. +datastructures. + +There are helper methods for creating these from Java in ``japi.Stm``: +``newTMap``, ``newTSet``, and ``newTArray``. These datastructures implement the +``scala.collection`` interfaces and can also be used from Java with Scala's +``JavaConversions``. There are helper methods that apply the conversions, +returning ``java.util`` ``Map``, ``Set``, and ``List``: ``newMap``, ``newSet``, +and ``newList``. More to be written