finalized new STM with Multiverse backend + cleaned up Active Object config and factory classes

This commit is contained in:
jboner 2009-10-17 00:37:56 +02:00
commit e82998f182
45 changed files with 1270 additions and 1138 deletions

View file

@ -0,0 +1,304 @@
package se.scalablesolutions.akka.stm;
import org.multiverse.api.Stm;
import org.multiverse.api.Transaction;
import org.multiverse.api.TransactionStatus;
import org.multiverse.api.exceptions.CommitFailureException;
import org.multiverse.api.exceptions.LoadException;
import org.multiverse.api.exceptions.RetryError;
import org.multiverse.api.exceptions.TooManyRetriesException;
import org.multiverse.utils.GlobalStmInstance;
import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction;
import static org.multiverse.utils.TransactionThreadLocal.setThreadLocalTransaction;
/**
* A Template that handles the boilerplate code for transactions. A transaction will be placed if
* none is available around a section and if all goes right, commits at the end.
* <p/>
* example:
* <pre>
* new AtomicTemplate(){
* Object execute(Transaction t){
* queue.push(1);
* return null;
* }
* }.execute();
* </pre>
* <p/>
* It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a
* task for template. In the future this retry behavior will be customizable.
* <p/>
* If a transaction already is available on the TransactionThreadLocal, no new transaction is started
* and essentially the whole AtomicTemplate is ignored.
* <p/>
* If no transaction is available on the TransactionThreadLocal, a new one will be created and used
* during the execution of the AtomicTemplate and will be removed once the AtomicTemplate finishes.
* <p/>
* All uncaught throwable's lead to a rollback of the transaction.
* <p/>
* AtomicTemplates are not thread-safe to use.
* <p/>
* AtomicTemplates can completely work without threadlocals. See the
* {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm ,String, boolean, boolean, int)} for more information.
*
* @author Peter Veentjer
*/
public abstract class AtomicTemplate<E> {
private final Stm stm;
private final boolean ignoreThreadLocalTransaction;
private final int retryCount;
private final boolean readonly;
private int attemptCount;
private final String familyName;
/**
* Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and
* works the the {@link org.multiverse.utils.TransactionThreadLocal}.
*/
public AtomicTemplate() {
this(GlobalStmInstance.get());
}
public AtomicTemplate(boolean readonly) {
this(GlobalStmInstance.get(), null, false, readonly, Integer.MAX_VALUE);
}
/**
* Creates a new AtomicTemplate using the provided stm. The transaction used
* is stores/retrieved from the {@link org.multiverse.utils.TransactionThreadLocal}.
*
* @param stm the stm to use for transactions.
* @throws NullPointerException if stm is null.
*/
public AtomicTemplate(Stm stm) {
this(stm, null, false, false, Integer.MAX_VALUE);
}
public AtomicTemplate(String familyName, boolean readonly, int retryCount) {
this(GlobalStmInstance.get(), familyName, false, readonly, retryCount);
}
/**
* Creates a new AtomicTemplate that uses the provided STM. This method is provided
* to make Multiverse easy to integrate with environment that don't want to depend on
* threadlocals.
*
* @param stm the stm to use for transactions.
* @param ignoreThreadLocalTransaction true if this Template should completely ignore
* the ThreadLocalTransaction. This is useful for using the AtomicTemplate in other
* environments that don't want to depend on threadlocals but do want to use the AtomicTemplate.
* @throws NullPointerException if stm is null.
*/
public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly, int retryCount) {
if (stm == null) {
throw new NullPointerException();
}
if (retryCount < 0) {
throw new IllegalArgumentException();
}
this.stm = stm;
this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction;
this.readonly = readonly;
this.retryCount = retryCount;
this.familyName = familyName;
}
public String getFamilyName() {
return familyName;
}
/**
* Returns the current attempt. Value will always be larger than zero and increases
* everytime the transaction needs to be retried.
*
* @return the current attempt count.
*/
public final int getAttemptCount() {
return attemptCount;
}
/**
* Returns the number of retries that this AtomicTemplate is allowed to do. The returned
* value will always be equal or larger than 0.
*
* @return the number of retries.
*/
public final int getRetryCount() {
return retryCount;
}
/**
* Returns the {@link Stm} used by this AtomicTemplate to execute transactions on.
*
* @return the Stm used by this AtomicTemplate.
*/
public final Stm getStm() {
return stm;
}
/**
* Check if this AtomicTemplate ignores the ThreadLocalTransaction.
*
* @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise.
*/
public final boolean isIgnoreThreadLocalTransaction() {
return ignoreThreadLocalTransaction;
}
/**
* Checks if this AtomicTemplate executes readonly transactions.
*
* @return true if it executes readonly transactions, false otherwise.
*/
public final boolean isReadonly() {
return readonly;
}
/**
* This is the method that needs to be implemented.
*
* @param t the transaction used for this execution.
* @return the result of the execution.
* @throws Exception the Exception thrown
*/
public abstract E execute(Transaction t) throws Exception;
/**
* This is the method can be overridden to do pre-start tasks.
*/
public void preStart() {
}
/**
* This is the method can be overridden to do post-start tasks.
*
* @param t the transaction used for this execution.
*/
public void postStart(Transaction t) {
}
/**
* This is the method can be overridden to do pre-commit tasks.
*/
public void preCommit() {
}
/**
* This is the method can be overridden to do post-commit tasks.
*/
public void postCommit() {
}
/**
* Executes the template.
*
* @return the result of the {@link #execute(org.multiverse.api.Transaction)} method.
* @throws InvisibleCheckedException if a checked exception was thrown while executing the
* {@link #execute(org.multiverse.api.Transaction)} method.
*/
public final E execute() {
try {
return executeChecked();
} catch (Exception ex) {
if (ex instanceof RuntimeException) {
throw (RuntimeException) ex;
} else {
throw new AtomicTemplate.InvisibleCheckedException(ex);
}
}
}
/**
* Executes the Template and rethrows the checked exception instead of wrapping it
* in a InvisibleCheckedException.
*
* @return the result
* @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)}
* method.
*/
public final E executeChecked() throws Exception {
preStart();
Transaction t = getTransaction();
if (t == null || t.getStatus() != TransactionStatus.active) {
t = readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName);
setTransaction(t);
postStart(t);
try {
attemptCount = 1;
while (attemptCount - 1 <= retryCount) {
boolean abort = true;
boolean reset = false;
try {
E result = execute(t);
preCommit();
t.commit();
abort = false;
reset = false;
postCommit();
return result;
} catch (RetryError e) {
t.abortAndWaitForRetry();
//since the abort is already done, no need to do it again.
abort = false;
} catch (CommitFailureException ex) {
reset = true;
//ignore, just retry the transaction
} catch (LoadException ex) {
reset = true;
//ignore, just retry the transaction
} finally {
if (abort) {
t.abort();
if (reset) {
t.reset();
}
}
}
attemptCount++;
}
throw new TooManyRetriesException();
} finally {
setTransaction(null);
}
} else {
return execute(t);
}
}
/**
* Gets the current Transaction stored in the TransactionThreadLocal.
* <p/>
* If the ignoreThreadLocalTransaction is set, the threadlocal stuff
* is completeley ignored.
*
* @return the found transaction, or null if none is found.
*/
private Transaction getTransaction() {
return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction();
}
/**
* Stores the transaction in the TransactionThreadLocal.
* <p/>
* This call is ignored if the ignoreThreadLocalTransaction is true.
*
* @param t the transaction to set (is allowed to be null).
*/
private void setTransaction(Transaction t) {
if (!ignoreThreadLocalTransaction) {
setThreadLocalTransaction(t);
}
}
public static class InvisibleCheckedException extends RuntimeException {
public InvisibleCheckedException(Exception cause) {
super(cause);
}
@Override
public Exception getCause() {
return (Exception) super.getCause();
}
}
}

