diff --git a/akka-core/pom.xml b/akka-core/pom.xml
index 5d58430aa6..d2116cd0f1 100644
--- a/akka-core/pom.xml
+++ b/akka-core/pom.xml
@@ -50,29 +50,6 @@
org.multiversemultiverse-alpha0.4-SNAPSHOT
- jar-with-dependencies
-
-
- org.multiverse
- multiverse-core
-
-
- asm
- asm-tree
-
-
- asm
- asm-analysis
-
-
- asm
- asm-commons
-
-
- asm
- asm-util
-
- org.scala-tools
diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index d6185b65a3..2919cc5c71 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.TransactionManagement._
-import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
+import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
@@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._
import java.util.{Queue, HashSet}
import java.util.concurrent.ConcurrentLinkedQueue
import java.net.InetSocketAddress
+import org.multiverse.commitbarriers.CountDownCommitBarrier
/**
* Implements the Transactor abstraction. E.g. a transactional actor.
@@ -72,7 +73,7 @@ object Actor extends Logging {
val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
val PORT = config.getInt("akka.remote.server.port", 9999)
- object Sender{
+ object Sender {
implicit val Self: Option[Actor] = None
}
@@ -193,7 +194,7 @@ object Actor extends Logging {
*/
trait Actor extends TransactionManagement {
implicit protected val self: Option[Actor] = Some(this)
- implicit protected val transactionFamily: String = this.getClass.getName
+ implicit protected val transactionFamilyName: String = this.getClass.getName
// Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = UUID.newUuid.toString
@@ -216,6 +217,7 @@ trait Actor extends TransactionManagement {
private[akka] var _replyToAddress: Option[InetSocketAddress] = None
private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]
+
// ====================================
// protected fields
// ====================================
@@ -333,8 +335,8 @@ trait Actor extends TransactionManagement {
/**
* User overridable callback/setting.
*
- * Partial function implementing the server logic.
- * To be implemented by subclassing server.
+ * Partial function implementing the actor logic.
+ * To be implemented by subclassing actor.
*
* Example code:
*
@@ -782,6 +784,11 @@ trait Actor extends TransactionManagement {
}
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+ if (isTransactionSetInScope) {
+ log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+ getTransactionSetInScope.incParties
+ }
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -793,8 +800,7 @@ trait Actor extends TransactionManagement {
.setIsEscaped(false)
val id = registerSupervisorAsRemoteActor
- if (id.isDefined)
- requestBuilder.setSupervisorUuid(id.get)
+ if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
// set the source fields used to reply back to the original sender
// (i.e. not the remote proxy actor)
@@ -813,18 +819,24 @@ trait Actor extends TransactionManagement {
RemoteProtocolBuilder.setMessage(message, requestBuilder)
RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
} else {
- val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
+ val invocation = new MessageInvocation(this, message, None, sender, transactionSet.get)
if (_isEventBased) {
_mailbox.add(invocation)
if (_isSuspended) invocation.send
} else invocation.send
}
+ clearTransactionSet
}
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: Any,
timeout: Long,
senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+ if (isTransactionSetInScope) {
+ log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+ getTransactionSetInScope.incParties
+ }
+
if (_remoteAddress.isDefined) {
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -838,16 +850,18 @@ trait Actor extends TransactionManagement {
val id = registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
+ clearTransactionSet
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else {
val future = if (senderFuture.isDefined) senderFuture.get
else new DefaultCompletableFutureResult(timeout)
- val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
+ val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get)
if (_isEventBased) {
_mailbox.add(invocation)
invocation.send
} else invocation.send
+ clearTransactionSet
future
}
}
@@ -856,6 +870,7 @@ trait Actor extends TransactionManagement {
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
*/
private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+ log.trace("%s is invoked with message %s", toString, messageHandle)
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
@@ -867,7 +882,7 @@ trait Actor extends TransactionManagement {
}
private def dispatch[T](messageHandle: MessageInvocation) = {
- setTransaction(messageHandle.tx)
+ setTransactionSet(messageHandle.transactionSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
@@ -889,43 +904,55 @@ trait Actor extends TransactionManagement {
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
- setTransaction(messageHandle.tx)
+ var topLevelTransaction = false
+ val txSet: Option[CountDownCommitBarrier] =
+ if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
+ else {
+ topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
+ if (isTransactionRequiresNew) {
+ log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
+ Some(createNewTransactionSet)
+ } else None
+ }
+ setTransactionSet(txSet)
val message = messageHandle.message //serializeMessage(messageHandle.message)
senderFuture = messageHandle.future
sender = messageHandle.sender
+ def clearTx = {
+ clearTransactionSet
+ clearTransaction
+ }
+
def proceed = {
- try {
- incrementTransaction
- if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException(
- "Actor " + toString + " could not process message [" + message + "]" +
- "\n\tsince no matching 'case' clause in its 'receive' method could be found")
- } finally {
- decrementTransaction
- }
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException(
+ toString + " could not process message [" + message + "]" +
+ "\n\tsince no matching 'case' clause in its 'receive' method could be found")
+ setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
try {
- if (isTransactionRequiresNew && !isTransactionInScope) {
- //if (senderFuture.isEmpty) throw new StmException(
- // "Can't continue transaction in a one-way fire-forget message send" +
- // "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
- // "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+ if (isTransactionRequiresNew) {
atomic {
proceed
}
} else proceed
} catch {
+ case e: IllegalStateException => {}
case e =>
+ // abort transaction set
+ if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
+
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
- clearTransaction // need to clear currentTransaction before call to supervisor
+ clearTx // need to clear currentTransaction before call to supervisor
+
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} finally {
- clearTransaction
+ clearTx
}
}
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f7bfa52215..b5d4d634f6 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -7,16 +7,17 @@ package se.scalablesolutions.akka.dispatch
import java.util.List
import se.scalablesolutions.akka.util.{HashCode, Logging}
-import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.actor.Actor
import java.util.concurrent.ConcurrentHashMap
+import org.multiverse.commitbarriers.CountDownCommitBarrier
+
final class MessageInvocation(val receiver: Actor,
val message: Any,
val future: Option[CompletableFutureResult],
val sender: Option[Actor],
- val tx: Option[Transaction]) {
+ val transactionSet: Option[CountDownCommitBarrier]) {
if (receiver eq null) throw new IllegalArgumentException("receiver is null")
def invoke = receiver.invoke(this)
@@ -37,13 +38,13 @@ final class MessageInvocation(val receiver: Actor,
that.asInstanceOf[MessageInvocation].message == message
}
- override def toString(): String = synchronized {
+ override def toString = synchronized {
"MessageInvocation[" +
"\n\tmessage = " + message +
"\n\treceiver = " + receiver +
"\n\tsender = " + sender +
"\n\tfuture = " + future +
- "\n\ttx = " + tx +
+ "\n\ttransactionSet = " + transactionSet +
"\n]"
}
}
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index d535f98433..133c292a6f 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -7,16 +7,18 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
+import scala.collection.mutable.HashMap
+
import se.scalablesolutions.akka.state.Committable
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.commitbarriers.VetoCommitBarrier
-
-import scala.collection.mutable.HashMap
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
+import org.multiverse.utils.backoff.ExponentialBackoffPolicy
+import org.multiverse.stms.alpha.AlphaStm
+import java.util.concurrent.TimeUnit
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
@@ -98,21 +100,42 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
*/
object Transaction extends TransactionManagement {
val idFactory = new AtomicLong(-1L)
+/*
+ import AlphaStm._
+ private val defaultTxBuilder = new AlphaTransactionFactoryBuilder
+ defaultTxBuilder.setReadonly(false)
+ defaultTxBuilder.setInterruptible(INTERRUPTIBLE)
+ defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+ defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+ defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+ defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+ defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+ private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder
+ readOnlyTxBuilder.setReadonly(true)
+ readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE)
+ readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+ readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+ readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+ readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+ readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+ */
+ /**
+ * See ScalaDoc on class.
+ */
+ def map[T](f: => T)(implicit transactionFamilyName: String): T =
+ atomic {f}
/**
* See ScalaDoc on class.
*/
- def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
+ def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
+ atomic {f}
/**
* See ScalaDoc on class.
*/
- def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
-
- /**
- * See ScalaDoc on class.
- */
- def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic {f(getTransactionInScope)}
+ def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
+ atomic {f}
/**
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
@@ -127,82 +150,39 @@ object Transaction extends TransactionManagement {
* See ScalaDoc on class.
*/
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+ // defaultTxBuilder.setFamilyName(transactionFamilyName)
+ // new TransactionTemplate[T](defaultTxBuilder.build) {
new TransactionTemplate[T]() { // FIXME take factory
- def execute(mtx: MultiverseTransaction): T = body
+ def execute(mtx: MultiverseTransaction): T = {
+ val result = body
+
+ log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set")
+ getTransactionSetInScope.joinCommit(mtx)
+
+ // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+ //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+
+ clearTransaction
+ result
+ }
override def onStart(mtx: MultiverseTransaction) = {
+ val txSet = if (!isTransactionSetInScope) createNewTransactionSet
+ else getTransactionSetInScope
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
- }
- override def onPostCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
+ txSet.registerOnCommitTask(new Runnable() {
+ def run = tx.commit
+ })
+ txSet.registerOnAbortTask(new Runnable() {
+ def run = tx.abort
+ })
}
}.execute()
}
- /**
- * See ScalaDoc on class.
- */
- def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new TransactionTemplate[T]() { // FIXME take factory
- def execute(mtx: MultiverseTransaction): T = body
-
- override def onStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
-
- override def onPostCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
- }
-
- /**
- * See ScalaDoc on class.
- */
- def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
- new TransactionTemplate[T]() { // FIXME take factory
- def execute(mtx: MultiverseTransaction): T = body
-
- override def onStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
-
- override def onPostCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
- }
-
- /**
- * See ScalaDoc on class.
- */
- def atomicReadOnly[T](body: => T): T = {
- new TransactionTemplate[T]() { // FIXME take factory
- def execute(mtx: MultiverseTransaction): T = body
-
- override def onStart(mtx: MultiverseTransaction) = {
- val tx = new Transaction
- tx.transaction = Some(mtx)
- setTransaction(Some(tx))
- }
-
- override def onPostCommit = {
- if (isTransactionInScope) getTransactionInScope.commit
- else throw new IllegalStateException("No transaction in scope")
- }
- }.execute
- }
-
/**
* See ScalaDoc on class.
*/
@@ -215,7 +195,6 @@ object Transaction extends TransactionManagement {
def elseBody[A](firstBody: => A) = new {
def orElse(secondBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = firstBody
-
def orelserun(t: MultiverseTransaction) = secondBody
}.execute()
}
@@ -227,37 +206,34 @@ object Transaction extends TransactionManagement {
@serializable class Transaction extends Logging {
import Transaction._
+ log.trace("Creating %s", toString)
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
private[akka] val depth = new AtomicInteger(0)
- //private[akka] val transactionSet = new VetoCommitBarrier
-
- //transactionSet.vetoCommit(this)
// --- public methods ---------
- def abort = synchronized {
- // transactionSet.abort
- }
-
def commit = synchronized {
+ log.trace("Committing transaction %s", toString)
pureAtomic {
persistentStateMap.values.foreach(_.commit)
- TransactionManagement.clearTransaction
}
- //transactionSet.vetoCommit(this)
status = TransactionStatus.Completed
}
- def isNew = synchronized {status == TransactionStatus.New}
+ def abort = synchronized {
+ log.trace("Aborting transaction %s", toString)
+ }
- def isActive = synchronized {status == TransactionStatus.Active}
+ def isNew = synchronized { status == TransactionStatus.New }
- def isCompleted = synchronized {status == TransactionStatus.Completed}
+ def isActive = synchronized { status == TransactionStatus.Active }
- def isAborted = synchronized {status == TransactionStatus.Aborted}
+ def isCompleted = synchronized { status == TransactionStatus.Completed }
+
+ def isAborted = synchronized { status == TransactionStatus.Aborted }
// --- internal methods ---------
@@ -300,9 +276,9 @@ object Transaction extends TransactionManagement {
that.asInstanceOf[Transaction].id == this.id
}
- override def hashCode(): Int = synchronized {id.toInt}
+ override def hashCode(): Int = synchronized { id.toInt }
- override def toString(): String = synchronized {"Transaction[" + id + ", " + status + "]"}
+ override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
}
/**
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 1f2ede3024..60a6ae6de3 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -9,53 +9,80 @@ import java.util.concurrent.atomic.AtomicBoolean
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier
class StmException(msg: String) extends RuntimeException(msg)
-class TransactionAwareWrapperException(
- val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
- override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
+class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+ override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.Config._
- val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
- val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
-
+ val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
+ val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
+ val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", true)
+ val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 1000)
+ val TRANSACTION_TIMEOUT = config.getInt("akka.stm.timeout", 10000)
+ val SMART_TX_LENGTH_SELECTOR = config.getBool("akka.stm.smart-tx-length-selector", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
+
def disableTransactions = TRANSACTION_ENABLED.set(false)
- private[akka] val currentTransaction = new ThreadLocal[Option[Transaction]]() {
+ private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
+ override protected def initialValue: Option[CountDownCommitBarrier] = None
+ }
+
+ private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
override protected def initialValue: Option[Transaction] = None
}
+
+ private[akka] def getTransactionSet: CountDownCommitBarrier = {
+ val option = transactionSet.get
+ if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope")
+ option.get
+ }
+
+ private[akka] def getTransaction: Transaction = {
+ val option = transaction.get
+ if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
+ option.get
+ }
}
trait TransactionManagement extends Logging {
- import TransactionManagement.currentTransaction
- private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
-
- private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
- val tx = transaction.get
- currentTransaction.set(transaction)
- if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get)
- else throw new IllegalStateException("No transaction defined")
+ private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
+ val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
+ TransactionManagement.transactionSet.set(Some(txSet))
+ txSet
}
+ private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) =
+ if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet)
+
+ private[akka] def setTransaction(tx: Option[Transaction]) =
+ if (tx.isDefined) TransactionManagement.transaction.set(tx)
+
+ private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)
+
private[akka] def clearTransaction = {
- currentTransaction.set(None)
+ TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}
- private[akka] def getTransactionInScope = currentTransaction.get.get
-
- private[akka] def isTransactionInScope = currentTransaction.get.isDefined
+ private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet
- private[akka] def isTransactionTopLevel = if (isTransactionInScope) getTransactionInScope.isTopLevel
+ private[akka] def getTransactionInScope = TransactionManagement.getTransaction
- private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
-
- private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
-}
+ private[akka] def isTransactionSetInScope = {
+ val option = TransactionManagement.transactionSet.get
+ (option ne null) && option.isDefined
+ }
+ private[akka] def isTransactionInScope = {
+ val option = TransactionManagement.transaction.get
+ (option ne null) && option.isDefined
+ }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 195b134271..1b52faf969 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -77,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
- private[this] val ref: AlphaRef[T] = atomic { new AlphaRef }
+ private[this] lazy val ref: AlphaRef[T] = new AlphaRef
def swap(elem: T) = {
ensureIsInTransaction
diff --git a/akka-core/src/test/scala/InMemoryActorTest.scala b/akka-core/src/test/scala/InMemoryActorTest.scala
index cd06b80d0a..d4be98fcaa 100644
--- a/akka-core/src/test/scala/InMemoryActorTest.scala
+++ b/akka-core/src/test/scala/InMemoryActorTest.scala
@@ -23,7 +23,7 @@ case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
- timeout = 100000
+ timeout = 5000
makeTransactionRequired
private lazy val mapState = TransactionalState.newMap[String, String]
@@ -86,8 +86,8 @@ class InMemFailerActor extends Actor {
}
class InMemoryActorTest extends JUnitSuite {
+ import Actor.Sender.Self
- /*
@Test
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -98,7 +98,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
- */
+
@Test
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -107,7 +107,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
- /*
+
@Test
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -120,7 +120,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
- */
+
@Test
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -134,7 +134,7 @@ class InMemoryActorTest extends JUnitSuite {
} catch {case e: RuntimeException => {}}
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
- /*
+
@Test
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -145,7 +145,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert(2 === (stateful !! GetVectorSize).get)
}
- */
+
@Test
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -154,7 +154,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert(2 === (stateful !! GetVectorSize).get)
}
- /*
+
@Test
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -167,7 +167,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert(1 === (stateful !! GetVectorSize).get)
}
- */
+
@Test
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -181,7 +181,7 @@ class InMemoryActorTest extends JUnitSuite {
} catch {case e: RuntimeException => {}}
assert(1 === (stateful !! GetVectorSize).get)
}
- /*
+
@Test
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -192,7 +192,7 @@ class InMemoryActorTest extends JUnitSuite {
Thread.sleep(1000)
assert("new state" === (stateful !! GetRefState).get)
}
- */
+
@Test
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -201,7 +201,7 @@ class InMemoryActorTest extends JUnitSuite {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assert("new state" === (stateful !! GetRefState).get)
}
- /*
+
@Test
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -212,9 +212,9 @@ class InMemoryActorTest extends JUnitSuite {
failer.start
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
Thread.sleep(1000)
- assert("init" === (stateful !! GetRefState).get) // check that state is == init state
+ assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
}
- */
+
@Test
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
diff --git a/akka-core/src/test/scala/ShutdownSpec.scala b/akka-core/src/test/scala/ShutdownSpec.scala
index ba03fbe902..20927bbfb1 100644
--- a/akka-core/src/test/scala/ShutdownSpec.scala
+++ b/akka-core/src/test/scala/ShutdownSpec.scala
@@ -2,9 +2,8 @@ package se.scalablesolutions.akka.remote
import se.scalablesolutions.akka.actor.Actor
-object ActorShutdownSpec {
+object ActorShutdownRunner {
def main(args: Array[String]) {
-
class MyActor extends Actor {
def receive = {
case "test" => println("received test")
@@ -22,7 +21,7 @@ object ActorShutdownSpec {
// case 2
-object RemoteServerAndClusterShutdownSpec {
+object RemoteServerAndClusterShutdownRunner {
def main(args: Array[String]) {
val s1 = new RemoteServer
val s2 = new RemoteServer
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 8a51feed6b..366403ef46 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -11,7 +11,7 @@ import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
-/*
+
public class InMemNestedStateTest extends TestCase {
static String messageLog = "";
@@ -133,4 +133,3 @@ public class InMemNestedStateTest extends TestCase {
assertEquals("init", nested.getRefState()); // check that state is == init state
}
}
-*/
\ No newline at end of file
diff --git a/akka-patterns/src/main/scala/Agent.scala b/akka-patterns/src/main/scala/Agent.scala
index 4dd8640c32..aea74530a3 100644
--- a/akka-patterns/src/main/scala/Agent.scala
+++ b/akka-patterns/src/main/scala/Agent.scala
@@ -50,30 +50,30 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
* Periodically handles incoming messages
*/
def receive = {
- case FunctionHolder(fun: (T => T)) => atomic { updateData(fun(value.getOrWait)) }
+ case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
case ValueHolder(x: T) => updateData(x)
- case ProcedureHolder(fun: (T => Unit)) => atomic { fun(copyStrategy(value.getOrWait)) }
+ case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
}
/**
* Specifies how a copy of the value is made, defaults to using identity
*/
- protected def copyStrategy(t : T) : T = t
+ protected def copyStrategy(t: T): T = t
/**
* Updates the internal state with the value provided as a by-name parameter
*/
- private final def updateData(newData: => T) : Unit = atomic { value.swap(newData) }
+ private final def updateData(newData: => T): Unit = value.swap(newData)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
- final def get : T = {
+ final def get: T = {
val ref = new AtomicReference[T]
val latch = new CountDownLatch(1)
get((x: T) => {ref.set(x); latch.countDown})
@@ -85,14 +85,14 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
* A copy of the internal state will be used, depending on the underlying effective copyStrategy.
*/
- final def get(message: (T => Unit)) : Unit = this ! ProcedureHolder(message)
+ final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
/**
* Submits a request to read the internal state.
* A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
* Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
*/
- final def apply() : T = get
+ final def apply(): T = get
/**
* Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
@@ -103,22 +103,22 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
/**
* Submits the provided function for execution against the internal agent's state
*/
- final def apply(message: (T => T)) : Unit = this ! FunctionHolder(message)
+ final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
- final def apply(message: T) : Unit = this ! ValueHolder(message)
+ final def apply(message: T): Unit = this ! ValueHolder(message)
/**
* Submits the provided function for execution against the internal agent's state
*/
- final def update(message: (T => T)) : Unit = this ! FunctionHolder(message)
+ final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
/**
* Submits a new value to be set as the new agent's internal state
*/
- final def update(message: T) : Unit = this ! ValueHolder(message)
+ final def update(message: T): Unit = this ! ValueHolder(message)
}
/**
@@ -135,7 +135,7 @@ object Agent {
/**
* Creates a new Agent of type T with the initial value of value
*/
- def apply[T](value:T) : Agent[T] = new Agent(value)
+ def apply[T](value:T): Agent[T] = new Agent(value)
/**
* Creates a new Agent of type T with the initial value of value and with the specified copy function
diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala
index 11f2664640..ae6ae5c0e8 100644
--- a/akka-patterns/src/test/scala/ActorPatternsTest.scala
+++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala
@@ -21,12 +21,12 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
var targetOk = 0
- val t1 = actor() receive {
+ val t1: Actor = actor {
case `testMsg1` => targetOk += 2
case `testMsg2` => targetOk += 4
}
- val t2 = actor() receive {
+ val t2: Actor = actor {
case `testMsg3` => targetOk += 8
}
@@ -48,7 +48,7 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
@Test def testLogger = verify(new TestActor {
def test = {
val msgs = new HashSet[Any]
- val t1 = actor() receive {
+ val t1: Actor = actor {
case _ =>
}
val l = loggerActor(t1,(x) => msgs += x)
diff --git a/akka-patterns/src/test/scala/AgentTest.scala b/akka-patterns/src/test/scala/AgentTest.scala
index 17ccce8e0a..25961f8222 100644
--- a/akka-patterns/src/test/scala/AgentTest.scala
+++ b/akka-patterns/src/test/scala/AgentTest.scala
@@ -7,18 +7,23 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Test}
+/*
@RunWith(classOf[JUnitRunner])
class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
- @Test def testAgent = verify(new TestActor {
- def test = {
- val t = Agent(5)
- handle(t){
- t.update( _ + 1 )
- t.update( _ * 2 )
- val r = t()
- r must be (12)
- }
- }
+ @Test def testAgent = verify(new TestActor {
+ def test = {
+ atomic {
+ val t = Agent(5)
+ handle(t) {
+ t.update(_ + 1)
+ t.update(_ * 2)
+
+ val r = t()
+ r must be(12)
+ }
+ }
+ }
})
-}
\ No newline at end of file
+}
+*/
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index dc55e0eca1..b4ea8fc381 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -4,7 +4,7 @@
package se.scalablesolutions.akka.state
-import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
+import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging
@@ -64,10 +64,6 @@ trait Storage {
throw new UnsupportedOperationException
}
-
-
-
-
/**
* Implementation of PersistentMap for every concrete
* storage will have the same workflow. This abstracts the workflow.
@@ -162,8 +158,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -236,8 +232,8 @@ trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Com
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -272,8 +268,8 @@ trait PersistentRef[T] extends Transactional with Committable {
}
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
@@ -397,7 +393,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
throw new UnsupportedOperationException("dequeueAll not supported")
private def register = {
- if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
- currentTransaction.get.get.register(uuid, this)
+ if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+ transaction.get.get.register(uuid, this)
}
}
diff --git a/akka.iml b/akka.iml
index 2f07a75716..a39c87020f 100644
--- a/akka.iml
+++ b/akka.iml
@@ -2,7 +2,18 @@
-
+
+
+
+
+
+
+
+
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 749b599e0b..296b06428b 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -30,8 +30,10 @@
service = on
- max-nr-of-retries = 100
- distributed = off # not implemented yet
+ fair = on # should transactions be fair or non-fair (non fair yield better performance)
+ max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
+ timeout = 10000 # transaction timeout; if transaction have not committed within the timeout then it is aborted
+ distributed = off # not implemented yet
diff --git a/pom.xml b/pom.xml
index d2885abf61..480ad8723a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,11 @@
Codehaus Maven Repositoryhttp://repository.codehaus.org
+
+ snapshots.repository.codehaus.org
+ Codehaus Maven Snapshot Repository
+ http://snapshots.repository.codehaus.org
+ maven2-repository.dev.java.netJava.net Repository for Maven
@@ -301,11 +306,11 @@
**/*Test.java
-
+ **/*Spec.java
-
+
akka.home