/**
 * Copyright (C) 2009 Scalable Solutions.
 */

package se.scalablesolutions.akka.kernel

import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation

//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.versioning.Versioned

/** Base exception for failures raised by the active object machinery in this file. */
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)

/** Used as the failure of an invocation that did not complete within the proxy's timeout. */
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)

/**
 * Instance-based facade that forwards to the [[ActiveObject]] singleton and accepts
 * `java.util.List` where the singleton takes a Scala `List`.
 *
 * @author Jonas Bonér
 */
class ActiveObjectFactory {

  /** Creates a dynamic proxy for `intf` whose calls are handled by `proxy`. */
  def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T =
    ActiveObject.newInstance(intf, proxy)

  /** Starts a supervisor for `components`, converting the Java list to a Scala list first. */
  def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor =
    ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
}

/**
 * Factory for active objects: dynamic proxies whose method calls are wrapped in
 * [[Invocation]] messages and sent to a server owned by an [[ActiveObjectProxy]].
 *
 * @author Jonas Bonér
 */
object ActiveObject {

  /**
   * Transaction bound to the calling thread, if any.
   *
   * `initialValue` is overridden so that every thread starts with `None`. A plain
   * `set(None)` at construction time only affects the thread that initialises this
   * object and leaves all other threads reading `null`.
   */
  private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] =
    new ThreadLocal[Option[Transaction]] {
      override protected def initialValue(): Option[Transaction] = None
    }

  /**
   * Creates a dynamic proxy implementing `intf`, loaded with `intf`'s class loader, whose
   * invocations are dispatched to `proxy`.
   */
  def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
    Proxy.newProxyInstance(
      intf.getClassLoader,
      Array(intf),
      proxy).asInstanceOf[T]
  }

  /**
   * Wraps `target` in a new [[ActiveObjectProxy]], puts the proxy's server under a
   * supervisor and returns a dynamic proxy implementing `intf`.
   *
   * @param intf    interface the returned proxy implements
   * @param target  instance that receives the reflective method calls
   * @param timeout timeout handed to the proxy (reported as milliseconds in its timeout error)
   */
  def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int): T = {
    val proxy = new ActiveObjectProxy(intf, target.getClass, timeout)
    proxy.setTargetInstance(target)
    supervise(proxy)
    newInstance(intf, proxy)
  }

  /**
   * Builds a supervisor from the given restart strategy and workers, sends it the
   * `Start` message and returns it.
   */
  def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
    object factory extends SupervisorFactory {
      override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
    }
    val supervisor = factory.newSupervisor
    supervisor ! se.scalablesolutions.akka.kernel.Start
    supervisor
  }

  /** Supervises a single proxy's server as a permanent worker under a one-for-one strategy. */
  private def supervise(proxy: ActiveObjectProxy): Supervisor =
    supervise(
      RestartStrategy(OneForOne, 5, 1000),
      Worker(
        proxy.server,
        LifeCycle(Permanent, 100))
      :: Nil)
}

