fixed final issues with AW proxy integration and remaining tests

This commit is contained in:
Jonas Boner 2009-05-23 22:24:02 +02:00
parent e0591005ed
commit bbec315eb2
29 changed files with 933 additions and 866 deletions

View file

@ -10,10 +10,11 @@ import config.ScalaConfig._
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field}
import java.lang.annotation.Annotation
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import java.lang.annotation.Annotation
import org.apache.camel.{Processor, Exchange}
@ -39,11 +40,11 @@ class ActiveObjectFactory {
}
def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = {
ActiveObject.newInstance(intf, target, server)
ActiveObject.newInstance(intf, target, server)
}
def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components)
}
/**
@ -61,13 +62,13 @@ object ActiveObject {
def newInstance[T](target: Class[T], server: GenericServerContainer): T = {
val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(target, proxy, server))
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, server))
proxy.asInstanceOf[T]
}
def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new ActorAroundAdvice(intf, target, server))
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, server))
proxy.asInstanceOf[T]
}
@ -79,16 +80,6 @@ object ActiveObject {
supervisor ! se.scalablesolutions.akka.kernel.Start
supervisor
}
/*
private def supervise(proxy: AnyRef): Supervisor =
supervise(
RestartStrategy(OneForOne, 5, 1000),
Worker(
proxy.server,
LifeCycle(Permanent, 100))
:: Nil)
*/
}
/**
@ -96,30 +87,27 @@ object ActiveObject {
*/
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
sealed class ActorAroundAdvice(target: Class[_],
targetInstance: AnyRef,
val server: GenericServerContainer) extends AroundAdvice {
val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
sealed class TransactionalAroundAdvice(target: Class[_],
targetInstance: AnyRef,
server: GenericServerContainer) extends AroundAdvice {
val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
server.transactionalRefs = refs
server.transactionalMaps = maps
server.transactionalVectors = vectors
import ActiveObject.threadBoundTx
private[this] var activeTx: Option[Transaction] = None
def invoke(joinpoint: JoinPoint): AnyRef = {
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
val method = joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod
if (method.isAnnotationPresent(Annotations.transactional)) {
if (activeTx.isDefined) {
val tx = activeTx.get
//val cflowTx = threadBoundTx.get
// if (cflowTx.isDefined && cflowTx.get != tx) {
// new tx in scope; try to commit
tx.commit(server)
threadBoundTx.set(None)
activeTx = None
// }
}
// FIXME: check if we are already in a transaction if so NEST (set parent)
val newTx = new Transaction
@ -134,37 +122,14 @@ sealed class ActorAroundAdvice(target: Class[_],
activeTx = Some(currentTx)
}
activeTx = threadBoundTx.get
invoke(joinpoint, activeTx)
//invoke(Invocation(method, joinpoint.getRtti.asInstanceOf[MethodRtti].getParameterValues, targetInstance, activeTx))
}
private def invoke(joinpoint: JoinPoint, tx: Option[Transaction]): AnyRef = {
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
val result: AnyRef =
/*
if (joinpoint.target.isInstanceOf[MessageDriven] &&
joinpoint.method.getName == "onMessage") {
val m = joinpoint.method
val endpointName = m.getDeclaringClass.getName + "." + m.getName
val activeObjectName = m.getDeclaringClass.getName
val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m))
val producer = endpoint.createProducer
val exchange = endpoint.createExchange
exchange.getIn().setBody(joinpoint)
producer.process(exchange)
val fault = exchange.getException();
if (fault != null) throw new InvocationTargetException(fault)
// FIXME: need some timeout and future here...
exchange.getOut.getBody
} else */
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
if (joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod.isAnnotationPresent(Annotations.oneway)) {
server ! (tx, joinpoint)
if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) {
server ! (activeTx, joinpoint)
} else {
val result: ErrRef[AnyRef] =
server !!! ((tx, joinpoint), {
server !!! ((activeTx, joinpoint), {
var ref = ErrRef(activeTx)
ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + server.timeout + " milliseconds")
ref
@ -172,8 +137,7 @@ sealed class ActorAroundAdvice(target: Class[_],
try {
result()
} catch {
case e =>
println("$$$$$$$$$$$$$$ " + joinpoint)
case e =>
rollback(result.tx)
throw e
}
@ -190,29 +154,41 @@ sealed class ActorAroundAdvice(target: Class[_],
threadBoundTx.set(Some(tx))
}
private def getTransactionalItemsFor(targetInstance: AnyRef):
/**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/
private def getTransactionalItemsFor(targetInstance: AnyRef):
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
require(targetInstance != null)
var maps: List[TransactionalMap[_, _]] = Nil
var vectors: List[TransactionalVector[_]] = Nil
var refs: List[TransactionalRef[_]] = Nil
var maps: List[TransactionalMap[_, _]] = Nil
var refs: List[TransactionalRef[_]] = Nil
var vectors: List[TransactionalVector[_]] = Nil
def getTransactionalItemsFor(target: Class[_]):
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
for {
field <- targetInstance.getClass.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
fieldType = field.getType
if fieldType == classOf[TransactionalMap[_, _]] ||
fieldType == classOf[TransactionalVector[_]] ||
fieldType == classOf[TransactionalRef[_]]
txItem = {
field.setAccessible(true)
field.get(targetInstance)
field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
fieldType = field.getType
if fieldType == classOf[TransactionalMap[_, _]] ||
fieldType == classOf[TransactionalVector[_]] ||
fieldType == classOf[TransactionalRef[_]]
txItem = {
field.setAccessible(true)
field.get(targetInstance)
}
if txItem != null
} {
if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
}
if txItem != null
} {
if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
val parent = target.getSuperclass
if (parent == null) (maps, vectors, refs)
else getTransactionalItemsFor(parent)
}
(maps, vectors, refs)
// start the search for transactional items, crawl the class hierarchy up until we reach 'null'
getTransactionalItemsFor(targetInstance.getClass)
}
}
@ -244,60 +220,10 @@ private[kernel] class Dispatcher(val targetName: String) extends GenericServer {
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]")
}
override def toString(): String = "GenericServer[" + targetName + "]"
}
/**
* Represents a snapshot of the current invocation.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[kernel] case class Invocation(val method: Method,
val args: Array[AnyRef],
val target: AnyRef,
val tx: Option[Transaction]) {
method.setAccessible(true)
def invoke: AnyRef = synchronized {
println("======== " + this.toString)
if (method.getDeclaringClass.isInterface) {
target.getClass.getDeclaredMethod(method.getName, method.getParameterTypes).invoke(target, args:_*)
} else method.invoke(target, args:_*)
}
override def toString: String = synchronized {
"Invocation [method: " + method.getName + ", args: " + argsToString(args) + ", target: " + target + "]"
}
override def hashCode(): Int = synchronized {
var result = HashCode.SEED
result = HashCode.hash(result, method)
result = HashCode.hash(result, args)
result = HashCode.hash(result, target)
result
}
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[Invocation] &&
that.asInstanceOf[Invocation].method == method &&
that.asInstanceOf[Invocation].target == target &&
isEqual(that.asInstanceOf[Invocation].args, args)
}
private[this] def isEqual(a1: Array[Object], a2: Array[Object]): Boolean =
(a1 == null && a2 == null) ||
(a1 != null &&
a2 != null &&
a1.size == a2.size &&
a1.zip(a2).find(t => t._1 == t._2).isDefined)
private[this] def argsToString(array: Array[Object]): String =
array.foldLeft("(")(_ + " " + _) + ")"
}
/*
ublic class CamelInvocationHandler implements InvocationHandler {
private final Endpoint endpoint;
@ -332,4 +258,24 @@ ublic class CamelInvocationHandler implements InvocationHandler {
}
}
}
if (joinpoint.target.isInstanceOf[MessageDriven] &&
joinpoint.method.getName == "onMessage") {
val m = joinpoint.method
val endpointName = m.getDeclaringClass.getName + "." + m.getName
val activeObjectName = m.getDeclaringClass.getName
val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m))
val producer = endpoint.createProducer
val exchange = endpoint.createExchange
exchange.getIn().setBody(joinpoint)
producer.process(exchange)
val fault = exchange.getException();
if (fault != null) throw new InvocationTargetException(fault)
// FIXME: need some timeout and future here...
exchange.getOut.getBody
} else
*/