upgraded to multiverse 0.3-SNAPSHOT + enriched the AMQP API

This commit is contained in:
jboner 2009-10-24 21:21:43 +02:00
parent b876aa58b4
commit 472e4791ad
14 changed files with 406 additions and 322 deletions

View file

@ -55,7 +55,7 @@
<dependency> <dependency>
<groupId>net.lag</groupId> <groupId>net.lag</groupId>
<artifactId>configgy</artifactId> <artifactId>configgy</artifactId>
<version>1.3</version> <version>1.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.guiceyfruit</groupId> <groupId>org.guiceyfruit</groupId>
@ -72,11 +72,6 @@
<artifactId>javautils</artifactId> <artifactId>javautils</artifactId>
<version>2.7.4-0.1</version> <version>2.7.4-0.1</version>
</dependency> </dependency>
<dependency>
<groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId>
<version>0.3</version>
</dependency>
<!-- For Protocol/Serialization --> <!-- For Protocol/Serialization -->
<dependency> <dependency>

View file

@ -11,8 +11,7 @@ import se.scalablesolutions.akka.state.Committable
import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Stm, Transaction => MultiverseTransaction} import org.multiverse.api.{Stm, Transaction => MultiverseTransaction}
import org.multiverse.stms.alpha.AlphaStm import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.utils.GlobalStmInstance
import org.multiverse.utils.TransactionThreadLocal._ import org.multiverse.utils.TransactionThreadLocal._
import org.multiverse.templates.OrElseTemplate import org.multiverse.templates.OrElseTemplate
@ -21,15 +20,6 @@ import scala.collection.mutable.HashMap
class NoTransactionInScopeException extends RuntimeException class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message) class TransactionRetryException(message: String) extends RuntimeException(message)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Multiverse {
val STM: Stm = new AlphaStm
GlobalStmInstance.set(STM)
setThreadLocalTransaction(null)
}
/** /**
* Example of atomic transaction management. * Example of atomic transaction management.
* <pre> * <pre>
@ -58,7 +48,8 @@ object Transaction extends TransactionManagement {
// FIXME implement Transaction::map/flatMap/filter/foreach // FIXME implement Transaction::map/flatMap/filter/foreach
// -- atomic block -------------------------- // -- atomic block --------------------------
def atomic[T](body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { def atomic[T](body: => T): T = new AtomicTemplate[T](
getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body
override def postStart(mtx: MultiverseTransaction) = { override def postStart(mtx: MultiverseTransaction) = {
val tx = new Transaction val tx = new Transaction
@ -71,6 +62,7 @@ object Transaction extends TransactionManagement {
} }
}.execute() }.execute()
// FIXME: add these other atomic methods
/* /*
def atomic[T](retryCount: Int)(body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, false, retryCount) { def atomic[T](retryCount: Int)(body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, false, retryCount) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body

View file

@ -4,14 +4,14 @@
package se.scalablesolutions.akka.state package se.scalablesolutions.akka.state
import se.scalablesolutions.akka.stm.{TransactionManagement} import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.collection._ import se.scalablesolutions.akka.collection._
import org.multiverse.templates.AtomicTemplate import org.multiverse.templates.AtomicTemplate
import org.multiverse.api.Transaction import org.multiverse.api.Transaction
import org.multiverse.datastructures.refs.manual.Ref; import org.multiverse.datastructures.refs.manual.Ref;
import org.codehaus.aspectwerkz.proxy.Uuid import org.codehaus.aspectwerkz.proxy.Uuid
/** /**
@ -47,7 +47,7 @@ object TransactionalState {
*/ */
@serializable @serializable
trait Transactional { trait Transactional {
// FIXME: won't work across the cluster // FIXME: won't work across the remote machines, use [http://johannburkard.de/software/uuid/]
var uuid = Uuid.newUuid.toString var uuid = Uuid.newUuid.toString
} }
@ -62,6 +62,7 @@ trait Committable {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object TransactionalRef { object TransactionalRef {
/** /**
* An implicit conversion that converts an option to an iterable value * An implicit conversion that converts an option to an iterable value
*/ */
@ -78,7 +79,7 @@ object TransactionalRef {
class TransactionalRef[T] extends Transactional { class TransactionalRef[T] extends Transactional {
import org.multiverse.utils.TransactionThreadLocal._ import org.multiverse.utils.TransactionThreadLocal._
private[this] val ref: Ref[T] = new Ref[T]//Ref.createCommittedRef[T] private[this] val ref: Ref[T] = atomic { new Ref }
def swap(elem: T) = ref.set(elem) def swap(elem: T) = ref.set(elem)

View file

@ -27,9 +27,9 @@ class InMemStatefulActor extends Actor {
timeout = 100000 timeout = 100000
makeTransactionRequired makeTransactionRequired
private lazy val mapState: TransactionalMap[String, String] = TransactionalState.newMap[String, String] private lazy val mapState = TransactionalState.newMap[String, String]
private lazy val vectorState: TransactionalVector[String] = TransactionalState.newVector[String] private lazy val vectorState = TransactionalState.newVector[String]
private lazy val refState: TransactionalRef[String] = TransactionalState.newRef[String] private lazy val refState = TransactionalState.newRef[String]
def receive: PartialFunction[Any, Unit] = { def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) => case GetMapState(key) =>

View file

@ -51,14 +51,25 @@ object AMQP extends Actor {
start start
// ====== MESSAGES ===== // ====== MESSAGES =====
class Message(val payload: AnyRef, val routingKey: String, val mandatory: Boolean, val immediate: Boolean, val properties: RabbitMQ.BasicProperties) { class Message(val payload: AnyRef,
override def toString(): String = "Message[payload=" + payload + ", routingKey=" + routingKey + ", properties=" + properties + "]" val routingKey: String,
val mandatory: Boolean,
val immediate: Boolean,
val properties: RabbitMQ.BasicProperties) {
override def toString(): String =
"Message[payload=" + payload +
", routingKey=" + routingKey +
", mandatory=" + mandatory +
", immediate=" + immediate +
", properties=" + properties + "]"
} }
object Message { object Message {
def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] = def unapply(message: Message): Option[Tuple5[AnyRef, String, Boolean, Boolean, RabbitMQ.BasicProperties]] =
Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties)) Some((message.payload, message.routingKey, message.mandatory, message.immediate, message.properties))
def apply(payload: AnyRef, routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message = def apply(payload: AnyRef, routingKey: String, mandatory: Boolean, immediate: Boolean, properties: RabbitMQ.BasicProperties): Message =
new Message(payload, routingKey, mandatory, immediate, properties) new Message(payload, routingKey, mandatory, immediate, properties)
def apply(payload: AnyRef, routingKey: String): Message = def apply(payload: AnyRef, routingKey: String): Message =
new Message(payload, routingKey, false, false, null) new Message(payload, routingKey, false, false, null)
} }

View file

@ -88,17 +88,4 @@ public class InMemStateful {
public void postRestart() { public void postRestart() {
System.out.println("################ POST RESTART"); System.out.println("################ POST RESTART");
} }
/*
public void clashOk(String key, String msg, InMemClasher clasher) {
mapState.put(key, msg);
clasher.clash();
}
public void clashNotOk(String key, String msg, InMemClasher clasher) {
mapState.put(key, msg);
clasher.clash();
this.success("clash", "clash");
}
*/
} }

View file

@ -67,7 +67,7 @@
<dependency> <dependency>
<groupId>net.lag</groupId> <groupId>net.lag</groupId>
<artifactId>configgy</artifactId> <artifactId>configgy</artifactId>
<version>1.3</version> <version>1.4</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.guiceyfruit</groupId> <groupId>org.guiceyfruit</groupId>
@ -101,8 +101,13 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.multiverse</groupId> <groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId> <artifactId>multiverse-core</artifactId>
<version>0.3</version> <version>0.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
<version>0.3-SNAPSHOT</version>
</dependency> </dependency>
<dependency> <dependency>
<groupId>com.rabbitmq</groupId> <groupId>com.rabbitmq</groupId>

View file

@ -24,9 +24,8 @@ object Kernel extends Logging {
import Config._ import Config._
val BOOT_CLASSES = config.getList("akka.boot") val BOOT_CLASSES = config.getList("akka.boot")
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", false) val RUN_REMOTE_SERVICE = config.getBool("akka.remote.server.service", true)
val RUN_REST_SERVICE = config.getBool("akka.rest.service", false) val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")
val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost") val REST_HOSTNAME = config.getString("akka.rest.hostname", "localhost")
val REST_URL = "http://" + REST_HOSTNAME val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = config.getInt("akka.rest.port", 9998) val REST_PORT = config.getInt("akka.rest.port", 9998)

View file

@ -27,8 +27,13 @@
</dependency> </dependency>
<dependency> <dependency>
<groupId>org.multiverse</groupId> <groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId> <artifactId>multiverse-core</artifactId>
<version>0.3</version> <version>0.3-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.multiverse</groupId>
<artifactId>multiverse-alpha</artifactId>
<version>0.3-SNAPSHOT</version>
</dependency> </dependency>
</dependencies> </dependencies>

View file

@ -1,5 +1,6 @@
package se.scalablesolutions.akka.stm; package se.scalablesolutions.akka.stm;
import org.multiverse.templates.AbortedException;
import org.multiverse.api.Stm; import org.multiverse.api.Stm;
import org.multiverse.api.Transaction; import org.multiverse.api.Transaction;
import org.multiverse.api.TransactionStatus; import org.multiverse.api.TransactionStatus;
@ -7,13 +8,16 @@ import org.multiverse.api.exceptions.CommitFailureException;
import org.multiverse.api.exceptions.LoadException; import org.multiverse.api.exceptions.LoadException;
import org.multiverse.api.exceptions.RetryError; import org.multiverse.api.exceptions.RetryError;
import org.multiverse.api.exceptions.TooManyRetriesException; import org.multiverse.api.exceptions.TooManyRetriesException;
import org.multiverse.utils.GlobalStmInstance; import static org.multiverse.api.GlobalStmInstance.getGlobalStmInstance;
import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction; import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction;
import static org.multiverse.utils.TransactionThreadLocal.setThreadLocalTransaction; import static org.multiverse.utils.TransactionThreadLocal.setThreadLocalTransaction;
import static java.lang.String.format;
import java.util.logging.Logger;
/** /**
* A Template that handles the boilerplate code for transactions. A transaction will be placed if * A Template that handles the boilerplate code for transactions. A transaction will be placed if none is available
* none is available around a section and if all goes right, commits at the end. * around a section and if all goes right, commits at the end.
* <p/> * <p/>
* example: * example:
* <pre> * <pre>
@ -25,280 +29,308 @@ import static org.multiverse.utils.TransactionThreadLocal.setThreadLocalTransact
* }.execute(); * }.execute();
* </pre> * </pre>
* <p/> * <p/>
* It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a * It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a task
* task for template. In the future this retry behavior will be customizable. * for template. In the future this retry behavior will be customizable.
* <p/> * <p/>
* If a transaction already is available on the TransactionThreadLocal, no new transaction is started * If a transaction already is available on the TransactionThreadLocal, no new transaction is started and essentially
* and essentially the whole AtomicTemplate is ignored. * the whole AtomicTemplate is ignored.
* <p/> * <p/>
* If no transaction is available on the TransactionThreadLocal, a new one will be created and used * If no transaction is available on the TransactionThreadLocal, a new one will be created and used during the execution
* during the execution of the AtomicTemplate and will be removed once the AtomicTemplate finishes. * of the AtomicTemplate and will be removed once the AtomicTemplate finishes.
* <p/> * <p/>
* All uncaught throwable's lead to a rollback of the transaction. * All uncaught throwable's lead to a rollback of the transaction.
* <p/> * <p/>
* AtomicTemplates are not thread-safe to use. * AtomicTemplates are not thread-safe to use.
* <p/> * <p/>
* AtomicTemplates can completely work without threadlocals. See the * AtomicTemplates can completely work without threadlocals. See the {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm
* {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm ,String, boolean, boolean, int)} for more information. * ,String, boolean, boolean, int)} for more information.
* *
* @author Peter Veentjer * @author Peter Veentjer
*/ */
public abstract class AtomicTemplate<E> { public abstract class AtomicTemplate<E> {
private final Stm stm;
private final boolean ignoreThreadLocalTransaction;
private final int retryCount;
private final boolean readonly;
private int attemptCount;
private final String familyName;
/** private final static Logger logger = Logger.getLogger(AtomicTemplate.class.getName());
* Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and
* works the the {@link org.multiverse.utils.TransactionThreadLocal}.
*/
public AtomicTemplate() {
this(GlobalStmInstance.get());
}
public AtomicTemplate(boolean readonly) { private final Stm stm;
this(GlobalStmInstance.get(), null, false, readonly, Integer.MAX_VALUE); private final boolean ignoreThreadLocalTransaction;
} private final int retryCount;
private final boolean readonly;
private int attemptCount;
private final String familyName;
/** /**
* Creates a new AtomicTemplate using the provided stm. The transaction used * Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and works the the {@link
* is stores/retrieved from the {@link org.multiverse.utils.TransactionThreadLocal}. * org.multiverse.utils.TransactionThreadLocal}.
* */
* @param stm the stm to use for transactions. public AtomicTemplate() {
* @throws NullPointerException if stm is null. this(getGlobalStmInstance());
*/
public AtomicTemplate(Stm stm) {
this(stm, null, false, false, Integer.MAX_VALUE);
}
public AtomicTemplate(String familyName, boolean readonly, int retryCount) {
this(GlobalStmInstance.get(), familyName, false, readonly, retryCount);
}
/**
* Creates a new AtomicTemplate that uses the provided STM. This method is provided
* to make Multiverse easy to integrate with environment that don't want to depend on
* threadlocals.
*
* @param stm the stm to use for transactions.
* @param ignoreThreadLocalTransaction true if this Template should completely ignore
* the ThreadLocalTransaction. This is useful for using the AtomicTemplate in other
* environments that don't want to depend on threadlocals but do want to use the AtomicTemplate.
* @throws NullPointerException if stm is null.
*/
public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly, int retryCount) {
if (stm == null) {
throw new NullPointerException();
} }
if (retryCount < 0) {
throw new IllegalArgumentException(); public AtomicTemplate(boolean readonly) {
this(getGlobalStmInstance(), null, false, readonly, Integer.MAX_VALUE);
} }
this.stm = stm;
this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction;
this.readonly = readonly;
this.retryCount = retryCount;
this.familyName = familyName;
}
public String getFamilyName() { /**
return familyName; * Creates a new AtomicTemplate using the provided stm. The transaction used is stores/retrieved from the {@link
} * org.multiverse.utils.TransactionThreadLocal}.
*
/** * @param stm the stm to use for transactions.
* Returns the current attempt. Value will always be larger than zero and increases * @throws NullPointerException if stm is null.
* everytime the transaction needs to be retried. */
* public AtomicTemplate(Stm stm) {
* @return the current attempt count. this(stm, null, false, false, Integer.MAX_VALUE);
*/
public final int getAttemptCount() {
return attemptCount;
}
/**
* Returns the number of retries that this AtomicTemplate is allowed to do. The returned
* value will always be equal or larger than 0.
*
* @return the number of retries.
*/
public final int getRetryCount() {
return retryCount;
}
/**
* Returns the {@link Stm} used by this AtomicTemplate to execute transactions on.
*
* @return the Stm used by this AtomicTemplate.
*/
public final Stm getStm() {
return stm;
}
/**
* Check if this AtomicTemplate ignores the ThreadLocalTransaction.
*
* @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise.
*/
public final boolean isIgnoreThreadLocalTransaction() {
return ignoreThreadLocalTransaction;
}
/**
* Checks if this AtomicTemplate executes readonly transactions.
*
* @return true if it executes readonly transactions, false otherwise.
*/
public final boolean isReadonly() {
return readonly;
}
/**
* This is the method that needs to be implemented.
*
* @param t the transaction used for this execution.
* @return the result of the execution.
* @throws Exception the Exception thrown
*/
public abstract E execute(Transaction t) throws Exception;
/**
* This is the method can be overridden to do pre-start tasks.
*/
public void preStart() {
}
/**
* This is the method can be overridden to do post-start tasks.
*
* @param t the transaction used for this execution.
*/
public void postStart(Transaction t) {
}
/**
* This is the method can be overridden to do pre-commit tasks.
*/
public void preCommit() {
}
/**
* This is the method can be overridden to do post-commit tasks.
*/
public void postCommit() {
}
/**
* Executes the template.
*
* @return the result of the {@link #execute(org.multiverse.api.Transaction)} method.
* @throws InvisibleCheckedException if a checked exception was thrown while executing the
* {@link #execute(org.multiverse.api.Transaction)} method.
*/
public final E execute() {
try {
return executeChecked();
} catch (Exception ex) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new AtomicTemplate.InvisibleCheckedException(ex);
}
} }
}
/** public AtomicTemplate(String familyName, boolean readonly, int retryCount) {
* Executes the Template and rethrows the checked exception instead of wrapping it this(getGlobalStmInstance(), familyName, false, readonly, retryCount);
* in a InvisibleCheckedException. }
*
* @return the result /**
* @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)} * Creates a new AtomicTemplate that uses the provided STM. This method is provided to make Multiverse easy to
* method. * integrate with environment that don't want to depend on threadlocals.
*/ *
public final E executeChecked() throws Exception { * @param stm the stm to use for transactions.
preStart(); * @param ignoreThreadLocalTransaction true if this Template should completely ignore the ThreadLocalTransaction.
Transaction t = getTransaction(); * This is useful for using the AtomicTemplate in other environments that don't
if (t == null || t.getStatus() != TransactionStatus.active) { * want to depend on threadlocals but do want to use the AtomicTemplate.
t = readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName); * @throws NullPointerException if stm is null.
setTransaction(t); */
postStart(t); public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly,
try { int retryCount) {
attemptCount = 1; if (stm == null) {
while (attemptCount - 1 <= retryCount) { throw new NullPointerException();
boolean abort = true; }
boolean reset = false; if (retryCount < 0) {
try { throw new IllegalArgumentException();
E result = execute(t); }
preCommit(); this.stm = stm;
t.commit(); this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction;
abort = false; this.readonly = readonly;
reset = false; this.retryCount = retryCount;
postCommit(); this.familyName = familyName;
return result; }
} catch (RetryError e) {
t.abortAndWaitForRetry(); public String getFamilyName() {
//since the abort is already done, no need to do it again. return familyName;
abort = false; }
} catch (CommitFailureException ex) {
reset = true; /**
//ignore, just retry the transaction * Returns the current attempt. Value will always be larger than zero and increases everytime the transaction needs
} catch (LoadException ex) { * to be retried.
reset = true; *
//ignore, just retry the transaction * @return the current attempt count.
} finally { */
if (abort) { public final int getAttemptCount() {
t.abort(); return attemptCount;
if (reset) { }
t.reset();
} /**
* Returns the number of retries that this AtomicTemplate is allowed to do. The returned value will always be equal
* or larger than 0.
*
* @return the number of retries.
*/
public final int getRetryCount() {
return retryCount;
}
/**
* Returns the {@link Stm} used by this AtomicTemplate to execute transactions on.
*
* @return the Stm used by this AtomicTemplate.
*/
public final Stm getStm() {
return stm;
}
/**
* Check if this AtomicTemplate ignores the ThreadLocalTransaction.
*
* @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise.
*/
public final boolean isIgnoreThreadLocalTransaction() {
return ignoreThreadLocalTransaction;
}
/**
* Checks if this AtomicTemplate executes readonly transactions.
*
* @return true if it executes readonly transactions, false otherwise.
*/
public final boolean isReadonly() {
return readonly;
}
/**
* This is the method can be overridden to do pre-start tasks.
*/
public void preStart() {
}
/**
* This is the method can be overridden to do post-start tasks.
*
* @param t the transaction used for this execution.
*/
public void postStart(Transaction t) {
}
/**
* This is the method can be overridden to do pre-commit tasks.
*/
public void preCommit() {
}
/**
* This is the method can be overridden to do post-commit tasks.
*/
public void postCommit() {
}
/**
* This is the method that needs to be implemented.
*
* @param t the transaction used for this execution.
* @return the result of the execution.
*
* @throws Exception the Exception thrown
*/
public abstract E execute(Transaction t) throws Exception;
/**
* Executes the template.
*
* @return the result of the {@link #execute(org.multiverse.api.Transaction)} method.
*
* @throws InvisibleCheckedException if a checked exception was thrown while executing the {@link
* #execute(org.multiverse.api.Transaction)} method.
* @throws AbortedException if the exception was explicitly aborted.
* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
* failure (also an exception) is included as cause. So you have some idea where
* to look for problems
*/
public final E execute() {
try {
return executeChecked();
} catch (Exception ex) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new AtomicTemplate.InvisibleCheckedException(ex);
} }
} }
attemptCount++; }
/**
* Executes the Template and rethrows the checked exception instead of wrapping it in a InvisibleCheckedException.
*
* @return the result
*
* @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)}
* method.
* @throws AbortedException if the exception was explicitly aborted.
* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
* failure (also an exception) is included as cause. So you have some idea where to
* look for problems
*/
public final E executeChecked() throws Exception {
preStart();
Transaction t = getTransaction();
if (noUsableTransaction(t)) {
t = startTransaction();
setTransaction(t);
postStart(t);
try {
attemptCount = 1;
Exception lastRetryCause = null;
while (attemptCount - 1 <= retryCount) {
boolean abort = true;
boolean reset = false;
try {
E result = execute(t);
if (t.getStatus().equals(TransactionStatus.aborted)) {
String msg = format("Transaction with familyname %s is aborted", t.getFamilyName());
throw new AbortedException(msg);
}
preCommit();
t.commit();
abort = false;
reset = false;
postCommit();
return result;
} catch (RetryError e) {
t.abortAndWaitForRetry();
//since the abort is already done, no need to do it again.
abort = false;
} catch (CommitFailureException ex) {
lastRetryCause = ex;
reset = true;
//ignore, just retry the transaction
} catch (LoadException ex) {
lastRetryCause = ex;
reset = true;
//ignore, just retry the transaction
} finally {
if (abort) {
t.abort();
if (reset) {
t.reset();
}
}
}
attemptCount++;
}
throw new TooManyRetriesException("Too many retries", lastRetryCause);
} finally {
setTransaction(null);
}
} else {
return execute(t);
}
}
private Transaction startTransaction() {
return readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName);
}
private boolean noUsableTransaction(Transaction t) {
return t == null || t.getStatus() != TransactionStatus.active;
}
/**
* Gets the current Transaction stored in the TransactionThreadLocal.
* <p/>
* If the ignoreThreadLocalTransaction is set, the threadlocal stuff is completeley ignored.
*
* @return the found transaction, or null if none is found.
*/
private Transaction getTransaction() {
return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction();
}
/**
* Stores the transaction in the TransactionThreadLocal.
* <p/>
* This call is ignored if the ignoreThreadLocalTransaction is true.
*
* @param t the transaction to set (is allowed to be null).
*/
private void setTransaction(Transaction t) {
if (!ignoreThreadLocalTransaction) {
setThreadLocalTransaction(t);
}
}
public static class InvisibleCheckedException extends RuntimeException {
public InvisibleCheckedException(Exception cause) {
super(cause);
} }
throw new TooManyRetriesException(); @Override
} finally { public Exception getCause() {
setTransaction(null); return (Exception) super.getCause();
} }
} else {
return execute(t);
} }
} }
/**
* Gets the current Transaction stored in the TransactionThreadLocal.
* <p/>
* If the ignoreThreadLocalTransaction is set, the threadlocal stuff
* is completeley ignored.
*
* @return the found transaction, or null if none is found.
*/
private Transaction getTransaction() {
return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction();
}
/**
* Stores the transaction in the TransactionThreadLocal.
* <p/>
* This call is ignored if the ignoreThreadLocalTransaction is true.
*
* @param t the transaction to set (is allowed to be null).
*/
private void setTransaction(Transaction t) {
if (!ignoreThreadLocalTransaction) {
setThreadLocalTransaction(t);
}
}
public static class InvisibleCheckedException extends RuntimeException {
public InvisibleCheckedException(Exception cause) {
super(cause);
}
@Override
public Exception getCause() {
return (Exception) super.getCause();
}
}
}

View file

@ -49,14 +49,6 @@
<client> <client>
</remote> </remote>
<rest>
service = on
hostname = "localhost"
port = 9998
filters = "se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"
authenticator = "sample.secure.SimpleAuthenticationService"
</rest>
<storage> <storage>
<cassandra> <cassandra>
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds

View file

@ -1,5 +1,78 @@
include "akka-reference.conf" #include "akka-reference.conf"
# This config import the Akka reference configuration. # This config import the Akka reference configuration.
# In this file you can override any option defined in the 'akka-reference.conf' file. # In this file you can override any option defined in the 'akka-reference.conf' file.
####################
# Akka Config File #
####################
# This file has all the default settings, so all these could be remove with no visible effect.
# Modify as needed.
<log>
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""
</log>
<akka>
version = "0.6"
# FQN to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.java.Boot", "sample.scala.Boot", "sample.secure.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
</actor>
<stm>
service = on
max-nr-of-retries = 10
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
# if 'off' then throws an exception or rollback for user to handle
wait-for-completion = 1000 # how long time in millis a transaction should be given time to complete when a collision is detected
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
distributed = off # not implemented yet
</stm>
<rest>
service = true
hostname = "localhost"
port = 9998
filters = "se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"
authenticator = "sample.secure.SimpleAuthenticationService"
</rest>
<remote>
<server>
service = on
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis (1 sec default)
<server>
<client>
reconnect-delay = 5000 # in millis (5 sec default)
read-timeout = 10000 # in millis (10 sec default)
<client>
</remote>
<storage>
<cassandra>
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
port = 9160
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
consistency-level = 1
</cassandra>
<mongodb>
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017
dbname = "mydb"
</mongodb>
</storage>
</akka>

View file

@ -1,8 +0,0 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.multiverse</groupId>
<artifactId>multiverse</artifactId>
<version>0.3</version>
<packaging>jar</packaging>
</project>