2009-02-17 21:05:07 +01:00
/**
 * Copyright (C) 2009 Scalable Solutions.
 */
2009-03-23 19:17:49 +01:00
package se.scalablesolutions.akka.kernel
2009-02-17 21:05:07 +01:00
2009-05-13 08:58:50 +02:00
import kernel.camel. { MessageDriven , ActiveObjectProducer }
2009-05-09 19:55:42 +02:00
import config.ActiveObjectGuiceConfigurator
import config.ScalaConfig._
2009-02-17 21:05:07 +01:00
import java.util. { List => JList , ArrayList }
import java.lang.reflect. { Method , Field , InvocationHandler , Proxy , InvocationTargetException }
import java.lang.annotation.Annotation
2009-05-13 08:58:50 +02:00
2009-05-09 19:55:42 +02:00
import org.apache.camel. { Processor , Exchange }
2009-05-13 08:58:50 +02:00
2009-05-09 19:55:42 +02:00
import scala.collection.mutable.HashMap
2009-03-15 08:35:37 +01:00
2009-02-17 21:05:07 +01:00
/**
 * Base class for the errors raised by the active object machinery in this file.
 * Sealed, so subclasses can only be declared in this source file.
 *
 * @param msg description forwarded to RuntimeException
 */
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)

/**
 * Raised when a request/reply invocation on an active object does not complete
 * within the proxy's configured timeout.
 *
 * @param msg description forwarded to ActiveObjectException
 */
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
2009-04-27 19:55:57 +02:00
/**
 * Class tokens for the akka annotations, kept in one place so reflective checks
 * such as Method.isAnnotationPresent can refer to them by name.
 */
object Annotations {
  import se.scalablesolutions.akka.annotation._

  /** Class token of the `transactional` annotation. */
  val transactional: Class[transactional] = classOf[transactional]

  /** Class token of the `oneway` annotation. */
  val oneway: Class[oneway] = classOf[oneway]

  /** Class token of the `immutable` annotation. */
  val immutable: Class[immutable] = classOf[immutable]

