Fixing master

This commit is contained in:
Viktor Klang 2010-08-30 13:17:58 +02:00
parent be32e80985
commit bd13afdfd0
10 changed files with 101 additions and 32 deletions

View file

@ -0,0 +1,59 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util;
/**
* NOTE:
* <p/>
* This code is based on code from the [Plasmid Replication Engine] project.
* <br/>
* Licensed under [Mozilla Public License 1.0 (MPL)].
* <p/>
* Original JavaDoc:
* <p/>
* Our distributed objects are generally named most efficiently (and cleanly)
* by their UUID's. This class provides some static helpers for using UUID's.
* If it was efficient to do in Java, I would make the uuid an normal class
* and use instances of it. However, in current JVM's, we would end up using an
* Object to represent a long, which is pretty expensive. Maybe someday. ###
* <p/>
* UUID format: currently using currentTimeMillis() for the low bits. This uses
* about 40 bits for the next 1000 years, leaving 24 bits for debugging
* and consistency data. I'm using 8 of those for a magic asci 'U' byte.
* <p/>
* Future: use one instance of Uuid per type of object for better performance
* and more detailed info (instance could be matched to its uuid's via a map or
* array). This all static version bites.###
*/
public final class UUID {
public static final long UUID_NONE = 0;
public static final long UUID_WILD = -1;
public static final long UUID_MAGICMASK = 0xff << 56;
public static final long UUID_MAGIC = 'U' << 56;
protected static long lastTime;
/**
* Generate and return a new Universally Unique ID.
* Happens to be monotonically increasing.
*/
public synchronized static long newUuid() {
long time = System.currentTimeMillis();
if (time <= lastTime) {
time = lastTime + 1;
}
lastTime = time;
return UUID_MAGIC | time;
}
/**
* Returns true if uuid could have been generated by Uuid.
*/
public static boolean isValid(final long uuid) {
return (uuid & UUID_MAGICMASK) == UUID_MAGIC
&& (uuid & ~UUID_MAGICMASK) != 0;
}
}

View file

@ -11,8 +11,6 @@ import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently}
import se.scalablesolutions.akka.util.{Logging, Duration}
import se.scalablesolutions.akka.AkkaException
import com.google.protobuf.Message
import java.util.concurrent.TimeUnit
import java.net.InetSocketAddress

View file

@ -19,8 +19,6 @@ import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.api.exceptions.DeadTransactionException
import org.codehaus.aspectwerkz.joinpoint.JoinPoint
import java.net.InetSocketAddress
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.atomic.AtomicReference

View file

@ -7,10 +7,10 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry}
import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.ReflectiveAccess.JTAModule._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.AkkaException
@ -92,8 +92,8 @@ object Transaction {
private[this] val persistentStateMap = new HashMap[String, Committable with Abortable]
private[akka] val depth = new AtomicInteger(0)
val jta: Option[TransactionContainer] =
if (JTA_AWARE) Some(TransactionContainer())
val jta: Option[TransactionContainer] =
if (JTA_AWARE) Some(createTransactionContainer)
else None
log.trace("Creating transaction " + toString)
@ -102,10 +102,7 @@ object Transaction {
def begin = synchronized {
log.trace("Starting transaction " + toString)
jta.foreach { txContainer =>
txContainer.begin
txContainer.registerSynchronization(new StmSynchronization(txContainer, this))
}
jta.foreach { _.beginWithStmSynchronization(this) }
}
def commit = synchronized {
@ -132,7 +129,7 @@ object Transaction {
// --- internal methods ---------
private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
//private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE
private[akka] def status_? = status

View file

@ -17,7 +17,7 @@ import java.net.UnknownHostException
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Logging {
@sjson.json.JSONProperty(ignore = true) @transient lazy val log = Logger(this.getClass.getName)
@transient lazy val log = Logger(this.getClass.getName)
}
/**

View file

@ -9,6 +9,8 @@ import se.scalablesolutions.akka.dispatch.{Future, CompletableFuture}
import se.scalablesolutions.akka.config.{Config, ModuleNotAvailableException}
import java.net.InetSocketAddress
import se.scalablesolutions.akka.stm.Transaction
import se.scalablesolutions.akka.AkkaException
/**
* Helper class for reflective access to different modules in order to allow optional loading of modules.
@ -64,7 +66,7 @@ object ReflectiveAccess {
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteClientObject])
} catch { case e => None }
} catch { case e: Exception => None }
}
def register(address: InetSocketAddress, uuid: String) = {
@ -128,7 +130,7 @@ object ReflectiveAccess {
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteServerObject])
} catch { case e => None }
} catch { case e: Exception => None }
}
val remoteNodeObjectInstance: Option[RemoteNodeObject] = {
@ -137,7 +139,7 @@ object ReflectiveAccess {
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteNodeObject])
} catch { case e => None }
} catch { case e: Exception => None }
}
def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = {
@ -179,7 +181,7 @@ object ReflectiveAccess {
val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*)
ctor.setAccessible(true)
Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[TypedActorObject])
} catch { case e => None }
} catch { case e: Exception => None }
}
def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = {
@ -190,4 +192,24 @@ object ReflectiveAccess {
typedActorObjectInstance.get.isJoinPoint(message)
}
}
object JTAModule {
type TransactionContainer = {
def beginWithStmSynchronization(transaction: Transaction): Unit
def commit: Unit
def rollback: Unit
}
def createTransactionContainer: TransactionContainer = {
try {
val clazz = Class.forName("se.scalablesolutions.akka.stm.TransactionContainer$")
val instance = clazz.getDeclaredField("MODULE$")
val applyMethod = clazz.getDeclaredMethod("apply")
applyMethod.invoke(instance.get(null)).asInstanceOf[TransactionContainer]
} catch {
case cnfe: ClassNotFoundException => throw new IllegalStateException("Couldn't locale akka-jta, make sure you have it on your classpath")
}
}
}
}

View file

@ -1,12 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
/**
* Factory object for very fast UUID generation.
*/
object UUID {
def newUuid: Long = org.codehaus.aspectwerkz.proxy.Uuid.newUuid
}

View file

@ -123,6 +123,11 @@ class TransactionContainer private (
}
}
def beginWithStmSynchronization(transaction: Transaction) = {
begin
registerSynchronization(new StmSynchronization(this, transaction))
}
def begin = {
TransactionContainer.log.trace("Starting JTA transaction")
tm match {

View file

@ -75,6 +75,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo)
lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots)
lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository)
lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository)
lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast!
// -------------------------------------------------------------------------------------------------------------------
@ -226,7 +227,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_remote, akka_camel)
lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_remote)
lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_),
akka_remote, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
akka_remote, akka_jta, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp)
lazy val akka_osgi = project("akka-osgi", "akka-osgi", new AkkaOSGiParentProject(_))
lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_))
@ -355,6 +356,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val aopalliance = Dependencies.aopalliance
val werkz = Dependencies.werkz
val werkz_core = Dependencies.werkz_core
val guicey = Dependencies.guicey
// testing
val junit = Dependencies.junit
@ -528,7 +530,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) {
val atomikos_transactions = Dependencies.atomikos_transactions
val atomikos_transactions_api = Dependencies.atomikos_transactions_api
val atomikos_transactions_jta = Dependencies.atomikos_transactions_jta
val jta_1_1 = Dependencies.jta_1_1
//val jta_1_1 = Dependencies.jta_1_1
//val atomikos_transactions_util = "com.atomikos" % "transactions-util" % "3.2.3" % "compile"
}