From 93f2fe0c35a59ea3611cf96d60f6c3f341b76645 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Thu, 18 Feb 2010 21:38:53 +0100 Subject: [PATCH 1/8] updated to 0.4 multiverse --- akka-util-java/pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/akka-util-java/pom.xml b/akka-util-java/pom.xml index e0a729491b..3daa008792 100644 --- a/akka-util-java/pom.xml +++ b/akka-util-java/pom.xml @@ -27,7 +27,7 @@ org.multiverse multiverse-alpha - 0.3 + 0.4-SNAPSHOT jar-with-dependencies From 87f66d0c4f3666a0f497cda23f9e2a1a85cb5340 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Mon, 22 Feb 2010 13:22:10 +0100 Subject: [PATCH 2/8] upgraded to multiverse 0.4-SNAPSHOT --- akka-core/pom.xml | 28 ++ akka-core/src/main/scala/actor/Actor.scala | 8 +- .../src/main/scala/stm/Transaction.scala | 132 ++++--- .../scala/stm/TransactionManagement.scala | 4 +- .../main/scala/stm/TransactionalState.scala | 5 +- akka-util-java/pom.xml | 28 -- .../akka/stm/AtomicTemplate.java | 341 ------------------ pom.xml | 52 ++- 8 files changed, 136 insertions(+), 462 deletions(-) delete mode 100644 akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java diff --git a/akka-core/pom.xml b/akka-core/pom.xml index d6ca57ebfe..5d58430aa6 100644 --- a/akka-core/pom.xml +++ b/akka-core/pom.xml @@ -46,6 +46,34 @@ netty 3.2.0.ALPHA3 + + org.multiverse + multiverse-alpha + 0.4-SNAPSHOT + jar-with-dependencies + + + org.multiverse + multiverse-core + + + asm + asm-tree + + + asm + asm-analysis + + + asm + asm-commons + + + asm + asm-util + + + org.scala-tools javautils diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 41b2673bfa..a6d9231b10 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -911,10 +911,10 @@ trait Actor extends TransactionManagement { try { if (isTransactionRequiresNew && !isTransactionInScope) { - if (senderFuture.isEmpty) throw new StmException( - "Can't continue transaction in a one-way fire-forget message send" + - "\n\tE.g. using Actor '!' method or Active Object 'void' method" + - "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type") + //if (senderFuture.isEmpty) throw new StmException( + // "Can't continue transaction in a one-way fire-forget message send" + + // "\n\tE.g. using Actor '!' method or Active Object 'void' method" + + // "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type") atomic { proceed } diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 1637b4c906..d535f98433 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -13,9 +13,10 @@ import se.scalablesolutions.akka.util.Logging import org.multiverse.api.{Transaction => MultiverseTransaction} import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance import org.multiverse.api.ThreadLocalTransaction._ -import org.multiverse.templates.OrElseTemplate +import org.multiverse.commitbarriers.VetoCommitBarrier import scala.collection.mutable.HashMap +import org.multiverse.templates.{TransactionTemplate, OrElseTemplate} class NoTransactionInScopeException extends RuntimeException class TransactionRetryException(message: String) extends RuntimeException(message) @@ -30,8 +31,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * Here are some examples (assuming implicit transaction family name in scope): *
  * import se.scalablesolutions.akka.stm.Transaction._
- * 
- * atomic {
+ *
+ * atomic  {
  *   .. // do something within a transaction
  * }
  * 
@@ -39,8 +40,8 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * Example of atomic transaction management using atomic block with retry count: *
  * import se.scalablesolutions.akka.stm.Transaction._
- * 
- * atomic(maxNrOfRetries) {
+ *
+ * atomic(maxNrOfRetries)  {
  *   .. // do something within a transaction
  * }
  * 
@@ -49,10 +50,10 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * Which is a good way to reduce contention and transaction collisions. *
  * import se.scalablesolutions.akka.stm.Transaction._
- * 
- * atomically {
+ *
+ * atomically  {
  *   .. // try to do something
- * } orElse {
+ * } orElse  {
  *   .. // if transaction clashes try do do something else to minimize contention
  * }
  * 
@@ -61,11 +62,11 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * *
  * import se.scalablesolutions.akka.stm.Transaction._
- * for (tx <- Transaction) {
+ * for (tx <- Transaction)  {
  *   ... // do transactional stuff
  * }
  *
- * val result = for (tx <- Transaction) yield {
+ * val result = for (tx <- Transaction) yield  {
  *   ... // do transactional stuff yielding a result
  * }
  * 
@@ -78,17 +79,17 @@ class TransactionRetryException(message: String) extends RuntimeException(messag * * // You can use them together with Transaction in a for comprehension since * // TransactionalRef is also monadic - * for { + * for { * tx <- Transaction * ref <- refs * } { * ... // use the ref inside a transaction * } * - * val result = for { + * val result = for { * tx <- Transaction * ref <- refs - * } yield { + * } yield { * ... // use the ref inside a transaction, yield a result * } * @@ -101,57 +102,61 @@ object Transaction extends TransactionManagement { /** * See ScalaDoc on class. */ - def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) } + def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)} /** * See ScalaDoc on class. */ - def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) } + def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)} /** * See ScalaDoc on class. */ - def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) } + def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic {f(getTransactionInScope)} /** * Creates a "pure" STM atomic transaction and by-passes all transactions hooks * such as persistence etc. * Only for internal usage. */ - private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T]( - getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { + private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() { def execute(mtx: MultiverseTransaction): T = body }.execute() /** * See ScalaDoc on class. */ - def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T]( - getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) { - def execute(mtx: MultiverseTransaction): T = body - override def postStart(mtx: MultiverseTransaction) = { - val tx = new Transaction - tx.transaction = Some(mtx) - setTransaction(Some(tx)) - } - override def postCommit = { - if (isTransactionInScope) getTransactionInScope.commit - else throw new IllegalStateException("No transaction in scope") - } - }.execute() + def atomic[T](body: => T)(implicit transactionFamilyName: String): T = { + new TransactionTemplate[T]() { // FIXME take factory + def execute(mtx: MultiverseTransaction): T = body + + override def onStart(mtx: MultiverseTransaction) = { + val tx = new Transaction + tx.transaction = Some(mtx) + setTransaction(Some(tx)) + } + + override def onPostCommit = { + if (isTransactionInScope) getTransactionInScope.commit + else throw new IllegalStateException("No transaction in scope") + } + }.execute() + } /** * See ScalaDoc on class. */ def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = { - new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) { + new TransactionTemplate[T]() { // FIXME take factory def execute(mtx: MultiverseTransaction): T = body - override def postStart(mtx: MultiverseTransaction) = { + + override def onStart(mtx: MultiverseTransaction) = { val tx = new Transaction tx.transaction = Some(mtx) setTransaction(Some(tx)) } - override def postCommit = { + + override def onPostCommit = { if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } @@ -162,14 +167,16 @@ object Transaction extends TransactionManagement { * See ScalaDoc on class. */ def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = { - new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) { + new TransactionTemplate[T]() { // FIXME take factory def execute(mtx: MultiverseTransaction): T = body - override def postStart(mtx: MultiverseTransaction) = { + + override def onStart(mtx: MultiverseTransaction) = { val tx = new Transaction tx.transaction = Some(mtx) setTransaction(Some(tx)) } - override def postCommit = { + + override def onPostCommit = { if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } @@ -180,14 +187,16 @@ object Transaction extends TransactionManagement { * See ScalaDoc on class. */ def atomicReadOnly[T](body: => T): T = { - new AtomicTemplate[T](true) { + new TransactionTemplate[T]() { // FIXME take factory def execute(mtx: MultiverseTransaction): T = body - override def postStart(mtx: MultiverseTransaction) = { + + override def onStart(mtx: MultiverseTransaction) = { val tx = new Transaction tx.transaction = Some(mtx) setTransaction(Some(tx)) } - override def postCommit = { + + override def onPostCommit = { if (isTransactionInScope) getTransactionInScope.commit else throw new IllegalStateException("No transaction in scope") } @@ -206,6 +215,7 @@ object Transaction extends TransactionManagement { def elseBody[A](firstBody: => A) = new { def orElse(secondBody: => A) = new OrElseTemplate[A] { def run(t: MultiverseTransaction) = firstBody + def orelserun(t: MultiverseTransaction) = secondBody }.execute() } @@ -216,30 +226,38 @@ object Transaction extends TransactionManagement { */ @serializable class Transaction extends Logging { import Transaction._ - + val id = Transaction.idFactory.incrementAndGet @volatile private[this] var status: TransactionStatus = TransactionStatus.New private[akka] var transaction: Option[MultiverseTransaction] = None private[this] val persistentStateMap = new HashMap[String, Committable] private[akka] val depth = new AtomicInteger(0) - + //private[akka] val transactionSet = new VetoCommitBarrier + + //transactionSet.vetoCommit(this) + // --- public methods --------- + def abort = synchronized { + // transactionSet.abort + } + def commit = synchronized { pureAtomic { persistentStateMap.values.foreach(_.commit) TransactionManagement.clearTransaction } + //transactionSet.vetoCommit(this) status = TransactionStatus.Completed } - def isNew = synchronized { status == TransactionStatus.New } + def isNew = synchronized {status == TransactionStatus.New} - def isActive = synchronized { status == TransactionStatus.Active } + def isActive = synchronized {status == TransactionStatus.Active} - def isCompleted = synchronized { status == TransactionStatus.Completed } + def isCompleted = synchronized {status == TransactionStatus.Completed} - def isAborted = synchronized { status == TransactionStatus.Aborted } + def isAborted = synchronized {status == TransactionStatus.Aborted} // --- internal methods --------- @@ -259,13 +277,13 @@ object Transaction extends TransactionManagement { private def ensureIsActiveOrAborted = if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted)) - throw new IllegalStateException( - "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) + throw new IllegalStateException( + "Expected ACTIVE or ABORTED transaction - current status [" + status + "]: " + toString) private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New)) - throw new IllegalStateException( - "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) + throw new IllegalStateException( + "Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) // For reinitialize transaction after sending it over the wire private[akka] def reinit = synchronized { @@ -277,14 +295,14 @@ object Transaction extends TransactionManagement { } override def equals(that: Any): Boolean = synchronized { - that != null && - that.isInstanceOf[Transaction] && - that.asInstanceOf[Transaction].id == this.id + that != null && + that.isInstanceOf[Transaction] && + that.asInstanceOf[Transaction].id == this.id } - - override def hashCode(): Int = synchronized { id.toInt } - - override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" } + + override def hashCode(): Int = synchronized {id.toInt} + + override def toString(): String = synchronized {"Transaction[" + id + ", " + status + "]"} } /** diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 2dd7ed9c79..1f2ede3024 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -26,7 +26,7 @@ object TransactionManagement extends TransactionManagement { def isTransactionalityEnabled = TRANSACTION_ENABLED.get def disableTransactions = TRANSACTION_ENABLED.set(false) - private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() { + private[akka] val currentTransaction = new ThreadLocal[Option[Transaction]]() { override protected def initialValue: Option[Transaction] = None } } @@ -52,6 +52,8 @@ trait TransactionManagement extends Logging { private[akka] def isTransactionInScope = currentTransaction.get.isDefined + private[akka] def isTransactionTopLevel = if (isTransactionInScope) getTransactionInScope.isTopLevel + private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala index 6003a89f89..195b134271 100644 --- a/akka-core/src/main/scala/stm/TransactionalState.scala +++ b/akka-core/src/main/scala/stm/TransactionalState.scala @@ -8,8 +8,7 @@ import se.scalablesolutions.akka.stm.Transaction.atomic import se.scalablesolutions.akka.stm.NoTransactionInScopeException import se.scalablesolutions.akka.collection._ import se.scalablesolutions.akka.util.UUID - -import org.multiverse.datastructures.refs.manual.Ref; +import org.multiverse.stms.alpha.AlphaRef /** * Example Scala usage: @@ -78,7 +77,7 @@ class TransactionalRef[T] extends Transactional { implicit val txInitName = "TransactionalRef:Init" val uuid = UUID.newUuid.toString - private[this] val ref: Ref[T] = atomic { new Ref } + private[this] val ref: AlphaRef[T] = atomic { new AlphaRef } def swap(elem: T) = { ensureIsInTransaction diff --git a/akka-util-java/pom.xml b/akka-util-java/pom.xml index 3daa008792..6db055b223 100644 --- a/akka-util-java/pom.xml +++ b/akka-util-java/pom.xml @@ -24,34 +24,6 @@ protobuf-java 2.2.0
- - org.multiverse - multiverse-alpha - 0.4-SNAPSHOT - jar-with-dependencies - - - org.multiverse - multiverse-core - - - asm - asm-tree - - - asm - asm-analysis - - - asm - asm-commons - - - asm - asm-util - - - diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java deleted file mode 100644 index a693ff1248..0000000000 --- a/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java +++ /dev/null @@ -1,341 +0,0 @@ -package se.scalablesolutions.akka.stm; - -import static org.multiverse.api.GlobalStmInstance.getGlobalStmInstance; -import org.multiverse.api.Stm; -import static org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction; -import static org.multiverse.api.ThreadLocalTransaction.setThreadLocalTransaction; -import org.multiverse.api.Transaction; -import org.multiverse.api.TransactionStatus; -import org.multiverse.api.exceptions.CommitFailureException; -import org.multiverse.api.exceptions.LoadException; -import org.multiverse.api.exceptions.RetryError; -import org.multiverse.api.exceptions.TooManyRetriesException; -import org.multiverse.templates.AbortedException; -import org.multiverse.utils.latches.CheapLatch; -import org.multiverse.utils.latches.Latch; - -import static java.lang.String.format; -import java.util.logging.Logger; - -/** - * A Template that handles the boilerplate code for transactions. A transaction will be placed if none is available - * around a section and if all goes right, commits at the end. - *

- * example: - *

- * new AtomicTemplate(){
- *    Object execute(Transaction t){
- *        queue.push(1);
- *        return null;
- *    }
- * }.execute();
- * 
- *

- * It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a task - * for template. In the future this retry behavior will be customizable. - *

- * If a transaction already is available on the TransactionThreadLocal, no new transaction is started and essentially - * the whole AtomicTemplate is ignored. - *

- * If no transaction is available on the TransactionThreadLocal, a new one will be created and used during the execution - * of the AtomicTemplate and will be removed once the AtomicTemplate finishes. - *

- * All uncaught throwable's lead to a rollback of the transaction. - *

- * AtomicTemplates are not thread-safe to use. - *

- * AtomicTemplates can completely work without threadlocals. See the {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm - * ,String, boolean, boolean, int)} for more information. - * - * @author Peter Veentjer - */ -public abstract class AtomicTemplate { - - private final static Logger logger = Logger.getLogger(AtomicTemplate.class.getName()); - - private final Stm stm; - private final boolean ignoreThreadLocalTransaction; - private final int retryCount; - private final boolean readonly; - private int attemptCount; - private final String familyName; - - /** - * Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and works the the {@link - * org.multiverse.utils.ThreadLocalTransaction}. - */ - public AtomicTemplate() { - this(getGlobalStmInstance()); - } - - public AtomicTemplate(boolean readonly) { - this(getGlobalStmInstance(), null, false, readonly, Integer.MAX_VALUE); - } - - /** - * Creates a new AtomicTemplate using the provided stm. The transaction used is stores/retrieved from the {@link - * org.multiverse.utils.ThreadLocalTransaction}. - * - * @param stm the stm to use for transactions. - * @throws NullPointerException if stm is null. - */ - public AtomicTemplate(Stm stm) { - this(stm, null, false, false, Integer.MAX_VALUE); - } - - public AtomicTemplate(String familyName, boolean readonly, int retryCount) { - this(getGlobalStmInstance(), familyName, false, readonly, retryCount); - } - - /** - * Creates a new AtomicTemplate that uses the provided STM. This method is provided to make Multiverse easy to - * integrate with environment that don't want to depend on threadlocals. - * - * @param stm the stm to use for transactions. - * @param ignoreThreadLocalTransaction true if this Template should completely ignore the ThreadLocalTransaction. - * This is useful for using the AtomicTemplate in other environments that don't - * want to depend on threadlocals but do want to use the AtomicTemplate. - * @throws NullPointerException if stm is null. - */ - public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly, - int retryCount) { - if (stm == null) { - throw new NullPointerException(); - } - if (retryCount < 0) { - throw new IllegalArgumentException(); - } - this.stm = stm; - this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction; - this.readonly = readonly; - this.retryCount = retryCount; - this.familyName = familyName; - } - - public String getFamilyName() { - return familyName; - } - - /** - * Returns the current attempt. Value will always be larger than zero and increases everytime the transaction needs - * to be retried. - * - * @return the current attempt count. - */ - public final int getAttemptCount() { - return attemptCount; - } - - /** - * Returns the number of retries that this AtomicTemplate is allowed to do. The returned value will always be equal - * or larger than 0. - * - * @return the number of retries. - */ - public final int getRetryCount() { - return retryCount; - } - - /** - * Returns the {@link Stm} used by this AtomicTemplate to execute transactions on. - * - * @return the Stm used by this AtomicTemplate. - */ - public final Stm getStm() { - return stm; - } - - /** - * Check if this AtomicTemplate ignores the ThreadLocalTransaction. - * - * @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise. - */ - public final boolean isIgnoreThreadLocalTransaction() { - return ignoreThreadLocalTransaction; - } - - /** - * Checks if this AtomicTemplate executes readonly transactions. - * - * @return true if it executes readonly transactions, false otherwise. - */ - public final boolean isReadonly() { - return readonly; - } - - /** - * This is the method can be overridden to do pre-start tasks. - */ - public void preStart() { - } - - /** - * This is the method can be overridden to do post-start tasks. - * - * @param t the transaction used for this execution. - */ - public void postStart(Transaction t) { - } - - /** - * This is the method can be overridden to do pre-commit tasks. - */ - public void preCommit() { - } - - /** - * This is the method can be overridden to do post-commit tasks. - */ - public void postCommit() { - } - - /** - * This is the method that needs to be implemented. - * - * @param t the transaction used for this execution. - * @return the result of the execution. - * - * @throws Exception the Exception thrown - */ - public abstract E execute(Transaction t) throws Exception; - - /** - * Executes the template. - * - * @return the result of the {@link #execute(org.multiverse.api.Transaction)} method. - * - * @throws InvisibleCheckedException if a checked exception was thrown while executing the {@link - * #execute(org.multiverse.api.Transaction)} method. - * @throws AbortedException if the exception was explicitly aborted. - * @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last - * failure (also an exception) is included as cause. So you have some idea where - * to look for problems - */ - public final E execute() { - try { - return executeChecked(); - } catch (Exception ex) { - if (ex instanceof RuntimeException) { - throw (RuntimeException) ex; - } else { - throw new AtomicTemplate.InvisibleCheckedException(ex); - } - } - } - - /** - * Executes the Template and rethrows the checked exception instead of wrapping it in a InvisibleCheckedException. - * - * @return the result - * - * @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)} - * method. - * @throws AbortedException if the exception was explicitly aborted. - * @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last - * failure (also an exception) is included as cause. So you have some idea where to - * look for problems - */ - public final E executeChecked() throws Exception { - preStart(); - Transaction t = getTransaction(); - if (noUsableTransaction(t)) { - t = startTransaction(); - setTransaction(t); - postStart(t); - try { - attemptCount = 1; - Exception lastRetryCause = null; - while (attemptCount - 1 <= retryCount) { - boolean abort = true; - boolean reset = false; - try { - E result = execute(t); - if (t.getStatus().equals(TransactionStatus.aborted)) { - String msg = format("Transaction with familyname %s is aborted", t.getFamilyName()); - throw new AbortedException(msg); - } - preCommit(); - t.commit(); - abort = false; - reset = false; - postCommit(); - return result; - } catch (RetryError e) { - Latch latch = new CheapLatch(); - t.abortAndRegisterRetryLatch(latch); - latch.awaitUninterruptible(); - //since the abort is already done, no need to do it again. - abort = false; - } catch (CommitFailureException ex) { - lastRetryCause = ex; - reset = true; - //ignore, just retry the transaction - } catch (LoadException ex) { - lastRetryCause = ex; - reset = true; - //ignore, just retry the transaction - } finally { - if (abort) { - t.abort(); - if (reset) { - t = t.abortAndReturnRestarted(); - setTransaction(t); - } - } - } - attemptCount++; - } - - throw new TooManyRetriesException("Too many retries", lastRetryCause); - } finally { - setTransaction(null); - } - } else { - return execute(t); - } - } - - private Transaction startTransaction() { - return readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName); - } - - private boolean noUsableTransaction(Transaction t) { - return t == null || t.getStatus() != TransactionStatus.active; - } - - /** - * Gets the current Transaction stored in the TransactionThreadLocal. - *

- * If the ignoreThreadLocalTransaction is set, the threadlocal stuff is completeley ignored. - * - * @return the found transaction, or null if none is found. - */ - private Transaction getTransaction() { - return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction(); - } - - /** - * Stores the transaction in the TransactionThreadLocal. - *

- * This call is ignored if the ignoreThreadLocalTransaction is true. - * - * @param t the transaction to set (is allowed to be null). - */ - private void setTransaction(Transaction t) { - if (!ignoreThreadLocalTransaction) { - setThreadLocalTransaction(t); - } - } - - public static class InvisibleCheckedException extends RuntimeException { - - public InvisibleCheckedException(Exception cause) { - super(cause); - } - - @Override - public Exception getCause() { - return (Exception) super.getCause(); - } - } -} diff --git a/pom.xml b/pom.xml index 3cfc2839a8..d2885abf61 100644 --- a/pom.xml +++ b/pom.xml @@ -14,25 +14,27 @@ Akka implements a unique hybrid of: - * Actors , which gives you: - * Simple and high-level abstractions for concurrency and parallelism. - * Asynchronous, non-blocking and highly performant event-driven programming model. - * Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM). - * Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stop, systems that self-heal. - * Software Transactional Memory (STM). (Distributed transactions coming soon). - * Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with automatic rollback and retry. - * Remoting: highly performant distributed actors with remote supervision and error management. - * Cluster membership management. + * Actors , which gives you: + * Simple and high-level abstractions for concurrency and parallelism. + * Asynchronous, non-blocking and highly performant event-driven programming model. + * Very lightweight event-driven processes (create ~6.5 million actors on 4 G RAM). + * Supervision hierarchies with let-it-crash semantics. For writing highly fault-tolerant systems that never stop, + systems that self-heal. + * Software Transactional Memory (STM). (Distributed transactions coming soon). + * Transactors: combine actors and STM into transactional actors. Allows you to compose atomic message flows with + automatic rollback and retry. + * Remoting: highly performant distributed actors with remote supervision and error management. + * Cluster membership management. Akka also has a set of add-on modules: - * Persistence: A set of pluggable back-end storage modules that works in sync with the STM. - * Cassandra distributed and highly scalable database. - * MongoDB document database. - * Redis data structures database (upcoming) - * REST (JAX-RS): Expose actors as REST services. - * Comet: Expose actors as Comet services. - * Security: Digest and Kerberos based security. - * Microkernel: Run Akka as a stand-alone kernel. + * Persistence: A set of pluggable back-end storage modules that works in sync with the STM. + * Cassandra distributed and highly scalable database. + * MongoDB document database. + * Redis data structures database (upcoming) + * REST (JAX-RS): Expose actors as REST services. + * Comet: Expose actors as Comet services. + * Security: Digest and Kerberos based security. + * Microkernel: Run Akka as a stand-alone kernel. @@ -155,15 +157,9 @@ http://www.lag.net/repo - multiverse-releases - http://multiverse.googlecode.com/svn/maven-repository/releases - - false - - - - multiverse-snaphosts - http://multiverse.googlecode.com/svn/maven-repository/snapshots + repository.codehaus.org + Codehaus Maven Repository + http://repository.codehaus.org maven2-repository.dev.java.net @@ -258,7 +254,7 @@ src/main/scala src/test/scala - + org.apache.maven.plugins maven-enforcer-plugin 1.0-beta-1 @@ -276,7 +272,7 @@ - ${env.AKKA_HOME}/embedded-repo + ${env.AKKA_HOME}/embedded-repo From 28c6cc7b54e79e6813b5691733b34219fc8fd3e6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 23 Feb 2010 11:15:37 +0100 Subject: [PATCH 3/8] renamed actor api --- akka-core/src/main/scala/actor/Actor.scala | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a6d9231b10..d6185b65a3 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -98,9 +98,7 @@ object Actor extends Logging { * The actor is started when created. * Example: *

-   * import Actor._
-   *
-   * val a = actor  {
+   * val a = Actor.init  {
    *   ... // init stuff
    * } receive  {
    *   case msg => ... // handle message
@@ -108,7 +106,7 @@ object Actor extends Logging {
    * 
* */ - def actor[A](body: => Unit) = { + def init[A](body: => Unit) = { def handler[A](body: => Unit) = new { def receive(handler: PartialFunction[Any, Unit]) = new Actor() { start From 21c53c2ffda8a1002712e2923e78c73eb959f9fb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Tue, 23 Feb 2010 19:49:01 +0100 Subject: [PATCH 4/8] Upgraded to Multiverse 0.4 and its 2PC CommitBarriers, all tests pass --- akka-core/pom.xml | 23 --- akka-core/src/main/scala/actor/Actor.scala | 81 ++++++--- .../src/main/scala/dispatch/Reactor.scala | 9 +- .../src/main/scala/stm/Transaction.scala | 158 ++++++++---------- .../scala/stm/TransactionManagement.scala | 75 ++++++--- .../main/scala/stm/TransactionalState.scala | 2 +- .../src/test/scala/InMemoryActorTest.scala | 28 ++-- akka-core/src/test/scala/ShutdownSpec.scala | 5 +- .../akka/api/InMemNestedStateTest.java | 3 +- akka-patterns/src/main/scala/Agent.scala | 24 +-- .../src/test/scala/ActorPatternsTest.scala | 6 +- akka-patterns/src/test/scala/AgentTest.scala | 27 +-- .../src/main/scala/Storage.scala | 22 +-- akka.iml | 13 +- config/akka-reference.conf | 6 +- pom.xml | 11 +- 16 files changed, 259 insertions(+), 234 deletions(-) diff --git a/akka-core/pom.xml b/akka-core/pom.xml index 5d58430aa6..d2116cd0f1 100644 --- a/akka-core/pom.xml +++ b/akka-core/pom.xml @@ -50,29 +50,6 @@ org.multiverse multiverse-alpha 0.4-SNAPSHOT - jar-with-dependencies - - - org.multiverse - multiverse-core - - - asm - asm-tree - - - asm - asm-analysis - - - asm - asm-commons - - - asm - asm-util - - org.scala-tools diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index d6185b65a3..2919cc5c71 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.stm.Transaction._ import se.scalablesolutions.akka.stm.TransactionManagement._ -import se.scalablesolutions.akka.stm.{StmException, TransactionManagement} +import se.scalablesolutions.akka.stm.TransactionManagement import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory} import se.scalablesolutions.akka.serialization.Serializer @@ -21,6 +21,7 @@ import org.multiverse.api.ThreadLocalTransaction._ import java.util.{Queue, HashSet} import java.util.concurrent.ConcurrentLinkedQueue import java.net.InetSocketAddress +import org.multiverse.commitbarriers.CountDownCommitBarrier /** * Implements the Transactor abstraction. E.g. a transactional actor. @@ -72,7 +73,7 @@ object Actor extends Logging { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") val PORT = config.getInt("akka.remote.server.port", 9999) - object Sender{ + object Sender { implicit val Self: Option[Actor] = None } @@ -193,7 +194,7 @@ object Actor extends Logging { */ trait Actor extends TransactionManagement { implicit protected val self: Option[Actor] = Some(this) - implicit protected val transactionFamily: String = this.getClass.getName + implicit protected val transactionFamilyName: String = this.getClass.getName // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = UUID.newUuid.toString @@ -216,6 +217,7 @@ trait Actor extends TransactionManagement { private[akka] var _replyToAddress: Option[InetSocketAddress] = None private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation] + // ==================================== // protected fields // ==================================== @@ -333,8 +335,8 @@ trait Actor extends TransactionManagement { /** * User overridable callback/setting. * - * Partial function implementing the server logic. - * To be implemented by subclassing server. + * Partial function implementing the actor logic. + * To be implemented by subclassing actor. *

* Example code: *

@@ -782,6 +784,11 @@ trait Actor extends TransactionManagement {
   }
 
   protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+    if (isTransactionSetInScope) {
+      log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+      getTransactionSetInScope.incParties
+    }
+
     if (_remoteAddress.isDefined) {
       val requestBuilder = RemoteRequest.newBuilder
           .setId(RemoteRequestIdFactory.nextId)
@@ -793,8 +800,7 @@ trait Actor extends TransactionManagement {
           .setIsEscaped(false)
       
       val id = registerSupervisorAsRemoteActor
-      if (id.isDefined)
-        requestBuilder.setSupervisorUuid(id.get)
+      if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
 
       // set the source fields used to reply back to the original sender
       // (i.e. not the remote proxy actor)
@@ -813,18 +819,24 @@ trait Actor extends TransactionManagement {
       RemoteProtocolBuilder.setMessage(message, requestBuilder)
       RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
     } else {
-      val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
+      val invocation = new MessageInvocation(this, message, None, sender, transactionSet.get)
       if (_isEventBased) {
         _mailbox.add(invocation)
         if (_isSuspended) invocation.send
       } else invocation.send
     }
+    clearTransactionSet
   }
 
   protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
       message: Any, 
       timeout: Long,
       senderFuture: Option[CompletableFutureResult]): CompletableFutureResult = {
+    if (isTransactionSetInScope) {
+      log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)    
+      getTransactionSetInScope.incParties
+    }
+    
     if (_remoteAddress.isDefined) {
       val requestBuilder = RemoteRequest.newBuilder
           .setId(RemoteRequestIdFactory.nextId)
@@ -838,16 +850,18 @@ trait Actor extends TransactionManagement {
       val id = registerSupervisorAsRemoteActor
       if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
       val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
+      clearTransactionSet
       if (future.isDefined) future.get
       else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
     } else {
       val future = if (senderFuture.isDefined) senderFuture.get
                    else new DefaultCompletableFutureResult(timeout)
-      val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
+      val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get)
       if (_isEventBased) {
         _mailbox.add(invocation)
         invocation.send
       } else invocation.send
+      clearTransactionSet
       future
     }
   }
@@ -856,6 +870,7 @@ trait Actor extends TransactionManagement {
    * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
    */
   private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+    log.trace("%s is invoked with message %s", toString, messageHandle)
     try {
       if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
       else dispatch(messageHandle)
@@ -867,7 +882,7 @@ trait Actor extends TransactionManagement {
   }
 
   private def dispatch[T](messageHandle: MessageInvocation) = {
-    setTransaction(messageHandle.tx)
+    setTransactionSet(messageHandle.transactionSet)
 
     val message = messageHandle.message //serializeMessage(messageHandle.message)
     senderFuture = messageHandle.future
@@ -889,43 +904,55 @@ trait Actor extends TransactionManagement {
   }
 
   private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
-    setTransaction(messageHandle.tx)
+    var topLevelTransaction = false
+    val txSet: Option[CountDownCommitBarrier] =
+      if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
+      else {
+        topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
+        if (isTransactionRequiresNew) {
+          log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
+          Some(createNewTransactionSet)
+        } else None
+      }
+    setTransactionSet(txSet)
 
     val message = messageHandle.message //serializeMessage(messageHandle.message)
     senderFuture = messageHandle.future
     sender = messageHandle.sender
 
+    def clearTx = {
+      clearTransactionSet
+      clearTransaction
+    }
+
     def proceed = {
-      try {
-        incrementTransaction
-        if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
-        else throw new IllegalArgumentException(
-          "Actor " + toString + " could not process message [" + message + "]" +
-           "\n\tsince no matching 'case' clause in its 'receive' method could be found")
-      } finally {
-        decrementTransaction
-      }
+      if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+      else throw new IllegalArgumentException(
+        toString + " could not process message [" + message + "]" +
+        "\n\tsince no matching 'case' clause in its 'receive' method could be found")
+      setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
     }
 
     try {
-      if (isTransactionRequiresNew && !isTransactionInScope) {
-        //if (senderFuture.isEmpty) throw new StmException(
-        //  "Can't continue transaction in a one-way fire-forget message send" +
-        //  "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
-        //  "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+      if (isTransactionRequiresNew) {
         atomic {
           proceed
         }
       } else proceed
     } catch {
+      case e: IllegalStateException => {}
       case e =>
+        // abort transaction set
+        if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
         Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
+
         if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
-        clearTransaction // need to clear currentTransaction before call to supervisor
+        clearTx  // need to clear currentTransaction before call to supervisor
+
         // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
         if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
     } finally {
-      clearTransaction
+      clearTx
     }
   }
 
diff --git a/akka-core/src/main/scala/dispatch/Reactor.scala b/akka-core/src/main/scala/dispatch/Reactor.scala
index f7bfa52215..b5d4d634f6 100644
--- a/akka-core/src/main/scala/dispatch/Reactor.scala
+++ b/akka-core/src/main/scala/dispatch/Reactor.scala
@@ -7,16 +7,17 @@ package se.scalablesolutions.akka.dispatch
 import java.util.List
 
 import se.scalablesolutions.akka.util.{HashCode, Logging}
-import se.scalablesolutions.akka.stm.Transaction
 import se.scalablesolutions.akka.actor.Actor
 
 import java.util.concurrent.ConcurrentHashMap
 
+import org.multiverse.commitbarriers.CountDownCommitBarrier
+
 final class MessageInvocation(val receiver: Actor,
                               val message: Any,
                               val future: Option[CompletableFutureResult],
                               val sender: Option[Actor],
-                              val tx: Option[Transaction]) {
+                              val transactionSet: Option[CountDownCommitBarrier]) {
   if (receiver eq null) throw new IllegalArgumentException("receiver is null")
 
   def invoke = receiver.invoke(this)
@@ -37,13 +38,13 @@ final class MessageInvocation(val receiver: Actor,
     that.asInstanceOf[MessageInvocation].message == message
   }
 
-  override def toString(): String = synchronized {
+  override def toString = synchronized {
     "MessageInvocation[" +
      "\n\tmessage = " + message +
      "\n\treceiver = " + receiver +
      "\n\tsender = " + sender +
      "\n\tfuture = " + future +
-     "\n\ttx = " + tx +
+     "\n\ttransactionSet = " + transactionSet +
      "\n]"
   }
 }
diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala
index d535f98433..133c292a6f 100644
--- a/akka-core/src/main/scala/stm/Transaction.scala
+++ b/akka-core/src/main/scala/stm/Transaction.scala
@@ -7,16 +7,18 @@ package se.scalablesolutions.akka.stm
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.atomic.AtomicInteger
 
+import scala.collection.mutable.HashMap
+
 import se.scalablesolutions.akka.state.Committable
 import se.scalablesolutions.akka.util.Logging
 
 import org.multiverse.api.{Transaction => MultiverseTransaction}
 import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
 import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.commitbarriers.VetoCommitBarrier
-
-import scala.collection.mutable.HashMap
 import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
+import org.multiverse.utils.backoff.ExponentialBackoffPolicy
+import org.multiverse.stms.alpha.AlphaStm
+import java.util.concurrent.TimeUnit
 
 class NoTransactionInScopeException extends RuntimeException
 class TransactionRetryException(message: String) extends RuntimeException(message)
@@ -98,21 +100,42 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
  */
 object Transaction extends TransactionManagement {
   val idFactory = new AtomicLong(-1L)
+/*
+  import AlphaStm._
+  private val defaultTxBuilder = new AlphaTransactionFactoryBuilder
+  defaultTxBuilder.setReadonly(false)
+  defaultTxBuilder.setInterruptible(INTERRUPTIBLE)
+  defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+  defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+  defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+  defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+  defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+  private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder
+  readOnlyTxBuilder.setReadonly(true)
+  readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE)
+  readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+  readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+  readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+  readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+  readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+  */
+  /**
+   * See ScalaDoc on class.
+   */
+  def map[T](f: => T)(implicit transactionFamilyName: String): T =
+    atomic {f}
 
   /**
    * See ScalaDoc on class.
    */
-  def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
+  def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
+    atomic {f}
 
   /**
    * See ScalaDoc on class.
    */
-  def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic {f(getTransactionInScope)}
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic {f(getTransactionInScope)}
+  def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
+    atomic {f}
 
   /**
    * Creates a "pure" STM atomic transaction and by-passes all transactions hooks
@@ -127,82 +150,39 @@ object Transaction extends TransactionManagement {
    * See ScalaDoc on class.
    */
   def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+    //    defaultTxBuilder.setFamilyName(transactionFamilyName)
+    //    new TransactionTemplate[T](defaultTxBuilder.build) {
     new TransactionTemplate[T]() { // FIXME take factory
-      def execute(mtx: MultiverseTransaction): T = body
+      def execute(mtx: MultiverseTransaction): T = {
+        val result = body
+
+        log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set")
+        getTransactionSetInScope.joinCommit(mtx)
+
+        // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS) 
+        //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+
+        clearTransaction
+        result
+      }
 
       override def onStart(mtx: MultiverseTransaction) = {
+        val txSet = if (!isTransactionSetInScope) createNewTransactionSet
+                    else getTransactionSetInScope
         val tx = new Transaction
         tx.transaction = Some(mtx)
         setTransaction(Some(tx))
-      }
 
-      override def onPostCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
+        txSet.registerOnCommitTask(new Runnable() {
+          def run = tx.commit
+        })
+        txSet.registerOnAbortTask(new Runnable() {
+          def run = tx.abort
+        })
       }
     }.execute()
   }
 
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
-    new TransactionTemplate[T]() { // FIXME take factory
-      def execute(mtx: MultiverseTransaction): T = body
-
-      override def onStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-
-      override def onPostCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
-    new TransactionTemplate[T]() { // FIXME take factory
-      def execute(mtx: MultiverseTransaction): T = body
-
-      override def onStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-
-      override def onPostCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomicReadOnly[T](body: => T): T = {
-    new TransactionTemplate[T]() { // FIXME take factory
-      def execute(mtx: MultiverseTransaction): T = body
-
-      override def onStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-
-      override def onPostCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
-
   /**
    * See ScalaDoc on class.
    */
@@ -215,7 +195,6 @@ object Transaction extends TransactionManagement {
   def elseBody[A](firstBody: => A) = new {
     def orElse(secondBody: => A) = new OrElseTemplate[A] {
       def run(t: MultiverseTransaction) = firstBody
-
       def orelserun(t: MultiverseTransaction) = secondBody
     }.execute()
   }
@@ -227,37 +206,34 @@ object Transaction extends TransactionManagement {
 @serializable class Transaction extends Logging {
   import Transaction._
 
+  log.trace("Creating %s", toString)
   val id = Transaction.idFactory.incrementAndGet
   @volatile private[this] var status: TransactionStatus = TransactionStatus.New
   private[akka] var transaction: Option[MultiverseTransaction] = None
   private[this] val persistentStateMap = new HashMap[String, Committable]
   private[akka] val depth = new AtomicInteger(0)
-  //private[akka] val transactionSet = new VetoCommitBarrier
-
-  //transactionSet.vetoCommit(this)
 
   // --- public methods ---------
 
-  def abort = synchronized {
-  //  transactionSet.abort
-  }
-
   def commit = synchronized {
+    log.trace("Committing transaction %s", toString)    
     pureAtomic {
       persistentStateMap.values.foreach(_.commit)
-      TransactionManagement.clearTransaction
     }
-    //transactionSet.vetoCommit(this)
     status = TransactionStatus.Completed
   }
 
-  def isNew = synchronized {status == TransactionStatus.New}
+  def abort = synchronized {
+    log.trace("Aborting transaction %s", toString)    
+  }
 
-  def isActive = synchronized {status == TransactionStatus.Active}
+  def isNew = synchronized { status == TransactionStatus.New }
 
-  def isCompleted = synchronized {status == TransactionStatus.Completed}
+  def isActive = synchronized { status == TransactionStatus.Active }
 
-  def isAborted = synchronized {status == TransactionStatus.Aborted}
+  def isCompleted = synchronized { status == TransactionStatus.Completed }
+
+  def isAborted = synchronized { status == TransactionStatus.Aborted }
 
   // --- internal methods ---------
 
@@ -300,9 +276,9 @@ object Transaction extends TransactionManagement {
         that.asInstanceOf[Transaction].id == this.id
   }
 
-  override def hashCode(): Int = synchronized {id.toInt}
+  override def hashCode(): Int = synchronized { id.toInt }
 
-  override def toString(): String = synchronized {"Transaction[" + id + ", " + status + "]"}
+  override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
 }
 
 /**
diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala
index 1f2ede3024..60a6ae6de3 100644
--- a/akka-core/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-core/src/main/scala/stm/TransactionManagement.scala
@@ -9,53 +9,80 @@ import java.util.concurrent.atomic.AtomicBoolean
 import se.scalablesolutions.akka.util.Logging
 
 import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier
 
 class StmException(msg: String) extends RuntimeException(msg)
 
-class TransactionAwareWrapperException(
-    val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
-  override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
+class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+  override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
 }
 
 object TransactionManagement extends TransactionManagement {
   import se.scalablesolutions.akka.Config._
 
-  val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
-  val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
-
+  val TRANSACTION_ENABLED =      new AtomicBoolean(config.getBool("akka.stm.service", false))
+  val FAIR_TRANSACTIONS =        config.getBool("akka.stm.fair", true)
+  val INTERRUPTIBLE =            config.getBool("akka.stm.interruptible", true)
+  val MAX_NR_OF_RETRIES =        config.getInt("akka.stm.max-nr-of-retries", 1000)
+  val TRANSACTION_TIMEOUT =      config.getInt("akka.stm.timeout", 10000)
+  val SMART_TX_LENGTH_SELECTOR = config.getBool("akka.stm.smart-tx-length-selector", true)
   def isTransactionalityEnabled = TRANSACTION_ENABLED.get
+
   def disableTransactions = TRANSACTION_ENABLED.set(false)
 
-  private[akka] val currentTransaction = new ThreadLocal[Option[Transaction]]() {
+  private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
+    override protected def initialValue: Option[CountDownCommitBarrier] = None
+  }
+
+  private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
     override protected def initialValue: Option[Transaction] = None
   }
+
+  private[akka] def getTransactionSet: CountDownCommitBarrier = {
+    val option = transactionSet.get
+    if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope")
+    option.get
+  }
+
+  private[akka] def getTransaction: Transaction = {
+    val option = transaction.get
+    if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
+    option.get
+  }
 }
 
 trait TransactionManagement extends Logging {
-  import TransactionManagement.currentTransaction
 
-  private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
-
-  private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
-    val tx = transaction.get
-    currentTransaction.set(transaction)
-    if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get)
-    else throw new IllegalStateException("No transaction defined")
+  private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
+    val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
+    TransactionManagement.transactionSet.set(Some(txSet))
+    txSet
   }
 
+  private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) =
+    if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet)
+
+  private[akka] def setTransaction(tx: Option[Transaction]) =
+    if (tx.isDefined) TransactionManagement.transaction.set(tx)
+
+  private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)
+
   private[akka] def clearTransaction = {
-    currentTransaction.set(None)
+    TransactionManagement.transaction.set(None)
     setThreadLocalTransaction(null)
   }
 
-  private[akka] def getTransactionInScope = currentTransaction.get.get
-  
-  private[akka] def isTransactionInScope = currentTransaction.get.isDefined
+  private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet
 
-  private[akka] def isTransactionTopLevel = if (isTransactionInScope) getTransactionInScope.isTopLevel
+  private[akka] def getTransactionInScope = TransactionManagement.getTransaction
 
-  private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
-
-  private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
-}
+  private[akka] def isTransactionSetInScope = {
+    val option = TransactionManagement.transactionSet.get
+    (option ne null) && option.isDefined
+  }
 
+  private[akka] def isTransactionInScope = {
+    val option = TransactionManagement.transaction.get
+    (option ne null) && option.isDefined
+  }
+}
\ No newline at end of file
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 195b134271..1b52faf969 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -77,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
   implicit val txInitName = "TransactionalRef:Init"
   val uuid = UUID.newUuid.toString
 
-  private[this] val ref: AlphaRef[T] = atomic { new AlphaRef }
+  private[this] lazy val ref: AlphaRef[T] = new AlphaRef
 
   def swap(elem: T) = {
     ensureIsInTransaction
diff --git a/akka-core/src/test/scala/InMemoryActorTest.scala b/akka-core/src/test/scala/InMemoryActorTest.scala
index cd06b80d0a..d4be98fcaa 100644
--- a/akka-core/src/test/scala/InMemoryActorTest.scala
+++ b/akka-core/src/test/scala/InMemoryActorTest.scala
@@ -23,7 +23,7 @@ case class SuccessOneWay(key: String, value: String)
 case class FailureOneWay(key: String, value: String, failer: Actor)
 
 class InMemStatefulActor extends Actor {
-  timeout = 100000
+  timeout = 5000
   makeTransactionRequired
 
   private lazy val mapState = TransactionalState.newMap[String, String]
@@ -86,8 +86,8 @@ class InMemFailerActor extends Actor {
 }
                                                         
 class InMemoryActorTest extends JUnitSuite {
+  import Actor.Sender.Self
 
-  /*
   @Test
   def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -98,7 +98,7 @@ class InMemoryActorTest extends JUnitSuite {
     Thread.sleep(1000)
     assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
   }
-  */
+
   @Test
   def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -107,7 +107,7 @@ class InMemoryActorTest extends JUnitSuite {
     stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
     assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
   }
-  /*
+
   @Test
   def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
@@ -120,7 +120,7 @@ class InMemoryActorTest extends JUnitSuite {
     Thread.sleep(1000)
     assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
   }
-  */
+
   @Test
   def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
@@ -134,7 +134,7 @@ class InMemoryActorTest extends JUnitSuite {
     } catch {case e: RuntimeException => {}}
     assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
   }
-  /*
+
   @Test
   def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -145,7 +145,7 @@ class InMemoryActorTest extends JUnitSuite {
     Thread.sleep(1000)
     assert(2 === (stateful !! GetVectorSize).get)
   }
-  */
+
   @Test
   def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -154,7 +154,7 @@ class InMemoryActorTest extends JUnitSuite {
     stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
     assert(2 === (stateful !! GetVectorSize).get)
   }
-  /*
+
   @Test
   def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
@@ -167,7 +167,7 @@ class InMemoryActorTest extends JUnitSuite {
     Thread.sleep(1000)
     assert(1 === (stateful !! GetVectorSize).get)
   }
-  */
+
   @Test
   def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
@@ -181,7 +181,7 @@ class InMemoryActorTest extends JUnitSuite {
     } catch {case e: RuntimeException => {}}
     assert(1 === (stateful !! GetVectorSize).get)
   }
-  /*
+
   @Test
   def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -192,7 +192,7 @@ class InMemoryActorTest extends JUnitSuite {
     Thread.sleep(1000)
     assert("new state" === (stateful !! GetRefState).get)
   }
-  */
+
   @Test
   def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
     val stateful = new InMemStatefulActor
@@ -201,7 +201,7 @@ class InMemoryActorTest extends JUnitSuite {
     stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
     assert("new state" === (stateful !! GetRefState).get)
   }
-  /*
+
   @Test
   def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
@@ -212,9 +212,9 @@ class InMemoryActorTest extends JUnitSuite {
     failer.start
     stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
     Thread.sleep(1000)
-    assert("init" === (stateful !! GetRefState).get) // check that state is == init state
+    assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
   }
-  */
+
   @Test
   def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
     val stateful = new InMemStatefulActor
diff --git a/akka-core/src/test/scala/ShutdownSpec.scala b/akka-core/src/test/scala/ShutdownSpec.scala
index ba03fbe902..20927bbfb1 100644
--- a/akka-core/src/test/scala/ShutdownSpec.scala
+++ b/akka-core/src/test/scala/ShutdownSpec.scala
@@ -2,9 +2,8 @@ package se.scalablesolutions.akka.remote
 
 import se.scalablesolutions.akka.actor.Actor
 
-object ActorShutdownSpec {
+object ActorShutdownRunner {
   def main(args: Array[String]) {
-
     class MyActor extends Actor {
       def receive = {
         case "test" => println("received test")
@@ -22,7 +21,7 @@ object ActorShutdownSpec {
 
 // case 2
 
-object RemoteServerAndClusterShutdownSpec {
+object RemoteServerAndClusterShutdownRunner {
   def main(args: Array[String]) {
     val s1 = new RemoteServer
     val s2 = new RemoteServer
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 8a51feed6b..366403ef46 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -11,7 +11,7 @@ import static se.scalablesolutions.akka.config.JavaConfig.*;
 import se.scalablesolutions.akka.actor.*;
 import se.scalablesolutions.akka.Kernel;
 import junit.framework.TestCase;
-/*
+
 public class InMemNestedStateTest extends TestCase {
   static String messageLog = "";
 
@@ -133,4 +133,3 @@ public class InMemNestedStateTest extends TestCase {
     assertEquals("init", nested.getRefState()); // check that state is == init state
   }
 }
-*/
\ No newline at end of file
diff --git a/akka-patterns/src/main/scala/Agent.scala b/akka-patterns/src/main/scala/Agent.scala
index 4dd8640c32..aea74530a3 100644
--- a/akka-patterns/src/main/scala/Agent.scala
+++ b/akka-patterns/src/main/scala/Agent.scala
@@ -50,30 +50,30 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
   * Periodically handles incoming messages
   */
   def receive = {
-    case FunctionHolder(fun: (T => T)) => atomic { updateData(fun(value.getOrWait)) }
+    case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))
 
     case ValueHolder(x: T) => updateData(x)
 
-    case ProcedureHolder(fun: (T => Unit)) => atomic { fun(copyStrategy(value.getOrWait)) }
+    case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
   }
 
   /**
    * Specifies how a copy of the value is made, defaults to using identity
    */
-  protected def copyStrategy(t : T) : T = t
+  protected def copyStrategy(t: T): T = t
 
 
   /**
   * Updates the internal state with the value provided as a by-name parameter
   */
-  private final def updateData(newData: => T) : Unit = atomic { value.swap(newData) }
+  private final def updateData(newData: => T): Unit = value.swap(newData)
 
   /**
   * Submits a request to read the internal state.
   * A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
   * Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
   */
-  final def get : T = {
+  final def get: T = {
     val ref = new AtomicReference[T]
     val latch = new CountDownLatch(1)
     get((x: T) => {ref.set(x); latch.countDown})
@@ -85,14 +85,14 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
   * Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
   * A copy of the internal state will be used, depending on the underlying effective copyStrategy.
   */
-  final def get(message: (T => Unit)) : Unit = this ! ProcedureHolder(message)
+  final def get(message: (T => Unit)): Unit = this ! ProcedureHolder(message)
 
   /**
   * Submits a request to read the internal state.
   * A copy of the internal state will be returned, depending on the underlying effective copyStrategy.
   * Internally leverages the asynchronous getValue() method and then waits for its result on a CountDownLatch.
   */
-  final def apply() : T = get
+  final def apply(): T = get
 
   /**
   * Asynchronously submits a request to read the internal state. The supplied function will be executed on the returned internal state value.
@@ -103,22 +103,22 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
   /**
   * Submits the provided function for execution against the internal agent's state
   */
-  final def apply(message: (T => T)) : Unit = this ! FunctionHolder(message)
+  final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
 
   /**
   * Submits a new value to be set as the new agent's internal state
   */
-  final def apply(message: T) : Unit = this ! ValueHolder(message)
+  final def apply(message: T): Unit = this ! ValueHolder(message)
 
   /**
   * Submits the provided function for execution against the internal agent's state
   */
-  final def update(message: (T => T)) : Unit = this ! FunctionHolder(message)
+  final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
 
   /**
   * Submits a new value to be set as the new agent's internal state
   */
-  final def update(message: T) : Unit = this ! ValueHolder(message)
+  final def update(message: T): Unit = this ! ValueHolder(message)
 }
 
 /**
@@ -135,7 +135,7 @@ object Agent {
   /**
    * Creates a new Agent of type T with the initial value of value
    */
-  def apply[T](value:T) : Agent[T] = new Agent(value)
+  def apply[T](value:T): Agent[T] = new Agent(value)
 
   /**
    * Creates a new Agent of type T with the initial value of value and with the specified copy function
diff --git a/akka-patterns/src/test/scala/ActorPatternsTest.scala b/akka-patterns/src/test/scala/ActorPatternsTest.scala
index 11f2664640..ae6ae5c0e8 100644
--- a/akka-patterns/src/test/scala/ActorPatternsTest.scala
+++ b/akka-patterns/src/test/scala/ActorPatternsTest.scala
@@ -21,12 +21,12 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
       val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")
 
       var targetOk = 0
-      val t1 = actor() receive {
+      val t1: Actor = actor {
         case `testMsg1` => targetOk += 2
         case `testMsg2` => targetOk += 4
       }
 
-      val t2 = actor() receive {
+      val t2: Actor = actor {
           case `testMsg3` => targetOk += 8
       }
 
@@ -48,7 +48,7 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
   @Test def testLogger = verify(new TestActor {
     def test = {
       val msgs = new HashSet[Any]
-      val t1 = actor() receive {
+      val t1: Actor = actor {
         case _ =>
       }
       val l = loggerActor(t1,(x) => msgs += x)
diff --git a/akka-patterns/src/test/scala/AgentTest.scala b/akka-patterns/src/test/scala/AgentTest.scala
index 17ccce8e0a..25961f8222 100644
--- a/akka-patterns/src/test/scala/AgentTest.scala
+++ b/akka-patterns/src/test/scala/AgentTest.scala
@@ -7,18 +7,23 @@ import org.scalatest.junit.JUnitRunner
 import org.scalatest.matchers.MustMatchers
 import org.junit.{Test}
 
+/*
 @RunWith(classOf[JUnitRunner])
 class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {
-  @Test def testAgent = verify(new TestActor {
-     def test = {
-        val t = Agent(5)
-        handle(t){
-        t.update( _ + 1 )
-        t.update( _ * 2 )
 
-        val r = t()
-        r must be (12)
-       }
-     }
+  @Test def testAgent = verify(new TestActor {
+    def test = {
+      atomic {
+        val t = Agent(5)
+        handle(t) {
+          t.update(_ + 1)
+          t.update(_ * 2)
+
+          val r = t()
+          r must be(12)
+        }
+      }
+    }
   })
-}
\ No newline at end of file
+}
+*/
\ No newline at end of file
diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
index dc55e0eca1..b4ea8fc381 100644
--- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
+++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala
@@ -4,7 +4,7 @@
 
 package se.scalablesolutions.akka.state
 
-import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
+import se.scalablesolutions.akka.stm.TransactionManagement.transaction
 import se.scalablesolutions.akka.collection._
 import se.scalablesolutions.akka.util.Logging
 
@@ -64,10 +64,6 @@ trait Storage {
     throw new UnsupportedOperationException
 }
 
-
-
-
-
 /**
  * Implementation of PersistentMap for every concrete 
  * storage will have the same workflow. This abstracts the workflow.
@@ -162,8 +158,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
   }
 
   private def register = {
-    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
-    currentTransaction.get.get.register(uuid, this)
+    if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+    transaction.get.get.register(uuid, this)
   }
 }
 
@@ -236,8 +232,8 @@ trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Com
   def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length
 
   private def register = {
-    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
-    currentTransaction.get.get.register(uuid, this)
+    if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+    transaction.get.get.register(uuid, this)
   }
 }
 
@@ -272,8 +268,8 @@ trait PersistentRef[T] extends Transactional with Committable {
   }
 
   private def register = {
-    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
-    currentTransaction.get.get.register(uuid, this)
+    if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+    transaction.get.get.register(uuid, this)
   }
 }
 
@@ -397,7 +393,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
     throw new UnsupportedOperationException("dequeueAll not supported")
 
   private def register = {
-    if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
-    currentTransaction.get.get.register(uuid, this)
+    if (transaction.get.isEmpty) throw new NoTransactionInScopeException
+    transaction.get.get.register(uuid, this)
   }
 }
diff --git a/akka.iml b/akka.iml
index 2f07a75716..a39c87020f 100644
--- a/akka.iml
+++ b/akka.iml
@@ -2,7 +2,18 @@
 
   
     
-      
+      
+        
+        
+      
     
   
   
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index 749b599e0b..296b06428b 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -30,8 +30,10 @@
  
   
     service = on
-    max-nr-of-retries = 100
-    distributed = off           # not implemented yet
+    fair = on                     # should transactions be fair or non-fair (non fair yield better performance)
+    max-nr-of-retries = 1000      # max nr of retries of a failing transaction before giving up
+    timeout = 10000               # transaction timeout; if transaction have not committed within the timeout then it is aborted
+    distributed = off             # not implemented yet
   
  
   
diff --git a/pom.xml b/pom.xml
index d2885abf61..480ad8723a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -161,6 +161,11 @@
       Codehaus Maven Repository
       http://repository.codehaus.org
     
+    
+      snapshots.repository.codehaus.org
+      Codehaus Maven Snapshot Repository
+      http://snapshots.repository.codehaus.org
+    
     
       maven2-repository.dev.java.net
       Java.net Repository for Maven
@@ -301,11 +306,11 @@
         
           
             **/*Test.java
-            
+            **/*Spec.java
           
-          
+          
           
             
               akka.home

From 36c0266a5d51be0b6f1189b706fbf2a4b1141552 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= 
Date: Thu, 4 Mar 2010 19:02:23 +0100
Subject: [PATCH 5/8] Redis tests now passes with new STM + misc minor changes
 to Cluster

---
 akka-core/src/main/scala/actor/Actor.scala    |   4 +-
 .../actor/BootableActorLoaderService.scala    |   4 +-
 .../remote/BootableRemoteActorService.scala   |   1 -
 akka-core/src/main/scala/remote/Cluster.scala |  33 +++---
 akka-kernel/src/main/scala/Kernel.scala       |   2 +-
 akka-patterns/src/main/scala/Patterns.scala   |  62 ++++++-----
 .../src/main/scala/Storage.scala              |   5 +-
 .../src/main/scala/RedisStorageBackend.scala  | 103 ++++++++++--------
 .../test/scala/RedisPersistentActorSpec.scala |   6 +-
 akka.iml                                      |   6 +
 config/akka-reference.conf                    |   1 +
 11 files changed, 131 insertions(+), 96 deletions(-)

diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala
index df536fb57e..a6b46d903d 100644
--- a/akka-core/src/main/scala/actor/Actor.scala
+++ b/akka-core/src/main/scala/actor/Actor.scala
@@ -309,9 +309,9 @@ trait Actor extends TransactionManagement {
    * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
    * Can be one of:
    * 
-   *  AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
    *
-   *  OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
    * 
*/ protected var faultHandler: Option[FaultHandlingStrategy] = None diff --git a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala index 1bacbf6f59..0c84d0965a 100644 --- a/akka-core/src/main/scala/actor/BootableActorLoaderService.scala +++ b/akka-core/src/main/scala/actor/BootableActorLoaderService.scala @@ -31,10 +31,10 @@ trait BootableActorLoaderService extends Bootable with Logging { val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList) new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader) - } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) { + } else if (getClass.getClassLoader.getResourceAsStream("aop.xml") ne null) { getClass.getClassLoader } else throw new IllegalStateException( - "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting") + "AKKA_HOME is not defined and akka-.jar can not be found on the classpath; aborting...") ) } diff --git a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala index 429fdb61ec..1b56dcea5a 100644 --- a/akka-core/src/main/scala/remote/BootableRemoteActorService.scala +++ b/akka-core/src/main/scala/remote/BootableRemoteActorService.scala @@ -24,7 +24,6 @@ trait BootableRemoteActorService extends Bootable with Logging { abstract override def onLoad = { if(config.getBool("akka.remote.server.service", true)){ - log.info("Starting up Cluster Service") Cluster.start super.onLoad //Initialize BootableActorLoaderService before remote service log.info("Initializing Remote Actors Service...") diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index c2e9069a01..294ce5bd94 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -60,19 +60,18 @@ private[remote] object ClusterActor { abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ - case class Message(sender : ADDR_T,msg : Array[Byte]) + case class Message(sender : ADDR_T, msg : Array[Byte]) case object PapersPlease extends ClusterMessage case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage case object Block extends ClusterMessage case object Unblock extends ClusterMessage - case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage + case class View(othersPresent: Set[ADDR_T]) extends ClusterMessage case class Zombie(address: ADDR_T) extends ClusterMessage case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage type ADDR_T - @volatile private var local: Node = Node(Nil) @volatile private var remotes: Map[ADDR_T, Node] = Map() @@ -206,17 +205,21 @@ abstract class BasicClusterActor extends ClusterActor { * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster with Logging { + lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName + @volatile private[remote] var clusterActor: Option[ClusterActor] = None @volatile private[remote] var supervisor: Option[Supervisor] = None + + // FIXME Use the supervisor member field - private[remote] lazy val serializer: Serializer = { - val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) - Class.forName(className).newInstance.asInstanceOf[Serializer] - } + private[remote] lazy val serializer: Serializer = + Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME)) + .newInstance.asInstanceOf[Serializer] - private[remote] def createClusterActor : Option[ClusterActor] = { + private[remote] def createClusterActor: Option[ClusterActor] = { val name = config.getString("akka.remote.cluster.actor") - + if (name.isEmpty) throw new IllegalArgumentException( + "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined") try { name map { fqn => val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] @@ -225,7 +228,7 @@ object Cluster extends Cluster with Logging { } } catch { - case e => log.error(e,"Couldn't load Cluster provider: [%s]",name.getOrElse("Not specified")); None + case e => log.error(e, "Couldn't load Cluster provider: [%s]", name.getOrElse("Not specified")); None } } @@ -250,10 +253,11 @@ object Cluster extends Cluster with Logging { def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.foreach(_.relayMessage(to, msg)) - def foreach(f : (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f)) + def foreach(f: (RemoteAddress) => Unit) : Unit = clusterActor.foreach(_.foreach(f)) - def start : Unit = synchronized { - if(supervisor.isEmpty) { + def start: Unit = synchronized { + log.info("Starting up Cluster Service...") + if (supervisor.isEmpty) { for(actor <- createClusterActor; sup <- createSupervisor(actor)) { clusterActor = Some(actor) @@ -262,7 +266,8 @@ object Cluster extends Cluster with Logging { } } - def shutdown : Unit = synchronized { + def shutdown: Unit = synchronized { + log.info("Shutting down Cluster Service...") supervisor.foreach(_.stop) supervisor = None clusterActor = None diff --git a/akka-kernel/src/main/scala/Kernel.scala b/akka-kernel/src/main/scala/Kernel.scala index f63a50a0a7..7b2b2e4693 100644 --- a/akka-kernel/src/main/scala/Kernel.scala +++ b/akka-kernel/src/main/scala/Kernel.scala @@ -79,7 +79,7 @@ object Kernel extends Logging { (____ /__|_ \__|_ \(____ / \/ \/ \/ \/ """) - log.info(" Running version %s", Config.VERSION) + log.info(" Running version %s", Config.VERSION) log.info("==============================") } } diff --git a/akka-patterns/src/main/scala/Patterns.scala b/akka-patterns/src/main/scala/Patterns.scala index b967c07df7..9b7e55ccc9 100644 --- a/akka-patterns/src/main/scala/Patterns.scala +++ b/akka-patterns/src/main/scala/Patterns.scala @@ -3,14 +3,14 @@ package se.scalablesolutions.akka.actor.patterns import se.scalablesolutions.akka.actor.Actor object Patterns { - type PF[A,B] = PartialFunction[A,B] + type PF[A, B] = PartialFunction[A, B] /** * Creates a new PartialFunction whose isDefinedAt is a combination * of the two parameters, and whose apply is first to call filter.apply and then filtered.apply */ - def filter[A,B](filter : PF[A,Unit],filtered : PF[A,B]) : PF[A,B] = { - case a : A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => + def filter[A, B](filter: PF[A, Unit], filtered: PF[A, B]): PF[A, B] = { + case a: A if filtered.isDefinedAt(a) && filter.isDefinedAt(a) => filter(a) filtered(a) } @@ -18,39 +18,42 @@ object Patterns { /** * Interceptor is a filter(x,y) where x.isDefinedAt is considered to be always true */ - def intercept[A,B](interceptor : (A) => Unit, interceptee : PF[A,B]) : PF[A,B] = filter( - { case a if a.isInstanceOf[A] => interceptor(a) }, - interceptee + def intercept[A, B](interceptor: (A) => Unit, interceptee: PF[A, B]): PF[A, B] = filter( + {case a if a.isInstanceOf[A] => interceptor(a)}, + interceptee ) - + //FIXME 2.8, use default params with CyclicIterator - def loadBalancerActor(actors : => InfiniteIterator[Actor]) : Actor = new Actor with LoadBalancer { + def loadBalancerActor(actors: => InfiniteIterator[Actor]): Actor = new Actor with LoadBalancer { val seq = actors } - def dispatcherActor(routing : PF[Any,Actor], msgTransformer : (Any) => Any) : Actor = new Actor with Dispatcher { - override def transform(msg : Any) = msgTransformer(msg) + def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher { + override def transform(msg: Any) = msgTransformer(msg) + def routes = routing } - - def dispatcherActor(routing : PF[Any,Actor]) : Actor = new Actor with Dispatcher { - def routes = routing + + def dispatcherActor(routing: PF[Any, Actor]): Actor = new Actor with Dispatcher { + def routes = routing } - def loggerActor(actorToLog : Actor, logger : (Any) => Unit) : Actor = dispatcherActor ( - { case _ => actorToLog }, + def loggerActor(actorToLog: Actor, logger: (Any) => Unit): Actor = dispatcherActor( + {case _ => actorToLog}, logger - ) + ) } -trait Dispatcher { self : Actor => +trait Dispatcher { + self: Actor => - protected def transform(msg : Any) : Any = msg - protected def routes : PartialFunction[Any,Actor] - - protected def dispatch : PartialFunction[Any,Unit] = { + protected def transform(msg: Any): Any = msg + + protected def routes: PartialFunction[Any, Actor] + + protected def dispatch: PartialFunction[Any, Unit] = { case a if routes.isDefinedAt(a) => { - if(self.sender.isDefined) + if (self.sender.isDefined) routes(a) forward transform(a) else routes(a) send transform(a) @@ -60,19 +63,22 @@ trait Dispatcher { self : Actor => def receive = dispatch } -trait LoadBalancer extends Dispatcher { self : Actor => - protected def seq : InfiniteIterator[Actor] +trait LoadBalancer extends Dispatcher { + self: Actor => + protected def seq: InfiniteIterator[Actor] - protected def routes = { case x if seq.hasNext => seq.next } + protected def routes = {case x if seq.hasNext => seq.next} } trait InfiniteIterator[T] extends Iterator[T] -class CyclicIterator[T](items : List[T]) extends InfiniteIterator[T] { - @volatile private[this] var current : List[T] = items +class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] { + @volatile private[this] var current: List[T] = items + def hasNext = items != Nil + def next = { - val nc = if(current == Nil) items else current + val nc = if (current == Nil) items else current current = nc.tail nc.head } diff --git a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala index b4ea8fc381..f52841b817 100644 --- a/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala +++ b/akka-persistence/akka-persistence-common/src/main/scala/Storage.scala @@ -8,10 +8,11 @@ import se.scalablesolutions.akka.stm.TransactionManagement.transaction import se.scalablesolutions.akka.collection._ import se.scalablesolutions.akka.util.Logging -import org.codehaus.aspectwerkz.proxy.Uuid - +// FIXME move to 'stm' package + add message with more info class NoTransactionInScopeException extends RuntimeException +class StorageException(message: String) extends RuntimeException(message) + /** * Example Scala usage. *

diff --git a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala index 48945d6b8c..be214087f3 100644 --- a/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala +++ b/akka-persistence/akka-persistence-redis/src/main/scala/RedisStorageBackend.scala @@ -72,11 +72,11 @@ private [akka] object RedisStorageBackend extends * base64(T1):base64("debasish.programming_language") -> "scala" * */ - def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) { + def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling { insertMapStorageEntriesFor(name, List((key, value))) } - def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) { + def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling { mset(entries.map(e => (makeRedisKey(name, e._1), new String(e._2)))) } @@ -89,22 +89,22 @@ private [akka] object RedisStorageBackend extends *

  • : is chosen since it cannot appear in base64 encoding charset
  • *
  • both parts of the key need to be based64 encoded since there can be spaces within each of them
  • */ - private [this] def makeRedisKey(name: String, key: Array[Byte]): String = { + private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling { "%s:%s".format(new String(encode(name.getBytes)), new String(encode(key))) } - private [this] def makeKeyFromRedisKey(redisKey: String) = { + private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling { val nk = redisKey.split(':').map{e: String => decode(e.getBytes)} (nk(0), nk(1)) } - private [this] def mset(entries: List[(String, String)]) { + private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling { entries.foreach {e: (String, String) => db.set(e._1, e._2) } } - def removeMapStorageFor(name: String): Unit = { + def removeMapStorageFor(name: String): Unit = withErrorHandling { db.keys("%s:*".format(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -113,18 +113,19 @@ private [akka] object RedisStorageBackend extends } } - def removeMapStorageFor(name: String, key: Array[Byte]): Unit = { + def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling { db.delete(makeRedisKey(name, key)) } - def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = + def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling { db.get(makeRedisKey(name, key)) match { case None => throw new Predef.NoSuchElementException(new String(key) + " not present") case Some(s) => Some(s.getBytes) } + } - def getMapStorageSizeFor(name: String): Int = { + def getMapStorageSizeFor(name: String): Int = withErrorHandling { db.keys("%s:*".format(new String(encode(name.getBytes)))) match { case None => 0 case Some(keys) => @@ -132,7 +133,7 @@ private [akka] object RedisStorageBackend extends } } - def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = { + def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling { db.keys("%s:*".format(new String(encode(name.getBytes)))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -143,7 +144,7 @@ private [akka] object RedisStorageBackend extends def getMapStorageRangeFor(name: String, start: Option[Array[Byte]], finish: Option[Array[Byte]], - count: Int): List[(Array[Byte], Array[Byte])] = { + count: Int): List[(Array[Byte], Array[Byte])] = withErrorHandling { import scala.collection.immutable.TreeMap val wholeSorted = @@ -188,19 +189,19 @@ private [akka] object RedisStorageBackend extends } } - def insertVectorStorageEntryFor(name: String, element: Array[Byte]) { + def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling { db.lpush(new String(encode(name.getBytes)), new String(element)) } - def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) { + def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling { elements.foreach(insertVectorStorageEntryFor(name, _)) } - def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) { + def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling { db.lset(new String(encode(name.getBytes)), index, new String(elem)) } - def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = { + def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling { db.lindex(new String(encode(name.getBytes)), index) match { case None => throw new Predef.NoSuchElementException(name + " does not have element at " + index) @@ -208,7 +209,7 @@ private [akka] object RedisStorageBackend extends } } - def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = { + def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling { /** * count is the max number of results to return. Start with * start or 0 (if start is not defined) and go until @@ -237,11 +238,11 @@ private [akka] object RedisStorageBackend extends } } - def insertRefStorageFor(name: String, element: Array[Byte]) { + def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling { db.set(new String(encode(name.getBytes)), new String(element)) } - def getRefStorageFor(name: String): Option[Array[Byte]] = { + def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling { db.get(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -250,12 +251,13 @@ private [akka] object RedisStorageBackend extends } // add to the end of the queue - def enqueue(name: String, item: Array[Byte]): Boolean = { + def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.rpush(new String(encode(name.getBytes)), new String(item)) } + // pop from the front of the queue - def dequeue(name: String): Option[Array[Byte]] = { + def dequeue(name: String): Option[Array[Byte]] = withErrorHandling { db.lpop(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -265,7 +267,7 @@ private [akka] object RedisStorageBackend extends } // get the size of the queue - def size(name: String): Int = { + def size(name: String): Int = withErrorHandling { db.llen(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -275,26 +277,28 @@ private [akka] object RedisStorageBackend extends // return an array of items currently stored in the queue // start is the item to begin, count is how many items to return - def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match { - case 1 => - db.lindex(new String(encode(name.getBytes)), start) match { - case None => - throw new Predef.NoSuchElementException("No element at " + start) - case Some(s) => - List(s.getBytes) - } - case n => - db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { - case None => - throw new Predef.NoSuchElementException( - "No element found between " + start + " and " + (start + count - 1)) - case Some(es) => - es.map(_.get.getBytes) - } + def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling { + count match { + case 1 => + db.lindex(new String(encode(name.getBytes)), start) match { + case None => + throw new Predef.NoSuchElementException("No element at " + start) + case Some(s) => + List(s.getBytes) + } + case n => + db.lrange(new String(encode(name.getBytes)), start, start + count - 1) match { + case None => + throw new Predef.NoSuchElementException( + "No element found between " + start + " and " + (start + count - 1)) + case Some(es) => + es.map(_.get.getBytes) + } + } } // completely delete the queue - def remove(name: String): Boolean = { + def remove(name: String): Boolean = withErrorHandling { db.delete(new String(encode(name.getBytes))) match { case Some(1) => true case _ => false @@ -302,7 +306,7 @@ private [akka] object RedisStorageBackend extends } // add item to sorted set identified by name - def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = { + def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling { db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match { case Some(1) => true case _ => false @@ -310,7 +314,7 @@ private [akka] object RedisStorageBackend extends } // remove item from sorted set identified by name - def zrem(name: String, item: Array[Byte]): Boolean = { + def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling { db.zrem(new String(encode(name.getBytes)), new String(item)) match { case Some(1) => true case _ => false @@ -318,7 +322,7 @@ private [akka] object RedisStorageBackend extends } // cardinality of the set identified by name - def zcard(name: String): Int = { + def zcard(name: String): Int = withErrorHandling { db.zcard(new String(encode(name.getBytes))) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -326,7 +330,7 @@ private [akka] object RedisStorageBackend extends } } - def zscore(name: String, item: Array[Byte]): String = { + def zscore(name: String, item: Array[Byte]): String = withErrorHandling { db.zscore(new String(encode(name.getBytes)), new String(item)) match { case None => throw new Predef.NoSuchElementException(new String(item) + " not present") @@ -334,7 +338,7 @@ private [akka] object RedisStorageBackend extends } } - def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = { + def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling { db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match { case None => throw new Predef.NoSuchElementException(name + " not present") @@ -343,5 +347,16 @@ private [akka] object RedisStorageBackend extends } } - def flushDB = db.flushDb + def flushDB = withErrorHandling(db.flushDb) + + private def withErrorHandling[T](body: => T): T = { + try { + body + } catch { + case e: java.lang.NullPointerException => + throw new StorageException("Could not connect to Redis server") + case e => + throw new StorageException("Error in Redis: " + e.getMessage) + } + } } diff --git a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala index 86d4384b70..8c91f0ff61 100644 --- a/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-redis/src/test/scala/RedisPersistentActorSpec.scala @@ -29,6 +29,7 @@ case object LogSize class AccountActor extends Transactor { private lazy val accountState = RedisStorage.newMap private lazy val txnLog = RedisStorage.newVector + //timeout = 5000 def receive = { // check balance @@ -86,6 +87,7 @@ class AccountActor extends Transactor { } @serializable class PersistentFailerActor extends Transactor { + //timeout = 5000 def receive = { case "Failure" => throw new RuntimeException("expected") @@ -138,7 +140,7 @@ class RedisPersistentActorSpec extends TestCase { bactor.start bactor !! Credit("a-123", 5000) - assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) val failer = new PersistentFailerActor failer.start @@ -147,7 +149,7 @@ class RedisPersistentActorSpec extends TestCase { fail("should throw exception") } catch { case e: RuntimeException => {}} - assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get) + assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get) // should not count the failed one assertEquals(3, (bactor !! LogSize).get) diff --git a/akka.iml b/akka.iml index a39c87020f..74542e8e48 100644 --- a/akka.iml +++ b/akka.iml @@ -15,6 +15,12 @@
    + + + + + +
    diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 296b06428b..5a1c33497b 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -49,6 +49,7 @@ zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 + service = on # FIXME add 'service = on' for name = "default" # The name of the cluster #actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class From 8091b6cb2634a04f0e1537d5e74dfec45e5ca833 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Thu, 4 Mar 2010 23:25:55 +0100 Subject: [PATCH 6/8] Fixing a bug in JGroupsClusterActor --- .../src/main/scala/JGroupsClusterActor.scala | 3 +- .../src/main/scala/ShoalClusterActor.scala | 9 ++-- akka-core/src/main/scala/remote/Cluster.scala | 47 +++++++++---------- .../src/main/scala/remote/RemoteServer.scala | 2 +- 4 files changed, 30 insertions(+), 31 deletions(-) diff --git a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala index 2b6730dd60..12d93ef272 100644 --- a/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala +++ b/akka-cluster/akka-cluster-jgroups/src/main/scala/JGroupsClusterActor.scala @@ -44,7 +44,8 @@ class JGroupsClusterActor extends BasicClusterActor { log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer }) }) - channel.map(_.connect(name)) + + channel.foreach(_.connect(name)) } protected def toOneNode(dest : Address, msg: Array[Byte]): Unit = diff --git a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala index c656bd4b81..3d83a46ef3 100644 --- a/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala +++ b/akka-cluster/akka-cluster-shoal/src/main/scala/ShoalClusterActor.scala @@ -85,6 +85,7 @@ class ShoalClusterActor extends BasicClusterActor { */ protected def createCallback : CallBack = { import org.scala_tools.javautils.Imports._ + import ClusterActor._ val me = this new CallBack { def processNotification(signal : Signal) { @@ -92,10 +93,10 @@ class ShoalClusterActor extends BasicClusterActor { signal.acquire() if(isActive) { signal match { - case ms : MessageSignal => me send Message(ms.getMemberToken,ms.getMessage) - case jns : JoinNotificationSignal => me send View(Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) - case fss : FailureSuspectedSignal => me send Zombie(fss.getMemberToken) - case fns : FailureNotificationSignal => me send Zombie(fns.getMemberToken) + case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage) + case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName) + case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken) + case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken) case _ => log.debug("Unhandled signal: [%s]",signal) } } diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index c2e9069a01..4313cfe98c 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -48,7 +48,15 @@ private[remote] object ClusterActor { sealed trait ClusterMessage private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage - + private[remote] case class Message[ADDR_T](sender : ADDR_T,msg : Array[Byte]) + private[remote] case object PapersPlease extends ClusterMessage + private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage + private[remote] case object Block extends ClusterMessage + private[remote] case object Unblock extends ClusterMessage + private[remote] case class View[ADDR_T](othersPresent : Set[ADDR_T]) extends ClusterMessage + private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage + private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage + private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage private[remote] case class Node(endpoints: List[RemoteAddress]) } @@ -60,16 +68,6 @@ private[remote] object ClusterActor { abstract class BasicClusterActor extends ClusterActor { import ClusterActor._ - case class Message(sender : ADDR_T,msg : Array[Byte]) - case object PapersPlease extends ClusterMessage - case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage - case object Block extends ClusterMessage - case object Unblock extends ClusterMessage - case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage - case class Zombie(address: ADDR_T) extends ClusterMessage - case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage - case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage - type ADDR_T @@ -85,14 +83,14 @@ abstract class BasicClusterActor extends ClusterActor { } def receive = { - case v @ View(members) => { + case v : View[ADDR_T] => { // Not present in the cluster anymore = presumably zombies // Nodes we have no prior knowledge existed = unknowns - val zombies = Set[ADDR_T]() ++ remotes.keySet -- members - val unknown = members -- remotes.keySet + val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent + val unknown = v.othersPresent -- remotes.keySet log debug ("Updating view") - log debug ("Other memebers: [%s]",members) + log debug ("Other memebers: [%s]",v.othersPresent) log debug ("Zombies: [%s]",zombies) log debug ("Unknowns: [%s]",unknown) @@ -101,10 +99,10 @@ abstract class BasicClusterActor extends ClusterActor { remotes = remotes -- zombies } - case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead - log debug ("Killing Zombie Node: %s", x) - broadcast(x :: Nil, PapersPlease) - remotes = remotes - x + case z : Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead + log debug ("Killing Zombie Node: %s", z.address) + broadcast(z.address :: Nil, PapersPlease) + remotes = remotes - z.address } case rm @ RelayedMessage(_, _) => { @@ -112,7 +110,8 @@ abstract class BasicClusterActor extends ClusterActor { broadcast(rm) } - case m @ Message(src,msg) => { + case m : Message[ADDR_T] => { + val (src,msg) = (m.sender,m.msg) (Cluster.serializer in (msg, None)) match { case PapersPlease => { @@ -207,7 +206,7 @@ abstract class BasicClusterActor extends ClusterActor { */ object Cluster extends Cluster with Logging { @volatile private[remote] var clusterActor: Option[ClusterActor] = None - @volatile private[remote] var supervisor: Option[Supervisor] = None + @volatile private[remote] var supervisor: Option[Supervisor] = None private[remote] lazy val serializer: Serializer = { val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) @@ -219,9 +218,7 @@ object Cluster extends Cluster with Logging { try { name map { fqn => - val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] - a.start - a + Class.forName(fqn).newInstance.asInstanceOf[ClusterActor] } } catch { @@ -235,7 +232,6 @@ object Cluster extends Cluster with Logging { RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), Supervise(actor, LifeCycle(Permanent)) :: Nil) ).newInstance - sup.start Some(sup) } @@ -258,6 +254,7 @@ object Cluster extends Cluster with Logging { sup <- createSupervisor(actor)) { clusterActor = Some(actor) supervisor = Some(sup) + sup.start } } } diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 6da2ceea99..02cf98bcd2 100644 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer */ object RemoteServer { val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost") - val PORT = config.getInt("akka.remote.server.port", 9999) + val PORT = config.getInt("akka.remote.server.port", 9966) val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000) From efa0cc0d5abb0951404b940c7549a5314141ab39 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Fri, 5 Mar 2010 16:07:06 +0100 Subject: [PATCH 7/8] Fixed last persistence issues with new STM, all test pass --- .../scala/CassandraPersistentActorSpec.scala | 16 +++++----------- .../test/scala/MongoPersistentActorSpec.scala | 13 ++++++------- 2 files changed, 11 insertions(+), 18 deletions(-) diff --git a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala index 0e232f5ce9..a7fed923eb 100644 --- a/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-cassandra/src/test/scala/CassandraPersistentActorSpec.scala @@ -1,13 +1,9 @@ package se.scalablesolutions.akka.state -import se.scalablesolutions.akka.actor.Actor - -import junit.framework.TestCase +import se.scalablesolutions.akka.actor.{Actor, Transactor} import org.junit.Test import org.junit.Assert._ -import org.apache.cassandra.service.CassandraDaemon -import org.junit.BeforeClass import org.junit.Before import org.scalatest.junit.JUnitSuite @@ -28,9 +24,8 @@ case class SetRefStateOneWay(key: String) case class SuccessOneWay(key: String, value: String) case class FailureOneWay(key: String, value: String, failer: Actor) -class CassandraPersistentActor extends Actor { +class CassandraPersistentActor extends Transactor { timeout = 100000 - makeTransactionRequired private lazy val mapState = CassandraStorage.newMap private lazy val vectorState = CassandraStorage.newVector @@ -66,8 +61,7 @@ class CassandraPersistentActor extends Actor { } } -@serializable class PersistentFailerActor extends Actor { - makeTransactionRequired +@serializable class PersistentFailerActor extends Transactor { def receive = { case "Failure" => throw new RuntimeException("expected") @@ -76,8 +70,8 @@ class CassandraPersistentActor extends Actor { class CassandraPersistentActorSpec extends JUnitSuite { - @Before - def startCassandra = EmbeddedCassandraService.start + //@Before + //def startCassandra = EmbeddedCassandraService.start @Test def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = { diff --git a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala index 8681ebadb9..8ad5d94355 100644 --- a/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala +++ b/akka-persistence/akka-persistence-mongo/src/test/scala/MongoPersistentActorSpec.scala @@ -8,7 +8,7 @@ import org.junit.Assert._ import _root_.dispatch.json.{JsNumber, JsValue} import _root_.dispatch.json.Js._ -import se.scalablesolutions.akka.actor.Actor +import se.scalablesolutions.akka.actor.{Transactor, Actor} /** * A persistent actor based on MongoDB storage. @@ -29,10 +29,10 @@ case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor) case class Credit(accountNo: String, amount: BigInt) case object LogSize -class BankAccountActor extends Actor { - makeTransactionRequired - private val accountState = MongoStorage.newMap - private val txnLog = MongoStorage.newVector +class BankAccountActor extends Transactor { + + private lazy val accountState = MongoStorage.newMap + private lazy val txnLog = MongoStorage.newVector def receive: PartialFunction[Any, Unit] = { // check balance @@ -91,8 +91,7 @@ class BankAccountActor extends Actor { } } -@serializable class PersistentFailerActor extends Actor { - makeTransactionRequired +@serializable class PersistentFailerActor extends Transactor { def receive = { case "Failure" => throw new RuntimeException("expected") From 57009b112bb43b2157a7159b7612fc97142b67fa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jonas=20Bon=C3=A9r?= Date: Fri, 5 Mar 2010 17:44:11 +0100 Subject: [PATCH 8/8] removed log.trace that gave bad perf --- akka-core/src/main/scala/actor/Actor.scala | 2 +- akka-core/src/test/scala/PerformanceTest.scala | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index a6b46d903d..60f3967e6c 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -873,7 +873,7 @@ trait Actor extends TransactionManagement { * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods. */ private[akka] def invoke(messageHandle: MessageInvocation) = synchronized { - log.trace("%s is invoked with message %s", toString, messageHandle) + //log.trace("%s is invoked with message %s", toString, messageHandle) try { if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle) else dispatch(messageHandle) diff --git a/akka-core/src/test/scala/PerformanceTest.scala b/akka-core/src/test/scala/PerformanceTest.scala index d58d075202..778e4d45cd 100644 --- a/akka-core/src/test/scala/PerformanceTest.scala +++ b/akka-core/src/test/scala/PerformanceTest.scala @@ -1,4 +1,4 @@ -package test +package se.scalablesolutions.akka import org.scalatest.junit.JUnitSuite import org.junit.Test @@ -279,7 +279,7 @@ class PerformanceTest extends JUnitSuite { var nrOfMessages = 2000000 var nrOfActors = 4 - var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 20) + var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 30) var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40) var ratio: Double = scalaTime.toDouble / akkaTime.toDouble