  /** Class token of the `state` annotation. */
  val state: Class[state] = classOf[state]
}
2009-02-17 21:05:07 +01:00
/**
 * Instance-based facade over the ActiveObject object. Every method simply
 * delegates to the method of the same name on ActiveObject; the supervise
 * variant accepts a java.util.List instead of a Scala List.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
class ActiveObjectFactory {

  /**
   * Creates a dynamic proxy for `intf` backed by `proxy`.
   *
   * @param intf  interface the returned proxy implements
   * @param proxy invocation handler receiving every call made on the proxy
   * @return the proxy instance, cast to T
   */
  def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T =
    ActiveObject.newInstance[T](intf, proxy)

  /**
   * Starts a supervisor for the given workers.
   *
   * @param restartStrategy strategy handed to the supervisor configuration
   * @param components      workers to supervise, as a Java list
   * @return the supervisor returned by ActiveObject.supervise
   */
  def supervise(restartStrategy: RestartStrategy, components: JList[Worker]): Supervisor = {
    // Copy the Java list into a Scala List before delegating.
    val workers = components.toArray.toList.asInstanceOf[List[Worker]]
    ActiveObject.supervise(restartStrategy, workers)
  }
}
/**
 * Entry point for creating active objects: builds JDK dynamic proxies backed by
 * an ActiveObjectProxy and puts the proxy's server under supervision.
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
object ActiveObject {

  // NOTE(review): presumably the URI scheme used for Camel routing; not referenced in this file.
  val AKKA_CAMEL_ROUTING_SCHEME = "akka"

  /**
   * Transaction bound to the current thread, if any.
   *
   * The default is supplied through initialValue so that EVERY thread starts
   * with None. Calling set(None) once at construction time would only seed the
   * thread that happens to initialize this object; all other threads would read
   * null and fail on the first `.isDefined`.
   */
  private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] =
    new ThreadLocal[Option[Transaction]] {
      override protected def initialValue(): Option[Transaction] = None
    }

  /**
   * Creates a dynamic proxy implementing `intf` whose calls are handled by `proxy`.
   *
   * @param intf  interface to implement; its class loader defines the proxy class
   * @param proxy invocation handler receiving every call
   * @return the proxy instance, cast to T
   */
  def newInstance[T](intf: Class[_], proxy: ActiveObjectProxy): T = {
    Proxy.newProxyInstance(
      intf.getClassLoader,
      Array(intf),
      proxy).asInstanceOf[T]
  }

  /**
   * Wraps an existing target instance in an active object proxy, starts a
   * supervisor for the proxy's server and returns the proxy typed as T.
   *
   * @param intf    interface the proxy implements
   * @param target  instance that will execute the invocations
   * @param timeout timeout handed to the ActiveObjectProxy
   * @return the proxy instance, cast to T
   */
  def newInstance[T](intf: Class[_], target: AnyRef, timeout: Int): T = {
    val proxy = new ActiveObjectProxy(intf, target.getClass, timeout)
    proxy.setTargetInstance(target)
    supervise(proxy)
    newInstance(intf, proxy)
  }

  /**
   * Creates a supervisor for the given workers and sends it the Start message.
   *
   * @param restartStrategy strategy placed in the SupervisorConfig
   * @param components      workers placed in the SupervisorConfig
   * @return the started supervisor
   */
  def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
    object factory extends SupervisorFactory {
      override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
    }
    val supervisor = factory.newSupervisor
    supervisor ! se.scalablesolutions.akka.kernel.Start
    supervisor
  }

  /**
   * Supervises the server of a single proxy as a Permanent worker under a
   * OneForOne restart strategy.
   */
  private def supervise(proxy: ActiveObjectProxy): Supervisor =
    supervise(
      // NOTE(review): 5 / 1000 / 100 are presumably restart count, time window and
      // shutdown time — confirm against RestartStrategy and LifeCycle.
      RestartStrategy(OneForOne, 5, 1000),
      Worker(
        proxy.server,
        LifeCycle(Permanent, 100))
      :: Nil)
}
/**
 * InvocationHandler behind the dynamic proxies created by ActiveObject.newInstance.
 *
 * Every call made on the proxy is wrapped in an Invocation and sent to `server`,
 * a GenericServerContainer whose Dispatcher is named after the target class.
 * Around each call the handler maintains `activeTx` and the thread-bound
 * transaction held in ActiveObject.threadBoundTx.
 *
 * Constructor parameters:
 *  - intf:    interface class; its name is used as the name of the server container
 *  - target:  implementation class; names the Dispatcher and is scanned for transactional fields
 *  - timeout: passed to server.setTimeout and reported (as milliseconds) in the timeout error
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
2009-05-09 19:55:42 +02:00
// FIXME: use interface for ActiveObjectGuiceConfigurator
2009-05-09 20:40:36 +02:00
class ActiveObjectProxy ( val intf : Class [ _ ] , val target : Class [ _ ] , val timeout : Int ) extends InvocationHandler {
2009-04-09 15:49:42 +02:00
import ActiveObject.threadBoundTx
2009-05-09 19:55:42 +02:00
2009-03-23 19:17:49 +01:00
// Transaction this proxy currently takes part in; reassigned on every call to invoke.
private [ this ] var activeTx : Option [ Transaction ] = None
2009-05-09 19:55:42 +02:00
// Instance the invocations are executed on; null until setTargetInstance is called.
private [ this ] var targetInstance : AnyRef = _
2009-03-23 19:17:49 +01:00
2009-05-09 19:55:42 +02:00
/**
 * Stores the instance that invocations are executed on and hands the
 * transactional maps, vectors and refs found on it to the server.
 */
private [ akka ] def setTargetInstance ( instance : AnyRef ) = {
2009-04-04 21:34:10 +02:00
targetInstance = instance
2009-04-27 19:55:57 +02:00
val ( maps , vectors , refs ) = getTransactionalItemsFor ( targetInstance )
2009-05-09 19:55:42 +02:00
server . transactionalRefs = refs
2009-04-27 19:55:57 +02:00
server . transactionalMaps = maps
server . transactionalVectors = vectors
2009-04-04 21:34:10 +02:00
}
2009-02-17 21:05:07 +01:00
2009-05-09 19:55:42 +02:00
// Server container receiving every Invocation; it is given a factory that creates a
// Dispatcher named after the target class.
private [ akka ] val server = new GenericServerContainer ( intf . getName , ( ) => new Dispatcher ( target . getName ) )
2009-02-17 21:05:07 +01:00
server . setTimeout ( timeout )
2009-04-04 21:34:10 +02:00
2009-03-23 19:17:49 +01:00
/**
 * InvocationHandler entry point.
 *
 * For a method annotated with `transactional`: commits the currently active
 * transaction (if any), then begins a new Transaction and binds it to the thread.
 * Afterwards the thread-bound transaction is joined if this proxy has no active
 * transaction, `activeTx` is set to the thread-bound value, and the call is
 * dispatched as an Invocation.
 *
 * @param proxy the proxy instance the call was made on (unused)
 * @param m     the interface method being called
 * @param args  the call arguments
 * @return the value produced by the private invoke(Invocation)
 */
