diff --git a/buildfile b/buildfile
index 1ac7ddeeea..539679ead9 100644
--- a/buildfile
+++ b/buildfile
@@ -72,11 +72,11 @@ define 'akka' do
# package :jar
#end
- #desc 'Akka Java API'
- #define 'api-java' do
- # compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4)
- # package :jar
- #end
+ desc 'Akka Java API'
+ define 'api-java' do
+ compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4)
+ package :jar
+ end
package(:zip).include 'README'
package(:zip).include 'bin/*', :path=>'bin'
diff --git a/kernel/src/main/scala/ActiveObject.scala b/kernel/src/main/scala/ActiveObject.scala
index 35d9f9e837..9168202a61 100755
--- a/kernel/src/main/scala/ActiveObject.scala
+++ b/kernel/src/main/scala/ActiveObject.scala
@@ -71,7 +71,9 @@ object ActiveObject {
* @author Jonas Bonér
*/
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
+ val transactional = classOf[se.scalablesolutions.akka.annotation.transactional]
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
+ val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
private[this] var activeTx: Option[Transaction] = None
@@ -82,6 +84,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
val tx = invocation.tx
+ ActiveObject.threadBoundTx.set(tx)
try {
reply(ErrRef(invocation.invoke, tx))
} catch {
@@ -102,8 +105,17 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
server.setTimeout(timeout)
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
+ if (m.isAnnotationPresent(transactional)) {
+ val newTx = new Transaction
+ newTx.begin(server)
+ ActiveObject.threadBoundTx.set(Some(newTx))
+ }
val cflowTx = ActiveObject.threadBoundTx.get
- activeTx.get.asInstanceOf[Option[Transaction]] match {
+
+ println("========== invoking: " + m.getName)
+ println("========== cflowTx: " + cflowTx)
+ println("========== activeTx: " + activeTx)
+ activeTx match {
case Some(tx) =>
if (cflowTx.isDefined && cflowTx.get != tx) {
// new tx in scope; try to commit
@@ -113,6 +125,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
case None =>
if (cflowTx.isDefined) activeTx = Some(cflowTx.get)
}
+ activeTx = ActiveObject.threadBoundTx.get
invoke(Invocation(m, args, targetInstance, activeTx))
}
@@ -120,25 +133,31 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
val result: AnyRef =
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
else {
- val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({
- throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
- }, activeTx))
+ val result: ErrRef[AnyRef] =
+ server !!! (invocation, {
+ var ref = ErrRef(activeTx)
+ ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
+ ref
+ })
try {
result()
} catch {
- case e =>
- result.tx match {
- case None => // no tx; nothing to do
- case Some(tx) =>
- tx.rollback(server)
- ActiveObject.threadBoundTx.set(Some(tx))
- }
+ case e =>
+ rollback(result.tx)
throw e
}
}
if (activeTx.isDefined) activeTx.get.precommit(server)
result
}
+
+ private def rollback(tx: Option[Transaction]) = tx match {
+ case None => {} // no tx; nothing to do
+ case Some(tx) =>
+ println("================ ROLLING BACK")
+ tx.rollback(server)
+ ActiveObject.threadBoundTx.set(Some(tx))
+ }
}
/**
@@ -147,7 +166,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
* @author Jonas Bonér
*/
case class Invocation(val method: Method,
- val args: Array[Object],
+ val args: Array[AnyRef],
val target: AnyRef,
val tx: Option[Transaction]) {
method.setAccessible(true)
diff --git a/kernel/src/main/scala/ErrRef.scala b/kernel/src/main/scala/ErrRef.scala
index 85e2306681..a810824421 100644
--- a/kernel/src/main/scala/ErrRef.scala
+++ b/kernel/src/main/scala/ErrRef.scala
@@ -38,7 +38,7 @@ package se.scalablesolutions.akka.kernel
*
* @author Jonas Bonér
*/
-class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]){
+class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]) {
private[this] var contents: Either[Throwable, Payload] = Right(payload)
def update(value: => Payload) = {
@@ -54,4 +54,5 @@ class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]){
}
object ErrRef {
def apply[Payload](payload: Payload, tx: Option[Transaction]) = new ErrRef(payload, tx)
+ def apply[AnyRef](tx: Option[Transaction]) = new ErrRef(new Object, tx)
}
diff --git a/kernel/src/main/scala/GenericServer.scala b/kernel/src/main/scala/GenericServer.scala
index 86814e0034..ed8eb362d6 100644
--- a/kernel/src/main/scala/GenericServer.scala
+++ b/kernel/src/main/scala/GenericServer.scala
@@ -263,10 +263,11 @@ class GenericServerContainer(val id: String, var serverFactory: () => GenericSer
*/
private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
if (shutdownTime > 0) {
- log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime)
- server !? (shutdownTime, Shutdown(reason)) match {
- case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id)
- case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime)
+ log.debug("Waiting [%s milliseconds for the server to shut down before killing it.", shutdownTime)
+// server !? (shutdownTime, Shutdown(reason)) match {
+ server !? Shutdown(reason) match {
+ case Some('success) => log.debug("Server [%s] has been shut down cleanly.", id)
+ case None => log.warning("Server [%s] was **not able** to complete shutdown cleanly within its configured shutdown time [%s]", id, shutdownTime)
}
}
server ! Terminate(reason)
diff --git a/kernel/src/main/scala/Helpers.scala b/kernel/src/main/scala/Helpers.scala
index 42a63a19db..0a02be63bb 100644
--- a/kernel/src/main/scala/Helpers.scala
+++ b/kernel/src/main/scala/Helpers.scala
@@ -77,7 +77,7 @@ object Helpers extends Logging {
def receiveWithin(timeout: Int): Option[A] = value match {
case None => ch.receiveWithin(timeout) {
case TIMEOUT =>
- log.debug("Future timed out while waiting for actor: {}", a)
+ log.debug("Future timed out while waiting for actor [%s]", a)
None
case a =>
value = Some(a)
diff --git a/kernel/src/main/scala/Supervisor.scala b/kernel/src/main/scala/Supervisor.scala
index eeaab98f2b..5f486c9afa 100644
--- a/kernel/src/main/scala/Supervisor.scala
+++ b/kernel/src/main/scala/Supervisor.scala
@@ -146,22 +146,22 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
loop {
react {
case Configure(config, factory) =>
- log.debug("Configuring supervisor:{} ", this)
+ log.debug("Configuring supervisor:%s ", this)
configure(config, factory)
reply('success)
case Start =>
state.serverContainers.foreach { serverContainer =>
serverContainer.start
- log.info("Starting server: {}", serverContainer.getServer)
+ log.info("Starting server: %s", serverContainer.getServer)
}
case Stop =>
state.serverContainers.foreach { serverContainer =>
serverContainer.terminate('normal)
- log.info("Stopping server: {}", serverContainer)
+ log.info("Stopping ser-ver: %s", serverContainer)
}
- log.info("Stopping supervisor: {}", this)
+ log.info("Stopping supervisor: %s", this)
exit('normal)
case Exit(failedServer, reason) =>
@@ -170,7 +170,7 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
case _ => state.faultHandler.handleFailure(state, failedServer, reason)
}
- case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected)
+ case unexpected => log.warning("Unexpected message [%s] from [%s] ignoring...", unexpected, sender)
}
}
}
@@ -194,7 +194,7 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
val newServer = serverContainer.newServer()
newServer.start
self.link(newServer)
- log.debug("Linking actor [{}] to supervisor [{}]", newServer, this)
+ log.debug("Linking actor [%s] to supervisor [%s]", newServer, this)
state.addServerContainer(serverContainer)
newServer
}
@@ -215,7 +215,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
nrOfRetries += 1
if (timeRangeHasExpired) {
if (hasReachedMaximumNrOfRetries) {
- log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
+ log.info("Maximum of restarts [%s] for server [%s] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
supervisor ! Stop // execution stops here
} else {
nrOfRetries = 0
@@ -241,17 +241,17 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
scope match {
case Permanent =>
- log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id)
+ log.debug("Restarting server [%s] configured as PERMANENT.", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
case Temporary =>
if (reason == 'normal) {
- log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id)
+ log.debug("Restarting server [%s] configured as TEMPORARY (since exited naturally).", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
- } else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
+ } else log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
case Transient =>
- log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id)
+ log.info("Server [%s] configured as TRANSIENT will not be restarted.", serverContainer.id)
}
}
}
@@ -287,7 +287,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
- log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
+ log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state)
state.supervisors.foreach(_ ! Exit(failedServer, reason))
}
@@ -302,7 +302,7 @@ extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
- log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
+ log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
var serverContainer: Option[GenericServerContainer] = None
state.serverContainers.foreach {
container => if (container.getServer == failedServer) serverContainer = Some(container)
diff --git a/kernel/src/main/scala/Transaction.scala b/kernel/src/main/scala/Transaction.scala
index 3e4fed16f3..8703837682 100644
--- a/kernel/src/main/scala/Transaction.scala
+++ b/kernel/src/main/scala/Transaction.scala
@@ -21,6 +21,7 @@ object TransactionStatus {
* @author Jonas Bonér
*/
object TransactionIdFactory {
+ // FIXME: will not work in distributed env
private val currentId = new AtomicLong(0L)
def newId = currentId.getAndIncrement
}
@@ -31,8 +32,10 @@ object TransactionIdFactory {
* @author Jonas Bonér
*/
class Transaction extends Logging {
+ val stateful= classOf[se.scalablesolutions.akka.annotation.stateful]
val id = TransactionIdFactory.newId
-
+
+ log.debug("Creating a new transaction [%s]", id)
private[this] var parent: Option[Transaction] = None
private[this] var oldActorVersions = new HashMap[GenericServerContainer, GenericServer]
private[this] var precommitted: List[GenericServerContainer] = Nil
@@ -43,45 +46,52 @@ class Transaction extends Logging {
if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction")
if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server)
else log.debug("Actor [%s] is participating in transaction", server)
- val oldVersion = server.cloneServerAndReturnOldVersion
- oldActorVersions.put(server, oldVersion)
+ if (server.getServer.getClass.isAnnotationPresent(stateful)) {
+ val oldVersion = server.cloneServerAndReturnOldVersion
+ oldActorVersions.put(server, oldVersion)
+ }
status = TransactionStatus.Active
}
def precommit(server: GenericServerContainer) = synchronized {
- ensureIsActive
- log.debug("Pre-committing transaction for actor [%s]", server)
- precommitted ::= server
+ if (status == TransactionStatus.Active) {
+ log.debug("Pre-committing transaction for actor [%s]", server)
+ precommitted ::= server
+ }
}
def commit(server: GenericServerContainer) = synchronized {
- ensureIsActive
- log.debug("Committing transaction for actor [%s]", server)
- val haveAllPreCommitted =
- if (oldActorVersions.size == precommitted.size) {{
- for (server <- oldActorVersions.keys) yield {
- if (precommitted.exists(_.id == server.id)) true
- else false
- }}.exists(_ == false)
- } else false
-
- if (haveAllPreCommitted) status = TransactionStatus.Completed
- else rollback(server)
+ if (status == TransactionStatus.Active) {
+ log.debug("Committing transaction for actor [%s]", server)
+ val haveAllPreCommitted =
+ if (oldActorVersions.size == precommitted.size) {{
+ for (server <- oldActorVersions.keys) yield {
+ if (precommitted.exists(_.id == server.id)) true
+ else false
+ }}.exists(_ == false)
+ } else false
+ if (haveAllPreCommitted) status = TransactionStatus.Completed
+ else rollback(server)
+ }
}
def rollback(server: GenericServerContainer) = synchronized {
- ensureIsActive
+ ensureIsActiveOrAborted
log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, oldActorVersions.keys)
oldActorVersions.foreach(entry => {
- val (server, backup) = entry
- server.swapServer(backup)
+ val (server, backup) = entry
+ server.swapServer(backup)
})
status = TransactionStatus.Aborted
}
- private def ensureIsActive = if (status == TransactionStatus.Active)
+ private def ensureIsActive = if (status != TransactionStatus.Active)
throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]")
+ private def ensureIsActiveOrAborted =
+ if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
+ throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]")
+
override def equals(that: Any): Boolean =
that != null &&
that.isInstanceOf[Transaction] &&
diff --git a/kernel/src/test/scala/activeObjectSpecs.scala b/kernel/src/test/scala/activeObjectSpecs.scala
index 284153f717..35c9972c7a 100755
--- a/kernel/src/test/scala/activeObjectSpecs.scala
+++ b/kernel/src/test/scala/activeObjectSpecs.scala
@@ -7,59 +7,103 @@ package se.scalablesolutions.akka.kernel
import org.specs.runner.JUnit4
import org.specs.Specification
-import se.scalablesolutions.akka.annotation.oneway
+import se.scalablesolutions.akka.annotation.{oneway, transactional, stateful}
+
+trait Foo {
+ def foo(msg: String): String
+ @transactional def fooInTx(msg: String): String
+ @oneway def bar(msg: String)
+ def longRunning
+ def throwsException
+}
+
+
+class FooImpl extends Foo {
+ val bar: Bar = new BarImpl
+ def foo(msg: String): String = {
+ activeObjectSpec.messageLog += msg
+ "return_foo "
+ }
+ def fooInTx(msg: String): String = {
+ activeObjectSpec.messageLog += msg
+ "return_foo "
+ }
+ def bar(msg: String) = bar.bar(msg)
+ def longRunning = Thread.sleep(10000)
+ def throwsException = error("expected")
+}
+
+trait Bar {
+ @oneway def bar(msg: String)
+}
+
+class BarImpl extends Bar {
+ def bar(msg: String) = {
+ Thread.sleep(100)
+ activeObjectSpec.messageLog += msg
+ }
+}
+
+trait Stateful {
+ @transactional def success(msg: String)
+ @transactional def failure(msg: String, failer: Failer)
+ def state: String
+}
+
+@stateful
+class StatefulImpl extends Stateful {
+ var state: String = "nil"
+ def success(msg: String) = state = msg
+ def failure(msg: String, failer: Failer) = {
+ state = msg
+ failer.fail
+ }
+}
+
+trait Failer {
+ def fail
+}
+
+class FailerImpl extends Failer {
+ def fail = throw new RuntimeException("expected")
+}
+
/**
* @author Jonas Bonér
*/
-
class activeObjectSpecTest extends JUnit4(activeObjectSpec) // for JUnit4 and Maven
object activeObjectSpec extends Specification {
- private var messageLog = ""
+ var messageLog = ""
- trait Foo {
- def foo(msg: String): String
- @oneway def bar(msg: String)
- def longRunning
- def throwsException
- }
+ "make sure default supervisor works correctly" in {
+ val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
- class FooImpl extends Foo {
- val bar: Bar = new BarImpl
- def foo(msg: String): String = {
- messageLog += msg
- "return_foo "
- }
- def bar(msg: String) = bar.bar(msg)
- def longRunning = Thread.sleep(10000)
- def throwsException = error("expected")
- }
+ val result = foo.foo("foo ")
+ messageLog += result
- trait Bar {
- @oneway def bar(msg: String)
- }
+ foo.bar("bar ")
+ messageLog += "before_bar "
- class BarImpl extends Bar {
- def bar(msg: String) = {
- Thread.sleep(100)
- messageLog += msg
- }
- }
+ Thread.sleep(500)
+ messageLog must equalIgnoreCase("foo return_foo before_bar bar ")
+ }
-// "make sure default supervisor works correctly" in {
-// val foo = ActiveObject.newInstance[Foo](classOf[Foo], classOf[FooImpl], 1000)
-//
-// val result = foo.foo("foo ")
-// messageLog += result
-//
-// foo.bar("bar ")
-// messageLog += "before_bar "
-//
-// Thread.sleep(500)
-// messageLog must equalIgnoreCase("foo return_foo before_bar bar ")
-// }
+ "stateful server should not rollback state in case of success" in {
+ val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
+ stateful.success("new state")
+ stateful.state must be_==("new state")
+ }
+
+ "stateful server should rollback state in case of failure" in {
+ val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
+ val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000)
+
+ stateful.failure("new state", failer)
+ stateful.state must be_==("nil")
+ }
}
// @Test { val groups=Array("unit") }