first draft of MVCC using Clojure's Map as state holder, still one test failing though

This commit is contained in:
Jonas Boner 2009-04-04 21:34:10 +02:00
parent a453930503
commit 3e703a53ab
9 changed files with 212 additions and 92 deletions

View file

@ -12,6 +12,7 @@ import se.scalablesolutions.akka.kernel.ActiveObjectFactory;
import se.scalablesolutions.akka.kernel.ActiveObjectProxy;
import se.scalablesolutions.akka.kernel.Supervisor;
import se.scalablesolutions.akka.kernel.Worker;
import se.scalablesolutions.akka.kernel.TransientStringState;
import java.util.List;
import java.util.ArrayList;
@ -55,6 +56,7 @@ public class ActiveObjectGuiceConfigurator {
this.components = components;
modules.add(new AbstractModule() {
protected void configure() {
bind(TransientStringState.class);
bind(ResourceProviderFactory.class);
for (int i = 0; i < components.length; i++) {
Component c = components[i];

View file

@ -4,8 +4,9 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.oneway;
import se.scalablesolutions.akka.annotation.*;
import se.scalablesolutions.akka.kernel.configuration.*;
import se.scalablesolutions.akka.kernel.TransientObjectState;
import com.google.inject.Inject;
import com.google.inject.AbstractModule;
@ -34,6 +35,16 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
Bar.class,
BarImpl.class,
new LifeCycle(new Permanent(), 100),
1000),
new Component(
Stateful.class,
StatefulImpl.class,
new LifeCycle(new Permanent(), 100),
1000),
new Component(
Failer.class,
FailerImpl.class,
new LifeCycle(new Permanent(), 100),
1000)
}).inject().supervise();
@ -93,6 +104,19 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
} catch (RuntimeException e) {
}
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
Stateful stateful = conf.getActiveObject(Stateful.class);
stateful.success("test", "new state");
assertEquals("new state", stateful.getState("test"));
}
public void testShouldRollbackStateForStatefulServerInCaseOfFailure() {
Stateful stateful = conf.getActiveObject(Stateful.class);
Failer failer = conf.getActiveObject(Failer.class);
stateful.failure("test", "new state", failer);
assertEquals("nil", stateful.getState("test"));
}
}
// ============== TEST SERVICES ===============
@ -166,4 +190,35 @@ class ExtImpl implements Ext {
}
}
interface Stateful {
@transactional public void success(String key, String msg);
@transactional public void failure(String key, String msg, Failer failer);
public String getState(String key);
}
@stateful // TODO: make it possible to add @stateful to interface not impl class
class StatefulImpl implements Stateful {
@Inject private TransientObjectState state;
public String getState(String key) {
return (String)state.get(key);
}
public void success(String key, String msg) {
state.put(key, msg);
}
public void failure(String key, String msg, Failer failer) {
state.put(key, msg);
failer.fail();
}
}
interface Failer {
public void fail();
}
class FailerImpl implements Failer {
public void fail() {
throw new RuntimeException("expected");
}
}

View file

