From bd13afdfd0678db4c99586d373558e2c736da346 Mon Sep 17 00:00:00 2001 From: Viktor Klang Date: Mon, 30 Aug 2010 13:17:58 +0200 Subject: [PATCH] Fixing master --- .../se/scalablesolutions/akka/util/UUID.java | 59 +++++++++++++++++++ akka-actor/src/main/scala/actor/Actor.scala | 2 - .../src/main/scala/actor/ActorRef.scala | 2 - .../src/main/scala/stm/Transaction.scala | 15 ++--- akka-actor/src/main/scala/util/Logging.scala | 2 +- .../main/scala/util/ReflectiveAccess.scala | 30 ++++++++-- akka-actor/src/main/scala/util/Uuid.scala | 12 ---- .../stm => akka-jta/src/main/scala}/JTA.scala | 5 ++ .../akka/remote/protocol/RemoteProtocol.java | 0 project/build/AkkaProject.scala | 6 +- 10 files changed, 101 insertions(+), 32 deletions(-) create mode 100644 akka-actor/src/main/java/se/scalablesolutions/akka/util/UUID.java delete mode 100644 akka-actor/src/main/scala/util/Uuid.scala rename {akka-actor/src/main/scala/stm => akka-jta/src/main/scala}/JTA.scala (98%) rename {akka-typed-actor => akka-remote}/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java (100%) diff --git a/akka-actor/src/main/java/se/scalablesolutions/akka/util/UUID.java b/akka-actor/src/main/java/se/scalablesolutions/akka/util/UUID.java new file mode 100644 index 0000000000..f06aab34df --- /dev/null +++ b/akka-actor/src/main/java/se/scalablesolutions/akka/util/UUID.java @@ -0,0 +1,59 @@ +/** + * Copyright (C) 2009-2010 Scalable Solutions AB + */ +package se.scalablesolutions.akka.util; + +/** + * NOTE: + *

+ * This code is based on code from the [Plasmid Replication Engine] project. + *
+ * Licensed under [Mozilla Public License 1.0 (MPL)]. + *

+ * Original JavaDoc: + *

+ * Our distributed objects are generally named most efficiently (and cleanly) + * by their UUID's. This class provides some static helpers for using UUID's. + * If it was efficient to do in Java, I would make the uuid an normal class + * and use instances of it. However, in current JVM's, we would end up using an + * Object to represent a long, which is pretty expensive. Maybe someday. ### + *

+ * UUID format: currently using currentTimeMillis() for the low bits. This uses + * about 40 bits for the next 1000 years, leaving 24 bits for debugging + * and consistency data. I'm using 8 of those for a magic asci 'U' byte. + *

+ * Future: use one instance of Uuid per type of object for better performance + * and more detailed info (instance could be matched to its uuid's via a map or + * array). This all static version bites.### + */ +public final class UUID { + + public static final long UUID_NONE = 0; + public static final long UUID_WILD = -1; + public static final long UUID_MAGICMASK = 0xff << 56; + public static final long UUID_MAGIC = 'U' << 56; + + protected static long lastTime; + + /** + * Generate and return a new Universally Unique ID. + * Happens to be monotonically increasing. + */ + public synchronized static long newUuid() { + long time = System.currentTimeMillis(); + + if (time <= lastTime) { + time = lastTime + 1; + } + lastTime = time; + return UUID_MAGIC | time; + } + + /** + * Returns true if uuid could have been generated by Uuid. + */ + public static boolean isValid(final long uuid) { + return (uuid & UUID_MAGICMASK) == UUID_MAGIC + && (uuid & ~UUID_MAGICMASK) != 0; + } +} \ No newline at end of file diff --git a/akka-actor/src/main/scala/actor/Actor.scala b/akka-actor/src/main/scala/actor/Actor.scala index 6a5365f3fb..d3e7699403 100644 --- a/akka-actor/src/main/scala/actor/Actor.scala +++ b/akka-actor/src/main/scala/actor/Actor.scala @@ -11,8 +11,6 @@ import se.scalablesolutions.akka.util.Helpers.{narrow, narrowSilently} import se.scalablesolutions.akka.util.{Logging, Duration} import se.scalablesolutions.akka.AkkaException -import com.google.protobuf.Message - import java.util.concurrent.TimeUnit import java.net.InetSocketAddress diff --git a/akka-actor/src/main/scala/actor/ActorRef.scala b/akka-actor/src/main/scala/actor/ActorRef.scala index 2f17e830a9..a6b42db579 100644 --- a/akka-actor/src/main/scala/actor/ActorRef.scala +++ b/akka-actor/src/main/scala/actor/ActorRef.scala @@ -19,8 +19,6 @@ import org.multiverse.api.ThreadLocalTransaction._ import org.multiverse.commitbarriers.CountDownCommitBarrier import org.multiverse.api.exceptions.DeadTransactionException -import org.codehaus.aspectwerkz.joinpoint.JoinPoint - import java.net.InetSocketAddress import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.atomic.AtomicReference diff --git a/akka-actor/src/main/scala/stm/Transaction.scala b/akka-actor/src/main/scala/stm/Transaction.scala index 84ea426d05..682e9dd8f1 100644 --- a/akka-actor/src/main/scala/stm/Transaction.scala +++ b/akka-actor/src/main/scala/stm/Transaction.scala @@ -7,10 +7,10 @@ package se.scalablesolutions.akka.stm import java.util.concurrent.atomic.AtomicLong import java.util.concurrent.atomic.AtomicInteger -import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry} - import scala.collection.mutable.HashMap +import se.scalablesolutions.akka.util.ReflectiveAccess.JTAModule._ + import se.scalablesolutions.akka.util.Logging import se.scalablesolutions.akka.config.Config._ import se.scalablesolutions.akka.AkkaException @@ -92,8 +92,8 @@ object Transaction { private[this] val persistentStateMap = new HashMap[String, Committable with Abortable] private[akka] val depth = new AtomicInteger(0) - val jta: Option[TransactionContainer] = - if (JTA_AWARE) Some(TransactionContainer()) + val jta: Option[TransactionContainer] = + if (JTA_AWARE) Some(createTransactionContainer) else None log.trace("Creating transaction " + toString) @@ -102,10 +102,7 @@ object Transaction { def begin = synchronized { log.trace("Starting transaction " + toString) - jta.foreach { txContainer => - txContainer.begin - txContainer.registerSynchronization(new StmSynchronization(txContainer, this)) - } + jta.foreach { _.beginWithStmSynchronization(this) } } def commit = synchronized { @@ -132,7 +129,7 @@ object Transaction { // --- internal methods --------- - private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE + //private def isJtaTxActive(status: Int) = status == Status.STATUS_ACTIVE private[akka] def status_? = status diff --git a/akka-actor/src/main/scala/util/Logging.scala b/akka-actor/src/main/scala/util/Logging.scala index aea7c93740..f13c229653 100644 --- a/akka-actor/src/main/scala/util/Logging.scala +++ b/akka-actor/src/main/scala/util/Logging.scala @@ -17,7 +17,7 @@ import java.net.UnknownHostException * @author Jonas Bonér */ trait Logging { - @sjson.json.JSONProperty(ignore = true) @transient lazy val log = Logger(this.getClass.getName) + @transient lazy val log = Logger(this.getClass.getName) } /** diff --git a/akka-actor/src/main/scala/util/ReflectiveAccess.scala b/akka-actor/src/main/scala/util/ReflectiveAccess.scala index da9fb2f3c6..6a050ef085 100644 --- a/akka-actor/src/main/scala/util/ReflectiveAccess.scala +++ b/akka-actor/src/main/scala/util/ReflectiveAccess.scala @@ -9,6 +9,8 @@ import se.scalablesolutions.akka.dispatch.{Future, CompletableFuture} import se.scalablesolutions.akka.config.{Config, ModuleNotAvailableException} import java.net.InetSocketAddress +import se.scalablesolutions.akka.stm.Transaction +import se.scalablesolutions.akka.AkkaException /** * Helper class for reflective access to different modules in order to allow optional loading of modules. @@ -64,7 +66,7 @@ object ReflectiveAccess { val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) ctor.setAccessible(true) Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteClientObject]) - } catch { case e => None } + } catch { case e: Exception => None } } def register(address: InetSocketAddress, uuid: String) = { @@ -128,7 +130,7 @@ object ReflectiveAccess { val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) ctor.setAccessible(true) Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteServerObject]) - } catch { case e => None } + } catch { case e: Exception => None } } val remoteNodeObjectInstance: Option[RemoteNodeObject] = { @@ -137,7 +139,7 @@ object ReflectiveAccess { val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) ctor.setAccessible(true) Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[RemoteNodeObject]) - } catch { case e => None } + } catch { case e: Exception => None } } def registerActor(address: InetSocketAddress, uuid: String, actorRef: ActorRef) = { @@ -179,7 +181,7 @@ object ReflectiveAccess { val ctor = clazz.getDeclaredConstructor(Array[Class[_]](): _*) ctor.setAccessible(true) Some(ctor.newInstance(Array[AnyRef](): _*).asInstanceOf[TypedActorObject]) - } catch { case e => None } + } catch { case e: Exception => None } } def resolveFutureIfMessageIsJoinPoint(message: Any, future: Future[_]): Boolean = { @@ -190,4 +192,24 @@ object ReflectiveAccess { typedActorObjectInstance.get.isJoinPoint(message) } } + + object JTAModule { + + type TransactionContainer = { + def beginWithStmSynchronization(transaction: Transaction): Unit + def commit: Unit + def rollback: Unit + } + + def createTransactionContainer: TransactionContainer = { + try { + val clazz = Class.forName("se.scalablesolutions.akka.stm.TransactionContainer$") + val instance = clazz.getDeclaredField("MODULE$") + val applyMethod = clazz.getDeclaredMethod("apply") + applyMethod.invoke(instance.get(null)).asInstanceOf[TransactionContainer] + } catch { + case cnfe: ClassNotFoundException => throw new IllegalStateException("Couldn't locale akka-jta, make sure you have it on your classpath") + } + } + } } diff --git a/akka-actor/src/main/scala/util/Uuid.scala b/akka-actor/src/main/scala/util/Uuid.scala deleted file mode 100644 index b437b6288c..0000000000 --- a/akka-actor/src/main/scala/util/Uuid.scala +++ /dev/null @@ -1,12 +0,0 @@ -/** - * Copyright (C) 2009-2010 Scalable Solutions AB - */ - -package se.scalablesolutions.akka.util - -/** - * Factory object for very fast UUID generation. - */ -object UUID { - def newUuid: Long = org.codehaus.aspectwerkz.proxy.Uuid.newUuid -} diff --git a/akka-actor/src/main/scala/stm/JTA.scala b/akka-jta/src/main/scala/JTA.scala similarity index 98% rename from akka-actor/src/main/scala/stm/JTA.scala rename to akka-jta/src/main/scala/JTA.scala index 485f3e5104..72c5a9894d 100644 --- a/akka-actor/src/main/scala/stm/JTA.scala +++ b/akka-jta/src/main/scala/JTA.scala @@ -123,6 +123,11 @@ class TransactionContainer private ( } } + def beginWithStmSynchronization(transaction: Transaction) = { + begin + registerSynchronization(new StmSynchronization(this, transaction)) + } + def begin = { TransactionContainer.log.trace("Starting JTA transaction") tm match { diff --git a/akka-typed-actor/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java b/akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java similarity index 100% rename from akka-typed-actor/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java rename to akka-remote/src/main/java/se/scalablesolutions/akka/remote/protocol/RemoteProtocol.java diff --git a/project/build/AkkaProject.scala b/project/build/AkkaProject.scala index a5dbc5bd93..e49514c3d2 100644 --- a/project/build/AkkaProject.scala +++ b/project/build/AkkaProject.scala @@ -75,6 +75,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val nettyModuleConfig = ModuleConfiguration("org.jboss.netty", JBossRepo) lazy val scalaTestModuleConfig = ModuleConfiguration("org.scalatest", ScalaToolsSnapshots) lazy val logbackModuleConfig = ModuleConfiguration("ch.qos.logback",sbt.DefaultMavenRepository) + lazy val atomikosModuleConfig = ModuleConfiguration("com.atomikos",sbt.DefaultMavenRepository) lazy val embeddedRepo = EmbeddedRepo // This is the only exception, because the embedded repo is fast! // ------------------------------------------------------------------------------------------------------------------- @@ -226,7 +227,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { lazy val akka_spring = project("akka-spring", "akka-spring", new AkkaSpringProject(_), akka_remote, akka_camel) lazy val akka_jta = project("akka-jta", "akka-jta", new AkkaJTAProject(_), akka_remote) lazy val akka_kernel = project("akka-kernel", "akka-kernel", new AkkaKernelProject(_), - akka_remote, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp) + akka_remote, akka_jta, akka_http, akka_spring, akka_camel, akka_persistence, akka_amqp) lazy val akka_osgi = project("akka-osgi", "akka-osgi", new AkkaOSGiParentProject(_)) lazy val akka_samples = project("akka-samples", "akka-samples", new AkkaSamplesParentProject(_)) @@ -355,6 +356,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val aopalliance = Dependencies.aopalliance val werkz = Dependencies.werkz val werkz_core = Dependencies.werkz_core + val guicey = Dependencies.guicey // testing val junit = Dependencies.junit @@ -528,7 +530,7 @@ class AkkaParentProject(info: ProjectInfo) extends DefaultProject(info) { val atomikos_transactions = Dependencies.atomikos_transactions val atomikos_transactions_api = Dependencies.atomikos_transactions_api val atomikos_transactions_jta = Dependencies.atomikos_transactions_jta - val jta_1_1 = Dependencies.jta_1_1 + //val jta_1_1 = Dependencies.jta_1_1 //val atomikos_transactions_util = "com.atomikos" % "transactions-util" % "3.2.3" % "compile" }