def invoke ( proxy : AnyRef , m : Method , args : Array [ AnyRef ] ) : AnyRef = {
2009-04-27 19:55:57 +02:00
if ( m . isAnnotationPresent ( Annotations . transactional ) ) {
2009-05-13 08:58:50 +02:00
if ( activeTx . isDefined ) {
val tx = activeTx . get
//val cflowTx = threadBoundTx.get
// if (cflowTx.isDefined && cflowTx.get != tx) {
// new tx in scope; try to commit
tx . commit ( server )
threadBoundTx . set ( None )
activeTx = None
// }
}
2009-04-09 15:49:42 +02:00
// FIXME: check if we are already in a transaction if so NEST (set parent)
2009-03-26 20:22:49 +01:00
val newTx = new Transaction
newTx . begin ( server )
2009-04-09 15:49:42 +02:00
threadBoundTx . set ( Some ( newTx ) )
2009-03-26 20:22:49 +01:00
}
2009-05-13 08:58:50 +02:00
2009-04-09 15:49:42 +02:00
val cflowTx = threadBoundTx . get
2009-05-13 08:58:50 +02:00
// No active transaction on this proxy but one bound to the thread: join it.
if ( ! activeTx . isDefined && cflowTx . isDefined ) {
val currentTx = cflowTx . get
currentTx . join ( server )
activeTx = Some ( currentTx )
2009-03-23 19:17:49 +01:00
}
2009-04-09 15:49:42 +02:00
// NOTE(review): unconditional assignment; it supersedes the value set in the join branch above.
activeTx = threadBoundTx . get
2009-03-23 19:17:49 +01:00
invoke ( Invocation ( m , args , targetInstance , activeTx ) )
}
2009-02-17 21:05:07 +01:00
2009-03-23 19:17:49 +01:00
/**
 * Sends the invocation to the server and returns its outcome.
 *
 * Methods annotated with `oneway` are sent with `!`; all others are sent with
 * `!!!` and the reply (an ErrRef) is dereferenced. If dereferencing throws, the
 * transaction carried by the reply is rolled back and the exception is rethrown.
 * After a call that did not throw, the active transaction (if any) is precommitted.
 *
 * NOTE(review): `!` and `!!!` are defined outside this file; presumably `!` is
 * fire-and-forget and the second argument of `!!!` is the value used on timeout — confirm.
 */
private def invoke ( invocation : Invocation ) : AnyRef = {
2009-05-09 19:55:42 +02:00
val result : AnyRef =
2009-05-09 20:40:36 +02:00
/*
2009-05-09 19:55:42 +02:00
if ( invocation . target . isInstanceOf [ MessageDriven ] &&
invocation . method . getName == "onMessage" ) {
val m = invocation . method
2009-05-09 20:40:36 +02:00
val endpointName = m . getDeclaringClass . getName + "." + m . getName
2009-05-09 19:55:42 +02:00
val activeObjectName = m . getDeclaringClass . getName
val endpoint = conf . getRoutingEndpoint ( conf . lookupUriFor ( m ) )
val producer = endpoint . createProducer
val exchange = endpoint . createExchange
exchange . getIn ( ) . setBody ( invocation )
producer . process ( exchange )
val fault = exchange . getException ( ) ;
if ( fault != null ) throw new InvocationTargetException ( fault )
// FIXME: need some timeout and future here...
exchange . getOut . getBody
2009-05-09 20:40:36 +02:00
} else */
if ( invocation . method . isAnnotationPresent ( Annotations . oneway ) ) {
2009-05-09 19:55:42 +02:00
server ! invocation
} else {
2009-03-26 20:22:49 +01:00
val result : ErrRef [ AnyRef ] =
server !!! ( invocation , {
// Fallback reference that fails with a timeout exception.
// NOTE(review): assumes ErrRef's update takes its argument by name, so the throw is deferred — confirm.
var ref = ErrRef ( activeTx )
ref ( ) = throw new ActiveObjectInvocationTimeoutException ( "Invocation to active object [" + targetInstance . getClass . getName + "] timed out after " + timeout + " milliseconds" )
ref
} )
2009-03-23 19:17:49 +01:00
try {
result ( )
} catch {
2009-03-26 20:22:49 +01:00
// Any failure: roll back the transaction carried by the reply, then propagate.
case e =>
rollback ( result . tx )
2009-03-23 19:17:49 +01:00
throw e
}
2009-03-15 08:35:37 +01:00
}
2009-04-09 15:49:42 +02:00
// FIXME: clear threadBoundTx on successful commit
2009-03-23 19:17:49 +01:00
if ( activeTx . isDefined ) activeTx . get . precommit ( server )
2009-03-15 08:35:37 +01:00
result
2009-02-17 21:05:07 +01:00
}
2009-03-26 20:22:49 +01:00
/**
 * Rolls the given transaction back against the server and then binds that same
 * transaction to the current thread. Does nothing for None.
 */
private def rollback ( tx : Option [ Transaction ] ) = tx match {
case None => { } // no tx; nothing to do
case Some ( tx ) =>
tx . rollback ( server )
2009-04-09 15:49:42 +02:00
threadBoundTx . set ( Some ( tx ) )
2009-03-26 20:22:49 +01:00
}
2009-04-04 21:34:10 +02:00
2009-04-27 19:55:57 +02:00
/**
 * Reflectively collects the transactional state held by `targetInstance`.
 *
 * Only fields declared directly on the `target` class whose declared type is
 * exactly TransactionalMap, TransactionalVector or TransactionalRef are read;
 * fields holding null are skipped. Each value is prepended to its list.
 *
 * @param targetInstance instance to read the fields from; must not be null
 * @return the tuple (maps, vectors, refs)
 */
private def getTransactionalItemsFor ( targetInstance : AnyRef ) :
Tuple3 [ List [ TransactionalMap [ _ , _ ] ] , List [ TransactionalVector [ _ ] ] , List [ TransactionalRef [ _ ] ] ] = {
2009-04-04 21:34:10 +02:00
require ( targetInstance != null )
2009-04-27 19:55:57 +02:00
var maps : List [ TransactionalMap [ _ , _ ] ] = Nil
var vectors : List [ TransactionalVector [ _ ] ] = Nil
var refs : List [ TransactionalRef [ _ ] ] = Nil
for {
field <- target . getDeclaredFields . toArray . toList . asInstanceOf [ List [ Field ] ]
fieldType = field . getType
if fieldType == classOf [ TransactionalMap [ _ , _ ] ] ||
fieldType == classOf [ TransactionalVector [ _ ] ] ||
fieldType == classOf [ TransactionalRef [ _ ] ]
txItem = {
2009-04-09 15:49:42 +02:00
// Make the field readable regardless of its access modifier before reading it.
field . setAccessible ( true )
field . get ( targetInstance )
}
2009-04-27 19:55:57 +02:00
if txItem != null
} {
if ( txItem . isInstanceOf [ TransactionalMap [ _ , _ ] ] ) maps : := txItem.asInstanceOf [ TransactionalMap [ _ , _ ] ]
else if ( txItem . isInstanceOf [ TransactionalRef [ _ ] ] ) refs : := txItem.asInstanceOf [ TransactionalRef [ _ ] ]
2009-05-09 19:55:42 +02:00
else if ( txItem . isInstanceOf [ TransactionalVector [ _ ] ] ) vectors : := txItem.asInstanceOf [ TransactionalVector [ _ ] ]
2009-04-06 19:29:35 +02:00
}
2009-04-27 19:55:57 +02:00
( maps , vectors , refs )
2009-04-04 21:34:10 +02:00
}
2009-02-17 21:05:07 +01:00
}
2009-04-09 15:49:42 +02:00
/**
 * GenericServer that executes Invocation messages and replies with the outcome,
 * success or failure, wrapped in an ErrRef that carries the invocation's transaction.
 *
 * @param targetName name shown in toString
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
private[kernel] class Dispatcher(val targetName: String) extends GenericServer {

  override def body: PartialFunction[Any, Unit] = {

    case invocation: Invocation =>
      val currentTx = invocation.tx

      // Replies with an ErrRef that carries `cause` instead of a value.
      def replyWithFailure(cause: Throwable): Unit = {
        val failed = ErrRef(currentTx)
        failed() = throw cause
        reply(failed)
      }

      // Bind the invocation's transaction to this thread before running the call.
      ActiveObject.threadBoundTx.set(currentTx)
      try {
        reply(ErrRef(invocation.invoke, currentTx))
      } catch {
        // Reflection wraps failures of the target method; report the underlying cause.
        case wrapped: InvocationTargetException =>
          replyWithFailure(wrapped.getTargetException)
        case other =>
          replyWithFailure(other)
      }

    case 'exit =>
      exit
      reply()

    /* case exchange: Exchange =>
      println("=============> Exchange From Actor: " + exchange)
      val invocation = exchange.getIn.getBody.asInstanceOf[Invocation]
      invocation.invoke
    */

    case unexpected =>
      val description =
        "Unexpected message [" + unexpected + "] to [" + this + "] from [" + sender + "]"
      throw new ActiveObjectException(description)
  }

  override def toString(): String = "GenericServer[" + targetName + "]"
}
2009-02-17 21:05:07 +01:00
/**
 * Represents a snapshot of the current invocation: the reflective method, its
 * arguments, the instance to call it on and the transaction it runs under.
 *
 * Equality and hashing consider `method`, `target` and `args`; `tx` is ignored.
 *
 * @param method method to invoke; made accessible on construction
 * @param args   call arguments; may be null (java.lang.reflect.Proxy passes null
 *               for methods without parameters)
 * @param target instance the method is invoked on
 * @param tx     transaction associated with the call, if any
 *
 * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
 */
