rewrote the state management, tx system still to rewrite
This commit is contained in:
parent
3e703a53ab
commit
8586110449
17 changed files with 2511 additions and 2450 deletions
|
|
@ -71,16 +71,19 @@ object ActiveObject {
|
|||
*/
|
||||
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
|
||||
val transactional = classOf[se.scalablesolutions.akka.annotation.transactional]
|
||||
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
|
||||
val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
|
||||
val stateful= classOf[se.scalablesolutions.akka.annotation.stateful]
|
||||
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
|
||||
val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
|
||||
val state= classOf[se.scalablesolutions.akka.annotation.state]
|
||||
|
||||
private[this] var activeTx: Option[Transaction] = None
|
||||
|
||||
private var targetInstance: AnyRef = _
|
||||
private[kernel] def setTargetInstance(instance: AnyRef) = {
|
||||
targetInstance = instance
|
||||
if (server.state.isDefined) injectState(server.state.get, targetInstance)
|
||||
getStateList(targetInstance) match {
|
||||
case Nil => {}
|
||||
case states => server.states = states
|
||||
}
|
||||
}
|
||||
|
||||
private[this] val dispatcher = new GenericServer {
|
||||
|
|
@ -104,9 +107,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
|
|||
}
|
||||
}
|
||||
|
||||
private[kernel] val server =
|
||||
if (target.isAnnotationPresent(stateful)) new GenericServerContainer(target.getName, () => dispatcher, Some(new TransientObjectState))
|
||||
else new GenericServerContainer(target.getName, () => dispatcher, None)
|
||||
private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher)
|
||||
server.setTimeout(timeout)
|
||||
|
||||
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
|
||||
|
|
@ -164,19 +165,20 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
|
|||
ActiveObject.threadBoundTx.set(Some(tx))
|
||||
}
|
||||
|
||||
private def injectState(state: TransientObjectState, targetInstance: AnyRef) = {
|
||||
require(state != null)
|
||||
private def getStateList(targetInstance: AnyRef): List[State[_,_]] = {
|
||||
require(targetInstance != null)
|
||||
import se.scalablesolutions.akka.kernel.configuration.ConfigurationException
|
||||
val fields = for {
|
||||
val states = for {
|
||||
field <- target.getDeclaredFields
|
||||
if field.getType == classOf[TransientObjectState]
|
||||
} yield field
|
||||
if (fields.size == 0) throw new ConfigurationException("Stateful active object needs to have a field '@Inject TransientObjectState state' defined")
|
||||
if (fields.size > 1) throw new ConfigurationException("Stateful active object can only have one single field '@Inject TransientObjectState state' defined")
|
||||
val field = fields(0)
|
||||
field.setAccessible(true)
|
||||
field.set(targetInstance, state)
|
||||
if field.isAnnotationPresent(state)
|
||||
state = field.get(targetInstance)
|
||||
if state != null
|
||||
} yield {
|
||||
if (!state.isInstanceOf[State[_, _]]) throw new ConfigurationException("Fields annotated with [@state] needs to to be a subtype of [se.scalablesolutions.akka.kernel.State[K, V]]")
|
||||
state
|
||||
}
|
||||
states
|
||||
// if (fields.size > 1) throw new ConfigurationException("Stateful active object can only have one single field '@Inject TransientObjectState state' defined")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue