completed new actor impl with supervision and STM

This commit is contained in:
Jonas Boner 2009-06-22 13:13:58 +02:00
parent de846d4555
commit 93f712effe
10 changed files with 528 additions and 472 deletions

View file

@ -140,7 +140,7 @@
<option name="MAXIMUM_HEAP_SIZE" value="128" />
</component>
<component name="Encoding" useUTFGuessing="true" native2AsciiForPropertiesFiles="false">
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java" charset="windows-1252" />
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java" charset="MacRoman" />
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java" charset="windows-1252" />
<file url="file://$PROJECT_DIR$/kernel/src/main/scala/Kernel.scala" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/kernel/src/main/scala/collection/HashTrie.scala" charset="windows-1252" />

794
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -9,26 +9,32 @@ public class InMemStateful {
@state private TransactionalVector<String> vectorState = new InMemoryTransactionalVector<String>();
@state private TransactionalRef<String> refState = new TransactionalRef<String>();
@transactional
public String getMapState(String key) {
return (String)mapState.get(key).get();
}
@transactional
public String getVectorState() {
return (String)vectorState.last();
}
@transactional
public String getRefState() {
return (String)refState.get().get();
}
@transactional
public void setMapState(String key, String msg) {
mapState.put(key, msg);
}
@transactional
public void setVectorState(String msg) {
vectorState.add(msg);
}
@transactional
public void setRefState(String msg) {
refState.swap(msg);
}

View file

@ -33,7 +33,6 @@ public class InMemoryStateTest extends TestCase {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}

View file

@ -41,12 +41,12 @@ object Annotations {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectFactory {
def newInstance[T](target: Class[T]): T = {
ActiveObject.newInstance(target)
def newInstance[T](target: Class[T], actor: Actor): T = {
ActiveObject.newInstance(target, actor)
}
def newInstance[T](intf: Class[T], target: AnyRef): T = {
ActiveObject.newInstance(intf, target)
def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = {
ActiveObject.newInstance(intf, target, actor)
}
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
@ -67,16 +67,16 @@ object ActiveObject {
tl
}
def newInstance[T](target: Class[T]): T = {
def newInstance[T](target: Class[T], actor: Actor): T = {
val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy))
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor))
proxy.asInstanceOf[T]
}
def newInstance[T](intf: Class[T], target: AnyRef): T = {
def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = {
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target))
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor))
proxy.asInstanceOf[T]
}
@ -95,7 +95,7 @@ object ActiveObject {
*/
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef) extends AroundAdvice {
sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, actor: Actor) extends AroundAdvice {
private val changeSet = new ChangeSet(target.getName)
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
@ -115,15 +115,24 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
def invoke(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod
if (reenteringExistingTransaction) {
tryToCommitTransaction
if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
if (isInExistingTransaction) {
joinExistingTransaction
} else {
if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
}
val result: AnyRef = try {
incrementTransaction
if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects
else handleResult(sendAndReceiveEventually(joinpoint))
// if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects
// else handleResult(sendAndReceiveEventually(joinpoint))
val result = actor !! Invocation(joinpoint, activeTx)
val resultOrFailure =
if (result.isDefined) result.get.asInstanceOf[ResultOrFailure[AnyRef]]
else throw new ActiveObjectInvocationTimeoutException("TIMED OUT")
handleResult(resultOrFailure)
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
@ -132,19 +141,12 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
result
}
private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
private def removeTransactionIfTopLevel =
if (activeTx.isDefined && activeTx.get.topLevel_?) {
activeTx = None
threadBoundTx.set(None)
}
private def startNewTransaction = {
val newTx = new Transaction
newTx.begin(changeSet)
threadBoundTx.set(Some(newTx))
val tx = Some(newTx)
activeTx = tx
threadBoundTx.set(tx)
}
private def joinExistingTransaction = {
@ -158,17 +160,12 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet)
private def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
else true
} else true
private def tryToCommitTransaction = if (activeTx.isDefined) {
private def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
val tx = activeTx.get
tx.commit(changeSet)
removeTransactionIfTopLevel
}
true
} else false
private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
try {
@ -186,6 +183,26 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
tx.rollback(changeSet)
}
private def isInExistingTransaction = ActiveObject.threadBoundTx.get.isDefined
private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
private def removeTransactionIfTopLevel =
if (activeTx.isDefined && activeTx.get.topLevel_?) {
activeTx = None
threadBoundTx.set(None)
}
private def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
else true
} else true
private def sendOneWay(joinpoint: JoinPoint) =
mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult))
@ -195,8 +212,7 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc
getResultOrThrowException(future)
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: AnyRef, timeout: Long): CompletableFutureResult = {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
future
@ -310,6 +326,30 @@ class ChangeSet(val id: String) {
}
}
/**
* Generic Actor managing Invocation dispatch, transaction and error management.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[kernel] class Dispatcher(val targetName: String) extends Actor {
id = targetName
override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
ActiveObject.threadBoundTx.set(tx)
try {
reply(ResultOrFailure(joinpoint.proceed, tx))
} catch {
case e =>
val resultOrFailure = ResultOrFailure(tx)
resultOrFailure() = throw e
reply(resultOrFailure)
}
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
}
/*
ublic class CamelInvocationHandler implements InvocationHandler {
@ -365,32 +405,3 @@ ublic class CamelInvocationHandler implements InvocationHandler {
} else
*/
/**
* Generic GenericServer managing Invocation dispatch, transaction and error management.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
private[kernel] class Dispatcher(val targetName: String) extends Actor {
override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
ActiveObject.threadBoundTx.set(tx)
try {
reply(ResultOrFailure(joinpoint.proceed, tx))
} catch {
case e =>
val resultOrFailure = ResultOrFailure(tx)
resultOrFailure() = throw e
reply(resultOrFailure)
}
case 'exit =>
exit
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
override def toString(): String = "Actor[" + targetName + "]"
}

View file

@ -167,6 +167,8 @@ trait Actor extends Logging {
dispatcher = new EventBasedThreadPoolDispatcher
case DispatcherType.ThreadBasedDispatcher =>
dispatcher = new ThreadBasedDispatcher
case DispatcherType.EventBasedThreadPooledProxyInvokingDispatcher =>
dispatcher = new ProxyMessageDispatcher
}
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this))

View file

@ -87,6 +87,7 @@ abstract class SupervisorFactory extends Logging {
}
}
// FIXME remove Supervisor - all Actors can be supervisors - move SupervisorFactory config into actor
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/

View file

@ -8,10 +8,12 @@ import com.google.inject._
import com.google.inject.jsr250.ResourceProviderFactory
import ScalaConfig._
import kernel.actor.{Supervisor, ActiveObjectFactory}
import kernel.actor.{Actor, Supervisor, ActiveObjectFactory, Dispatcher}
import kernel.camel.ActiveObjectComponent
import kernel.util.Logging
import org.codehaus.aspectwerkz.intercept.Advisable
import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
import org.apache.camel.{CamelContext, Endpoint, Routes}
@ -106,10 +108,9 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
// FIXME add wrapping Actor and pass into Worker
workers ::= Worker(null, component.lifeCycle)
val proxy = activeObjectFactory.newInstance(targetClass).asInstanceOf[AnyRef]
val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, actor).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
}
@ -117,9 +118,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newDelegatingProxy(component: Component): DependencyBinding = {
val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
workers ::= Worker(null, component.lifeCycle) // TODO null is not an Actor
component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance).asInstanceOf[AnyRef]
val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}

37
kernel/src/main/scala/state/State.scala Executable file → Normal file
View file

@ -5,7 +5,10 @@
package se.scalablesolutions.akka.kernel.state
import org.codehaus.aspectwerkz.proxy.Uuid
import kernel.actor.ActiveObject
import se.scalablesolutions.akka.collection._
import scala.collection.mutable.HashMap
/**
@ -13,9 +16,13 @@ import scala.collection.mutable.HashMap
*/
trait Transactional {
val uuid = Uuid.newUuid.toString
private[kernel] def begin
private[kernel] def commit
private[kernel] def rollback
protected def isInTransaction = ActiveObject.threadBoundTx.get.isDefined
protected def nonTransactionalCall = throw new IllegalStateException("Can't access transactional map outside the scope of a transaction")
}
/**
@ -29,6 +36,36 @@ trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable
def remove(key: K)
}
trait TransactionalMapGuard[K, V] extends TransactionalMap[K, V] with Transactional {
abstract override def contains(key: K): Boolean =
if (isInTransaction) super.contains(key)
else nonTransactionalCall
abstract override def clear =
if (isInTransaction) super.clear
else nonTransactionalCall
abstract override def size: Int =
if (isInTransaction) super.size
else nonTransactionalCall
abstract override def remove(key: K) =
if (isInTransaction) super.remove(key)
else nonTransactionalCall
abstract override def elements: Iterator[(K, V)] =
if (isInTransaction) super.elements
else nonTransactionalCall
abstract override def get(key: K): Option[V] =
if (isInTransaction) super.get(key)
else nonTransactionalCall
abstract override def put(key: K, value: V): Option[V] =
if (isInTransaction) super.put(key, value)
else nonTransactionalCall
abstract override def -=(key: K) =
if (isInTransaction) super.-=(key)
else nonTransactionalCall
abstract override def update(key: K, value: V) =
if (isInTransaction) super.update(key, value)
else nonTransactionalCall
}
/**
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
*

View file

@ -89,6 +89,8 @@ object TransactionIdFactory {
log.debug("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", changeSet.id, this, participants)
participants.foreach(_.full.foreach(_.rollback))
status = TransactionStatus.Aborted
participants.clear
precommitted.clear
}
def join(changeSet: ChangeSet) = synchronized {