private[kernel] case class Invocation(val method: Method,
                                      val args: Array[AnyRef],
                                      val target: AnyRef,
                                      val tx: Option[Transaction]) {

  // Allow reflective calls to methods that are not public.
  method.setAccessible(true)

  /** Invokes `method` on `target` with `args` and returns the method's result. */
  def invoke: AnyRef = synchronized {
    method.invoke(target, args: _*)
  }

  override def toString: String = synchronized {
    "Invocation [method: " + method.getName + ", args: " + argsToString(args) + ", target: " + target + "]"
  }

  // NOTE(review): consistency with equals relies on HashCode.hash treating the
  // args array by content — confirm against the HashCode helper.
  override def hashCode(): Int = synchronized {
    var result = HashCode.SEED
    result = HashCode.hash(result, method)
    result = HashCode.hash(result, args)
    result = HashCode.hash(result, target)
    result
  }

  override def equals(that: Any): Boolean = synchronized {
    that match {
      // A typed pattern never matches null, so no separate null check is needed.
      case other: Invocation =>
        other.method == method &&
        other.target == target &&
        isEqual(other.args, args)
      case _ => false
    }
  }

  /**
   * Element-wise comparison of two argument arrays. Two nulls are equal; otherwise
   * both must be non-null, of the same length, and EVERY pair of elements must be
   * equal (two empty arrays are therefore equal).
   */
  private[this] def isEqual(a1: Array[Object], a2: Array[Object]): Boolean =
    (a1 == null && a2 == null) ||
    (a1 != null &&
     a2 != null &&
     a1.size == a2.size &&
     a1.zip(a2).forall(t => t._1 == t._2))

  /** Renders the arguments as "( a b c)"; a null or empty array renders as "()". */
  private[this] def argsToString(array: Array[Object]): String =
    if (array == null) "()"
    else array.foldLeft("(")(_ + " " + _) + ")"
}
2009-05-09 19:55:42 +02:00
/*
public class CamelInvocationHandler implements InvocationHandler {
private final Endpoint endpoint ;
private final Producer producer ;
private final MethodInfoCache methodInfoCache ;
public CamelInvocationHandler ( Endpoint endpoint , Producer producer , MethodInfoCache methodInfoCache ) {
this . endpoint = endpoint ;
this . producer = producer ;
this . methodInfoCache = methodInfoCache ;
}
public Object invoke ( Object proxy , Method method , Object [ ] args ) throws Throwable {
BeanInvocation invocation = new BeanInvocation ( method , args ) ;
ExchangePattern pattern = ExchangePattern . InOut ;
MethodInfo methodInfo = methodInfoCache . getMethodInfo ( method ) ;
if ( methodInfo != null ) {
pattern = methodInfo . getPattern ( ) ;
}
Exchange exchange = new DefaultExchange ( endpoint , pattern ) ;
exchange . getIn ( ) . setBody ( invocation ) ;
producer . process ( exchange ) ;
Throwable fault = exchange . getException ( ) ;
if ( fault != null ) {
throw new InvocationTargetException ( fault ) ;
}
if ( pattern . isOutCapable ( ) ) {
return exchange . getOut ( ) . getBody ( ) ;
} else {
return null ;
}
}
}
*/