added prerestart and postrestart annotations and hooks into the supervisor fault handler for active objects

This commit is contained in:
Jonas Boner 2009-07-02 18:07:29 +02:00
parent 45bd6ebe5c
commit 5c99b4ed8d
11 changed files with 646 additions and 509 deletions

948
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -2,6 +2,8 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.state; import se.scalablesolutions.akka.annotation.state;
import se.scalablesolutions.akka.annotation.transactional; import se.scalablesolutions.akka.annotation.transactional;
import se.scalablesolutions.akka.annotation.prerestart;
import se.scalablesolutions.akka.annotation.postrestart;
import se.scalablesolutions.akka.kernel.state.*; import se.scalablesolutions.akka.kernel.state.*;
public class InMemStateful { public class InMemStateful {
@ -77,6 +79,16 @@ public class InMemStateful {
setMapState(key, msg); setMapState(key, msg);
} }
@prerestart
public void preRestart() {
System.out.println("################ PRE RESTART");
}
@postrestart
public void postRestart() {
System.out.println("################ POST RESTART");
}
/* /*
public void clashOk(String key, String msg, InMemClasher clasher) { public void clashOk(String key, String msg, InMemClasher clasher) {
mapState.put(key, msg); mapState.put(key, msg);

View file

@ -4,10 +4,11 @@
package se.scalablesolutions.akka.kernel.actor package se.scalablesolutions.akka.kernel.actor
import java.lang.reflect.Method
import java.net.InetSocketAddress import java.net.InetSocketAddress
import kernel.config.ScalaConfig._ import kernel.config.ScalaConfig._
import kernel.nio.{RemoteRequest, RemoteClient} import kernel.nio.{RemoteRequest, RemoteClient}
import kernel.reactor.FutureResult import kernel.reactor.{MessageDispatcher, FutureResult}
import kernel.util.HashCode import kernel.util.HashCode
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
@ -22,7 +23,8 @@ object Annotations {
val transactional = classOf[transactional] val transactional = classOf[transactional]
val oneway = classOf[oneway] val oneway = classOf[oneway]
val immutable = classOf[immutable] val immutable = classOf[immutable]
val state = classOf[state] val prerestart = classOf[prerestart]
val postrestart = classOf[postrestart]
} }
/** /**
@ -32,7 +34,7 @@ object Annotations {
*/ */
class ActiveObjectFactory { class ActiveObjectFactory {
// FIXME add versions with a MessageDispatcher -- How to pass the current on??????? // FIXME How to pass the MessageDispatcher on from active object to child???????
// FIXME call backs to @prerestart @postrestart methods // FIXME call backs to @prerestart @postrestart methods
@ -43,16 +45,51 @@ class ActiveObjectFactory {
// FIXME Configgy for config // FIXME Configgy for config
def newInstance[T](target: Class[T], timeout: Long): T = def newInstance[T](target: Class[T], timeout: Long): T =
ActiveObject.newInstance(target, new Dispatcher(target.getName), None, timeout) ActiveObject.newInstance(target, new Dispatcher, None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), None, timeout) ActiveObject.newInstance(intf, target, new Dispatcher, None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
ActiveObject.newInstance(target, new Dispatcher(target.getName), Some(new InetSocketAddress(hostname, port)), timeout) ActiveObject.newInstance(target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), Some(new InetSocketAddress(hostname, port)), timeout) ActiveObject.newInstance(intf, target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout)
def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
ActiveObject.newInstance(target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
ActiveObject.newInstance(intf, target, actor, None, timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(target, actor, remoteAddress, timeout)
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
}
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components)
/* /*
def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = { def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = {
@ -65,17 +102,6 @@ class ActiveObjectFactory {
ActiveObject.newInstance(intf, target, actor) ActiveObject.newInstance(intf, target, actor)
} }
*/ */
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(target, actor, remoteAddress, timeout)
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
}
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components)
} }
/** /**
@ -88,29 +114,55 @@ object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka" val AKKA_CAMEL_ROUTING_SCHEME = "akka"
def newInstance[T](target: Class[T], timeout: Long): T = def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(target.getName), None, timeout) newInstance(target, new Dispatcher, None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, new Dispatcher(intf.getName), None, timeout) newInstance(intf, target, new Dispatcher, None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T = def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(target.getName), Some(new InetSocketAddress(hostname, port)), timeout) newInstance(target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T = def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(intf.getName), Some(new InetSocketAddress(hostname, port)), timeout) newInstance(intf, target, new Dispatcher, Some(new InetSocketAddress(hostname, port)), timeout)
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
newInstance(target, actor, None, timeout)
}
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
newInstance(intf, target, actor, None, timeout)
}
def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
val actor = new Dispatcher
actor.dispatcher = dispatcher
newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
}
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true) val proxy = Proxy.newInstance(target, false, true)
actor.initialize(target, proxy)
// FIXME switch to weaving in the aspect at compile time // FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice( proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remoteAddress, timeout)) MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remoteAddress, timeout))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = { private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get) if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
actor.initialize(target.getClass, target)
proxy.asInstanceOf[Advisable].aw_addAdvice( proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remoteAddress, timeout)) MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remoteAddress, timeout))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
@ -132,7 +184,7 @@ object ActiveObject {
@serializable @serializable
sealed class ActorAroundAdvice(val target: Class[_], sealed class ActorAroundAdvice(val target: Class[_],
val targetInstance: AnyRef, val targetInstance: AnyRef,
val actor: Actor, val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress], val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice { val timeout: Long) extends AroundAdvice {
val id = target.getName val id = target.getName
@ -213,10 +265,30 @@ sealed class ActorAroundAdvice(val target: Class[_],
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
private[kernel] class Dispatcher(val targetName: String) extends Actor { private[kernel] class Dispatcher extends Actor {
private val ZERO_ITEM_CLASS_ARRAY = Array[Class[_]]()
makeTransactional makeTransactional
// FIXME implement the pre/post restart methods and call annotated methods on the POJO private[actor] var target: Option[AnyRef] = None
private var preRestart: Option[Method] = None
private var postRestart: Option[Method] = None
private[actor] def initialize(targetClass: Class[_], targetInstance: AnyRef) = {
id = targetClass.getName
target = Some(targetInstance)
val methods = targetInstance.getClass.getDeclaredMethods.toList
preRestart = methods.find( m => m.isAnnotationPresent(Annotations.prerestart) && m.getName.startsWith("aw$original"))
if (preRestart.isDefined) preRestart.get.setAccessible(true)
if (preRestart.isDefined && preRestart.get.getParameterTypes.length != 0)
throw new IllegalStateException("Method annotated with @prerestart in [" + targetClass.getName + "] must have a zero argument definition")
postRestart = methods.find( m => m.isAnnotationPresent(Annotations.postrestart) && m.getName.startsWith("aw$original"))
if (postRestart.isDefined) postRestart.get.setAccessible(true)
if (postRestart.isDefined && postRestart.get.getParameterTypes.length != 0)
throw new IllegalStateException("Method annotated with @postrestart in [" + targetClass.getName + "] must have a zero argument definition")
}
override def receive: PartialFunction[Any, Unit] = { override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint) => case Invocation(joinpoint: JoinPoint) =>
@ -224,6 +296,14 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
case unexpected => case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]") throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
} }
override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY)
}
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY)
}
} }
/* /*

View file

@ -35,9 +35,6 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
} }
trait Actor extends Logging with TransactionManagement { trait Actor extends Logging with TransactionManagement {
val id: String = this.getClass.toString
val uuid = Uuid.newUuid.toString
@volatile private[this] var isRunning: Boolean = false @volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock private[this] val transactionalFlagLock = new ReadWriteLock
@ -91,6 +88,13 @@ trait Actor extends Logging with TransactionManagement {
dispatcher dispatcher
} }
/**
* User overridable callback/setting.
*
* Identifier for actor, does not have to be a unique one. Simply the one used in logging etc.
*/
protected[this] var id: String = this.getClass.toString
/** /**
* User overridable callback/setting. * User overridable callback/setting.
* *

View file

@ -94,7 +94,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = { private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target val targetClass = component.target
val actor = new Dispatcher(targetClass.getName) val actor = new Dispatcher
actor.start actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress = val remoteAddress =
@ -110,7 +110,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val targetClass = component.intf.get val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true) component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val actor = new Dispatcher(targetClass.getName) val actor = new Dispatcher
actor.start actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress = val remoteAddress =

View file

@ -59,7 +59,7 @@ object TransactionIdFactory {
def begin(participant: String) = synchronized { def begin(participant: String) = synchronized {
ensureIsActiveOrNew ensureIsActiveOrNew
if (status == TransactionStatus.New) log.debug("TX BEGIN - Server [%s] is starting NEW transaction [%s]", participant, toString) if (status == TransactionStatus.New) log.debug("TX BEGIN - Server with UUID [%s] is starting NEW transaction [%s]", participant, toString)
else log.debug("Server [%s] is participating in transaction", participant) else log.debug("Server [%s] is participating in transaction", participant)
participants ::= participant participants ::= participant
status = TransactionStatus.Active status = TransactionStatus.Active
@ -67,14 +67,14 @@ object TransactionIdFactory {
def precommit(participant: String) = synchronized { def precommit(participant: String) = synchronized {
if (status == TransactionStatus.Active) { if (status == TransactionStatus.Active) {
log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server [%s]", toString, participant) log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server with UUID [%s]", toString, participant)
precommitted ::= participant precommitted ::= participant
} }
} }
def commit(participant: String) = synchronized { def commit(participant: String) = synchronized {
if (status == TransactionStatus.Active) { if (status == TransactionStatus.Active) {
log.debug("TX COMMIT - Committing transaction [%s] for server [%s]", toString, participant) log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant)
val haveAllPreCommitted = val haveAllPreCommitted =
if (participants.size == precommitted.size) {{ if (participants.size == precommitted.size) {{
for (part <- participants) yield { for (part <- participants) yield {
@ -92,7 +92,7 @@ object TransactionIdFactory {
def rollback(participant: String) = synchronized { def rollback(participant: String) = synchronized {
ensureIsActiveOrAborted ensureIsActiveOrAborted
log.debug("TX ROLLBACK - Server [%s] has initiated transaction rollback for [%s]", participant, toString) log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
transactionals.items.foreach(_.rollback) transactionals.items.foreach(_.rollback)
status = TransactionStatus.Aborted status = TransactionStatus.Aborted
reset reset
@ -100,7 +100,7 @@ object TransactionIdFactory {
def join(participant: String) = synchronized { def join(participant: String) = synchronized {
ensureIsActive ensureIsActive
log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , participant, toString) log.debug("TX JOIN - Server with UUID [%s] is joining transaction [%s]" , participant, toString)
participants ::= participant participants ::= participant
} }

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.kernel.stm
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kernel.util.Logging import kernel.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) { class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]" override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
@ -25,14 +26,14 @@ object TransactionManagement {
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
trait TransactionManagement extends Logging { trait TransactionManagement extends Logging {
val id: String val uuid = Uuid.newUuid.toString
import TransactionManagement.threadBoundTx import TransactionManagement.threadBoundTx
private[kernel] var activeTx: Option[Transaction] = None private[kernel] var activeTx: Option[Transaction] = None
protected def startNewTransaction = { protected def startNewTransaction = {
val newTx = new Transaction val newTx = new Transaction
newTx.begin(id) newTx.begin(uuid)
val tx = Some(newTx) val tx = Some(newTx)
activeTx = tx activeTx = tx
threadBoundTx.set(tx) threadBoundTx.set(tx)
@ -42,16 +43,16 @@ trait TransactionManagement extends Logging {
val cflowTx = threadBoundTx.get val cflowTx = threadBoundTx.get
if (!activeTx.isDefined && cflowTx.isDefined) { if (!activeTx.isDefined && cflowTx.isDefined) {
val currentTx = cflowTx.get val currentTx = cflowTx.get
currentTx.join(id) currentTx.join(uuid)
activeTx = Some(currentTx) activeTx = Some(currentTx)
} }
} }
protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(id) protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(uuid)
protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) { protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
val tx = activeTx.get val tx = activeTx.get
tx.commit(id) tx.commit(uuid)
removeTransactionIfTopLevel removeTransactionIfTopLevel
true true
} else false } else false
@ -59,7 +60,7 @@ trait TransactionManagement extends Logging {
protected def rollback(tx: Option[Transaction]) = tx match { protected def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do case None => {} // no tx; nothing to do
case Some(tx) => case Some(tx) =>
tx.rollback(id) tx.rollback(uuid)
} }
protected def isInExistingTransaction = protected def isInExistingTransaction =

Binary file not shown.

Binary file not shown.

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface postrestart {}

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface prerestart {}