mid camel impl

This commit is contained in:
Jonas Boner 2009-05-09 19:55:42 +02:00
parent 46ede93684
commit b1d91818ef
27 changed files with 560 additions and 513 deletions

View file

@ -4,10 +4,15 @@
package se.scalablesolutions.akka.kernel
import config.ActiveObjectGuiceConfigurator
import config.ScalaConfig._
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation
import kernel.camel.{MessageDriven, ActiveObjectProducer}
import org.apache.camel.{Processor, Exchange}
import scala.collection.mutable.HashMap
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.versioning.Versioned
@ -36,21 +41,23 @@ class ActiveObjectFactory {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObject {
private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
}
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
Proxy.newProxyInstance(
intf.getClassLoader,
Array(intf),
proxy).asInstanceOf[T]
}
private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
}
def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int): T = {
val proxy = new ActiveObjectProxy(intf, target.getClass, timeout)
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
Proxy.newProxyInstance(
intf.getClassLoader,
Array(intf),
proxy).asInstanceOf[T]
}
def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int, conf: ActiveObjectGuiceConfigurator): T = {
val proxy = new ActiveObjectProxy(intf, target.getClass, timeout, conf)
proxy.setTargetInstance(target)
supervise(proxy)
newInstance(intf, proxy)
@ -77,20 +84,22 @@ object ActiveObject {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
// FIXME: use interface for ActiveObjectGuiceConfigurator
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int, conf: ActiveObjectGuiceConfigurator) extends InvocationHandler {
import ActiveObject.threadBoundTx
private[this] var activeTx: Option[Transaction] = None
private var targetInstance: AnyRef = _
private[kernel] def setTargetInstance(instance: AnyRef) = {
private[this] var activeTx: Option[Transaction] = None
private[this] var targetInstance: AnyRef = _
private[akka] def setTargetInstance(instance: AnyRef) = {
targetInstance = instance
val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
server.transactionalRefs = refs
server.transactionalMaps = maps
server.transactionalVectors = vectors
server.transactionalRefs = refs
}
private[kernel] val server = new GenericServerContainer(target.getName, () => new Dispatcher(target.getName))
private[akka] val server = new GenericServerContainer(intf.getName, () => new Dispatcher(target.getName))
server.setTimeout(timeout)
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
@ -121,9 +130,26 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
}
private def invoke(invocation: Invocation): AnyRef = {
val result: AnyRef =
if (invocation.method.isAnnotationPresent(Annotations.oneway)) server ! invocation
else {
val result: AnyRef =
if (invocation.target.isInstanceOf[MessageDriven] &&
invocation.method.getName == "onMessage") {
val m = invocation.method
val endpointName = m.getDeclaringClass.getName + "." + m.getName
val activeObjectName = m.getDeclaringClass.getName
val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m))
val producer = endpoint.createProducer
val exchange = endpoint.createExchange
exchange.getIn().setBody(invocation)
producer.process(exchange)
val fault = exchange.getException();
if (fault != null) throw new InvocationTargetException(fault)
// FIXME: need some timeout and future here...
exchange.getOut.getBody
} else if (invocation.method.isAnnotationPresent(Annotations.oneway)) {
server ! invocation
} else {
val result: ErrRef[AnyRef] =
server !!! (invocation, {
var ref = ErrRef(activeTx)
@ -169,8 +195,8 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
if txItem != null
} {
if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
}
(maps, vectors, refs)
}
@ -183,6 +209,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
*/
private[kernel] class Dispatcher(val targetName: String) extends GenericServer {
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
val tx = invocation.tx
ActiveObject.threadBoundTx.set(tx)
@ -194,8 +221,15 @@ private[kernel] class Dispatcher(val targetName: String) extends GenericServer {
case e =>
val ref = ErrRef(tx); ref() = throw e; reply(ref)
}
case 'exit =>
exit; reply()
case exchange: Exchange =>
println("=============> Exchange From Actor: " + exchange)
val invocation = exchange.getIn.getBody.asInstanceOf[Invocation]
invocation.invoke
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]")
}
@ -248,3 +282,40 @@ private[kernel] case class Invocation(val method: Method,
private[this] def argsToString(array: Array[Object]): String =
array.foldLeft("(")(_ + " " + _) + ")"
}
/*
ublic class CamelInvocationHandler implements InvocationHandler {
private final Endpoint endpoint;
private final Producer producer;
private final MethodInfoCache methodInfoCache;
public CamelInvocationHandler(Endpoint endpoint, Producer producer, MethodInfoCache methodInfoCache) {
this.endpoint = endpoint;
this.producer = producer;
this.methodInfoCache = methodInfoCache;
}
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
BeanInvocation invocation = new BeanInvocation(method, args);
ExchangePattern pattern = ExchangePattern.InOut;
MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
if (methodInfo != null) {
pattern = methodInfo.getPattern();
}
Exchange exchange = new DefaultExchange(endpoint, pattern);
exchange.getIn().setBody(invocation);
producer.process(exchange);
Throwable fault = exchange.getException();
if (fault != null) {
throw new InvocationTargetException(fault);
}
if (pattern.isOutCapable()) {
return exchange.getOut().getBody();
} else {
return null;
}
}
}
*/