commit b7ed47ee73
merged sbt branch with master

31 changed files with 525 additions and 740 deletions
@@ -44,7 +44,8 @@ class JGroupsClusterActor extends BasicClusterActor {
         log debug "UNSUPPORTED: JGroupsClusterActor::unblock" //TODO HotSwap back and flush the buffer
       })
     })
-    channel.map(_.connect(name))
+    channel.foreach(_.connect(name))
   }

   protected def toOneNode(dest : Address, msg: Array[Byte]): Unit =
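The connect call in the hunk above moves from Option.map to Option.foreach. Both run the side effect when a channel is present, but map also builds and discards an Option[Unit], while foreach states the intent directly. A minimal, self-contained Scala sketch of the difference — the Channel class here is a stand-in for illustration, not the JGroups API:

// Stand-in Channel type for illustration only (not the JGroups Channel).
case class Channel(name: String) {
  def connect(cluster: String): Unit = println(name + " connected to " + cluster)
}

object OptionSideEffectSketch {
  def main(args: Array[String]): Unit = {
    val channel: Option[Channel] = Some(Channel("jgroups"))
    val name = "my-cluster"

    channel.map(_.connect(name))     // works, but wraps the Unit result in a discarded Option
    channel.foreach(_.connect(name)) // same effect, and the side-effect-only intent is explicit
  }
}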
@@ -85,6 +85,7 @@ class ShoalClusterActor extends BasicClusterActor {
    */
   protected def createCallback : CallBack = {
     import org.scala_tools.javautils.Imports._
+    import ClusterActor._
     val me = this
     new CallBack {
       def processNotification(signal : Signal) {
@@ -92,10 +93,10 @@ class ShoalClusterActor extends BasicClusterActor {
         signal.acquire()
         if(isActive) {
           signal match {
-            case ms : MessageSignal => me send Message(ms.getMemberToken,ms.getMessage)
-            case jns : JoinNotificationSignal => me send View(Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName)
-            case fss : FailureSuspectedSignal => me send Zombie(fss.getMemberToken)
-            case fns : FailureNotificationSignal => me send Zombie(fns.getMemberToken)
+            case ms : MessageSignal => me send Message[ADDR_T](ms.getMemberToken,ms.getMessage)
+            case jns : JoinNotificationSignal => me send View[ADDR_T](Set[ADDR_T]() ++ jns.getCurrentCoreMembers.asScala - serverName)
+            case fss : FailureSuspectedSignal => me send Zombie[ADDR_T](fss.getMemberToken)
+            case fns : FailureNotificationSignal => me send Zombie[ADDR_T](fns.getMemberToken)
             case _ => log.debug("Unhandled signal: [%s]",signal)
           }
         }
@@ -10,13 +10,14 @@ import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, F
 import se.scalablesolutions.akka.config.ScalaConfig._
 import se.scalablesolutions.akka.stm.Transaction._
 import se.scalablesolutions.akka.stm.TransactionManagement._
-import se.scalablesolutions.akka.stm.{StmException, TransactionManagement}
+import se.scalablesolutions.akka.stm.TransactionManagement
 import se.scalablesolutions.akka.remote.protobuf.RemoteProtocol.RemoteRequest
 import se.scalablesolutions.akka.remote.{RemoteProtocolBuilder, RemoteClient, RemoteRequestIdFactory}
 import se.scalablesolutions.akka.serialization.Serializer
 import se.scalablesolutions.akka.util.{HashCode, Logging, UUID}

 import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier

 import java.util.{Queue, HashSet}
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -98,9 +99,7 @@ object Actor extends Logging {
    * The actor is started when created.
    * Example:
    * <pre>
-   * import Actor._
-   *
-   * val a = actor {
+   * val a = Actor.init {
    *   ... // init stuff
    * } receive {
    *   case msg => ... // handle message
@@ -108,8 +107,8 @@ object Actor extends Logging {
    * </pre>
    *
    */
-  def actor(body: => Unit) = {
-    def handler(body: => Unit) = new {
+  def init[A](body: => Unit) = {
+    def handler[A](body: => Unit) = new {
       def receive(handler: PartialFunction[Any, Unit]) = new Actor() {
         start
         body
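The factory documented in the hunk above is now Actor.init { ... } receive { ... } rather than actor { ... }. A hedged usage sketch, assuming the imports used elsewhere in this diff (se.scalablesolutions.akka.actor.Actor, plus Actor.Sender.Self as the implicit sender for '!' from outside an actor, as the updated tests do):

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.actor.Actor.Sender.Self // implicit sender for '!' from non-actor code

object InitReceiveSketch {
  def main(args: Array[String]): Unit = {
    // The actor is started when created, per the ScalaDoc above.
    val a = Actor.init {
      println("init stuff runs once, before any message is handled")
    } receive {
      case msg => println("handled: " + msg)
    }

    a ! "hello"
  }
}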
@@ -198,7 +197,7 @@ object Actor extends Logging {
  */
 trait Actor extends TransactionManagement {
   implicit protected val self: Option[Actor] = Some(this)
-  implicit protected val transactionFamily: String = this.getClass.getName
+  implicit protected val transactionFamilyName: String = this.getClass.getName

   // Only mutable for RemoteServer in order to maintain identity across nodes
   private[akka] var _uuid = UUID.newUuid.toString
@@ -219,6 +218,7 @@ trait Actor extends TransactionManagement {
   private[akka] var _replyToAddress: Option[InetSocketAddress] = None
   private[akka] val _mailbox: Queue[MessageInvocation] = new ConcurrentLinkedQueue[MessageInvocation]

+
   // ====================================
   // protected fields
   // ====================================
@@ -309,9 +309,9 @@ trait Actor extends TransactionManagement {
    * If 'trapExit' is set for the actor to act as supervisor, then a faultHandler must be defined.
    * Can be one of:
    * <pre/>
-   *  AllForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(AllForOneStrategy(maxNrOfRetries, withinTimeRange))
    *
-   *  OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int)
+   *  faultHandler = Some(OneForOneStrategy(maxNrOfRetries, withinTimeRange))
    * </pre>
    */
   protected var faultHandler: Option[FaultHandlingStrategy] = None
@@ -334,8 +334,8 @@ trait Actor extends TransactionManagement {
   /**
    * User overridable callback/setting.
    *
-   * Partial function implementing the server logic.
-   * To be implemented by subclassing server.
+   * Partial function implementing the actor logic.
+   * To be implemented by subclassing actor.
    * <p/>
    * Example code:
    * <pre>
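The ScalaDoc in the hunk above now shows the full assignment form for the supervising actor's fault handler. A short sketch of an actor configured this way, using the AllForOneStrategy import already present in this file's import list; the retry values are illustrative, not defaults:

import se.scalablesolutions.akka.actor.Actor
import se.scalablesolutions.akka.config.AllForOneStrategy

class SupervisorSketch extends Actor {
  // Restart all supervised actors together: at most 5 restarts within 5000 ms (illustrative values).
  faultHandler = Some(AllForOneStrategy(5, 5000))

  def receive = {
    case msg => log.info("supervisor got [%s]", msg)
  }
}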
@@ -785,6 +785,11 @@ trait Actor extends TransactionManagement {
   }

   protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
+    if (isTransactionSetInScope) {
+      log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+      getTransactionSetInScope.incParties
+    }
+
     if (_remoteAddress.isDefined) {
       val requestBuilder = RemoteRequest.newBuilder
         .setId(RemoteRequestIdFactory.nextId)
@@ -796,8 +801,7 @@ trait Actor extends TransactionManagement {
         .setIsEscaped(false)

       val id = registerSupervisorAsRemoteActor
-      if(id.isDefined)
-        requestBuilder.setSupervisorUuid(id.get)
+      if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)

       // set the source fields used to reply back to the original sender
       // (i.e. not the remote proxy actor)
@@ -816,7 +820,7 @@ trait Actor extends TransactionManagement {
       RemoteProtocolBuilder.setMessage(message, requestBuilder)
       RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, None)
     } else {
-      val invocation = new MessageInvocation(this, message, None, sender, currentTransaction.get)
+      val invocation = new MessageInvocation(this, message, None, sender, transactionSet.get)
       if (_isEventBased) {
         _mailbox.add(invocation)
         if (_isSuspended) invocation.send
@@ -824,12 +828,18 @@ trait Actor extends TransactionManagement {
         else
           invocation.send
       }
+      clearTransactionSet
     }

   protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
     message: Any,
     timeout: Long,
     senderFuture: Option[CompletableFuture]): CompletableFuture = {
+    if (isTransactionSetInScope) {
+      log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
+      getTransactionSetInScope.incParties
+    }
+
     if (_remoteAddress.isDefined) {
       val requestBuilder = RemoteRequest.newBuilder
         .setId(RemoteRequestIdFactory.nextId)
@@ -843,16 +853,18 @@ trait Actor extends TransactionManagement {
       val id = registerSupervisorAsRemoteActor
       if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
       val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
+      clearTransactionSet
       if (future.isDefined) future.get
       else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
     } else {
       val future = if (senderFuture.isDefined) senderFuture.get
                    else new DefaultCompletableFuture(timeout)
-      val invocation = new MessageInvocation(this, message, Some(future), None, currentTransaction.get)
+      val invocation = new MessageInvocation(this, message, Some(future), None, transactionSet.get)
       if (_isEventBased) {
         _mailbox.add(invocation)
         invocation.send
       } else invocation.send
+      clearTransactionSet
       future
     }
   }
@@ -861,6 +873,7 @@ trait Actor extends TransactionManagement {
    * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
    */
   private[akka] def invoke(messageHandle: MessageInvocation) = synchronized {
+    //log.trace("%s is invoked with message %s", toString, messageHandle)
     try {
       if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
       else dispatch(messageHandle)
@@ -872,7 +885,7 @@ trait Actor extends TransactionManagement {
   }

   private def dispatch[T](messageHandle: MessageInvocation) = {
-    setTransaction(messageHandle.tx)
+    setTransactionSet(messageHandle.transactionSet)

     val message = messageHandle.message //serializeMessage(messageHandle.message)
     senderFuture = messageHandle.future
@@ -894,43 +907,55 @@ trait Actor extends TransactionManagement {
   }

   private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
-    setTransaction(messageHandle.tx)
+    var topLevelTransaction = false
+    val txSet: Option[CountDownCommitBarrier] =
+      if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
+      else {
+        topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
+        if (isTransactionRequiresNew) {
+          log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
+          Some(createNewTransactionSet)
+        } else None
+      }
+    setTransactionSet(txSet)

     val message = messageHandle.message //serializeMessage(messageHandle.message)
     senderFuture = messageHandle.future
     sender = messageHandle.sender

+    def clearTx = {
+      clearTransactionSet
+      clearTransaction
+    }
+
     def proceed = {
-      try {
-        incrementTransaction
       if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
       else throw new IllegalArgumentException(
-        "Actor " + toString + " could not process message [" + message + "]" +
+        toString + " could not process message [" + message + "]" +
         "\n\tsince no matching 'case' clause in its 'receive' method could be found")
-      } finally {
-        decrementTransaction
-      }
+      setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
     }

     try {
-      if (isTransactionRequiresNew && !isTransactionInScope) {
-        if (senderFuture.isEmpty) throw new StmException(
-          "Can't continue transaction in a one-way fire-forget message send" +
-          "\n\tE.g. using Actor '!' method or Active Object 'void' method" +
-          "\n\tPlease use the Actor '!!' method or Active Object method with non-void return type")
+      if (isTransactionRequiresNew) {
         atomic {
           proceed
         }
       } else proceed
     } catch {
+      case e: IllegalStateException => {}
       case e =>
+        // abort transaction set
+        if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
         Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)

         if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
-        clearTransaction // need to clear currentTransaction before call to supervisor
+        clearTx // need to clear currentTransaction before call to supervisor

         // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
         if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
     } finally {
-      clearTransaction
+      clearTx
     }
   }
@@ -1042,6 +1067,5 @@ trait Actor extends TransactionManagement {
     that.asInstanceOf[Actor]._uuid == _uuid
   }

-  override def toString(): String = "Actor[" + id + ":" + uuid + "]"
+  override def toString = "Actor[" + id + ":" + uuid + "]"

 }
@@ -31,10 +31,10 @@ trait BootableActorLoaderService extends Bootable with Logging {
         val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
         log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
         new URLClassLoader(toDeploy.toArray, ClassLoader.getSystemClassLoader)
-      } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") ne null) {
+      } else if (getClass.getClassLoader.getResourceAsStream("aop.xml") ne null) {
         getClass.getClassLoader
       } else throw new IllegalStateException(
-        "AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+        "AKKA_HOME is not defined and akka-<version>.jar can not be found on the classpath; aborting...")
       )
     }
@@ -13,6 +13,7 @@ class FutureTimeoutException(message: String) extends RuntimeException(message)
 object Futures {

   /**
+   * FIXME document
    * <pre>
    * val future = Futures.future(1000) {
    *   ... // do stuff
@@ -7,16 +7,17 @@ package se.scalablesolutions.akka.dispatch
 import java.util.List

 import se.scalablesolutions.akka.util.{HashCode, Logging}
-import se.scalablesolutions.akka.stm.Transaction
 import se.scalablesolutions.akka.actor.Actor

 import java.util.concurrent.ConcurrentHashMap

+import org.multiverse.commitbarriers.CountDownCommitBarrier
+
 final class MessageInvocation(val receiver: Actor,
                               val message: Any,
                               val future: Option[CompletableFuture],
                               val sender: Option[Actor],
-                              val tx: Option[Transaction]) {
+                              val transactionSet: Option[CountDownCommitBarrier]) {
   if (receiver eq null) throw new IllegalArgumentException("receiver is null")

   def invoke = receiver.invoke(this)
@@ -37,13 +38,13 @@ final class MessageInvocation(val receiver: Actor,
     that.asInstanceOf[MessageInvocation].message == message
   }

-  override def toString(): String = synchronized {
+  override def toString = synchronized {
     "MessageInvocation[" +
     "\n\tmessage = " + message +
     "\n\treceiver = " + receiver +
     "\n\tsender = " + sender +
     "\n\tfuture = " + future +
-    "\n\ttx = " + tx +
+    "\n\ttransactionSet = " + transactionSet +
     "\n]"
   }
 }
@@ -24,7 +24,6 @@ trait BootableRemoteActorService extends Bootable with Logging {

   abstract override def onLoad = {
     if(config.getBool("akka.remote.server.service", true)){
-      log.info("Starting up Cluster Service")
       Cluster.start
       super.onLoad //Initialize BootableActorLoaderService before remote service
       log.info("Initializing Remote Actors Service...")
@@ -48,7 +48,15 @@ private[remote] object ClusterActor {
   sealed trait ClusterMessage

   private[remote] case class RelayedMessage(actorClassFQN: String, msg: AnyRef) extends ClusterMessage
+  private[remote] case class Message[ADDR_T](sender: ADDR_T, msg: Array[Byte])
+  private[remote] case object PapersPlease extends ClusterMessage
+  private[remote] case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
+  private[remote] case object Block extends ClusterMessage
+  private[remote] case object Unblock extends ClusterMessage
+  private[remote] case class View[ADDR_T](othersPresent: Set[ADDR_T]) extends ClusterMessage
+  private[remote] case class Zombie[ADDR_T](address: ADDR_T) extends ClusterMessage
+  private[remote] case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
+  private[remote] case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
   private[remote] case class Node(endpoints: List[RemoteAddress])
 }
@@ -59,20 +67,8 @@ private[remote] object ClusterActor {
  */
 abstract class BasicClusterActor extends ClusterActor {
   import ClusterActor._

-  case class Message(sender : ADDR_T,msg : Array[Byte])
-  case object PapersPlease extends ClusterMessage
-  case class Papers(addresses: List[RemoteAddress]) extends ClusterMessage
-  case object Block extends ClusterMessage
-  case object Unblock extends ClusterMessage
-  case class View(othersPresent : Set[ADDR_T]) extends ClusterMessage
-  case class Zombie(address: ADDR_T) extends ClusterMessage
-  case class RegisterLocalNode(server: RemoteAddress) extends ClusterMessage
-  case class DeregisterLocalNode(server: RemoteAddress) extends ClusterMessage
-
   type ADDR_T

   @volatile private var local: Node = Node(Nil)
   @volatile private var remotes: Map[ADDR_T, Node] = Map()
@@ -85,14 +81,14 @@ abstract class BasicClusterActor extends ClusterActor {
   }

   def receive = {
-    case v @ View(members) => {
+    case v: View[ADDR_T] => {
       // Not present in the cluster anymore = presumably zombies
       // Nodes we have no prior knowledge existed = unknowns
-      val zombies = Set[ADDR_T]() ++ remotes.keySet -- members
-      val unknown = members -- remotes.keySet
+      val zombies = Set[ADDR_T]() ++ remotes.keySet -- v.othersPresent
+      val unknown = v.othersPresent -- remotes.keySet

       log debug ("Updating view")
-      log debug ("Other memebers: [%s]",members)
+      log debug ("Other memebers: [%s]", v.othersPresent)
       log debug ("Zombies: [%s]", zombies)
       log debug ("Unknowns: [%s]", unknown)
@@ -101,10 +97,10 @@ abstract class BasicClusterActor extends ClusterActor {
       remotes = remotes -- zombies
     }

-    case Zombie(x) => { //Ask the presumed zombie for papers and prematurely treat it as dead
-      log debug ("Killing Zombie Node: %s", x)
-      broadcast(x :: Nil, PapersPlease)
-      remotes = remotes - x
+    case z: Zombie[ADDR_T] => { //Ask the presumed zombie for papers and prematurely treat it as dead
+      log debug ("Killing Zombie Node: %s", z.address)
+      broadcast(z.address :: Nil, PapersPlease)
+      remotes = remotes - z.address
     }

     case rm@RelayedMessage(_, _) => {
@@ -112,7 +108,8 @@ abstract class BasicClusterActor extends ClusterActor {
       broadcast(rm)
     }

-    case m @ Message(src,msg) => {
+    case m: Message[ADDR_T] => {
+      val (src, msg) = (m.sender, m.msg)
       (Cluster.serializer in (msg, None)) match {

         case PapersPlease => {
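With the cluster messages now type-parameterized (Message[ADDR_T], View[ADDR_T], Zombie[ADDR_T]), the receive block above matches on the message type and reads fields through the bound value (v.othersPresent, z.address, m.sender) instead of destructuring with an extractor pattern. A self-contained sketch of the same idiom with illustrative names — not the Akka types — noting that the type argument is erased at run time, so only the class is checked:

// Illustrative only — stand-in types, not the Akka cluster messages.
case class View[A](othersPresent: Set[A])
case class Zombie[A](address: A)

object TypedMessageMatchSketch {
  def handle[A](msg: AnyRef, known: Set[A]): Set[A] = msg match {
    // The type argument cannot be verified at run time (erasure); fields are read via the binder.
    case v: View[A]   => known ++ v.othersPresent
    case z: Zombie[A] => known - z.address
    case _            => known
  }

  def main(args: Array[String]): Unit =
    println(handle(View(Set("nodeB", "nodeC")), Set("nodeA", "nodeB")))
}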
@@ -206,22 +203,25 @@ abstract class BasicClusterActor extends ClusterActor {
  * Loads a specified ClusterActor and delegates to that instance.
  */
 object Cluster extends Cluster with Logging {
+  lazy val DEFAULT_SERIALIZER_CLASS_NAME = Serializer.Java.getClass.getName
+
   @volatile private[remote] var clusterActor: Option[ClusterActor] = None

+  // FIXME Use the supervisor member field
   @volatile private[remote] var supervisor: Option[Supervisor] = None

-  private[remote] lazy val serializer: Serializer = {
-    val className = config.getString("akka.remote.cluster.serializer", Serializer.Java.getClass.getName)
-    Class.forName(className).newInstance.asInstanceOf[Serializer]
-  }
+  private[remote] lazy val serializer: Serializer =
+    Class.forName(config.getString("akka.remote.cluster.serializer", DEFAULT_SERIALIZER_CLASS_NAME))
+      .newInstance.asInstanceOf[Serializer]

   private[remote] def createClusterActor: Option[ClusterActor] = {
     val name = config.getString("akka.remote.cluster.actor")
+    if (name.isEmpty) throw new IllegalArgumentException(
+      "Can't start cluster since the 'akka.remote.cluster.actor' configuration option is not defined")
     try {
-      name map { fqn =>
-        val a = Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
-        a.start
-        a
+      name map {
+        fqn =>
+          Class.forName(fqn).newInstance.asInstanceOf[ClusterActor]
       }
     }
     catch {
@@ -235,7 +235,6 @@ object Cluster extends Cluster with Logging {
       RestartStrategy(OneForOne, 5, 1000, List(classOf[Exception])),
       Supervise(actor, LifeCycle(Permanent)) :: Nil)
     ).newInstance
-    sup.start
     Some(sup)
   }
@@ -253,16 +252,19 @@ object Cluster extends Cluster with Logging {
   def foreach(f: (RemoteAddress) => Unit): Unit = clusterActor.foreach(_.foreach(f))

   def start: Unit = synchronized {
+    log.info("Starting up Cluster Service...")
     if (supervisor.isEmpty) {
       for (actor <- createClusterActor;
            sup <- createSupervisor(actor)) {
         clusterActor = Some(actor)
         supervisor = Some(sup)
+        sup.start
       }
     }
   }

   def shutdown: Unit = synchronized {
+    log.info("Shutting down Cluster Service...")
     supervisor.foreach(_.stop)
     supervisor = None
     clusterActor = None
@@ -58,7 +58,7 @@ object RemoteNode extends RemoteServer
  */
 object RemoteServer {
   val HOSTNAME = config.getString("akka.remote.server.hostname", "localhost")
-  val PORT = config.getInt("akka.remote.server.port", 9999)
+  val PORT = config.getInt("akka.remote.server.port", 9966)

   val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.server.connection-timeout", 1000)
@@ -7,15 +7,18 @@ package se.scalablesolutions.akka.stm
 import java.util.concurrent.atomic.AtomicLong
 import java.util.concurrent.atomic.AtomicInteger

+import scala.collection.mutable.HashMap
+
 import se.scalablesolutions.akka.state.Committable
 import se.scalablesolutions.akka.util.Logging

 import org.multiverse.api.{Transaction => MultiverseTransaction}
 import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
 import org.multiverse.api.ThreadLocalTransaction._
-import org.multiverse.templates.OrElseTemplate
-import scala.collection.mutable.HashMap
+import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
+import org.multiverse.utils.backoff.ExponentialBackoffPolicy
+import org.multiverse.stms.alpha.AlphaStm
+import java.util.concurrent.TimeUnit

 class NoTransactionInScopeException extends RuntimeException
 class TransactionRetryException(message: String) extends RuntimeException(message)
@@ -97,101 +100,87 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
  */
 object Transaction extends TransactionManagement {
   val idFactory = new AtomicLong(-1L)
+  /*
+  import AlphaStm._
+  private val defaultTxBuilder = new AlphaTransactionFactoryBuilder
+  defaultTxBuilder.setReadonly(false)
+  defaultTxBuilder.setInterruptible(INTERRUPTIBLE)
+  defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+  defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+  defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+  defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+  defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+  private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder
+  readOnlyTxBuilder.setReadonly(true)
+  readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE)
+  readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
+  readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
+  readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
+  readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
+  readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
+  */
+
+  /**
+   * See ScalaDoc on class.
+   */
+  def map[T](f: => T)(implicit transactionFamilyName: String): T =
+    atomic {f}
+
   /**
    * See ScalaDoc on class.
    */
-  def map[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
+  def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
+    atomic {f}

   /**
    * See ScalaDoc on class.
    */
-  def flatMap[T](f: Transaction => T)(implicit transactionFamilyName: String): T = atomic { f(getTransactionInScope) }
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def foreach(f: Transaction => Unit)(implicit transactionFamilyName: String): Unit = atomic { f(getTransactionInScope) }
+  def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
+    atomic {f}

   /**
    * Creates a "pure" STM atomic transaction and by-passes all transactions hooks
    * such as persistence etc.
    * Only for internal usage.
    */
-  private[akka] def pureAtomic[T](body: => T): T = new AtomicTemplate[T](
-    getGlobalStmInstance, "internal", false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
+  private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() {
     def execute(mtx: MultiverseTransaction): T = body
   }.execute()

   /**
    * See ScalaDoc on class.
    */
-  def atomic[T](body: => T)(implicit transactionFamilyName: String): T = new AtomicTemplate[T](
-    getGlobalStmInstance, transactionFamilyName, false, false, TransactionManagement.MAX_NR_OF_RETRIES) {
-    def execute(mtx: MultiverseTransaction): T = body
-    override def postStart(mtx: MultiverseTransaction) = {
+  def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
+    // defaultTxBuilder.setFamilyName(transactionFamilyName)
+    // new TransactionTemplate[T](defaultTxBuilder.build) {
+    new TransactionTemplate[T]() { // FIXME take factory
+      def execute(mtx: MultiverseTransaction): T = {
+        val result = body
+
+        log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set")
+        getTransactionSetInScope.joinCommit(mtx)
+
+        // FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+        //getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
+
+        clearTransaction
+        result
+      }
+
+      override def onStart(mtx: MultiverseTransaction) = {
+        val txSet = if (!isTransactionSetInScope) createNewTransactionSet
+                    else getTransactionSetInScope
        val tx = new Transaction
        tx.transaction = Some(mtx)
        setTransaction(Some(tx))
-    }
-    override def postCommit = {
-      if (isTransactionInScope) getTransactionInScope.commit
-      else throw new IllegalStateException("No transaction in scope")
+        txSet.registerOnCommitTask(new Runnable() {
+          def run = tx.commit
+        })
+        txSet.registerOnAbortTask(new Runnable() {
+          def run = tx.abort
+        })
      }
    }.execute()
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomic[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
-    new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, false, retryCount) {
-      def execute(mtx: MultiverseTransaction): T = body
-      override def postStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-      override def postCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomicReadOnly[T](retryCount: Int)(body: => T)(implicit transactionFamilyName: String): T = {
-    new AtomicTemplate[T](getGlobalStmInstance, transactionFamilyName, false, true, retryCount) {
-      def execute(mtx: MultiverseTransaction): T = body
-      override def postStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-      override def postCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
-
-  /**
-   * See ScalaDoc on class.
-   */
-  def atomicReadOnly[T](body: => T): T = {
-    new AtomicTemplate[T](true) {
-      def execute(mtx: MultiverseTransaction): T = body
-      override def postStart(mtx: MultiverseTransaction) = {
-        val tx = new Transaction
-        tx.transaction = Some(mtx)
-        setTransaction(Some(tx))
-      }
-      override def postCommit = {
-        if (isTransactionInScope) getTransactionInScope.commit
-        else throw new IllegalStateException("No transaction in scope")
-      }
-    }.execute
-  }
   }

   /**
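After this hunk, atomic runs its body in a Multiverse TransactionTemplate, registers the Akka Transaction's commit/abort as tasks on the transaction set in scope, and joins that set's commit barrier when the body finishes. Usage stays the same as before: wrap transactional work in atomic { ... } with an implicit transaction family name, the way TransactionalRef supplies "TransactionalRef:Init" later in this diff. A hedged sketch of a caller (the family name and method are illustrative):

import se.scalablesolutions.akka.stm.Transaction.atomic

object AtomicUsageSketch {
  // Every atomic block needs an implicit transaction family name in scope.
  implicit val txFamilyName: String = "Sketch:Update"

  def incremented(n: Int): Int =
    atomic {
      // transactional reads/writes (e.g. on a TransactionalRef) would go here;
      // the block joins the transaction set in scope and commits through its barrier
      n + 1
    }
}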
@@ -217,6 +206,7 @@ object Transaction extends TransactionManagement {
 @serializable class Transaction extends Logging {
   import Transaction._

+  log.trace("Creating %s", toString)
   val id = Transaction.idFactory.incrementAndGet
   @volatile private[this] var status: TransactionStatus = TransactionStatus.New
   private[akka] var transaction: Option[MultiverseTransaction] = None
@@ -226,13 +216,17 @@ object Transaction extends TransactionManagement {
   // --- public methods ---------

   def commit = synchronized {
+    log.trace("Committing transaction %s", toString)
     pureAtomic {
       persistentStateMap.values.foreach(_.commit)
-      TransactionManagement.clearTransaction
     }
     status = TransactionStatus.Completed
   }

+  def abort = synchronized {
+    log.trace("Aborting transaction %s", toString)
+  }
+
   def isNew = synchronized { status == TransactionStatus.New }

   def isActive = synchronized { status == TransactionStatus.Active }
@@ -284,7 +278,7 @@ object Transaction extends TransactionManagement {

   override def hashCode(): Int = synchronized { id.toInt }

-  override def toString(): String = synchronized { "Transaction[" + id + ", " + status + "]" }
+  override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
 }

 /**
@@ -9,51 +9,80 @@ import java.util.concurrent.atomic.AtomicBoolean
 import se.scalablesolutions.akka.util.Logging

 import org.multiverse.api.ThreadLocalTransaction._
+import org.multiverse.commitbarriers.CountDownCommitBarrier

 class StmException(msg: String) extends RuntimeException(msg)

-class TransactionAwareWrapperException(
-  val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
-  override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
+class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
+  override def toString = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
 }

 object TransactionManagement extends TransactionManagement {
   import se.scalablesolutions.akka.Config._

-  val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 100)
   val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
+  val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
+  val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", true)
+  val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 1000)
+  val TRANSACTION_TIMEOUT = config.getInt("akka.stm.timeout", 10000)
+  val SMART_TX_LENGTH_SELECTOR = config.getBool("akka.stm.smart-tx-length-selector", true)

   def isTransactionalityEnabled = TRANSACTION_ENABLED.get

   def disableTransactions = TRANSACTION_ENABLED.set(false)

-  private[akka] val currentTransaction: ThreadLocal[Option[Transaction]] = new ThreadLocal[Option[Transaction]]() {
+  private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
+    override protected def initialValue: Option[CountDownCommitBarrier] = None
+  }
+
+  private[akka] val transaction = new ThreadLocal[Option[Transaction]]() {
     override protected def initialValue: Option[Transaction] = None
   }
+
+  private[akka] def getTransactionSet: CountDownCommitBarrier = {
+    val option = transactionSet.get
+    if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope")
+    option.get
+  }
+
+  private[akka] def getTransaction: Transaction = {
+    val option = transaction.get
+    if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction in scope")
+    option.get
+  }
 }

 trait TransactionManagement extends Logging {
-  import TransactionManagement.currentTransaction

-  private[akka] def createNewTransaction = currentTransaction.set(Some(new Transaction))
+  private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
+    val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
+    TransactionManagement.transactionSet.set(Some(txSet))
+    txSet
+  }

-  private[akka] def setTransaction(transaction: Option[Transaction]) = if (transaction.isDefined) {
-    val tx = transaction.get
-    currentTransaction.set(transaction)
-    if (tx.transaction.isDefined) setThreadLocalTransaction(tx.transaction.get)
-    else throw new IllegalStateException("No transaction defined")
-  }
+  private[akka] def setTransactionSet(txSet: Option[CountDownCommitBarrier]) =
+    if (txSet.isDefined) TransactionManagement.transactionSet.set(txSet)
+
+  private[akka] def setTransaction(tx: Option[Transaction]) =
+    if (tx.isDefined) TransactionManagement.transaction.set(tx)
+
+  private[akka] def clearTransactionSet = TransactionManagement.transactionSet.set(None)

   private[akka] def clearTransaction = {
-    currentTransaction.set(None)
+    TransactionManagement.transaction.set(None)
     setThreadLocalTransaction(null)
   }

-  private[akka] def getTransactionInScope = currentTransaction.get.get
+  private[akka] def getTransactionSetInScope = TransactionManagement.getTransactionSet

-  private[akka] def isTransactionInScope = currentTransaction.get.isDefined
+  private[akka] def getTransactionInScope = TransactionManagement.getTransaction

-  private[akka] def incrementTransaction = if (isTransactionInScope) getTransactionInScope.increment
-  private[akka] def decrementTransaction = if (isTransactionInScope) getTransactionInScope.decrement
+  private[akka] def isTransactionSetInScope = {
+    val option = TransactionManagement.transactionSet.get
+    (option ne null) && option.isDefined
   }

+  private[akka] def isTransactionInScope = {
+    val option = TransactionManagement.transaction.get
+    (option ne null) && option.isDefined
+  }
+}
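The thread-local Transaction now travels with a thread-local transaction set, a Multiverse CountDownCommitBarrier shared by every actor participating in the same unit of work: the first participant creates the set via createNewTransactionSet, each transactional message send registers with incParties, commit/abort hooks are attached with registerOnCommitTask/registerOnAbortTask, and each party finishes by calling joinCommit with its own STM transaction. A condensed, standalone sketch of that barrier lifecycle, using only the calls that appear in this diff:

import org.multiverse.commitbarriers.CountDownCommitBarrier

object CommitBarrierSketch {
  def main(args: Array[String]): Unit = {
    // One initial party, fair ordering — mirrors createNewTransactionSet above.
    val txSet = new CountDownCommitBarrier(1, true)

    // A further participant (e.g. a message send to another transactional actor) joins the set.
    txSet.incParties()

    // Hooks fire when the whole set commits or aborts (used above to drive Transaction.commit/abort).
    txSet.registerOnCommitTask(new Runnable { def run = println("set committed") })
    txSet.registerOnAbortTask(new Runnable { def run = println("set aborted") })

    // Each party eventually calls txSet.joinCommit(itsMultiverseTransaction);
    // the barrier commits only once every registered party has joined, and aborts otherwise.
  }
}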
@@ -8,8 +8,7 @@ import se.scalablesolutions.akka.stm.Transaction.atomic
 import se.scalablesolutions.akka.stm.NoTransactionInScopeException
 import se.scalablesolutions.akka.collection._
 import se.scalablesolutions.akka.util.UUID
-import org.multiverse.datastructures.refs.manual.Ref;
+import org.multiverse.stms.alpha.AlphaRef

 /**
  * Example Scala usage:
@@ -78,7 +77,7 @@ class TransactionalRef[T] extends Transactional {
   implicit val txInitName = "TransactionalRef:Init"
   val uuid = UUID.newUuid.toString

-  private[this] val ref: Ref[T] = atomic { new Ref }
+  private[this] lazy val ref: AlphaRef[T] = new AlphaRef

   def swap(elem: T) = {
     ensureIsInTransaction
@ -23,7 +23,7 @@ case class SuccessOneWay(key: String, value: String)
|
||||||
case class FailureOneWay(key: String, value: String, failer: Actor)
|
case class FailureOneWay(key: String, value: String, failer: Actor)
|
||||||
|
|
||||||
class InMemStatefulActor extends Actor {
|
class InMemStatefulActor extends Actor {
|
||||||
timeout = 100000
|
timeout = 5000
|
||||||
makeTransactionRequired
|
makeTransactionRequired
|
||||||
|
|
||||||
private lazy val mapState = TransactionalState.newMap[String, String]
|
private lazy val mapState = TransactionalState.newMap[String, String]
|
||||||
|
|
@ -86,8 +86,8 @@ class InMemFailerActor extends Actor {
|
||||||
}
|
}
|
||||||
|
|
||||||
class InMemoryActorTest extends JUnitSuite {
|
class InMemoryActorTest extends JUnitSuite {
|
||||||
|
import Actor.Sender.Self
|
||||||
|
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldOneWayMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -98,7 +98,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -107,7 +107,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||||
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
assert("new state" === (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldOneWayMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -120,7 +120,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -134,7 +134,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
} catch {case e: RuntimeException => {}}
|
} catch {case e: RuntimeException => {}}
|
||||||
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
assert("init" === (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldOneWayVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -145,7 +145,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert(2 === (stateful !! GetVectorSize).get)
|
assert(2 === (stateful !! GetVectorSize).get)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -154,7 +154,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||||
assert(2 === (stateful !! GetVectorSize).get)
|
assert(2 === (stateful !! GetVectorSize).get)
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldOneWayVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -167,7 +167,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert(1 === (stateful !! GetVectorSize).get)
|
assert(1 === (stateful !! GetVectorSize).get)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -181,7 +181,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
} catch {case e: RuntimeException => {}}
|
} catch {case e: RuntimeException => {}}
|
||||||
assert(1 === (stateful !! GetVectorSize).get)
|
assert(1 === (stateful !! GetVectorSize).get)
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldOneWayRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -192,7 +192,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert("new state" === (stateful !! GetRefState).get)
|
assert("new state" === (stateful !! GetRefState).get)
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
def shouldRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -201,7 +201,7 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
|
||||||
assert("new state" === (stateful !! GetRefState).get)
|
assert("new state" === (stateful !! GetRefState).get)
|
||||||
}
|
}
|
||||||
/*
|
|
||||||
@Test
|
@Test
|
||||||
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldOneWayRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
@ -212,9 +212,9 @@ class InMemoryActorTest extends JUnitSuite {
|
||||||
failer.start
|
failer.start
|
||||||
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
stateful ! FailureOneWay("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactionrequired method
|
||||||
Thread.sleep(1000)
|
Thread.sleep(1000)
|
||||||
assert("init" === (stateful !! GetRefState).get) // check that state is == init state
|
assert("init" === (stateful !! (GetRefState, 1000000)).get) // check that state is == init state
|
||||||
}
|
}
|
||||||
*/
|
|
||||||
@Test
|
@Test
|
||||||
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
def shouldRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
|
||||||
val stateful = new InMemStatefulActor
|
val stateful = new InMemStatefulActor
|
||||||
|
|
|
||||||
|
|
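The only functional change in the test hunks above is that the blocking reply call gains an explicit time-out: `stateful !! GetRefState` becomes `stateful !! (GetRefState, 1000000)`. A minimal sketch of that pattern, assuming the actor is started the same way as the failer actors in these tests; nothing here is taken from outside this diff except the start call:

    // Hedged sketch: blocking send-and-reply with an explicit time-out (milliseconds).
    val stateful = new InMemStatefulActor
    stateful.start
    // Yields None if no reply arrives within the time-out, hence the .get in the assertions above.
    val state = stateful !! (GetRefState, 1000000)
    println(state)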
@@ -22,9 +22,11 @@ class MemoryFootprintTest extends JUnitSuite {
// Actors are put in AspectRegistry when created so they won't be GCd here

val totalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
+println("Memory before " + totalMem)
(1 until NR_OF_ACTORS).foreach(i => new Mem)

val newTotalMem = Runtime.getRuntime.totalMemory - Runtime.getRuntime.freeMemory
+println("Memory aftor " + newTotalMem)
val memPerActor = (newTotalMem - totalMem) / NR_OF_ACTORS

println("Memory footprint per actor is : " + memPerActor)

@@ -1,4 +1,4 @@
-package test
+package se.scalablesolutions.akka

import org.scalatest.junit.JUnitSuite
import org.junit.Test

@@ -279,7 +279,7 @@ class PerformanceTest extends JUnitSuite {

var nrOfMessages = 2000000
var nrOfActors = 4
-var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 20)
+var akkaTime = stressTestAkkaActors(nrOfMessages, nrOfActors, 1000 * 30)
var scalaTime = stressTestScalaActors(nrOfMessages, nrOfActors, 1000 * 40)
var ratio: Double = scalaTime.toDouble / akkaTime.toDouble

@@ -2,9 +2,8 @@ package se.scalablesolutions.akka.remote

import se.scalablesolutions.akka.actor.Actor

-object ActorShutdownSpec {
+object ActorShutdownRunner {
def main(args: Array[String]) {

class MyActor extends Actor {
def receive = {
case "test" => println("received test")

@@ -22,7 +21,7 @@ object ActorShutdownSpec {

// case 2

-object RemoteServerAndClusterShutdownSpec {
+object RemoteServerAndClusterShutdownRunner {
def main(args: Array[String]) {
val s1 = new RemoteServer
val s2 = new RemoteServer

@@ -11,7 +11,7 @@ import static se.scalablesolutions.akka.config.JavaConfig.*;
import se.scalablesolutions.akka.actor.*;
import se.scalablesolutions.akka.Kernel;
import junit.framework.TestCase;
-/*
public class InMemNestedStateTest extends TestCase {
static String messageLog = "";

@@ -133,4 +133,3 @@ public class InMemNestedStateTest extends TestCase {
assertEquals("init", nested.getRefState()); // check that state is == init state
}
}
-*/

@@ -50,11 +50,11 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
* Periodically handles incoming messages
*/
def receive = {
-case FunctionHolder(fun: (T => T)) => atomic { updateData(fun(value.getOrWait)) }
+case FunctionHolder(fun: (T => T)) => updateData(fun(value.getOrWait))

case ValueHolder(x: T) => updateData(x)

-case ProcedureHolder(fun: (T => Unit)) => atomic { fun(copyStrategy(value.getOrWait)) }
+case ProcedureHolder(fun: (T => Unit)) => fun(copyStrategy(value.getOrWait))
}

/**

@@ -66,7 +66,7 @@ sealed class Agent[T] private (initialValue: T) extends Actor {
/**
* Updates the internal state with the value provided as a by-name parameter
*/
-private final def updateData(newData: => T) : Unit = atomic { value.swap(newData) }
+private final def updateData(newData: => T): Unit = value.swap(newData)

/**
* Submits a request to read the internal state.
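The two Agent hunks above drop the per-message `atomic { ... }` wrappers from `receive` and `updateData`, so the Agent now relies on a transaction already being in scope when the message is handled. A minimal sketch of the resulting call pattern, borrowing the `Agent(5)` factory, `update` and the enclosing `atomic` block from the AgentTest changes further down in this diff:

    // Hedged sketch: updating an Agent from inside an enclosing transaction.
    atomic {
      val counter = Agent(5)     // factory as exercised in AgentTest below
      counter.update(_ + 1)      // FunctionHolder path: applied directly, no nested atomic block any more
      counter.update(_ * 2)
    }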
@@ -30,6 +30,7 @@ object Patterns {

def dispatcherActor(routing: PF[Any, Actor], msgTransformer: (Any) => Any): Actor = new Actor with Dispatcher {
override def transform(msg: Any) = msgTransformer(msg)

def routes = routing
}

@@ -43,9 +44,11 @@ object Patterns {
)
}

-trait Dispatcher { self : Actor =>
+trait Dispatcher {
+self: Actor =>

protected def transform(msg: Any): Any = msg

protected def routes: PartialFunction[Any, Actor]

protected def dispatch: PartialFunction[Any, Unit] = {

@@ -60,7 +63,8 @@ trait Dispatcher { self : Actor =>
def receive = dispatch
}

-trait LoadBalancer extends Dispatcher { self : Actor =>
+trait LoadBalancer extends Dispatcher {
+self: Actor =>
protected def seq: InfiniteIterator[Actor]

protected def routes = {case x if seq.hasNext => seq.next}

@@ -70,7 +74,9 @@ trait InfiniteIterator[T] extends Iterator[T]

class CyclicIterator[T](items: List[T]) extends InfiniteIterator[T] {
@volatile private[this] var current: List[T] = items

def hasNext = items != Nil

def next = {
val nc = if (current == Nil) items else current
current = nc.tail
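The Patterns hunks above only move the `self: Actor =>` self-type annotations onto their own lines; `dispatcherActor`, `LoadBalancer` and `CyclicIterator` keep their shape. A small sketch of how the pieces compose, using only names visible in these hunks and in the test below (whether the factory-built workers still need an explicit start is not shown in the diff, so that is an assumption):

    // Hedged sketch: value-based routing with dispatcherActor from the Patterns object above.
    val pinger: Actor = actor { case "ping" => println("ping handled") }
    val ponger: Actor = actor { case "pong" => println("pong handled") }

    // Route on the message value and pass the payload through unchanged.
    val router = dispatcherActor(
      { case "ping" => pinger
        case "pong" => ponger },
      (msg: Any) => msg)
    router.start
    router ! "ping"

A LoadBalancer implementation would instead supply the worker sequence, e.g. `protected def seq = new CyclicIterator(List(pinger, ponger))`, and let the inherited `routes` pick the next worker.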
@@ -21,12 +21,12 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
val (testMsg1,testMsg2,testMsg3,testMsg4) = ("test1","test2","test3","test4")

var targetOk = 0
-val t1 = actor() receive {
+val t1: Actor = actor {
case `testMsg1` => targetOk += 2
case `testMsg2` => targetOk += 4
}

-val t2 = actor() receive {
+val t2: Actor = actor {
case `testMsg3` => targetOk += 8
}

@@ -48,7 +48,7 @@ class ActorPatternsTest extends junit.framework.TestCase with Suite with MustMat
@Test def testLogger = verify(new TestActor {
def test = {
val msgs = new HashSet[Any]
-val t1 = actor() receive {
+val t1: Actor = actor {
case _ =>
}
val l = loggerActor(t1,(x) => msgs += x)
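The ActorPatternsTest hunks above swap the old `actor() receive { ... }` construction for the `actor { ... }` factory, which takes the receive block directly and is typed as `Actor`. A minimal sketch of the new form; the message and the fire-and-forget send mirror what the surrounding tests already do:

    // Hedged sketch: the anonymous-actor factory as used after this change.
    val echo: Actor = actor {
      case msg => println("echo got: " + msg)
    }
    echo ! "hello"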
@@ -7,10 +7,13 @@ import org.scalatest.junit.JUnitRunner
import org.scalatest.matchers.MustMatchers
import org.junit.{Test}

+/*
@RunWith(classOf[JUnitRunner])
class AgentTest extends junit.framework.TestCase with Suite with MustMatchers with ActorTestUtil with Logging {

@Test def testAgent = verify(new TestActor {
def test = {
+atomic {
val t = Agent(5)
handle(t) {
t.update(_ + 1)

@@ -20,5 +23,7 @@ class AgentTest extends junit.framework.TestCase with Suite with MustMatchers wi
r must be(12)
}
}
+}
})
}
+*/

@@ -1,13 +1,9 @@
package se.scalablesolutions.akka.state

-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.{Actor, Transactor}

-import junit.framework.TestCase

import org.junit.Test
import org.junit.Assert._
-import org.apache.cassandra.service.CassandraDaemon
-import org.junit.BeforeClass
import org.junit.Before
import org.scalatest.junit.JUnitSuite

@@ -28,9 +24,8 @@ case class SetRefStateOneWay(key: String)
case class SuccessOneWay(key: String, value: String)
case class FailureOneWay(key: String, value: String, failer: Actor)

-class CassandraPersistentActor extends Actor {
+class CassandraPersistentActor extends Transactor {
timeout = 100000
-makeTransactionRequired

private lazy val mapState = CassandraStorage.newMap
private lazy val vectorState = CassandraStorage.newVector

@@ -66,8 +61,7 @@ class CassandraPersistentActor extends Actor {
}
}

-@serializable class PersistentFailerActor extends Actor {
+@serializable class PersistentFailerActor extends Transactor {
-makeTransactionRequired
def receive = {
case "Failure" =>
throw new RuntimeException("expected")

@@ -76,8 +70,8 @@ class CassandraPersistentActor extends Actor {

class CassandraPersistentActorSpec extends JUnitSuite {

-@Before
+//@Before
-def startCassandra = EmbeddedCassandraService.start
+//def startCassandra = EmbeddedCassandraService.start

@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
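The Cassandra spec hunks above replace `extends Actor` plus `makeTransactionRequired` with the `Transactor` base type and comment out the embedded-Cassandra startup. For reference, the smallest actor that follows the new convention is exactly the shape of the `PersistentFailerActor` in the hunk; this sketch restates it with a different name only:

    // Hedged sketch: the Transactor form that replaces "extends Actor" + makeTransactionRequired.
    @serializable class FailerActor extends Transactor {
      def receive = {
        case "Failure" => throw new RuntimeException("expected")   // same failure trigger as the spec
      }
    }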
@@ -4,14 +4,15 @@

package se.scalablesolutions.akka.state

-import se.scalablesolutions.akka.stm.TransactionManagement.currentTransaction
+import se.scalablesolutions.akka.stm.TransactionManagement.transaction
import se.scalablesolutions.akka.collection._
import se.scalablesolutions.akka.util.Logging

-import org.codehaus.aspectwerkz.proxy.Uuid
+// FIXME move to 'stm' package + add message with more info

class NoTransactionInScopeException extends RuntimeException

+class StorageException(message: String) extends RuntimeException(message)

/**
* Example Scala usage.
* <p/>

@@ -64,10 +65,6 @@ trait Storage {
throw new UnsupportedOperationException
}

/**
* Implementation of <tt>PersistentMap</tt> for every concrete
* storage will have the same workflow. This abstracts the workflow.

@@ -162,8 +159,8 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
}

private def register = {
-if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
+if (transaction.get.isEmpty) throw new NoTransactionInScopeException
-currentTransaction.get.get.register(uuid, this)
+transaction.get.get.register(uuid, this)
}
}

@@ -236,8 +233,8 @@ trait PersistentVector[T] extends RandomAccessSeq[T] with Transactional with Com
def length: Int = storage.getVectorStorageSizeFor(uuid) + newElems.length

private def register = {
-if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
+if (transaction.get.isEmpty) throw new NoTransactionInScopeException
-currentTransaction.get.get.register(uuid, this)
+transaction.get.get.register(uuid, this)
}
}

@@ -272,8 +269,8 @@ trait PersistentRef[T] extends Transactional with Committable {
}

private def register = {
-if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
+if (transaction.get.isEmpty) throw new NoTransactionInScopeException
-currentTransaction.get.get.register(uuid, this)
+transaction.get.get.register(uuid, this)
}
}

@@ -397,7 +394,7 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
throw new UnsupportedOperationException("dequeueAll not supported")

private def register = {
-if (currentTransaction.get.isEmpty) throw new NoTransactionInScopeException
+if (transaction.get.isEmpty) throw new NoTransactionInScopeException
-currentTransaction.get.get.register(uuid, this)
+transaction.get.get.register(uuid, this)
}
}

@@ -8,7 +8,7 @@ import org.junit.Assert._
import _root_.dispatch.json.{JsNumber, JsValue}
import _root_.dispatch.json.Js._

-import se.scalablesolutions.akka.actor.Actor
+import se.scalablesolutions.akka.actor.{Transactor, Actor}

/**
* A persistent actor based on MongoDB storage.

@@ -29,10 +29,10 @@ case class MultiDebit(accountNo: String, amounts: List[BigInt], failer: Actor)
case class Credit(accountNo: String, amount: BigInt)
case object LogSize

-class BankAccountActor extends Actor {
+class BankAccountActor extends Transactor {
-makeTransactionRequired
-private val accountState = MongoStorage.newMap
+private lazy val accountState = MongoStorage.newMap
-private val txnLog = MongoStorage.newVector
+private lazy val txnLog = MongoStorage.newVector

def receive: PartialFunction[Any, Unit] = {
// check balance

@@ -91,8 +91,7 @@ class BankAccountActor extends Actor {
}
}

-@serializable class PersistentFailerActor extends Actor {
+@serializable class PersistentFailerActor extends Transactor {
-makeTransactionRequired
def receive = {
case "Failure" =>
throw new RuntimeException("expected")
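In the Mongo-backed `BankAccountActor` above, the storage handles become `private lazy val`s at the same time the class becomes a `Transactor`. The apparent intent, an assumption on my part rather than something stated in the diff, is to defer creating the map and vector until the first message is handled, i.e. until a transaction is in scope. A sketch of that shape; the byte-array keys and values are borrowed from the storage-backend signatures elsewhere in this diff and are not confirmed for MongoStorage:

    // Hedged sketch: lazy persistent state inside a Transactor, mirroring BankAccountActor above.
    class AuditActor extends Transactor {
      // Not touched until the first message arrives, i.e. until a transaction is in scope.
      private lazy val entries = MongoStorage.newMap

      def receive = {
        case (key: String, value: String) =>
          // PersistentMap extends scala.collection.mutable.Map (see Storage.scala above).
          entries.put(key.getBytes, value.getBytes)
      }
    }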
@@ -72,11 +72,11 @@ private [akka] object RedisStorageBackend extends
* base64(T1):base64("debasish.programming_language") -> "scala"
* </i>
*/
-def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]) {
+def insertMapStorageEntryFor(name: String, key: Array[Byte], value: Array[Byte]): Unit = withErrorHandling {
insertMapStorageEntriesFor(name, List((key, value)))
}

-def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]) {
+def insertMapStorageEntriesFor(name: String, entries: List[Tuple2[Array[Byte], Array[Byte]]]): Unit = withErrorHandling {
mset(entries.map(e =>
(makeRedisKey(name, e._1), new String(e._2))))
}

@@ -89,22 +89,22 @@ private [akka] object RedisStorageBackend extends
* <li>: is chosen since it cannot appear in base64 encoding charset</li>
* <li>both parts of the key need to be based64 encoded since there can be spaces within each of them</li>
*/
-private [this] def makeRedisKey(name: String, key: Array[Byte]): String = {
+private [this] def makeRedisKey(name: String, key: Array[Byte]): String = withErrorHandling {
"%s:%s".format(new String(encode(name.getBytes)), new String(encode(key)))
}

-private [this] def makeKeyFromRedisKey(redisKey: String) = {
+private [this] def makeKeyFromRedisKey(redisKey: String) = withErrorHandling {
val nk = redisKey.split(':').map{e: String => decode(e.getBytes)}
(nk(0), nk(1))
}

-private [this] def mset(entries: List[(String, String)]) {
+private [this] def mset(entries: List[(String, String)]): Unit = withErrorHandling {
entries.foreach {e: (String, String) =>
db.set(e._1, e._2)
}
}

-def removeMapStorageFor(name: String): Unit = {
+def removeMapStorageFor(name: String): Unit = withErrorHandling {
db.keys("%s:*".format(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -113,18 +113,19 @@ private [akka] object RedisStorageBackend extends
}
}

-def removeMapStorageFor(name: String, key: Array[Byte]): Unit = {
+def removeMapStorageFor(name: String, key: Array[Byte]): Unit = withErrorHandling {
db.delete(makeRedisKey(name, key))
}

-def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] =
+def getMapStorageEntryFor(name: String, key: Array[Byte]): Option[Array[Byte]] = withErrorHandling {
db.get(makeRedisKey(name, key)) match {
case None =>
throw new Predef.NoSuchElementException(new String(key) + " not present")
case Some(s) => Some(s.getBytes)
}
+}

-def getMapStorageSizeFor(name: String): Int = {
+def getMapStorageSizeFor(name: String): Int = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
case None => 0
case Some(keys) =>

@@ -132,7 +133,7 @@ private [akka] object RedisStorageBackend extends
}
}

-def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = {
+def getMapStorageFor(name: String): List[(Array[Byte], Array[Byte])] = withErrorHandling {
db.keys("%s:*".format(new String(encode(name.getBytes)))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -143,7 +144,7 @@ private [akka] object RedisStorageBackend extends

def getMapStorageRangeFor(name: String, start: Option[Array[Byte]],
finish: Option[Array[Byte]],
-count: Int): List[(Array[Byte], Array[Byte])] = {
+count: Int): List[(Array[Byte], Array[Byte])] = withErrorHandling {

import scala.collection.immutable.TreeMap
val wholeSorted =

@@ -188,19 +189,19 @@ private [akka] object RedisStorageBackend extends
}
}

-def insertVectorStorageEntryFor(name: String, element: Array[Byte]) {
+def insertVectorStorageEntryFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.lpush(new String(encode(name.getBytes)), new String(element))
}

-def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]) {
+def insertVectorStorageEntriesFor(name: String, elements: List[Array[Byte]]): Unit = withErrorHandling {
elements.foreach(insertVectorStorageEntryFor(name, _))
}

-def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]) {
+def updateVectorStorageEntryFor(name: String, index: Int, elem: Array[Byte]): Unit = withErrorHandling {
db.lset(new String(encode(name.getBytes)), index, new String(elem))
}

-def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = {
+def getVectorStorageEntryFor(name: String, index: Int): Array[Byte] = withErrorHandling {
db.lindex(new String(encode(name.getBytes)), index) match {
case None =>
throw new Predef.NoSuchElementException(name + " does not have element at " + index)

@@ -208,7 +209,7 @@ private [akka] object RedisStorageBackend extends
}
}

-def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = {
+def getVectorStorageRangeFor(name: String, start: Option[Int], finish: Option[Int], count: Int): List[Array[Byte]] = withErrorHandling {
/**
* <tt>count</tt> is the max number of results to return. Start with
* <tt>start</tt> or 0 (if <tt>start</tt> is not defined) and go until

@@ -237,11 +238,11 @@ private [akka] object RedisStorageBackend extends
}
}

-def insertRefStorageFor(name: String, element: Array[Byte]) {
+def insertRefStorageFor(name: String, element: Array[Byte]): Unit = withErrorHandling {
db.set(new String(encode(name.getBytes)), new String(element))
}

-def getRefStorageFor(name: String): Option[Array[Byte]] = {
+def getRefStorageFor(name: String): Option[Array[Byte]] = withErrorHandling {
db.get(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -250,12 +251,13 @@ private [akka] object RedisStorageBackend extends
}

// add to the end of the queue
-def enqueue(name: String, item: Array[Byte]): Boolean = {
+def enqueue(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.rpush(new String(encode(name.getBytes)), new String(item))
}

// pop from the front of the queue
-def dequeue(name: String): Option[Array[Byte]] = {
+def dequeue(name: String): Option[Array[Byte]] = withErrorHandling {
db.lpop(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -265,7 +267,7 @@ private [akka] object RedisStorageBackend extends
}

// get the size of the queue
-def size(name: String): Int = {
+def size(name: String): Int = withErrorHandling {
db.llen(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -275,7 +277,8 @@ private [akka] object RedisStorageBackend extends

// return an array of items currently stored in the queue
// start is the item to begin, count is how many items to return
-def peek(name: String, start: Int, count: Int): List[Array[Byte]] = count match {
+def peek(name: String, start: Int, count: Int): List[Array[Byte]] = withErrorHandling {
+count match {
case 1 =>
db.lindex(new String(encode(name.getBytes)), start) match {
case None =>

@@ -292,9 +295,10 @@ private [akka] object RedisStorageBackend extends
es.map(_.get.getBytes)
}
}
+}

// completely delete the queue
-def remove(name: String): Boolean = {
+def remove(name: String): Boolean = withErrorHandling {
db.delete(new String(encode(name.getBytes))) match {
case Some(1) => true
case _ => false

@@ -302,7 +306,7 @@ private [akka] object RedisStorageBackend extends
}

// add item to sorted set identified by name
-def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = {
+def zadd(name: String, zscore: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zadd(new String(encode(name.getBytes)), zscore, new String(item)) match {
case Some(1) => true
case _ => false

@@ -310,7 +314,7 @@ private [akka] object RedisStorageBackend extends
}

// remove item from sorted set identified by name
-def zrem(name: String, item: Array[Byte]): Boolean = {
+def zrem(name: String, item: Array[Byte]): Boolean = withErrorHandling {
db.zrem(new String(encode(name.getBytes)), new String(item)) match {
case Some(1) => true
case _ => false

@@ -318,7 +322,7 @@ private [akka] object RedisStorageBackend extends
}

// cardinality of the set identified by name
-def zcard(name: String): Int = {
+def zcard(name: String): Int = withErrorHandling {
db.zcard(new String(encode(name.getBytes))) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -326,7 +330,7 @@ private [akka] object RedisStorageBackend extends
}
}

-def zscore(name: String, item: Array[Byte]): String = {
+def zscore(name: String, item: Array[Byte]): String = withErrorHandling {
db.zscore(new String(encode(name.getBytes)), new String(item)) match {
case None =>
throw new Predef.NoSuchElementException(new String(item) + " not present")

@@ -334,7 +338,7 @@ private [akka] object RedisStorageBackend extends
}
}

-def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = {
+def zrange(name: String, start: Int, end: Int): List[Array[Byte]] = withErrorHandling {
db.zrange(new String(encode(name.getBytes)), start.toString, end.toString, RedisClient.ASC, false) match {
case None =>
throw new Predef.NoSuchElementException(name + " not present")

@@ -343,5 +347,16 @@ private [akka] object RedisStorageBackend extends
}
}

-def flushDB = db.flushDb
+def flushDB = withErrorHandling(db.flushDb)

+private def withErrorHandling[T](body: => T): T = {
+try {
+body
+} catch {
+case e: java.lang.NullPointerException =>
+throw new StorageException("Could not connect to Redis server")
+case e =>
+throw new StorageException("Error in Redis: " + e.getMessage)
+}
+}
}
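Every operation in `RedisStorageBackend` above now delegates to the new by-name `withErrorHandling` wrapper, so client failures surface as the `StorageException` introduced in Storage.scala earlier in this diff rather than as raw `NullPointerException`s. A small illustration of the call-site behaviour, assuming calling code that lives inside the akka package (the object is `private [akka]`) and using only methods visible above:

    // Hedged sketch: how errors surface after the withErrorHandling change.
    try {
      RedisStorageBackend.insertRefStorageFor("some-ref", "hello".getBytes)
      RedisStorageBackend.getRefStorageFor("some-ref").foreach(bytes => println(new String(bytes)))
    } catch {
      // Raised by withErrorHandling when the Redis server is unreachable or a call fails.
      case e: StorageException => println("storage problem: " + e.getMessage)
    }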
@@ -29,6 +29,7 @@ case object LogSize
class AccountActor extends Transactor {
private lazy val accountState = RedisStorage.newMap
private lazy val txnLog = RedisStorage.newVector
+//timeout = 5000

def receive = {
// check balance

@@ -86,6 +87,7 @@ class AccountActor extends Transactor {
}

@serializable class PersistentFailerActor extends Transactor {
+//timeout = 5000
def receive = {
case "Failure" =>
throw new RuntimeException("expected")

@@ -138,7 +140,7 @@ class RedisPersistentActorSpec extends TestCase {
bactor.start
bactor !! Credit("a-123", 5000)

-assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)

val failer = new PersistentFailerActor
failer.start

@@ -147,7 +149,7 @@ class RedisPersistentActorSpec extends TestCase {
fail("should throw exception")
} catch { case e: RuntimeException => {}}

-assertEquals(BigInt(5000), (bactor !! Balance("a-123")).get)
+assertEquals(BigInt(5000), (bactor !! (Balance("a-123"), 5000)).get)

// should not count the failed one
assertEquals(3, (bactor !! LogSize).get)
@@ -1,341 +0,0 @@
-package se.scalablesolutions.akka.stm;
-
-import static org.multiverse.api.GlobalStmInstance.getGlobalStmInstance;
-import org.multiverse.api.Stm;
-import static org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction;
-import static org.multiverse.api.ThreadLocalTransaction.setThreadLocalTransaction;
-import org.multiverse.api.Transaction;
-import org.multiverse.api.TransactionStatus;
-import org.multiverse.api.exceptions.CommitFailureException;
-import org.multiverse.api.exceptions.LoadException;
-import org.multiverse.api.exceptions.RetryError;
-import org.multiverse.api.exceptions.TooManyRetriesException;
-import org.multiverse.templates.AbortedException;
-import org.multiverse.utils.latches.CheapLatch;
-import org.multiverse.utils.latches.Latch;
-
-import static java.lang.String.format;
-import java.util.logging.Logger;
-
-/**
-* A Template that handles the boilerplate code for transactions. A transaction will be placed if none is available
-* around a section and if all goes right, commits at the end.
-* <p/>
-* example:
-* <pre>
-* new AtomicTemplate(){
-* Object execute(Transaction t){
-* queue.push(1);
-* return null;
-* }
-* }.execute();
-* </pre>
-* <p/>
-* It could also be that the transaction is retried (e.g. caused by optimistic locking failures). This is also a task
-* for template. In the future this retry behavior will be customizable.
-* <p/>
-* If a transaction already is available on the TransactionThreadLocal, no new transaction is started and essentially
-* the whole AtomicTemplate is ignored.
-* <p/>
-* If no transaction is available on the TransactionThreadLocal, a new one will be created and used during the execution
-* of the AtomicTemplate and will be removed once the AtomicTemplate finishes.
-* <p/>
-* All uncaught throwable's lead to a rollback of the transaction.
-* <p/>
-* AtomicTemplates are not thread-safe to use.
-* <p/>
-* AtomicTemplates can completely work without threadlocals. See the {@link AtomicTemplate#AtomicTemplate(org.multiverse.api.Stm
-* ,String, boolean, boolean, int)} for more information.
-*
-* @author Peter Veentjer
-*/
-public abstract class AtomicTemplate<E> {
-
-private final static Logger logger = Logger.getLogger(AtomicTemplate.class.getName());
-
-private final Stm stm;
-private final boolean ignoreThreadLocalTransaction;
-private final int retryCount;
-private final boolean readonly;
-private int attemptCount;
-private final String familyName;
-
-/**
-* Creates a new AtomicTemplate that uses the STM stored in the GlobalStm and works the the {@link
-* org.multiverse.utils.ThreadLocalTransaction}.
-*/
-public AtomicTemplate() {
-this(getGlobalStmInstance());
-}
-
-public AtomicTemplate(boolean readonly) {
-this(getGlobalStmInstance(), null, false, readonly, Integer.MAX_VALUE);
-}
-
-/**
-* Creates a new AtomicTemplate using the provided stm. The transaction used is stores/retrieved from the {@link
-* org.multiverse.utils.ThreadLocalTransaction}.
-*
-* @param stm the stm to use for transactions.
-* @throws NullPointerException if stm is null.
-*/
-public AtomicTemplate(Stm stm) {
-this(stm, null, false, false, Integer.MAX_VALUE);
-}
-
-public AtomicTemplate(String familyName, boolean readonly, int retryCount) {
-this(getGlobalStmInstance(), familyName, false, readonly, retryCount);
-}
-
-/**
-* Creates a new AtomicTemplate that uses the provided STM. This method is provided to make Multiverse easy to
-* integrate with environment that don't want to depend on threadlocals.
-*
-* @param stm the stm to use for transactions.
-* @param ignoreThreadLocalTransaction true if this Template should completely ignore the ThreadLocalTransaction.
-* This is useful for using the AtomicTemplate in other environments that don't
-* want to depend on threadlocals but do want to use the AtomicTemplate.
-* @throws NullPointerException if stm is null.
-*/
-public AtomicTemplate(Stm stm, String familyName, boolean ignoreThreadLocalTransaction, boolean readonly,
-int retryCount) {
-if (stm == null) {
-throw new NullPointerException();
-}
-if (retryCount < 0) {
-throw new IllegalArgumentException();
-}
-this.stm = stm;
-this.ignoreThreadLocalTransaction = ignoreThreadLocalTransaction;
-this.readonly = readonly;
-this.retryCount = retryCount;
-this.familyName = familyName;
-}
-
-public String getFamilyName() {
-return familyName;
-}
-
-/**
-* Returns the current attempt. Value will always be larger than zero and increases everytime the transaction needs
-* to be retried.
-*
-* @return the current attempt count.
-*/
-public final int getAttemptCount() {
-return attemptCount;
-}
-
-/**
-* Returns the number of retries that this AtomicTemplate is allowed to do. The returned value will always be equal
-* or larger than 0.
-*
-* @return the number of retries.
-*/
-public final int getRetryCount() {
-return retryCount;
-}
-
-/**
-* Returns the {@link Stm} used by this AtomicTemplate to execute transactions on.
-*
-* @return the Stm used by this AtomicTemplate.
-*/
-public final Stm getStm() {
-return stm;
-}
-
-/**
-* Check if this AtomicTemplate ignores the ThreadLocalTransaction.
-*
-* @return true if this AtomicTemplate ignores the ThreadLocalTransaction, false otherwise.
-*/
-public final boolean isIgnoreThreadLocalTransaction() {
-return ignoreThreadLocalTransaction;
-}
-
-/**
-* Checks if this AtomicTemplate executes readonly transactions.
-*
-* @return true if it executes readonly transactions, false otherwise.
-*/
-public final boolean isReadonly() {
-return readonly;
-}
-
-/**
-* This is the method can be overridden to do pre-start tasks.
-*/
-public void preStart() {
-}
-
-/**
-* This is the method can be overridden to do post-start tasks.
-*
-* @param t the transaction used for this execution.
-*/
-public void postStart(Transaction t) {
-}
-
-/**
-* This is the method can be overridden to do pre-commit tasks.
-*/
-public void preCommit() {
-}
-
-/**
-* This is the method can be overridden to do post-commit tasks.
-*/
-public void postCommit() {
-}
-
-/**
-* This is the method that needs to be implemented.
-*
-* @param t the transaction used for this execution.
-* @return the result of the execution.
-*
-* @throws Exception the Exception thrown
-*/
-public abstract E execute(Transaction t) throws Exception;
-
-/**
-* Executes the template.
-*
-* @return the result of the {@link #execute(org.multiverse.api.Transaction)} method.
-*
-* @throws InvisibleCheckedException if a checked exception was thrown while executing the {@link
-* #execute(org.multiverse.api.Transaction)} method.
-* @throws AbortedException if the exception was explicitly aborted.
-* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
-* failure (also an exception) is included as cause. So you have some idea where
-* to look for problems
-*/
-public final E execute() {
-try {
-return executeChecked();
-} catch (Exception ex) {
-if (ex instanceof RuntimeException) {
-throw (RuntimeException) ex;
-} else {
-throw new AtomicTemplate.InvisibleCheckedException(ex);
-}
-}
-}
-
-/**
-* Executes the Template and rethrows the checked exception instead of wrapping it in a InvisibleCheckedException.
-*
-* @return the result
-*
-* @throws Exception the Exception thrown inside the {@link #execute(org.multiverse.api.Transaction)}
-* method.
-* @throws AbortedException if the exception was explicitly aborted.
-* @throws TooManyRetriesException if the template retried the transaction too many times. The cause of the last
-* failure (also an exception) is included as cause. So you have some idea where to
-* look for problems
-*/
-public final E executeChecked() throws Exception {
-preStart();
-Transaction t = getTransaction();
-if (noUsableTransaction(t)) {
-t = startTransaction();
-setTransaction(t);
-postStart(t);
-try {
-attemptCount = 1;
-Exception lastRetryCause = null;
-while (attemptCount - 1 <= retryCount) {
-boolean abort = true;
-boolean reset = false;
-try {
-E result = execute(t);
-if (t.getStatus().equals(TransactionStatus.aborted)) {
-String msg = format("Transaction with familyname %s is aborted", t.getFamilyName());
-throw new AbortedException(msg);
-}
-preCommit();
-t.commit();
-abort = false;
-reset = false;
-postCommit();
-return result;
-} catch (RetryError e) {
-Latch latch = new CheapLatch();
-t.abortAndRegisterRetryLatch(latch);
-latch.awaitUninterruptible();
-//since the abort is already done, no need to do it again.
-abort = false;
-} catch (CommitFailureException ex) {
-lastRetryCause = ex;
-reset = true;
-//ignore, just retry the transaction
-} catch (LoadException ex) {
-lastRetryCause = ex;
-reset = true;
-//ignore, just retry the transaction
-} finally {
-if (abort) {
-t.abort();
-if (reset) {
-t = t.abortAndReturnRestarted();
-setTransaction(t);
-}
-}
-}
-attemptCount++;
-}
-
-throw new TooManyRetriesException("Too many retries", lastRetryCause);
-} finally {
-setTransaction(null);
-}
-} else {
-return execute(t);
-}
-}
-
-private Transaction startTransaction() {
-return readonly ? stm.startReadOnlyTransaction(familyName) : stm.startUpdateTransaction(familyName);
-}
-
-private boolean noUsableTransaction(Transaction t) {
-return t == null || t.getStatus() != TransactionStatus.active;
-}
-
-/**
-* Gets the current Transaction stored in the TransactionThreadLocal.
-* <p/>
-* If the ignoreThreadLocalTransaction is set, the threadlocal stuff is completeley ignored.
-*
-* @return the found transaction, or null if none is found.
-*/
-private Transaction getTransaction() {
-return ignoreThreadLocalTransaction ? null : getThreadLocalTransaction();
-}
-
-/**
-* Stores the transaction in the TransactionThreadLocal.
-* <p/>
-* This call is ignored if the ignoreThreadLocalTransaction is true.
-*
-* @param t the transaction to set (is allowed to be null).
-*/
-private void setTransaction(Transaction t) {
-if (!ignoreThreadLocalTransaction) {
-setThreadLocalTransaction(t);
-}
-}
-
-public static class InvisibleCheckedException extends RuntimeException {
-
-public InvisibleCheckedException(Exception cause) {
-super(cause);
-}
-
-@Override
-public Exception getCause() {
-return (Exception) super.getCause();
-}
-}
-}
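The hunk above deletes `AtomicTemplate.java` wholesale; application code elsewhere in this diff keeps using the Scala `atomic { ... }` block, and the Multiverse dependency is bumped to 0.4-SNAPSHOT in the sbt build further down. For readers who want the gist of what the deleted class did, here is its retry loop reduced to plain self-contained Scala; this is an illustration of the control flow only and uses no Multiverse or Akka API:

    // Hedged sketch: the essential "run, commit, retry on failure" loop of the deleted template.
    def runWithRetries[T](maxRetries: Int)(body: => T): T = {
      var attempt = 1
      var lastFailure: Exception = null
      while (attempt <= maxRetries) {
        try {
          return body            // the template's "execute, then commit" step
        } catch {
          case e: Exception =>   // stand-in for the commit/load failures the template retried on
            lastFailure = e
            attempt += 1
        }
      }
      throw new RuntimeException("Too many retries", lastFailure)
    }

    val answer = runWithRetries(3) { 21 * 2 }   // usage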
akka.iml

@@ -2,6 +2,23 @@
<module org.jetbrains.idea.maven.project.MavenProjectsManager.isMavenModule="true" type="JAVA_MODULE" version="4">
<component name="FacetManager">
<facet type="Scala" name="Scala">
+<configuration>
+<option name="myScalaCompilerJarPaths">
+<array>
+<option value="$APPLICATION_HOME_DIR$/plugins/Scala/lib/scala-compiler.jar" />
+</array>
+</option>
+<option name="myScalaSdkJarPaths">
+<array>
+<option value="$APPLICATION_HOME_DIR$/plugins/Scala/lib/scala-library.jar" />
+</array>
+</option>
+</configuration>
+</facet>
+<facet type="Spring" name="Spring">
+<configuration />
+</facet>
+<facet type="WebBeans" name="Web Beans">
<configuration />
</facet>
</component>

@@ -30,7 +30,9 @@

<stm>
service = on
-max-nr-of-retries = 100
+fair = on # should transactions be fair or non-fair (non fair yield better performance)
+max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
+timeout = 10000 # transaction timeout; if transaction have not committed within the timeout then it is aborted
distributed = off # not implemented yet
</stm>

@@ -47,6 +49,7 @@
zlib-compression-level = 6 # Options: 0-9 (1 being fastest and 9 being the most compressed), default is 6

<cluster>
+service = on # FIXME add 'service = on' for <cluster>
name = "default" # The name of the cluster
#actor = "se.scalablesolutions.akka.remote.JGroupsClusterActor" # FQN of an implementation of ClusterActor
serializer = "se.scalablesolutions.akka.serialization.Serializer$Java" # FQN of the serializer class
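The akka.conf hunks above tune the STM section (a fairness flag, a higher retry cap, an explicit transaction time-out) and switch on the cluster service. A hedged sketch of reading those values with Configgy, whose repository the sbt build below adds; the `akka.stm.*` key paths and the `Configgy` entry point are assumptions for illustration and are not taken from this diff:

    // Hedged sketch: reading the tuned STM settings (key paths assumed).
    import net.lag.configgy.Configgy

    Configgy.configure("config/akka.conf")
    val cfg = Configgy.config
    val fair       = cfg.getBool("akka.stm.fair", true)
    val maxRetries = cfg.getInt("akka.stm.max-nr-of-retries", 1000)
    val txTimeout  = cfg.getInt("akka.stm.timeout", 10000)
    println("STM: fair=" + fair + " retries=" + maxRetries + " timeout=" + txTimeout)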
@@ -1,3 +1,39 @@
+/*-------------------------------------------------------------------------------
+Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
+
+----------------------------------------------------
+-------- sbt buildfile for the Akka project --------
+----------------------------------------------------
+
+Akka implements a unique hybrid of:
+* Actors , which gives you:
+* Simple and high-level abstractions for concurrency and parallelism.
+* Asynchronous, non-blocking and highly performant event-driven programming
+model.
+* Very lightweight event-driven processes (create ~6.5 million actors on
+4 G RAM).
+* Supervision hierarchies with let-it-crash semantics. For writing highly
+fault-tolerant systems that never stop, systems that self-heal.
+* Software Transactional Memory (STM). (Distributed transactions coming soon).
+* Transactors: combine actors and STM into transactional actors. Allows you to
+compose atomic message flows with automatic rollback and retry.
+* Remoting: highly performant distributed actors with remote supervision and
+error management.
+* Cluster membership management.
+
+Akka also has a set of add-on modules:
+* Persistence: A set of pluggable back-end storage modules that works in sync
+with the STM.
+* Cassandra distributed and highly scalable database.
+* MongoDB document database.
+* Redis data structures database (upcoming)
+* REST (JAX-RS): Expose actors as REST services.
+* Comet: Expose actors as Comet services.
+* Security: Digest and Kerberos based security.
+* Microkernel: Run Akka as a stand-alone kernel.
+
+-------------------------------------------------------------------------------*/
+
import sbt._

class AkkaParent(info: ProjectInfo) extends ParentProject(info) {

@@ -5,7 +41,8 @@ class AkkaParent(info: ProjectInfo) extends ParentProject(info) {
val sunjdmk = "sunjdmk" at "http://wp5.e-taxonomy.eu/cdmlib/mavenrepo"
val databinder = "DataBinder" at "http://databinder.net/repo"
val configgy = "Configgy" at "http://www.lag.net/repo"
-val multiverse = "Multiverse" at "http://multiverse.googlecode.com/svn/maven-repository/releases"
+val codehaus = "Codehaus" at "http://repository.codehaus.org"
+val codehaus_snapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val jboss = "jBoss" at "http://repository.jboss.org/maven2"
val guiceyfruit = "GuiceyFruit" at "http://guiceyfruit.googlecode.com/svn/repo/releases/"
val embeddedrepo = "embedded repo" at "http://guice-maven.googlecode.com/svn/trunk"

@@ -63,7 +100,7 @@ class AkkaParent(info: ProjectInfo) extends ParentProject(info) {
class AkkaJavaUtilProject(info: ProjectInfo) extends DefaultProject(info) {
val guicey = "org.guiceyfruit" % "guice-core" % "2.0-beta-4" % "compile"
val protobuf = "com.google.protobuf" % "protobuf-java" % "2.2.0" % "compile"
-val multiverse = "org.multiverse" % "multiverse-alpha" % "0.3" % "compile"
+val multiverse = "org.multiverse" % "multiverse-alpha" % "0.4-SNAPSHOT" % "compile"
}

class AkkaAMQPProject(info: ProjectInfo) extends DefaultProject(info) {
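The build hunks above add the Codehaus repositories and move Multiverse from 0.3 to 0.4-SNAPSHOT in the old sbt 0.x style, where each resolver and each dependency is simply a `val` on the project class. A minimal sketch of a subproject following the same conventions; the project name is made up for illustration, while the resolver and dependency lines reuse exactly the forms shown above:

    import sbt._

    // Hedged sketch: a subproject in the same sbt 0.x style as AkkaParent above.
    class AkkaExampleProject(info: ProjectInfo) extends DefaultProject(info) {
      // Resolver, declared like the Codehaus entries added above.
      val codehausSnapshots = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
      // Dependency, declared like the multiverse bump above.
      val multiverse = "org.multiverse" % "multiverse-alpha" % "0.4-SNAPSHOT" % "compile"
    }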