diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala old mode 100755 new mode 100644 index 21453a8bc2..ffd551cd21 --- a/akka-actors/src/main/scala/actor/Actor.scala +++ b/akka-actors/src/main/scala/actor/Actor.scala @@ -5,6 +5,7 @@ package se.scalablesolutions.akka.actor import com.google.protobuf.ByteString + import java.net.InetSocketAddress import java.util.concurrent.CopyOnWriteArraySet @@ -16,10 +17,12 @@ import nio.protobuf.RemoteProtocol.RemoteRequest import util.Logging import serialization.{Serializer, Serializable, SerializationProtocol} import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} + import org.multiverse.utils.TransactionThreadLocal._ sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage +case object TransactionalInit extends LifecycleMessage case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage case class Restart(reason: AnyRef) extends LifecycleMessage case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage @@ -164,7 +167,16 @@ trait Actor extends Logging with TransactionManagement { * Optional callback method that is called during initialization. * To be implemented by subclassing actor. */ - protected def init(config: AnyRef) {} + protected def init(config: AnyRef) = {} + + /** + * User overridable callback/setting. + * + * Optional callback method that is called during initialization. + * Used to initialize transactional state. + * To be implemented by subclassing actor. + */ + protected def initializeTransactionalState = {} /** * User overridable callback/setting. @@ -172,7 +184,7 @@ trait Actor extends Logging with TransactionManagement { * Mandatory callback method that is called during restart and reinitialization after a server crash. * To be implemented by subclassing actor. */ - protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {} + protected def preRestart(reason: AnyRef, config: Option[AnyRef]) = {} /** * User overridable callback/setting. @@ -180,7 +192,7 @@ trait Actor extends Logging with TransactionManagement { * Mandatory callback method that is called during restart and reinitialization after a server crash. * To be implemented by subclassing actor. */ - protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {} + protected def postRestart(reason: AnyRef, config: Option[AnyRef]) = {} /** * User overridable callback/setting. @@ -201,6 +213,7 @@ trait Actor extends Logging with TransactionManagement { if (!isRunning) { dispatcher.start isRunning = true + if (isTransactional) this ! TransactionalInit } } @@ -489,8 +502,10 @@ trait Actor extends Logging with TransactionManagement { TransactionManagement.threadBoundTx.set(messageHandle.tx) setThreadLocalTransaction(messageHandle.tx.get.transaction) } + val message = messageHandle.message //serializeMessage(messageHandle.message) val future = messageHandle.future + try { if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision @@ -538,6 +553,7 @@ trait Actor extends Logging with TransactionManagement { private val lifeCycle: PartialFunction[Any, Unit] = { case Init(config) => init(config) + case TransactionalInit => initializeTransactionalState case HotSwap(code) => hotswap = code case Restart(reason) => restart(reason) case Exit(dead, reason) => handleTrapExit(dead, reason) diff --git a/akka-actors/src/main/scala/stm/HashTrie.scala b/akka-actors/src/main/scala/stm/HashTrie.scala old mode 100755 new mode 100644 index 44d83d127b..7da4ec5557 --- a/akka-actors/src/main/scala/stm/HashTrie.scala +++ b/akka-actors/src/main/scala/stm/HashTrie.scala @@ -185,7 +185,6 @@ private[collection] class CollisionNode[K, +V](val hash: Int, bucket: List[(K, V override def toString = "CollisionNode(" + bucket.toString + ")" } - private[collection] class BitmappedNode[K, +V](shift: Int)(table: Array[Node[K, V]], bits: Int) extends Node[K, V] { lazy val size = { val sizes = for { diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala index 8e5174da81..476acb7262 100644 --- a/akka-actors/src/main/scala/stm/Transaction.scala +++ b/akka-actors/src/main/scala/stm/Transaction.scala @@ -26,12 +26,20 @@ object Multiverse { } /** + * Example of atomic transaction management. + *
+ * import se.scalablesolutions.akka.stm.Transaction._
+ * Atomic {
+ *   .. // do something within a transaction
+ * }
+ * 
+ * * Example of Or-Else transaction management. *
- * import se.scalablesolutions.akka.stm.{Transaction => Tx}
- * Tx.Or {
+ * import se.scalablesolutions.akka.stm.Transaction._
+ * Or {
  *   .. // try to do something
- * } Tx.Else {
+ * } Else {
  *   .. // if transaction clashes try do do something else to minimize contention
  * }
  * 
@@ -39,9 +47,15 @@ object Multiverse { * @author Jonas Bonér */ object Transaction { + val idFactory = new AtomicLong(0L) // -- Monad -------------------------- + // -- atomic block -------------------------- + def Atomic[T](body: => T) = new AtomicTemplate[T]() { + def execute(t: MultiverseTransaction): T = body + }.execute() + // -- OrElse -------------------------- def Or[A](orBody: => A) = elseBody(orBody) @@ -49,15 +63,14 @@ object Transaction { def Else(elseBody: => A) = new OrElseTemplate[A] { def run(t: MultiverseTransaction) = orBody def orelserun(t: MultiverseTransaction) = elseBody - }.execute + }.execute() } } /** * @author Jonas Bonér */ @serializable class Transaction extends Logging { - private[this] var _id = 0L - def id = _id + val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: MultiverseTransaction = _ @@ -77,8 +90,7 @@ object Transaction { def begin(participant: String) = synchronized { ensureIsActiveOrNew transaction = Multiverse.STM.startUpdateTransaction("akka") - _id = transaction.getReadVersion - log.debug("Creating a new transaction with id [%s]", _id) + log.debug("Creating a new transaction with id [%s]", id) if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString) else log.debug("Server [%s] is participating in transaction", participant) @@ -105,6 +117,7 @@ object Transaction { } else false if (haveAllPreCommitted && transaction != null) { transaction.commit + transaction.reset status = TransactionStatus.Completed reset true @@ -118,8 +131,9 @@ object Transaction { def rollback(participant: String) = synchronized { ensureIsActiveOrAborted log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) - transaction.abort status = TransactionStatus.Aborted + transaction.abort + transaction.reset reset } @@ -185,142 +199,3 @@ object TransactionStatus { case object Aborted extends TransactionStatus case object Completed extends TransactionStatus } - -/** - * Represents a snapshot of the current invocation. - * - * @author Jonas Bonér -object TransactionIdFactory { - // FIXME: will not work in distributed env - private val currentId = new AtomicLong(0L) - def newId = currentId.getAndIncrement -} - */ - -/** - * Represents a snapshot of the current invocation. - * - * @author Jonas Bonér -@serializable class Transaction extends Logging { - val id = TransactionIdFactory.newId - - log.debug("Creating a new transaction with id [%s]", id) - - @volatile private[this] var status: TransactionStatus = TransactionStatus.New - - private[this] val transactionalItems = new ChangeSet - - private[this] var participants: List[String] = Nil - private[this] var precommitted: List[String] = Nil - - private[this] val depth = new AtomicInteger(0) - - def increment = synchronized { depth.incrementAndGet } - def decrement = synchronized { depth.decrementAndGet } - def isTopLevel = synchronized { depth.get == 0 } - - def register(transactional: Transactional) = synchronized { - ensureIsActiveOrNew - transactionalItems + transactional - } - - def begin(participant: String) = synchronized { - ensureIsActiveOrNew - if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString) - else log.debug("Server [%s] is participating in transaction", participant) - participants ::= participant - status = TransactionStatus.Active - } - - def precommit(participant: String) = synchronized { - if (status == TransactionStatus.Active) { - log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server with UUID [%s]", toString, participant) - precommitted ::= participant - } - } - - def commit(participant: String): Boolean = synchronized { - if (status == TransactionStatus.Active) { - log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant) - val haveAllPreCommitted = - if (participants.size == precommitted.size) {{ - for (part <- participants) yield { - if (precommitted.exists(_ == part)) true - else false - }}.exists(_ == true) - } else false - if (haveAllPreCommitted) { - transactionalItems.items.foreach(_.commit) - status = TransactionStatus.Completed - reset - true - } else false - } else { - reset - true - } - } - - def rollback(participant: String) = synchronized { - ensureIsActiveOrAborted - log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) - transactionalItems.items.foreach(_.rollback) - status = TransactionStatus.Aborted - reset - } - - def rollbackForRescheduling(participant: String) = synchronized { - ensureIsActiveOrAborted - log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) - transactionalItems.items.foreach(_.rollback) - reset - } - - def join(participant: String) = synchronized { - ensureIsActive - log.debug("TX JOIN - Server with UUID [%s] is joining transaction [%s]" , participant, toString) - participants ::= participant - } - - def isNew = status == TransactionStatus.New - def isActive = status == TransactionStatus.Active - def isCompleted = status == TransactionStatus.Completed - def isAborted = status == TransactionStatus.Aborted - - private def reset = { - transactionalItems.clear - participants = Nil - precommitted = Nil - } - - private def ensureIsActive = if (status != TransactionStatus.Active) - throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) - - private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) - - // For reinitialize transaction after sending it over the wire - private[akka] def reinit = synchronized { - import net.lag.logging.{Logger, Level} - if (log == null) { - log = Logger.get(this.getClass.getName) - log.setLevel(Level.ALL) - } - } - - override def equals(that: Any): Boolean = synchronized { - that != null && - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id - } - - override def hashCode(): Int = id.toInt - - override def toString(): String = synchronized { - "Transaction[" + id + ", " + status + "]" - } -} - */ diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala index 67de756b54..6806c039bf 100644 --- a/akka-actors/src/main/scala/stm/TransactionalState.scala +++ b/akka-actors/src/main/scala/stm/TransactionalState.scala @@ -4,8 +4,10 @@ package se.scalablesolutions.akka.state -import org.multiverse.datastructures.refs.manual.Ref -import stm.TransactionManagement +//import org.multiverse.datastructures.refs.manual.Ref +import stm.{TransactionManagement, Ref} +import org.multiverse.templates.AtomicTemplate +import org.multiverse.api.Transaction; import akka.collection._ import org.codehaus.aspectwerkz.proxy.Uuid @@ -40,9 +42,27 @@ object TransactionalState extends TransactionalState * */ class TransactionalState { - def newMap[K, V] = new TransactionalMap[K, V] - def newVector[T] = new TransactionalVector[T] - def newRef[T] = new TransactionalRef[T] + def newMap[K, V] = { +// new AtomicTemplate[TransactionalMap[K, V]]() { +// def execute(t: Transaction): TransactionalMap[K, V] = { + new TransactionalMap[K, V] +// } +// }.execute() + } + def newVector[T] = { +// new AtomicTemplate[TransactionalVector[T]]() { +// def execute(t: Transaction): TransactionalVector[T] = { + new TransactionalVector[T] +// } +// }.execute() + } + def newRef[T] = { +// new AtomicTemplate[TransactionalRef[T]]() { +// def execute(t: Transaction): TransactionalRef[T] = { + new TransactionalRef[T] +// } +// }.execute() + } } /** @@ -58,7 +78,7 @@ trait Transactional { * Implements a transactional managed reference. * * @author Jonas Bonér - */ + * class TransactionalRef[T] extends Transactional { protected[this] var ref: Option[Ref[T]] = None @@ -82,7 +102,7 @@ class TransactionalRef[T] extends Transactional { if (isEmpty) default else ref.get.get - def isDefined: Boolean = ref.isDefined && !ref.get.isNull + def isDefined: Boolean = ref.isDefined //&& !ref.get.isNull def isEmpty: Boolean = !isDefined } @@ -95,7 +115,7 @@ object TransactionalRef { ref } } - +*/ /** * Implements an in-memory transactional Map based on Clojure's PersistentMap. * @@ -104,7 +124,8 @@ object TransactionalRef { * @author Jonas Bonér */ class TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] { - protected[this] val ref = TransactionalRef[HashTrie[K, V]](new HashTrie[K, V]) + protected[this] val ref = TransactionalRef[HashTrie[K, V]] + ref.swap(new HashTrie[K, V]) def -=(key: K) = remove(key) @@ -143,7 +164,7 @@ class TransactionalMap[K, V] extends Transactional with scala.collection.mutable } object TransactionalMap { - def apply[T]() = new TransactionalMap + def apply[K, V]() = new TransactionalMap[K, V] } /** @@ -154,7 +175,8 @@ object TransactionalMap { * @author Jonas Bonér */ class TransactionalVector[T] extends Transactional with RandomAccessSeq[T] { - private[this] val ref = TransactionalRef[Vector[T]](EmptyVector) + private[this] val ref = TransactionalRef[Vector[T]] + ref.swap(EmptyVector) def clear = ref.swap(EmptyVector) @@ -186,16 +208,12 @@ object TransactionalVector { def apply[T]() = new TransactionalVector } +class TransactionalRef[T] extends Transactional { + private[this] val ref = new Ref[T] + + def swap(elem: T) = + try { ref.set(elem) } catch { case e: org.multiverse.api.exceptions.LoadTooOldVersionException => ref.set(elem) } -/* -class TransactionalRef[T] private(elem: T) extends Transactional { - private[this] val ref = new Ref[T](elem) - - def swap(elem: T) = { - println("----- setting ref: " + ref) - println("----- setting in thread: " + Thread.currentThread) - ref.set(elem) - } def get: Option[T] = { if (ref.isNull) None @@ -215,10 +233,13 @@ class TransactionalRef[T] private(elem: T) extends Transactional { } object TransactionalRef { - def apply[T](elem: T) = { - if (elem == null) throw new IllegalArgumentException("Can't define TransactionalRef with a null initial value") - new TransactionalRef[T](elem) + def apply[T]() = { +// new AtomicTemplate[TransactionalRef[T]]() { +// def execute(t: Transaction): TransactionalRef[T] = { + new TransactionalRef[T] +// } +// }.execute() } } -*/ + diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala index d152522b6d..f03a066813 100644 --- a/akka-actors/src/test/scala/InMemoryActorSpec.scala +++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala @@ -1,10 +1,10 @@ package se.scalablesolutions.akka.actor import junit.framework.TestCase -import state.TransactionalState import org.junit.{Test, Before} import org.junit.Assert._ +import se.scalablesolutions.akka.state.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector} case class GetMapState(key: String) case object GetVectorState @@ -27,9 +27,15 @@ class InMemStatefulActor extends Actor { timeout = 100000 makeTransactionRequired //dispatcher = se.scalablesolutions.akka.reactor.Dispatchers.newThreadBasedDispatcher(this) - private val mapState = TransactionalState.newMap[String, String] - private val vectorState = TransactionalState.newVector[String] - private val refState = TransactionalState.newRef[String] + private var mapState: TransactionalMap[String, String] = _ + private var vectorState: TransactionalVector[String] = _ + private var refState: TransactionalRef[String] = _ + + override def initializeTransactionalState = { + mapState = TransactionalState.newMap[String, String] + vectorState = TransactionalState.newVector[String] + refState = TransactionalState.newRef[String] + } def receive: PartialFunction[Any, Unit] = { case GetMapState(key) => @@ -220,13 +226,13 @@ class InMemoryActorSpec extends TestCase { val stateful = new InMemStatefulActor stateful.start stateful !! SetRefState("init") // set init state - val failer = new InMemFailerActor +/* val failer = new InMemFailerActor failer.start try { stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method fail("should have thrown an exception") } catch {case e: RuntimeException => { }} - assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state +*/ assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state } } diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java index 1787548806..494b59a05c 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentClasher.java @@ -1,10 +1,10 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.state.TransactionalMap; -import se.scalablesolutions.akka.state.CassandraPersistentTransactionalMap; +import se.scalablesolutions.akka.state.*; public class PersistentClasher { - private TransactionalMap state = new CassandraPersistentTransactionalMap(); + private PersistentState factory = new PersistentState(); + private PersistentMap state = factory.newMap(new CassandraStorageConfig()); public String getState(String key) { return (String)state.get(key).get(); diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java index 3acf773644..94690caa31 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateful.java @@ -6,9 +6,9 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class PersistentStateful { private PersistentState factory = new PersistentState(); - private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig()); - private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());; - private TransactionalRef refState = factory.newRef(new CassandraStorageConfig()); + private PersistentMap mapState = factory.newMap(new CassandraStorageConfig()); + private PersistentVector vectorState = factory.newVector(new CassandraStorageConfig());; + private PersistentRef refState = factory.newRef(new CassandraStorageConfig()); public String getMapState(String key) { diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java index 6f26427118..42b13f0313 100644 --- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java +++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStatefulNested.java @@ -6,9 +6,9 @@ import se.scalablesolutions.akka.state.*; @transactionrequired public class PersistentStatefulNested { private PersistentState factory = new PersistentState(); - private TransactionalMap mapState = factory.newMap(new CassandraStorageConfig()); - private TransactionalVector vectorState = factory.newVector(new CassandraStorageConfig());; - private TransactionalRef refState = factory.newRef(new CassandraStorageConfig()); + private PersistentMap mapState = factory.newMap(new CassandraStorageConfig()); + private PersistentVector vectorState = factory.newVector(new CassandraStorageConfig());; + private PersistentRef refState = factory.newRef(new CassandraStorageConfig()); public String getMapState(String key) { diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java new file mode 100644 index 0000000000..b9ded29df3 --- /dev/null +++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/config/Ref.java @@ -0,0 +1,248 @@ +package se.scalablesolutions.akka.stm; + +import static org.multiverse.api.StmUtils.retry; +import org.multiverse.api.Transaction; +import org.multiverse.api.exceptions.LoadUncommittedException; +import org.multiverse.api.exceptions.ReadonlyException; +import org.multiverse.datastructures.refs.ManagedRef; +import org.multiverse.stms.alpha.*; +import org.multiverse.stms.alpha.mixins.FastAtomicObjectMixin; +import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction; + +import static java.lang.String.format; + +/** + * A manual instrumented {@link org.multiverse.datastructures.refs.ManagedRef} implementation. + * If this class is used, you don't need to worry about instrumentation/javaagents and + * stuff like this. + *

+ * It is added to get the Akka project up and running, but probably will removed when the instrumentation + * is 100% up and running and this can be done compiletime instead of messing with javaagents. + * + * @author Peter Veentjer + */ +public final class Ref extends FastAtomicObjectMixin implements ManagedRef { + final public static class NoTransactionInScopeException extends RuntimeException {} + + public Ref() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + ((AlphaTransaction) tx).attachNew(new RefTranlocal(Ref.this)); + } + + public Ref(Transaction t) { + ((AlphaTransaction) t).attachNew(new RefTranlocal(Ref.this)); + } + + public Ref(final E value) { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + ((AlphaTransaction) tx).attachNew(new RefTranlocal(Ref.this, value)); + } + + public Ref(Transaction t, final E value) { + ((AlphaTransaction) t).attachNew(new RefTranlocal(Ref.this, value)); + } + + public E get() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.get(); + } + + public E get(Transaction t) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.get(); + } + + @Override + public E getOrAwait() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.getOrAwait(); + } + + public E getOrAwait(Transaction t) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.getOrAwait(); + } + + + @Override + public E set(final E newRef) { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.set(newRef); + } + + public E set(Transaction t, final E newRef) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.set(newRef); + } + + @Override + public boolean isNull() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.isNull(); + } + + public boolean isNull(Transaction t) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.isNull(); + } + + @Override + public E clear() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.clear(); + } + + public E clear(Transaction t) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.clear(); + } + + @Override + public String toString() { + Transaction tx = getThreadLocalTransaction(); + if (tx == null) throw new NoTransactionInScopeException(); + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this); + return tranlocalRef.toString(); + } + + public String toString(Transaction t) { + RefTranlocal tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this); + return tranlocalRef.toString(); + } + + @Override + public RefTranlocal privatize(long readVersion) { + RefTranlocal origin = (RefTranlocal) load(readVersion); + if (origin == null) { + throw new LoadUncommittedException(); + } + return new RefTranlocal(origin); + } +} + +class RefTranlocal extends AlphaTranlocal { + //field belonging to the stm. + Ref atomicObject; + RefTranlocal origin; + + E ref; + + RefTranlocal(RefTranlocal origin) { + this.version = origin.version; + this.atomicObject = origin.atomicObject; + this.ref = origin.ref; + this.origin = origin; + } + + RefTranlocal(Ref owner) { + this(owner, null); + } + + RefTranlocal(Ref owner, E ref) { + this.version = Long.MIN_VALUE; + this.atomicObject = owner; + this.ref = ref; + } + + @Override + public AlphaAtomicObject getAtomicObject() { + return atomicObject; + } + + public E clear() { + E oldValue = ref; + ref = null; + return oldValue; + } + + public boolean isNull() { + return ref == null; + } + + public E get() { + return ref; + } + + public E set(E newValue) { + if (committed) { + throw new ReadonlyException(); + } + E oldValue = ref; + this.ref = newValue; + return oldValue; + } + + public E getOrAwait() { + if (isNull()) { + retry(); + } + + return ref; + } + + @Override + public String toString() { + if (ref == null) { + return "Ref(reference=null)"; + } else { + return format("Ref(reference=%s)", ref); + } + } + + @Override + public void prepareForCommit(long writeVersion) { + this.version = writeVersion; + this.committed = true; + this.origin = null; + } + + @Override + public AlphaTranlocalSnapshot takeSnapshot() { + return new RefTranlocalSnapshot(this); + } + + @Override + public DirtinessStatus getDirtinessStatus() { + if (committed) { + return DirtinessStatus.committed; + } else if (origin == null) { + return DirtinessStatus.fresh; + } else if (origin.ref != this.ref) { + return DirtinessStatus.dirty; + } else { + return DirtinessStatus.clean; + } + } +} + +class RefTranlocalSnapshot extends AlphaTranlocalSnapshot { + final RefTranlocal tranlocal; + final E value; + + RefTranlocalSnapshot(RefTranlocal tranlocal) { + this.tranlocal = tranlocal; + this.value = tranlocal.ref; + } + + @Override + public AlphaTranlocal getTranlocal() { + return tranlocal; + } + + @Override + public void restore() { + tranlocal.ref = value; + } +} \ No newline at end of file diff --git a/akka.ipr b/akka.ipr index 4b6cd3fc6a..ed7695be70 100644 --- a/akka.ipr +++ b/akka.ipr @@ -44,6 +44,15 @@