Merged in akka-servlet

This commit is contained in:
Viktor Klang 2010-04-05 13:37:33 +02:00
commit cb533857ed
16 changed files with 352 additions and 179 deletions

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest

View file

@ -12,7 +12,7 @@ import scala.collection.mutable.HashMap
import se.scalablesolutions.akka.util.Logging
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.{Transaction => MultiverseTransaction, TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
@ -22,172 +22,258 @@ import org.multiverse.stms.alpha.AlphaStm
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
/**
* Example of atomic transaction management using the atomic block.
* These blocks takes an implicit argument String defining the transaction family name.
* If these blocks are used from within an Actor then the name is automatically resolved, if not either:
* 1. define an implicit String with the name in the same scope
* 2. pass in the name explicitly
*
* Here are some examples (assuming implicit transaction family name in scope):
* <pre>
* import se.scalablesolutions.akka.stm.Transaction._
*
* atomic {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomic transaction management using atomic block with retry count:
* <pre>
* import se.scalablesolutions.akka.stm.Transaction._
*
* atomic(maxNrOfRetries) {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomically-orElse transaction management.
* Which is a good way to reduce contention and transaction collisions.
* <pre>
* import se.scalablesolutions.akka.stm.Transaction._
*
* atomically {
* .. // try to do something
* } orElse {
* .. // if transaction clashes try do do something else to minimize contention
* }
* </pre>
*
* Example of atomic transaction management using for comprehensions (monadic):
*
* <pre>
* import se.scalablesolutions.akka.stm.Transaction._
* for (tx <- Transaction) {
* ... // do transactional stuff
* }
*
* val result = for (tx <- Transaction) yield {
* ... // do transactional stuff yielding a result
* }
* </pre>
*
* Example of using Transaction and TransactionalRef in for comprehensions (monadic):
*
* <pre>
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
* val result = for {
* tx <- Transaction
* ref <- refs
* } yield {
* ... // use the ref inside a transaction, yield a result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Transaction extends TransactionManagement with Logging {
object Transaction {
val idFactory = new AtomicLong(-1L)
/**
* See ScalaDoc on Transaction class.
*/
def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
def foreach(f: => Unit): Unit = atomic {f}
/**
* See ScalaDoc on Transaction class.
*/
def atomic[T](body: => T): T = {
var isTopLevelTransaction = true
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
clearTransaction
result
}
override def onStart(mtx: MultiverseTransaction) = {
val txSet =
if (!isTransactionSetInScope) {
isTopLevelTransaction = true
createNewTransactionSet
} else getTransactionSetInScope
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
})
txSet.registerOnAbortTask(new Runnable() {
def run = tx.abort
})
}
}.execute()
}
/**
* See ScalaDoc on class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)
/**
* Should only be used together with <code>atomically</code> to form atomically-orElse constructs.
* See ScalaDoc on class.
*/
def elseBody[A](firstBody: => A) = new {
def orElse(secondBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = firstBody
def orelserun(t: MultiverseTransaction) = secondBody
}.execute()
}
/**
* Creates a STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
*
*
* Only for internal usage.
*/
private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
}.execute()
/**
* Module for "local" transaction management, local in the context of threads.
* You should only use these if you do <b>not</b> need to have one transaction span
* multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
*
* atomic {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomically-orElse transaction management.
* Which is a good way to reduce contention and transaction collisions.
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
*
* atomically {
* .. // try to do something
* } orElse {
* .. // if transaction clashes try do do something else to minimize contention
* }
* </pre>
*
* Example of atomic transaction management using for comprehensions (monadic):
*
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
* for (tx <- Transaction.Local) {
* ... // do transactional stuff
* }
*
* val result = for (tx <- Transaction.Local) yield {
* ... // do transactional stuff yielding a result
* }
* </pre>
*
* Example of using Transaction and TransactionalRef in for comprehensions (monadic):
*
* <pre>
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction.Local
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
* val result = for {
* tx <- Transaction.Local
* ref <- refs
* } yield {
* ... // use the ref inside a transaction, yield a result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Local extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Local class.
*/
def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def foreach(f: => Unit): Unit = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def atomic[T](body: => T): T = {
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
override def onStart(mtx: MultiverseTransaction) = {
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(tx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" => tx.commit
case "postAbort" => tx.abort
case _ => {}
}
})
}
}.execute()
}
/**
* See ScalaDoc on Transaction.Local class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)
/**
* Should only be used together with <code>atomically</code> to form atomically-orElse constructs.
* See ScalaDoc on class.
*/
def elseBody[A](firstBody: => A) = new {
def orElse(secondBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = firstBody
def orelserun(t: MultiverseTransaction) = secondBody
}.execute()
}
}
/**
* Module for "global" transaction management, global in the context of multiple threads.
* You have to use these if you do need to have one transaction span multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* Here are some examples (assuming implicit transaction family name in scope):
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Global._
*
* atomic {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomic transaction management using for comprehensions (monadic):
*
* <pre>
* import se.scalablesolutions.akka.stm.Transaction
* for (tx <- Transaction.Global) {
* ... // do transactional stuff
* }
*
* val result = for (tx <- Transaction.Global) yield {
* ... // do transactional stuff yielding a result
* }
* </pre>
*
* Example of using Transaction and TransactionalRef in for comprehensions (monadic):
*
* <pre>
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction.Global
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
* val result = for {
* tx <- Transaction.Global
* ref <- refs
* } yield {
* ... // use the ref inside a transaction, yield a result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Global extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Global class.
*/
def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Global class.
*/
def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Global class.
*/
def foreach(f: => Unit): Unit = atomic {f}
/**
* See ScalaDoc on Transaction.Global class.
*/
def atomic[T](body: => T): T = {
var isTopLevelTransaction = false
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
clearTransaction
result
}
override def onStart(mtx: MultiverseTransaction) = {
val txSet =
if (!isTransactionSetInScope) {
isTopLevelTransaction = true
createNewTransactionSet
} else getTransactionSetInScope
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
txSet.registerOnCommitTask(new Runnable() {
def run = tx.commit
})
txSet.registerOnAbortTask(new Runnable() {
def run = tx.abort
})
}
}.execute()
}
}
}
/**
* The Akka specific Transaction class, keeping track of persistent data structures (as in on-disc).
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable class Transaction extends Logging {
import Transaction._
val id = Transaction.idFactory.incrementAndGet
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[akka] var transaction: Option[MultiverseTransaction] = None
@ -199,15 +285,15 @@ object Transaction extends TransactionManagement with Logging {
// --- public methods ---------
def commit = synchronized {
log.trace("Committing transaction %s", toString)
atomic0 {
log.trace("Committing transaction %s", toString)
Transaction.atomic0 {
persistentStateMap.valuesIterator.foreach(_.commit)
}
status = TransactionStatus.Completed
}
def abort = synchronized {
log.trace("Aborting transaction %s", toString)
log.trace("Aborting transaction %s", toString)
}
def isNew = synchronized { status == TransactionStatus.New }
@ -244,7 +330,7 @@ object Transaction extends TransactionManagement with Logging {
throw new IllegalStateException(
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire
// For reinitialize transaction after sending it over the wire
/* private[akka] def reinit = synchronized {
import net.lag.logging.{Logger, Level}
if (log eq null) {

View file

@ -4,6 +4,8 @@
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.Logging
import java.util.concurrent.atomic.AtomicBoolean
import org.multiverse.api.ThreadLocalTransaction._
@ -49,9 +51,10 @@ object TransactionManagement extends TransactionManagement {
}
}
trait TransactionManagement {
trait TransactionManagement extends Logging {
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
log.trace("Creating new transaction set")
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
TransactionManagement.transactionSet.set(Some(txSet))
txSet
@ -63,9 +66,13 @@ trait TransactionManagement {
private[akka] def setTransaction(tx: Option[Transaction]) =
if (tx.isDefined) TransactionManagement.transaction.set(tx)
private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)
private[akka] def clearTransactionSet = {
log.trace("Clearing transaction set")
TransactionManagement.transactionSet.set(None)
}
private[akka] def clearTransaction = {
log.trace("Clearing transaction")
TransactionManagement.transaction.set(None)
setThreadLocalTransaction(null)
}

View file

@ -4,7 +4,6 @@
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.util.UUID
import org.multiverse.stms.alpha.AlphaRef

View file

@ -2,7 +2,7 @@ package se.scalablesolutions.akka.actor
import _root_.java.util.concurrent.TimeUnit
import se.scalablesolutions.akka.actor.Actor.transactor
import se.scalablesolutions.akka.stm.Transaction.atomic
import se.scalablesolutions.akka.stm.Transaction.Global.atomic
import se.scalablesolutions.akka.util.Logging
import org.scalatest.Suite

View file

@ -5,7 +5,7 @@ import org.scalatest.junit.JUnitSuite
import org.junit.Test
class ForwardActorTest extends JUnitSuite {
class ForwardActorSpec extends JUnitSuite {
object ForwardState {
var sender: Actor = null
@ -57,7 +57,7 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBang = {
val senderActor = new BangSenderActor
senderActor.start
assert(ForwardState.finished.await(1, TimeUnit.SECONDS))
assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
assert(ForwardState.sender ne null)
assert(senderActor === ForwardState.sender)
}
@ -66,6 +66,6 @@ class ForwardActorTest extends JUnitSuite {
def shouldForwardActorReferenceWhenInvokingForwardOnBangBang = {
val senderActor = new BangBangSenderActor
senderActor.start
assert(ForwardState.finished.await(1, TimeUnit.SECONDS))
assert(ForwardState.finished.await(2, TimeUnit.SECONDS))
}
}

View file

@ -0,0 +1,81 @@
package se.scalablesolutions.akka.actor
import se.scalablesolutions.akka.stm.Transaction.Local._
import se.scalablesolutions.akka.stm._
import org.scalatest.Spec
import org.scalatest.Assertions
import org.scalatest.matchers.ShouldMatchers
import org.scalatest.BeforeAndAfterAll
import org.scalatest.junit.JUnitRunner
import org.junit.runner.RunWith
@RunWith(classOf[JUnitRunner])
class StmSpec extends
Spec with
ShouldMatchers with
BeforeAndAfterAll {
describe("STM outside actors") {
it("should be able to do multiple consecutive atomic {..} statements") {
lazy val ref = TransactionalState.newRef[Int]
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
}
def total: Int = atomic {
ref.get.getOrElse(0)
}
increment
increment
increment
total should equal(3)
}
it("should be able to do nested atomic {..} statements") {
lazy val ref = TransactionalState.newRef[Int]
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
}
def total: Int = atomic {
ref.get.getOrElse(0)
}
atomic {
increment
increment
}
atomic {
increment
total should equal(3)
}
}
it("should roll back failing nested atomic {..} statements") {
lazy val ref = TransactionalState.newRef[Int]
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
}
def total: Int = atomic {
ref.get.getOrElse(0)
}
try {
atomic {
increment
increment
throw new Exception
}
} catch {
case e => {}
}
total should equal(0)
}
}
}

View file

@ -35,4 +35,4 @@ object Kernel extends AkkaLoader {
case x: BootableRemoteActorService => x.startRemoteService
case _ =>
})
}
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>.
*/
package se.scalablesolutions.akka.sample.chat
package sample.chat
import scala.collection.mutable.HashMap
@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, RemoteActor}
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
import se.scalablesolutions.akka.stm.Transaction._
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.util.Logging

View file

@ -1,4 +1,4 @@
package sample.java;
package sample.rest.java;
import se.scalablesolutions.akka.config.ActiveObjectConfigurator;
import static se.scalablesolutions.akka.config.JavaConfig.*;
@ -11,11 +11,11 @@ public class Boot {
new RestartStrategy(new OneForOne(), 3, 5000, new Class[]{Exception.class}),
new Component[] {
new Component(
sample.java.SimpleService.class,
SimpleService.class,
new LifeCycle(new Permanent()),
1000),
new Component(
sample.java.PersistentSimpleService.class,
PersistentSimpleService.class,
new LifeCycle(new Permanent()),
1000)
}).supervise();

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.java;
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.java;
package sample.rest.java;
import javax.ws.rs.Path;
import javax.ws.rs.GET;

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package sample.scala
package sample.rest.scala
import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
import se.scalablesolutions.akka.stm.TransactionalState

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.security.samples
package sample.security
import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._

View file

@ -20,9 +20,9 @@
# FQN to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
boot = ["sample.camel.Boot",
"sample.java.Boot",
"sample.scala.Boot",
"se.scalablesolutions.akka.security.samples.Boot"]
"sample.rest.java.Boot",
"sample.rest.scala.Boot",
"sample.security.Boot"]
<actor>
timeout = 5000 # default timeout for future based invocations
@ -41,8 +41,8 @@
service = on
hostname = "localhost"
port = 9998
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
authenticator = "se.scalablesolutions.akka.security.samples.BasicAuthenticationService" # The authentication service to use
filters = ["se.scalablesolutions.akka.security.AkkaSecurityFilterFactory"] # List with all jersey filters to use
authenticator = "sample.security.BasicAuthenticationService" # The authentication service to use. Need to be overridden (uses sample now)
</rest>
<remote>

View file

@ -456,7 +456,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// docs
if (genDocs) {
val DOC_FILE_NAME = moduleName + "_%s-%s-%s.jar".format(buildScalaVersion, version, "doc")
val DOC_FILE_NAME = moduleName + "_%s-%s-%s.jar".format(buildScalaVersion, version, "docs")
val DOC_FILE_PATH = projectPath + "/target/scala_%s/".format(buildScalaVersion) + DOC_FILE_NAME
val fromDoc = Path.fromFile(new java.io.File(DOC_FILE_PATH))
val toDoc = Path.fromFile(new java.io.File(toDir + "/" + DOC_FILE_NAME))