Added tests to AgentTest and cleaned up Agent
This commit is contained in:
parent
79bfc4e446
commit
fba843e02d
7 changed files with 187 additions and 116 deletions
|
|
@ -101,6 +101,23 @@ object Actor extends Logging {
|
||||||
def receive: PartialFunction[Any, Unit] = body
|
def receive: PartialFunction[Any, Unit] = body
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use to create an anonymous transactional event-driven actor.
|
||||||
|
* The actor is started when created.
|
||||||
|
* Example:
|
||||||
|
* <pre>
|
||||||
|
* import Actor._
|
||||||
|
*
|
||||||
|
* val a = transactor {
|
||||||
|
* case msg => ... // handle message
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
def transactor(body: PartialFunction[Any, Unit]): Actor = new Transactor() {
|
||||||
|
start
|
||||||
|
def receive: PartialFunction[Any, Unit] = body
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
|
* Use to create an anonymous event-driven actor with both an init block and a message loop block.
|
||||||
* The actor is started when created.
|
* The actor is started when created.
|
||||||
|
|
@ -202,7 +219,7 @@ object Actor extends Logging {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
trait Actor extends TransactionManagement {
|
trait Actor extends TransactionManagement with Logging {
|
||||||
implicit protected val self: Option[Actor] = Some(this)
|
implicit protected val self: Option[Actor] = Some(this)
|
||||||
implicit protected val transactionFamilyName: String = this.getClass.getName
|
implicit protected val transactionFamilyName: String = this.getClass.getName
|
||||||
|
|
||||||
|
|
@ -341,7 +358,7 @@ trait Actor extends TransactionManagement {
|
||||||
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
|
* Set to true if messages should have REQUIRES_NEW semantics, e.g. a new transaction should
|
||||||
* start if there is no one running, else it joins the existing transaction.
|
* start if there is no one running, else it joins the existing transaction.
|
||||||
*/
|
*/
|
||||||
@volatile protected var isTransactionRequiresNew = false
|
@volatile protected var isTransactor = false
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* User overridable callback/setting.
|
* User overridable callback/setting.
|
||||||
|
|
@ -649,7 +666,7 @@ trait Actor extends TransactionManagement {
|
||||||
def makeTransactionRequired = synchronized {
|
def makeTransactionRequired = synchronized {
|
||||||
if (_isRunning) throw new IllegalArgumentException(
|
if (_isRunning) throw new IllegalArgumentException(
|
||||||
"Can not make actor transaction required after it has been started")
|
"Can not make actor transaction required after it has been started")
|
||||||
else isTransactionRequiresNew = true
|
else isTransactor = true
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -788,18 +805,13 @@ trait Actor extends TransactionManagement {
|
||||||
|
|
||||||
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
|
private def spawnButDoNotStart[T <: Actor](actorClass: Class[T]): T = {
|
||||||
val actor = actorClass.newInstance.asInstanceOf[T]
|
val actor = actorClass.newInstance.asInstanceOf[T]
|
||||||
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) {
|
if (!dispatcher.isInstanceOf[ThreadBasedDispatcher]) actor.dispatcher = dispatcher
|
||||||
actor.dispatcher = dispatcher
|
|
||||||
}
|
|
||||||
actor
|
actor
|
||||||
}
|
}
|
||||||
|
|
||||||
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
|
protected[akka] def postMessageToMailbox(message: Any, sender: Option[Actor]): Unit = {
|
||||||
if (isTransactionSetInScope) {
|
joinTransaction(message)
|
||||||
log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
|
|
||||||
getTransactionSetInScope.incParties
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_remoteAddress.isDefined) {
|
if (_remoteAddress.isDefined) {
|
||||||
val requestBuilder = RemoteRequest.newBuilder
|
val requestBuilder = RemoteRequest.newBuilder
|
||||||
.setId(RemoteRequestIdFactory.nextId)
|
.setId(RemoteRequestIdFactory.nextId)
|
||||||
|
|
@ -815,14 +827,14 @@ trait Actor extends TransactionManagement {
|
||||||
|
|
||||||
// set the source fields used to reply back to the original sender
|
// set the source fields used to reply back to the original sender
|
||||||
// (i.e. not the remote proxy actor)
|
// (i.e. not the remote proxy actor)
|
||||||
if(sender.isDefined) {
|
if (sender.isDefined) {
|
||||||
val s = sender.get
|
val s = sender.get
|
||||||
requestBuilder.setSourceTarget(s.getClass.getName)
|
requestBuilder.setSourceTarget(s.getClass.getName)
|
||||||
requestBuilder.setSourceUuid(s.uuid)
|
requestBuilder.setSourceUuid(s.uuid)
|
||||||
|
|
||||||
val (host,port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
|
val (host, port) = s._replyToAddress.map(a => (a.getHostName,a.getPort)).getOrElse((Actor.HOSTNAME,Actor.PORT))
|
||||||
|
|
||||||
log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
|
Actor.log.debug("Setting sending actor as %s @ %s:%s", s.getClass.getName, host, port)
|
||||||
|
|
||||||
requestBuilder.setSourceHostname(host)
|
requestBuilder.setSourceHostname(host)
|
||||||
requestBuilder.setSourcePort(port)
|
requestBuilder.setSourcePort(port)
|
||||||
|
|
@ -835,20 +847,15 @@ trait Actor extends TransactionManagement {
|
||||||
_mailbox.add(invocation)
|
_mailbox.add(invocation)
|
||||||
if (_isSuspended) invocation.send
|
if (_isSuspended) invocation.send
|
||||||
}
|
}
|
||||||
else
|
else invocation.send
|
||||||
invocation.send
|
|
||||||
}
|
}
|
||||||
clearTransactionSet
|
|
||||||
}
|
}
|
||||||
|
|
||||||
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
|
protected[akka] def postMessageToMailboxAndCreateFutureResultWithTimeout(
|
||||||
message: Any,
|
message: Any,
|
||||||
timeout: Long,
|
timeout: Long,
|
||||||
senderFuture: Option[CompletableFuture]): CompletableFuture = {
|
senderFuture: Option[CompletableFuture]): CompletableFuture = {
|
||||||
if (isTransactionSetInScope) {
|
joinTransaction(message)
|
||||||
log.trace("Adding transaction for %s with message [%s] to transaction set", toString, message)
|
|
||||||
getTransactionSetInScope.incParties
|
|
||||||
}
|
|
||||||
|
|
||||||
if (_remoteAddress.isDefined) {
|
if (_remoteAddress.isDefined) {
|
||||||
val requestBuilder = RemoteRequest.newBuilder
|
val requestBuilder = RemoteRequest.newBuilder
|
||||||
|
|
@ -863,7 +870,6 @@ trait Actor extends TransactionManagement {
|
||||||
val id = registerSupervisorAsRemoteActor
|
val id = registerSupervisorAsRemoteActor
|
||||||
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
if (id.isDefined) requestBuilder.setSupervisorUuid(id.get)
|
||||||
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
|
val future = RemoteClient.clientFor(_remoteAddress.get).send(requestBuilder.build, senderFuture)
|
||||||
clearTransactionSet
|
|
||||||
if (future.isDefined) future.get
|
if (future.isDefined) future.get
|
||||||
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
|
||||||
} else {
|
} else {
|
||||||
|
|
@ -874,11 +880,17 @@ trait Actor extends TransactionManagement {
|
||||||
_mailbox.add(invocation)
|
_mailbox.add(invocation)
|
||||||
invocation.send
|
invocation.send
|
||||||
} else invocation.send
|
} else invocation.send
|
||||||
clearTransactionSet
|
|
||||||
future
|
future
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private def joinTransaction(message: Any) = if (isTransactionSetInScope) {
|
||||||
|
// FIXME test to run bench without this trace call
|
||||||
|
Actor.log.trace("Joining transaction set [%s];\n\tactor %s\n\twith message [%s]",
|
||||||
|
getTransactionSetInScope, toString, message)
|
||||||
|
getTransactionSetInScope.incParties
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
|
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods.
|
||||||
*/
|
*/
|
||||||
|
|
@ -921,8 +933,9 @@ trait Actor extends TransactionManagement {
|
||||||
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
|
if (messageHandle.transactionSet.isDefined) messageHandle.transactionSet
|
||||||
else {
|
else {
|
||||||
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
|
topLevelTransaction = true // FIXME create a new internal atomic block that can wait for X seconds if top level tx
|
||||||
if (isTransactionRequiresNew) {
|
if (isTransactor) {
|
||||||
log.trace("Creating a new transaction set (top-level transaction) \nfor actor %s \nwith message %s", toString, messageHandle)
|
Actor.log.trace("Creating a new transaction set (top-level transaction)\n\tfor actor %s\n\twith message %s",
|
||||||
|
toString, messageHandle)
|
||||||
Some(createNewTransactionSet)
|
Some(createNewTransactionSet)
|
||||||
} else None
|
} else None
|
||||||
}
|
}
|
||||||
|
|
@ -932,11 +945,6 @@ trait Actor extends TransactionManagement {
|
||||||
senderFuture = messageHandle.future
|
senderFuture = messageHandle.future
|
||||||
sender = messageHandle.sender
|
sender = messageHandle.sender
|
||||||
|
|
||||||
def clearTx = {
|
|
||||||
clearTransactionSet
|
|
||||||
clearTransaction
|
|
||||||
}
|
|
||||||
|
|
||||||
def proceed = {
|
def proceed = {
|
||||||
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
|
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
|
||||||
else throw new IllegalArgumentException(
|
else throw new IllegalArgumentException(
|
||||||
|
|
@ -946,7 +954,7 @@ trait Actor extends TransactionManagement {
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
try {
|
||||||
if (isTransactionRequiresNew) {
|
if (isTransactor) {
|
||||||
atomic {
|
atomic {
|
||||||
proceed
|
proceed
|
||||||
}
|
}
|
||||||
|
|
@ -955,16 +963,21 @@ trait Actor extends TransactionManagement {
|
||||||
case e: IllegalStateException => {}
|
case e: IllegalStateException => {}
|
||||||
case e =>
|
case e =>
|
||||||
// abort transaction set
|
// abort transaction set
|
||||||
if (isTransactionSetInScope) try { getTransactionSetInScope.abort } catch { case e: IllegalStateException => {} }
|
if (isTransactionSetInScope) try {
|
||||||
|
getTransactionSetInScope.abort
|
||||||
|
} catch { case e: IllegalStateException => {} }
|
||||||
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
|
Actor.log.error(e, "Exception when invoking \n\tactor [%s] \n\twith message [%s]", this, message)
|
||||||
|
|
||||||
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
|
if (senderFuture.isDefined) senderFuture.get.completeWithException(this, e)
|
||||||
clearTx // need to clear currentTransaction before call to supervisor
|
|
||||||
|
clearTransaction
|
||||||
|
if (topLevelTransaction) clearTransactionSet
|
||||||
|
|
||||||
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
|
||||||
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
|
||||||
} finally {
|
} finally {
|
||||||
clearTx
|
clearTransaction
|
||||||
|
if (topLevelTransaction) clearTransactionSet
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -18,6 +18,8 @@ import se.scalablesolutions.akka.stm.Ref
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
import java.util.concurrent.CountDownLatch
|
import java.util.concurrent.CountDownLatch
|
||||||
|
|
||||||
|
class AgentException private[akka](message: String) extends RuntimeException(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The Agent class was strongly inspired by the agent principle in Clojure.
|
* The Agent class was strongly inspired by the agent principle in Clojure.
|
||||||
|
|
@ -35,14 +37,27 @@ import java.util.concurrent.CountDownLatch
|
||||||
* The code that is submitted to an agent doesn't need to pay attention to
|
* The code that is submitted to an agent doesn't need to pay attention to
|
||||||
* threading or synchronization, the agent will provide such guarantees by itself.
|
* threading or synchronization, the agent will provide such guarantees by itself.
|
||||||
*
|
*
|
||||||
* See the examples of use for more details.
|
* Example of usage:
|
||||||
|
* <pre>
|
||||||
|
* val agent = Agent(5)
|
||||||
|
*
|
||||||
|
* agent update (_ + 1)
|
||||||
|
* agent update (_ * 2)
|
||||||
|
*
|
||||||
|
* val result = agent()
|
||||||
|
* ... // use result
|
||||||
|
*
|
||||||
|
* agent.close
|
||||||
|
* </pre>
|
||||||
*
|
*
|
||||||
* @author Vaclav Pech
|
* @author Vaclav Pech
|
||||||
* Date: Oct 18, 2009
|
* Date: Oct 18, 2009
|
||||||
*
|
*
|
||||||
* AKKA port by
|
* Inital AKKA port by
|
||||||
* @author Viktor Klang
|
* @author Viktor Klang
|
||||||
* Date: Jan 24 2010
|
* Date: Jan 24 2010
|
||||||
|
*
|
||||||
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
sealed class Agent[T] private (initialValue: T) extends Transactor {
|
sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
import Agent._
|
import Agent._
|
||||||
|
|
@ -52,7 +67,7 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
this ! ValueHolder(initialValue)
|
this ! ValueHolder(initialValue)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Periodically handles incoming messages
|
* Periodically handles incoming messages.
|
||||||
*/
|
*/
|
||||||
def receive = {
|
def receive = {
|
||||||
case ValueHolder(x: T) => updateData(x)
|
case ValueHolder(x: T) => updateData(x)
|
||||||
|
|
@ -61,13 +76,13 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Specifies how a copy of the value is made, defaults to using identity
|
* Specifies how a copy of the value is made, defaults to using identity.
|
||||||
*/
|
*/
|
||||||
protected def copyStrategy(t: T): T = t
|
protected def copyStrategy(t: T): T = t
|
||||||
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Updates the internal state with the value provided as a by-name parameter
|
* Updates the internal state with the value provided as a by-name parameter.
|
||||||
*/
|
*/
|
||||||
private final def updateData(newData: => T): Unit = value.swap(newData)
|
private final def updateData(newData: => T): Unit = value.swap(newData)
|
||||||
|
|
||||||
|
|
@ -79,6 +94,8 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
* method and then waits for its result on a CountDownLatch.
|
* method and then waits for its result on a CountDownLatch.
|
||||||
*/
|
*/
|
||||||
final def get: T = {
|
final def get: T = {
|
||||||
|
if (isTransactionInScope) throw new AgentException(
|
||||||
|
"Can't call Agent.get within an enclosing transaction.\n\tWould block indefinitely.\n\tPlease refactor your code.")
|
||||||
val ref = new AtomicReference[T]
|
val ref = new AtomicReference[T]
|
||||||
val latch = new CountDownLatch(1)
|
val latch = new CountDownLatch(1)
|
||||||
get((x: T) => {ref.set(x); latch.countDown})
|
get((x: T) => {ref.set(x); latch.countDown})
|
||||||
|
|
@ -99,33 +116,34 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
|
* the asynchronous getValue() method and then waits for its result on a CountDownLatch.
|
||||||
*/
|
*/
|
||||||
final def apply(): T = get
|
final def apply(): T = get
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Asynchronously submits a request to read the internal state. The supplied function
|
* Submits the provided function for execution against the internal agent's state.
|
||||||
* will be executed on the returned internal state value. A copy of the internal state
|
|
||||||
* will be used, depending on the underlying effective copyStrategy.
|
|
||||||
*/
|
|
||||||
// final def apply(message: (T => Unit)) : Unit = get(message)
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Submits the provided function for execution against the internal agent's state
|
|
||||||
*/
|
*/
|
||||||
final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
|
final def apply(message: (T => T)): Unit = this ! FunctionHolder(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits a new value to be set as the new agent's internal state
|
* Submits a new value to be set as the new agent's internal state.
|
||||||
*/
|
*/
|
||||||
final def apply(message: T): Unit = this ! ValueHolder(message)
|
final def apply(message: T): Unit = this ! ValueHolder(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits the provided function for execution against the internal agent's state
|
* Submits the provided function for execution against the internal agent's state.
|
||||||
*/
|
*/
|
||||||
final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
|
final def update(message: (T => T)): Unit = this ! FunctionHolder(message)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Submits a new value to be set as the new agent's internal state
|
* Submits a new value to be set as the new agent's internal state.
|
||||||
*/
|
*/
|
||||||
|
// FIXME Change to 'send' when we have Scala 2.8 and we can remove the Actor.send method
|
||||||
final def update(message: T): Unit = this ! ValueHolder(message)
|
final def update(message: T): Unit = this ! ValueHolder(message)
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Closes the agents and makes it eligable for garbage collection.
|
||||||
|
*
|
||||||
|
* A closed agent can never be used again.
|
||||||
|
*/
|
||||||
|
def close = stop
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -134,22 +152,22 @@ sealed class Agent[T] private (initialValue: T) extends Transactor {
|
||||||
object Agent {
|
object Agent {
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* The internal messages for passing around requests
|
* The internal messages for passing around requests.
|
||||||
*/
|
*/
|
||||||
private case class ProcedureHolder[T](val fun: ((T) => Unit))
|
private case class ProcedureHolder[T](fun: ((T) => Unit))
|
||||||
private case class FunctionHolder[T](val fun: ((T) => T))
|
private case class FunctionHolder[T](fun: ((T) => T))
|
||||||
private case class ValueHolder[T](val value: T)
|
private case class ValueHolder[T](value: T)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Agent of type T with the initial value of value
|
* Creates a new Agent of type T with the initial value of value.
|
||||||
*/
|
*/
|
||||||
def apply[T](value:T): Agent[T] = new Agent(value)
|
def apply[T](value: T): Agent[T] = new Agent(value)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a new Agent of type T with the initial value of value and with the
|
* Creates a new Agent of type T with the initial value of value and with the
|
||||||
* specified copy function
|
* specified copy function.
|
||||||
*/
|
*/
|
||||||
def apply[T](value:T, newCopyStrategy: (T) => T) = new Agent(value) {
|
def apply[T](value: T, newCopyStrategy: (T) => T) = new Agent(value) {
|
||||||
override def copyStrategy(t : T) = newCopyStrategy(t)
|
override def copyStrategy(t: T) = newCopyStrategy(t)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -97,66 +97,43 @@ class TransactionRetryException(message: String) extends RuntimeException(messag
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
object Transaction extends TransactionManagement {
|
object Transaction extends TransactionManagement with Logging {
|
||||||
val idFactory = new AtomicLong(-1L)
|
val idFactory = new AtomicLong(-1L)
|
||||||
/*
|
|
||||||
import AlphaStm._
|
|
||||||
private val defaultTxBuilder = new AlphaTransactionFactoryBuilder
|
|
||||||
defaultTxBuilder.setReadonly(false)
|
|
||||||
defaultTxBuilder.setInterruptible(INTERRUPTIBLE)
|
|
||||||
defaultTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
|
|
||||||
defaultTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
|
|
||||||
defaultTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
|
|
||||||
defaultTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
|
|
||||||
defaultTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
|
|
||||||
private val readOnlyTxBuilder = new AlphaStm.AlphaTransactionFactoryBuilder
|
|
||||||
readOnlyTxBuilder.setReadonly(true)
|
|
||||||
readOnlyTxBuilder.setInterruptible(INTERRUPTIBLE)
|
|
||||||
readOnlyTxBuilder.setMaxRetryCount(MAX_NR_OF_RETRIES)
|
|
||||||
readOnlyTxBuilder.setPreventWriteSkew(PREVENT_WRITE_SKEW)
|
|
||||||
readOnlyTxBuilder.setAutomaticReadTracking(AUTOMATIC_READ_TRACKING)
|
|
||||||
readOnlyTxBuilder.setSmartTxLengthSelector(SMART_TX_LENGTH_SELECTOR)
|
|
||||||
readOnlyTxBuilder.setBackoffPolicy(new ExponentialBackoffPolicy)
|
|
||||||
*/
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on Transaction class.
|
||||||
*/
|
*/
|
||||||
def map[T](f: => T)(implicit transactionFamilyName: String): T =
|
def map[T](f: => T)(implicit transactionFamilyName: String): T =
|
||||||
atomic {f}
|
atomic {f}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on Transaction class.
|
||||||
*/
|
*/
|
||||||
def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
|
def flatMap[T](f: => T)(implicit transactionFamilyName: String): T =
|
||||||
atomic {f}
|
atomic {f}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* See ScalaDoc on class.
|
* See ScalaDoc on Transaction class.
|
||||||
*/
|
*/
|
||||||
def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
|
def foreach(f: => Unit)(implicit transactionFamilyName: String): Unit =
|
||||||
atomic {f}
|
atomic {f}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates a "pure" STM atomic transaction and by-passes all transactions hooks
|
* See ScalaDoc on Transaction class.
|
||||||
* such as persistence etc.
|
|
||||||
* Only for internal usage.
|
|
||||||
*/
|
|
||||||
private[akka] def pureAtomic[T](body: => T): T = new TransactionTemplate[T]() {
|
|
||||||
def execute(mtx: MultiverseTransaction): T = body
|
|
||||||
}.execute()
|
|
||||||
|
|
||||||
/**
|
|
||||||
* See ScalaDoc on class.
|
|
||||||
*/
|
*/
|
||||||
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
|
def atomic[T](body: => T)(implicit transactionFamilyName: String): T = {
|
||||||
|
// FIXME use Transaction Builder and set the transactionFamilyName
|
||||||
// defaultTxBuilder.setFamilyName(transactionFamilyName)
|
// defaultTxBuilder.setFamilyName(transactionFamilyName)
|
||||||
// new TransactionTemplate[T](defaultTxBuilder.build) {
|
// new TransactionTemplate[T](defaultTxBuilder.build) {
|
||||||
new TransactionTemplate[T]() { // FIXME take factory
|
var isTopLevelTransaction = true
|
||||||
|
new TransactionTemplate[T]() {
|
||||||
def execute(mtx: MultiverseTransaction): T = {
|
def execute(mtx: MultiverseTransaction): T = {
|
||||||
val result = body
|
val result = body
|
||||||
|
|
||||||
log.trace("Committing transaction [%s] \nwith family name [%s] \nby joining transaction set")
|
val txSet = getTransactionSetInScope
|
||||||
getTransactionSetInScope.joinCommit(mtx)
|
log.trace("Committing transaction [%s]\n\twith family name [%s]\n\tby joining transaction set [%s]",
|
||||||
|
mtx, transactionFamilyName, txSet)
|
||||||
|
txSet.joinCommit(mtx)
|
||||||
|
|
||||||
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||||
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
|
||||||
|
|
@ -166,8 +143,11 @@ object Transaction extends TransactionManagement {
|
||||||
}
|
}
|
||||||
|
|
||||||
override def onStart(mtx: MultiverseTransaction) = {
|
override def onStart(mtx: MultiverseTransaction) = {
|
||||||
val txSet = if (!isTransactionSetInScope) createNewTransactionSet
|
val txSet =
|
||||||
else getTransactionSetInScope
|
if (!isTransactionSetInScope) {
|
||||||
|
isTopLevelTransaction = true
|
||||||
|
createNewTransactionSet
|
||||||
|
} else getTransactionSetInScope
|
||||||
val tx = new Transaction
|
val tx = new Transaction
|
||||||
tx.transaction = Some(mtx)
|
tx.transaction = Some(mtx)
|
||||||
setTransaction(Some(tx))
|
setTransaction(Some(tx))
|
||||||
|
|
@ -197,6 +177,16 @@ object Transaction extends TransactionManagement {
|
||||||
def orelserun(t: MultiverseTransaction) = secondBody
|
def orelserun(t: MultiverseTransaction) = secondBody
|
||||||
}.execute()
|
}.execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a STM atomic transaction and by-passes all transactions hooks
|
||||||
|
* such as persistence etc.
|
||||||
|
*
|
||||||
|
* Only for internal usage.
|
||||||
|
*/
|
||||||
|
private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() {
|
||||||
|
def execute(mtx: MultiverseTransaction): T = body
|
||||||
|
}.execute()
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -205,18 +195,19 @@ object Transaction extends TransactionManagement {
|
||||||
@serializable class Transaction extends Logging {
|
@serializable class Transaction extends Logging {
|
||||||
import Transaction._
|
import Transaction._
|
||||||
|
|
||||||
log.trace("Creating %s", toString)
|
|
||||||
val id = Transaction.idFactory.incrementAndGet
|
val id = Transaction.idFactory.incrementAndGet
|
||||||
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
|
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
|
||||||
private[akka] var transaction: Option[MultiverseTransaction] = None
|
private[akka] var transaction: Option[MultiverseTransaction] = None
|
||||||
private[this] val persistentStateMap = new HashMap[String, Committable]
|
private[this] val persistentStateMap = new HashMap[String, Committable]
|
||||||
private[akka] val depth = new AtomicInteger(0)
|
private[akka] val depth = new AtomicInteger(0)
|
||||||
|
|
||||||
|
log.trace("Creating %s", toString)
|
||||||
|
|
||||||
// --- public methods ---------
|
// --- public methods ---------
|
||||||
|
|
||||||
def commit = synchronized {
|
def commit = synchronized {
|
||||||
log.trace("Committing transaction %s", toString)
|
log.trace("Committing transaction %s", toString)
|
||||||
pureAtomic {
|
atomic0 {
|
||||||
persistentStateMap.values.foreach(_.commit)
|
persistentStateMap.values.foreach(_.commit)
|
||||||
}
|
}
|
||||||
status = TransactionStatus.Completed
|
status = TransactionStatus.Completed
|
||||||
|
|
@ -261,21 +252,21 @@ object Transaction extends TransactionManagement {
|
||||||
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
|
"Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
|
||||||
|
|
||||||
// For reinitialize transaction after sending it over the wire
|
// For reinitialize transaction after sending it over the wire
|
||||||
private[akka] def reinit = synchronized {
|
/* private[akka] def reinit = synchronized {
|
||||||
import net.lag.logging.{Logger, Level}
|
import net.lag.logging.{Logger, Level}
|
||||||
if (log eq null) {
|
if (log eq null) {
|
||||||
log = Logger.get(this.getClass.getName)
|
log = Logger.get(this.getClass.getName)
|
||||||
log.setLevel(Level.ALL) // TODO: preserve logging level
|
log.setLevel(Level.ALL) // TODO: preserve logging level
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
*/
|
||||||
override def equals(that: Any): Boolean = synchronized {
|
override def equals(that: Any): Boolean = synchronized {
|
||||||
that != null &&
|
that != null &&
|
||||||
that.isInstanceOf[Transaction] &&
|
that.isInstanceOf[Transaction] &&
|
||||||
that.asInstanceOf[Transaction].id == this.id
|
that.asInstanceOf[Transaction].id == this.id
|
||||||
}
|
}
|
||||||
|
|
||||||
override def hashCode(): Int = synchronized { id.toInt }
|
override def hashCode: Int = synchronized { id.toInt }
|
||||||
|
|
||||||
override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
|
override def toString = synchronized { "Transaction[" + id + ", " + status + "]" }
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,8 +6,6 @@ package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
import java.util.concurrent.atomic.AtomicBoolean
|
||||||
|
|
||||||
import se.scalablesolutions.akka.util.Logging
|
|
||||||
|
|
||||||
import org.multiverse.api.ThreadLocalTransaction._
|
import org.multiverse.api.ThreadLocalTransaction._
|
||||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||||
|
|
||||||
|
|
@ -40,8 +38,8 @@ object TransactionManagement extends TransactionManagement {
|
||||||
|
|
||||||
private[akka] def getTransactionSet: CountDownCommitBarrier = {
|
private[akka] def getTransactionSet: CountDownCommitBarrier = {
|
||||||
val option = transactionSet.get
|
val option = transactionSet.get
|
||||||
if ((option eq null) || option.isEmpty) throw new IllegalStateException("No TransactionSet in scope")
|
if ((option eq null) || option.isEmpty) throw new IllegalStateException("No Transaction set in scope")
|
||||||
option.get
|
else option.get
|
||||||
}
|
}
|
||||||
|
|
||||||
private[akka] def getTransaction: Transaction = {
|
private[akka] def getTransaction: Transaction = {
|
||||||
|
|
@ -51,7 +49,7 @@ object TransactionManagement extends TransactionManagement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
trait TransactionManagement extends Logging {
|
trait TransactionManagement {
|
||||||
|
|
||||||
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
|
private[akka] def createNewTransactionSet: CountDownCommitBarrier = {
|
||||||
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
|
val txSet = new CountDownCommitBarrier(1, TransactionManagement.FAIR_TRANSACTIONS)
|
||||||
|
|
|
||||||
|
|
@ -1,10 +1,14 @@
|
||||||
package se.scalablesolutions.akka.actor
|
package se.scalablesolutions.akka.actor
|
||||||
|
|
||||||
import org.scalatest.Suite
|
import se.scalablesolutions.akka.actor.Actor.transactor
|
||||||
|
import se.scalablesolutions.akka.stm.Transaction.atomic
|
||||||
import se.scalablesolutions.akka.util.Logging
|
import se.scalablesolutions.akka.util.Logging
|
||||||
import org.junit.runner.RunWith
|
|
||||||
|
import org.scalatest.Suite
|
||||||
import org.scalatest.junit.JUnitRunner
|
import org.scalatest.junit.JUnitRunner
|
||||||
import org.scalatest.matchers.MustMatchers
|
import org.scalatest.matchers.MustMatchers
|
||||||
|
|
||||||
|
import org.junit.runner.RunWith
|
||||||
import org.junit.{Test}
|
import org.junit.{Test}
|
||||||
|
|
||||||
@RunWith(classOf[JUnitRunner])
|
@RunWith(classOf[JUnitRunner])
|
||||||
|
|
@ -12,7 +16,9 @@ class AgentTest extends junit.framework.TestCase
|
||||||
with Suite with MustMatchers
|
with Suite with MustMatchers
|
||||||
with ActorTestUtil with Logging {
|
with ActorTestUtil with Logging {
|
||||||
|
|
||||||
@Test def testAgent = verify(new TestActor {
|
implicit val txFamilyName = "test"
|
||||||
|
|
||||||
|
@Test def testSendFun = verify(new TestActor {
|
||||||
def test = {
|
def test = {
|
||||||
val agent = Agent(5)
|
val agent = Agent(5)
|
||||||
handle(agent) {
|
handle(agent) {
|
||||||
|
|
@ -24,4 +30,49 @@ with ActorTestUtil with Logging {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@Test def testSendValue = verify(new TestActor {
|
||||||
|
def test = {
|
||||||
|
val agent = Agent(5)
|
||||||
|
handle(agent) {
|
||||||
|
agent update 6
|
||||||
|
val result = agent()
|
||||||
|
result must be(6)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
@Test def testOneAgentUpdateWithinEnlosingTransactionSuccess = {
|
||||||
|
case object Go
|
||||||
|
val agent = Agent(5)
|
||||||
|
val tx = transactor {
|
||||||
|
case Go => agent update (_ + 1)
|
||||||
|
}
|
||||||
|
tx send Go
|
||||||
|
Thread.sleep(5000)
|
||||||
|
val result = agent()
|
||||||
|
result must be(6)
|
||||||
|
agent.close
|
||||||
|
tx.stop
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test def testDoingAgentGetInEnlosingTransactionShouldYieldException = {
|
||||||
|
import java.util.concurrent.CountDownLatch
|
||||||
|
case object Go
|
||||||
|
val latch = new CountDownLatch(1)
|
||||||
|
val agent = Agent(5)
|
||||||
|
val tx = transactor {
|
||||||
|
case Go =>
|
||||||
|
agent update (_ * 2)
|
||||||
|
try { agent() }
|
||||||
|
catch {
|
||||||
|
case _ => latch.countDown
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tx send Go
|
||||||
|
latch.await // FIXME should await with timeout and fail if timeout
|
||||||
|
agent.close
|
||||||
|
tx.stop
|
||||||
|
assert(true)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -17,7 +17,7 @@ import java.net.UnknownHostException
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
trait Logging {
|
trait Logging {
|
||||||
@transient @volatile var log = Logger.get(this.getClass.getName)
|
@transient @volatile lazy val log = Logger.get(this.getClass.getName)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
|
||||||
|
|
@ -8,7 +8,7 @@
|
||||||
<log>
|
<log>
|
||||||
filename = "./logs/akka.log"
|
filename = "./logs/akka.log"
|
||||||
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
|
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
|
||||||
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
|
level = "trace" # Options: fatal, critical, error, warning, info, debug, trace
|
||||||
console = on
|
console = on
|
||||||
# syslog_host = ""
|
# syslog_host = ""
|
||||||
# syslog_server_name = ""
|
# syslog_server_name = ""
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue