added implicit transaction family name for the atomic { .. } blocks + changed implicit sender argument to Option[Actor] (transparent change)

This commit is contained in:
Jonas Bonér 2009-12-26 22:14:06 +01:00
parent ed233e4318
commit e3ceab036c
11 changed files with 131 additions and 122 deletions

View file

@ -75,7 +75,7 @@ object Actor extends Logging {
val PORT = config.getInt("akka.remote.server.port", 9999) val PORT = config.getInt("akka.remote.server.port", 9999)
object Sender extends Actor { object Sender extends Actor {
implicit val Self: AnyRef = this implicit val Self: Option[Actor] = None
def receive = { def receive = {
case unknown => case unknown =>
@ -215,7 +215,8 @@ object Actor extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait Actor extends TransactionManagement { trait Actor extends TransactionManagement {
implicit protected val self: Actor = this implicit protected val self: Option[Actor] = Some(this)
implicit protected val transactionFamily: String = this.getClass.getName
// Only mutable for RemoteServer in order to maintain identity across nodes // Only mutable for RemoteServer in order to maintain identity across nodes
private[akka] var _uuid = Uuid.newUuid.toString private[akka] var _uuid = Uuid.newUuid.toString
@ -472,13 +473,13 @@ trait Actor extends TransactionManagement {
* *
* If invoked from within a *non* Actor instance then either add this import to resolve the implicit argument: * If invoked from within a *non* Actor instance then either add this import to resolve the implicit argument:
* <pre> * <pre>
* import Actor.Sender._ * import Actor.Sender.Self
* actor ! message * actor ! message
* </pre> * </pre>
* *
* Or pass in the implicit argument explicitly: * Or pass in the implicit argument explicitly:
* <pre> * <pre>
* actor.!(message)(this) * actor.!(message)(Some(this))
* </pre> * </pre>
* *
* Or use the 'send(..)' method; * Or use the 'send(..)' method;
@ -486,14 +487,10 @@ trait Actor extends TransactionManagement {
* actor.send(message) * actor.send(message)
* </pre> * </pre>
*/ */
def !(message: Any)(implicit sender: AnyRef) = { def !(message: Any)(implicit sender: Option[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) { if (_isRunning) postMessageToMailbox(message, sender)
val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
else None
postMessageToMailbox(message, from)
} else throw new IllegalStateException(
"Actor has not been started, you need to invoke 'actor.start' before using it")
} }
/** /**
@ -574,11 +571,10 @@ trait Actor extends TransactionManagement {
/** /**
* Forwards the message and passes the original sender actor as the sender. * Forwards the message and passes the original sender actor as the sender.
*/ */
def forward(message: Any)(implicit sender: AnyRef) = { def forward(message: Any)(implicit sender: Option[Actor]) = {
if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages")
if (_isRunning) { if (_isRunning) {
val forwarder = if (sender != null && sender.isInstanceOf[Actor]) sender.asInstanceOf[Actor] val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor"))
else throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")
if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor") if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor")
postMessageToMailbox(message, forwarder.getSender) postMessageToMailbox(message, forwarder.getSender)
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
@ -838,9 +834,7 @@ trait Actor extends TransactionManagement {
val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get) val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
if (_isEventBased) { if (_isEventBased) {
_mailbox.add(invocation) _mailbox.add(invocation)
if (_isSuspended) { if (_isSuspended) invocation.send
invocation.send
}
} else invocation.send } else invocation.send
} }
} }
@ -862,8 +856,7 @@ trait Actor extends TransactionManagement {
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build)
if (future.isDefined) future.get if (future.isDefined) future.get
else throw new IllegalStateException( else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
"Expected a future from remote call to actor " + toString)
} else { } else {
val future = new DefaultCompletableFutureResult(timeout) val future = new DefaultCompletableFutureResult(timeout)
val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)

View file

@ -46,7 +46,7 @@ object ScalaConfig {
case object Permanent extends Scope case object Permanent extends Scope
case object Temporary extends Scope case object Temporary extends Scope
case class RemoteAddress(hostname: String, port: Int) case class RemoteAddress(hostname: String, port: Int) extends ConfigElement
class Component(_intf: Class[_], class Component(_intf: Class[_],
val target: Class[_], val target: Class[_],

View file

@ -4,25 +4,22 @@
package se.scalablesolutions.akka.remote package se.scalablesolutions.akka.remote
import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver, SetStateEvent} import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver}
import org.jgroups.util.Util
import se.scalablesolutions.akka.Config.config import se.scalablesolutions.akka.Config.config
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.serialization.Serializer
import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry} import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry}
import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage} import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage}
import scala.collection.immutable.{Map, HashMap, HashSet} import scala.collection.immutable.{Map, HashMap}
import se.scalablesolutions.akka.serialization.Serializer
/** /**
* Interface for interacting with the cluster. * Interface for interacting with the Cluster Membership API.
* *
* @author Viktor Klang * @author Viktor Klang
*/ */
trait Cluster { trait Cluster {
def name: String def name: String
def registerLocalNode(hostname: String, port: Int): Unit def registerLocalNode(hostname: String, port: Int): Unit
@ -35,65 +32,68 @@ trait Cluster {
} }
/** /**
* Baseclass for cluster implementations * Base class for cluster actor implementations.
*/ */
abstract class ClusterActor extends Actor with Cluster { abstract class ClusterActor extends Actor with Cluster {
val name = config.getString("akka.remote.cluster.name") getOrElse "default" val name = config.getString("akka.remote.cluster.name") getOrElse "default"
} }
/** /**
* A singleton representing the Cluster * A singleton representing the Cluster.
* Loads a specified ClusterActor and delegates to that instance * <p/>
* Loads a specified ClusterActor and delegates to that instance.
*/ */
object Cluster extends Cluster { object Cluster extends Cluster {
case class Node(endpoints: List[RemoteAddress]) private[remote] sealed trait ClusterMessage
case class RelayedMessage(actorClassFQN:String, msg: AnyRef) private[remote] case class Node(endpoints: List[RemoteAddress]) extends ClusterMessage
private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
lazy val impl: Option[ClusterActor] = { private[remote] lazy val clusterActor: Option[ClusterActor] = {
config.getString("akka.remote.cluster.actor") map (name => { config.getString("akka.remote.cluster.actor") map (name => {
val actor = Class.forName(name) val actor = Class.forName(name)
.newInstance .newInstance
.asInstanceOf[ClusterActor] .asInstanceOf[ClusterActor]
SupervisorFactory( SupervisorFactory(
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
Supervise(actor, LifeCycle(Permanent)) :: Nil Supervise(actor, LifeCycle(Permanent)) :: Nil)
)
).newInstance.start ).newInstance.start
actor actor
}) })
} }
def name = impl.map(_.name).getOrElse("No cluster") private[remote] lazy val serializer: Serializer = {
val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
Class.forName(className).newInstance.asInstanceOf[Serializer]
}
def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = impl.flatMap(_.lookup(pf)) def name = clusterActor.map(_.name).getOrElse("No cluster")
def registerLocalNode(hostname: String, port: Int): Unit = impl.map(_.registerLocalNode(hostname, port)) def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf))
def deregisterLocalNode(hostname: String, port: Int): Unit = impl.map(_.deregisterLocalNode(hostname, port)) def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.registerLocalNode(hostname, port))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = impl.map(_.relayMessage(to, msg)) def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.deregisterLocalNode(hostname, port))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.map(_.relayMessage(to, msg))
} }
/** /**
* Just a placeholder for the JGroupsClusterActor message types * JGroups Internal Cluster messages.
*/ */
object JGroupsClusterActor { private[remote] object JGroupsClusterActor {
//Message types sealed trait JGroupsClusterMessage
sealed trait ClusterMessage case object PapersPlease extends JGroupsClusterMessage
case object PapersPlease extends ClusterMessage case class Papers(addresses: List[RemoteAddress]) extends JGroupsClusterMessage
case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage case object Block extends JGroupsClusterMessage
case object Block extends ClusterMessage case object Unblock extends JGroupsClusterMessage
case object Unblock extends ClusterMessage case class Zombie(address: Address) extends JGroupsClusterMessage
case class Zombie(address: Address) extends ClusterMessage case class RegisterLocalNode(server: RemoteAddress) extends JGroupsClusterMessage
case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage case class DeregisterLocalNode(server: RemoteAddress) extends JGroupsClusterMessage
case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
} }
/** /**
* Clustering support via JGroups * Clustering support via JGroups.
*/ */
class JGroupsClusterActor extends ClusterActor { class JGroupsClusterActor extends ClusterActor {
import JGroupsClusterActor._ import JGroupsClusterActor._
@ -104,9 +104,10 @@ class JGroupsClusterActor extends ClusterActor {
@volatile private var remotes: Map[Address, Node] = Map() @volatile private var remotes: Map[Address, Node] = Map()
override def init = { override def init = {
log debug "Initiating cluster actor" log debug "Initiating JGroups-based cluster actor"
remotes = new HashMap[Address, Node] remotes = new HashMap[Address, Node]
val me = this val me = this
// Set up the JGroups local endpoint // Set up the JGroups local endpoint
channel = Some(new JChannel { channel = Some(new JChannel {
setReceiver(new Receiver with ExtendedMembershipListener { setReceiver(new Receiver with ExtendedMembershipListener {
@ -128,26 +129,26 @@ class JGroupsClusterActor extends ClusterActor {
channel.map(_.connect(name)) channel.map(_.connect(name))
} }
protected def serializer = Serializer.Java //FIXME make this configurable def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] =
remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress)
def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = remotes.values.toList.flatMap(_.endpoints).find(pf isDefinedAt _).map(pf) def registerLocalNode(hostname: String, port: Int): Unit =
send(RegisterLocalNode(RemoteAddress(hostname, port)))
def registerLocalNode(hostname: String, port: Int): Unit = this send RegisterLocalNode(RemoteAddress(hostname, port)) def deregisterLocalNode(hostname: String, port: Int): Unit =
send(DeregisterLocalNode(RemoteAddress(hostname, port)))
def deregisterLocalNode(hostname: String, port: Int): Unit = this send DeregisterLocalNode(RemoteAddress(hostname, port)) def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit =
send(RelayedMessage(to.getName, msg))
def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = this send RelayedMessage(to.getName, msg)
private def broadcast[T <: AnyRef](recipients: Iterable[Address], msg: T): Unit = { private def broadcast[T <: AnyRef](recipients: Iterable[Address], msg: T): Unit = {
lazy val m = serializer out msg lazy val m = Cluster.serializer out msg
for (c <- channel; r <- recipients) c.send(new Message(r, null, m)) for (c <- channel; r <- recipients) c.send(new Message(r, null, m))
} }
private def broadcast[T <: AnyRef](msg: T): Unit = { private def broadcast[T <: AnyRef](msg: T): Unit =
if(!remotes.isEmpty) //Don't broadcast if we are not connected anywhere... //Don't broadcast if we are not connected anywhere...
channel.map(_.send(new Message(null, null, serializer out msg))) if (!remotes.isEmpty) channel.map(_.send(new Message(null, null, Cluster.serializer out msg)))
}
def receive = { def receive = {
case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
@ -171,8 +172,8 @@ class JGroupsClusterActor extends ClusterActor {
} }
case m: Message => { case m: Message => {
if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) // Handle non-own messages only, and only if we're connected
(serializer in (m.getRawBuffer, None)) match { (Cluster.serializer in (m.getRawBuffer, None)) match {
case PapersPlease => { case PapersPlease => {
log debug ("Asked for papers by %s", m.getSrc) log debug ("Asked for papers by %s", m.getSrc)
@ -207,8 +208,8 @@ class JGroupsClusterActor extends ClusterActor {
broadcast(Papers(local.endpoints)) broadcast(Papers(local.endpoints))
} }
case Block => log debug "UNSUPPORTED: block" //TODO HotSwap to a buffering body case Block => log debug "UNSUPPORTED: JGroupsClusterActor::block" //TODO HotSwap to a buffering body
case Unblock => log debug "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer case Unblock => log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer
} }
override def shutdown = { override def shutdown = {

View file

@ -221,7 +221,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl
remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort) remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort)
remoteActor.start remoteActor.start
} }
actor.!(message)(remoteActor) actor.!(message)(Some(remoteActor))
} else { } else {
// couldn't find a way to reply, send the message without a source/sender // couldn't find a way to reply, send the message without a source/sender
actor.send(message) actor.send(message)

View file

@ -38,7 +38,8 @@ object Serializer {
val EMPTY_CLASS_ARRAY = Array[Class[_]]() val EMPTY_CLASS_ARRAY = Array[Class[_]]()
val EMPTY_ANY_REF_ARRAY = Array[AnyRef]() val EMPTY_ANY_REF_ARRAY = Array[AnyRef]()
object NOOP extends Serializer { object NOOP extends NOOP
class NOOP extends Serializer {
def deepClone(obj: AnyRef): AnyRef = obj def deepClone(obj: AnyRef): AnyRef = obj
def out(obj: AnyRef): Array[Byte] = obj.asInstanceOf[Array[Byte]] def out(obj: AnyRef): Array[Byte] = obj.asInstanceOf[Array[Byte]]
def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes
@ -47,7 +48,8 @@ object Serializer {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Java extends Serializer { object Java extends Java
class Java extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
@ -69,7 +71,8 @@ object Serializer {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object Protobuf extends Serializer { object Protobuf extends Protobuf
class Protobuf extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
def out(obj: AnyRef): Array[Byte] = { def out(obj: AnyRef): Array[Byte] = {
@ -93,7 +96,8 @@ object Serializer {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object JavaJSON extends Serializer { object JavaJSON extends JavaJSON
class JavaJSON extends Serializer {
private val mapper = new ObjectMapper private val mapper = new ObjectMapper
def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass))
@ -123,7 +127,8 @@ object Serializer {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object ScalaJSON extends Serializer { object ScalaJSON extends ScalaJSON
class ScalaJSON extends Serializer {
def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def deepClone(obj: AnyRef): AnyRef = in(out(obj), None)
def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj)
@ -136,7 +141,8 @@ object Serializer {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
object SBinary { object SBinary extends SBinary
class SBinary {
import sbinary.DefaultProtocol._ import sbinary.DefaultProtocol._
def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None)

View file

@ -21,7 +21,13 @@ class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message) class TransactionRetryException(message: String) extends RuntimeException(message)
/** /**
* Example of atomic transaction management using the atomic block: * Example of atomic transaction management using the atomic block.
* These blocks takes an implicit argument String defining the transaction family name.
* If these blocks are used from within an Actor then the name is automatically resolved, if not either:
* 1. define an implicit String with the name in the same scope
* 2. pass in the name explicitly
*
* Here are some examples (assuming implicit transaction family name in scope):
* <pre> * <pre>
* import se.scalablesolutions.akka.stm.Transaction._ * import se.scalablesolutions.akka.stm.Transaction._
* *
@ -95,17 +101,17 @@ object Transaction extends TransactionManagement {
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def map[T](f: Transaction => T): T = atomic { f(getTransactionInScope) } def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def flatMap[T](f: Transaction => T): T = atomic { f(getTransactionInScope) } def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def foreach(f: Transaction => Unit): Unit = atomic { f(getTransactionInScope) } def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
/** /**
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks * Creates a "pure" STM atomic transaction and by-passes all transactions hooks
@ -113,15 +119,15 @@ object Transaction extends TransactionManagement {
* Only for internal usage. * Only for internal usage.
*/ */
private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T]( private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body
}.execute() }.execute()
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def atomic[T](body: => T): T = new AtomicTemplate[T]( def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) { getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body
override def postStart(mtx: MultiverseTransaction) = { override def postStart(mtx: MultiverseTransaction) = {
val tx = new Transaction val tx = new Transaction
@ -137,8 +143,8 @@ object Transaction extends TransactionManagement {
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def atomic[T](retryCount: Int)(body: => T): T = { def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
new AtomicTemplate[T](getGlobalStmInstance, "akka", false, false, retryCount) { new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body
override def postStart(mtx: MultiverseTransaction) = { override def postStart(mtx: MultiverseTransaction) = {
val tx = new Transaction val tx = new Transaction
@ -155,8 +161,8 @@ object Transaction extends TransactionManagement {
/** /**
* See ScalaDoc on class. * See ScalaDoc on class.
*/ */
def atomicReadOnly[T](retryCount: Int)(body: => T): T = { def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
new AtomicTemplate[T](getGlobalStmInstance, "akka", false, true, retryCount) { new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
def execute(mtx: MultiverseTransaction): T = body def execute(mtx: MultiverseTransaction): T = body
override def postStart(mtx: MultiverseTransaction) = { override def postStart(mtx: MultiverseTransaction) = {
val tx = new Transaction val tx = new Transaction

View file

@ -73,6 +73,7 @@ object TransactionalRef {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class TransactionalRef[T] extends Transactional { class TransactionalRef[T] extends Transactional {
implicit val txInitName = "TransactionalRef:Init"
import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.api.ThreadLocalTransaction._
val uuid = Uuid.newUuid.toString val uuid = Uuid.newUuid.toString

View file

@ -94,6 +94,7 @@
<artifactId>maven-surefire-plugin</artifactId> <artifactId>maven-surefire-plugin</artifactId>
<configuration> <configuration>
<excludes> <excludes>
<exclude>**/InMemNestedStateTest*</exclude>
<exclude>**/*Persistent*</exclude> <exclude>**/*Persistent*</exclude>
</excludes> </excludes>
</configuration> </configuration>

View file

@ -23,7 +23,7 @@
<dependency> <dependency>
<groupId>net.lag</groupId> <groupId>net.lag</groupId>
<artifactId>configgy</artifactId> <artifactId>configgy</artifactId>
<version>1.4</version> <version>1.4.7</version>
</dependency> </dependency>
</dependencies> </dependencies>

View file

@ -39,8 +39,9 @@
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6 zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
<cluster> <cluster>
name = "default" name = "default" # The name of the cluster
actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
serializer = "se.scalablesolutions.akka.serialization.Serializer.Java" # FQN of the serializer class
</cluster> </cluster>
<server> <server>