Fixed bug in using STM together with Active Objects

This commit is contained in:
Jonas Bonér 2010-07-18 07:13:43 +02:00
parent 4dabf49b52
commit 2cdfdb2e26
15 changed files with 152 additions and 111 deletions

1
.gitignore vendored
View file

@ -9,6 +9,7 @@ project/boot/*
lib_managed lib_managed
etags etags
TAGS TAGS
akka.tmproj
reports reports
dist dist
build build

View file

@ -773,7 +773,7 @@ sealed class LocalActorRef private[akka](
address.getHostName, address.getPort, uuid)) address.getHostName, address.getPort, uuid))
RemoteNode.unregister(this) RemoteNode.unregister(this)
nullOutActorRefReferencesFor(actorInstance.get) nullOutActorRefReferencesFor(actorInstance.get)
} else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.")
} }
/** /**
@ -961,7 +961,7 @@ sealed class LocalActorRef private[akka](
* Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation.
*/ */
protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard {
if (isShutdown) Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle)
else { else {
currentMessage = Option(messageHandle) currentMessage = Option(messageHandle)
try { try {
@ -986,29 +986,34 @@ sealed class LocalActorRef private[akka](
case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) =>
dead.restart(reason, maxNrOfRetries, withinTimeRange) dead.restart(reason, maxNrOfRetries, withinTimeRange)
case None => case None => throw new IllegalActorStateException(
throw new IllegalActorStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " +
"No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + "\n\tto non-empty list of exception classes - can't proceed " + toString)
"\n\tto non-empty list of exception classes - can't proceed " + toString)
} }
} else { } else {
if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle
_supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on
} }
} }
protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = { protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = {
if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around
maxNrOfRetriesCount += 1 maxNrOfRetriesCount += 1
if (maxNrOfRetriesCount > maxNrOfRetries || (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange) {
val message = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries
val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange
if (tooManyRestarts || restartingHasExpired) {
val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason)
Actor.log.warning( Actor.log.warning(
"Maximum number of restarts [%s] within time range [%s] reached." + "Maximum number of restarts [%s] within time range [%s] reached." +
"\n\tWill *not* restart actor [%s] anymore." + "\n\tWill *not* restart actor [%s] anymore." +
"\n\tLast exception causing restart was [%s].", "\n\tLast exception causing restart was" +
"\n\t[%s].",
maxNrOfRetries, withinTimeRange, this, reason) maxNrOfRetries, withinTimeRange, this, reason)
_supervisor.foreach { sup => _supervisor.foreach { sup =>
if (sup.isDefinedAt(message)) sup ! message // can supervisor handle the notification?
if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification)
else Actor.log.warning( else Actor.log.warning(
"No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" +
"\n\tCan't send the message to the supervisor [%s].", sup) "\n\tCan't send the message to the supervisor [%s].", sup)
@ -1016,7 +1021,6 @@ sealed class LocalActorRef private[akka](
} else { } else {
_isBeingRestarted = true _isBeingRestarted = true
val failedActor = actorInstance.get val failedActor = actorInstance.get
val lock = guard.lock
guard.withGuard { guard.withGuard {
lifeCycle.get match { lifeCycle.get match {
case LifeCycle(scope, _, _) => { case LifeCycle(scope, _, _) => {
@ -1116,7 +1120,8 @@ sealed class LocalActorRef private[akka](
clearTransactionSet clearTransactionSet
createNewTransactionSet createNewTransactionSet
} else oldTxSet } else oldTxSet
Actor.log.ifTrace("Joining transaction set [" + currentTxSet + "];\n\tactor " + toString + "\n\twith message [" + message + "]") Actor.log.ifTrace("Joining transaction set [" + currentTxSet +
"];\n\tactor " + toString + "\n\twith message [" + message + "]")
val mtx = ThreadLocalTransaction.getThreadLocalTransaction val mtx = ThreadLocalTransaction.getThreadLocalTransaction
if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties
else currentTxSet.incParties(mtx, 1) else currentTxSet.incParties(mtx, 1)
@ -1131,7 +1136,8 @@ sealed class LocalActorRef private[akka](
else { else {
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
if (isTransactor) { if (isTransactor) {
Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle) Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString +
"\n\twith message " + messageHandle)
Some(createNewTransactionSet) Some(createNewTransactionSet)
} else None } else None
} }
@ -1172,12 +1178,12 @@ sealed class LocalActorRef private[akka](
"All linked actors have died permanently (they were all configured as TEMPORARY)" + "All linked actors have died permanently (they were all configured as TEMPORARY)" +
"\n\tshutting down and unlinking supervisor actor as well [%s].", "\n\tshutting down and unlinking supervisor actor as well [%s].",
temporaryActor.id) temporaryActor.id)
_supervisor.foreach(_ ! UnlinkAndStop(this)) notifySupervisorWithMessage(UnlinkAndStop(this))
} }
} }
private def handleExceptionInDispatch(e: Throwable, message: Any, topLevelTransaction: Boolean) = { private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = {
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
_isBeingRestarted = true _isBeingRestarted = true
// abort transaction set // abort transaction set
@ -1187,15 +1193,24 @@ sealed class LocalActorRef private[akka](
txSet.abort txSet.abort
} }
senderFuture.foreach(_.completeWithException(this, e)) senderFuture.foreach(_.completeWithException(this, reason))
clearTransaction clearTransaction
if (topLevelTransaction) clearTransactionSet if (topLevelTransaction) clearTransactionSet
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client notifySupervisorWithMessage(Exit(this, reason))
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
} }
private def notifySupervisorWithMessage(notification: LifeCycleMessage) = {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
_supervisor.foreach { sup =>
if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors
// shutdownLinkedActors
// stop
} else sup ! notification // else notify supervisor
}
}
private def nullOutActorRefReferencesFor(actor: Actor) = { private def nullOutActorRefReferencesFor(actor: Actor) = {
actorSelfFields._1.set(actor, null) actorSelfFields._1.set(actor, null)
actorSelfFields._2.set(actor, null) actorSelfFields._2.set(actor, null)
@ -1215,7 +1230,8 @@ sealed class LocalActorRef private[akka](
case e: NoSuchFieldException => case e: NoSuchFieldException =>
val parent = clazz.getSuperclass val parent = clazz.getSuperclass
if (parent != null) findActorSelfField(parent) if (parent != null) findActorSelfField(parent)
else throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") else throw new IllegalActorStateException(
toString + " is not an Actor since it have not mixed in the 'Actor' trait")
} }
} }

View file

@ -161,8 +161,8 @@ sealed class Supervisor private[akka] (
_childActors.put(className, actorRef :: currentActors) _childActors.put(className, actorRef :: currentActors)
actorRef.lifeCycle = Some(lifeCycle) actorRef.lifeCycle = Some(lifeCycle)
supervisor.link(actorRef) supervisor.link(actorRef)
remoteAddress.foreach(address => remoteAddress.foreach(address => RemoteServer.registerActor(
RemoteServer.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)) new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef))
case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration
val childSupervisor = Supervisor(supervisorConfig) val childSupervisor = Supervisor(supervisorConfig)
supervisor.link(childSupervisor.supervisor) supervisor.link(childSupervisor.supervisor)
@ -180,14 +180,23 @@ final class SupervisorActor private[akka] (
handler: FaultHandlingStrategy, handler: FaultHandlingStrategy,
trapExceptions: List[Class[_ <: Throwable]]) extends Actor { trapExceptions: List[Class[_ <: Throwable]]) extends Actor {
import self._ import self._
trapExit = trapExceptions trapExit = trapExceptions
faultHandler = Some(handler) faultHandler = Some(handler)
override def shutdown(): Unit = shutdownLinkedActors override def shutdown(): Unit = shutdownLinkedActors
def receive = { def receive = {
// FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor
case MaximumNumberOfRestartsWithinTimeRangeReached(
victim, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) =>
Actor.log.warning(
"Declaratively configured supervisor received a [MaximumNumberOfRestartsWithinTimeRangeReached] notification," +
"\n\tbut there is currently no way of handling it in a declaratively configured supervisor." +
"\n\tIf you want to be able to handle this error condition then you need to create the supervision tree programatically." +
"\n\tThis will be supported in the future.")
case unknown => throw new SupervisorException( case unknown => throw new SupervisorException(
"SupervisorActor can not respond to messages. Unknown message [" + unknown + "]") "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]")
} }
} }

View file

@ -29,6 +29,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten
} }
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down %s", toString)
active = false active = false
selectorThread.interrupt selectorThread.interrupt
doShutdown doShutdown

View file

@ -64,7 +64,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
@volatile private var active: Boolean = false @volatile private var active: Boolean = false
val name: String = "event-driven:executor:dispatcher:" + _name val name = "akka:event-driven:dispatcher:" + _name
init init
def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver) def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver)
@ -92,8 +92,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
} while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty))
} }
}) })
} else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") } else log.warning(
"%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver)
/** /**
* Process the messages in the mailbox of the given actor. * Process the messages in the mailbox of the given actor.
@ -107,25 +107,22 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
messageInvocation.invoke messageInvocation.invoke
processedMessages += 1 processedMessages += 1
// check if we simply continue with other messages, or reached the throughput limit // check if we simply continue with other messages, or reached the throughput limit
if (throughput <= 0 || processedMessages < throughput) if (throughput <= 0 || processedMessages < throughput) messageInvocation = receiver.mailbox.poll
messageInvocation = receiver.mailbox.poll
else { else {
return !receiver.mailbox.isEmpty
messageInvocation = null messageInvocation = null
return !receiver.mailbox.isEmpty
} }
} }
false
return false
} }
def start = if (!active) { def start = if (!active) {
log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name) log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput)
log.debug("Throughput for %s = %d", name, throughput)
active = true active = true
} }
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down ExecutorBasedEventDrivenDispatcher [%s]", name) log.debug("Shutting down %s", toString)
executor.shutdownNow executor.shutdownNow
active = false active = false
references.clear references.clear
@ -135,6 +132,9 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]"
// FIXME: should we have an unbounded queue and not bounded as default ????
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
} }

View file

@ -41,8 +41,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
/** The index in the pooled actors list which was last used to steal work */ /** The index in the pooled actors list which was last used to steal work */
@volatile private var lastThiefIndex = 0 @volatile private var lastThiefIndex = 0
// TODO: is there a naming convention for this name? val name = "akka:event-driven-work-stealing:dispatcher:" + _name
val name: String = "event-driven-work-stealing:executor:dispatcher:" + _name
init init
def dispatch(invocation: MessageInvocation) = if (active) { def dispatch(invocation: MessageInvocation) = if (active) {
@ -129,8 +128,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = {
if (thief.dispatcherLock.tryLock) { if (thief.dispatcherLock.tryLock) {
try { try {
while(donateMessage(receiver, thief)) while(donateMessage(receiver, thief)) processMailbox(thief)
processMailbox(thief)
} finally { } finally {
thief.dispatcherLock.unlock thief.dispatcherLock.unlock
} }
@ -156,7 +154,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
} }
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down ExecutorBasedEventDrivenWorkStealingDispatcher [%s]", name) log.debug("Shutting down %s", toString)
executor.shutdownNow executor.shutdownNow
active = false active = false
references.clear references.clear
@ -165,6 +163,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]"
private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
override def register(actorRef: ActorRef) = { override def register(actorRef: ActorRef) = {
@ -182,15 +182,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess
private def verifyActorsAreOfSameType(actorOfId: ActorRef) = { private def verifyActorsAreOfSameType(actorOfId: ActorRef) = {
actorType match { actorType match {
case None => { case None => actorType = Some(actorOfId.actor.getClass)
actorType = Some(actorOfId.actor.getClass) case Some(aType) =>
}
case Some(aType) => {
if (aType != actorOfId.actor.getClass) if (aType != actorOfId.actor.getClass)
throw new IllegalActorStateException( throw new IllegalActorStateException(String.format(
String.format("Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s",
actorOfId.actor, aType)) actorOfId.actor, aType))
}
} }
} }
} }

View file

@ -12,11 +12,14 @@ package se.scalablesolutions.akka.dispatch
import java.util.{LinkedList, List} import java.util.{LinkedList, List}
class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends AbstractReactorBasedEventDrivenDispatcher(name) { class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) {
def start = if (!active) { def start = if (!active) {
log.debug("Starting up %s", toString)
active = true active = true
val messageDemultiplexer = new Demultiplexer(queue) val messageDemultiplexer = new Demultiplexer(queue)
selectorThread = new Thread("event-driven:reactor:single-thread:dispatcher:" + name) { selectorThread = new Thread(name) {
override def run = { override def run = {
while (active) { while (active) {
try { try {
@ -40,6 +43,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra
def usesActorMailbox = false def usesActorMailbox = false
override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]"
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation] private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation]

View file

@ -64,7 +64,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
extends AbstractReactorBasedEventDrivenDispatcher("event-driven:reactor:thread-pool:dispatcher:" + _name) extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name)
with ThreadPoolBuilder { with ThreadPoolBuilder {
private var fair = true private var fair = true
@ -75,17 +75,18 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool
def start = if (!active) { def start = if (!active) {
log.debug("Starting up %s", toString)
active = true active = true
/** /**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa
* [http://code.google.com/p/actorom/].
*/ */
selectorThread = new Thread(name) { selectorThread = new Thread(name) {
override def run = { override def run = {
while (active) { while (active) {
try { try {
try { try {
// guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select messageDemultiplexer.select
} catch { case e: InterruptedException => active = false } } catch { case e: InterruptedException => active = false }
process(messageDemultiplexer.acquireSelectedInvocations) process(messageDemultiplexer.acquireSelectedInvocations)
@ -110,7 +111,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]") if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]")
if (!busyActors.contains(invocation.receiver)) { if (!busyActors.contains(invocation.receiver)) {
val invoker = messageInvokers.get(invocation.receiver) val invoker = messageInvokers.get(invocation.receiver)
if (invoker eq null) throw new IllegalActorStateException("Message invoker for invocation [" + invocation + "] is null") if (invoker eq null) throw new IllegalActorStateException(
"Message invoker for invocation [" + invocation + "] is null")
resume(invocation.receiver) resume(invocation.receiver)
invocations.remove invocations.remove
executor.execute(new Runnable() { executor.execute(new Runnable() {
@ -142,6 +144,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String)
def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException(
"Can't build a new thread pool for a dispatcher that is already up and running") "Can't build a new thread pool for a dispatcher that is already up and running")
override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]"
class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation] private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedInvocationsLock = new ReentrantLock private val selectedInvocationsLock = new ReentrantLock

View file

@ -16,7 +16,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
*/ */
class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher { class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher {
private val name = actor.getClass.getName + ":" + actor.uuid private val name = actor.getClass.getName + ":" + actor.uuid
private val threadName = "thread-based:dispatcher:" + name private val threadName = "akka:thread-based:dispatcher:" + name
private val queue = new BlockingMessageQueue(name) private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _ private var selectorThread: Thread = _
@volatile private var active: Boolean = false @volatile private var active: Boolean = false
@ -24,6 +24,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch
def dispatch(invocation: MessageInvocation) = queue.append(invocation) def dispatch(invocation: MessageInvocation) = queue.append(invocation)
def start = if (!active) { def start = if (!active) {
log.debug("Starting up %s", toString)
active = true active = true
selectorThread = new Thread(threadName) { selectorThread = new Thread(threadName) {
override def run = { override def run = {
@ -42,11 +43,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch
def usesActorMailbox = false def usesActorMailbox = false
def shutdown = if (active) { def shutdown = if (active) {
log.debug("Shutting down ThreadBasedDispatcher [%s]", name) log.debug("Shutting down %s", toString)
active = false active = false
selectorThread.interrupt selectorThread.interrupt
references.clear references.clear
} }
override def toString = "ThreadBasedDispatcher[" + threadName + "]"
} }
class BlockingMessageQueue(name: String) extends MessageQueue { class BlockingMessageQueue(name: String) extends MessageQueue {

View file

@ -22,7 +22,7 @@ object TransactionConfig {
val FAMILY_NAME = "DefaultTransaction" val FAMILY_NAME = "DefaultTransaction"
val READONLY = null.asInstanceOf[JBoolean] val READONLY = null.asInstanceOf[JBoolean]
val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000) val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000)
val TIMEOUT = config.getLong("akka.stm.timeout", Long.MaxValue) val TIMEOUT = config.getLong("akka.stm.timeout", 10)
val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds") val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds")
val TRACK_READS = null.asInstanceOf[JBoolean] val TRACK_READS = null.asInstanceOf[JBoolean]
val WRITE_SKEW = config.getBool("akka.stm.write-skew", true) val WRITE_SKEW = config.getBool("akka.stm.write-skew", true)
@ -125,8 +125,9 @@ object TransactionFactory {
quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
hooks: Boolean = TransactionConfig.HOOKS) = { hooks: Boolean = TransactionConfig.HOOKS) = {
val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, val config = new TransactionConfig(
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
new TransactionFactory(config) new TransactionFactory(config)
} }
} }
@ -152,8 +153,9 @@ object TransactionFactory {
* *
* @see TransactionConfig for configuration options. * @see TransactionConfig for configuration options.
*/ */
class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) { class TransactionFactory(
self => val config: TransactionConfig = DefaultTransactionConfig,
defaultName: String = TransactionConfig.FAMILY_NAME) { self =>
// use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default // use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName

View file

@ -156,8 +156,13 @@ class GlobalStm extends TransactionManagement with Logging {
val result = body val result = body
val txSet = getTransactionSetInScope val txSet = getTransactionSetInScope
log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]") log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]")
try {
txSet.tryJoinCommit(
mtx,
TransactionConfig.DefaultTimeout.length,
TransactionConfig.DefaultTimeout.unit)
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} } } catch { case e: IllegalStateException => {} }
result result
} }
}) })

View file

@ -21,7 +21,6 @@ public class TransactionalActiveObject {
refState = new Ref(); refState = new Ref();
isInitialized = true; isInitialized = true;
} }
System.out.println("==========> init");
} }
public String getMapState(String key) { public String getMapState(String key) {
@ -38,7 +37,6 @@ public class TransactionalActiveObject {
public void setMapState(String key, String msg) { public void setMapState(String key, String msg) {
mapState.put(key, msg); mapState.put(key, msg);
System.out.println("==========> setMapState");
} }
public void setVectorState(String msg) { public void setVectorState(String msg) {
@ -74,7 +72,6 @@ public class TransactionalActiveObject {
mapState.put(key, msg); mapState.put(key, msg);
vectorState.add(msg); vectorState.add(msg);
refState.swap(msg); refState.swap(msg);
System.out.println("==========> failure");
nested.failure(key, msg, failer); nested.failure(key, msg, failer);
return msg; return msg;
} }

View file

@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor._
@RunWith(classOf[JUnitRunner]) @RunWith(classOf[JUnitRunner])
class NestedTransactionalActiveObjectSpec extends class DeclarativelySupervisedNestedTransactionalActiveObjectSpec extends
Spec with Spec with
ShouldMatchers with ShouldMatchers with
BeforeAndAfterAll { BeforeAndAfterAll {
@ -32,14 +32,14 @@ class NestedTransactionalActiveObjectSpec extends
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
List( List(
new Component(classOf[TransactionalActiveObject], new Component(classOf[TransactionalActiveObject],
new LifeCycle(new Permanent), new LifeCycle(new Permanent),
10000), 10000),
new Component(classOf[NestedTransactionalActiveObject], new Component(classOf[NestedTransactionalActiveObject],
new LifeCycle(new Permanent), new LifeCycle(new Permanent),
10000), 10000),
new Component(classOf[ActiveObjectFailer], new Component(classOf[ActiveObjectFailer],
new LifeCycle(new Permanent), new LifeCycle(new Permanent),
10000) 10000)
).toArray).supervise ).toArray).supervise
} }
@ -47,8 +47,8 @@ class NestedTransactionalActiveObjectSpec extends
conf.stop conf.stop
} }
describe("Transactional nested in-memory Active Object") { describe("Declaratively nested supervised transactional in-memory Active Object") {
/*
it("map should not rollback state for stateful server in case of success") { it("map should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject]) val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init stateful.init
@ -156,6 +156,5 @@ class NestedTransactionalActiveObjectSpec extends
Thread.sleep(100) Thread.sleep(100)
nested.getRefState should equal("init") nested.getRefState should equal("init")
} }
*/
} }
} }

View file

@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._
import se.scalablesolutions.akka.actor._ import se.scalablesolutions.akka.actor._
@RunWith(classOf[JUnitRunner]) @RunWith(classOf[JUnitRunner])
class TransactionalActiveObjectSpec extends class DeclarativelySupervisedTransactionalActiveObjectSpec extends
Spec with Spec with
ShouldMatchers with ShouldMatchers with
BeforeAndAfterAll { BeforeAndAfterAll {
@ -29,24 +29,25 @@ class TransactionalActiveObjectSpec extends
override def beforeAll { override def beforeAll {
Config.config Config.config
conf.configure( conf.configure(
new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray),
List( List(
new Component(classOf[TransactionalActiveObject], new Component(
new LifeCycle(new Permanent), classOf[TransactionalActiveObject],
//new RestartCallbacks("preRestart", "postRestart")), new LifeCycle(new Permanent),
10000), 10000),
new Component(classOf[ActiveObjectFailer], new Component(
new LifeCycle(new Permanent), classOf[ActiveObjectFailer],
10000)).toArray new LifeCycle(new Permanent),
).supervise 10000)
).toArray).supervise
} }
override def afterAll { override def afterAll {
conf.stop conf.stop
} }
describe("Transactional in-memory Active Object ") { describe("Declaratively supervised transactional in-memory Active Object ") {
/*
it("map should not rollback state for stateful server in case of success") { it("map should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject]) val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init stateful.init
@ -58,9 +59,7 @@ class TransactionalActiveObjectSpec extends
it("map should rollback state for stateful server in case of failure") { it("map should rollback state for stateful server in case of failure") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject]) val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init stateful.init
Thread.sleep(500)
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init")
Thread.sleep(500)
val failer = conf.getInstance(classOf[ActiveObjectFailer]) val failer = conf.getInstance(classOf[ActiveObjectFailer])
try { try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer)
@ -69,6 +68,14 @@ class TransactionalActiveObjectSpec extends
stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init")
} }
it("vector should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init
stateful.setVectorState("init") // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
stateful.getVectorState should equal("new state")
}
it("vector should rollback state for stateful server in case of failure") { it("vector should rollback state for stateful server in case of failure") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject]) val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init stateful.init
@ -81,12 +88,12 @@ class TransactionalActiveObjectSpec extends
stateful.getVectorState should equal("init") stateful.getVectorState should equal("init")
} }
it("vector should not rollback state for stateful server in case of success") { it("ref should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject]) val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init stateful.init
stateful.setVectorState("init") // set init state stateful.setRefState("init") // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
stateful.getVectorState should equal("new state") stateful.getRefState should equal("new state")
} }
it("ref should rollback state for stateful server in case of failure") { it("ref should rollback state for stateful server in case of failure") {
@ -100,14 +107,5 @@ class TransactionalActiveObjectSpec extends
} catch { case e => {} } } catch { case e => {} }
stateful.getRefState should equal("init") stateful.getRefState should equal("init")
} }
it("ref should not rollback state for stateful server in case of success") {
val stateful = conf.getInstance(classOf[TransactionalActiveObject])
stateful.init
stateful.setRefState("init") // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state")
stateful.getRefState should equal("new state")
}
*/
} }
} }

View file

@ -36,15 +36,19 @@ akka {
} }
stm { stm {
fair = on # should global transactions be fair or non-fair (non fair yield better performance) fair = on # should global transactions be fair or non-fair (non fair yield better performance)
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'. # begin (or join), commit or rollback the JTA transaction. Default is 'off'.
timeout = 5 # default timeout for blocking transactions and transaction set (in unit defined by
# the time-unit property)
# FIXME: use 'time-unit' for all timeouts
time-unit = "seconds" # default timeout time unit
} }
jta { jta {
provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI)
# "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta',
# e.g. you need the akka-jta JARs on classpath). # e.g. you need the akka-jta JARs on classpath).
timeout = 60000 timeout = 60000
} }
@ -52,9 +56,9 @@ akka {
service = on service = on
hostname = "localhost" hostname = "localhost"
port = 9998 port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
#IF you are using a KerberosAuthenticationActor #IF you are using a KerberosAuthenticationActor
# kerberos { # kerberos {