cleanup and refactoring of active object code
This commit is contained in:
parent
bbec315eb2
commit
8bc40775b3
3 changed files with 313 additions and 122 deletions
|
|
@ -90,7 +90,7 @@ object ActiveObject {
|
|||
sealed class TransactionalAroundAdvice(target: Class[_],
|
||||
targetInstance: AnyRef,
|
||||
server: GenericServerContainer) extends AroundAdvice {
|
||||
val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
|
||||
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
|
||||
server.transactionalRefs = refs
|
||||
server.transactionalMaps = maps
|
||||
server.transactionalVectors = vectors
|
||||
|
|
@ -98,53 +98,59 @@ sealed class TransactionalAroundAdvice(target: Class[_],
|
|||
import ActiveObject.threadBoundTx
|
||||
private[this] var activeTx: Option[Transaction] = None
|
||||
|
||||
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
|
||||
def invoke(joinpoint: JoinPoint): AnyRef = {
|
||||
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
|
||||
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
|
||||
val method = rtti.getMethod
|
||||
if (method.isAnnotationPresent(Annotations.transactional)) {
|
||||
if (activeTx.isDefined) {
|
||||
val tx = activeTx.get
|
||||
tx.commit(server)
|
||||
threadBoundTx.set(None)
|
||||
activeTx = None
|
||||
}
|
||||
// FIXME: check if we are already in a transaction if so NEST (set parent)
|
||||
val newTx = new Transaction
|
||||
newTx.begin(server)
|
||||
threadBoundTx.set(Some(newTx))
|
||||
}
|
||||
|
||||
if (method.isAnnotationPresent(Annotations.transactional)) {
|
||||
tryToCommitTransaction
|
||||
startNewTransaction
|
||||
}
|
||||
joinExistingTransaction
|
||||
|
||||
val result: AnyRef = if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint)
|
||||
else handleResult(sendAndReceiveEventually(joinpoint))
|
||||
tryToPrecommitTransaction
|
||||
result
|
||||
}
|
||||
|
||||
private def startNewTransaction = {
|
||||
val newTx = new Transaction
|
||||
newTx.begin(server)
|
||||
threadBoundTx.set(Some(newTx))
|
||||
}
|
||||
|
||||
private def joinExistingTransaction = {
|
||||
val cflowTx = threadBoundTx.get
|
||||
if (!activeTx.isDefined && cflowTx.isDefined) {
|
||||
if (!activeTx.isDefined && cflowTx.isDefined) {
|
||||
val currentTx = cflowTx.get
|
||||
currentTx.join(server)
|
||||
activeTx = Some(currentTx)
|
||||
}
|
||||
activeTx = threadBoundTx.get
|
||||
}
|
||||
|
||||
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
|
||||
val result: AnyRef =
|
||||
if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) {
|
||||
server ! (activeTx, joinpoint)
|
||||
} else {
|
||||
val result: ErrRef[AnyRef] =
|
||||
server !!! ((activeTx, joinpoint), {
|
||||
var ref = ErrRef(activeTx)
|
||||
ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds")
|
||||
ref
|
||||
})
|
||||
try {
|
||||
result()
|
||||
} catch {
|
||||
case e =>
|
||||
rollback(result.tx)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
private def tryToPrecommitTransaction = {
|
||||
// FIXME: clear threadBoundTx on successful commit
|
||||
if (activeTx.isDefined) activeTx.get.precommit(server)
|
||||
result
|
||||
}
|
||||
|
||||
private def tryToCommitTransaction = if (activeTx.isDefined) {
|
||||
val tx = activeTx.get
|
||||
tx.commit(server)
|
||||
threadBoundTx.set(None)
|
||||
activeTx = None
|
||||
}
|
||||
|
||||
private def handleResult(result: ErrRef[AnyRef]): AnyRef = {
|
||||
try {
|
||||
result()
|
||||
} catch {
|
||||
case e =>
|
||||
rollback(result.tx)
|
||||
throw e
|
||||
}
|
||||
}
|
||||
|
||||
private def rollback(tx: Option[Transaction]) = tx match {
|
||||
|
|
@ -154,6 +160,16 @@ sealed class TransactionalAroundAdvice(target: Class[_],
|
|||
threadBoundTx.set(Some(tx))
|
||||
}
|
||||
|
||||
private def sendOneWay(joinpoint: JoinPoint) = server ! (activeTx, joinpoint)
|
||||
|
||||
private def sendAndReceiveEventually(joinpoint: JoinPoint): ErrRef[AnyRef] = {
|
||||
server !!! ((activeTx, joinpoint), {
|
||||
var ref = ErrRef(activeTx)
|
||||
ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds")
|
||||
ref
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue