diff --git a/README.textile b/README.textile
index c729b0901c..2acfb63e8e 100755
--- a/README.textile
+++ b/README.textile
@@ -1,6 +1,6 @@
h1. Akka Transactors
-h2. RESTful Distributed Persistent Transactional Actors
+h2. Distributed Transactional RESTful Persistent Actors
h3. "http://akkasource.org":http://akkasource.org
diff --git a/akka-actors/pom.xml b/akka-actors/pom.xml
index 260e357b45..787a1131c4 100644
--- a/akka-actors/pom.xml
+++ b/akka-actors/pom.xml
@@ -107,7 +107,7 @@
sjson.jsonsjson
- 0.1
+ 0.2
diff --git a/akka-actors/src/main/scala/actor/ActiveObject.scala b/akka-actors/src/main/scala/actor/ActiveObject.scala
index b0f90c3e31..0cb0710191 100644
--- a/akka-actors/src/main/scala/actor/ActiveObject.scala
+++ b/akka-actors/src/main/scala/actor/ActiveObject.scala
@@ -4,7 +4,6 @@
package se.scalablesolutions.akka.actor
-import java.lang.reflect.{InvocationTargetException, Method}
import java.net.InetSocketAddress
import se.scalablesolutions.akka.dispatch.{MessageDispatcher, FutureResult}
@@ -17,9 +16,7 @@ import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import se.scalablesolutions.akka.serialization.Serializer
-
-sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
-class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
+import java.lang.reflect.{InvocationTargetException, Method}
object Annotations {
import se.scalablesolutions.akka.annotation._
@@ -32,100 +29,7 @@ object Annotations {
}
/**
- * Factory for Java API.
- *
- * @author Jonas Bonér
- */
-class ActiveObjectFactory {
-
- // FIXME How to pass the MessageDispatcher on from active object to child???????
-
- def newInstance[T](target: Class[T], timeout: Long): T =
- ActiveObject.newInstance(target, new Dispatcher(None), None, timeout)
-
- def newInstance[T](target: Class[T], timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
- ActiveObject.newInstance(target, new Dispatcher(restartCallbacks), None, timeout)
-
- def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
- ActiveObject.newInstance(intf, target, new Dispatcher(None), None, timeout)
-
- def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, restartCallbacks: Option[RestartCallbacks]): T =
- ActiveObject.newInstance(intf, target, new Dispatcher(restartCallbacks), None, timeout)
-
- def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
- ActiveObject.newInstance(target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
-
- def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
- ActiveObject.newInstance(target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
-
- def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
- ActiveObject.newInstance(intf, target, new Dispatcher(None), Some(new InetSocketAddress(hostname, port)), timeout)
-
- def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T =
- ActiveObject.newInstance(intf, target, new Dispatcher(restartCallbacks), Some(new InetSocketAddress(hostname, port)), timeout)
-
- def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher): T = {
- val actor = new Dispatcher(None)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(target, actor, None, timeout)
- }
-
- def newInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
- val actor = new Dispatcher(restartCallbacks)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(target, actor, None, timeout)
- }
-
- def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher): T = {
- val actor = new Dispatcher(None)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(intf, target, actor, None, timeout)
- }
-
- def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, restartCallbacks: Option[RestartCallbacks]): T = {
- val actor = new Dispatcher(restartCallbacks)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(intf, target, actor, None, timeout)
- }
-
- def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
- val actor = new Dispatcher(None)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
- }
-
- def newRemoteInstance[T](target: Class[T], timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
- val actor = new Dispatcher(restartCallbacks)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
- }
-
- def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int): T = {
- val actor = new Dispatcher(None)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
- }
-
- def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, dispatcher: MessageDispatcher, hostname: String, port: Int, restartCallbacks: Option[RestartCallbacks]): T = {
- val actor = new Dispatcher(restartCallbacks)
- actor.dispatcher = dispatcher
- ActiveObject.newInstance(intf, target, actor, Some(new InetSocketAddress(hostname, port)), timeout)
- }
-
- private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
- ActiveObject.newInstance(target, actor, remoteAddress, timeout)
- }
-
- private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
- ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
- }
-
- private[akka] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
- ActiveObject.supervise(restartStrategy, components)
-}
-
-/**
- * Factory for Scala API.
+ * Factory object for Active Objects.
*
* @author Jonas Bonér
*/
@@ -205,7 +109,6 @@ object ActiveObject {
}
private[akka] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
- //if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true)
actor.initialize(target, proxy)
actor.timeout = timeout
@@ -215,7 +118,6 @@ object ActiveObject {
}
private[akka] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
- //if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
actor.initialize(target.getClass, target)
actor.timeout = timeout
@@ -235,28 +137,32 @@ object ActiveObject {
}
}
-object AspectInitRegistry {
+private[akka] object AspectInitRegistry {
private val inits = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
+
def initFor(target: AnyRef) = {
val init = inits.get(target)
inits.remove(target)
init
}
+
def register(target: AnyRef, init: AspectInit) = inits.put(target, init)
}
-sealed case class AspectInit(
+private[akka] sealed case class AspectInit(
val target: Class[_],
val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long)
-
+
/**
+ * AspectWerkz Aspect that is turning POJOs into Active Object.
+ * Is deployed on a 'per-instance' basis.
+ *
* @author Jonas Bonér
*/
-@serializable
@Aspect("perInstance")
-sealed class ActiveObjectAspect {
+private[akka] sealed class ActiveObjectAspect {
@volatile var isInitialized = false
var target: Class[_] = _
var actor: Dispatcher = _
@@ -264,36 +170,36 @@ sealed class ActiveObjectAspect {
var timeout: Long = _
@Around("execution(* *.*(..))")
- def invoke(joinpoint: JoinPoint): AnyRef = {
+ def invoke(joinPoint: JoinPoint): AnyRef = {
if (!isInitialized) {
- val init = AspectInitRegistry.initFor(joinpoint.getThis)
+ val init = AspectInitRegistry.initFor(joinPoint.getThis)
target = init.target
actor = init.actor
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
}
- dispatch(joinpoint)
+ dispatch(joinPoint)
}
- private def dispatch(joinpoint: JoinPoint) = {
- if (remoteAddress.isDefined) remoteDispatch(joinpoint)
- else localDispatch(joinpoint)
+ private def dispatch(joinPoint: JoinPoint) = {
+ if (remoteAddress.isDefined) remoteDispatch(joinPoint)
+ else localDispatch(joinPoint)
}
- private def localDispatch(joinpoint: JoinPoint): AnyRef = {
- val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
- if (isOneWay(rtti)) actor ! Invocation(joinpoint, true)
+ private def localDispatch(joinPoint: JoinPoint): AnyRef = {
+ val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
+ if (isOneWay(rtti)) actor ! Invocation(joinPoint, true, true)
else {
- val result = actor !! Invocation(joinpoint, false)
+ val result = actor !! Invocation(joinPoint, false, isVoid(rtti))
if (result.isDefined) result.get
- else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
+ else throw new IllegalStateException("No result defined for invocation [" + joinPoint + "]")
}
}
- private def remoteDispatch(joinpoint: JoinPoint): AnyRef = {
- val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
- val oneWay = isOneWay(rtti)
+ private def remoteDispatch(joinPoint: JoinPoint): AnyRef = {
+ val rtti = joinPoint.getRtti.asInstanceOf[MethodRtti]
+ val oneWay_? = isOneWay(rtti)
val (message: Array[AnyRef], isEscaped) = escapeArguments(rtti.getParameterValues)
val requestBuilder = RemoteRequest.newBuilder
.setId(RemoteRequestIdFactory.nextId)
@@ -301,21 +207,21 @@ sealed class ActiveObjectAspect {
.setTarget(target.getName)
.setTimeout(timeout)
.setIsActor(false)
- .setIsOneWay(oneWay)
+ .setIsOneWay(oneWay_?)
.setIsEscaped(false)
RemoteProtocolBuilder.setMessage(message, requestBuilder)
val id = actor.registerSupervisorAsRemoteActor
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val remoteMessage = requestBuilder.build
val future = RemoteClient.clientFor(remoteAddress.get).send(remoteMessage)
- if (oneWay) null // for void methods
+ if (oneWay_?) null // for void methods
else {
if (future.isDefined) {
future.get.await
val result = getResultOrThrowException(future.get)
if (result.isDefined) result.get
- else throw new IllegalStateException("No result returned from call to [" + joinpoint + "]")
- } else throw new IllegalStateException("No future returned from call to [" + joinpoint + "]")
+ else throw new IllegalStateException("No result returned from call to [" + joinPoint + "]")
+ } else throw new IllegalStateException("No future returned from call to [" + joinPoint + "]")
}
}
@@ -325,9 +231,9 @@ sealed class ActiveObjectAspect {
throw cause
} else future.result.asInstanceOf[Option[T]]
- private def isOneWay(rtti: MethodRtti) =
- rtti.getMethod.getReturnType == java.lang.Void.TYPE ||
- rtti.getMethod.isAnnotationPresent(Annotations.oneway)
+ private def isOneWay(rtti: MethodRtti) = rtti.getMethod.isAnnotationPresent(Annotations.oneway)
+
+ private def isVoid(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
private def escapeArguments(args: Array[AnyRef]): Tuple2[Array[AnyRef], Boolean] = {
var isEscaped = false
@@ -347,24 +253,26 @@ sealed class ActiveObjectAspect {
*
* @author Jonas Bonér
*/
-@serializable private[akka] case class Invocation(val joinpoint: JoinPoint, val isOneWay: Boolean) {
+@serializable private[akka] case class Invocation(joinPoint: JoinPoint, isOneWay: Boolean, isVoid: Boolean) {
override def toString: String = synchronized {
- "Invocation [joinpoint: " + joinpoint.toString + ", isOneWay: " + isOneWay + "]"
+ "Invocation [joinPoint: " + joinPoint.toString + ", isOneWay: " + isOneWay + ", isVoid: " + isVoid + "]"
}
- override def hashCode(): Int = synchronized {
+ override def hashCode: Int = synchronized {
var result = HashCode.SEED
- result = HashCode.hash(result, joinpoint)
+ result = HashCode.hash(result, joinPoint)
result = HashCode.hash(result, isOneWay)
+ result = HashCode.hash(result, isVoid)
result
}
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[Invocation] &&
- that.asInstanceOf[Invocation].joinpoint == joinpoint &&
- that.asInstanceOf[Invocation].isOneWay == isOneWay
+ that.asInstanceOf[Invocation].joinPoint == joinPoint &&
+ that.asInstanceOf[Invocation].isOneWay == isOneWay &&
+ that.asInstanceOf[Invocation].isVoid == isVoid
}
}
@@ -419,12 +327,12 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends
}
override def receive: PartialFunction[Any, Unit] = {
- case Invocation(joinpoint, oneWay) =>
- if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinpoint)
- if (oneWay) joinpoint.proceed
- else reply(joinpoint.proceed)
+ case Invocation(joinPoint, isOneWay, _) =>
+ if (Actor.SERIALIZE_MESSAGES) serializeArguments(joinPoint)
+ if (isOneWay) joinPoint.proceed
+ else reply(joinPoint.proceed)
case unexpected =>
- throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
+ throw new IllegalStateException("Unexpected message [" + unexpected + "] sent to [" + this + "]")
}
override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
@@ -445,8 +353,8 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends
// } catch { case e: InvocationTargetException => throw e.getCause }
//}
- private def serializeArguments(joinpoint: JoinPoint) = {
- val args = joinpoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
+ private def serializeArguments(joinPoint: JoinPoint) = {
+ val args = joinPoint.getRtti.asInstanceOf[MethodRtti].getParameterValues
var unserializable = false
var hasMutableArgument = false
for (arg <- args.toList) {
@@ -473,7 +381,7 @@ private[akka] class Dispatcher(val callbacks: Option[RestartCallbacks]) extends
if (!unserializable && hasMutableArgument) {
// FIXME: can we have another default deep cloner?
val copyOfArgs = Serializer.Java.deepClone(args)
- joinpoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
+ joinPoint.getRtti.asInstanceOf[MethodRtti].setParameterValues(copyOfArgs.asInstanceOf[Array[AnyRef]])
}
}
}
diff --git a/akka-actors/src/main/scala/actor/Actor.scala b/akka-actors/src/main/scala/actor/Actor.scala
index 322142dfad..db3ee221bb 100644
--- a/akka-actors/src/main/scala/actor/Actor.scala
+++ b/akka-actors/src/main/scala/actor/Actor.scala
@@ -10,20 +10,22 @@ import java.util.HashSet
import se.scalablesolutions.akka.Config._
import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.TransactionManagement._
-import se.scalablesolutions.akka.stm.TransactionManagement
+import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
import se.scalablesolutions.akka.nio.protobuf.RemoteProtocol.RemoteRequest
import se.scalablesolutions.akka.nio.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.util.Helpers.ReadWriteLock
import se.scalablesolutions.akka.util.Logging
+import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
+
import org.multiverse.utils.TransactionThreadLocal._
-import org.multiverse.api.exceptions.StmException
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
-case object TransactionalInit extends LifecycleMessage
+//case object TransactionalInit extends LifecycleMessage
case class HotSwap(code: Option[PartialFunction[Any, Unit]]) extends LifecycleMessage
case class Restart(reason: AnyRef) extends LifecycleMessage
case class Exit(dead: Actor, killer: Throwable) extends LifecycleMessage
@@ -64,7 +66,7 @@ trait Actor extends Logging with TransactionManagement {
private var hotswap: Option[PartialFunction[Any, Unit]] = None
private var config: Option[AnyRef] = None
- @volatile protected[this] var isTransactional = false
+ @volatile protected[this] var isTransactionRequiresNew = false
@volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None
@volatile protected[akka] var supervisor: Option[Actor] = None
@@ -214,7 +216,7 @@ trait Actor extends Logging with TransactionManagement {
if (!isRunning) {
dispatcher.start
isRunning = true
- if (isTransactional) this ! TransactionalInit
+ //if (isTransactional) this !! TransactionalInit
}
log.info("[%s] has started", toString)
}
@@ -235,39 +237,44 @@ trait Actor extends Logging with TransactionManagement {
/**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
*/
- def !(message: AnyRef): Unit =
+ def !(message: AnyRef) =
if (isRunning) postMessageToMailbox(message)
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* Sends a message asynchronously and waits on a future for a reply message.
- * It waits on the reply either until it receives it (returns Some(replyMessage) or until the timeout expires (returns None).
- * E.g. send-and-receive-eventually semantics.
+ *
+ * It waits on the reply either until it receives it (in the form of Some(replyMessage))
+ * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
*
* NOTE:
- * If you are sending messages using '!!' then you *have to* use reply(..) sending a reply message to the original sender.
- * If not then the sender will unessecary block until the timeout expires.
+ * If you are sending messages using !! then you have to use reply(..)
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !: Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
+ if (message.isInstanceOf[Invocation] && message.asInstanceOf[Invocation].isVoid) future.completeWithResult(None)
future.await
getResultOrThrowException(future)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
-
+
/**
* Sends a message asynchronously and waits on a future for a reply message.
- * It waits on the reply either until it receives it (returns Some(replyMessage) or until the actor default timeout expires (returns None).
- * E.g. send-and-receive-eventually semantics.
+ *
+ * It waits on the reply either until it receives it (in the form of Some(replyMessage))
+ * or until the timeout expires (which will return None). E.g. send-and-receive-eventually semantics.
*
* NOTE:
- * If you are sending messages using '!!' then you *have to* use reply(..) sending a reply message to the original sender.
- * If not then the sender will unessecary block until the timeout expires.
+ * If you are sending messages using !! then you have to use reply(..)
+ * to send a reply message to the original sender. If not then the sender will block until the timeout expires.
*/
def !: Option[T] = !
/**
* Sends a message asynchronously, but waits on a future indefinitely. E.g. emulates a synchronous call.
- * E.g. send-and-receive-eventually semantics.
+ *
+ * NOTE:
+ * Should be used with care.
*/
def !?[T](message: AnyRef): T = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
@@ -276,10 +283,18 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
- * Use reply(..) to reply with a message to the original sender of the message currently being processed.
+ * Use reply(..) to reply with a message to the original sender of the message currently
+ * being processed.
+ *
+ * NOTE:
+ * Does only work together with the actor !! method and/or active objects not annotated
+ * with @oneway.
*/
protected[this] def reply(message: AnyRef) = senderFuture match {
- case None => throw new IllegalStateException("No sender in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." )
+ case None => throw new IllegalStateException(
+ "\n\tNo sender in scope, can't reply. " +
+ "\n\tHave you used the '!' message send or the '@oneway' active object annotation? " +
+ "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future that will be bound by the argument passed to 'reply'." )
case Some(future) => future.completeWithResult(message)
}
@@ -318,7 +333,7 @@ trait Actor extends Logging with TransactionManagement {
*/
def makeTransactionRequired = synchronized {
if (isRunning) throw new IllegalArgumentException("Can not make actor transaction required after it has been started")
- else isTransactional = true
+ else isTransactionRequiresNew = true
}
/**
@@ -505,41 +520,30 @@ trait Actor extends Logging with TransactionManagement {
val message = messageHandle.message //serializeMessage(messageHandle.message)
val future = messageHandle.future
- try {
- tryToCommitTransactions
-
- if (isInExistingTransaction) joinExistingTransaction
- else if (isTransactional) startNewTransaction(messageHandle)
-
- incrementTransaction
- senderFuture = future
- if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
- else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
- decrementTransaction
-
- } catch {
- case e: StmException =>
- e.printStackTrace
+ def proceed = {
+ try {
+ incrementTransaction
+ if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
+ else throw new IllegalArgumentException("Actor " + toString + " could not process message [" + message + "] since no matching 'case' clause in its 'receive' method could be found")
+ } finally {
decrementTransaction
-
- val tx = currentTransaction.get
- if (tx.isDefined) {
- rollback(tx.get)
- if (tx.get.isTopLevel) {
- val done = tx.get.retry
- if (done) {
- if (future.isDefined) future.get.completeWithException(this, e)
- else e.printStackTrace
- }
- }
+ }
+ }
+
+ try {
+ senderFuture = future
+ if (isTransactionRequiresNew && !isTransactionInScope) {
+ if (senderFuture.isEmpty) throw new StmException(
+ "\n\tCan't continue transaction in a one-way fire-forget message send" +
+ "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
+ "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type")
+ atomic {
+ proceed
}
-
+ } else proceed
+ } catch {
case e =>
e.printStackTrace
- decrementTransaction
-
- val tx = currentTransaction.get
- if (tx.isDefined) rollback(tx.get)
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
@@ -548,9 +552,7 @@ trait Actor extends Logging with TransactionManagement {
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
-
} finally {
- tryToPrecommitTransactions
clearTransaction
}
}
@@ -566,7 +568,7 @@ trait Actor extends Logging with TransactionManagement {
case HotSwap(code) => hotswap = code
case Restart(reason) => restart(reason)
case Exit(dead, reason) => handleTrapExit(dead, reason)
- case TransactionalInit => initTransactionalState
+// case TransactionalInit => initTransactionalState
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
diff --git a/akka-actors/src/main/scala/actor/ActorRegistry.scala b/akka-actors/src/main/scala/actor/ActorRegistry.scala
index 6247d4d93e..0fc1860682 100755
--- a/akka-actors/src/main/scala/actor/ActorRegistry.scala
+++ b/akka-actors/src/main/scala/actor/ActorRegistry.scala
@@ -4,18 +4,20 @@
package se.scalablesolutions.akka.actor
-import util.Logging
+import se.scalablesolutions.akka.util.Logging
-import scala.collection.jcl.HashMap
+import scala.collection.mutable.HashMap
/**
- * Registry holding all actor instances, mapped by class..
+ * Registry holding all actor instances, mapped by class.
*
* @author Jonas Bonér
*/
object ActorRegistry extends Logging {
private val actors = new HashMap[String, List[Actor]]
+ def actorsFor(clazz: Class[_]): List[Actor] = actorsFor(clazz.getName)
+
def actorsFor(fqn : String): List[Actor] = synchronized {
actors.get(fqn) match {
case None => Nil
@@ -23,8 +25,6 @@ object ActorRegistry extends Logging {
}
}
- def actorsFor(clazz: Class[_]) : List[Actor] = actorsFor(clazz.getName)
-
def register(actor: Actor) = synchronized {
val name = actor.getClass.getName
actors.get(name) match {
diff --git a/akka-actors/src/main/scala/config/ActiveObjectManager.scala b/akka-actors/src/main/scala/config/ActiveObjectConfigurator.scala
similarity index 69%
rename from akka-actors/src/main/scala/config/ActiveObjectManager.scala
rename to akka-actors/src/main/scala/config/ActiveObjectConfigurator.scala
index f5d49d426a..be98fe6e09 100644
--- a/akka-actors/src/main/scala/config/ActiveObjectManager.scala
+++ b/akka-actors/src/main/scala/config/ActiveObjectConfigurator.scala
@@ -13,10 +13,17 @@ import java.util._
//import org.apache.camel.{Endpoint, Routes}
/**
+ * Configurator for the Active Objects. Used to do declarative configuration of supervision.
+ * It also doing dependency injection with and into Active Objects using dependency injection
+ * frameworks such as Google Guice or Spring.
+ *
+ * If you don't want declarative configuration then you should use the ActiveObject
+ * factory methods.
*
* @author Jonas Bonér
*/
-class ActiveObjectManager {
+class ActiveObjectConfigurator {
+ // TODO: make pluggable once we have f.e a SpringConfigurator
private val INSTANCE = new ActiveObjectGuiceConfigurator
/**
@@ -27,29 +34,29 @@ class ActiveObjectManager {
*/
def getInstance[T](clazz: Class[T]): T = INSTANCE.getInstance(clazz)
- def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectManager = {
+ def configure(restartStrategy: RestartStrategy, components: Array[Component]): ActiveObjectConfigurator = {
INSTANCE.configure(
restartStrategy.transform,
components.toList.asInstanceOf[scala.List[Component]].map(_.transform))
this
}
- def inject(): ActiveObjectManager = {
+ def inject: ActiveObjectConfigurator = {
INSTANCE.inject
this
}
- def supervise: ActiveObjectManager = {
+ def supervise: ActiveObjectConfigurator = {
INSTANCE.supervise
this
}
- def addExternalGuiceModule(module: Module): ActiveObjectManager = {
+ def addExternalGuiceModule(module: Module): ActiveObjectConfigurator = {
INSTANCE.addExternalGuiceModule(module)
this
}
- //def addRoutes(routes: Routes): ActiveObjectManager = {
+ //def addRoutes(routes: Routes): ActiveObjectConfigurator = {
// INSTANCE.addRoutes(routes)
// this
// }
@@ -69,6 +76,7 @@ class ActiveObjectManager {
//def getRoutingEndpoints(uri: String): java.util.Collection[Endpoint] = INSTANCE.getRoutingEndpoints(uri)
+ // TODO: should this be exposed?
def getGuiceModules: List[Module] = INSTANCE.getGuiceModules
def reset = INSTANCE.reset
diff --git a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
index 060b1234a0..2a0d454d22 100644
--- a/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
+++ b/akka-actors/src/main/scala/config/ActiveObjectGuiceConfigurator.scala
@@ -7,7 +7,7 @@ package se.scalablesolutions.akka.config
import com.google.inject._
import ScalaConfig._
-import se.scalablesolutions.akka.actor.{Supervisor, ActiveObjectFactory, Dispatcher}
+import se.scalablesolutions.akka.actor.{Supervisor, ActiveObject, Dispatcher}
import se.scalablesolutions.akka.util.Logging
//import org.apache.camel.impl.{DefaultCamelContext}
@@ -21,7 +21,7 @@ import java.lang.reflect.Method
/**
* @author Jonas Bonér
*/
-class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Logging { // with CamelConfigurator {
+private[akka] class ActiveObjectGuiceConfigurator extends ActiveObjectConfiguratorBase with Logging { // with CamelConfigurator {
//val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private var injector: Injector = _
@@ -32,7 +32,6 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Loggin
private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
- private var activeObjectFactory = new ActiveObjectFactory
//private var camelContext = new DefaultCamelContext
private var modules = new java.util.ArrayList[Module]
private var methodToUriRegistry = new HashMap[Method, String]
@@ -79,7 +78,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Loggin
}
*/
override def configure(restartStrategy: RestartStrategy, components: List[Component]):
- ActiveObjectConfigurator = synchronized {
+ ActiveObjectConfiguratorBase = synchronized {
this.restartStrategy = restartStrategy
this.components = components.toArray.toList.asInstanceOf[List[Component]]
bindings = for (component <- this.components) yield {
@@ -102,7 +101,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Loggin
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = activeObjectFactory.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
@@ -117,21 +116,21 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Loggin
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
- val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
+ val proxy = ActiveObject.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
}
- override def inject: ActiveObjectConfigurator = synchronized {
+ override def inject: ActiveObjectConfiguratorBase = synchronized {
if (injector != null) throw new IllegalStateException("inject() has already been called on this configurator")
injector = Guice.createInjector(modules)
this
}
- override def supervise: ActiveObjectConfigurator = synchronized {
+ override def supervise: ActiveObjectConfiguratorBase = synchronized {
if (injector == null) inject
- supervisor = Some(activeObjectFactory.supervise(restartStrategy, supervised))
+ supervisor = Some(ActiveObject.supervise(restartStrategy, supervised))
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start
supervisor.get.startSupervisor
@@ -151,12 +150,12 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with Loggin
* }})
*
*/
- def addExternalGuiceModule(module: Module): ActiveObjectConfigurator = synchronized {
+ def addExternalGuiceModule(module: Module): ActiveObjectConfiguratorBase = synchronized {
modules.add(module)
this
}
/*
- override def addRoutes(routes: Routes): ActiveObjectConfigurator = synchronized {
+ override def addRoutes(routes: Routes): ActiveObjectConfiguratorBase = synchronized {
camelContext.addRoutes(routes)
this
}
diff --git a/akka-actors/src/main/scala/config/Configurator.scala b/akka-actors/src/main/scala/config/Configurator.scala
index c4a57e91a4..c87138169d 100644
--- a/akka-actors/src/main/scala/config/Configurator.scala
+++ b/akka-actors/src/main/scala/config/Configurator.scala
@@ -20,14 +20,14 @@ trait Configurator {
def isDefined(clazz: Class[_]): Boolean
}
-trait ActiveObjectConfigurator extends Configurator {
+private[akka] trait ActiveObjectConfiguratorBase extends Configurator {
def getExternalDependency[T](clazz: Class[T]): T
- def configure(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfigurator
+ def configure(restartStrategy: RestartStrategy, components: List[Component]): ActiveObjectConfiguratorBase
- def inject: ActiveObjectConfigurator
+ def inject: ActiveObjectConfiguratorBase
- def supervise: ActiveObjectConfigurator
+ def supervise: ActiveObjectConfiguratorBase
def reset
diff --git a/akka-actors/src/main/scala/dispatch/Future.scala b/akka-actors/src/main/scala/dispatch/Future.scala
index 94f6960a6d..1f9a1a4726 100644
--- a/akka-actors/src/main/scala/dispatch/Future.scala
+++ b/akka-actors/src/main/scala/dispatch/Future.scala
@@ -7,7 +7,7 @@
*/
package se.scalablesolutions.akka.dispatch
-import java.util.concurrent.locks.{Lock, Condition, ReentrantLock}
+import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message)
diff --git a/akka-actors/src/main/scala/nio/RemoteServer.scala b/akka-actors/src/main/scala/nio/RemoteServer.scala
index 98693b2dad..79e9984664 100755
--- a/akka-actors/src/main/scala/nio/RemoteServer.scala
+++ b/akka-actors/src/main/scala/nio/RemoteServer.scala
@@ -43,8 +43,6 @@ object RemoteServer extends Logging {
Executors.newCachedThreadPool,
Executors.newCachedThreadPool)
- private val activeObjectFactory = new ActiveObjectFactory
-
private val bootstrap = new ServerBootstrap(factory)
def start: Unit = start(None)
@@ -85,7 +83,6 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) ext
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
- private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@@ -239,7 +236,7 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
log.info("Creating a new remote active object [%s]", name)
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
- val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef]
+ val newInstance = ActiveObject.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
} catch {
diff --git a/akka-actors/src/main/scala/stm/Transaction.scala b/akka-actors/src/main/scala/stm/Transaction.scala
index 895c65f383..03ffa120f6 100644
--- a/akka-actors/src/main/scala/stm/Transaction.scala
+++ b/akka-actors/src/main/scala/stm/Transaction.scala
@@ -7,25 +7,25 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
-import se.scalablesolutions.akka.dispatch.MessageInvocation
-import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.state.Committable
+import se.scalablesolutions.akka.util.Logging
-import org.multiverse.api.{Transaction => MultiverseTransaction}
+import org.multiverse.api.{Stm, Transaction => MultiverseTransaction}
import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.utils.GlobalStmInstance
import org.multiverse.utils.TransactionThreadLocal._
-import org.multiverse.templates.{OrElseTemplate, AtomicTemplate}
+import org.multiverse.templates.OrElseTemplate
import scala.collection.mutable.HashMap
+class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
/**
* @author Jonas Bonér
*/
object Multiverse {
- val STM = new AlphaStm
+ val STM: Stm = new AlphaStm
GlobalStmInstance.set(STM)
setThreadLocalTransaction(null)
}
@@ -39,10 +39,10 @@ object Multiverse {
* }
*
*
- * Example of Do-OrElse transaction management.
+ * Example of Run-OrElse transaction management.
*
* import se.scalablesolutions.akka.stm.Transaction._
- * do {
+ * run {
* .. // try to do something
* } orElse {
* .. // if transaction clashes try do do something else to minimize contention
@@ -51,20 +51,52 @@ object Multiverse {
*
* @author Jonas Bonér
*/
-object Transaction {
+object Transaction extends TransactionManagement {
val idFactory = new AtomicLong(-1L)
// -- Monad --------------------------
// FIXME implement Transaction::map/flatMap/filter/foreach
// -- atomic block --------------------------
- def atomic[T](body: => T) = new AtomicTemplate[T]() {
- def execute(t: MultiverseTransaction): T = body
+ def atomic[T](body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+ def execute(mtx: MultiverseTransaction): T = body
+ override def postStart(mtx: MultiverseTransaction) = {
+ println("------ SETTING TX")
+ val tx = new Transaction
+ tx.transaction = Some(mtx)
+ setTransaction(Some(tx))
+ }
+ override def postCommit = {
+ println("------ GETTING TX")
+ if (isTransactionInScope) {}///getTransactionInScope.commit
+ else throw new IllegalStateException("No transaction in scope")
+ }
}.execute()
+/*
+ def atomic[T](retryCount: Int)(body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, false, retryCount) {
+ def execute(mtx: MultiverseTransaction): T = body
+ override def postCommit =
+ if (isTransactionInScope) getTransactionInScope.commit
+ else throw new IllegalStateException("No transaction in scope")
+ }.execute
- // -- Do-OrElse --------------------------
- def do[A](orBody: => A) = elseBody(orBody)
+ def atomicReadOnly[T](retryCount: Int)(body: => T): T = new AtomicTemplate[T](Multiverse.STM, "akka", false, true, retryCount) {
+ def execute(mtx: MultiverseTransaction): T = body
+ override def postCommit =
+ if (isTransactionInScope) getTransactionInScope.commit
+ else throw new IllegalStateException("No transaction in scope")
+ }.execute
+
+ def atomicReadOnly[T](body: => T): T = new AtomicTemplate[T](true) {
+ def execute(mtx: MultiverseTransaction): T = body
+ override def postCommit =
+ if (isTransactionInScope) getTransactionInScope.commit
+ else throw new IllegalStateException("No transaction in scope")
+ }.execute
+*/
+ // -- Run-OrElse --------------------------
+ def run[A](orBody: => A) = elseBody(orBody)
def elseBody[A](orBody: => A) = new {
def orElse(elseBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = orBody
@@ -77,14 +109,11 @@ object Transaction {
* @author Jonas Bonér
*/
@serializable class Transaction extends Logging {
+ import Transaction._
+
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
- private[akka] var transaction: MultiverseTransaction = _
-
- private[this] var message: Option[MessageInvocation] = None
-
- private[this] var participants: List[String] = Nil
- private[this] var precommitted: List[String] = Nil
+ private[akka] var transaction: Option[MultiverseTransaction] = None
private[this] val persistentStateMap = new HashMap[String, Committable]
@@ -95,81 +124,13 @@ object Transaction {
def isTopLevel = depth.get == 0
def register(uuid: String, storage: Committable) = persistentStateMap.put(uuid, storage)
-
- def begin(participant: String, msg: MessageInvocation) = synchronized {
- ensureIsActiveOrNew
- message = Some(msg)
- transaction = Multiverse.STM.startUpdateTransaction("akka")
- log.debug("TX BEGIN - Creating a new transaction with id [%s]", id)
- if (status == TransactionStatus.New) log.debug("TX BEGIN - Actor with UUID [%s] is starting NEW transaction [%s]", participant, toString)
- else log.debug("Actor [%s] is participating in transaction", participant)
- participants ::= participant
- status = TransactionStatus.Active
- }
-
- def precommit(participant: String) = synchronized {
- if (status == TransactionStatus.Active) {
- log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server with UUID [%s]", toString, participant)
- precommitted ::= participant
+ def commit = synchronized {
+ atomic {
+ persistentStateMap.values.foreach(_.commit)
+ TransactionManagement.clearTransaction
}
- }
-
- def commit(participant: String): Boolean = synchronized {
- if (status == TransactionStatus.Active) {
- log.debug("TX COMMIT - Trying to commit transaction [%s] for server with UUID [%s]", toString, participant)
- val haveAllPreCommitted =
- if (participants.size == precommitted.size) {{
- for (part <- participants) yield {
- if (precommitted.exists(_ == part)) true
- else false
- }}.exists(_ == true)
- } else false
- if (haveAllPreCommitted && transaction != null) {
- setThreadLocalTransaction(transaction)
- log.debug("TX COMMIT - Committing transaction [%s] for server with UUID [%s]", toString, participant)
- transaction.commit
- reset
- status = TransactionStatus.Completed
- Transaction.atomic {
- persistentStateMap.values.foreach(_.commit)
- }
- setThreadLocalTransaction(null)
- true
- } else false
- } else {
- true
- }
- }
-
- def rollback(participant: String) = synchronized {
- ensureIsActiveOrAborted
- log.debug("TX ROLLBACK - Actor with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
- status = TransactionStatus.Aborted
- transaction.abort
- reset
- }
-
- def join(participant: String) = synchronized {
- ensureIsActive
- log.debug("TX JOIN - Actor with UUID [%s] is joining transaction [%s]" , participant, toString)
- participants ::= participant
- }
-
- def retry: Boolean = synchronized {
- println("----- 2 " + message.isDefined)
- println("----- 3 " + message.get.nrOfDeliveryAttempts)
- if (message.isDefined && message.get.nrOfDeliveryAttempts.get < TransactionManagement.MAX_NR_OF_RETRIES) {
- log.debug("TX RETRY - Restarting transaction [%s] resending message [%s]", transaction, message.get)
- message.get.send
- true
- } else false
- }
-
- private def reset = synchronized {
- transaction.reset
- participants = Nil
- precommitted = Nil
+ status = TransactionStatus.Completed
}
def status_? = status
diff --git a/akka-actors/src/main/scala/stm/TransactionManagement.scala b/akka-actors/src/main/scala/stm/TransactionManagement.scala
index c13c92a1c8..2d92b7fa56 100644
--- a/akka-actors/src/main/scala/stm/TransactionManagement.scala
+++ b/akka-actors/src/main/scala/stm/TransactionManagement.scala
@@ -6,31 +6,28 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicBoolean
-import se.scalablesolutions.akka.dispatch.MessageInvocation
import se.scalablesolutions.akka.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
import scala.collection.mutable.HashSet
-// FIXME is java.util.UUID better?
-
import org.multiverse.utils.TransactionThreadLocal._
-class TransactionRollbackException(msg: String) extends RuntimeException(msg)
+class StmException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
-object TransactionManagement {
+object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.Config._
- val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 1000)
+ // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
+ val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
+ val TIME_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-for-completion", 1000)
val NR_OF_TIMES_WAITING_FOR_COMPLETION = config.getInt("akka.stm.wait-nr-of-times", 3)
- val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 10)
- val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
- // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
- val RESTART_TRANSACTION_ON_COLLISION = false //akka.Kernel.config.getBool("akka.stm.restart-on-collision", true)
+ val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
+ val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
@@ -41,69 +38,40 @@ object TransactionManagement {
}
trait TransactionManagement extends Logging {
+ // FIXME is java.util.UUID better?
var uuid = Uuid.newUuid.toString
import TransactionManagement.currentTransaction
private[akka] val activeTransactions = new HashSet[Transaction]
- protected def startNewTransaction(message: MessageInvocation) = {
- val newTx = new Transaction
- newTx.begin(uuid, message)
- activeTransactions += newTx
- currentTransaction.set(Some(newTx))
- setThreadLocalTransaction(newTx.transaction)
- }
+ private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
- protected def joinExistingTransaction = {
- val cflowTx = currentTransaction.get
- if (activeTransactions.isEmpty && cflowTx.isDefined) {
- val currentTx = cflowTx.get
- currentTx.join(uuid)
- activeTransactions += currentTx
- }
- }
-
- protected def tryToPrecommitTransactions = activeTransactions.foreach(_.precommit(uuid))
-
- protected def tryToCommitTransactions = {
- for (tx <- activeTransactions) {
- if (tx.commit(uuid)) activeTransactions -= tx
- else if (tx.isTopLevel) {
- println("------------ COULD NOT COMMIT -- WAITING OR TIMEOUT? ---------")
- //tx.retry
- } else {
- println("---------- tryToCommitTransactions ")
- println("---------- tryToCommitTransactions tx.isTopLevel " + tx.isTopLevel)
- println("---------- tryToCommitTransactions tx.depth.get " + tx.depth.get)
- println("---------- tryToCommitTransactions tx.status_? " + tx.status_?)
- rollback(tx)
- // continue, try to commit on next received message
- // FIXME check if TX hase timed out => throw exception
- }
- }
- }
-
- protected def rollback(tx: Transaction) = {
- tx.rollback(uuid)
- activeTransactions -= tx
- }
-
- protected def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
+ private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
+ val tx = transaction.get
+ //log.debug("Setting transaction [%s]", transaction.get)
currentTransaction.set(transaction)
- setThreadLocalTransaction(transaction.get.transaction)
+ if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get)
+ else throw new IllegalStateException("No transaction defined")
}
- protected def clearTransaction = {
+ private[akka] def clearTransaction = {
+ //if (isTransactionInScope) log.debug("Clearing transaction [%s]", getTransactionInScope)
currentTransaction.set(None)
setThreadLocalTransaction(null)
}
- protected def isInExistingTransaction = currentTransaction.get.isDefined
+ private[akka] def getTransactionInScope = currentTransaction.get.get
+
+ private[akka] def isTransactionInScope = currentTransaction.get.isDefined
- protected def incrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.increment
+ private[akka] def incrementTransaction =
+ if (isTransactionInScope) getTransactionInScope.increment
+ //else throw new IllegalStateException("No transaction in scope")
- protected def decrementTransaction = if (currentTransaction.get.isDefined) currentTransaction.get.get.decrement
-
- protected def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
+ private[akka] def decrementTransaction =
+ if (isTransactionInScope) getTransactionInScope.decrement
+ //else throw new IllegalStateException("No transaction in scope")
+
+ private[akka] def removeTransactionIfTopLevel(tx: Transaction) = if (tx.isTopLevel) { activeTransactions -= tx }
}
diff --git a/akka-actors/src/main/scala/stm/TransactionalState.scala b/akka-actors/src/main/scala/stm/TransactionalState.scala
index c18a273209..459ecdafd5 100644
--- a/akka-actors/src/main/scala/stm/TransactionalState.scala
+++ b/akka-actors/src/main/scala/stm/TransactionalState.scala
@@ -4,10 +4,13 @@
package se.scalablesolutions.akka.state
-import stm.{TransactionManagement, Ref}
+import se.scalablesolutions.akka.stm.{TransactionManagement}
+import se.scalablesolutions.akka.collection._
+
import org.multiverse.templates.AtomicTemplate
-import org.multiverse.api.Transaction;
-import collection._
+import org.multiverse.api.Transaction
+import org.multiverse.datastructures.refs.manual.Ref;
+
import org.codehaus.aspectwerkz.proxy.Uuid
diff --git a/akka-actors/src/test/scala/InMemoryActorSpec.scala b/akka-actors/src/test/scala/InMemoryActorSpec.scala
index a879426f76..d6186043ac 100644
--- a/akka-actors/src/test/scala/InMemoryActorSpec.scala
+++ b/akka-actors/src/test/scala/InMemoryActorSpec.scala
@@ -88,6 +88,7 @@ class InMemFailerActor extends Actor {
class InMemoryActorSpec extends TestCase {
+ /*
@Test
def testOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -98,7 +99,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
-
+ */
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -107,7 +108,7 @@ class InMemoryActorSpec extends TestCase {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
-
+ /*
@Test
def testOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -120,7 +121,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
-
+ */
@Test
def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -134,7 +135,7 @@ class InMemoryActorSpec extends TestCase {
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
-
+ /*
@Test
def testOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -145,7 +146,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals(2, (stateful !! GetVectorSize).get)
}
-
+ */
@Test
def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -154,7 +155,7 @@ class InMemoryActorSpec extends TestCase {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals(2, (stateful !! GetVectorSize).get)
}
-
+ /*
@Test
def testOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -167,7 +168,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals(1, (stateful !! GetVectorSize).get)
}
-
+ */
@Test
def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -181,7 +182,7 @@ class InMemoryActorSpec extends TestCase {
} catch {case e: RuntimeException => {}}
assertEquals(1, (stateful !! GetVectorSize).get)
}
-
+ /*
@Test
def testOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -192,7 +193,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals("new state", (stateful !! GetRefState).get)
}
-
+ */
@Test
def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
@@ -201,7 +202,7 @@ class InMemoryActorSpec extends TestCase {
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
assertEquals("new state", (stateful !! GetRefState).get)
}
-
+ /*
@Test
def testOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -214,7 +215,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(1000)
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
-
+ */
@Test
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
@@ -226,7 +227,6 @@ class InMemoryActorSpec extends TestCase {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
- println("---------- BACK")
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
}
diff --git a/akka-actors/src/test/scala/RemoteActorSpec.scala b/akka-actors/src/test/scala/RemoteActorSpec.scala
index f492189fe2..ea60574cef 100644
--- a/akka-actors/src/test/scala/RemoteActorSpec.scala
+++ b/akka-actors/src/test/scala/RemoteActorSpec.scala
@@ -3,7 +3,7 @@ package se.scalablesolutions.akka.actor
import java.util.concurrent.TimeUnit
import junit.framework.TestCase
-import nio.{RemoteServer, RemoteClient}
+import se.scalablesolutions.akka.nio.{RemoteServer, RemoteClient}
import org.junit.{Test, Before}
import org.junit.Assert._
diff --git a/akka-actors/src/test/scala/TransactionClasherSpec.scala b/akka-actors/src/test/scala/TransactionClasherSpec.scala
index 5ef047198c..4679ff7872 100644
--- a/akka-actors/src/test/scala/TransactionClasherSpec.scala
+++ b/akka-actors/src/test/scala/TransactionClasherSpec.scala
@@ -2,7 +2,6 @@ package se.scalablesolutions.akka.actor
import junit.framework.TestCase
-import stm.TransactionRollbackException
import org.junit.{Test, Before}
import org.junit.Assert._
@@ -95,8 +94,8 @@ class TransactionClasherSpec extends TestCase {
Thread.sleep(1000)
try {
txActor2 !! "Second"
- fail("Expected TransactionRollbackException")
- } catch { case e: TransactionRollbackException => {} }
+ fail("Expected Exception")
+ } catch { case e: Exception => {} }
}
@Test
@@ -116,8 +115,8 @@ class TransactionClasherSpec extends TestCase {
Thread.sleep(1000)
try {
txActor2 ! "Second"
- fail("Expected TransactionRollbackException")
- } catch { case e: TransactionRollbackException => {} }
+ fail("Expected Exception")
+ } catch { case e: Exception => {} }
}
/*
diff --git a/akka-camel/src/main/scala/CamelConfigurator.scala b/akka-camel/src/main/scala/CamelConfigurator.scala
index 680680f188..76ddebeaf1 100644
--- a/akka-camel/src/main/scala/CamelConfigurator.scala
+++ b/akka-camel/src/main/scala/CamelConfigurator.scala
@@ -23,7 +23,7 @@ trait CamelConfigurator {
* }).inject().supervise();
*
*/
- def addRoutes(routes: Routes): ActiveObjectConfigurator
+ def addRoutes(routes: Routes): ActiveObjectConfiguratorBase
def getCamelContext: CamelContext
diff --git a/akka-fun-test-java/pom.xml b/akka-fun-test-java/pom.xml
index 7d3aac2219..7b5e829220 100644
--- a/akka-fun-test-java/pom.xml
+++ b/akka-fun-test-java/pom.xml
@@ -29,25 +29,25 @@
com.sun.jerseyjersey-server
- 1.1.2-ea
+ 1.1.3-eatestcom.sun.jerseyjersey-json
- 1.1.2-ea
+ 1.1.3-eatestcom.sun.jerseyjersey-client
- 1.1.2-ea
+ 1.1.3-eatestcom.sun.jerseyjersey-atom
- 1.1.2-ea
+ 1.1.3-eatest
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index 3f3f65bde3..738fd765ba 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -10,6 +10,7 @@ import com.google.inject.Scopes;
import junit.framework.TestCase;
import se.scalablesolutions.akka.Config;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import se.scalablesolutions.akka.dispatch.EventBasedThreadPoolDispatcher;
import static se.scalablesolutions.akka.config.JavaConfig.*;
@@ -18,7 +19,7 @@ import java.util.concurrent.ThreadPoolExecutor;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
static String messageLog = "";
- final private se.scalablesolutions.akka.config.ActiveObjectManager conf = new se.scalablesolutions.akka.config.ActiveObjectManager();
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
Config.config();
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
index 7c6b37a0a0..48fff17cce 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemNestedStateTest.java
@@ -6,6 +6,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.Config;
import se.scalablesolutions.akka.config.*;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
@@ -14,8 +15,7 @@ import junit.framework.TestCase;
public class InMemNestedStateTest extends TestCase {
static String messageLog = "";
- final private ActiveObjectManager conf = new ActiveObjectManager();
- final private ActiveObjectFactory factory = new ActiveObjectFactory();
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
conf.configure(
@@ -44,7 +44,6 @@ public class InMemNestedStateTest extends TestCase {
InMemStatefulNested nested = conf.getInstance(InMemStatefulNested.class);
nested.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state", nested); // transactionrequired
- System.out.println("-- BACK --");
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
assertEquals("new state", nested.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
index 8fa2ffcd81..84d962ba03 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java
@@ -57,7 +57,6 @@ public class InMemStateful {
vectorState.add(msg);
refState.swap(msg);
nested.success(key, msg);
- System.out.println("--- after success ");
}
public String failure(String key, String msg, InMemFailer failer) {
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
index 35879be8eb..430048908f 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java
@@ -8,6 +8,7 @@ import junit.framework.TestCase;
import se.scalablesolutions.akka.Config;
import se.scalablesolutions.akka.config.*;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
@@ -15,15 +16,13 @@ import se.scalablesolutions.akka.Kernel;
public class InMemoryStateTest extends TestCase {
static String messageLog = "";
- final private ActiveObjectManager conf = new ActiveObjectManager();
-
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
Config.config();
conf.configure(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
- // FIXME: remove string-name, add ctor to only accept target class
new Component(InMemStateful.class,
new LifeCycle(new Permanent(), 1000),
//new RestartCallbacks("preRestart", "postRestart")),
@@ -63,7 +62,6 @@ public class InMemoryStateTest extends TestCase {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState());
}
@@ -83,7 +81,6 @@ public class InMemoryStateTest extends TestCase {
InMemStateful stateful = conf.getInstance(InMemStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
- stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState());
}
@@ -131,7 +128,6 @@ public class InMemoryStateTest extends TestCase {
// stateful.clashNotOk("stateful", "new state", clasher);
// fail("should have thrown an exception");
// } catch (RuntimeException e) {
- // System.out.println(e);
// } // expected
// assertEquals("init", stateful.getState("stateful")); // check that state is
// // == init state
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
index 03e06fb7b0..89b6ceab06 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentNestedStateTest.java
@@ -5,6 +5,7 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
@@ -14,8 +15,7 @@ import junit.framework.TestCase;
public class PersistentNestedStateTest extends TestCase {
static String messageLog = "";
- final private ActiveObjectManager conf = new ActiveObjectManager();
- final private ActiveObjectFactory factory = new ActiveObjectFactory();
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
index 98b6b0d892..b7641868d0 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java
@@ -6,15 +6,13 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
-import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
public class PersistentStateTest extends TestCase {
static String messageLog = "";
- final private ActiveObjectManager conf = new ActiveObjectManager();
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
@@ -23,7 +21,7 @@ public class PersistentStateTest extends TestCase {
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
- //new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000)
+ //new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000)
}).supervise();
}
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
index b7693ecd07..32c9fe167c 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java
@@ -5,8 +5,8 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.Config;
-import se.scalablesolutions.akka.actor.ActiveObjectFactory;
-import se.scalablesolutions.akka.config.ActiveObjectManager;
+import se.scalablesolutions.akka.actor.ActiveObject;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import se.scalablesolutions.akka.nio.RemoteServer;
import junit.framework.TestCase;
@@ -23,15 +23,14 @@ public class RemoteInMemoryStateTest extends TestCase {
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
Config.config();
}
- final ActiveObjectManager conf = new ActiveObjectManager();
- final private ActiveObjectFactory factory = new ActiveObjectFactory();
+ final ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@@ -39,10 +38,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
- InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
+ InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
@@ -52,7 +51,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@@ -60,10 +59,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setVectorState("init"); // set init state
- InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
+ InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
@@ -73,7 +72,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
@@ -81,10 +80,10 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
- InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
+ InMemStateful stateful = ActiveObject.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.init();
stateful.setRefState("init"); // set init state
- InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
+ InMemFailer failer = ActiveObject.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getInstance(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactionrequired method
fail("should have thrown an exception");
@@ -125,7 +124,6 @@ public class RemoteInMemoryStateTest extends TestCase {
// stateful.clashNotOk("stateful", "new state", clasher);
// fail("should have thrown an exception");
// } catch (RuntimeException e) {
- // System.out.println(e);
// } // expected
// assertEquals("init", stateful.getState("stateful")); // check that state is
// // == init state
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
index 3c4c15dc30..e81f28f562 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java
@@ -6,15 +6,13 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
-import se.scalablesolutions.akka.actor.*;
-import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
- final private ActiveObjectManager conf = new ActiveObjectManager();
+ final private ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
protected void setUp() {
PersistenceManager.init();
diff --git a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
index 5dbe454b0b..0ba51ab239 100644
--- a/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
+++ b/akka-fun-test-java/src/test/java/se/scalablesolutions/akka/api/RestTest.java
@@ -4,9 +4,6 @@
package se.scalablesolutions.akka.api;
-import com.sun.jersey.api.client.Client;
-import com.sun.jersey.api.client.WebResource;
-import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory;
import com.sun.grizzly.http.SelectorThread;
import com.sun.grizzly.http.servlet.ServletAdapter;
import com.sun.grizzly.tcp.Adapter;
@@ -15,15 +12,11 @@ import com.sun.grizzly.standalone.StaticStreamAlgorithm;
import javax.ws.rs.core.UriBuilder;
import javax.servlet.Servlet;
-import junit.framework.TestSuite;
import junit.framework.TestCase;
import org.junit.*;
-import static org.junit.Assert.*;
import java.io.IOException;
import java.net.URI;
-import java.util.Map;
-import java.util.HashMap;
import se.scalablesolutions.akka.config.*;
import static se.scalablesolutions.akka.config.JavaConfig.*;
@@ -34,7 +27,7 @@ public class RestTest extends TestCase {
private static int PORT = 9998;
private static URI URI = UriBuilder.fromUri("http://localhost/").port(PORT).build();
private static SelectorThread selector = null;
- private static ActiveObjectManager conf = new ActiveObjectManager();
+ private static ActiveObjectConfigurator conf = new ActiveObjectConfigurator();
@BeforeClass
protected void setUp() {
diff --git a/akka-kernel/pom.xml b/akka-kernel/pom.xml
index d14b0d2c86..17d2e314b4 100755
--- a/akka-kernel/pom.xml
+++ b/akka-kernel/pom.xml
@@ -144,7 +144,7 @@
sjson.jsonsjson
- 0.1
+ 0.2
@@ -207,17 +207,17 @@
com.sun.jerseyjersey-core
- 1.1.2-ea
+ 1.1.3-eacom.sun.jerseyjersey-server
- 1.1.2-ea
+ 1.1.3-eacom.sun.jerseyjersey-json
- 1.1.2-ea
+ 1.1.3-eajavax.ws.rs
@@ -227,7 +227,7 @@
com.sun.jersey.contribsjersey-scala
- 1.1.2-ea-SNAPSHOT
+ 1.1.3-eaorg.atmosphere
diff --git a/akka-kernel/src/main/scala/AkkaServlet.scala b/akka-kernel/src/main/scala/AkkaServlet.scala
index 7831fd2d01..966403ea3b 100755
--- a/akka-kernel/src/main/scala/AkkaServlet.scala
+++ b/akka-kernel/src/main/scala/AkkaServlet.scala
@@ -14,10 +14,10 @@ import com.sun.jersey.spi.container.WebApplication
import javax.servlet.{ServletConfig}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
-import org.atmosphere.cpr.{AtmosphereServletProcessor, AtmosphereEvent}
+import org.atmosphere.cpr.{AtmosphereServletProcessor, AtmosphereResource, AtmosphereResourceEvent}
import org.atmosphere.cpr.AtmosphereServlet.AtmosphereHandlerWrapper
import org.atmosphere.container.GrizzlyCometSupport
-import org.atmosphere.handler.ReflectorServletProcessor
+import org.atmosphere.handler.{ReflectorServletProcessor, AbstractReflectorAtmosphereHandler}
import org.atmosphere.core.{JerseyBroadcaster}
import java.net.URLClassLoader
@@ -28,8 +28,7 @@ import scala.collection.jcl.Conversions._
/**
* @author Jonas Bonér
*/
-class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with Logging {
-
+class AkkaServlet extends ServletContainer with Logging {
override def initiate(rc: ResourceConfig, wa: WebApplication) = {
akka.Kernel.boot // will boot if not already booted by 'main'
val configurators = ConfiguratorRepository.getConfigurators
@@ -39,39 +38,30 @@ class AkkaServlet extends ServletContainer with AtmosphereServletProcessor with
wa.initiate(rc, new ActorComponentProviderFactory(configurators))
}
-
- // Borrowed from AbstractReflectorAtmosphereHandler
- override def onMessage(event: AtmosphereEvent[HttpServletRequest, HttpServletResponse]): AtmosphereEvent[_, _] = {
-
- val response = event.getResponse
- val data = if(event.getMessage ne null) event.getMessage.toString else null
- val isUsingStream = try {
- response.getWriter
- false
- } catch { case e: IllegalStateException => true }
-
- if (isUsingStream) {
- if (data != null) response.getOutputStream.write(data.getBytes)
- response.getOutputStream.flush
- } else {
- if (data != null) response.getWriter.write(data)
- response.getWriter.flush
- }
- event
- }
-
- override def onEvent(event: AtmosphereEvent[HttpServletRequest, HttpServletResponse]): AtmosphereEvent[_, _] = {
- event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_EVENT, event)
- event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_HANDLER, this)
- service(event.getRequest, event.getResponse)
- event
- }
}
class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet {
override def init(sconf: ServletConfig) = {
- val servlet = new AkkaServlet
- config = new AtmosphereConfig { ah = servlet }
+ val servlet = new AkkaServlet with AtmosphereServletProcessor {
+
+ //Delegate to implement the behavior for AtmosphereHandler
+ private val handler = new AbstractReflectorAtmosphereHandler {
+ override def onRequest(event: AtmosphereResource[HttpServletRequest, HttpServletResponse]): Unit = {
+ event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_RESOURCE, event)
+ event.getRequest.setAttribute(ReflectorServletProcessor.ATMOSPHERE_HANDLER, this)
+ service(event.getRequest, event.getResponse)
+ }
+ }
+
+ override def onStateChange(event: AtmosphereResourceEvent[HttpServletRequest, HttpServletResponse]) {
+ handler onStateChange event
+ }
+
+ override def onRequest(resource: AtmosphereResource[HttpServletRequest, HttpServletResponse]) {
+ handler onRequest resource
+ }
+ }
+ config = new AtmosphereConfig {ah = servlet}
atmosphereHandlers.put("/*", new AtmosphereHandlerWrapper(servlet, new JerseyBroadcaster))
setCometSupport(new GrizzlyCometSupport(config))
getCometSupport.init(sconf)
@@ -79,4 +69,4 @@ class AkkaCometServlet extends org.atmosphere.cpr.AtmosphereServlet {
}
override def loadAtmosphereDotXml(is: InputStream, urlc: URLClassLoader) = () //Hide it
-}
+}
\ No newline at end of file
diff --git a/akka-persistence/src/main/scala/PersistentState.scala b/akka-persistence/src/main/scala/PersistentState.scala
index e32acff6c9..769d064a0e 100644
--- a/akka-persistence/src/main/scala/PersistentState.scala
+++ b/akka-persistence/src/main/scala/PersistentState.scala
@@ -4,14 +4,11 @@
package se.scalablesolutions.akka.state
-import stm.TransactionManagement
-import stm.TransactionManagement.currentTransaction
-import akka.collection._
+import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
+import se.scalablesolutions.akka.collection._
import org.codehaus.aspectwerkz.proxy.Uuid
-import scala.collection.mutable.{ArrayBuffer, HashMap}
-
class NoTransactionInScopeException extends RuntimeException
sealed abstract class PersistentStateConfig
diff --git a/akka-rest/pom.xml b/akka-rest/pom.xml
index c3c3c5a04c..d72a32ef33 100644
--- a/akka-rest/pom.xml
+++ b/akka-rest/pom.xml
@@ -37,17 +37,17 @@
com.sun.jerseyjersey-core
- 1.1.2-ea
+ 1.1.3-eacom.sun.jerseyjersey-server
- 1.1.2-ea
+ 1.1.3-eacom.sun.jerseyjersey-json
- 1.1.2-ea
+ 1.1.3-eajavax.ws.rs
@@ -57,7 +57,7 @@
com.sun.jersey.contribsjersey-scala
- 1.1.2-ea-SNAPSHOT
+ 1.1.3-eaorg.atmosphere
diff --git a/akka-samples-java/src/main/java/sample/java/Boot.java b/akka-samples-java/src/main/java/sample/java/Boot.java
index cd94aea2d2..f610316d16 100644
--- a/akka-samples-java/src/main/java/sample/java/Boot.java
+++ b/akka-samples-java/src/main/java/sample/java/Boot.java
@@ -1,10 +1,10 @@
package sample.java;
-import se.scalablesolutions.akka.config.ActiveObjectManager;
+import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
public class Boot {
- final private ActiveObjectManager manager = new ActiveObjectManager();
+ final private ActiveObjectConfigurator manager = new ActiveObjectConfigurator();
public Boot() throws Exception {
manager.configure(
diff --git a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
index d474da1ca1..23348604bb 100755
--- a/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
+++ b/akka-samples-lift/src/main/webapp/WEB-INF/web.xml
@@ -13,7 +13,7 @@
AkkaServlet
- se.scalablesolutions.akka.kernel.rest.AkkaServlet
+ se.scalablesolutions.akka.rest.AkkaServletAkkaServlet
diff --git a/akka-samples-scala/src/main/scala/SimpleService.scala b/akka-samples-scala/src/main/scala/SimpleService.scala
index 0f595d2cf5..9b866bf4dc 100644
--- a/akka-samples-scala/src/main/scala/SimpleService.scala
+++ b/akka-samples-scala/src/main/scala/SimpleService.scala
@@ -10,7 +10,7 @@ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import javax.ws.rs.core.MultivaluedMap
-import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
+import javax.ws.rs.{GET, POST, Path, Produces, WebApplicationException, Consumes}
import org.atmosphere.core.annotation.{Broadcast, Suspend}
import org.atmosphere.util.XSSHtmlFilter
diff --git a/akka-samples-security/src/main/scala/SimpleService.scala b/akka-samples-security/src/main/scala/SimpleService.scala
index a3627586e6..3afa587d04 100644
--- a/akka-samples-security/src/main/scala/SimpleService.scala
+++ b/akka-samples-security/src/main/scala/SimpleService.scala
@@ -4,13 +4,14 @@
package sample.secure
-import _root_.se.scalablesolutions.akka.state.{TransactionalState,PersistentState, CassandraStorageConfig}
-import _root_.se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
-import _root_.se.scalablesolutions.akka.config.ScalaConfig._
-import _root_.se.scalablesolutions.akka.util.Logging
-import _root_.se.scalablesolutions.akka.security.{DigestAuthenticationActor, UserInfo}
-import _root_.javax.annotation.security.{DenyAll,PermitAll,RolesAllowed}
-import javax.ws.rs.{GET, POST, Path, Produces, Consumes}
+import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
+import se.scalablesolutions.akka.config.ScalaConfig._
+import se.scalablesolutions.akka.util.Logging
+import se.scalablesolutions.akka.security.{DigestAuthenticationActor, UserInfo}
+import se.scalablesolutions.akka.state.TransactionalState
+
+import javax.annotation.security.RolesAllowed
+import javax.ws.rs.{GET, Path, Produces}
class Boot {
object factory extends SupervisorFactory {
@@ -66,7 +67,7 @@ class SecureService extends Actor with Logging {
case object Tick
private val KEY = "COUNTER";
private var hasStartedTicking = false;
- private val storage = PersistentState.newMap(CassandraStorageConfig())
+ private val storage = TransactionalState.newMap
@GET
@Produces(Array("text/html"))
diff --git a/akka-security/pom.xml b/akka-security/pom.xml
index 088d6fa85b..1a3e831dc9 100644
--- a/akka-security/pom.xml
+++ b/akka-security/pom.xml
@@ -48,7 +48,7 @@
com.sun.jerseyjersey-server
- 1.1.2-ea
+ 1.1.3-eajavax.ws.rs
diff --git a/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java
new file mode 100644
index 0000000000..93b8c6f3a6
--- /dev/null
+++ b/akka-util-java/src/main/java/se/scalablesolutions/akka/stm/AtomicTemplate.java
@@ -0,0 +1,304 @@
+package se.scalablesolutions.akka.stm;
+
+import org.multiverse.api.Stm;
+import org.multiverse.api.Transaction;
+import org.multiverse.api.TransactionStatus;
+import org.multiverse.api.exceptions.CommitFailureException;
+import org.multiverse.api.exceptions.LoadException;
+import org.multiverse.api.exceptions.RetryError;
+import org.multiverse.api.exceptions.TooManyRetriesException;
+import org.multiverse.utils.GlobalStmInstance;
+import static org.multiverse.utils.TransactionThreadLocal.getThreadLocalTransaction;
+import static org.multiverse.utils.TransactionThreadLocal.setThreadLocalTransaction;
+
+/**
+ * A Template that handles the boilerplate code for transactions. A transaction will be placed if
+ * none is available around a section and if all goes right, commits at the end.
+ *
+ * example:
+ *
diff --git a/changes.xml b/changes.xml
index 63ecce6eed..e4e06ab32a 100644
--- a/changes.xml
+++ b/changes.xml
@@ -26,13 +26,19 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui
Support for Comet Actors using AtmosphereAMQP integration; abstracted as actors in a supervisor hierarchy. Impl AMQP 0.9.1Rewritten STM, now integrated with Multiverse STM
- Added STM API for atomic {..} and or {..} else {..}
+ Added STM API for atomic {..} and run {..} orElse {..}
+ Added STM retry
+ Complete rewrite of the persistence transaction management, now based on Unit of Work and Multiverse STM
+ Monadic API to TransactionalRef (use it in for-comprehension)
+ New Scala JSON parser based on sjson
+ Smoother web app integration; just add akka.conf to WEB-INF/classes, no need for AKKA_HOMEModularization of distribution into a thin core (actors, remoting and STM) and the rest in submodulesJSON serialization for Java objects (using Jackson)JSON serialization for Scala objects (using SJSON)Protobuf serialization for Java and Scala objectsSBinary serialization for Scala objectsProtobuf as remote protocol
+ Updated Cassandra integration and CassandraSession API to v0.4Added CassandraSession API (with socket pooling) wrapping Cassandra's Thrift API in Scala and Java APIsCassandraStorage is now works with external Cassandra clusterRemoved embedded Cassandra mode
@@ -41,12 +47,12 @@ see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full gui
SchedulerActor for scheduling periodic tasksNow start up kernel with 'java -jar dist/akka-0.6.jar'Concurrent mode is now per actor basis
- Made Akka Web App aware, does not require AKKA_HOME when using it as a libraryFixed dispatcher bugCleaned up Maven scripts and distribution in generalAdded mailing list: akka-user@googlegroups.comImproved and restructured documentationNew URL: http://akkasource.org
+ Fixed many many bugs