/**
 * Copyright (C) 2009 Scalable Solutions.
 */

package se.scalablesolutions.akka.kernel

import kernel.camel.{MessageDriven, ActiveObjectProducer}
import config.ActiveObjectGuiceConfigurator
import config.ScalaConfig._

import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field}
import java.lang.annotation.Annotation

import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy

import org.apache.camel.{Processor, Exchange}

import scala.collection.mutable.HashMap

/** Base class for the exceptions raised by the active object machinery in this file. */
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)

/** Specialisation of [[ActiveObjectException]] for invocation timeouts. */
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)

/**
 * Class tokens for the Akka annotations, so that callers can write
 * `method.isAnnotationPresent(Annotations.oneway)` without repeating `classOf`.
 */
object Annotations {
  import se.scalablesolutions.akka.annotation._
  val transactional = classOf[transactional]
  val oneway = classOf[oneway]
  val immutable = classOf[immutable]
  val state = classOf[state]
}

/**
 * Instance-based facade over the [[ActiveObject]] singleton; every method simply
 * delegates to the method of the same name on [[ActiveObject]].
 *
 * @author Jonas Bonér
 */
class ActiveObjectFactory {

  /** Delegates to `ActiveObject.newInstance(target, server)`. */
  def newInstance[T](target: Class[T], server: GenericServerContainer): T = {
    ActiveObject.newInstance(target, server)
  }

  /** Delegates to `ActiveObject.newInstance(intf, target, server)`. */
  def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = {
    ActiveObject.newInstance(intf, target, server)
  }

  /** Delegates to `ActiveObject.supervise(restartStrategy, components)`. */
  def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
    ActiveObject.supervise(restartStrategy, components)
}

/**
 * Creates active object proxies and starts supervisors for them.
 *
 * @author Jonas Bonér
 */
object ActiveObject {
  val AKKA_CAMEL_ROUTING_SCHEME = "akka"

  /**
   * The transaction associated with the current thread, if any.
   *
   * It is written by [[Dispatcher]] before an invocation proceeds and read and
   * written by [[TransactionalAroundAdvice]].
   *
   * `initialValue` is overridden so that every thread starts out with `None`.
   * Calling `set(None)` once at construction time only initialises the thread
   * that happens to load this object; all other threads would read `null` and
   * fail on the first `isDefined` call.
   */
  private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] =
    new ThreadLocal[Option[Transaction]] {
      override protected def initialValue: Option[Transaction] = None
    }

  /**
   * Creates a proxy for the class `target` and attaches a
   * [[TransactionalAroundAdvice]] to all of its method executions.
   *
   * @param target the class to proxy
   * @param server the server container handed to the advice
   * @return the advised proxy, typed as `T`
   */
  def newInstance[T](target: Class[T], server: GenericServerContainer): T = {
    val proxy = Proxy.newInstance(target, false, true)
    // FIXME switch to weaving in the aspect at compile time
    proxy.asInstanceOf[Advisable].aw_addAdvice(
      "execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, server))
    proxy.asInstanceOf[T]
  }

  /**
   * Creates a proxy exposing the interface `intf` backed by the instance `target`
   * and attaches a [[TransactionalAroundAdvice]] to all of its method executions.
   *
   * @param intf   the interface the proxy exposes
   * @param target the instance backing the proxy
   * @param server the server container handed to the advice
   * @return the advised proxy, typed as `T`
   */
  def newInstance[T](intf: Class[T], target: AnyRef, server: GenericServerContainer): T = {
    val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
    proxy.asInstanceOf[Advisable].aw_addAdvice(
      "execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, server))
    proxy.asInstanceOf[T]
  }

  /**
   * Builds a supervisor from the given configuration, sends it the `Start`
   * message and returns it.
   *
   * @param restartStrategy the restart strategy placed in the supervisor configuration
   * @param components      the workers placed in the supervisor configuration
   * @return the supervisor, after `Start` has been sent to it
   */
  def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
    object factory extends SupervisorFactory {
      override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
    }
    val supervisor = factory.newSupervisor
    supervisor ! se.scalablesolutions.akka.kernel.Start
    supervisor
  }
}