@ -5,7 +5,6 @@
package se.scalablesolutions.akka.kernel
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException}
import java.lang.annotation.Annotation
@ -74,11 +73,15 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
val transactional = classOf[se.scalablesolutions.akka.annotation.transactional]
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
val stateful= classOf[se.scalablesolutions.akka.annotation.stateful]
private[this] var activeTx: Option[Transaction] = None
private var targetInstance: AnyRef = _
private[kernel] def setTargetInstance(instance: AnyRef) = targetInstance = instance
private[kernel] def setTargetInstance(instance: AnyRef) = {
targetInstance = instance
if (server.state.isDefined) injectState(server.state.get, targetInstance)
}
private[this] val dispatcher = new GenericServer {
override def body: PartialFunction[Any, Unit] = {
@ -101,9 +104,11 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
}
}
private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher)
private[kernel] val server =
if (target.isAnnotationPresent(stateful)) new GenericServerContainer(target.getName, () => dispatcher, Some(new TransientObjectState))
else new GenericServerContainer(target.getName, () => dispatcher, None)
server.setTimeout(timeout)
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
if (m.isAnnotationPresent(transactional)) {
val newTx = new Transaction
@ -112,9 +117,9 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
}
val cflowTx = ActiveObject.threadBoundTx.get
println("========== invoking: " + m.getName)
println("========== cflowTx: " + cflowTx)
println("========== activeTx: " + activeTx)
// println("========== invoking: " + m.getName)
// println("========== cflowTx: " + cflowTx)
// println("========== activeTx: " + activeTx)
activeTx match {
case Some(tx) =>
if (cflowTx.isDefined && cflowTx.get != tx) {
@ -158,6 +163,21 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
tx.rollback(server)
ActiveObject.threadBoundTx.set(Some(tx))
}
private def injectState(state: TransientObjectState, targetInstance: AnyRef) = {
require(state != null)
require(targetInstance != null)
import se.scalablesolutions.akka.kernel.configuration.ConfigurationException
val fields = for {
field <- target.getDeclaredFields
if field.getType == classOf[TransientObjectState]
} yield field
if (fields.size == 0) throw new ConfigurationException("Stateful active object needs to have a field '@Inject TransientObjectState state' defined")
if (fields.size > 1) throw new ConfigurationException("Stateful active object can only have one single field '@Inject TransientObjectState state' defined")
val field = fields(0)
field.setAccessible(true)
field.set(targetInstance, state)
}
}
/**

View file

@ -81,7 +81,10 @@ trait GenericServer extends Actor {
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging {
class GenericServerContainer(
val id: String,
var serverFactory: () => GenericServer,
private[kernel] var state: Option[TransientObjectState]) extends Logging {
require(id != null && id != "")
// TODO: see if we can parameterize class and add type safe getActor method

View file

@ -7,6 +7,12 @@ package se.scalablesolutions.akka.kernel
import se.scalablesolutions.akka.collection._
import scala.collection.mutable.{HashMap}
trait Transactional {
private[kernel] def begin
private[kernel] def commit
private[kernel] def rollback
}
sealed trait State[K, V] {
def put(key: K, value: V)
def remove(key: K)
@ -17,28 +23,20 @@ sealed trait State[K, V] {
def clear
}
sealed trait TransactionalState[K, V] extends State[K, V] { this: HashState[K, V] =>
private[kernel] var snapshot = state
private[kernel] val unitOfWork = new HashMap[K, V]
private[kernel] def record = {
snapshot = state
unitOfWork.clear
}
abstract override def put(key: K, value: V) = {
super.put(key, value)
unitOfWork += key -> value
}
abstract override def remove(key: K) = {
super.remove(key)
unitOfWork -= key
}
}
final class HashState[K, V] extends State[K, V] {
sealed class TransientState[K, V] extends State[K, V] with Transactional {
private[kernel] var state = new HashTrie[K, V]
private[kernel] var snapshot = state
private[kernel] override def begin = {
snapshot = state
}
private[kernel] override def commit = {
}
private[kernel] override def rollback = {
state = snapshot
}
override def put(key: K, value: V) = {
state = state.update(key, value)
@ -59,6 +57,29 @@ final class HashState[K, V] extends State[K, V] {
def clear = state = new HashTrie[K, V]
}
final class TransientStringState extends TransientState[String, String]
final class TransientObjectState extends TransientState[String, AnyRef]
trait UnitOfWork[K, V] extends State[K, V] with Transactional {
this: TransientState[K, V] =>
private[kernel] val changeSet = new HashMap[K, V]
abstract override def begin = {
super.begin
changeSet.clear
}
abstract override def put(key: K, value: V) = {
super.put(key, value)
changeSet += key -> value
}
abstract override def remove(key: K) = {
super.remove(key)
changeSet -= key
}
}
//class VectorState[T] {
// private[kernel] var state: Vector[T] = EmptyVector
// private[kernel] var snapshot = state

View file

@ -32,12 +32,11 @@ object TransactionIdFactory {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Transaction extends Logging {
val stateful= classOf[se.scalablesolutions.akka.annotation.stateful]
val id = TransactionIdFactory.newId
log.debug("Creating a new transaction [%s]", id)
private[this] var parent: Option[Transaction] = None
private[this] var oldActorVersions = new HashMap[GenericServerContainer, GenericServer]
private[this] var participants = new HashMap[GenericServerContainer, GenericServer]
private[this] var precommitted: List[GenericServerContainer] = Nil
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
@ -46,10 +45,7 @@ class Transaction extends Logging {
if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction")
if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server)
else log.debug("Actor [%s] is participating in transaction", server)
if (server.getServer.getClass.isAnnotationPresent(stateful)) {
val oldVersion = server.cloneServerAndReturnOldVersion
oldActorVersions.put(server, oldVersion)
}
if (server.state.isDefined) server.state.get.begin
status = TransactionStatus.Active
}
@ -64,8 +60,8 @@ class Transaction extends Logging {
if (status == TransactionStatus.Active) {
log.debug("Committing transaction for actor [%s]", server)
val haveAllPreCommitted =
if (oldActorVersions.size == precommitted.size) {{
for (server <- oldActorVersions.keys) yield {
if (participants.size == precommitted.size) {{
for (server <- participants.keys) yield {
if (precommitted.exists(_.id == server.id)) true
else false
}}.exists(_ == false)
@ -77,10 +73,10 @@ class Transaction extends Logging {
def rollback(server: GenericServerContainer) = synchronized {
ensureIsActiveOrAborted
log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, oldActorVersions.keys)
oldActorVersions.foreach(entry => {
log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, participants.keys)
participants.foreach(entry => {
val (server, backup) = entry
server.swapServer(backup)
if (server.state.isDefined) server.state.get.rollback
})
status = TransactionStatus.Aborted
}

View file

@ -4,11 +4,52 @@
package se.scalablesolutions.akka.kernel
import org.specs.runner.JUnit4
import org.specs.Specification
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.junit.JUnit3Suite
import se.scalablesolutions.akka.annotation.{oneway, transactional, stateful}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActiveObjectSpec {
var messageLog = ""
}
class ActiveObjectSpec extends Spec with ShouldMatchers {
describe("An ActiveObject") {
it("(with default supervisor) should dispatch method calls normally") {
val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
val result = foo.foo("foo ")
ActiveObjectSpec.messageLog += result
foo.bar("bar ")
ActiveObjectSpec.messageLog += "before_bar "
Thread.sleep(500)
ActiveObjectSpec.messageLog should equal ("foo return_foo before_bar bar ")
}
it("should not rollback state for a stateful server in case of success") {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
stateful.success("new state")
stateful.state should equal ("new state")
}
it("should rollback state for a stateful server in case of failure") {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000)
stateful.failure("new state", failer)
stateful.state should equal ("nil")
}
}
}
trait Foo {
def foo(msg: String): String
@transactional def fooInTx(msg: String): String
@ -17,15 +58,14 @@ trait Foo {
def throwsException
}
class FooImpl extends Foo {
val bar: Bar = new BarImpl
def foo(msg: String): String = {
activeObjectSpec.messageLog += msg
ActiveObjectSpec.messageLog += msg
"return_foo "
}
def fooInTx(msg: String): String = {
activeObjectSpec.messageLog += msg
ActiveObjectSpec.messageLog += msg
"return_foo "
}
def bar(msg: String) = bar.bar(msg)
@ -40,7 +80,7 @@ trait Bar {
class BarImpl extends Bar {
def bar(msg: String) = {
Thread.sleep(100)
activeObjectSpec.messageLog += msg
ActiveObjectSpec.messageLog += msg
}
}
@ -68,44 +108,6 @@ class FailerImpl extends Failer {
def fail = throw new RuntimeException("expected")
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class activeObjectSpecTest extends JUnit4(activeObjectSpec) // for JUnit4 and Maven
object activeObjectSpec extends Specification {
var messageLog = ""
"make sure default supervisor works correctly" in {
val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
val result = foo.foo("foo ")
messageLog += result
foo.bar("bar ")
messageLog += "before_bar "
Thread.sleep(500)
messageLog must equalIgnoreCase("foo return_foo before_bar bar ")
}
"stateful server should not rollback state in case of success" in {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
stateful.success("new state")
stateful.state must be_==("new state")
}
"stateful server should rollback state in case of failure" in {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000)
stateful.failure("new state", failer)
stateful.state must be_==("nil")
}
}
// @Test { val groups=Array("unit") }
// def testCreateGenericServerBasedComponentUsingCustomSupervisorConfiguration = {
// val proxy = new ActiveObjectProxy(new FooImpl, 1000)

View file

@ -0,0 +1,19 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel
import org.scalatest._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class AllSuite extends SuperSuite(
List(
new ActiveObjectSpec,
new RestManagerSpec
)
)

View file

@ -4,8 +4,8 @@
package se.scalablesolutions.akka.kernel
import org.specs.runner.JUnit4
import org.specs.Specification
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
import javax.ws.rs.{Produces, Path, GET}
@ -17,18 +17,20 @@ import javax.ws.rs.{Produces, Path, GET}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven
object restManagerSpec extends Specification {
class RestManagerSpec extends Spec with ShouldMatchers {
"jersey server should be able to start and stop" in {
val threadSelector = Kernel.startJersey
/* val cc = new DefaultClientConfig
describe("A RestManager") {
it("should be able to start and stop") {
val threadSelector = Kernel.startJersey
/* val cc = new DefaultClientConfig
val c = Client.create(cc)
val resource = c.proxy("http://localhost:9998/")
val hello = resource.get(classOf[HelloWorldResource])
val msg = hello.getMessage
println("=============: " + msg)
*/ threadSelector.stopEndpoint
}
}
}