diff --git a/.gitignore b/.gitignore index 287430d4e5..51b42bf7be 100755 --- a/.gitignore +++ b/.gitignore @@ -9,6 +9,7 @@ project/boot/* lib_managed etags TAGS +akka.tmproj reports dist build diff --git a/akka-core/src/main/scala/actor/ActorRef.scala b/akka-core/src/main/scala/actor/ActorRef.scala index 0d70643dc8..9db90a6e78 100644 --- a/akka-core/src/main/scala/actor/ActorRef.scala +++ b/akka-core/src/main/scala/actor/ActorRef.scala @@ -773,7 +773,7 @@ sealed class LocalActorRef private[akka]( address.getHostName, address.getPort, uuid)) RemoteNode.unregister(this) nullOutActorRefReferencesFor(actorInstance.get) - } else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") + } //else if (isBeingRestarted) throw new ActorKilledException("Actor [" + toString + "] is being restarted.") } /** @@ -961,7 +961,7 @@ sealed class LocalActorRef private[akka]( * Callback for the dispatcher. This is the ingle entry point to the user Actor implementation. */ protected[akka] def invoke(messageHandle: MessageInvocation): Unit = guard.withGuard { - if (isShutdown) Actor.log.warning("Actor [%s] is shut down, ignoring message [%s]", toString, messageHandle) + if (isShutdown) Actor.log.warning("Actor [%s] is shut down,\n\tignoring message [%s]", toString, messageHandle) else { currentMessage = Option(messageHandle) try { @@ -986,29 +986,34 @@ sealed class LocalActorRef private[akka]( case Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange)) => dead.restart(reason, maxNrOfRetries, withinTimeRange) - case None => - throw new IllegalActorStateException( - "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + case None => throw new IllegalActorStateException( + "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + + "\n\tto non-empty list of exception classes - can't proceed " + toString) } } else { if (lifeCycle.isEmpty) lifeCycle = Some(LifeCycle(Permanent)) // when passing on make sure we have a lifecycle - _supervisor.foreach(_ ! Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on + notifySupervisorWithMessage(Exit(this, reason)) // if 'trapExit' is not defined then pass the Exit on } } protected[akka] def restart(reason: Throwable, maxNrOfRetries: Int, withinTimeRange: Int): Unit = { - if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis + if (maxNrOfRetriesCount == 0) restartsWithinTimeRangeTimestamp = System.currentTimeMillis // first time around maxNrOfRetriesCount += 1 - if (maxNrOfRetriesCount > maxNrOfRetries || (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange) { - val message = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) + + val tooManyRestarts = maxNrOfRetriesCount > maxNrOfRetries + val restartingHasExpired = (System.currentTimeMillis - restartsWithinTimeRangeTimestamp) > withinTimeRange + + if (tooManyRestarts || restartingHasExpired) { + val notification = MaximumNumberOfRestartsWithinTimeRangeReached(this, maxNrOfRetries, withinTimeRange, reason) Actor.log.warning( "Maximum number of restarts [%s] within time range [%s] reached." + "\n\tWill *not* restart actor [%s] anymore." + - "\n\tLast exception causing restart was [%s].", + "\n\tLast exception causing restart was" + + "\n\t[%s].", maxNrOfRetries, withinTimeRange, this, reason) _supervisor.foreach { sup => - if (sup.isDefinedAt(message)) sup ! message + // can supervisor handle the notification? + if (sup.isDefinedAt(notification)) notifySupervisorWithMessage(notification) else Actor.log.warning( "No message handler defined for system message [MaximumNumberOfRestartsWithinTimeRangeReached]" + "\n\tCan't send the message to the supervisor [%s].", sup) @@ -1016,7 +1021,6 @@ sealed class LocalActorRef private[akka]( } else { _isBeingRestarted = true val failedActor = actorInstance.get - val lock = guard.lock guard.withGuard { lifeCycle.get match { case LifeCycle(scope, _, _) => { @@ -1116,7 +1120,8 @@ sealed class LocalActorRef private[akka]( clearTransactionSet createNewTransactionSet } else oldTxSet - Actor.log.ifTrace("Joining transaction set [" + currentTxSet + "];\n\tactor " + toString + "\n\twith message [" + message + "]") + Actor.log.ifTrace("Joining transaction set [" + currentTxSet + + "];\n\tactor " + toString + "\n\twith message [" + message + "]") val mtx = ThreadLocalTransaction.getThreadLocalTransaction if ((mtx eq null) || mtx.getStatus.isDead) currentTxSet.incParties else currentTxSet.incParties(mtx, 1) @@ -1131,7 +1136,8 @@ sealed class LocalActorRef private[akka]( else { topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx if (isTransactor) { - Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + "\n\twith message " + messageHandle) + Actor.log.ifTrace("Creating a new transaction set (top-level transaction)\n\tfor actor " + toString + + "\n\twith message " + messageHandle) Some(createNewTransactionSet) } else None } @@ -1172,12 +1178,12 @@ sealed class LocalActorRef private[akka]( "All linked actors have died permanently (they were all configured as TEMPORARY)" + "\n\tshutting down and unlinking supervisor actor as well [%s].", temporaryActor.id) - _supervisor.foreach(_ ! UnlinkAndStop(this)) + notifySupervisorWithMessage(UnlinkAndStop(this)) } } - private def handleExceptionInDispatch(e: Throwable, message: Any, topLevelTransaction: Boolean) = { - Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) + private def handleExceptionInDispatch(reason: Throwable, message: Any, topLevelTransaction: Boolean) = { + Actor.log.error(reason, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message) _isBeingRestarted = true // abort transaction set @@ -1187,15 +1193,24 @@ sealed class LocalActorRef private[akka]( txSet.abort } - senderFuture.foreach(_.completeWithException(this, e)) + senderFuture.foreach(_.completeWithException(this, reason)) clearTransaction if (topLevelTransaction) clearTransactionSet - // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client - if (_supervisor.isDefined) _supervisor.get ! Exit(this, e) + notifySupervisorWithMessage(Exit(this, reason)) } + private def notifySupervisorWithMessage(notification: LifeCycleMessage) = { + // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client + _supervisor.foreach { sup => + if (sup.isShutdown) { // if supervisor is shut down, game over for all linked actors +// shutdownLinkedActors +// stop + } else sup ! notification // else notify supervisor + } + } + private def nullOutActorRefReferencesFor(actor: Actor) = { actorSelfFields._1.set(actor, null) actorSelfFields._2.set(actor, null) @@ -1215,7 +1230,8 @@ sealed class LocalActorRef private[akka]( case e: NoSuchFieldException => val parent = clazz.getSuperclass if (parent != null) findActorSelfField(parent) - else throw new IllegalActorStateException(toString + " is not an Actor since it have not mixed in the 'Actor' trait") + else throw new IllegalActorStateException( + toString + " is not an Actor since it have not mixed in the 'Actor' trait") } } diff --git a/akka-core/src/main/scala/actor/Supervisor.scala b/akka-core/src/main/scala/actor/Supervisor.scala index ecbbf9bb9d..ede7d380db 100644 --- a/akka-core/src/main/scala/actor/Supervisor.scala +++ b/akka-core/src/main/scala/actor/Supervisor.scala @@ -161,8 +161,8 @@ sealed class Supervisor private[akka] ( _childActors.put(className, actorRef :: currentActors) actorRef.lifeCycle = Some(lifeCycle) supervisor.link(actorRef) - remoteAddress.foreach(address => - RemoteServer.registerActor(new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)) + remoteAddress.foreach(address => RemoteServer.registerActor( + new InetSocketAddress(address.hostname, address.port), actorRef.uuid, actorRef)) case supervisorConfig @ SupervisorConfig(_, _) => // recursive supervisor configuration val childSupervisor = Supervisor(supervisorConfig) supervisor.link(childSupervisor.supervisor) @@ -180,14 +180,23 @@ final class SupervisorActor private[akka] ( handler: FaultHandlingStrategy, trapExceptions: List[Class[_ <: Throwable]]) extends Actor { import self._ + trapExit = trapExceptions faultHandler = Some(handler) override def shutdown(): Unit = shutdownLinkedActors def receive = { + // FIXME add a way to respond to MaximumNumberOfRestartsWithinTimeRangeReached in declaratively configured Supervisor + case MaximumNumberOfRestartsWithinTimeRangeReached( + victim, maxNrOfRetries, withinTimeRange, lastExceptionCausingRestart) => + Actor.log.warning( + "Declaratively configured supervisor received a [MaximumNumberOfRestartsWithinTimeRangeReached] notification," + + "\n\tbut there is currently no way of handling it in a declaratively configured supervisor." + + "\n\tIf you want to be able to handle this error condition then you need to create the supervision tree programatically." + + "\n\tThis will be supported in the future.") case unknown => throw new SupervisorException( - "SupervisorActor can not respond to messages. Unknown message [" + unknown + "]") + "SupervisorActor can not respond to messages.\n\tUnknown message [" + unknown + "]") } } diff --git a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala index 832ae9203a..6bacec73be 100644 --- a/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/AbstractReactorBasedEventDrivenDispatcher.scala @@ -29,6 +29,7 @@ abstract class AbstractReactorBasedEventDrivenDispatcher(val name: String) exten } def shutdown = if (active) { + log.debug("Shutting down %s", toString) active = false selectorThread.interrupt doShutdown diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala index 48b42847a0..1d34083e0a 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenDispatcher.scala @@ -64,7 +64,7 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat @volatile private var active: Boolean = false - val name: String = "event-driven:executor:dispatcher:" + _name + val name = "akka:event-driven:dispatcher:" + _name init def dispatch(invocation: MessageInvocation) = dispatch(invocation.receiver) @@ -92,8 +92,8 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat } while ((lockAcquiredOnce && !finishedBeforeMailboxEmpty && !mailbox.isEmpty)) } }) - } else throw new IllegalActorStateException("Can't submit invocations to dispatcher since it's not started") - + } else log.warning( + "%s is shut down,\n\tignoring the rest of the messages in the mailbox of\n\t%s", toString, receiver) /** * Process the messages in the mailbox of the given actor. @@ -107,25 +107,22 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat messageInvocation.invoke processedMessages += 1 // check if we simply continue with other messages, or reached the throughput limit - if (throughput <= 0 || processedMessages < throughput) - messageInvocation = receiver.mailbox.poll + if (throughput <= 0 || processedMessages < throughput) messageInvocation = receiver.mailbox.poll else { - return !receiver.mailbox.isEmpty messageInvocation = null + return !receiver.mailbox.isEmpty } } - - return false + false } def start = if (!active) { - log.debug("Starting ExecutorBasedEventDrivenDispatcher [%s]", name) - log.debug("Throughput for %s = %d", name, throughput) + log.debug("Starting up %s\n\twith throughput [%d]", toString, throughput) active = true } def shutdown = if (active) { - log.debug("Shutting down ExecutorBasedEventDrivenDispatcher [%s]", name) + log.debug("Shutting down %s", toString) executor.shutdownNow active = false references.clear @@ -135,6 +132,9 @@ class ExecutorBasedEventDrivenDispatcher(_name: String, throughput: Int = Dispat def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + + override def toString = "ExecutorBasedEventDrivenDispatcher[" + name + "]" + // FIXME: should we have an unbounded queue and not bounded as default ???? private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool } diff --git a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala index 94ef0a2e67..76138dce35 100644 --- a/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ExecutorBasedEventDrivenWorkStealingDispatcher.scala @@ -41,8 +41,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess /** The index in the pooled actors list which was last used to steal work */ @volatile private var lastThiefIndex = 0 - // TODO: is there a naming convention for this name? - val name: String = "event-driven-work-stealing:executor:dispatcher:" + _name + val name = "akka:event-driven-work-stealing:dispatcher:" + _name init def dispatch(invocation: MessageInvocation) = if (active) { @@ -129,8 +128,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def tryDonateAndProcessMessages(receiver: ActorRef, thief: ActorRef) = { if (thief.dispatcherLock.tryLock) { try { - while(donateMessage(receiver, thief)) - processMailbox(thief) + while(donateMessage(receiver, thief)) processMailbox(thief) } finally { thief.dispatcherLock.unlock } @@ -156,7 +154,7 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess } def shutdown = if (active) { - log.debug("Shutting down ExecutorBasedEventDrivenWorkStealingDispatcher [%s]", name) + log.debug("Shutting down %s", toString) executor.shutdownNow active = false references.clear @@ -165,6 +163,8 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + override def toString = "ExecutorBasedEventDrivenWorkStealingDispatcher[" + name + "]" + private[akka] def init = withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool override def register(actorRef: ActorRef) = { @@ -182,15 +182,12 @@ class ExecutorBasedEventDrivenWorkStealingDispatcher(_name: String) extends Mess private def verifyActorsAreOfSameType(actorOfId: ActorRef) = { actorType match { - case None => { - actorType = Some(actorOfId.actor.getClass) - } - case Some(aType) => { + case None => actorType = Some(actorOfId.actor.getClass) + case Some(aType) => if (aType != actorOfId.actor.getClass) - throw new IllegalActorStateException( - String.format("Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", - actorOfId.actor, aType)) - } + throw new IllegalActorStateException(String.format( + "Can't register actor %s in a work stealing dispatcher which already knows actors of type %s", + actorOfId.actor, aType)) } } } diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala index e8fbe9a221..8a951a4e72 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedSingleThreadEventDrivenDispatcher.scala @@ -12,11 +12,14 @@ package se.scalablesolutions.akka.dispatch import java.util.{LinkedList, List} -class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends AbstractReactorBasedEventDrivenDispatcher(name) { +class ReactorBasedSingleThreadEventDrivenDispatcher(_name: String) + extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:single-thread:dispatcher:" + _name) { + def start = if (!active) { + log.debug("Starting up %s", toString) active = true val messageDemultiplexer = new Demultiplexer(queue) - selectorThread = new Thread("event-driven:reactor:single-thread:dispatcher:" + name) { + selectorThread = new Thread(name) { override def run = { while (active) { try { @@ -40,6 +43,8 @@ class ReactorBasedSingleThreadEventDrivenDispatcher(name: String) extends Abstra def usesActorMailbox = false + override def toString = "ReactorBasedSingleThreadEventDrivenDispatcher[" + name + "]" + class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { private val selectedQueue: List[MessageInvocation] = new LinkedList[MessageInvocation] diff --git a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala index 3e44cf9f6a..370426b2fd 100644 --- a/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ReactorBasedThreadPoolEventDrivenDispatcher.scala @@ -64,7 +64,7 @@ import se.scalablesolutions.akka.actor.IllegalActorStateException * @author Jonas Bonér */ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) - extends AbstractReactorBasedEventDrivenDispatcher("event-driven:reactor:thread-pool:dispatcher:" + _name) + extends AbstractReactorBasedEventDrivenDispatcher("akka:event-driven:reactor:dispatcher:" + _name) with ThreadPoolBuilder { private var fair = true @@ -75,17 +75,18 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity.buildThreadPool def start = if (!active) { + log.debug("Starting up %s", toString) active = true /** - * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. + * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa + * [http://code.google.com/p/actorom/]. */ selectorThread = new Thread(name) { override def run = { while (active) { try { try { - // guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] messageDemultiplexer.select } catch { case e: InterruptedException => active = false } process(messageDemultiplexer.acquireSelectedInvocations) @@ -110,7 +111,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) if (invocation eq null) throw new IllegalActorStateException("Message invocation is null [" + invocation + "]") if (!busyActors.contains(invocation.receiver)) { val invoker = messageInvokers.get(invocation.receiver) - if (invoker eq null) throw new IllegalActorStateException("Message invoker for invocation [" + invocation + "] is null") + if (invoker eq null) throw new IllegalActorStateException( + "Message invoker for invocation [" + invocation + "] is null") resume(invocation.receiver) invocations.remove executor.execute(new Runnable() { @@ -142,6 +144,8 @@ class ReactorBasedThreadPoolEventDrivenDispatcher(_name: String) def ensureNotActive(): Unit = if (active) throw new IllegalActorStateException( "Can't build a new thread pool for a dispatcher that is already up and running") + override def toString = "ReactorBasedThreadPoolEventDrivenDispatcher[" + name + "]" + class Demultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer { private val selectedInvocations: List[MessageInvocation] = new LinkedList[MessageInvocation] private val selectedInvocationsLock = new ReentrantLock diff --git a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala index 5c1cb78a52..7355012b1f 100644 --- a/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala +++ b/akka-core/src/main/scala/dispatch/ThreadBasedDispatcher.scala @@ -16,7 +16,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef} */ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatcher { private val name = actor.getClass.getName + ":" + actor.uuid - private val threadName = "thread-based:dispatcher:" + name + private val threadName = "akka:thread-based:dispatcher:" + name private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -24,6 +24,7 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch def dispatch(invocation: MessageInvocation) = queue.append(invocation) def start = if (!active) { + log.debug("Starting up %s", toString) active = true selectorThread = new Thread(threadName) { override def run = { @@ -42,11 +43,13 @@ class ThreadBasedDispatcher(private val actor: ActorRef) extends MessageDispatch def usesActorMailbox = false def shutdown = if (active) { - log.debug("Shutting down ThreadBasedDispatcher [%s]", name) + log.debug("Shutting down %s", toString) active = false selectorThread.interrupt references.clear } + + override def toString = "ThreadBasedDispatcher[" + threadName + "]" } class BlockingMessageQueue(name: String) extends MessageQueue { diff --git a/akka-core/src/main/scala/stm/TransactionFactory.scala b/akka-core/src/main/scala/stm/TransactionFactory.scala index d4e61ee04f..24c4712509 100644 --- a/akka-core/src/main/scala/stm/TransactionFactory.scala +++ b/akka-core/src/main/scala/stm/TransactionFactory.scala @@ -22,7 +22,7 @@ object TransactionConfig { val FAMILY_NAME = "DefaultTransaction" val READONLY = null.asInstanceOf[JBoolean] val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000) - val TIMEOUT = config.getLong("akka.stm.timeout", Long.MaxValue) + val TIMEOUT = config.getLong("akka.stm.timeout", 10) val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds") val TRACK_READS = null.asInstanceOf[JBoolean] val WRITE_SKEW = config.getBool("akka.stm.write-skew", true) @@ -125,8 +125,9 @@ object TransactionFactory { quickRelease: Boolean = TransactionConfig.QUICK_RELEASE, traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL, hooks: Boolean = TransactionConfig.HOOKS) = { - val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew, - explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) + val config = new TransactionConfig( + familyName, readonly, maxRetries, timeout, trackReads, writeSkew, + explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks) new TransactionFactory(config) } } @@ -152,8 +153,9 @@ object TransactionFactory { * * @see TransactionConfig for configuration options. */ -class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) { - self => +class TransactionFactory( + val config: TransactionConfig = DefaultTransactionConfig, + defaultName: String = TransactionConfig.FAMILY_NAME) { self => // use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName diff --git a/akka-core/src/main/scala/stm/TransactionManagement.scala b/akka-core/src/main/scala/stm/TransactionManagement.scala index 2eada538c7..25e58f3fb6 100644 --- a/akka-core/src/main/scala/stm/TransactionManagement.scala +++ b/akka-core/src/main/scala/stm/TransactionManagement.scala @@ -156,8 +156,13 @@ class GlobalStm extends TransactionManagement with Logging { val result = body val txSet = getTransactionSetInScope log.ifTrace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]") + try { + txSet.tryJoinCommit( + mtx, + TransactionConfig.DefaultTimeout.length, + TransactionConfig.DefaultTimeout.unit) // Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake - try { txSet.tryJoinCommit(mtx, TransactionConfig.TIMEOUT, TimeUnit.MILLISECONDS) } catch { case e: IllegalStateException => {} } + } catch { case e: IllegalStateException => {} } result } }) diff --git a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java index 825e7ca489..515f4fafee 100644 --- a/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java +++ b/akka-core/src/test/java/se/scalablesolutions/akka/actor/TransactionalActiveObject.java @@ -21,7 +21,6 @@ public class TransactionalActiveObject { refState = new Ref(); isInitialized = true; } - System.out.println("==========> init"); } public String getMapState(String key) { @@ -38,7 +37,6 @@ public class TransactionalActiveObject { public void setMapState(String key, String msg) { mapState.put(key, msg); - System.out.println("==========> setMapState"); } public void setVectorState(String msg) { @@ -74,7 +72,6 @@ public class TransactionalActiveObject { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); - System.out.println("==========> failure"); nested.failure(key, msg, failer); return msg; } diff --git a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala similarity index 94% rename from akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala rename to akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala index 6ad73d5438..ea244bf966 100644 --- a/akka-core/src/test/scala/NestedTransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/DeclarativelySupervisedNestedTransactionalActiveObjectSpec.scala @@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ @RunWith(classOf[JUnitRunner]) -class NestedTransactionalActiveObjectSpec extends +class DeclarativelySupervisedNestedTransactionalActiveObjectSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { @@ -32,14 +32,14 @@ class NestedTransactionalActiveObjectSpec extends new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), List( new Component(classOf[TransactionalActiveObject], - new LifeCycle(new Permanent), - 10000), + new LifeCycle(new Permanent), + 10000), new Component(classOf[NestedTransactionalActiveObject], - new LifeCycle(new Permanent), - 10000), + new LifeCycle(new Permanent), + 10000), new Component(classOf[ActiveObjectFailer], - new LifeCycle(new Permanent), - 10000) + new LifeCycle(new Permanent), + 10000) ).toArray).supervise } @@ -47,8 +47,8 @@ class NestedTransactionalActiveObjectSpec extends conf.stop } - describe("Transactional nested in-memory Active Object") { -/* + describe("Declaratively nested supervised transactional in-memory Active Object") { + it("map should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -156,6 +156,5 @@ class NestedTransactionalActiveObjectSpec extends Thread.sleep(100) nested.getRefState should equal("init") } - */ } } \ No newline at end of file diff --git a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala b/akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala similarity index 86% rename from akka-core/src/test/scala/TransactionalActiveObjectSpec.scala rename to akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala index d1c59b1df4..95d7b8e5df 100644 --- a/akka-core/src/test/scala/TransactionalActiveObjectSpec.scala +++ b/akka-core/src/test/scala/DeclarativelySupervisedTransactionalActiveObjectSpec.scala @@ -18,7 +18,7 @@ import se.scalablesolutions.akka.config.JavaConfig._ import se.scalablesolutions.akka.actor._ @RunWith(classOf[JUnitRunner]) -class TransactionalActiveObjectSpec extends +class DeclarativelySupervisedTransactionalActiveObjectSpec extends Spec with ShouldMatchers with BeforeAndAfterAll { @@ -29,24 +29,25 @@ class TransactionalActiveObjectSpec extends override def beforeAll { Config.config conf.configure( - new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), - List( - new Component(classOf[TransactionalActiveObject], - new LifeCycle(new Permanent), - //new RestartCallbacks("preRestart", "postRestart")), - 10000), - new Component(classOf[ActiveObjectFailer], - new LifeCycle(new Permanent), - 10000)).toArray - ).supervise + new RestartStrategy(new AllForOne, 3, 5000, List(classOf[Exception]).toArray), + List( + new Component( + classOf[TransactionalActiveObject], + new LifeCycle(new Permanent), + 10000), + new Component( + classOf[ActiveObjectFailer], + new LifeCycle(new Permanent), + 10000) + ).toArray).supervise } override def afterAll { conf.stop } - describe("Transactional in-memory Active Object ") { - /* + describe("Declaratively supervised transactional in-memory Active Object ") { + it("map should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -58,9 +59,7 @@ class TransactionalActiveObjectSpec extends it("map should rollback state for stateful server in case of failure") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init - Thread.sleep(500) stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") - Thread.sleep(500) val failer = conf.getInstance(classOf[ActiveObjectFailer]) try { stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) @@ -69,6 +68,14 @@ class TransactionalActiveObjectSpec extends stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure") should equal("init") } + it("vector should not rollback state for stateful server in case of success") { + val stateful = conf.getInstance(classOf[TransactionalActiveObject]) + stateful.init + stateful.setVectorState("init") // set init state + stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") + stateful.getVectorState should equal("new state") + } + it("vector should rollback state for stateful server in case of failure") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init @@ -81,12 +88,12 @@ class TransactionalActiveObjectSpec extends stateful.getVectorState should equal("init") } - it("vector should not rollback state for stateful server in case of success") { + it("ref should not rollback state for stateful server in case of success") { val stateful = conf.getInstance(classOf[TransactionalActiveObject]) stateful.init - stateful.setVectorState("init") // set init state + stateful.setRefState("init") // set init state stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") - stateful.getVectorState should equal("new state") + stateful.getRefState should equal("new state") } it("ref should rollback state for stateful server in case of failure") { @@ -100,14 +107,5 @@ class TransactionalActiveObjectSpec extends } catch { case e => {} } stateful.getRefState should equal("init") } - - it("ref should not rollback state for stateful server in case of success") { - val stateful = conf.getInstance(classOf[TransactionalActiveObject]) - stateful.init - stateful.setRefState("init") // set init state - stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") - stateful.getRefState should equal("new state") - } - */ } } diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 326dd64636..6d919ed3bd 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -36,15 +36,19 @@ akka { } stm { - fair = on # should global transactions be fair or non-fair (non fair yield better performance) - jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will - # begin (or join), commit or rollback the JTA transaction. Default is 'off'. + fair = on # should global transactions be fair or non-fair (non fair yield better performance) + jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will + # begin (or join), commit or rollback the JTA transaction. Default is 'off'. + timeout = 5 # default timeout for blocking transactions and transaction set (in unit defined by + # the time-unit property) + # FIXME: use 'time-unit' for all timeouts + time-unit = "seconds" # default timeout time unit } jta { - provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) - # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', - # e.g. you need the akka-jta JARs on classpath). + provider = "from-jndi" # Options: "from-jndi" (means that Akka will try to detect a TransactionManager in the JNDI) + # "atomikos" (means that Akka will use the Atomikos based JTA impl in 'akka-jta', + # e.g. you need the akka-jta JARs on classpath). timeout = 60000 } @@ -52,9 +56,9 @@ akka { service = on hostname = "localhost" port = 9998 - filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use + filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use resource_packages = ["sample.rest.scala","sample.rest.java","sample.security"] # List with all resource packages for your Jersey services - authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) + authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now) #IF you are using a KerberosAuthenticationActor # kerberos {