/**
 * Around advice that wraps every advised method call in transaction bookkeeping
 * and dispatches the call through a mailbox instead of invoking it directly.
 *
 * On construction it reflectively collects the transactional maps, vectors and
 * refs declared on `targetInstance` (and its superclasses) and registers them
 * on `server`.
 *
 * @param target         the proxied class or interface
 * @param targetInstance the instance whose fields are scanned for transactional items
 * @param server         the server container that receives the transactional items and
 *                       is passed to all transaction operations
 *
 * @author Jonas Bonér
 */
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
sealed class TransactionalAroundAdvice(
    target: Class[_], targetInstance: AnyRef, server: GenericServerContainer) extends AroundAdvice {

  private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
  server.transactionalRefs = refs
  server.transactionalMaps = maps
  server.transactionalVectors = vectors

  import kernel.reactor._
  private[this] val dispatcher = new ProxyMessageDispatcher
  private[this] val mailbox = dispatcher.messageQueue
  dispatcher.start

  import ActiveObject.threadBoundTx

  // Timeout handed to DefaultCompletableFutureResult for request/reply invocations.
  // NOTE(review): presumably milliseconds -- confirm against DefaultCompletableFutureResult.
  private[this] val InvocationTimeout = 1000L

  // The transaction this advice is currently taking part in, if any.
  private[this] var activeTx: Option[Transaction] = None

  /**
   * Intercepts a method execution.
   *
   * If the method is annotated `@transactional`, the active transaction (if any)
   * is committed and a new one is started. The advice then joins the thread-bound
   * transaction, dispatches the call (fire-and-forget for `@oneway` methods,
   * request/reply otherwise) and finally either drops an aborted top-level
   * transaction or pre-commits the active one.
   *
   * @param joinpoint the intercepted join point; its RTTI must be a `MethodRtti`
   * @return the result of the dispatched call
   */
  // FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
  def invoke(joinpoint: JoinPoint): AnyRef = {
    val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
    val method = rtti.getMethod

    if (method.isAnnotationPresent(Annotations.transactional)) {
      tryToCommitTransaction
      startNewTransaction
    }
    joinExistingTransaction
    incrementTransaction
    val result: AnyRef = try {
      if (method.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint)
      else handleResult(sendAndReceiveEventually(joinpoint))
    } finally {
      decrementTransaction
      if (isTransactionAborted) removeTransactionIfTopLevel
      else tryToPrecommitTransaction
    }
    result
  }

  /** True when there is an active transaction and it reports itself as aborted. */
  private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted

  /** Calls `increment` on the active transaction, if there is one. */
  private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment

  /** Calls `decrement` on the active transaction, if there is one. */
  private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement

  /** Clears both the active and the thread-bound transaction when the active one is top-level. */
  private def removeTransactionIfTopLevel =
    if (activeTx.isDefined && activeTx.get.topLevel_?) {
      activeTx = None
      threadBoundTx.set(None)
    }

  /** Begins a fresh transaction on `server` and binds it to the current thread. */
  private def startNewTransaction = {
    val newTx = new Transaction
    newTx.begin(server)
    threadBoundTx.set(Some(newTx))
  }

  /**
   * Joins the thread-bound transaction when this advice has no active transaction,
   * then makes the thread-bound transaction the active one.
   */
  private def joinExistingTransaction = {
    val cflowTx = threadBoundTx.get
    if (!activeTx.isDefined && cflowTx.isDefined) {
      val currentTx = cflowTx.get
      currentTx.join(server)
      activeTx = Some(currentTx)
    }
    // NOTE(review): this assignment is unconditional, so it also replaces (or clears)
    // an already active transaction without calling join(server) on the new one.
    // Kept as in the original -- confirm that this is intended.
    activeTx = threadBoundTx.get
  }

  /** Pre-commits the active transaction on `server`, if there is one. */
  private def tryToPrecommitTransaction =
    if (activeTx.isDefined) activeTx.get.precommit(server)

  /** Commits the active transaction on `server`, if there is one, and drops it when top-level. */
  private def tryToCommitTransaction = if (activeTx.isDefined) {
    val tx = activeTx.get
    tx.commit(server)
    removeTransactionIfTopLevel
  }

  /**
   * Unwraps `result`; if unwrapping throws, rolls back the transaction carried by
   * `result` and rethrows the same exception.
   */
  private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
    try {
      result()
    } catch {
      case e: Throwable =>
        rollback(result.tx)
        throw e
    }
  }

  /** Rolls back `tx` on `server` when it is defined; does nothing otherwise. */
  private def rollback(tx: Option[Transaction]) = tx match {
    case None => {} // no tx; nothing to do
    case Some(tx) => tx.rollback(server)
  }

  /** Appends the invocation to the mailbox with a `NullFutureResult`, i.e. without a usable reply. */
  private def sendOneWay(joinpoint: JoinPoint) =
    mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult))

  /** Appends the invocation to the mailbox, waits on the reply future and wraps its outcome. */
  private def sendAndReceiveEventually(joinpoint: JoinPoint): ResultOrFailure[AnyRef] = {
    val future = postMessageToMailboxAndCreateFutureResultWithTimeout(
      Invocation(joinpoint, activeTx), InvocationTimeout)
    future.await_?
    getResultOrThrowException(future)
  }

  /**
   * Creates a future with the given timeout, appends `message` together with that
   * future to the mailbox and returns the future.
   */
  private def postMessageToMailboxAndCreateFutureResultWithTimeout(
      message: AnyRef, timeout: Long): CompletableFutureResult = {
    val future = new DefaultCompletableFutureResult(timeout)
    mailbox.append(new MessageHandle(this, message, future))
    future
  }

  /**
   * Converts a completed future into a `ResultOrFailure` tagged with the active transaction.
   *
   * NOTE(review): the failure branch relies on `ResultOrFailure.update` taking its argument
   * by name so that the `throw` is deferred until the result is unwrapped -- confirm.
   */
  private def getResultOrThrowException(future: FutureResult): ResultOrFailure[AnyRef] =
    if (future.exception.isDefined) {
      val resultOrFailure = ResultOrFailure(activeTx)
      resultOrFailure() = throw future.exception.get
      resultOrFailure
    } else ResultOrFailure(future.result.get, activeTx)

  /**
   * Search for transactional items for a specific target instance, crawl the class hierarchy
   * recursively up to the top.
   *
   * Only fields whose declared type is exactly `TransactionalMap`, `TransactionalVector` or
   * `TransactionalRef` are considered, and fields whose value is `null` are skipped.
   *
   * @param targetInstance the instance to scan; must not be `null`
   * @return the maps, vectors and refs found, in that order
   */
  private def getTransactionalItemsFor(targetInstance: AnyRef):
    Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
    require(targetInstance != null)
    var foundMaps: List[TransactionalMap[_, _]] = Nil
    var foundRefs: List[TransactionalRef[_]] = Nil
    var foundVectors: List[TransactionalVector[_]] = Nil

    // Collects the transactional fields declared directly on `clazz`, then moves on to its superclass.
    def collectFrom(clazz: Class[_]):
      Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
      for {
        field <- clazz.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
        fieldType = field.getType
        if (fieldType == classOf[TransactionalMap[_, _]]) ||
           (fieldType == classOf[TransactionalVector[_]]) ||
           (fieldType == classOf[TransactionalRef[_]])
        txItem = {
          field.setAccessible(true)
          field.get(targetInstance)
        }
        if txItem != null
      } {
        txItem match {
          case map: TransactionalMap[_, _] => foundMaps ::= map
          case ref: TransactionalRef[_] => foundRefs ::= ref
          case vector: TransactionalVector[_] => foundVectors ::= vector
          case _ => // not a transactional item; ignore
        }
      }
      val parent = clazz.getSuperclass
      // getSuperclass returns null when `clazz` is java.lang.Object itself, so stop on
      // null as well as on Object to avoid recursing into a null class.
      if (parent == null || parent == classOf[Object]) (foundMaps, foundVectors, foundRefs)
      else collectFrom(parent)
    }

    // start the search for transactional items, crawl the class hierarchy up until we reach the top
    collectFrom(targetInstance.getClass)
  }
}