View file

@ -1,303 +0,0 @@
package se.scalablesolutions.akka.stm;
import org.multiverse.api.Stm;
import static org.multiverse.api.StmUtils.retry;
import org.multiverse.api.Transaction;
import org.multiverse.api.exceptions.LoadUncommittedException;
import org.multiverse.api.exceptions.ReadonlyException;
import org.multiverse.datastructures.refs.ManagedRef;
import org.multiverse.stms.alpha.*;
import org.multiverse.stms.alpha.mixins.FastAtomicObjectMixin;
import org.multiverse.templates.AtomicTemplate;
import org.multiverse.utils.GlobalStmInstance;
import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction;
import static java.lang.String.format;
/**
* A manual instrumented {@link org.multiverse.datastructures.refs.ManagedRef} implementation.
* If this class is used, you don't need to worry about instrumentation/javaagents and
* stuff like this.
* <p/>
* It is added to get the Akka project up and running, but probably will removed when the instrumentation
* is 100% up and running and this can be done compiletime instead of messing with javaagents.
*
* @author Peter Veentjer
*/
public final class Ref<E> extends FastAtomicObjectMixin implements ManagedRef<E> {
final public static class NoTransactionInScopeException extends RuntimeException {
}
/**
* Creates a committed ref with a null value using the Stm in the
* {@link GlobalStmInstance}.
*
* @return the created ref.
* @see #createCommittedRef(org.multiverse.api.Stm, Object)
*/
public static <E> Ref<E> createCommittedRef() {
return createCommittedRef(GlobalStmInstance.get(), null);
}
/**
* Creates a committed ref with a null value.
*
* @param stm the {@Stm} used for committing the ref.
* @return the created ref.
* @see #createCommittedRef(org.multiverse.api.Stm, Object)
*/
public static <E> Ref<E> createCommittedRef(Stm stm) {
return createCommittedRef(stm, null);
}
/**
* Creates a committed ref with the given value using the Stm in the
* {@link GlobalStmInstance}.
*
* @param value the initial value of the Ref.
* @return the created ref.
* @see #createCommittedRef(org.multiverse.api.Stm, Object)
*/
public static <E> Ref<E> createCommittedRef(E value) {
return createCommittedRef(GlobalStmInstance.get(), value);
}
/**
* Creates a committed ref with the given value and using the given Stm.
* <p/>
* This factory method should be called when one doesn't want to lift on the current
* transaction, but you want something to be committed whatever happens. In the future
* behavior will be added propagation levels. But for the time being this is the 'expect_new'
* implementation of this propagation level.
* <p/>
* If the value is an atomicobject or has a reference to it (perhaps indirectly), and
* the transaction this atomicobject is created in is aborted (or hasn't committed) yet,
* you will get the dreaded {@link org.multiverse.api.exceptions.LoadUncommittedException}.
*
* @param stm the {@Stm} used for committing the ref.
* @param value the initial value of the ref. The value is allowed to be null.
* @return the created ref.
*/
public static <E> Ref<E> createCommittedRef(Stm stm, E value) {
Transaction t = stm.startUpdateTransaction("createRef");
Ref<E> ref = new Ref<E>(t, value);
t.commit();
return ref;
}
public Ref() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
((AlphaTransaction) tx).attachNew(new RefTranlocal(Ref.this));
}
public Ref(Transaction t) {
((AlphaTransaction) t).attachNew(new RefTranlocal(Ref.this));
}
public Ref(final E value) {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
((AlphaTransaction) tx).attachNew(new RefTranlocal(Ref.this, value));
}
public Ref(Transaction t, final E value) {
((AlphaTransaction) t).attachNew(new RefTranlocal(Ref.this, value));
}
public E get() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.get();
}
public E get(Transaction t) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.get();
}
public E getOrAwait() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.getOrAwait();
}
public E getOrAwait(Transaction t) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.getOrAwait();
}
public E set(final E newRef) {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.set(newRef);
}
public E set(Transaction t, final E newRef) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.set(newRef);
}
public boolean isNull() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.isNull();
}
public boolean isNull(Transaction t) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.isNull();
}
public E clear() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.clear();
}
public E clear(Transaction t) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.clear();
}
@Override
public String toString() {
Transaction tx = getThreadLocalTransaction();
if (tx == null) throw new NoTransactionInScopeException();
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) tx).privatize(Ref.this);
return tranlocalRef.toString();
}
public String toString(Transaction t) {
RefTranlocal<E> tranlocalRef = (RefTranlocal) ((AlphaTransaction) t).privatize(Ref.this);
return tranlocalRef.toString();
}
public RefTranlocal<E> privatize(long readVersion) {
RefTranlocal<E> origin = (RefTranlocal<E>) load(readVersion);
if (origin == null) {
throw new LoadUncommittedException();
}
return new RefTranlocal<E>(origin);
}
}
class RefTranlocal<E> extends AlphaTranlocal {
//field belonging to the stm.
Ref atomicObject;
RefTranlocal origin;
E ref;
RefTranlocal(RefTranlocal<E> origin) {
this.version = origin.version;
this.atomicObject = origin.atomicObject;
this.ref = origin.ref;
this.origin = origin;
}
RefTranlocal(Ref<E> owner) {
this(owner, null);
}
RefTranlocal(Ref<E> owner, E ref) {
this.version = Long.MIN_VALUE;
this.atomicObject = owner;
this.ref = ref;
}
@Override
public AlphaAtomicObject getAtomicObject() {
return atomicObject;
}
public E clear() {
E oldValue = ref;
ref = null;
return oldValue;
}
public boolean isNull() {
return ref == null;
}
public E get() {
return ref;
}
public E set(E newValue) {
if (committed) {
throw new ReadonlyException();
}
E oldValue = ref;
this.ref = newValue;
return oldValue;
}
public E getOrAwait() {
if (isNull()) {
retry();
}
return ref;
}
@Override
public String toString() {
if (ref == null) {
return "Ref(reference=null)";
} else {
return format("Ref(reference=%s)", ref);
}
}
@Override
public void prepareForCommit(long writeVersion) {
this.version = writeVersion;
this.committed = true;
this.origin = null;
}
@Override
public AlphaTranlocalSnapshot takeSnapshot() {
return new RefTranlocalSnapshot<E>(this);
}
@Override
public DirtinessStatus getDirtinessStatus() {
if (committed) {
return DirtinessStatus.committed;
} else if (origin == null) {
return DirtinessStatus.fresh;
} else if (origin.ref != this.ref) {
return DirtinessStatus.dirty;
} else {
return DirtinessStatus.clean;
}
}
}
class RefTranlocalSnapshot<E> extends AlphaTranlocalSnapshot {
final RefTranlocal tranlocal;
final E value;
RefTranlocalSnapshot(RefTranlocal<E> tranlocal) {
this.tranlocal = tranlocal;
this.value = tranlocal.ref;
}
@Override
public AlphaTranlocal getTranlocal() {
return tranlocal;
}
@Override
public void restore() {
tranlocal.ref = value;
}
}