From 2b79caa8b3c2f30948feac0cb1066a6b67e272b9 Mon Sep 17 00:00:00 2001 From: Peter Vlugter Date: Sat, 6 Nov 2010 19:11:56 +1300 Subject: [PATCH] Reworked stm with no global/local --- .../src/test/scala/ReflectiveAccessSpec.scala | 6 +- .../scala/CassandraPersistentActorSpec.scala | 2 +- .../src/test/scala/Ticket343Test.scala | 2 +- .../scala/CouchDBPersistentActorSpec.scala | 4 +- ...sePersistentActorSpecTestIntegration.scala | 2 +- .../HbaseTicket343SpecTestIntegration.scala | 2 +- .../test/scala/MongoPersistentActorSpec.scala | 2 +- .../src/test/scala/MongoTicket343Spec.scala | 2 +- .../scala/RedisInconsistentSizeBugTest.scala | 2 +- .../test/scala/RedisPersistentActorSpec.scala | 2 +- .../src/test/scala/RedisPersistentQSpec.scala | 2 +- .../scala/RedisPersistentSortedSetSpec.scala | 2 +- .../src/test/scala/RedisTicket343Spec.scala | 2 +- .../scala/VoldemortPersistentActorSuite.scala | 2 +- .../src/main/scala/Ants.scala | 2 +- .../akka-sample-ants/src/main/spde/Ants.spde | 1 - .../src/main/scala/ChatServer.scala | 4 +- .../java/PersistentSimpleServiceImpl.java | 2 +- .../sample/rest/java/SimpleServiceImpl.java | 2 +- .../src/main/scala/SimpleService.scala | 2 +- .../src/main/scala/SimpleService.scala | 2 +- .../java/akka/spring/foo/StatefulPojo.java | 2 +- .../actor/TransactionalTypedActorImpl.java | 2 - .../main/scala/stm/{local => }/Atomic.scala | 5 +- akka-stm/src/main/scala/stm/Ref.scala | 37 ++++- akka-stm/src/main/scala/stm/Stm.scala | 88 +++++++++++ akka-stm/src/main/scala/stm/Transaction.scala | 109 +++++++------- .../main/scala/stm/TransactionFactory.scala | 119 +++++++++------ .../scala/stm/TransactionFactoryBuilder.scala | 64 ++++---- .../scala/stm/TransactionManagement.scala | 141 ------------------ .../src/main/scala/stm/TransactionalMap.scala | 15 +- .../main/scala/stm/TransactionalVector.scala | 15 +- .../src/main/scala/stm/global/Atomic.scala | 41 ----- .../src/main/scala/stm/global/GlobalStm.scala | 58 ------- .../src/main/scala/stm/global/package.scala | 10 -- .../src/main/scala/stm/local/LocalStm.scala | 48 ------ .../src/main/scala/stm/local/package.scala | 10 -- .../{transactional.scala => package.scala} | 30 ++-- .../java/akka/stm/{ => test}/Address.java | 2 +- .../akka/stm/{ => test}/CounterExample.java | 5 +- .../akka/stm/{ => test}/JavaStmTests.java | 3 +- .../java/akka/stm/{ => test}/RefExample.java | 5 +- .../java/akka/stm/{ => test}/StmExamples.java | 5 +- .../{ => test}/TransactionFactoryExample.java | 3 +- .../{ => test}/TransactionalMapExample.java | 3 +- .../TransactionalVectorExample.java | 3 +- .../test/java/akka/stm/{ => test}/User.java | 2 +- akka-stm/src/test/scala/stm/JavaStmSpec.scala | 4 +- akka-stm/src/test/scala/stm/RefSpec.scala | 6 +- akka-stm/src/test/scala/stm/StmSpec.scala | 110 +------------- 50 files changed, 363 insertions(+), 631 deletions(-) rename akka-stm/src/main/scala/stm/{local => }/Atomic.scala (88%) create mode 100644 akka-stm/src/main/scala/stm/Stm.scala delete mode 100644 akka-stm/src/main/scala/stm/TransactionManagement.scala delete mode 100644 akka-stm/src/main/scala/stm/global/Atomic.scala delete mode 100644 akka-stm/src/main/scala/stm/global/GlobalStm.scala delete mode 100644 akka-stm/src/main/scala/stm/global/package.scala delete mode 100644 akka-stm/src/main/scala/stm/local/LocalStm.scala delete mode 100644 akka-stm/src/main/scala/stm/local/package.scala rename akka-stm/src/main/scala/stm/{transactional.scala => package.scala} (64%) rename akka-stm/src/test/java/akka/stm/{ => test}/Address.java (90%) rename akka-stm/src/test/java/akka/stm/{ => test}/CounterExample.java (89%) rename akka-stm/src/test/java/akka/stm/{ => test}/JavaStmTests.java (98%) rename akka-stm/src/test/java/akka/stm/{ => test}/RefExample.java (92%) rename akka-stm/src/test/java/akka/stm/{ => test}/StmExamples.java (84%) rename akka-stm/src/test/java/akka/stm/{ => test}/TransactionFactoryExample.java (95%) rename akka-stm/src/test/java/akka/stm/{ => test}/TransactionalMapExample.java (95%) rename akka-stm/src/test/java/akka/stm/{ => test}/TransactionalVectorExample.java (95%) rename akka-stm/src/test/java/akka/stm/{ => test}/User.java (89%) diff --git a/akka-jta/src/test/scala/ReflectiveAccessSpec.scala b/akka-jta/src/test/scala/ReflectiveAccessSpec.scala index 417aec6eab..76bd83e15c 100644 --- a/akka-jta/src/test/scala/ReflectiveAccessSpec.scala +++ b/akka-jta/src/test/scala/ReflectiveAccessSpec.scala @@ -6,11 +6,11 @@ package akka.jta import org.scalatest.junit.JUnitSuite import org.junit.Test -import akka.stm.JtaModule +import akka.stm.ReflectiveJtaModule class ReflectiveAccessSpec extends JUnitSuite { @Test def ensureReflectiveAccessCanLoadTransactionContainer { - JtaModule.ensureJtaEnabled - assert(JtaModule.transactionContainerObjectInstance.isDefined) + ReflectiveJtaModule.ensureJtaEnabled + assert(ReflectiveJtaModule.transactionContainerObjectInstance.isDefined) } } diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index a425f0d7f5..f0284c84d2 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -2,7 +2,7 @@ package akka.persistence.cassandra import akka.actor.{Actor, ActorRef} import Actor._ -import akka.stm.local._ +import akka.stm._ import org.junit.Test import org.junit.Assert._ diff --git a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala index 641f83d569..c2d564b7a1 100644 --- a/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala +++ b/akka-persistence/akka-persistence-common/src/test/scala/Ticket343Test.scala @@ -13,7 +13,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import akka.config.Supervision.{OneForOneStrategy, Permanent} import Actor._ -import akka.stm.global._ +import akka.stm._ import akka.util.Logging import StorageObj._ diff --git a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala index 7d418375af..2433f90e80 100644 --- a/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-couchdb/src/test/scala/CouchDBPersistentActorSpec.scala @@ -8,7 +8,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import Actor._ -import akka.stm.local +import akka.stm._ case class Balance(accountNo: String) @@ -26,7 +26,7 @@ class BankAccountActor extends Actor { import sjson.json.DefaultProtocol._ import sjson.json.JsonSerialization._ - def receive = { case message => local.atomic { atomicReceive(message) } } + def receive = { case message => atomic { atomicReceive(message) } } def atomicReceive: Receive = { // check balance diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala index 2a8ef2c8e3..aa638572ae 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbasePersistentActorSpecTestIntegration.scala @@ -2,7 +2,7 @@ package akka.persistence.hbase import akka.actor.{ Actor, ActorRef } import Actor._ -import akka.stm.local._ +import akka.stm._ import org.junit.Test import org.junit.Assert._ diff --git a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala index ee22bfaa21..0403d07946 100644 --- a/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala +++ b/akka-persistence/akka-persistence-hbase/src/test/scala/HbaseTicket343SpecTestIntegration.scala @@ -9,7 +9,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ -import akka.stm.global._ +import akka.stm._ import akka.util.Logging import HbaseStorageBackend._ diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala index 597a847e1b..48929a1688 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala @@ -8,7 +8,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import Actor._ -import akka.stm.local._ +import akka.stm._ case class Balance(accountNo: String) diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala index 98cb9568b1..71eec80652 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoTicket343Spec.scala @@ -9,7 +9,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ -import akka.stm.global._ +import akka.stm._ import akka.util.Logging import MongoStorageBackend._ diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala index f365924f2f..650eabfe4d 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisInconsistentSizeBugTest.scala @@ -8,7 +8,7 @@ import akka.actor.{Actor, ActorRef} import akka.config.Supervision.{OneForOneStrategy, Permanent} import Actor._ import akka.persistence.common.PersistentVector -import akka.stm.global._ +import akka.stm._ import akka.util.Logging import java.util.{Calendar, Date} diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index a28a5485ed..ddc77a4bdf 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -7,7 +7,7 @@ import org.junit.Assert._ import akka.actor.{Actor, ActorRef} import akka.actor.Actor._ -import akka.stm.local._ +import akka.stm._ /** * A persistent actor based on Redis storage. diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala index e8f127f218..94c001f8e8 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentQSpec.scala @@ -5,7 +5,7 @@ import org.junit.Assert._ import akka.actor.{Actor, ActorRef} import Actor._ -import akka.stm.local._ +import akka.stm._ /** * A persistent actor based on Redis queue storage. diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala index d8a061a64d..8cc34dd39e 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentSortedSetSpec.scala @@ -9,7 +9,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import Actor._ -import akka.stm.local._ +import akka.stm._ /** * A persistent actor based on Redis sortedset storage. diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala index 76b07203bd..3af8c89a39 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisTicket343Spec.scala @@ -10,7 +10,7 @@ import akka.actor.{Actor} import akka.config.Supervision.{OneForOneStrategy,Permanent} import Actor._ import akka.persistence.common.PersistentVector -import akka.stm.global._ +import akka.stm._ import akka.util.Logging import RedisStorageBackend._ diff --git a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala index b0b0bdb2b3..0518191f0a 100644 --- a/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala +++ b/akka-persistence/akka-persistence-voldemort/src/test/scala/VoldemortPersistentActorSuite.scala @@ -9,7 +9,7 @@ import org.junit.runner.RunWith import akka.actor.{Actor, ActorRef} import Actor._ import BankAccountActor._ -import akka.stm.local._ +import akka.stm._ case class Balance(accountNo: String) diff --git a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala index 5ff6e9ceb1..5cddce66fa 100644 --- a/akka-samples/akka-sample-ants/src/main/scala/Ants.scala +++ b/akka-samples/akka-sample-ants/src/main/scala/Ants.scala @@ -8,7 +8,7 @@ import java.util.concurrent.TimeUnit import scala.util.Random.{nextInt => randomInt} import akka.actor.{Actor, ActorRef, Scheduler} import akka.actor.Actor.actorOf -import akka.stm.local._ +import akka.stm._ object Config { val Dim = 80 // dimensions of square world diff --git a/akka-samples/akka-sample-ants/src/main/spde/Ants.spde b/akka-samples/akka-sample-ants/src/main/spde/Ants.spde index ad7dce1239..df0da84b25 100644 --- a/akka-samples/akka-sample-ants/src/main/spde/Ants.spde +++ b/akka-samples/akka-sample-ants/src/main/spde/Ants.spde @@ -1,6 +1,5 @@ import sample.ants._ import sample.ants.Config._ -import akka.stm.local._ val scale = 5 diff --git a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala index 27bdad9d18..aa34824bab 100644 --- a/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala +++ b/akka-samples/akka-sample-chat/src/main/scala/ChatServer.scala @@ -10,7 +10,7 @@ import akka.actor.{SupervisorFactory, Actor, ActorRef, RemoteActor} import akka.remote.{RemoteNode, RemoteClient} import akka.persistence.common.PersistentVector import akka.persistence.redis.RedisStorage -import akka.stm.global._ +import akka.stm._ import akka.config.Supervision.{OneForOneStrategy,Permanent} import akka.util.Logging import Actor._ @@ -99,7 +99,7 @@ class RedisChatStorage extends ChatStorage { self.lifeCycle = Permanent val CHAT_LOG = "akka.chat.log" - private var chatLog = atomic { RedisStorage.getVector(CHAT_LOG) } + private var chatLog = RedisStorage.getVector(CHAT_LOG) log.info("Redis-based chat storage is starting up...") diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceImpl.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceImpl.java index 33136d6a2e..5d84b27d02 100644 --- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceImpl.java +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/PersistentSimpleServiceImpl.java @@ -5,7 +5,7 @@ package sample.rest.java; import akka.actor.TypedActor; -import akka.stm.local.Atomic; +import akka.stm.Atomic; import akka.persistence.common.PersistentMap; import akka.persistence.cassandra.CassandraStorage; diff --git a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceImpl.java b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceImpl.java index effcda5607..e0b95d3c92 100644 --- a/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceImpl.java +++ b/akka-samples/akka-sample-rest-java/src/main/java/sample/rest/java/SimpleServiceImpl.java @@ -5,7 +5,7 @@ package sample.rest.java; import akka.actor.TypedActor; -import akka.stm.local.Atomic; +import akka.stm.Atomic; import akka.stm.TransactionalMap; public class SimpleServiceImpl extends TypedActor implements SimpleService { diff --git a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala index fe743f0278..5ef2181e3b 100644 --- a/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-rest-scala/src/main/scala/SimpleService.scala @@ -6,7 +6,7 @@ package sample.rest.scala import akka.actor.{SupervisorFactory, Actor} import akka.actor.Actor._ -import akka.stm.local._ +import akka.stm._ import akka.stm.TransactionalMap import akka.persistence.cassandra.CassandraStorage import akka.config.Supervision._ diff --git a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala index 003979511e..1a462d1700 100644 --- a/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala +++ b/akka-samples/akka-sample-security/src/main/scala/SimpleService.scala @@ -9,7 +9,7 @@ import akka.actor.Actor._ import akka.config.Supervision._ import akka.util.Logging import akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo} -import akka.stm.local._ +import akka.stm._ import akka.stm.TransactionalMap import akka.actor.ActorRegistry.actorFor diff --git a/akka-spring/src/test/java/akka/spring/foo/StatefulPojo.java b/akka-spring/src/test/java/akka/spring/foo/StatefulPojo.java index 9f53371b5f..8f291d2a36 100644 --- a/akka-spring/src/test/java/akka/spring/foo/StatefulPojo.java +++ b/akka-spring/src/test/java/akka/spring/foo/StatefulPojo.java @@ -5,7 +5,7 @@ import akka.stm.TransactionalMap; import akka.stm.TransactionalVector; import akka.stm.Ref; import akka.actor.*; -import akka.stm.local.Atomic; +import akka.stm.Atomic; public class StatefulPojo extends TypedActor { private TransactionalMap mapState; diff --git a/akka-stm/disabled-src/test/java/akka/actor/TransactionalTypedActorImpl.java b/akka-stm/disabled-src/test/java/akka/actor/TransactionalTypedActorImpl.java index 599bd272a6..49a2b493c0 100644 --- a/akka-stm/disabled-src/test/java/akka/actor/TransactionalTypedActorImpl.java +++ b/akka-stm/disabled-src/test/java/akka/actor/TransactionalTypedActorImpl.java @@ -2,8 +2,6 @@ package akka.actor; import akka.actor.*; import akka.stm.*; -import akka.stm.local.*; -import akka.stm.local.Atomic; public class TransactionalTypedActorImpl extends TypedTransactor implements TransactionalTypedActor { private TransactionalMap mapState; diff --git a/akka-stm/src/main/scala/stm/local/Atomic.scala b/akka-stm/src/main/scala/stm/Atomic.scala similarity index 88% rename from akka-stm/src/main/scala/stm/local/Atomic.scala rename to akka-stm/src/main/scala/stm/Atomic.scala index c4929fe57c..05531ed4af 100644 --- a/akka-stm/src/main/scala/stm/local/Atomic.scala +++ b/akka-stm/src/main/scala/stm/Atomic.scala @@ -2,7 +2,7 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -package akka.stm.local +package akka.stm /** * Java-friendly atomic blocks. @@ -11,7 +11,6 @@ package akka.stm.local *

*

  * import akka.stm.*;
- * import akka.stm.local.Atomic;
  *
  * final Ref ref = new Ref(0);
  *
@@ -35,7 +34,7 @@ package akka.stm.local
  * 
*/ abstract class Atomic[T](factory: TransactionFactory) { - def this() = this(DefaultLocalTransactionFactory) + def this() = this(DefaultTransactionFactory) def atomically: T def execute: T = atomic(factory)(atomically) } diff --git a/akka-stm/src/main/scala/stm/Ref.scala b/akka-stm/src/main/scala/stm/Ref.scala index d76fd67797..1bb8779b1b 100644 --- a/akka-stm/src/main/scala/stm/Ref.scala +++ b/akka-stm/src/main/scala/stm/Ref.scala @@ -9,9 +9,7 @@ import akka.actor.{newUuid, Uuid} import org.multiverse.transactional.refs.BasicRef /** - * Ref - * - * @author Jonas Bonér + * Transactional managed reference. See the companion class for more information. */ object Ref { def apply[T]() = new Ref[T]() @@ -25,9 +23,38 @@ object Ref { } /** - * Transactional managed reference. + * Refs (transactional references) are mutable references to values and through + * the STM allow the safe sharing of mutable data. Refs separate identity from value. + * To ensure safety the value stored in a Ref should be immutable (they can also + * contain refs themselves). The value referenced by a Ref can only be accessed + * or swapped within a transaction. If a transaction is not available, the call will + * be executed in its own transaction (equivalent to using the Ref.atomic* methods). * - * @author Jonas Bonér + *

+ * Creating a Ref (in Scala): + *

+ *

+ * import akka.stm._
+ *
+ * // giving an initial value
+ * val ref = Ref(0)
+ *
+ * // specifying a type but no initial value
+ * val ref = Ref[Int]
+ * 
+ * + *

+ * Creating a Ref (in Java): + *

+ *

+ * import akka.stm.*;
+ *
+ * // giving an initial value
+ * final Ref ref = new Ref(0);
+ *
+ * // specifying a type but no initial value
+ * final Ref ref = new Ref();
+ * 
*/ class Ref[T](initialValue: T) extends BasicRef[T](initialValue) with Transactional { self => diff --git a/akka-stm/src/main/scala/stm/Stm.scala b/akka-stm/src/main/scala/stm/Stm.scala new file mode 100644 index 0000000000..f9bf1cec6a --- /dev/null +++ b/akka-stm/src/main/scala/stm/Stm.scala @@ -0,0 +1,88 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ + +package akka.stm + +import org.multiverse.api.{StmUtils => MultiverseStmUtils} +import org.multiverse.api.{Transaction => MultiverseTransaction} +import org.multiverse.templates.{TransactionalCallable, OrElseTemplate} + +/** + * Stm trait that defines the atomic block for local transactions. + *

+ * If you need to coordinate transactions across actors @see Coordinated. + *

+ * Example of atomic transaction management using the atomic block (in Scala). + *

+ *

+ * import akka.stm._
+ *
+ * atomic  {
+ *   // do something within a transaction
+ * }
+ * 
+ *

+ * @see Atomic for creating atomic blocks in Java. + */ +trait Stm { + val DefaultTransactionFactory = TransactionFactory(DefaultTransactionConfig, "DefaultTransaction") + + def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultTransactionFactory): T = + atomic(factory)(body) + + def atomic[T](factory: TransactionFactory)(body: => T): T = { + factory.boilerplate.execute(new TransactionalCallable[T]() { + def call(mtx: MultiverseTransaction): T = { + factory.addHooks + body + } + }) + } +} + +/** + * Stm utils for scheduling transaction lifecycle tasks and for blocking transactions. + */ +trait StmUtil { + /** + * Schedule a deferred task on the thread local transaction (use within an atomic). + * This is executed when the transaction commits. + */ + def deferred[T](body: => T): Unit = + MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body }) + + /** + * Schedule a compensating task on the thread local transaction (use within an atomic). + * This is executed when the transaction aborts. + */ + def compensating[T](body: => T): Unit = + MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body }) + + /** + * STM retry for blocking transactions (use within an atomic). + * Can be used to wait for a condition. + */ + def retry = MultiverseStmUtils.retry + + /** + * Use either-orElse to combine two blocking transactions. + * Usage: + *

+   * either {
+   *   ...
+   * } orElse {
+   *   ...
+   * }
+   * 
+ */ + def either[T](firstBody: => T) = new { + def orElse(secondBody: => T) = new OrElseTemplate[T] { + def either(mtx: MultiverseTransaction) = firstBody + def orelse(mtx: MultiverseTransaction) = secondBody + }.execute() + } +} + + + diff --git a/akka-stm/src/main/scala/stm/Transaction.scala b/akka-stm/src/main/scala/stm/Transaction.scala index eba7356ba6..1509bd29d8 100644 --- a/akka-stm/src/main/scala/stm/Transaction.scala +++ b/akka-stm/src/main/scala/stm/Transaction.scala @@ -24,17 +24,45 @@ class NoTransactionInScopeException extends AkkaException("No transaction in sco class TransactionRetryException(message: String) extends AkkaException(message) class StmConfigurationException(message: String) extends AkkaException(message) + +/** + * Internal helper methods for managing Akka-specific transaction. + */ +object TransactionManagement extends TransactionManagement { + private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { + override protected def initialValue: Option[Transaction] = None + } + + private[akka] def getTransaction: Transaction = { + val option = transaction.get + if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") + option.get + } +} + +/** + * Internal helper methods for managing Akka-specific transaction. + */ +trait TransactionManagement { + private[akka] def setTransaction(tx: Option[Transaction]) = + if (tx.isDefined) TransactionManagement.transaction.set(tx) + + private[akka] def clearTransaction = { + TransactionManagement.transaction.set(None) + setThreadLocalTransaction(null) + } + + private[akka] def getTransactionInScope = TransactionManagement.getTransaction + + private[akka] def isTransactionInScope = { + val option = TransactionManagement.transaction.get + (option ne null) && option.isDefined + } +} + object Transaction { val idFactory = new AtomicLong(-1L) - @deprecated("Use the akka.stm.local package object instead.") - object Local extends LocalStm - - @deprecated("Use the akka.stm.global package object instead.") - object Global extends GlobalStm - - object Util extends StmUtil - /** * Attach an Akka-specific Transaction to the current Multiverse transaction. * Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks @@ -53,34 +81,11 @@ object Transaction { } }) } - - /** - * Mapping to Multiverse PropagationLevel. - */ - object Propagation { - val RequiresNew = MultiversePropagationLevel.RequiresNew - val Mandatory = MultiversePropagationLevel.Mandatory - val Requires = MultiversePropagationLevel.Requires - val Supports = MultiversePropagationLevel.Supports - val Never = MultiversePropagationLevel.Never - } - - /** - * Mapping to Multiverse TraceLevel. - */ - object TraceLevel { - val None = MultiverseTraceLevel.none - val Coarse = MultiverseTraceLevel.course // mispelling? - val Course = MultiverseTraceLevel.course - val Fine = MultiverseTraceLevel.fine - } } /** - * The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc) - * and JTA support. - * - * @author Jonas Bonér + * The Akka-specific Transaction class. + * For integration with persistence modules and JTA support. */ @serializable class Transaction extends Logging { val JTA_AWARE = config.getBool("akka.stm.jta-aware", false) @@ -91,8 +96,8 @@ object Transaction { private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] private[akka] val depth = new AtomicInteger(0) - val jta: Option[JtaModule.TransactionContainer] = - if (JTA_AWARE) Some(JtaModule.createTransactionContainer) + val jta: Option[ReflectiveJtaModule.TransactionContainer] = + if (JTA_AWARE) Some(ReflectiveJtaModule.createTransactionContainer) else None log.trace("Creating transaction " + toString) @@ -154,15 +159,6 @@ object Transaction { throw new StmConfigurationException( "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - // For reinitialize transaction after sending it over the wire -/* private[akka] def reinit = synchronized { - import net.lag.logging.{Logger, Level} - if (log eq null) { - log = Logger.get(this.getClass.getName) - log.setLevel(Level.ALL) // TODO: preserve logging level - } - } -*/ override def equals(that: Any): Boolean = synchronized { that.isInstanceOf[Transaction] && that.asInstanceOf[Transaction].id == this.id @@ -173,14 +169,8 @@ object Transaction { override def toString = synchronized { "Transaction[" + id + ", " + status + "]" } } -/** - * @author Jonas Bonér - */ @serializable sealed abstract class TransactionStatus -/** - * @author Jonas Bonér - */ object TransactionStatus { case object New extends TransactionStatus case object Active extends TransactionStatus @@ -189,28 +179,33 @@ object TransactionStatus { } /** - * @author Jonas Bonér + * Common trait for all the transactional objects: + * Ref, TransactionalMap, TransactionalVector, + * PersistentRef, PersistentMap, PersistentVector, PersistentQueue, PersistentSortedSet */ -@serializable -trait Transactional { +@serializable trait Transactional { val uuid: String } /** - * @author Jonas Bonér + * Used for integration with the persistence modules. */ trait Committable { def commit(): Unit } /** - * @author Jonas Bonér + * Used for integration with the persistence modules. */ trait Abortable { def abort(): Unit } -object JtaModule { +/** + * For reflective access to the JTA module. + * Allows JTA integration to work when akka-jta.jar is on the classpath. + */ +object ReflectiveJtaModule { type TransactionContainerObject = { def apply(): TransactionContainer } @@ -224,7 +219,7 @@ object JtaModule { lazy val isJtaEnabled = transactionContainerObjectInstance.isDefined def ensureJtaEnabled = if (!isJtaEnabled) throw new ModuleNotAvailableException( - "Can't load the typed actor module, make sure that akka-jta.jar is on the classpath") + "Can't load the JTA module, make sure that akka-jta.jar is on the classpath") val transactionContainerObjectInstance: Option[TransactionContainerObject] = ReflectiveAccess.getObjectFor("akka.jta.TransactionContainer$") diff --git a/akka-stm/src/main/scala/stm/TransactionFactory.scala b/akka-stm/src/main/scala/stm/TransactionFactory.scala index befa097822..d348aee8a5 100644 --- a/akka-stm/src/main/scala/stm/TransactionFactory.scala +++ b/akka-stm/src/main/scala/stm/TransactionFactory.scala @@ -12,8 +12,8 @@ import akka.util.Duration import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.stms.alpha.AlphaStm import org.multiverse.templates.TransactionBoilerplate -import org.multiverse.api.{PropagationLevel => Propagation} -import org.multiverse.api.TraceLevel +import org.multiverse.api.{PropagationLevel => MPropagation} +import org.multiverse.api.{TraceLevel => MTraceLevel} /** * For configuring multiverse transactions. @@ -37,17 +37,17 @@ object TransactionConfig { val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT) def propagation(level: String) = level.toLowerCase match { - case "requiresnew" => Transaction.Propagation.RequiresNew - case "fine" => Transaction.Propagation.Mandatory - case "supports" => Transaction.Propagation.Supports - case "never" => Transaction.Propagation.Never - case _ => Transaction.Propagation.Requires + case "requiresnew" => Propagation.RequiresNew + case "fine" => Propagation.Mandatory + case "supports" => Propagation.Supports + case "never" => Propagation.Never + case _ => Propagation.Requires } def traceLevel(level: String) = level.toLowerCase match { - case "coarse" | "course" => Transaction.TraceLevel.Coarse - case "fine" => Transaction.TraceLevel.Fine - case _ => Transaction.TraceLevel.None + case "coarse" | "course" => TraceLevel.Coarse + case "fine" => TraceLevel.Fine + case _ => TraceLevel.None } /** @@ -67,19 +67,19 @@ object TransactionConfig { * @param traceLevel Transaction trace level. * @param hooks Whether hooks for persistence modules and JTA should be added to the transaction. */ - def apply(familyName: String = FAMILY_NAME, - readonly: JBoolean = READONLY, - maxRetries: Int = MAX_RETRIES, - timeout: Duration = DefaultTimeout, - trackReads: JBoolean = TRACK_READS, - writeSkew: Boolean = WRITE_SKEW, - blockingAllowed: Boolean = BLOCKING_ALLOWED, - interruptible: Boolean = INTERRUPTIBLE, - speculative: Boolean = SPECULATIVE, - quickRelease: Boolean = QUICK_RELEASE, - propagation: Propagation = PROPAGATION, - traceLevel: TraceLevel = TRACE_LEVEL, - hooks: Boolean = HOOKS) = { + def apply(familyName: String = FAMILY_NAME, + readonly: JBoolean = READONLY, + maxRetries: Int = MAX_RETRIES, + timeout: Duration = DefaultTimeout, + trackReads: JBoolean = TRACK_READS, + writeSkew: Boolean = WRITE_SKEW, + blockingAllowed: Boolean = BLOCKING_ALLOWED, + interruptible: Boolean = INTERRUPTIBLE, + speculative: Boolean = SPECULATIVE, + quickRelease: Boolean = QUICK_RELEASE, + propagation: MPropagation = PROPAGATION, + traceLevel: MTraceLevel = TRACE_LEVEL, + hooks: Boolean = HOOKS) = { new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, interruptible, speculative, quickRelease, propagation, traceLevel, hooks) } @@ -102,19 +102,19 @@ object TransactionConfig { *

traceLevel - Transaction trace level. *

hooks - Whether hooks for persistence modules and JTA should be added to the transaction. */ -class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, - val readonly: JBoolean = TransactionConfig.READONLY, - val maxRetries: Int = TransactionConfig.MAX_RETRIES, - val timeout: Duration = TransactionConfig.DefaultTimeout, - val trackReads: JBoolean = TransactionConfig.TRACK_READS, - val writeSkew: Boolean = TransactionConfig.WRITE_SKEW, - val blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED, - val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE, - val speculative: Boolean = TransactionConfig.SPECULATIVE, - val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, - val propagation: Propagation = TransactionConfig.PROPAGATION, - val traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, - val hooks: Boolean = TransactionConfig.HOOKS) +class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME, + val readonly: JBoolean = TransactionConfig.READONLY, + val maxRetries: Int = TransactionConfig.MAX_RETRIES, + val timeout: Duration = TransactionConfig.DefaultTimeout, + val trackReads: JBoolean = TransactionConfig.TRACK_READS, + val writeSkew: Boolean = TransactionConfig.WRITE_SKEW, + val blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED, + val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE, + val speculative: Boolean = TransactionConfig.SPECULATIVE, + val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, + val propagation: MPropagation = TransactionConfig.PROPAGATION, + val traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, + val hooks: Boolean = TransactionConfig.HOOKS) object DefaultTransactionConfig extends TransactionConfig @@ -126,19 +126,19 @@ object TransactionFactory { def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName) - def apply(familyName: String = TransactionConfig.FAMILY_NAME, - readonly: JBoolean = TransactionConfig.READONLY, - maxRetries: Int = TransactionConfig.MAX_RETRIES, - timeout: Duration = TransactionConfig.DefaultTimeout, - trackReads: JBoolean = TransactionConfig.TRACK_READS, - writeSkew: Boolean = TransactionConfig.WRITE_SKEW, - blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED, - interruptible: Boolean = TransactionConfig.INTERRUPTIBLE, - speculative: Boolean = TransactionConfig.SPECULATIVE, - quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, - propagation: Propagation = TransactionConfig.PROPAGATION, - traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, - hooks: Boolean = TransactionConfig.HOOKS) = { + def apply(familyName: String = TransactionConfig.FAMILY_NAME, + readonly: JBoolean = TransactionConfig.READONLY, + maxRetries: Int = TransactionConfig.MAX_RETRIES, + timeout: Duration = TransactionConfig.DefaultTimeout, + trackReads: JBoolean = TransactionConfig.TRACK_READS, + writeSkew: Boolean = TransactionConfig.WRITE_SKEW, + blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED, + interruptible: Boolean = TransactionConfig.INTERRUPTIBLE, + speculative: Boolean = TransactionConfig.SPECULATIVE, + quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, + propagation: MPropagation = TransactionConfig.PROPAGATION, + traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL, + hooks: Boolean = TransactionConfig.HOOKS) = { val config = new TransactionConfig( familyName, readonly, maxRetries, timeout, trackReads, writeSkew, blockingAllowed, interruptible, speculative, quickRelease, propagation, traceLevel, hooks) @@ -202,3 +202,24 @@ class TransactionFactory( def addHooks = if (config.hooks) Transaction.attach } + +/** + * Mapping to Multiverse PropagationLevel. + */ +object Propagation { + val RequiresNew = MPropagation.RequiresNew + val Mandatory = MPropagation.Mandatory + val Requires = MPropagation.Requires + val Supports = MPropagation.Supports + val Never = MPropagation.Never +} + +/** + * Mapping to Multiverse TraceLevel. + */ +object TraceLevel { + val None = MTraceLevel.none + val Coarse = MTraceLevel.course // mispelling? + val Course = MTraceLevel.course + val Fine = MTraceLevel.fine +} diff --git a/akka-stm/src/main/scala/stm/TransactionFactoryBuilder.scala b/akka-stm/src/main/scala/stm/TransactionFactoryBuilder.scala index dc44461631..07eb1ed6df 100644 --- a/akka-stm/src/main/scala/stm/TransactionFactoryBuilder.scala +++ b/akka-stm/src/main/scala/stm/TransactionFactoryBuilder.scala @@ -8,26 +8,26 @@ import java.lang.{Boolean => JBoolean} import akka.util.Duration -import org.multiverse.api.TraceLevel -import org.multiverse.api.{PropagationLevel => Propagation} +import org.multiverse.api.{TraceLevel => MTraceLevel} +import org.multiverse.api.{PropagationLevel => MPropagation} /** * For more easily creating TransactionConfig from Java. */ class TransactionConfigBuilder { - var familyName: String = TransactionConfig.FAMILY_NAME - var readonly: JBoolean = TransactionConfig.READONLY - var maxRetries: Int = TransactionConfig.MAX_RETRIES - var timeout: Duration = TransactionConfig.DefaultTimeout - var trackReads: JBoolean = TransactionConfig.TRACK_READS - var writeSkew: Boolean = TransactionConfig.WRITE_SKEW - var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED - var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE - var speculative: Boolean = TransactionConfig.SPECULATIVE - var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE - var propagation: Propagation = TransactionConfig.PROPAGATION - var traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS + var familyName: String = TransactionConfig.FAMILY_NAME + var readonly: JBoolean = TransactionConfig.READONLY + var maxRetries: Int = TransactionConfig.MAX_RETRIES + var timeout: Duration = TransactionConfig.DefaultTimeout + var trackReads: JBoolean = TransactionConfig.TRACK_READS + var writeSkew: Boolean = TransactionConfig.WRITE_SKEW + var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED + var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE + var speculative: Boolean = TransactionConfig.SPECULATIVE + var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE + var propagation: MPropagation = TransactionConfig.PROPAGATION + var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL + var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -39,8 +39,8 @@ class TransactionConfigBuilder { def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this } def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this } def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } - def setPropagation(propagation: Propagation) = { this.propagation = propagation; this } - def setTraceLevel(traceLevel: TraceLevel) = { this.traceLevel = traceLevel; this } + def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } + def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = new TransactionConfig( @@ -52,19 +52,19 @@ class TransactionConfigBuilder { * For more easily creating TransactionFactory from Java. */ class TransactionFactoryBuilder { - var familyName: String = TransactionConfig.FAMILY_NAME - var readonly: JBoolean = TransactionConfig.READONLY - var maxRetries: Int = TransactionConfig.MAX_RETRIES - var timeout: Duration = TransactionConfig.DefaultTimeout - var trackReads: JBoolean = TransactionConfig.TRACK_READS - var writeSkew: Boolean = TransactionConfig.WRITE_SKEW - var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED - var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE - var speculative: Boolean = TransactionConfig.SPECULATIVE - var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE - var propagation: Propagation = TransactionConfig.PROPAGATION - var traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL - var hooks: Boolean = TransactionConfig.HOOKS + var familyName: String = TransactionConfig.FAMILY_NAME + var readonly: JBoolean = TransactionConfig.READONLY + var maxRetries: Int = TransactionConfig.MAX_RETRIES + var timeout: Duration = TransactionConfig.DefaultTimeout + var trackReads: JBoolean = TransactionConfig.TRACK_READS + var writeSkew: Boolean = TransactionConfig.WRITE_SKEW + var blockingAllowed: Boolean = TransactionConfig.BLOCKING_ALLOWED + var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE + var speculative: Boolean = TransactionConfig.SPECULATIVE + var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE + var propagation: MPropagation = TransactionConfig.PROPAGATION + var traceLevel: MTraceLevel = TransactionConfig.TRACE_LEVEL + var hooks: Boolean = TransactionConfig.HOOKS def setFamilyName(familyName: String) = { this.familyName = familyName; this } def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this } @@ -76,8 +76,8 @@ class TransactionFactoryBuilder { def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this } def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this } def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this } - def setPropagation(propagation: Propagation) = { this.propagation = propagation; this } - def setTraceLevel(traceLevel: TraceLevel) = { this.traceLevel = traceLevel; this } + def setPropagation(propagation: MPropagation) = { this.propagation = propagation; this } + def setTraceLevel(traceLevel: MTraceLevel) = { this.traceLevel = traceLevel; this } def setHooks(hooks: Boolean) = { this.hooks = hooks; this } def build() = { diff --git a/akka-stm/src/main/scala/stm/TransactionManagement.scala b/akka-stm/src/main/scala/stm/TransactionManagement.scala deleted file mode 100644 index c6f29c866b..0000000000 --- a/akka-stm/src/main/scala/stm/TransactionManagement.scala +++ /dev/null @@ -1,141 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm - -import akka.AkkaException - -import org.multiverse.api.{StmUtils => MultiverseStmUtils} -import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.commitbarriers.CountDownCommitBarrier -import org.multiverse.templates.OrElseTemplate - -class TransactionSetAbortedException(msg: String) extends AkkaException(msg) - -/** - * Internal helper methods and properties for transaction management. - */ -object TransactionManagement extends TransactionManagement { - import akka.config.Config._ - - // FIXME move to stm.global.fair? - val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true) - - private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() { - override protected def initialValue: Option[CountDownCommitBarrier] = None - } - - private[akka] val transaction = new ThreadLocal[Option[Transaction]]() { - override protected def initialValue: Option[Transaction] = None - } - - private[akka] def getTransactionSet: CountDownCommitBarrier = { - val option = transactionSet.get - if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction set in scope") - else option.get - } - - private[akka] def getTransaction: Transaction = { - val option = transaction.get - if ((option eq null) || option.isEmpty) throw new StmConfigurationException("No Transaction in scope") - option.get - } -} - -/** - * Internal helper methods for transaction management. - */ -trait TransactionManagement { - - private[akka] def createNewTransactionSet: CountDownCommitBarrier = { - val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS) - TransactionManagement.transactionSet.set(Some(txSet)) - txSet - } - - private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) = - if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet) - - private[akka] def setTransaction(tx: Option[Transaction]) = - if (tx.isDefined) TransactionManagement.transaction.set(tx) - - private[akka] def clearTransactionSet = { - TransactionManagement.transactionSet.set(None) - } - - private[akka] def clearTransaction = { - TransactionManagement.transaction.set(None) - setThreadLocalTransaction(null) - } - - private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet - - private[akka] def getTransactionInScope = TransactionManagement.getTransaction - - private[akka] def isTransactionSetInScope = { - val option = TransactionManagement.transactionSet.get - (option ne null) && option.isDefined - } - - private[akka] def isTransactionInScope = { - val option = TransactionManagement.transaction.get - (option ne null) && option.isDefined - } -} - -trait StmUtil { - /** - * Schedule a deferred task on the thread local transaction (use within an atomic). - * This is executed when the transaction commits. - */ - def deferred[T](body: => T): Unit = - MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body }) - - /** - * Schedule a compensating task on the thread local transaction (use within an atomic). - * This is executed when the transaction aborts. - */ - def compensating[T](body: => T): Unit = - MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body }) - - /** - * STM retry for blocking transactions (use within an atomic). - * Can be used to wait for a condition. - */ - def retry = MultiverseStmUtils.retry - - /** - * Use either-orElse to combine two blocking transactions. - * Usage: - *

-   * either {
-   *   ...
-   * } orElse {
-   *   ...
-   * }
-   * 
- */ - def either[T](firstBody: => T) = new { - def orElse(secondBody: => T) = new OrElseTemplate[T] { - def either(mtx: MultiverseTransaction) = firstBody - def orelse(mtx: MultiverseTransaction) = secondBody - }.execute() - } -} - -trait StmCommon { - type TransactionConfig = akka.stm.TransactionConfig - val TransactionConfig = akka.stm.TransactionConfig - - type TransactionFactory = akka.stm.TransactionFactory - val TransactionFactory = akka.stm.TransactionFactory - - val Propagation = akka.stm.Transaction.Propagation - - val TraceLevel = akka.stm.Transaction.TraceLevel - - type Ref[T] = akka.stm.Ref[T] - val Ref = akka.stm.Ref -} diff --git a/akka-stm/src/main/scala/stm/TransactionalMap.scala b/akka-stm/src/main/scala/stm/TransactionalMap.scala index db42caaf9f..ab7fe7341b 100644 --- a/akka-stm/src/main/scala/stm/TransactionalMap.scala +++ b/akka-stm/src/main/scala/stm/TransactionalMap.scala @@ -10,6 +10,9 @@ import akka.actor.{newUuid} import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction +/** + * Transactional map that implements the mutable Map interface with an underlying Ref and HashMap. + */ object TransactionalMap { def apply[K, V]() = new TransactionalMap[K, V]() @@ -17,9 +20,15 @@ object TransactionalMap { } /** - * Transactional map that implements the mutable map interface with an underlying ref and hash map. - * - * @author Jonas Bonér + * Transactional map that implements the mutable Map interface with an underlying Ref and HashMap. + *

+ * TransactionalMap and TransactionalVector look like regular mutable datastructures, they even + * implement the standard Scala 'Map' and 'IndexedSeq' interfaces, but they are implemented using + * persistent datastructures and managed references under the hood. Therefore they are safe to use + * in a concurrent environment through the STM. Underlying TransactionalMap is HashMap, an immutable + * Map but with near constant time access and modification operations. + *

+ * From Scala you can use TMap as a shorter alias for TransactionalMap. */ class TransactionalMap[K, V](initialValue: HashMap[K, V]) extends Transactional with scala.collection.mutable.Map[K, V] { def this() = this(HashMap[K, V]()) diff --git a/akka-stm/src/main/scala/stm/TransactionalVector.scala b/akka-stm/src/main/scala/stm/TransactionalVector.scala index 963568af6c..dbb6eda307 100644 --- a/akka-stm/src/main/scala/stm/TransactionalVector.scala +++ b/akka-stm/src/main/scala/stm/TransactionalVector.scala @@ -10,6 +10,9 @@ import akka.actor.newUuid import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction +/** + * Transactional vector that implements the IndexedSeq interface with an underlying Ref and Vector. + */ object TransactionalVector { def apply[T]() = new TransactionalVector[T]() @@ -17,9 +20,15 @@ object TransactionalVector { } /** - * Transactional vector that implements the indexed seq interface with an underlying ref and vector. - * - * @author Jonas Bonér + * Transactional vector that implements the IndexedSeq interface with an underlying Ref and Vector. + *

+ * TransactionalMap and TransactionalVector look like regular mutable datastructures, they even + * implement the standard Scala 'Map' and 'IndexedSeq' interfaces, but they are implemented using + * persistent datastructures and managed references under the hood. Therefore they are safe to use + * in a concurrent environment through the STM. Underlying TransactionalVector is Vector, an immutable + * sequence but with near constant time access and modification operations. + *

+ * From Scala you can use TVector as a shorter alias for TransactionalVector. */ class TransactionalVector[T](initialValue: Vector[T]) extends Transactional with IndexedSeq[T] { def this() = this(Vector[T]()) diff --git a/akka-stm/src/main/scala/stm/global/Atomic.scala b/akka-stm/src/main/scala/stm/global/Atomic.scala deleted file mode 100644 index 298c58d63d..0000000000 --- a/akka-stm/src/main/scala/stm/global/Atomic.scala +++ /dev/null @@ -1,41 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm.global - -/** - * Java-friendly atomic blocks. - *

- * Example usage (in Java): - *

- *

- * import akka.stm.*;
- * import akka.stm.global.Atomic;
- *
- * final Ref ref = new Ref(0);
- *
- * new Atomic() {
- *     public Object atomically() {
- *         return ref.set(1);
- *     }
- * }.execute();
- *
- * // To configure transactions pass a TransactionFactory
- *
- * TransactionFactory txFactory = new TransactionFactoryBuilder()
- *     .setReadonly(true)
- *     .build();
- *
- * Integer value = new Atomic(txFactory) {
- *     public Integer atomically() {
- *         return ref.get();
- *     }
- * }.execute();
- * 
- */ -abstract class Atomic[T](factory: TransactionFactory) { - def this() = this(DefaultGlobalTransactionFactory) - def atomically: T - def execute: T = atomic(factory)(atomically) -} diff --git a/akka-stm/src/main/scala/stm/global/GlobalStm.scala b/akka-stm/src/main/scala/stm/global/GlobalStm.scala deleted file mode 100644 index 4ff66c7761..0000000000 --- a/akka-stm/src/main/scala/stm/global/GlobalStm.scala +++ /dev/null @@ -1,58 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm - -import akka.util.Logging - -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.templates.TransactionalCallable - -object GlobalStm extends Logging - -/** - * Global transaction management, global in the context of multiple threads. - * Use this if you need to have one transaction span multiple threads (or Actors). - *

- * Example of atomic transaction management using the atomic block: - *

- *

- * import akka.stm.global._
- *
- * atomic  {
- *   // do something within a transaction
- * }
- * 
- */ -class GlobalStm extends TransactionManagement { - - val DefaultGlobalTransactionConfig = TransactionConfig() - val DefaultGlobalTransactionFactory = TransactionFactory( - DefaultGlobalTransactionConfig, "DefaultGlobalTransaction") - - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = - atomic(factory)(body) - - def atomic[T](factory: TransactionFactory)(body: => T): T = { - factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - if (!isTransactionSetInScope) createNewTransactionSet - factory.addHooks - val result = body - val txSet = getTransactionSetInScope - GlobalStm.log.trace( - "Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]") - try { - txSet.tryJoinCommit( - mtx, - TransactionConfig.DefaultTimeout.length, - TransactionConfig.DefaultTimeout.unit) - // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake - } catch { case e: IllegalStateException => {} } - result - } - }) - } -} - diff --git a/akka-stm/src/main/scala/stm/global/package.scala b/akka-stm/src/main/scala/stm/global/package.scala deleted file mode 100644 index cf4b24bf8a..0000000000 --- a/akka-stm/src/main/scala/stm/global/package.scala +++ /dev/null @@ -1,10 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm - -/** - * For easily importing global STM. - */ -package object global extends GlobalStm with StmUtil with StmCommon diff --git a/akka-stm/src/main/scala/stm/local/LocalStm.scala b/akka-stm/src/main/scala/stm/local/LocalStm.scala deleted file mode 100644 index 5048a745aa..0000000000 --- a/akka-stm/src/main/scala/stm/local/LocalStm.scala +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm - -import akka.util.Logging - -import org.multiverse.api.{Transaction => MultiverseTransaction} -import org.multiverse.templates.TransactionalCallable - -object LocalStm extends Logging - -/** - * Local transaction management, local in the context of threads. - * Use this if you do not need to have one transaction span - * multiple threads (or Actors). - *

- * Example of atomic transaction management using the atomic block. - *

- *

- * import akka.stm.local._
- *
- * atomic  {
- *   // do something within a transaction
- * }
- * 
- */ -class LocalStm extends TransactionManagement with Logging { - - val DefaultLocalTransactionConfig = TransactionConfig() - val DefaultLocalTransactionFactory = TransactionFactory( - DefaultLocalTransactionConfig, "DefaultLocalTransaction") - - def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = - atomic(factory)(body) - - def atomic[T](factory: TransactionFactory)(body: => T): T = { - factory.boilerplate.execute(new TransactionalCallable[T]() { - def call(mtx: MultiverseTransaction): T = { - factory.addHooks - val result = body - LocalStm.log.trace("Committing local transaction [" + mtx + "]") - result - } - }) - } -} diff --git a/akka-stm/src/main/scala/stm/local/package.scala b/akka-stm/src/main/scala/stm/local/package.scala deleted file mode 100644 index 646e63e8fe..0000000000 --- a/akka-stm/src/main/scala/stm/local/package.scala +++ /dev/null @@ -1,10 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package akka.stm - -/** - * For easily importing local STM. - */ -package object local extends LocalStm with StmUtil with StmCommon diff --git a/akka-stm/src/main/scala/stm/transactional.scala b/akka-stm/src/main/scala/stm/package.scala similarity index 64% rename from akka-stm/src/main/scala/stm/transactional.scala rename to akka-stm/src/main/scala/stm/package.scala index 074b1308d8..50f9e709b9 100644 --- a/akka-stm/src/main/scala/stm/transactional.scala +++ b/akka-stm/src/main/scala/stm/package.scala @@ -2,18 +2,22 @@ * Copyright (C) 2009-2010 Scalable Solutions AB */ -package akka.stm +package akka /** - * For importing the transactional datastructures, including the primitive refs - * and transactional data structures from Multiverse. + * For easily importing everthing needed for STM. */ -package object transactional { - type TransactionalMap[K,V] = akka.stm.TransactionalMap[K,V] - val TransactionalMap = akka.stm.TransactionalMap +package object stm extends akka.stm.Stm with akka.stm.StmUtil { - type TransactionalVector[T] = akka.stm.TransactionalVector[T] - val TransactionalVector = akka.stm.TransactionalVector + // Shorter aliases for transactional map and vector + + type TMap[K, V] = akka.stm.TransactionalMap[K, V] + val TMap = akka.stm.TransactionalMap + + type TVector[T] = akka.stm.TransactionalVector[T] + val TVector = akka.stm.TransactionalVector + + // Multiverse primitive refs type BooleanRef = org.multiverse.transactional.refs.BooleanRef type ByteRef = org.multiverse.transactional.refs.ByteRef @@ -24,12 +28,14 @@ package object transactional { type LongRef = org.multiverse.transactional.refs.LongRef type ShortRef = org.multiverse.transactional.refs.ShortRef - type TransactionalReferenceArray[T] = org.multiverse.transactional.arrays.TransactionalReferenceArray[T] + // Multiverse transactional datastructures - // These won't compile - something to do with vararg constructors? Check for Scala bug. + type TransactionalReferenceArray[T] = org.multiverse.transactional.arrays.TransactionalReferenceArray[T] + type TransactionalThreadPoolExecutor = org.multiverse.transactional.executors.TransactionalThreadPoolExecutor + + // These won't compile: + // Transaction arg is added after varargs with byte code rewriting but Scala compiler doesn't allow this // type TransactionalArrayList[T] = org.multiverse.transactional.collections.TransactionalArrayList[T] // type TransactionalLinkedList[T] = org.multiverse.transactional.collections.TransactionalLinkedList[T] - - type TransactionalThreadPoolExecutor = org.multiverse.transactional.executors.TransactionalThreadPoolExecutor } diff --git a/akka-stm/src/test/java/akka/stm/Address.java b/akka-stm/src/test/java/akka/stm/test/Address.java similarity index 90% rename from akka-stm/src/test/java/akka/stm/Address.java rename to akka-stm/src/test/java/akka/stm/test/Address.java index 55b30e991b..3ab7828e3c 100644 --- a/akka-stm/src/test/java/akka/stm/Address.java +++ b/akka-stm/src/test/java/akka/stm/test/Address.java @@ -1,4 +1,4 @@ -package akka.stm; +package akka.stm.test; public class Address { private String location; diff --git a/akka-stm/src/test/java/akka/stm/CounterExample.java b/akka-stm/src/test/java/akka/stm/test/CounterExample.java similarity index 89% rename from akka-stm/src/test/java/akka/stm/CounterExample.java rename to akka-stm/src/test/java/akka/stm/test/CounterExample.java index 6af46ee0df..7e46db90b9 100644 --- a/akka-stm/src/test/java/akka/stm/CounterExample.java +++ b/akka-stm/src/test/java/akka/stm/test/CounterExample.java @@ -1,7 +1,6 @@ -package akka.stm; +package akka.stm.test; -import akka.stm.Ref; -import akka.stm.local.Atomic; +import akka.stm.*; public class CounterExample { final static Ref ref = new Ref(0); diff --git a/akka-stm/src/test/java/akka/stm/JavaStmTests.java b/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java similarity index 98% rename from akka-stm/src/test/java/akka/stm/JavaStmTests.java rename to akka-stm/src/test/java/akka/stm/test/JavaStmTests.java index 15a9129655..79547c1bb6 100644 --- a/akka-stm/src/test/java/akka/stm/JavaStmTests.java +++ b/akka-stm/src/test/java/akka/stm/test/JavaStmTests.java @@ -1,11 +1,10 @@ -package akka.stm; +package akka.stm.test; import static org.junit.Assert.*; import org.junit.Test; import org.junit.Before; import akka.stm.*; -import akka.stm.local.Atomic; import org.multiverse.api.ThreadLocalTransaction; import org.multiverse.api.TransactionConfiguration; diff --git a/akka-stm/src/test/java/akka/stm/RefExample.java b/akka-stm/src/test/java/akka/stm/test/RefExample.java similarity index 92% rename from akka-stm/src/test/java/akka/stm/RefExample.java rename to akka-stm/src/test/java/akka/stm/test/RefExample.java index 22ffa17099..8866edc3f1 100644 --- a/akka-stm/src/test/java/akka/stm/RefExample.java +++ b/akka-stm/src/test/java/akka/stm/test/RefExample.java @@ -1,7 +1,6 @@ -package akka.stm; +package akka.stm.test; -import akka.stm.Ref; -import akka.stm.local.Atomic; +import akka.stm.*; public class RefExample { public static void main(String[] args) { diff --git a/akka-stm/src/test/java/akka/stm/StmExamples.java b/akka-stm/src/test/java/akka/stm/test/StmExamples.java similarity index 84% rename from akka-stm/src/test/java/akka/stm/StmExamples.java rename to akka-stm/src/test/java/akka/stm/test/StmExamples.java index 3e8ca17e8f..d183234a47 100644 --- a/akka-stm/src/test/java/akka/stm/StmExamples.java +++ b/akka-stm/src/test/java/akka/stm/test/StmExamples.java @@ -1,7 +1,4 @@ -package akka.stm; - -import akka.stm.Ref; -import akka.stm.local.Atomic; +package akka.stm.test; public class StmExamples { public static void main(String[] args) { diff --git a/akka-stm/src/test/java/akka/stm/TransactionFactoryExample.java b/akka-stm/src/test/java/akka/stm/test/TransactionFactoryExample.java similarity index 95% rename from akka-stm/src/test/java/akka/stm/TransactionFactoryExample.java rename to akka-stm/src/test/java/akka/stm/test/TransactionFactoryExample.java index b8f189f47f..df33e84d89 100644 --- a/akka-stm/src/test/java/akka/stm/TransactionFactoryExample.java +++ b/akka-stm/src/test/java/akka/stm/test/TransactionFactoryExample.java @@ -1,7 +1,6 @@ -package akka.stm; +package akka.stm.test; import akka.stm.*; -import akka.stm.local.Atomic; import org.multiverse.api.ThreadLocalTransaction; import org.multiverse.api.TransactionConfiguration; diff --git a/akka-stm/src/test/java/akka/stm/TransactionalMapExample.java b/akka-stm/src/test/java/akka/stm/test/TransactionalMapExample.java similarity index 95% rename from akka-stm/src/test/java/akka/stm/TransactionalMapExample.java rename to akka-stm/src/test/java/akka/stm/test/TransactionalMapExample.java index 78dd1cd0ec..6ef144550c 100644 --- a/akka-stm/src/test/java/akka/stm/TransactionalMapExample.java +++ b/akka-stm/src/test/java/akka/stm/test/TransactionalMapExample.java @@ -1,7 +1,6 @@ -package akka.stm; +package akka.stm.test; import akka.stm.*; -import akka.stm.local.Atomic; public class TransactionalMapExample { public static void main(String[] args) { diff --git a/akka-stm/src/test/java/akka/stm/TransactionalVectorExample.java b/akka-stm/src/test/java/akka/stm/test/TransactionalVectorExample.java similarity index 95% rename from akka-stm/src/test/java/akka/stm/TransactionalVectorExample.java rename to akka-stm/src/test/java/akka/stm/test/TransactionalVectorExample.java index 483bf65690..9353f02830 100644 --- a/akka-stm/src/test/java/akka/stm/TransactionalVectorExample.java +++ b/akka-stm/src/test/java/akka/stm/test/TransactionalVectorExample.java @@ -1,7 +1,6 @@ -package akka.stm; +package akka.stm.test; import akka.stm.*; -import akka.stm.local.Atomic; public class TransactionalVectorExample { public static void main(String[] args) { diff --git a/akka-stm/src/test/java/akka/stm/User.java b/akka-stm/src/test/java/akka/stm/test/User.java similarity index 89% rename from akka-stm/src/test/java/akka/stm/User.java rename to akka-stm/src/test/java/akka/stm/test/User.java index 5c148a21a4..9f6ec338b6 100644 --- a/akka-stm/src/test/java/akka/stm/User.java +++ b/akka-stm/src/test/java/akka/stm/test/User.java @@ -1,4 +1,4 @@ -package akka.stm; +package akka.stm.test; public class User { private String name; diff --git a/akka-stm/src/test/scala/stm/JavaStmSpec.scala b/akka-stm/src/test/scala/stm/JavaStmSpec.scala index 6bd80c7b0e..a5847d2e87 100644 --- a/akka-stm/src/test/scala/stm/JavaStmSpec.scala +++ b/akka-stm/src/test/scala/stm/JavaStmSpec.scala @@ -1,5 +1,5 @@ -package akka.stm +package akka.stm.test import org.scalatest.junit.JUnitWrapperSuite -class JavaStmSpec extends JUnitWrapperSuite("akka.stm.JavaStmTests", Thread.currentThread.getContextClassLoader) +class JavaStmSpec extends JUnitWrapperSuite("akka.stm.test.JavaStmTests", Thread.currentThread.getContextClassLoader) diff --git a/akka-stm/src/test/scala/stm/RefSpec.scala b/akka-stm/src/test/scala/stm/RefSpec.scala index 1fde341756..8270706146 100644 --- a/akka-stm/src/test/scala/stm/RefSpec.scala +++ b/akka-stm/src/test/scala/stm/RefSpec.scala @@ -1,11 +1,11 @@ -package akka.stm +package akka.stm.test import org.scalatest.WordSpec import org.scalatest.matchers.MustMatchers class RefSpec extends WordSpec with MustMatchers { - import akka.stm.local._ + import akka.stm._ "A Ref" should { @@ -25,7 +25,7 @@ class RefSpec extends WordSpec with MustMatchers { val ref = Ref(3) try { - atomic(DefaultLocalTransactionFactory) { + atomic(DefaultTransactionFactory) { ref.swap(5) throw new Exception } diff --git a/akka-stm/src/test/scala/stm/StmSpec.scala b/akka-stm/src/test/scala/stm/StmSpec.scala index ed91d0720b..f20545e03b 100644 --- a/akka-stm/src/test/scala/stm/StmSpec.scala +++ b/akka-stm/src/test/scala/stm/StmSpec.scala @@ -1,4 +1,4 @@ -package akka.stm +package akka.stm.test import akka.actor.Actor import Actor._ @@ -10,9 +10,9 @@ import org.scalatest.matchers.MustMatchers class StmSpec extends WordSpec with MustMatchers { - "Local STM" should { + import akka.stm._ - import akka.stm.local._ + "Local STM" should { "be able to do multiple consecutive atomic {..} statements" in { val ref = Ref(0) @@ -66,7 +66,7 @@ class StmSpec extends WordSpec with MustMatchers { } try { - atomic(DefaultLocalTransactionFactory) { + atomic(DefaultTransactionFactory) { increment increment throw new Exception @@ -127,105 +127,3 @@ class StmSpec extends WordSpec with MustMatchers { } } } - -/* - "Global STM" should { - "be able to initialize with atomic {..} block inside actor constructor" in { - import GlobalTransactionVectorTestActor._ - try { - val actor = actorOf[GlobalTransactionVectorTestActor].start - actor !! Add(5) - val size1 = (actor !! Size).as[Int].getOrElse(fail("Could not get Vector::size")) - size1 must be (2) - actor !! Add(2) - val size2 = (actor !! Size).as[Int].getOrElse(fail("Could not get Vector::size")) - size2 must be (3) - } catch { - case e => - e.printStackTrace - fail(e.toString) - } - } - } - - "Transactor" should { - "be able receive message sent with !! and pass it along to nested transactor with !! and receive reply; multiple times in a row" in { - import GlobalTransactionVectorTestActor._ - val actor = actorOf[NestedTransactorLevelOneActor].start - actor !! (Add(2), 10000) - val size1 = (actor !! (Size, 10000)).as[Int].getOrElse(fail("Could not get size")) - size1 must be (2) - actor !! (Add(7), 10000) - actor ! "HiLevelOne" - val size2 = (actor !! (Size, 10000)).as[Int].getOrElse(fail("Could not get size")) - size2 must be (7) - actor !! (Add(0), 10000) - actor ! "HiLevelTwo" - val size3 = (actor !! (Size, 10000)).as[Int].getOrElse(fail("Could not get size")) - size3 must be (0) - actor !! (Add(3), 10000) - val size4 = (actor !! (Size, 10000)).as[Int].getOrElse(fail("Could not get size")) - size4 must be (3) - } - } -} - -object GlobalTransactionVectorTestActor { - case class Add(value: Int) - case object Size - case object Success -} - -class GlobalTransactionVectorTestActor extends Actor { - import GlobalTransactionVectorTestActor._ - import akka.stm.global._ - - private val vector: TransactionalVector[Int] = atomic { TransactionalVector(1) } - - def receive = { - case Add(value) => - atomic { vector + value} - self.reply(Success) - - case Size => - val size = atomic { vector.size } - self.reply(size) - } -} - -class NestedTransactorLevelOneActor extends Actor { - import GlobalTransactionVectorTestActor._ - - private val nested = actorOf[NestedTransactorLevelTwoActor].start - self.timeout = 10000 - - def receive = { - case add @ Add(_) => - self.reply((nested !! add).get) - - case Size => - self.reply((nested !! Size).get) - - case "HiLevelOne" => println("HiLevelOne") - case "HiLevelTwo" => nested ! "HiLevelTwo" - } -} - -class NestedTransactorLevelTwoActor extends Transactor { - import GlobalTransactionVectorTestActor._ - - private val ref = Ref(0) - self.timeout = 10000 - - def receive = { - case Add(value) => - ref.swap(value) - self.reply(Success) - - case Size => - self.reply(ref.getOrElse(-1)) - - case "HiLevelTwo" => println("HiLevelTwo") - } -} -*/