diff --git a/api-java/src/main/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java b/api-java/src/main/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java index 2eecce344c..0747f04e62 100755 --- a/api-java/src/main/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java +++ b/api-java/src/main/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfigurator.java @@ -12,6 +12,7 @@ import se.scalablesolutions.akka.kernel.ActiveObjectFactory; import se.scalablesolutions.akka.kernel.ActiveObjectProxy; import se.scalablesolutions.akka.kernel.Supervisor; import se.scalablesolutions.akka.kernel.Worker; +import se.scalablesolutions.akka.kernel.TransientStringState; import java.util.List; import java.util.ArrayList; @@ -55,6 +56,7 @@ public class ActiveObjectGuiceConfigurator { this.components = components; modules.add(new AbstractModule() { protected void configure() { + bind(TransientStringState.class); bind(ResourceProviderFactory.class); for (int i = 0; i < components.length; i++) { Component c = components[i]; diff --git a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java index 0f40e57bbc..9ca1926bea 100755 --- a/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ b/api-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java @@ -4,8 +4,9 @@ package se.scalablesolutions.akka.api; -import se.scalablesolutions.akka.annotation.oneway; +import se.scalablesolutions.akka.annotation.*; import se.scalablesolutions.akka.kernel.configuration.*; +import se.scalablesolutions.akka.kernel.TransientObjectState; import com.google.inject.Inject; import com.google.inject.AbstractModule; @@ -34,6 +35,16 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { Bar.class, BarImpl.class, new LifeCycle(new Permanent(), 100), + 1000), + new Component( + Stateful.class, + StatefulImpl.class, + new LifeCycle(new Permanent(), 100), + 1000), + new Component( + Failer.class, + FailerImpl.class, + new LifeCycle(new Permanent(), 100), 1000) }).inject().supervise(); @@ -93,6 +104,19 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { } catch (RuntimeException e) { } } + + public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { + Stateful stateful = conf.getActiveObject(Stateful.class); + stateful.success("test", "new state"); + assertEquals("new state", stateful.getState("test")); + } + + public void testShouldRollbackStateForStatefulServerInCaseOfFailure() { + Stateful stateful = conf.getActiveObject(Stateful.class); + Failer failer = conf.getActiveObject(Failer.class); + stateful.failure("test", "new state", failer); + assertEquals("nil", stateful.getState("test")); + } } // ============== TEST SERVICES =============== @@ -166,4 +190,35 @@ class ExtImpl implements Ext { } } +interface Stateful { + @transactional public void success(String key, String msg); + @transactional public void failure(String key, String msg, Failer failer); + public String getState(String key); +} + +@stateful // TODO: make it possible to add @stateful to interface not impl class +class StatefulImpl implements Stateful { + @Inject private TransientObjectState state; + public String getState(String key) { + return (String)state.get(key); + } + public void success(String key, String msg) { + state.put(key, msg); + } + public void failure(String key, String msg, Failer failer) { + state.put(key, msg); + failer.fail(); + } +} + +interface Failer { + public void fail(); +} + +class FailerImpl implements Failer { + public void fail() { + throw new RuntimeException("expected"); + } +} + diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala index 9168202a61..df7467c332 100755 --- a/kernel/src/main/scala/ActiveObject.scala +++ b/kernel/src/main/scala/ActiveObject.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.kernel import java.util.{List => JList, ArrayList} - import java.lang.reflect.{Method, Field, InvocationHandler, Proxy, InvocationTargetException} import java.lang.annotation.Annotation @@ -74,11 +73,15 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I val transactional = classOf[se.scalablesolutions.akka.annotation.transactional] val oneway = classOf[se.scalablesolutions.akka.annotation.oneway] val immutable = classOf[se.scalablesolutions.akka.annotation.immutable] + val stateful= classOf[se.scalablesolutions.akka.annotation.stateful] private[this] var activeTx: Option[Transaction] = None private var targetInstance: AnyRef = _ - private[kernel] def setTargetInstance(instance: AnyRef) = targetInstance = instance + private[kernel] def setTargetInstance(instance: AnyRef) = { + targetInstance = instance + if (server.state.isDefined) injectState(server.state.get, targetInstance) + } private[this] val dispatcher = new GenericServer { override def body: PartialFunction[Any, Unit] = { @@ -101,9 +104,11 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I } } - private[kernel] val server = new GenericServerContainer(target.getName, () => dispatcher) + private[kernel] val server = + if (target.isAnnotationPresent(stateful)) new GenericServerContainer(target.getName, () => dispatcher, Some(new TransientObjectState)) + else new GenericServerContainer(target.getName, () => dispatcher, None) server.setTimeout(timeout) - + def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = { if (m.isAnnotationPresent(transactional)) { val newTx = new Transaction @@ -112,9 +117,9 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I } val cflowTx = ActiveObject.threadBoundTx.get - println("========== invoking: " + m.getName) - println("========== cflowTx: " + cflowTx) - println("========== activeTx: " + activeTx) +// println("========== invoking: " + m.getName) +// println("========== cflowTx: " + cflowTx) +// println("========== activeTx: " + activeTx) activeTx match { case Some(tx) => if (cflowTx.isDefined && cflowTx.get != tx) { @@ -158,6 +163,21 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I tx.rollback(server) ActiveObject.threadBoundTx.set(Some(tx)) } + + private def injectState(state: TransientObjectState, targetInstance: AnyRef) = { + require(state != null) + require(targetInstance != null) + import se.scalablesolutions.akka.kernel.configuration.ConfigurationException + val fields = for { + field <- target.getDeclaredFields + if field.getType == classOf[TransientObjectState] + } yield field + if (fields.size == 0) throw new ConfigurationException("Stateful active object needs to have a field '@Inject TransientObjectState state' defined") + if (fields.size > 1) throw new ConfigurationException("Stateful active object can only have one single field '@Inject TransientObjectState state' defined") + val field = fields(0) + field.setAccessible(true) + field.set(targetInstance, state) + } } /** diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala index ed8eb362d6..266201b817 100644 --- a/kernel/src/main/scala/GenericServer.scala +++ b/kernel/src/main/scala/GenericServer.scala @@ -81,7 +81,10 @@ trait GenericServer extends Actor { * * @author Jonas Bonér */ -class GenericServerContainer(val id: String, var serverFactory: () => GenericServer) extends Logging { +class GenericServerContainer( + val id: String, + var serverFactory: () => GenericServer, + private[kernel] var state: Option[TransientObjectState]) extends Logging { require(id != null && id != "") // TODO: see if we can parameterize class and add type safe getActor method diff --git a/kernel/src/main/scala/State.scala b/kernel/src/main/scala/State.scala index ba125d6ebf..8295e77320 100755 --- a/kernel/src/main/scala/State.scala +++ b/kernel/src/main/scala/State.scala @@ -7,6 +7,12 @@ package se.scalablesolutions.akka.kernel import se.scalablesolutions.akka.collection._ import scala.collection.mutable.{HashMap} +trait Transactional { + private[kernel] def begin + private[kernel] def commit + private[kernel] def rollback +} + sealed trait State[K, V] { def put(key: K, value: V) def remove(key: K) @@ -17,28 +23,20 @@ sealed trait State[K, V] { def clear } -sealed trait TransactionalState[K, V] extends State[K, V] { this: HashState[K, V] => - private[kernel] var snapshot = state - private[kernel] val unitOfWork = new HashMap[K, V] - - private[kernel] def record = { - snapshot = state - unitOfWork.clear - } - - abstract override def put(key: K, value: V) = { - super.put(key, value) - unitOfWork += key -> value - } - - abstract override def remove(key: K) = { - super.remove(key) - unitOfWork -= key - } -} - -final class HashState[K, V] extends State[K, V] { +sealed class TransientState[K, V] extends State[K, V] with Transactional { private[kernel] var state = new HashTrie[K, V] + private[kernel] var snapshot = state + + private[kernel] override def begin = { + snapshot = state + } + + private[kernel] override def commit = { + } + + private[kernel] override def rollback = { + state = snapshot + } override def put(key: K, value: V) = { state = state.update(key, value) @@ -59,6 +57,29 @@ final class HashState[K, V] extends State[K, V] { def clear = state = new HashTrie[K, V] } +final class TransientStringState extends TransientState[String, String] +final class TransientObjectState extends TransientState[String, AnyRef] + +trait UnitOfWork[K, V] extends State[K, V] with Transactional { + this: TransientState[K, V] => + private[kernel] val changeSet = new HashMap[K, V] + + abstract override def begin = { + super.begin + changeSet.clear + } + + abstract override def put(key: K, value: V) = { + super.put(key, value) + changeSet += key -> value + } + + abstract override def remove(key: K) = { + super.remove(key) + changeSet -= key + } +} + //class VectorState[T] { // private[kernel] var state: Vector[T] = EmptyVector // private[kernel] var snapshot = state diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/Transaction.scala index 8703837682..8114d93464 100644 --- a/kernel/src/main/scala/Transaction.scala +++ b/kernel/src/main/scala/Transaction.scala @@ -32,12 +32,11 @@ object TransactionIdFactory { * @author Jonas Bonér */ class Transaction extends Logging { - val stateful= classOf[se.scalablesolutions.akka.annotation.stateful] val id = TransactionIdFactory.newId log.debug("Creating a new transaction [%s]", id) private[this] var parent: Option[Transaction] = None - private[this] var oldActorVersions = new HashMap[GenericServerContainer, GenericServer] + private[this] var participants = new HashMap[GenericServerContainer, GenericServer] private[this] var precommitted: List[GenericServerContainer] = Nil @volatile private[this] var status: TransactionStatus = TransactionStatus.New @@ -46,10 +45,7 @@ class Transaction extends Logging { if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction") if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server) else log.debug("Actor [%s] is participating in transaction", server) - if (server.getServer.getClass.isAnnotationPresent(stateful)) { - val oldVersion = server.cloneServerAndReturnOldVersion - oldActorVersions.put(server, oldVersion) - } + if (server.state.isDefined) server.state.get.begin status = TransactionStatus.Active } @@ -64,8 +60,8 @@ class Transaction extends Logging { if (status == TransactionStatus.Active) { log.debug("Committing transaction for actor [%s]", server) val haveAllPreCommitted = - if (oldActorVersions.size == precommitted.size) {{ - for (server <- oldActorVersions.keys) yield { + if (participants.size == precommitted.size) {{ + for (server <- participants.keys) yield { if (precommitted.exists(_.id == server.id)) true else false }}.exists(_ == false) @@ -77,10 +73,10 @@ class Transaction extends Logging { def rollback(server: GenericServerContainer) = synchronized { ensureIsActiveOrAborted - log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, oldActorVersions.keys) - oldActorVersions.foreach(entry => { + log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, participants.keys) + participants.foreach(entry => { val (server, backup) = entry - server.swapServer(backup) + if (server.state.isDefined) server.state.get.rollback }) status = TransactionStatus.Aborted } diff --git a/kernel/src/test/scala/activeObjectSpecs.scala b/kernel/src/test/scala/ActiveObjectSpec.scala similarity index 72% rename from kernel/src/test/scala/activeObjectSpecs.scala rename to kernel/src/test/scala/ActiveObjectSpec.scala index 35c9972c7a..b4903bde6c 100755 --- a/kernel/src/test/scala/activeObjectSpecs.scala +++ b/kernel/src/test/scala/ActiveObjectSpec.scala @@ -4,11 +4,52 @@ package se.scalablesolutions.akka.kernel -import org.specs.runner.JUnit4 -import org.specs.Specification +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers +import org.scalatest.junit.JUnit3Suite import se.scalablesolutions.akka.annotation.{oneway, transactional, stateful} +/** + * @author Jonas Bonér + */ +object ActiveObjectSpec { + var messageLog = "" +} +class ActiveObjectSpec extends Spec with ShouldMatchers { + + describe("An ActiveObject") { + + it("(with default supervisor) should dispatch method calls normally") { + val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000) + + val result = foo.foo("foo ") + ActiveObjectSpec.messageLog += result + + foo.bar("bar ") + ActiveObjectSpec.messageLog += "before_bar " + + Thread.sleep(500) + ActiveObjectSpec.messageLog should equal ("foo return_foo before_bar bar ") + } + + it("should not rollback state for a stateful server in case of success") { + val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000) + + stateful.success("new state") + stateful.state should equal ("new state") + } + + it("should rollback state for a stateful server in case of failure") { + val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000) + val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000) + + stateful.failure("new state", failer) + stateful.state should equal ("nil") + } + } +} + trait Foo { def foo(msg: String): String @transactional def fooInTx(msg: String): String @@ -17,15 +58,14 @@ trait Foo { def throwsException } - class FooImpl extends Foo { val bar: Bar = new BarImpl def foo(msg: String): String = { - activeObjectSpec.messageLog += msg + ActiveObjectSpec.messageLog += msg "return_foo " } def fooInTx(msg: String): String = { - activeObjectSpec.messageLog += msg + ActiveObjectSpec.messageLog += msg "return_foo " } def bar(msg: String) = bar.bar(msg) @@ -40,7 +80,7 @@ trait Bar { class BarImpl extends Bar { def bar(msg: String) = { Thread.sleep(100) - activeObjectSpec.messageLog += msg + ActiveObjectSpec.messageLog += msg } } @@ -68,44 +108,6 @@ class FailerImpl extends Failer { def fail = throw new RuntimeException("expected") } - -/** - * @author Jonas Bonér - */ -class activeObjectSpecTest extends JUnit4(activeObjectSpec) // for JUnit4 and Maven -object activeObjectSpec extends Specification { - - var messageLog = "" - - "make sure default supervisor works correctly" in { - val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000) - - val result = foo.foo("foo ") - messageLog += result - - foo.bar("bar ") - messageLog += "before_bar " - - Thread.sleep(500) - messageLog must equalIgnoreCase("foo return_foo before_bar bar ") - } - - "stateful server should not rollback state in case of success" in { - val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000) - - stateful.success("new state") - stateful.state must be_==("new state") - } - - "stateful server should rollback state in case of failure" in { - val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000) - val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000) - - stateful.failure("new state", failer) - stateful.state must be_==("nil") - } - -} // @Test { val groups=Array("unit") } // def testCreateGenericServerBasedComponentUsingCustomSupervisorConfiguration = { // val proxy = new ActiveObjectProxy(new FooImpl, 1000) diff --git a/kernel/src/test/scala/AllSuite.scala b/kernel/src/test/scala/AllSuite.scala new file mode 100755 index 0000000000..de4e3ef678 --- /dev/null +++ b/kernel/src/test/scala/AllSuite.scala @@ -0,0 +1,19 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel + +import org.scalatest._ + +/** + * @author Jonas Bonér + */ +class AllSuite extends SuperSuite( + List( + new ActiveObjectSpec, + new RestManagerSpec + ) +) + + diff --git a/kernel/src/test/scala/restManagerSpecs.scala b/kernel/src/test/scala/RestManagerSpec.scala similarity index 70% rename from kernel/src/test/scala/restManagerSpecs.scala rename to kernel/src/test/scala/RestManagerSpec.scala index bdfa2d167a..b80ad38abb 100755 --- a/kernel/src/test/scala/restManagerSpecs.scala +++ b/kernel/src/test/scala/RestManagerSpec.scala @@ -4,8 +4,8 @@ package se.scalablesolutions.akka.kernel -import org.specs.runner.JUnit4 -import org.specs.Specification +import org.scalatest.Spec +import org.scalatest.matchers.ShouldMatchers import javax.ws.rs.{Produces, Path, GET} @@ -17,18 +17,20 @@ import javax.ws.rs.{Produces, Path, GET} /** * @author Jonas Bonér */ -class restManagerSpecTest extends JUnit4(restManagerSpec) // for JUnit4 and Maven -object restManagerSpec extends Specification { +class RestManagerSpec extends Spec with ShouldMatchers { - "jersey server should be able to start and stop" in { - val threadSelector = Kernel.startJersey -/* val cc = new DefaultClientConfig + describe("A RestManager") { + + it("should be able to start and stop") { + val threadSelector = Kernel.startJersey + /* val cc = new DefaultClientConfig val c = Client.create(cc) val resource = c.proxy("http://localhost:9998/") val hello = resource.get(classOf[HelloWorldResource]) val msg = hello.getMessage println("=============: " + msg) */ threadSelector.stopEndpoint + } } }