pekko/kernel/src/main/scala/ActiveObject.scala

228 lines
7.6 KiB
Scala
Raw Normal View History

2009-02-17 21:05:07 +01:00
/**
* Copyright (C) 2009 Scalable Solutions.
*/
2009-03-23 19:17:49 +01:00
package se.scalablesolutions.akka.kernel
2009-02-17 21:05:07 +01:00
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation
2009-03-22 17:26:42 +01:00
//import voldemort.client.{SocketStoreClientFactory, StoreClient, StoreClientFactory}
//import voldemort.versioning.Versioned
2009-03-15 08:35:37 +01:00
2009-02-17 21:05:07 +01:00
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectFactory {
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = ActiveObject.newInstance(intf, proxy)
def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components.toArray.toList.asInstanceOf[List[Worker]])
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObject {
2009-03-23 19:17:49 +01:00
private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
}
2009-02-17 21:05:07 +01:00
def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
Proxy.newProxyInstance(
intf.getClassLoader,
Array(intf),
proxy).asInstanceOf[T]
}
def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int): T = {
val proxy = new ActiveObjectProxy(intf, target.getClass, timeout)
proxy.setTargetInstance(target)
supervise(proxy)
newInstance(intf, proxy)
}
2009-02-17 21:05:07 +01:00
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
2009-03-23 19:17:49 +01:00
object factory extends SupervisorFactory {
2009-02-17 21:05:07 +01:00
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
}
val supervisor = factory.newSupervisor
2009-03-23 19:17:49 +01:00
supervisor ! se.scalablesolutions.akka.kernel.Start
2009-02-17 21:05:07 +01:00
supervisor
}
private def supervise(proxy: ActiveObjectProxy): Supervisor =
supervise(
RestartStrategy(OneForOne, 5, 1000),
Worker(
proxy.server,
LifeCycle(Permanent, 100))
:: Nil)
2009-02-17 21:05:07 +01:00
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
val transactional = classOf[se.scalablesolutions.akka.annotation.transactional]
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
val state= classOf[se.scalablesolutions.akka.annotation.state]
2009-03-23 19:17:49 +01:00
private[this] var activeTx: Option[Transaction] = None
2009-02-17 21:05:07 +01:00
private var targetInstance: AnyRef = _
private[kernel] def setTargetInstance(instance: AnyRef) = {
targetInstance = instance
getStateList(targetInstance) match {
case Nil => {}
case states => server.states = states
}
}
2009-02-17 21:05:07 +01:00
2009-03-23 19:17:49 +01:00
private[this] val dispatcher = new GenericServer {
2009-02-17 21:05:07 +01:00
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
2009-03-23 19:17:49 +01:00
val tx = invocation.tx
ActiveObject.threadBoundTx.set(tx)
2009-02-17 21:05:07 +01:00
try {
2009-03-23 19:17:49 +01:00
reply(ErrRef(invocation.invoke, tx))
2009-02-17 21:05:07 +01:00
} catch {
case e: InvocationTargetException =>
val te = e.getTargetException
te.printStackTrace
2009-03-23 19:17:49 +01:00
reply(ErrRef({ throw te }, tx))
case e =>
e.printStackTrace
2009-03-23 19:17:49 +01:00
reply(ErrRef({ throw e }, tx))
2009-02-17 21:05:07 +01:00
}
case 'exit => exit; reply()
case unexpected => throw new ActiveObjectException("Unexpected message to actor proxy: " + unexpected)
}
}
private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher)
2009-02-17 21:05:07 +01:00
server.setTimeout(timeout)
2009-03-23 19:17:49 +01:00
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
if (m.isAnnotationPresent(transactional)) {
val newTx = new Transaction
newTx.begin(server)
ActiveObject.threadBoundTx.set(Some(newTx))
}
2009-03-23 19:17:49 +01:00
val cflowTx = ActiveObject.threadBoundTx.get
// println("========== invoking: " + m.getName)
// println("========== cflowTx: " + cflowTx)
// println("========== activeTx: " + activeTx)
activeTx match {
2009-03-23 19:17:49 +01:00
case Some(tx) =>
if (cflowTx.isDefined && cflowTx.get != tx) {
// new tx in scope; try to commit
tx.commit(server)
activeTx = None
}
case None =>
if (cflowTx.isDefined) activeTx = Some(cflowTx.get)
}
activeTx = ActiveObject.threadBoundTx.get
2009-03-23 19:17:49 +01:00
invoke(Invocation(m, args, targetInstance, activeTx))
}
2009-02-17 21:05:07 +01:00
2009-03-23 19:17:49 +01:00
private def invoke(invocation: Invocation): AnyRef = {
2009-03-15 08:35:37 +01:00
val result: AnyRef =
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
else {
val result: ErrRef[AnyRef] =
server !!! (invocation, {
var ref = ErrRef(activeTx)
ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
ref
})
2009-03-23 19:17:49 +01:00
try {
result()
} catch {
case e =>
rollback(result.tx)
2009-03-23 19:17:49 +01:00
throw e
}
2009-03-15 08:35:37 +01:00
}
2009-03-23 19:17:49 +01:00
if (activeTx.isDefined) activeTx.get.precommit(server)
2009-03-15 08:35:37 +01:00
result
2009-02-17 21:05:07 +01:00
}
private def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
println("================ ROLLING BACK")
tx.rollback(server)
ActiveObject.threadBoundTx.set(Some(tx))
}
private def getStateList(targetInstance: AnyRef): List[State[_,_]] = {
require(targetInstance != null)
import se.scalablesolutions.akka.kernel.configuration.ConfigurationException
val states = for {
field <- target.getDeclaredFields
if field.isAnnotationPresent(state)
state = field.get(targetInstance)
if state != null
} yield {
if (!state.isInstanceOf[State[_, _]]) throw new ConfigurationException("Fields annotated with [@state] needs to to be a subtype of [se.scalablesolutions.akka.kernel.State[K, V]]")
state
}
states
// if (fields.size > 1) throw new ConfigurationException("Stateful active object can only have one single field '@Inject TransientObjectState state' defined")
}
2009-02-17 21:05:07 +01:00
}
/**
* Represents a snapshot of the current invocation.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
2009-03-23 19:17:49 +01:00
case class Invocation(val method: Method,
val args: Array[AnyRef],
2009-03-23 19:17:49 +01:00
val target: AnyRef,
val tx: Option[Transaction]) {
2009-03-12 21:18:53 +01:00
method.setAccessible(true)
2009-02-17 21:05:07 +01:00
def invoke: AnyRef = method.invoke(target, args:_*)
2009-03-12 21:18:53 +01:00
override def toString: String =
"Invocation [method: " + method.getName + ", args: " + argsToString(args) + ", target: " + target + "]"
2009-02-17 21:05:07 +01:00
override def hashCode(): Int = {
var result = HashCode.SEED
result = HashCode.hash(result, method)
result = HashCode.hash(result, args)
result = HashCode.hash(result, target)
result
}
override def equals(that: Any): Boolean = {
that != null &&
that.isInstanceOf[Invocation] &&
that.asInstanceOf[Invocation].method == method &&
that.asInstanceOf[Invocation].target == target &&
isEqual(that.asInstanceOf[Invocation].args, args)
2009-02-17 21:05:07 +01:00
}
private def isEqual(a1: Array[Object], a2: Array[Object]): Boolean =
(a1 == null && a2 == null) ||
2009-03-10 00:56:42 +01:00
(a1 != null &&
a2 != null &&
a1.size == a2.size &&
a1.zip(a2).find(t => t._1 == t._2).isDefined)
2009-03-23 19:17:49 +01:00
private def argsToString(array: Array[Object]): String = synchronized {
2009-03-12 21:18:53 +01:00
array.foldLeft("(")(_ + " " + _) + ")"
}
}