/**
 * Invocation handler behind an active object proxy. Each intercepted call is turned into
 * an [[Invocation]] and sent to `server`, whose dispatcher performs the reflective call
 * and replies with the outcome.
 *
 * @param intf    interface exposed by the dynamic proxy
 * @param target  class of the target instance; its name is used as the server name
 * @param timeout timeout applied to the server (reported as milliseconds on timeout)
 *
 * @author Jonas Bonér
 */
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
  // Methods carrying this annotation are sent with `!` and their result is not awaited.
  val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]

  // Transaction this proxy currently takes part in, if any.
  private[this] var activeTx: Option[Transaction] = None

  private var targetInstance: AnyRef = _
  private[kernel] def setTargetInstance(instance: AnyRef) = targetInstance = instance

  // Performs the reflective call for each Invocation and replies with an ErrRef that
  // carries either the result or the failure, together with the invocation's transaction.
  private[this] val dispatcher = new GenericServer {
    override def body: PartialFunction[Any, Unit] = {
      case invocation: Invocation =>
        val tx = invocation.tx
        try {
          reply(ErrRef(invocation.invoke, tx))
        } catch {
          case e: InvocationTargetException =>
            // Unwrap so the caller sees the target's own exception, not the reflection wrapper.
            val te = e.getTargetException
            te.printStackTrace
            reply(ErrRef({ throw te }, tx))
          case e =>
            e.printStackTrace
            reply(ErrRef({ throw e }, tx))
        }
      // NOTE(review): reply() follows exit; whether it is ever reached depends on the
      // semantics of GenericServer.exit - confirm.
      case 'exit => exit; reply()
      case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
    }
  }

  private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher)
  server.setTimeout(timeout)

  /**
   * Entry point called by the dynamic proxy. Reconciles the proxy's active transaction
   * with the one bound to the calling thread, then dispatches the call.
   *
   * @param proxy the dynamic proxy instance the method was invoked on (unused)
   * @param m     the interface method being invoked
   * @param args  the call arguments; `null` for methods without parameters
   * @return the value produced by the invocation
   */
  def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
    val cflowTx = ActiveObject.threadBoundTx.get
    // Match on the Option itself: calling `.get` first would throw when no transaction is active.
    activeTx match {
      case Some(tx) =>
        if (cflowTx.isDefined && cflowTx.get != tx) {
          // new tx in scope; try to commit
          // NOTE(review): activeTx is cleared here rather than switched to the thread-bound
          // transaction - confirm this is intended.
          tx.commit(server)
          activeTx = None
        }
      case None =>
        if (cflowTx.isDefined) activeTx = Some(cflowTx.get)
    }
    invoke(Invocation(m, args, targetInstance, activeTx))
  }

  /**
   * Sends the invocation to the server. `oneway` methods are sent with `!`; all others are
   * sent with `!!!`, using a timeout error as the fallback reply. If reading the reply
   * throws, the reply's transaction (if any) is rolled back and bound to the calling
   * thread before the exception is rethrown.
   */
  private def invoke(invocation: Invocation): AnyRef = {
    val result: AnyRef =
      if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
      else {
        val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({
          throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
        }, activeTx))
        try {
          result()
        } catch {
          case e =>
            result.tx match {
              case None => // no tx; nothing to do
              case Some(tx) =>
                tx.rollback(server)
                ActiveObject.threadBoundTx.set(Some(tx))
            }
            throw e
        }
      }
    activeTx.foreach(_.precommit(server))
    result
  }
}

/**
 * Represents a snapshot of the current invocation.
 *
 * Equality and hash code are based on `method`, `args` and `target`; `tx` is not part of either.
 *
 * @param method the method to invoke reflectively (made accessible on construction)
 * @param args   the call arguments; may be `null` for methods without parameters
 * @param target the instance the method is invoked on
 * @param tx     the transaction associated with this invocation, if any
 *
 * @author Jonas Bonér
 */
case class Invocation(val method: Method,
                      val args: Array[Object],
                      val target: AnyRef,
                      val tx: Option[Transaction]) {
  method.setAccessible(true)

  /** Reflectively calls `method` on `target` with `args`. */
  def invoke: AnyRef = method.invoke(target, args:_*)

  override def toString: String =
    "Invocation [method: " + method.getName + ", args: " + argsToString(args) + ", target: " + target + "]"

  override def hashCode(): Int = {
    var result = HashCode.SEED
    result = HashCode.hash(result, method)
    result = HashCode.hash(result, args)
    result = HashCode.hash(result, target)
    result
  }

  override def equals(that: Any): Boolean = that match {
    // A type pattern never matches null, so no separate null check is needed.
    case other: Invocation =>
      other.method == method &&
      other.target == target &&
      isEqual(other.args, args)
    case _ => false
  }

  /**
   * Element-wise array equality: both `null`, or both non-null with the same length and
   * every pair of elements equal (two empty arrays are therefore equal).
   */
  private def isEqual(a1: Array[Object], a2: Array[Object]): Boolean =
    (a1 == null && a2 == null) ||
    (a1 != null && a2 != null && a1.size == a2.size && a1.zip(a2).forall(t => t._1 == t._2))

  /** Renders the arguments as "( a b c)"; a `null` array is rendered like an empty one. */
  private def argsToString(array: Array[Object]): String = synchronized {
    if (array == null) "()"
    else array.foldLeft("(")(_ + " " + _) + ")"
  }
}