/**
 * Generic GenericServer managing Invocation dispatch, transaction and error management.
 *
 * For every [[Invocation]] it binds the invocation's transaction to the current thread,
 * proceeds with the join point and replies with a `ResultOrFailure` holding either the
 * result or the failure.
 *
 * @param targetName the name used in this server's `toString`
 *
 * @author Jonas Bonér
 */
private[kernel] class Dispatcher(val targetName: String) extends GenericServer {
  override def body: PartialFunction[Any, Unit] = {

    case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
      // Make the caller's transaction visible to advices running on this thread.
      ActiveObject.threadBoundTx.set(tx)
      try {
        reply(ResultOrFailure(joinpoint.proceed, tx))
      } catch {
        case e: Throwable =>
          // Failures are sent back to the caller inside the reply instead of escaping the server.
          val resultOrFailure = ResultOrFailure(tx)
          resultOrFailure() = throw e
          reply(resultOrFailure)
      }

    case 'exit =>
      exit

/*    case exchange: Exchange =>
      println("=============> Exchange From Actor: " + exchange)
      val invocation = exchange.getIn.getBody.asInstanceOf[Invocation]
      invocation.invoke
*/
    case unexpected =>
      throw new ActiveObjectException("Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]")
  }

  override def toString(): String = "GenericServer[" + targetName + "]"
}

/**
 * Represents a snapshot of the current invocation.
 *
 * Equality and hashing are based on the join point and the transaction.
 *
 * @param joinpoint   the intercepted join point
 * @param transaction the transaction that was active when the invocation was created, if any
 *
 * @author Jonas Bonér
 */
private[kernel] case class Invocation(joinpoint: JoinPoint, transaction: Option[Transaction]) {

  override def toString: String = synchronized {
    "Invocation [joinpoint: " + joinpoint.toString+ " | transaction: " + transaction.toString + "]"
  }

  override def hashCode(): Int = synchronized {
    var result = HashCode.SEED
    result = HashCode.hash(result, joinpoint)
    if (transaction.isDefined) result = HashCode.hash(result, transaction.get)
    result
  }

  override def equals(that: Any): Boolean = synchronized {
    that match {
      // A type pattern never matches null, so no separate null check is needed.
      case other: Invocation =>
        other.joinpoint == joinpoint && other.transaction == transaction
      case _ => false
    }
  }
}

/*
public class CamelInvocationHandler implements InvocationHandler {
    private final Endpoint endpoint;
    private final Producer producer;
    private final MethodInfoCache methodInfoCache;

    public CamelInvocationHandler(Endpoint endpoint, Producer producer, MethodInfoCache methodInfoCache) {
        this.endpoint = endpoint;
        this.producer = producer;
        this.methodInfoCache = methodInfoCache;
    }

    public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {
        BeanInvocation invocation = new BeanInvocation(method, args);
        ExchangePattern pattern = ExchangePattern.InOut;
        MethodInfo methodInfo = methodInfoCache.getMethodInfo(method);
        if (methodInfo != null) {
            pattern = methodInfo.getPattern();
        }
        Exchange exchange = new DefaultExchange(endpoint, pattern);
        exchange.getIn().setBody(invocation);

        producer.process(exchange);
        Throwable fault = exchange.getException();
        if (fault != null) {
            throw new InvocationTargetException(fault);
        }
        if (pattern.isOutCapable()) {
            return exchange.getOut().getBody();
        } else {
            return null;
        }
    }
}

      if (joinpoint.target.isInstanceOf[MessageDriven] &&
          joinpoint.method.getName == "onMessage") {
        val m = joinpoint.method

        val endpointName = m.getDeclaringClass.getName + "." + m.getName
        val activeObjectName = m.getDeclaringClass.getName
        val endpoint = conf.getRoutingEndpoint(conf.lookupUriFor(m))
        val producer = endpoint.createProducer
        val exchange = endpoint.createExchange
        exchange.getIn().setBody(joinpoint)
        producer.process(exchange)
        val fault = exchange.getException();
        if (fault != null) throw new InvocationTargetException(fault)

        // FIXME: need some timeout and future here...
        exchange.getOut.getBody

      } else
*/