Some Java friendliness for STM
This commit is contained in:
parent
8e48ace976
commit
bea8f16264
23 changed files with 611 additions and 122 deletions
|
|
@ -16,7 +16,7 @@ import org.multiverse.transactional.refs.BasicRef
|
||||||
object Ref {
|
object Ref {
|
||||||
def apply[T]() = new Ref[T]()
|
def apply[T]() = new Ref[T]()
|
||||||
|
|
||||||
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
|
def apply[T](initialValue: T) = new Ref[T](initialValue)
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* An implicit conversion that converts a Ref to an Iterable value.
|
* An implicit conversion that converts a Ref to an Iterable value.
|
||||||
|
|
@ -29,13 +29,10 @@ object Ref {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class Ref[T](initialOpt: Option[T] = None)
|
class Ref[T](initialValue: T) extends BasicRef[T](initialValue) with Transactional {
|
||||||
extends BasicRef[T](initialOpt.getOrElse(null.asInstanceOf[T]))
|
|
||||||
with Transactional {
|
|
||||||
|
|
||||||
self =>
|
self =>
|
||||||
|
|
||||||
def this() = this(None) // Java compatibility
|
def this() = this(null.asInstanceOf[T])
|
||||||
|
|
||||||
val uuid = UUID.newUuid.toString
|
val uuid = UUID.newUuid.toString
|
||||||
|
|
||||||
|
|
|
||||||
84
akka-core/src/main/scala/stm/TransactionFactoryBuilder.scala
Normal file
84
akka-core/src/main/scala/stm/TransactionFactoryBuilder.scala
Normal file
|
|
@ -0,0 +1,84 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
import java.lang.{Boolean => JBoolean}
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.util.Duration
|
||||||
|
|
||||||
|
import org.multiverse.api.TraceLevel
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For more easily creating TransactionConfig from Java.
|
||||||
|
*/
|
||||||
|
class TransactionConfigBuilder {
|
||||||
|
var familyName: String = TransactionConfig.FAMILY_NAME
|
||||||
|
var readonly: JBoolean = TransactionConfig.READONLY
|
||||||
|
var maxRetries: Int = TransactionConfig.MAX_RETRIES
|
||||||
|
var timeout: Duration = TransactionConfig.DefaultTimeout
|
||||||
|
var trackReads: JBoolean = TransactionConfig.TRACK_READS
|
||||||
|
var writeSkew: Boolean = TransactionConfig.WRITE_SKEW
|
||||||
|
var explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES
|
||||||
|
var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE
|
||||||
|
var speculative: Boolean = TransactionConfig.SPECULATIVE
|
||||||
|
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
|
||||||
|
var traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL
|
||||||
|
var hooks: Boolean = TransactionConfig.HOOKS
|
||||||
|
|
||||||
|
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
|
||||||
|
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
|
||||||
|
def setMaxRetries(maxRetries: Int) = { this.maxRetries = maxRetries; this }
|
||||||
|
def setTimeout(timeout: Duration) = { this.timeout = timeout; this }
|
||||||
|
def setTrackReads(trackReads: JBoolean) = { this.trackReads = trackReads; this }
|
||||||
|
def setWriteSkew(writeSkew: Boolean) = { this.writeSkew = writeSkew; this }
|
||||||
|
def setExplicitRetries(explicitRetries: Boolean) = { this.explicitRetries = explicitRetries; this }
|
||||||
|
def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this }
|
||||||
|
def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this }
|
||||||
|
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
|
||||||
|
def setTraceLevel(traceLevel: TraceLevel) = { this.traceLevel = traceLevel; this }
|
||||||
|
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
|
||||||
|
|
||||||
|
def build = new TransactionConfig(
|
||||||
|
familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
|
||||||
|
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For more easily creating TransactionFactory from Java.
|
||||||
|
*/
|
||||||
|
class TransactionFactoryBuilder {
|
||||||
|
var familyName: String = TransactionConfig.FAMILY_NAME
|
||||||
|
var readonly: JBoolean = TransactionConfig.READONLY
|
||||||
|
var maxRetries: Int = TransactionConfig.MAX_RETRIES
|
||||||
|
var timeout: Duration = TransactionConfig.DefaultTimeout
|
||||||
|
var trackReads: JBoolean = TransactionConfig.TRACK_READS
|
||||||
|
var writeSkew: Boolean = TransactionConfig.WRITE_SKEW
|
||||||
|
var explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES
|
||||||
|
var interruptible: Boolean = TransactionConfig.INTERRUPTIBLE
|
||||||
|
var speculative: Boolean = TransactionConfig.SPECULATIVE
|
||||||
|
var quickRelease: Boolean = TransactionConfig.QUICK_RELEASE
|
||||||
|
var traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL
|
||||||
|
var hooks: Boolean = TransactionConfig.HOOKS
|
||||||
|
|
||||||
|
def setFamilyName(familyName: String) = { this.familyName = familyName; this }
|
||||||
|
def setReadonly(readonly: JBoolean) = { this.readonly = readonly; this }
|
||||||
|
def setMaxRetries(maxRetries: Int) = { this.maxRetries = maxRetries; this }
|
||||||
|
def setTimeout(timeout: Duration) = { this.timeout = timeout; this }
|
||||||
|
def setTrackReads(trackReads: JBoolean) = { this.trackReads = trackReads; this }
|
||||||
|
def setWriteSkew(writeSkew: Boolean) = { this.writeSkew = writeSkew; this }
|
||||||
|
def setExplicitRetries(explicitRetries: Boolean) = { this.explicitRetries = explicitRetries; this }
|
||||||
|
def setInterruptible(interruptible: Boolean) = { this.interruptible = interruptible; this }
|
||||||
|
def setSpeculative(speculative: Boolean) = { this.speculative = speculative; this }
|
||||||
|
def setQuickRelease(quickRelease: Boolean) = { this.quickRelease = quickRelease; this }
|
||||||
|
def setTraceLevel(traceLevel: TraceLevel) = { this.traceLevel = traceLevel; this }
|
||||||
|
def setHooks(hooks: Boolean) = { this.hooks = hooks; this }
|
||||||
|
|
||||||
|
def build = {
|
||||||
|
val config = new TransactionConfig(
|
||||||
|
familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
|
||||||
|
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
|
||||||
|
new TransactionFactory(config)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -4,16 +4,11 @@
|
||||||
|
|
||||||
package se.scalablesolutions.akka.stm
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
import se.scalablesolutions.akka.util.Logging
|
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicBoolean
|
|
||||||
import java.util.concurrent.TimeUnit
|
|
||||||
|
|
||||||
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
|
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
|
||||||
import org.multiverse.api.ThreadLocalTransaction._
|
import org.multiverse.api.ThreadLocalTransaction._
|
||||||
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||||
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
import org.multiverse.commitbarriers.CountDownCommitBarrier
|
||||||
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
|
import org.multiverse.templates.OrElseTemplate
|
||||||
|
|
||||||
class TransactionSetAbortedException(msg: String) extends RuntimeException(msg)
|
class TransactionSetAbortedException(msg: String) extends RuntimeException(msg)
|
||||||
|
|
||||||
|
|
@ -93,84 +88,7 @@ trait TransactionManagement {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Local transaction management, local in the context of threads.
|
|
||||||
* Use this if you do <b>not</b> need to have one transaction span
|
|
||||||
* multiple threads (or Actors).
|
|
||||||
* <p/>
|
|
||||||
* Example of atomic transaction management using the atomic block.
|
|
||||||
* <p/>
|
|
||||||
* <pre>
|
|
||||||
* import se.scalablesolutions.akka.stm.local._
|
|
||||||
*
|
|
||||||
* atomic {
|
|
||||||
* // do something within a transaction
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
class LocalStm extends TransactionManagement with Logging {
|
|
||||||
|
|
||||||
val DefaultLocalTransactionConfig = TransactionConfig()
|
|
||||||
val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
|
|
||||||
|
|
||||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
|
|
||||||
|
|
||||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
|
||||||
factory.boilerplate.execute(new TransactionalCallable[T]() {
|
|
||||||
def call(mtx: MultiverseTransaction): T = {
|
|
||||||
factory.addHooks
|
|
||||||
val result = body
|
|
||||||
log.trace("Committing local transaction [" + mtx + "]")
|
|
||||||
result
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Global transaction management, global in the context of multiple threads.
|
|
||||||
* Use this if you need to have one transaction span multiple threads (or Actors).
|
|
||||||
* <p/>
|
|
||||||
* Example of atomic transaction management using the atomic block:
|
|
||||||
* <p/>
|
|
||||||
* <pre>
|
|
||||||
* import se.scalablesolutions.akka.stm.global._
|
|
||||||
*
|
|
||||||
* atomic {
|
|
||||||
* // do something within a transaction
|
|
||||||
* }
|
|
||||||
* </pre>
|
|
||||||
*/
|
|
||||||
class GlobalStm extends TransactionManagement with Logging {
|
|
||||||
|
|
||||||
val DefaultGlobalTransactionConfig = TransactionConfig()
|
|
||||||
val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
|
|
||||||
|
|
||||||
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
|
|
||||||
|
|
||||||
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
|
||||||
factory.boilerplate.execute(new TransactionalCallable[T]() {
|
|
||||||
def call(mtx: MultiverseTransaction): T = {
|
|
||||||
if (!isTransactionSetInScope) createNewTransactionSet
|
|
||||||
factory.addHooks
|
|
||||||
val result = body
|
|
||||||
val txSet = getTransactionSetInScope
|
|
||||||
log.trace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]")
|
|
||||||
try {
|
|
||||||
txSet.tryJoinCommit(
|
|
||||||
mtx,
|
|
||||||
TransactionConfig.DefaultTimeout.length,
|
|
||||||
TransactionConfig.DefaultTimeout.unit)
|
|
||||||
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
|
|
||||||
} catch { case e: IllegalStateException => {} }
|
|
||||||
result
|
|
||||||
}
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
trait StmUtil {
|
trait StmUtil {
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Schedule a deferred task on the thread local transaction (use within an atomic).
|
* Schedule a deferred task on the thread local transaction (use within an atomic).
|
||||||
* This is executed when the transaction commits.
|
* This is executed when the transaction commits.
|
||||||
|
|
@ -209,3 +127,14 @@ trait StmUtil {
|
||||||
}.execute()
|
}.execute()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
trait StmCommon {
|
||||||
|
type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
|
||||||
|
val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
|
||||||
|
|
||||||
|
type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
|
||||||
|
val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
|
||||||
|
|
||||||
|
type Ref[T] = se.scalablesolutions.akka.stm.Ref[T]
|
||||||
|
val Ref = se.scalablesolutions.akka.stm.Ref
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@ import se.scalablesolutions.akka.util.UUID
|
||||||
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
|
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
|
||||||
|
|
||||||
object TransactionalMap {
|
object TransactionalMap {
|
||||||
def apply[K, V]() = new TransactionalMap[K, V]
|
def apply[K, V]() = new TransactionalMap[K, V]()
|
||||||
|
|
||||||
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashMap(pairs: _*)))
|
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(HashMap(pairs: _*))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -21,12 +21,12 @@ object TransactionalMap {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class TransactionalMap[K, V](initialOpt: Option[HashMap[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
|
class TransactionalMap[K, V](initialValue: HashMap[K, V]) extends Transactional with scala.collection.mutable.Map[K, V] {
|
||||||
def this() = this(None) // Java compatibility
|
def this() = this(HashMap[K, V]())
|
||||||
|
|
||||||
val uuid = UUID.newUuid.toString
|
val uuid = UUID.newUuid.toString
|
||||||
|
|
||||||
protected[this] val ref = new Ref(initialOpt.orElse(Some(HashMap[K, V]())))
|
private[this] val ref = Ref(initialValue)
|
||||||
|
|
||||||
def -=(key: K) = {
|
def -=(key: K) = {
|
||||||
remove(key)
|
remove(key)
|
||||||
|
|
|
||||||
|
|
@ -11,9 +11,9 @@ import se.scalablesolutions.akka.util.UUID
|
||||||
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
|
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
|
||||||
|
|
||||||
object TransactionalVector {
|
object TransactionalVector {
|
||||||
def apply[T]() = new TransactionalVector[T]
|
def apply[T]() = new TransactionalVector[T]()
|
||||||
|
|
||||||
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
|
def apply[T](elems: T*) = new TransactionalVector(Vector(elems: _*))
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -21,12 +21,12 @@ object TransactionalVector {
|
||||||
*
|
*
|
||||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||||
*/
|
*/
|
||||||
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
|
class TransactionalVector[T](initialValue: Vector[T]) extends Transactional with IndexedSeq[T] {
|
||||||
def this() = this(None) // Java compatibility
|
def this() = this(Vector[T]())
|
||||||
|
|
||||||
val uuid = UUID.newUuid.toString
|
val uuid = UUID.newUuid.toString
|
||||||
|
|
||||||
private[this] val ref = new Ref(initialOpt.orElse(Some(Vector[T]())))
|
private[this] val ref = Ref(initialValue)
|
||||||
|
|
||||||
def clear = ref.swap(Vector[T]())
|
def clear = ref.swap(Vector[T]())
|
||||||
|
|
||||||
|
|
|
||||||
41
akka-core/src/main/scala/stm/global/Atomic.scala
Normal file
41
akka-core/src/main/scala/stm/global/Atomic.scala
Normal file
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm.global
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java-friendly atomic blocks.
|
||||||
|
* <p/>
|
||||||
|
* Example usage (in Java):
|
||||||
|
* <p/>
|
||||||
|
* <pre>
|
||||||
|
* import se.scalablesolutions.akka.stm.*;
|
||||||
|
* import se.scalablesolutions.akka.stm.global.Atomic;
|
||||||
|
*
|
||||||
|
* final Ref<Integer> ref = new Ref<Integer>(0);
|
||||||
|
*
|
||||||
|
* new Atomic() {
|
||||||
|
* public Object atomically() {
|
||||||
|
* return ref.set(1);
|
||||||
|
* }
|
||||||
|
* }.execute();
|
||||||
|
*
|
||||||
|
* // To configure transactions pass a TransactionFactory
|
||||||
|
*
|
||||||
|
* TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
|
* .setReadonly(true)
|
||||||
|
* .build();
|
||||||
|
*
|
||||||
|
* Integer value = new Atomic<Integer>(txFactory) {
|
||||||
|
* public Integer atomically() {
|
||||||
|
* return ref.get();
|
||||||
|
* }
|
||||||
|
* }.execute();
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
abstract class Atomic[T](factory: TransactionFactory) {
|
||||||
|
def this() = this(DefaultGlobalTransactionFactory)
|
||||||
|
def atomically: T
|
||||||
|
def execute: T = atomic(factory)(atomically)
|
||||||
|
}
|
||||||
53
akka-core/src/main/scala/stm/global/GlobalStm.scala
Normal file
53
akka-core/src/main/scala/stm/global/GlobalStm.scala
Normal file
|
|
@ -0,0 +1,53 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.util.Logging
|
||||||
|
|
||||||
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||||
|
import org.multiverse.templates.TransactionalCallable
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Global transaction management, global in the context of multiple threads.
|
||||||
|
* Use this if you need to have one transaction span multiple threads (or Actors).
|
||||||
|
* <p/>
|
||||||
|
* Example of atomic transaction management using the atomic block:
|
||||||
|
* <p/>
|
||||||
|
* <pre>
|
||||||
|
* import se.scalablesolutions.akka.stm.global._
|
||||||
|
*
|
||||||
|
* atomic {
|
||||||
|
* // do something within a transaction
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
class GlobalStm extends TransactionManagement with Logging {
|
||||||
|
|
||||||
|
val DefaultGlobalTransactionConfig = TransactionConfig()
|
||||||
|
val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
|
||||||
|
|
||||||
|
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
|
||||||
|
|
||||||
|
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||||
|
factory.boilerplate.execute(new TransactionalCallable[T]() {
|
||||||
|
def call(mtx: MultiverseTransaction): T = {
|
||||||
|
if (!isTransactionSetInScope) createNewTransactionSet
|
||||||
|
factory.addHooks
|
||||||
|
val result = body
|
||||||
|
val txSet = getTransactionSetInScope
|
||||||
|
log.trace("Committing global transaction [" + mtx + "]\n\tand joining transaction set [" + txSet + "]")
|
||||||
|
try {
|
||||||
|
txSet.tryJoinCommit(
|
||||||
|
mtx,
|
||||||
|
TransactionConfig.DefaultTimeout.length,
|
||||||
|
TransactionConfig.DefaultTimeout.unit)
|
||||||
|
// Need to catch IllegalStateException until we have fix in Multiverse, since it throws it by mistake
|
||||||
|
} catch { case e: IllegalStateException => {} }
|
||||||
|
result
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
10
akka-core/src/main/scala/stm/global/package.scala
Normal file
10
akka-core/src/main/scala/stm/global/package.scala
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For easily importing global STM.
|
||||||
|
*/
|
||||||
|
package object global extends GlobalStm with StmUtil with StmCommon
|
||||||
41
akka-core/src/main/scala/stm/local/Atomic.scala
Normal file
41
akka-core/src/main/scala/stm/local/Atomic.scala
Normal file
|
|
@ -0,0 +1,41 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm.local
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Java-friendly atomic blocks.
|
||||||
|
* <p/>
|
||||||
|
* Example usage (in Java):
|
||||||
|
* <p/>
|
||||||
|
* <pre>
|
||||||
|
* import se.scalablesolutions.akka.stm.*;
|
||||||
|
* import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
*
|
||||||
|
* final Ref<Integer> ref = new Ref<Integer>(0);
|
||||||
|
*
|
||||||
|
* new Atomic() {
|
||||||
|
* public Object atomically() {
|
||||||
|
* return ref.set(1);
|
||||||
|
* }
|
||||||
|
* }.execute();
|
||||||
|
*
|
||||||
|
* // To configure transactions pass a TransactionFactory
|
||||||
|
*
|
||||||
|
* TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
|
* .setReadonly(true)
|
||||||
|
* .build();
|
||||||
|
*
|
||||||
|
* Integer value = new Atomic<Integer>(txFactory) {
|
||||||
|
* public Integer atomically() {
|
||||||
|
* return ref.get();
|
||||||
|
* }
|
||||||
|
* }.execute();
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
abstract class Atomic[T](factory: TransactionFactory) {
|
||||||
|
def this() = this(DefaultLocalTransactionFactory)
|
||||||
|
def atomically: T
|
||||||
|
def execute: T = atomic(factory)(atomically)
|
||||||
|
}
|
||||||
44
akka-core/src/main/scala/stm/local/LocalStm.scala
Normal file
44
akka-core/src/main/scala/stm/local/LocalStm.scala
Normal file
|
|
@ -0,0 +1,44 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.util.Logging
|
||||||
|
|
||||||
|
import org.multiverse.api.{Transaction => MultiverseTransaction}
|
||||||
|
import org.multiverse.templates.TransactionalCallable
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Local transaction management, local in the context of threads.
|
||||||
|
* Use this if you do <b>not</b> need to have one transaction span
|
||||||
|
* multiple threads (or Actors).
|
||||||
|
* <p/>
|
||||||
|
* Example of atomic transaction management using the atomic block.
|
||||||
|
* <p/>
|
||||||
|
* <pre>
|
||||||
|
* import se.scalablesolutions.akka.stm.local._
|
||||||
|
*
|
||||||
|
* atomic {
|
||||||
|
* // do something within a transaction
|
||||||
|
* }
|
||||||
|
* </pre>
|
||||||
|
*/
|
||||||
|
class LocalStm extends TransactionManagement with Logging {
|
||||||
|
|
||||||
|
val DefaultLocalTransactionConfig = TransactionConfig()
|
||||||
|
val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
|
||||||
|
|
||||||
|
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
|
||||||
|
|
||||||
|
def atomic[T](factory: TransactionFactory)(body: => T): T = {
|
||||||
|
factory.boilerplate.execute(new TransactionalCallable[T]() {
|
||||||
|
def call(mtx: MultiverseTransaction): T = {
|
||||||
|
factory.addHooks
|
||||||
|
val result = body
|
||||||
|
log.trace("Committing local transaction [" + mtx + "]")
|
||||||
|
result
|
||||||
|
}
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
10
akka-core/src/main/scala/stm/local/package.scala
Normal file
10
akka-core/src/main/scala/stm/local/package.scala
Normal file
|
|
@ -0,0 +1,10 @@
|
||||||
|
/**
|
||||||
|
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
|
||||||
|
*/
|
||||||
|
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
/**
|
||||||
|
* For easily importing local STM.
|
||||||
|
*/
|
||||||
|
package object local extends LocalStm with StmUtil with StmCommon
|
||||||
|
|
@ -5,28 +5,7 @@
|
||||||
package se.scalablesolutions.akka.stm
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* For importing 'local' STM.
|
* For importing the transactional datastructures, including the primitive refs
|
||||||
*/
|
|
||||||
package object local extends LocalStm with StmUtil with StmCommon
|
|
||||||
|
|
||||||
/**
|
|
||||||
* For importing 'global' STM.
|
|
||||||
*/
|
|
||||||
package object global extends GlobalStm with StmUtil with StmCommon
|
|
||||||
|
|
||||||
trait StmCommon {
|
|
||||||
type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
|
|
||||||
val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
|
|
||||||
|
|
||||||
type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
|
|
||||||
val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
|
|
||||||
|
|
||||||
type Ref[T] = se.scalablesolutions.akka.stm.Ref[T]
|
|
||||||
val Ref = se.scalablesolutions.akka.stm.Ref
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* For importing the transactional data structures, including the primitive refs
|
|
||||||
* and transactional data structures from Multiverse.
|
* and transactional data structures from Multiverse.
|
||||||
*/
|
*/
|
||||||
package object transactional {
|
package object transactional {
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
public class Address {
|
||||||
|
private String location;
|
||||||
|
|
||||||
|
public Address(String location) {
|
||||||
|
this.location = location;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String toString() {
|
||||||
|
return "Address(" + location + ")";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,26 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.Ref;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
public class CounterExample {
|
||||||
|
final static Ref<Integer> ref = new Ref<Integer>(0);
|
||||||
|
|
||||||
|
public static int counter() {
|
||||||
|
return new Atomic<Integer>() {
|
||||||
|
public Integer atomically() {
|
||||||
|
int inc = ref.get() + 1;
|
||||||
|
ref.set(inc);
|
||||||
|
return inc;
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("Counter example");
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("counter 1: " + counter());
|
||||||
|
System.out.println("counter 2: " + counter());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,91 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import static org.junit.Assert.*;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.junit.Before;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.*;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
import org.multiverse.api.ThreadLocalTransaction;
|
||||||
|
import org.multiverse.api.TransactionConfiguration;
|
||||||
|
import org.multiverse.api.exceptions.ReadonlyException;
|
||||||
|
|
||||||
|
public class JavaStmTests {
|
||||||
|
|
||||||
|
private Ref<Integer> ref;
|
||||||
|
|
||||||
|
private int getRefValue() {
|
||||||
|
return new Atomic<Integer>() {
|
||||||
|
public Integer atomically() {
|
||||||
|
return ref.get();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
public int increment() {
|
||||||
|
return new Atomic<Integer>() {
|
||||||
|
public Integer atomically() {
|
||||||
|
int inc = ref.get() + 1;
|
||||||
|
ref.set(inc);
|
||||||
|
return inc;
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Before public void initialise() {
|
||||||
|
ref = new Ref<Integer>(0);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void incrementRef() {
|
||||||
|
assertEquals(0, getRefValue());
|
||||||
|
increment();
|
||||||
|
increment();
|
||||||
|
increment();
|
||||||
|
assertEquals(3, getRefValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void failSetRef() {
|
||||||
|
assertEquals(0, getRefValue());
|
||||||
|
try {
|
||||||
|
new Atomic() {
|
||||||
|
public Object atomically() {
|
||||||
|
ref.set(3);
|
||||||
|
throw new RuntimeException();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
} catch(RuntimeException e) {}
|
||||||
|
assertEquals(0, getRefValue());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test public void configureTransaction() {
|
||||||
|
TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
|
.setFamilyName("example")
|
||||||
|
.setReadonly(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
// get transaction config from multiverse
|
||||||
|
TransactionConfiguration config = new Atomic<TransactionConfiguration>(txFactory) {
|
||||||
|
public TransactionConfiguration atomically() {
|
||||||
|
ref.get();
|
||||||
|
return ThreadLocalTransaction.getThreadLocalTransaction().getConfiguration();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
assertEquals("example", config.getFamilyName());
|
||||||
|
assertEquals(true, config.isReadonly());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected=ReadonlyException.class) public void failReadonlyTransaction() {
|
||||||
|
TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
|
.setFamilyName("example")
|
||||||
|
.setReadonly(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
new Atomic(txFactory) {
|
||||||
|
public Object atomically() {
|
||||||
|
return ref.set(3);
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,36 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.Ref;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
public class RefExample {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("Ref example");
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
final Ref<Integer> ref = new Ref<Integer>(0);
|
||||||
|
|
||||||
|
Integer value1 = new Atomic<Integer>() {
|
||||||
|
public Integer atomically() {
|
||||||
|
return ref.get();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("value 1: " + value1);
|
||||||
|
|
||||||
|
new Atomic() {
|
||||||
|
public Object atomically() {
|
||||||
|
return ref.set(5);
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
Integer value2 = new Atomic<Integer>() {
|
||||||
|
public Integer atomically() {
|
||||||
|
return ref.get();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("value 2: " + value2);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,18 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.Ref;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
public class StmExamples {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("STM examples");
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
CounterExample.main(args);
|
||||||
|
RefExample.main(args);
|
||||||
|
TransactionFactoryExample.main(args);
|
||||||
|
TransactionalMapExample.main(args);
|
||||||
|
TransactionalVectorExample.main(args);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,30 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.*;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
import org.multiverse.api.ThreadLocalTransaction;
|
||||||
|
import org.multiverse.api.TransactionConfiguration;
|
||||||
|
|
||||||
|
public class TransactionFactoryExample {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("TransactionFactory example");
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
TransactionFactory txFactory = new TransactionFactoryBuilder()
|
||||||
|
.setFamilyName("example")
|
||||||
|
.setReadonly(true)
|
||||||
|
.build();
|
||||||
|
|
||||||
|
new Atomic(txFactory) {
|
||||||
|
public Object atomically() {
|
||||||
|
// check config has been passed to multiverse
|
||||||
|
TransactionConfiguration config = ThreadLocalTransaction.getThreadLocalTransaction().getConfiguration();
|
||||||
|
System.out.println("family name: " + config.getFamilyName());
|
||||||
|
System.out.println("readonly: " + config.isReadonly());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,35 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.*;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
public class TransactionalMapExample {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("TransactionalMap example");
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
final TransactionalMap<String, User> users = new TransactionalMap<String, User>();
|
||||||
|
|
||||||
|
// fill users map (in a transaction)
|
||||||
|
new Atomic() {
|
||||||
|
public Object atomically() {
|
||||||
|
users.put("bill", new User("bill"));
|
||||||
|
users.put("mary", new User("mary"));
|
||||||
|
users.put("john", new User("john"));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("users: " + users);
|
||||||
|
|
||||||
|
// access users map (in a transaction)
|
||||||
|
User user = new Atomic<User>() {
|
||||||
|
public User atomically() {
|
||||||
|
return users.get("bill").get();
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("user: " + user);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,34 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
import se.scalablesolutions.akka.stm.*;
|
||||||
|
import se.scalablesolutions.akka.stm.local.Atomic;
|
||||||
|
|
||||||
|
public class TransactionalVectorExample {
|
||||||
|
public static void main(String[] args) {
|
||||||
|
System.out.println();
|
||||||
|
System.out.println("TransactionalVector example");
|
||||||
|
System.out.println();
|
||||||
|
|
||||||
|
final TransactionalVector<Address> addresses = new TransactionalVector<Address>();
|
||||||
|
|
||||||
|
// fill addresses vector (in a transaction)
|
||||||
|
new Atomic() {
|
||||||
|
public Object atomically() {
|
||||||
|
addresses.add(new Address("somewhere"));
|
||||||
|
addresses.add(new Address("somewhere else"));
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("addresses: " + addresses);
|
||||||
|
|
||||||
|
// access addresses vector (in a transaction)
|
||||||
|
Address address = new Atomic<Address>() {
|
||||||
|
public Address atomically() {
|
||||||
|
return addresses.get(0);
|
||||||
|
}
|
||||||
|
}.execute();
|
||||||
|
|
||||||
|
System.out.println("address: " + address);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
@ -0,0 +1,13 @@
|
||||||
|
package se.scalablesolutions.akka.stm;
|
||||||
|
|
||||||
|
public class User {
|
||||||
|
private String name;
|
||||||
|
|
||||||
|
public User(String name) {
|
||||||
|
this.name = name;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override public String toString() {
|
||||||
|
return "User(" + name + ")";
|
||||||
|
}
|
||||||
|
}
|
||||||
5
akka-core/src/test/scala/stm/JavaStmSpec.scala
Normal file
5
akka-core/src/test/scala/stm/JavaStmSpec.scala
Normal file
|
|
@ -0,0 +1,5 @@
|
||||||
|
package se.scalablesolutions.akka.stm
|
||||||
|
|
||||||
|
import org.scalatest.junit.JUnitWrapperSuite
|
||||||
|
|
||||||
|
class JavaStmSpec extends JUnitWrapperSuite("se.scalablesolutions.akka.stm.JavaStmTests", Thread.currentThread.getContextClassLoader)
|
||||||
|
|
@ -40,7 +40,7 @@ case class Cell(food: Int = 0, pher: Float = 0, ant: Option[Ant] = None, home: B
|
||||||
|
|
||||||
object EmptyCell extends Cell
|
object EmptyCell extends Cell
|
||||||
|
|
||||||
class Place(initCell: Cell = EmptyCell) extends Ref(Some(initCell)) {
|
class Place(initCell: Cell = EmptyCell) extends Ref(initCell) {
|
||||||
def cell: Cell = getOrElse(EmptyCell)
|
def cell: Cell = getOrElse(EmptyCell)
|
||||||
def food: Int = cell.food
|
def food: Int = cell.food
|
||||||
def food(i: Int) = alter(_.addFood(i))
|
def food(i: Int) = alter(_.addFood(i))
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue