fixed bug STM bug, in-mem tests now pass
This commit is contained in:
parent
4ad378b5c4
commit
d470aee538
4 changed files with 354 additions and 334 deletions
|
|
@ -4,17 +4,17 @@
|
|||
|
||||
package se.scalablesolutions.akka.kernel
|
||||
|
||||
import kernel.camel.{MessageDriven, ActiveObjectProducer}
|
||||
import config.ActiveObjectGuiceConfigurator
|
||||
import config.ScalaConfig._
|
||||
|
||||
import java.util.{List => JList, ArrayList}
|
||||
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
|
||||
import java.lang.annotation.Annotation
|
||||
import kernel.camel.{MessageDriven, ActiveObjectProducer}
|
||||
|
||||
import org.apache.camel.{Processor, Exchange}
|
||||
|
||||
import scala.collection.mutable.HashMap
|
||||
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
|
||||
//import voldemort.versioning.Versioned
|
||||
|
||||
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
|
||||
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
|
||||
|
|
@ -104,26 +104,27 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
|
|||
|
||||
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
|
||||
if (m.isAnnotationPresent(Annotations.transactional)) {
|
||||
if (activeTx.isDefined) {
|
||||
val tx = activeTx.get
|
||||
//val cflowTx = threadBoundTx.get
|
||||
// if (cflowTx.isDefined && cflowTx.get != tx) {
|
||||
// new tx in scope; try to commit
|
||||
tx.commit(server)
|
||||
threadBoundTx.set(None)
|
||||
activeTx = None
|
||||
// }
|
||||
}
|
||||
// FIXME: check if we are already in a transaction if so NEST (set parent)
|
||||
val newTx = new Transaction
|
||||
newTx.begin(server)
|
||||
threadBoundTx.set(Some(newTx))
|
||||
}
|
||||
|
||||
val cflowTx = threadBoundTx.get
|
||||
activeTx match {
|
||||
case Some(tx) =>
|
||||
if (cflowTx.isDefined && cflowTx.get != tx) {
|
||||
// new tx in scope; try to commit
|
||||
tx.commit(server)
|
||||
threadBoundTx.set(None)
|
||||
activeTx = None
|
||||
}
|
||||
case None =>
|
||||
if (cflowTx.isDefined) {
|
||||
val currentTx = cflowTx.get
|
||||
currentTx.join(server)
|
||||
activeTx = Some(currentTx)
|
||||
}
|
||||
if (!activeTx.isDefined && cflowTx.isDefined) {
|
||||
val currentTx = cflowTx.get
|
||||
currentTx.join(server)
|
||||
activeTx = Some(currentTx)
|
||||
}
|
||||
activeTx = threadBoundTx.get
|
||||
invoke(Invocation(m, args, targetInstance, activeTx))
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue