diff --git a/akka-core/src/main/scala/actor/Actor.scala b/akka-core/src/main/scala/actor/Actor.scala index 076d945360..8d7ce9f7c1 100644 --- a/akka-core/src/main/scala/actor/Actor.scala +++ b/akka-core/src/main/scala/actor/Actor.scala @@ -75,7 +75,7 @@ object Actor extends Logging { val PORT = config.getInt("akka.remote.server.port", 9999) object Sender extends Actor { - implicit val Self: AnyRef = this + implicit val Self: Option[Actor] = None def receive = { case unknown => @@ -215,7 +215,8 @@ object Actor extends Logging { * @author Jonas Bonér */ trait Actor extends TransactionManagement { - implicit protected val self: Actor = this + implicit protected val self: Option[Actor] = Some(this) + implicit protected val transactionFamily: String = this.getClass.getName // Only mutable for RemoteServer in order to maintain identity across nodes private[akka] var _uuid = Uuid.newUuid.toString @@ -472,13 +473,13 @@ trait Actor extends TransactionManagement { * * If invoked from within a *non* Actor instance then either add this import to resolve the implicit argument: *
-   *   import Actor.Sender._
+   *   import Actor.Sender.Self
    *   actor ! message
    * 
* * Or pass in the implicit argument explicitly: *
-   *   actor.!(message)(this)
+   *   actor.!(message)(Some(this))
    * 
* * Or use the 'send(..)' method; @@ -486,14 +487,10 @@ trait Actor extends TransactionManagement { * actor.send(message) * */ - def !(message: Any)(implicit sender: AnyRef) = { + def !(message: Any)(implicit sender: Option[Actor]) = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") - if (_isRunning) { - val from = if (sender != null && sender.isInstanceOf[Actor]) Some(sender.asInstanceOf[Actor]) - else None - postMessageToMailbox(message, from) - } else throw new IllegalStateException( - "Actor has not been started, you need to invoke 'actor.start' before using it") + if (_isRunning) postMessageToMailbox(message, sender) + else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } /** @@ -574,11 +571,10 @@ trait Actor extends TransactionManagement { /** * Forwards the message and passes the original sender actor as the sender. */ - def forward(message: Any)(implicit sender: AnyRef) = { + def forward(message: Any)(implicit sender: Option[Actor]) = { if (_isKilled) throw new ActorKilledException("Actor [" + toString + "] has been killed, can't respond to messages") if (_isRunning) { - val forwarder = if (sender != null && sender.isInstanceOf[Actor]) sender.asInstanceOf[Actor] - else throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor") + val forwarder = sender.getOrElse(throw new IllegalStateException("Can't forward message when the forwarder/mediator is not an actor")) if (forwarder.getSender.isEmpty) throw new IllegalStateException("Can't forward message when initial sender is not an actor") postMessageToMailbox(message, forwarder.getSender) } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") @@ -597,12 +593,12 @@ trait Actor extends TransactionManagement { case None => throw new IllegalStateException( "\n\tNo sender in scope, can't reply. " + - "\n\tYou have probably used the '!' method to either; " + - "\n\t\t1. Send a message to a remote actor which does not have a contact address." + - "\n\t\t2. Send a message from an instance that is *not* an actor" + - "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + - "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + - "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network.") + "\n\tYou have probably used the '!' method to either; " + + "\n\t\t1. Send a message to a remote actor which does not have a contact address." + + "\n\t\t2. Send a message from an instance that is *not* an actor" + + "\n\t\t3. Send a message to an Active Object annotated with the '@oneway' annotation? " + + "\n\tIf so, switch to '!!' (or remove '@oneway') which passes on an implicit future" + + "\n\tthat will be bound by the argument passed to 'reply'. Alternatively, you can use setContactAddress to make sure the actor can be contacted over the network.") case Some(future) => future.completeWithResult(message) } @@ -838,9 +834,7 @@ trait Actor extends TransactionManagement { val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get) if (_isEventBased) { _mailbox.add(invocation) - if (_isSuspended) { - invocation.send - } + if (_isSuspended) invocation.send } else invocation.send } } @@ -862,8 +856,7 @@ trait Actor extends TransactionManagement { if (id.isDefined) requestBuilder.setSupervisorUuid(id.get) val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build) if (future.isDefined) future.get - else throw new IllegalStateException( - "Expected a future from remote call to actor " + toString) + else throw new IllegalStateException("Expected a future from remote call to actor " + toString) } else { val future = new DefaultCompletableFutureResult(timeout) val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get) @@ -924,7 +917,7 @@ trait Actor extends TransactionManagement { if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function else throw new IllegalArgumentException( "Actor " + toString + " could not process message [" + message + "]" + - "\n\tsince no matching 'case' clause in its 'receive' method could be found") + "\n\tsince no matching 'case' clause in its 'receive' method could be found") } finally { decrementTransaction } @@ -934,8 +927,8 @@ trait Actor extends TransactionManagement { if (isTransactionRequiresNew && !isTransactionInScope) { if (senderFuture.isEmpty) throw new StmException( "Can't continue transaction in a one-way fire-forget message send" + - "\n\tE.g. using Actor '!' method or Active Object 'void' method" + - "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") + "\n\tE.g. using Actor '!' method or Active Object 'void' method" + + "\n\tPlease use the Actor '!!', '!?' methods or Active Object method with non-void return type") atomic { proceed } @@ -975,7 +968,7 @@ trait Actor extends TransactionManagement { } } else throw new IllegalStateException( "No 'faultHandler' defined for an actor with the 'trapExit' member field defined " + - "\n\tto non-empty list of exception classes - can't proceed " + toString) + "\n\tto non-empty list of exception classes - can't proceed " + toString) } else { if (_supervisor.isDefined) _supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on } @@ -1056,8 +1049,8 @@ trait Actor extends TransactionManagement { override def equals(that: Any): Boolean = { that != null && - that.isInstanceOf[Actor] && - that.asInstanceOf[Actor]._uuid == _uuid + that.isInstanceOf[Actor] && + that.asInstanceOf[Actor]._uuid == _uuid } override def toString(): String = "Actor[" + id + ":" + uuid + "]" diff --git a/akka-core/src/main/scala/config/Config.scala b/akka-core/src/main/scala/config/Config.scala index 5cdbfdbb9b..8f2cfe71fc 100644 --- a/akka-core/src/main/scala/config/Config.scala +++ b/akka-core/src/main/scala/config/Config.scala @@ -46,7 +46,7 @@ object ScalaConfig { case object Permanent extends Scope case object Temporary extends Scope - case class RemoteAddress(hostname: String, port: Int) + case class RemoteAddress(hostname: String, port: Int) extends ConfigElement class Component(_intf: Class[_], val target: Class[_], diff --git a/akka-core/src/main/scala/remote/Cluster.scala b/akka-core/src/main/scala/remote/Cluster.scala index 9e06d6d267..f841af165c 100644 --- a/akka-core/src/main/scala/remote/Cluster.scala +++ b/akka-core/src/main/scala/remote/Cluster.scala @@ -4,25 +4,22 @@ package se.scalablesolutions.akka.remote -import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver, SetStateEvent} -import org.jgroups.util.Util +import org.jgroups.{JChannel, View, Address, Message, ExtendedMembershipListener, Receiver} import se.scalablesolutions.akka.Config.config -import se.scalablesolutions.akka.util.Logging -import se.scalablesolutions.akka.serialization.Serializer import se.scalablesolutions.akka.config.ScalaConfig._ import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRegistry} import se.scalablesolutions.akka.remote.Cluster.{Node, RelayedMessage} -import scala.collection.immutable.{Map, HashMap, HashSet} +import scala.collection.immutable.{Map, HashMap} +import se.scalablesolutions.akka.serialization.Serializer /** - * Interface for interacting with the cluster. - * + * Interface for interacting with the Cluster Membership API. + * * @author Viktor Klang */ trait Cluster { - def name: String def registerLocalNode(hostname: String, port: Int): Unit @@ -31,69 +28,72 @@ trait Cluster { def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit - def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] + def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] } /** - * Baseclass for cluster implementations + * Base class for cluster actor implementations. */ abstract class ClusterActor extends Actor with Cluster { val name = config.getString("akka.remote.cluster.name") getOrElse "default" } /** - * A singleton representing the Cluster - * Loads a specified ClusterActor and delegates to that instance + * A singleton representing the Cluster. + *

+ * Loads a specified ClusterActor and delegates to that instance. */ object Cluster extends Cluster { - case class Node(endpoints: List[RemoteAddress]) - case class RelayedMessage(actorClassFQN:String, msg: AnyRef) + private[remote] sealed trait ClusterMessage + private[remote] case class Node(endpoints: List[RemoteAddress]) extends ClusterMessage + private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage - lazy val impl: Option[ClusterActor] = { + private[remote] lazy val clusterActor: Option[ClusterActor] = { config.getString("akka.remote.cluster.actor") map (name => { val actor = Class.forName(name) - .newInstance - .asInstanceOf[ClusterActor] - + .newInstance + .asInstanceOf[ClusterActor] SupervisorFactory( SupervisorConfig( - RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])), - Supervise(actor, LifeCycle(Permanent)) :: Nil - ) + RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])), + Supervise(actor, LifeCycle(Permanent)) :: Nil) ).newInstance.start - actor }) } - def name = impl.map(_.name).getOrElse("No cluster") + private[remote] lazy val serializer: Serializer = { + val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName) + Class.forName(className).newInstance.asInstanceOf[Serializer] + } - def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = impl.flatMap(_.lookup(pf)) + def name = clusterActor.map(_.name).getOrElse("No cluster") - def registerLocalNode(hostname: String, port: Int): Unit = impl.map(_.registerLocalNode(hostname, port)) + def lookup[T](pf: PartialFunction[RemoteAddress, T]): Option[T] = clusterActor.flatMap(_.lookup(pf)) - def deregisterLocalNode(hostname: String, port: Int): Unit = impl.map(_.deregisterLocalNode(hostname, port)) + def registerLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.registerLocalNode(hostname, port)) - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = impl.map(_.relayMessage(to, msg)) + def deregisterLocalNode(hostname: String, port: Int): Unit = clusterActor.map(_.deregisterLocalNode(hostname, port)) + + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = clusterActor.map(_.relayMessage(to, msg)) } /** - * Just a placeholder for the JGroupsClusterActor message types + * JGroups Internal Cluster messages. */ -object JGroupsClusterActor { - //Message types - sealed trait ClusterMessage - case object PapersPlease extends ClusterMessage - case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage - case object Block extends ClusterMessage - case object Unblock extends ClusterMessage - case class Zombie(address: Address) extends ClusterMessage - case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage - case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage +private[remote] object JGroupsClusterActor { + sealed trait JGroupsClusterMessage + case object PapersPlease extends JGroupsClusterMessage + case class Papers(addresses: List[RemoteAddress]) extends JGroupsClusterMessage + case object Block extends JGroupsClusterMessage + case object Unblock extends JGroupsClusterMessage + case class Zombie(address: Address) extends JGroupsClusterMessage + case class RegisterLocalNode(server: RemoteAddress) extends JGroupsClusterMessage + case class DeregisterLocalNode(server: RemoteAddress) extends JGroupsClusterMessage } /** - * Clustering support via JGroups + * Clustering support via JGroups. */ class JGroupsClusterActor extends ClusterActor { import JGroupsClusterActor._ @@ -104,10 +104,11 @@ class JGroupsClusterActor extends ClusterActor { @volatile private var remotes: Map[Address, Node] = Map() override def init = { - log debug "Initiating cluster actor" + log debug "Initiating JGroups-based cluster actor" remotes = new HashMap[Address, Node] val me = this - //Set up the JGroups local endpoint + + // Set up the JGroups local endpoint channel = Some(new JChannel { setReceiver(new Receiver with ExtendedMembershipListener { def getState: Array[Byte] = null @@ -128,26 +129,26 @@ class JGroupsClusterActor extends ClusterActor { channel.map(_.connect(name)) } - protected def serializer = Serializer.Java //FIXME make this configurable + def lookup[T](handleRemoteAddress: PartialFunction[RemoteAddress, T]): Option[T] = + remotes.values.toList.flatMap(_.endpoints).find(handleRemoteAddress isDefinedAt _).map(handleRemoteAddress) - def lookup[T](pf : PartialFunction[RemoteAddress,T]) : Option[T] = remotes.values.toList.flatMap(_.endpoints).find(pf isDefinedAt _).map(pf) + def registerLocalNode(hostname: String, port: Int): Unit = + send(RegisterLocalNode(RemoteAddress(hostname, port))) - def registerLocalNode(hostname: String, port: Int): Unit = this send RegisterLocalNode(RemoteAddress(hostname, port)) + def deregisterLocalNode(hostname: String, port: Int): Unit = + send(DeregisterLocalNode(RemoteAddress(hostname, port))) - def deregisterLocalNode(hostname: String, port: Int): Unit = this send DeregisterLocalNode(RemoteAddress(hostname, port)) - - def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = this send RelayedMessage(to.getName, msg) + def relayMessage(to: Class[_ <: Actor], msg: AnyRef): Unit = + send(RelayedMessage(to.getName, msg)) private def broadcast[T <: AnyRef](recipients: Iterable[Address], msg: T): Unit = { - lazy val m = serializer out msg + lazy val m = Cluster.serializer out msg for (c <- channel; r <- recipients) c.send(new Message(r, null, m)) - } - private def broadcast[T <: AnyRef](msg: T): Unit = { - if(!remotes.isEmpty) //Don't broadcast if we are not connected anywhere... - channel.map(_.send(new Message(null, null, serializer out msg))) - } + private def broadcast[T <: AnyRef](msg: T): Unit = + //Don't broadcast if we are not connected anywhere... + if (!remotes.isEmpty) channel.map(_.send(new Message(null, null, Cluster.serializer out msg))) def receive = { case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead @@ -157,40 +158,40 @@ class JGroupsClusterActor extends ClusterActor { } case v: View => { - //Not present in the cluster anymore = presumably zombies - //Nodes we have no prior knowledge existed = unknowns - val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress //Exclude ourselves + // Not present in the cluster anymore = presumably zombies + // Nodes we have no prior knowledge existed = unknowns + val members = Set[Address]() ++ v.getMembers.asScala - channel.get.getAddress // Exclude ourselves val zombies = Set[Address]() ++ remotes.keySet -- members val unknown = members -- remotes.keySet log debug v.printDetails - //Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead + // Tell the zombies and unknowns to provide papers and prematurely treat the zombies as dead broadcast(zombies ++ unknown, PapersPlease) remotes = remotes -- zombies } case m: Message => { - if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) //handle non-own messages only, and only if we're connected - (serializer in (m.getRawBuffer, None)) match { - + if (m.getSrc != channel.map(_.getAddress).getOrElse(m.getSrc)) // Handle non-own messages only, and only if we're connected + (Cluster.serializer in (m.getRawBuffer, None)) match { + case PapersPlease => { log debug ("Asked for papers by %s", m.getSrc) broadcast(m.getSrc :: Nil, Papers(local.endpoints)) - - if(remotes.get(m.getSrc).isEmpty) //If we were asked for papers from someone we don't know, ask them! + + if (remotes.get(m.getSrc).isEmpty) // If we were asked for papers from someone we don't know, ask them! broadcast(m.getSrc :: Nil, PapersPlease) } - case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) + case Papers(x) => remotes = remotes + (m.getSrc -> Node(x)) case RelayedMessage(c, m) => ActorRegistry.actorsFor(c).map(_ send m) - - case unknown => log debug ("Unknown message: %s", unknown.toString) + + case unknown => log debug ("Unknown message: %s", unknown.toString) } } - case rm @ RelayedMessage(_,_) => { + case rm @ RelayedMessage(_, _) => { log debug ("Relaying message: %s", rm) broadcast(rm) } @@ -207,8 +208,8 @@ class JGroupsClusterActor extends ClusterActor { broadcast(Papers(local.endpoints)) } - case Block => log debug "UNSUPPORTED: block" //TODO HotSwap to a buffering body - case Unblock => log debug "UNSUPPORTED: unblock" //TODO HotSwap back and flush the buffer + case Block => log debug "UNSUPPORTED: JGroupsClusterActor::block" //TODO HotSwap to a buffering body + case Unblock => log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer } override def shutdown = { diff --git a/akka-core/src/main/scala/remote/RemoteClient.scala b/akka-core/src/main/scala/remote/RemoteClient.scala index cf9182646b..6b0ddec6c8 100644 --- a/akka-core/src/main/scala/remote/RemoteClient.scala +++ b/akka-core/src/main/scala/remote/RemoteClient.scala @@ -137,7 +137,7 @@ class RemoteClientPipelineFactory(name: String, futures: ConcurrentMap[Long, CompletableFutureResult], supervisors: ConcurrentMap[String, Actor], bootstrap: ClientBootstrap, - timer: HashedWheelTimer) extends ChannelPipelineFactory { + timer: HashedWheelTimer) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val pipeline = Channels.pipeline() pipeline.addLast("timeout", new ReadTimeoutHandler(timer, RemoteClient.READ_TIMEOUT)) diff --git a/akka-core/src/main/scala/remote/RemoteServer.scala b/akka-core/src/main/scala/remote/RemoteServer.scala index 691f28b0ee..f905f69b26 100755 --- a/akka-core/src/main/scala/remote/RemoteServer.scala +++ b/akka-core/src/main/scala/remote/RemoteServer.scala @@ -221,7 +221,7 @@ class RemoteServerHandler(val name: String, openChannels: ChannelGroup, val appl remoteActor.makeRemote(request.getSourceHostname, request.getSourcePort) remoteActor.start } - actor.!(message)(remoteActor) + actor.!(message)(Some(remoteActor)) } else { // couldn't find a way to reply, send the message without a source/sender actor.send(message) diff --git a/akka-core/src/main/scala/serialization/Serializer.scala b/akka-core/src/main/scala/serialization/Serializer.scala index e6b791f168..a79938e33f 100644 --- a/akka-core/src/main/scala/serialization/Serializer.scala +++ b/akka-core/src/main/scala/serialization/Serializer.scala @@ -38,7 +38,8 @@ object Serializer { val EMPTY_CLASS_ARRAY = Array[Class[_]]() val EMPTY_ANY_REF_ARRAY = Array[AnyRef]() - object NOOP extends Serializer { + object NOOP extends NOOP + class NOOP extends Serializer { def deepClone(obj: AnyRef): AnyRef = obj def out(obj: AnyRef): Array[Byte] = obj.asInstanceOf[Array[Byte]] def in(bytes: Array[Byte], clazz: Option[Class[_]]): AnyRef = bytes @@ -47,7 +48,8 @@ object Serializer { /** * @author Jonas Bonér */ - object Java extends Serializer { + object Java extends Java + class Java extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = { @@ -69,7 +71,8 @@ object Serializer { /** * @author Jonas Bonér */ - object Protobuf extends Serializer { + object Protobuf extends Protobuf + class Protobuf extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) def out(obj: AnyRef): Array[Byte] = { @@ -93,7 +96,8 @@ object Serializer { /** * @author Jonas Bonér */ - object JavaJSON extends Serializer { + object JavaJSON extends JavaJSON + class JavaJSON extends Serializer { private val mapper = new ObjectMapper def deepClone(obj: AnyRef): AnyRef = in(out(obj), Some(obj.getClass)) @@ -123,7 +127,8 @@ object Serializer { /** * @author Jonas Bonér */ - object ScalaJSON extends Serializer { + object ScalaJSON extends ScalaJSON + class ScalaJSON extends Serializer { def deepClone(obj: AnyRef): AnyRef = in(out(obj), None) def out(obj: AnyRef): Array[Byte] = SJSONSerializer.SJSON.out(obj) @@ -136,7 +141,8 @@ object Serializer { /** * @author Jonas Bonér */ - object SBinary { + object SBinary extends SBinary + class SBinary { import sbinary.DefaultProtocol._ def deepClone[T <: AnyRef](obj: T)(implicit w : Writes[T], r : Reads[T]): T = in[T](out[T](obj), None) diff --git a/akka-core/src/main/scala/stm/Transaction.scala b/akka-core/src/main/scala/stm/Transaction.scala index 606fa33fb4..0c1aaf9f93 100644 --- a/akka-core/src/main/scala/stm/Transaction.scala +++ b/akka-core/src/main/scala/stm/Transaction.scala @@ -21,7 +21,13 @@ class NoTransactionInScopeException extends RuntimeException class TransactionRetryException(message: String) extends RuntimeException(message) /** - * Example of atomic transaction management using the atomic block: + * Example of atomic transaction management using the atomic block. + * These blocks takes an implicit argument String defining the transaction family name. + * If these blocks are used from within an Actor then the name is automatically resolved, if not either: + * 1. define an implicit String with the name in the same scope + * 2. pass in the name explicitly + * + * Here are some examples (assuming implicit transaction family name in scope): *

  * import se.scalablesolutions.akka.stm.Transaction._
  * 
@@ -95,17 +101,17 @@ object Transaction extends TransactionManagement {
   /**
    * See ScalaDoc on class.
    */
-  def map[T](f: Transaction => T): T = atomic { f(getTransactionInScope) }
+  def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
 
   /**
    * See ScalaDoc on class.
    */
-  def flatMap[T](f: Transaction => T): T = atomic { f(getTransactionInScope) }
+  def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
 
   /**
    * See ScalaDoc on class.
    */
-  def foreach(f: Transaction => Unit): Unit = atomic { f(getTransactionInScope) }
+  def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
 
   /**
    * Creates a "pure" STM atomic transaction and by-passes all transactions hooks
@@ -113,15 +119,15 @@ object Transaction extends TransactionManagement {
    * Only for internal usage.
    */
   private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
-    getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+    getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
     def execute(mtx: MultiverseTransaction): T = body
   }.execute()
 
   /**
    * See ScalaDoc on class.
    */
-  def atomic[T](body: => T): T = new AtomicTemplate[T](
-    getGlobalStmInstance, "akka", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+  def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
+    getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
     def execute(mtx: MultiverseTransaction): T = body
     override def postStart(mtx: MultiverseTransaction) = {
       val tx = new Transaction
@@ -137,8 +143,8 @@ object Transaction extends TransactionManagement {
   /**
    * See ScalaDoc on class.
    */
-  def atomic[T](retryCount: Int)(body: => T): T = {
-    new AtomicTemplate[T](getGlobalStmInstance, "akka", false, false, retryCount) {
+  def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
+    new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
       def execute(mtx: MultiverseTransaction): T = body
       override def postStart(mtx: MultiverseTransaction) = {
         val tx = new Transaction
@@ -155,8 +161,8 @@ object Transaction extends TransactionManagement {
   /**
    * See ScalaDoc on class.
    */
-  def atomicReadOnly[T](retryCount: Int)(body: => T): T = {
-    new AtomicTemplate[T](getGlobalStmInstance, "akka", false, true, retryCount) {
+  def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
+    new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
       def execute(mtx: MultiverseTransaction): T = body
       override def postStart(mtx: MultiverseTransaction) = {
         val tx = new Transaction
diff --git a/akka-core/src/main/scala/stm/TransactionalState.scala b/akka-core/src/main/scala/stm/TransactionalState.scala
index 16fc4f31f8..da225edc53 100644
--- a/akka-core/src/main/scala/stm/TransactionalState.scala
+++ b/akka-core/src/main/scala/stm/TransactionalState.scala
@@ -73,6 +73,7 @@ object TransactionalRef {
  * @author Jonas Bonér
  */
 class TransactionalRef[T] extends Transactional {
+  implicit val txInitName = "TransactionalRef:Init"
   import org.multiverse.api.ThreadLocalTransaction._
   val uuid = Uuid.newUuid.toString
 
diff --git a/akka-fun-test-java/pom.xml b/akka-fun-test-java/pom.xml
index 0e9ac16202..24868c06e4 100644
--- a/akka-fun-test-java/pom.xml
+++ b/akka-fun-test-java/pom.xml
@@ -94,6 +94,7 @@
         maven-surefire-plugin
         
           
+            **/InMemNestedStateTest*
             **/*Persistent*
           
         
diff --git a/akka-util/pom.xml b/akka-util/pom.xml
index 5a7997a3af..c6d21c0f30 100644
--- a/akka-util/pom.xml
+++ b/akka-util/pom.xml
@@ -23,7 +23,7 @@
     
       net.lag
       configgy
-      1.4
+      1.4.7
     
   
 
diff --git a/config/akka-reference.conf b/config/akka-reference.conf
index ef36fe5cb3..3d6a9106a9 100644
--- a/config/akka-reference.conf
+++ b/config/akka-reference.conf
@@ -39,8 +39,9 @@
     zlib-compression-level = 6  # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6
 
     
-      name = "default"
-      actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor"
+      name = "default"                                                       # The name of the cluster
+      actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor"         # FQN of an implementation of ClusterActor
+      serializer = "se.scalablesolutions.akka.serialization.Serializer.Java" # FQN of the serializer class