fixed misc bugs and completed first iteration of transactions

This commit is contained in:
Jonas Boner 2009-03-26 20:22:49 +01:00
parent 6cb38f6ce9
commit 123fa5bd50
8 changed files with 173 additions and 98 deletions

View file

@ -72,11 +72,11 @@ define 'akka' do
# package :jar
#end
#desc 'Akka Java API'
#define 'api-java' do
# compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4)
# package :jar
#end
desc 'Akka Java API'
define 'api-java' do
compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, GUICEYFRUIT, JUNIT4)
package :jar
end
package(:zip).include 'README'
package(:zip).include 'bin/*', :path=>'bin'

View file

@ -71,7 +71,9 @@ object ActiveObject {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: Int) extends InvocationHandler {
val transactional = classOf[se.scalablesolutions.akka.annotation.transactional]
val oneway = classOf[se.scalablesolutions.akka.annotation.oneway]
val immutable = classOf[se.scalablesolutions.akka.annotation.immutable]
private[this] var activeTx: Option[Transaction] = None
@ -82,6 +84,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
override def body: PartialFunction[Any, Unit] = {
case invocation: Invocation =>
val tx = invocation.tx
ActiveObject.threadBoundTx.set(tx)
try {
reply(ErrRef(invocation.invoke, tx))
} catch {
@ -102,8 +105,17 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
server.setTimeout(timeout)
def invoke(proxy: AnyRef, m: Method, args: Array[AnyRef]): AnyRef = {
if (m.isAnnotationPresent(transactional)) {
val newTx = new Transaction
newTx.begin(server)
ActiveObject.threadBoundTx.set(Some(newTx))
}
val cflowTx = ActiveObject.threadBoundTx.get
activeTx.get.asInstanceOf[Option[Transaction]] match {
println("========== invoking: " + m.getName)
println("========== cflowTx: " + cflowTx)
println("========== activeTx: " + activeTx)
activeTx match {
case Some(tx) =>
if (cflowTx.isDefined && cflowTx.get != tx) {
// new tx in scope; try to commit
@ -113,6 +125,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
case None =>
if (cflowTx.isDefined) activeTx = Some(cflowTx.get)
}
activeTx = ActiveObject.threadBoundTx.get
invoke(Invocation(m, args, targetInstance, activeTx))
}
@ -120,25 +133,31 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
val result: AnyRef =
if (invocation.method.isAnnotationPresent(oneway)) server ! invocation
else {
val result: ErrRef[AnyRef] = server !!! (invocation, ErrRef({
throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
}, activeTx))
val result: ErrRef[AnyRef] =
server !!! (invocation, {
var ref = ErrRef(activeTx)
ref() = throw new ActiveObjectInvocationTimeoutException("Invocation to active object [" + targetInstance.getClass.getName + "] timed out after " + timeout + " milliseconds")
ref
})
try {
result()
} catch {
case e =>
result.tx match {
case None => // no tx; nothing to do
case Some(tx) =>
tx.rollback(server)
ActiveObject.threadBoundTx.set(Some(tx))
}
case e =>
rollback(result.tx)
throw e
}
}
if (activeTx.isDefined) activeTx.get.precommit(server)
result
}
private def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
println("================ ROLLING BACK")
tx.rollback(server)
ActiveObject.threadBoundTx.set(Some(tx))
}
}
/**
@ -147,7 +166,7 @@ class ActiveObjectProxy(val intf: Class[_], val target: Class[_], val timeout: I
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
case class Invocation(val method: Method,
val args: Array[Object],
val args: Array[AnyRef],
val target: AnyRef,
val tx: Option[Transaction]) {
method.setAccessible(true)

View file

@ -38,7 +38,7 @@ package se.scalablesolutions.akka.kernel
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]){
class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]) {
private[this] var contents: Either[Throwable, Payload] = Right(payload)
def update(value: => Payload) = {
@ -54,4 +54,5 @@ class ErrRef[Payload](payload: Payload, val tx: Option[Transaction]){
}
object ErrRef {
def apply[Payload](payload: Payload, tx: Option[Transaction]) = new ErrRef(payload, tx)
def apply[AnyRef](tx: Option[Transaction]) = new ErrRef(new Object, tx)
}

View file

@ -263,10 +263,11 @@ class GenericServerContainer(val id: String, var serverFactory: () => GenericSer
*/
private[kernel] def terminate(reason: AnyRef, shutdownTime: Int) = lock.withReadLock {
if (shutdownTime > 0) {
log.debug("Waiting {} milliseconds for the server to shut down before killing it.", shutdownTime)
server !? (shutdownTime, Shutdown(reason)) match {
case Some('success) => log.debug("Server [{}] has been shut down cleanly.", id)
case None => log.warning("Server [{}] was **not able** to complete shutdown cleanly within its configured shutdown time [{}]", id, shutdownTime)
log.debug("Waiting [%s milliseconds for the server to shut down before killing it.", shutdownTime)
// server !? (shutdownTime, Shutdown(reason)) match {
server !? Shutdown(reason) match {
case Some('success) => log.debug("Server [%s] has been shut down cleanly.", id)
case None => log.warning("Server [%s] was **not able** to complete shutdown cleanly within its configured shutdown time [%s]", id, shutdownTime)
}
}
server ! Terminate(reason)

View file

@ -77,7 +77,7 @@ object Helpers extends Logging {
def receiveWithin(timeout: Int): Option[A] = value match {
case None => ch.receiveWithin(timeout) {
case TIMEOUT =>
log.debug("Future timed out while waiting for actor: {}", a)
log.debug("Future timed out while waiting for actor [%s]", a)
None
case a =>
value = Some(a)

View file

@ -146,22 +146,22 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
loop {
react {
case Configure(config, factory) =>
log.debug("Configuring supervisor:{} ", this)
log.debug("Configuring supervisor:%s ", this)
configure(config, factory)
reply('success)
case Start =>
state.serverContainers.foreach { serverContainer =>
serverContainer.start
log.info("Starting server: {}", serverContainer.getServer)
log.info("Starting server: %s", serverContainer.getServer)
}
case Stop =>
state.serverContainers.foreach { serverContainer =>
serverContainer.terminate('normal)
log.info("Stopping server: {}", serverContainer)
log.info("Stopping ser-ver: %s", serverContainer)
}
log.info("Stopping supervisor: {}", this)
log.info("Stopping supervisor: %s", this)
exit('normal)
case Exit(failedServer, reason) =>
@ -170,7 +170,7 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
case _ => state.faultHandler.handleFailure(state, failedServer, reason)
}
case unexpected => log.warning("Unexpected message [{}], ignoring...", unexpected)
case unexpected => log.warning("Unexpected message [%s] from [%s] ignoring...", unexpected, sender)
}
}
}
@ -194,7 +194,7 @@ class Supervisor(faultHandler: FaultHandlingStrategy) extends Actor with Logging
val newServer = serverContainer.newServer()
newServer.start
self.link(newServer)
log.debug("Linking actor [{}] to supervisor [{}]", newServer, this)
log.debug("Linking actor [%s] to supervisor [%s]", newServer, this)
state.addServerContainer(serverContainer)
newServer
}
@ -215,7 +215,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
nrOfRetries += 1
if (timeRangeHasExpired) {
if (hasReachedMaximumNrOfRetries) {
log.info("Maximum of restarts [{}] for server [{}] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
log.info("Maximum of restarts [%s] for server [%s] has been reached - the supervisor including all its servers will now be shut down.", maxNrOfRetries, failedServer)
supervisor ! Stop // execution stops here
} else {
nrOfRetries = 0
@ -241,17 +241,17 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
scope match {
case Permanent =>
log.debug("Restarting server [{}] configured as PERMANENT.", serverContainer.id)
log.debug("Restarting server [%s] configured as PERMANENT.", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
case Temporary =>
if (reason == 'normal) {
log.debug("Restarting server [{}] configured as TEMPORARY (since exited naturally).", serverContainer.id)
log.debug("Restarting server [%s] configured as TEMPORARY (since exited naturally).", serverContainer.id)
serverContainer.reconfigure(reason, supervisor.spawnLink(serverContainer), state.supervisor)
} else log.info("Server [{}] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
} else log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", serverContainer.id)
case Transient =>
log.info("Server [{}] configured as TRANSIENT will not be restarted.", serverContainer.id)
log.info("Server [%s] configured as TRANSIENT will not be restarted.", serverContainer.id)
}
}
}
@ -287,7 +287,7 @@ abstract class FaultHandlingStrategy(val maxNrOfRetries: Int, val withinTimeRang
class AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ALL_FOR_ONE.", failedServer, reason)
for (serverContainer <- state.serverContainers) restart(serverContainer, reason, state)
state.supervisors.foreach(_ ! Exit(failedServer, reason))
}
@ -302,7 +302,7 @@ extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
extends FaultHandlingStrategy(maxNrOfRetries, withinTimeRange) {
override def doHandleFailure(state: SupervisorState, failedServer: AbstractActor, reason: AnyRef) = {
log.error("Server [{}] has failed due to [{}] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
log.error("Server [%s] has failed due to [%s] - scheduling restart - scheme: ONE_FOR_ONE.", failedServer, reason)
var serverContainer: Option[GenericServerContainer] = None
state.serverContainers.foreach {
container => if (container.getServer == failedServer) serverContainer = Some(container)

View file

@ -21,6 +21,7 @@ object TransactionStatus {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionIdFactory {
// FIXME: will not work in distributed env
private val currentId = new AtomicLong(0L)
def newId = currentId.getAndIncrement
}
@ -31,8 +32,10 @@ object TransactionIdFactory {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Transaction extends Logging {
val stateful= classOf[se.scalablesolutions.akka.annotation.stateful]
val id = TransactionIdFactory.newId
log.debug("Creating a new transaction [%s]", id)
private[this] var parent: Option[Transaction] = None
private[this] var oldActorVersions = new HashMap[GenericServerContainer, GenericServer]
private[this] var precommitted: List[GenericServerContainer] = Nil
@ -43,45 +46,52 @@ class Transaction extends Logging {
if (status == TransactionStatus.Completed) throw new IllegalStateException("Can't begin COMPLETED transaction")
if (status == TransactionStatus.New) log.debug("Actor [%s] is starting NEW transaction", server)
else log.debug("Actor [%s] is participating in transaction", server)
val oldVersion = server.cloneServerAndReturnOldVersion
oldActorVersions.put(server, oldVersion)
if (server.getServer.getClass.isAnnotationPresent(stateful)) {
val oldVersion = server.cloneServerAndReturnOldVersion
oldActorVersions.put(server, oldVersion)
}
status = TransactionStatus.Active
}
def precommit(server: GenericServerContainer) = synchronized {
ensureIsActive
log.debug("Pre-committing transaction for actor [%s]", server)
precommitted ::= server
if (status == TransactionStatus.Active) {
log.debug("Pre-committing transaction for actor [%s]", server)
precommitted ::= server
}
}
def commit(server: GenericServerContainer) = synchronized {
ensureIsActive
log.debug("Committing transaction for actor [%s]", server)
val haveAllPreCommitted =
if (oldActorVersions.size == precommitted.size) {{
for (server <- oldActorVersions.keys) yield {
if (precommitted.exists(_.id == server.id)) true
else false
}}.exists(_ == false)
} else false
if (haveAllPreCommitted) status = TransactionStatus.Completed
else rollback(server)
if (status == TransactionStatus.Active) {
log.debug("Committing transaction for actor [%s]", server)
val haveAllPreCommitted =
if (oldActorVersions.size == precommitted.size) {{
for (server <- oldActorVersions.keys) yield {
if (precommitted.exists(_.id == server.id)) true
else false
}}.exists(_ == false)
} else false
if (haveAllPreCommitted) status = TransactionStatus.Completed
else rollback(server)
}
}
def rollback(server: GenericServerContainer) = synchronized {
ensureIsActive
ensureIsActiveOrAborted
log.debug("Actor [%s] has initiated transaction rollback, rolling back [%s]" , server, oldActorVersions.keys)
oldActorVersions.foreach(entry => {
val (server, backup) = entry
server.swapServer(backup)
val (server, backup) = entry
server.swapServer(backup)
})
status = TransactionStatus.Aborted
}
private def ensureIsActive = if (status == TransactionStatus.Active)
private def ensureIsActive = if (status != TransactionStatus.Active)
throw new IllegalStateException("Expected ACTIVE transaction - current status [" + status + "]")
private def ensureIsActiveOrAborted =
if (!(status == TransactionStatus.Active || status == TransactionStatus.Aborted))
throw new IllegalStateException("Expected ACTIVE or ABORTED transaction - current status [" + status + "]")
override def equals(that: Any): Boolean =
that != null &&
that.isInstanceOf[Transaction] &&

View file

@ -7,59 +7,103 @@ package se.scalablesolutions.akka.kernel
import org.specs.runner.JUnit4
import org.specs.Specification
import se.scalablesolutions.akka.annotation.oneway
import se.scalablesolutions.akka.annotation.{oneway, transactional, stateful}
trait Foo {
def foo(msg: String): String
@transactional def fooInTx(msg: String): String
@oneway def bar(msg: String)
def longRunning
def throwsException
}
class FooImpl extends Foo {
val bar: Bar = new BarImpl
def foo(msg: String): String = {
activeObjectSpec.messageLog += msg
"return_foo "
}
def fooInTx(msg: String): String = {
activeObjectSpec.messageLog += msg
"return_foo "
}
def bar(msg: String) = bar.bar(msg)
def longRunning = Thread.sleep(10000)
def throwsException = error("expected")
}
trait Bar {
@oneway def bar(msg: String)
}
class BarImpl extends Bar {
def bar(msg: String) = {
Thread.sleep(100)
activeObjectSpec.messageLog += msg
}
}
trait Stateful {
@transactional def success(msg: String)
@transactional def failure(msg: String, failer: Failer)
def state: String
}
@stateful
class StatefulImpl extends Stateful {
var state: String = "nil"
def success(msg: String) = state = msg
def failure(msg: String, failer: Failer) = {
state = msg
failer.fail
}
}
trait Failer {
def fail
}
class FailerImpl extends Failer {
def fail = throw new RuntimeException("expected")
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class activeObjectSpecTest extends JUnit4(activeObjectSpec) // for JUnit4 and Maven
object activeObjectSpec extends Specification {
private var messageLog = ""
var messageLog = ""
trait Foo {
def foo(msg: String): String
@oneway def bar(msg: String)
def longRunning
def throwsException
}
"make sure default supervisor works correctly" in {
val foo = ActiveObject.newInstance[Foo](classOf[Foo], new FooImpl, 1000)
class FooImpl extends Foo {
val bar: Bar = new BarImpl
def foo(msg: String): String = {
messageLog += msg
"return_foo "
}
def bar(msg: String) = bar.bar(msg)
def longRunning = Thread.sleep(10000)
def throwsException = error("expected")
}
val result = foo.foo("foo ")
messageLog += result
trait Bar {
@oneway def bar(msg: String)
}
foo.bar("bar ")
messageLog += "before_bar "
class BarImpl extends Bar {
def bar(msg: String) = {
Thread.sleep(100)
messageLog += msg
}
}
Thread.sleep(500)
messageLog must equalIgnoreCase("foo return_foo before_bar bar ")
}
// "make sure default supervisor works correctly" in {
// val foo = ActiveObject.newInstance[Foo](classOf[Foo], classOf[FooImpl], 1000)
//
// val result = foo.foo("foo ")
// messageLog += result
//
// foo.bar("bar ")
// messageLog += "before_bar "
//
// Thread.sleep(500)
// messageLog must equalIgnoreCase("foo return_foo before_bar bar ")
// }
"stateful server should not rollback state in case of success" in {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
stateful.success("new state")
stateful.state must be_==("new state")
}
"stateful server should rollback state in case of failure" in {
val stateful = ActiveObject.newInstance[Stateful](classOf[Stateful], new StatefulImpl, 1000)
val failer = ActiveObject.newInstance[Failer](classOf[Failer], new FailerImpl, 1000)
stateful.failure("new state", failer)
stateful.state must be_==("nil")
}
}
// @Test { val groups=Array("unit") }