Merge branch 'master' into 281-hseeberger

Conflicts:
	akka-core/src/test/scala/InMemoryActorSpec.scala
	akka-core/src/test/scala/StmSpec.scala
This commit is contained in:
Heiko Seeberger 2010-06-21 09:31:02 +02:00
commit a2dc2017d0
41 changed files with 997 additions and 1656 deletions

View file

@ -10,14 +10,14 @@ import se.scalablesolutions.akka.stm.*;
public class InMemStateful {
private TransactionalMap<String, String> mapState;
private TransactionalVector<String> vectorState;
private TransactionalRef<String> refState;
private Ref<String> refState;
private boolean isInitialized = false;
public void init() {
if (!isInitialized) {
mapState = TransactionalState.newMap();
vectorState = TransactionalState.newVector();
refState = TransactionalState.newRef();
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
}
}

View file

@ -8,14 +8,14 @@ import se.scalablesolutions.akka.stm.*;
public class InMemStatefulNested {
private TransactionalMap<String, String> mapState;
private TransactionalVector<String> vectorState;
private TransactionalRef<String> refState;
private Ref<String> refState;
private boolean isInitialized = false;
public void init() {
if (!isInitialized) {
mapState = TransactionalState.newMap();
vectorState = TransactionalState.newVector();
refState = TransactionalState.newRef();
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
}
}

View file

@ -66,7 +66,10 @@ trait CamelService extends Bootable with Logging {
*
* @see onLoad
*/
def load = onLoad
def load: CamelService = {
onLoad
this
}
/**
* Stops the CamelService.

View file

@ -133,6 +133,11 @@ trait Producer { this: Actor =>
}
}
/**
* Default implementation of Actor.receive
*/
protected def receive = produce
/**
* Creates a new in-only Exchange.
*/

View file

@ -11,7 +11,6 @@ import se.scalablesolutions.akka.actor.Actor._
object ProducerFeatureTest {
class TestProducer(uri: String) extends Actor with Producer {
def endpointUri = uri
def receive = produce
}
}

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.dispatch._
import se.scalablesolutions.akka.config.Config.config
import se.scalablesolutions.akka.config.{AllForOneStrategy, OneForOneStrategy, FaultHandlingStrategy}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.stm.TransactionManagement._
import se.scalablesolutions.akka.stm.TransactionManagement
import se.scalablesolutions.akka.remote.protocol.RemoteProtocol._
@ -283,7 +283,17 @@ trait ActorRef extends TransactionManagement {
*/
@volatile protected[akka] var isTransactor = false
/**v
/**
* Configuration for TransactionFactory. User overridable.
*/
protected[akka] var _transactionConfig: TransactionConfig = DefaultGlobalTransactionConfig
/**
* TransactionFactory to be used for atomic when isTransactor. Configuration is overridable.
*/
private[akka] var _transactionFactory: Option[TransactionFactory] = None
/**
* This lock ensures thread safety in the dispatching: only one message can
* be dispatched at once on the actor.
*/
@ -493,13 +503,19 @@ trait ActorRef extends TransactionManagement {
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired: Unit
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
*/
def transactionConfig_=(config: TransactionConfig): Unit
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig
/**
* Returns the home address and port for this actor.
*/
@ -859,10 +875,6 @@ sealed class LocalActorRef private[akka](
/**
* Invoking 'makeTransactionRequired' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactionRequired = guard.withGuard {
if (!isRunning || isBeingRestarted) isTransactor = true
@ -870,6 +882,20 @@ sealed class LocalActorRef private[akka](
"Can not make actor transaction required after it has been started")
}
/**
* Sets the transaction configuration for this actor. Needs to be invoked before the actor is started.
*/
def transactionConfig_=(config: TransactionConfig) = guard.withGuard {
if (!isRunning || isBeingRestarted) _transactionConfig = config
else throw new ActorInitializationException(
"Cannot set transaction configuration for actor after it has been started")
}
/**
* Get the transaction configuration for this actor.
*/
def transactionConfig: TransactionConfig = guard.withGuard { _transactionConfig }
/**
* Set the contact address for this actor. This is used for replying to messages
* sent asynchronously when no reply channel exists.
@ -891,6 +917,9 @@ sealed class LocalActorRef private[akka](
if (!isRunning) {
dispatcher.register(this)
dispatcher.start
if (isTransactor) {
_transactionFactory = Some(TransactionFactory(_transactionConfig, id))
}
_isRunning = true
if (!isInInitialization) initializeActorInstance
else runActorInitialization = true
@ -904,6 +933,7 @@ sealed class LocalActorRef private[akka](
def stop = guard.withGuard {
if (isRunning) {
dispatcher.unregister(this)
_transactionFactory = None
_isRunning = false
_isShutDown = true
actor.shutdown
@ -1168,8 +1198,7 @@ sealed class LocalActorRef private[akka](
sender = messageHandle.sender
senderFuture = messageHandle.senderFuture
try {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
dispatch(messageHandle)
} catch {
case e =>
Actor.log.error(e, "Could not invoke actor [%s]", this)
@ -1178,23 +1207,6 @@ sealed class LocalActorRef private[akka](
}
private def dispatch[T](messageHandle: MessageInvocation) = {
val message = messageHandle.message //serializeMessage(messageHandle.message)
setTransactionSet(messageHandle.transactionSet)
try {
actor.base(message)
} catch {
case e =>
_isBeingRestarted = true
Actor.log.error(e, "Could not invoke actor [%s]", toString)
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (_supervisor.isDefined) _supervisor.get ! Exit(this, e)
senderFuture.foreach(_.completeWithException(this, e))
} finally {
clearTransaction
}
}
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
val message = messageHandle.message //serializeMessage(messageHandle.message)
var topLevelTransaction = false
val txSet: Option[CountDownCommitBarrier] =
@ -1212,7 +1224,8 @@ sealed class LocalActorRef private[akka](
try {
if (isTransactor) {
atomic {
val txFactory = _transactionFactory.getOrElse(DefaultGlobalTransactionFactory)
atomic(txFactory) {
actor.base(message)
setTransactionSet(txSet) // restore transaction set to allow atomic block to do commit
}
@ -1452,6 +1465,8 @@ private[akka] case class RemoteActorRef private[akka] (
def dispatcher_=(md: MessageDispatcher): Unit = unsupported
def dispatcher: MessageDispatcher = unsupported
def makeTransactionRequired: Unit = unsupported
def transactionConfig_=(config: TransactionConfig): Unit = unsupported
def transactionConfig: TransactionConfig = unsupported
def makeRemote(hostname: String, port: Int): Unit = unsupported
def makeRemote(address: InetSocketAddress): Unit = unsupported
def homeAddress_=(address: InetSocketAddress): Unit = unsupported

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
package se.scalablesolutions.akka.dataflow
import java.util.concurrent.atomic.AtomicReference
import java.util.concurrent.{ConcurrentLinkedQueue, LinkedBlockingQueue}

View file

@ -1,364 +0,0 @@
/**
Copyright (c) 2007-2008, Rich Hickey
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name of Clojure nor the names of its contributors
may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
*/
package se.scalablesolutions.akka.stm
trait PersistentDataStructure
/**
* A clean-room port of Rich Hickey's persistent hash trie implementation from
* Clojure (http://clojure.org). Originally presented as a mutable structure in
* a paper by Phil Bagwell.
*
* @author Daniel Spiewak
* @author Rich Hickey
*/
@serializable
final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] with PersistentDataStructure {
override lazy val size = root.size
def this() = this(new EmptyNode[K])
def get(key: K) = root(key, key.hashCode)
override def +[A >: V](pair: (K, A)) = update(pair._1, pair._2)
override def update[A >: V](key: K, value: A) = new HashTrie(root(0, key, key.hashCode) = value)
def -(key: K) = new HashTrie(root.remove(key, key.hashCode))
def iterator = root.elements
def empty[A]: HashTrie[K, A] = new HashTrie(new EmptyNode[K])
def diagnose = root.toString
}
object HashTrie {
def apply[K, V](pairs: (K, V)*) = pairs.foldLeft(new HashTrie[K, V]) { _ + _ }
def unapplySeq[K, V](map: HashTrie[K, V]) = map.toSeq
}
// ============================================================================
// nodes
@serializable
private[stm] sealed trait Node[K, +V] {
val size: Int
def apply(key: K, hash: Int): Option[V]
def update[A >: V](shift: Int, key: K, hash: Int, value: A): Node[K, A]
def remove(key: K, hash: Int): Node[K, V]
def elements: Iterator[(K, V)]
}
@serializable
private[stm] class EmptyNode[K] extends Node[K, Nothing] {
val size = 0
def apply(key: K, hash: Int) = None
def update[V](shift: Int, key: K, hash: Int, value: V) = new LeafNode(key, hash, value)
def remove(key: K, hash: Int) = this
lazy val elements = new Iterator[(K, Nothing)] {
val hasNext = false
val next = null
}
}
private[stm] abstract class SingleNode[K, +V] extends Node[K, V] {
val hash: Int
}
private[stm] class LeafNode[K, +V](key: K, val hash: Int, value: V) extends SingleNode[K, V] {
val size = 1
def apply(key: K, hash: Int) = if (this.key == key) Some(value) else None
def update[A >: V](shift: Int, key: K, hash: Int, value: A) = {
if (this.key == key) {
if (this.value == value) this else new LeafNode(key, hash, value)
} else if (this.hash == hash) {
new CollisionNode(hash, this.key -> this.value, key -> value)
} else {
BitmappedNode(shift)(this, key, hash, value)
}
}
def remove(key: K, hash: Int) = if (this.key == key) new EmptyNode[K] else this
def elements = new Iterator[(K, V)] {
var hasNext = true
def next = {
hasNext = false
(key, value)
}
}
override def toString = "LeafNode(" + key + " -> " + value + ")"
}
private[stm] class CollisionNode[K, +V](val hash: Int, bucket: List[(K, V)]) extends SingleNode[K, V] {
lazy val size = bucket.length
def this(hash: Int, pairs: (K, V)*) = this(hash, pairs.toList)
def apply(key: K, hash: Int) = {
for {
(_, v) <- bucket find { case (k, _) => k == key }
} yield v
}
override def update[A >: V](shift: Int, key: K, hash: Int, value: A): Node[K, A] = {
if (this.hash == hash) {
var found = false
val newBucket = for ((k, v) <- bucket) yield {
if (k == key) {
found = true
(key, value)
} else (k, v)
}
new CollisionNode(hash, if (found) newBucket else (key, value) :: bucket)
} else {
BitmappedNode(shift)(this, key, hash, value)
}
}
override def remove(key: K, hash: Int) = {
val newBucket = bucket filter { case (k, _) => k != key }
if (newBucket.length == bucket.length) this else {
if (newBucket.length == 1) {
val (key, value) = newBucket.head
new LeafNode(key, hash, value)
} else new CollisionNode(hash, newBucket)
}
}
def iterator = bucket.iterator
def elements = bucket.iterator
override def toString = "CollisionNode(" + bucket.toString + ")"
}
private[stm] class BitmappedNode[K, +V](shift: Int)(table: Array[Node[K, V]], bits: Int) extends Node[K, V] {
lazy val size = {
val sizes = for {
n <- table
if n != null
} yield n.size
sizes.foldLeft(0) { _ + _ }
}
def apply(key: K, hash: Int) = {
val i = (hash >>> shift) & 0x01f
val mask = 1 << i
if ((bits & mask) == mask) table(i)(key, hash) else None
}
override def update[A >: V](levelShift: Int, key: K, hash: Int, value: A): Node[K, A] = {
val i = (hash >>> shift) & 0x01f
val mask = 1 << i
if ((bits & mask) == mask) {
val node = (table(i)(shift + 5, key, hash) = value)
if (node == table(i)) this else {
val newTable = new Array[Node[K, A]](table.length)
Array.copy(table, 0, newTable, 0, table.length)
newTable(i) = node
new BitmappedNode(shift)(newTable, bits)
}
} else {
val newTable = new Array[Node[K, A]](math.max(table.length, i + 1))
Array.copy(table, 0, newTable, 0, table.length)
newTable(i) = new LeafNode(key, hash, value)
val newBits = bits | mask
if (newBits == ~0) {
new FullNode(shift)(newTable)
} else {
new BitmappedNode(shift)(newTable, newBits)
}
}
}
def remove(key: K, hash: Int) = {
val i = (hash >>> shift) & 0x01f
val mask = 1 << i
if ((bits & mask) == mask) {
val node = table(i).remove(key, hash)
if (node == table(i)) {
this
} else if (node.isInstanceOf[EmptyNode[_]]) {
if (size == 1) new EmptyNode[K] else {
val adjustedBits = bits ^ mask
val log = math.log(adjustedBits) / math.log(2)
if (log.toInt.toDouble == log) { // last one
table(log.toInt)
} else {
val newTable = new Array[Node[K, V]](table.length)
Array.copy(table, 0, newTable, 0, newTable.length)
newTable(i) = null
new BitmappedNode(shift)(newTable, adjustedBits)
}
}
} else {
val newTable = new Array[Node[K, V]](table.length)
Array.copy(table, 0, newTable, 0, table.length)
newTable(i) = node
new BitmappedNode(shift)(newTable, bits)
}
} else this
}
def elements = {
table.foldLeft(emptyElements) { (it, e) =>
if (e eq null) it else it ++ e.elements
}
}
override def toString = "BitmappedNode(" + size + "," + table.filter(_ ne null).toList.toString + ")"
private lazy val emptyElements: Iterator[(K, V)] = new Iterator[(K, V)] {
val hasNext = false
val next = null
}
}
private[stm] object BitmappedNode {
def apply[K, V](shift: Int)(node: SingleNode[K, V], key: K, hash: Int, value: V) = {
val table = new Array[Node[K, V]](math.max((hash >>> shift) & 0x01f, (node.hash >>> shift) & 0x01f) + 1)
val preBits = {
val i = (node.hash >>> shift) & 0x01f
table(i) = node
1 << i
}
val bits = {
val i = (hash >>> shift) & 0x01f
val mask = 1 << i
if ((preBits & mask) == mask) {
table(i) = (table(i)(shift + 5, key, hash) = value)
} else {
table(i) = new LeafNode(key, hash, value)
}
preBits | mask
}
new BitmappedNode(shift)(table, bits)
}
}
private[stm] class FullNode[K, +V](shift: Int)(table: Array[Node[K, V]]) extends Node[K, V] {
lazy val size = table.foldLeft(0) { _ + _.size }
def apply(key: K, hash: Int) = table((hash >>> shift) & 0x01f)(key, hash)
def update[A >: V](levelShift: Int, key: K, hash: Int, value: A) = {
val i = (hash >>> shift) & 0x01f
val node = (table(i)(shift + 5, key, hash) = value)
if (node == table(i)) this else {
val newTable = new Array[Node[K, A]](32)
Array.copy(table, 0, newTable, 0, 32)
newTable(i) = node
new FullNode(shift)(newTable)
}
}
def remove(key: K, hash: Int) = {
val i = (hash >>> shift) & 0x01f
val mask = 1 << i
val node = table(i).remove(key, hash)
if (node == table(i)) this else {
val newTable = new Array[Node[K, V]](32)
Array.copy(table, 0, newTable, 0, 32)
if (node.isInstanceOf[EmptyNode[_]]) {
newTable(i) = null
new BitmappedNode(shift)(newTable, ~mask)
} else {
newTable(i) = node
new FullNode(shift)(newTable)
}
}
}
def elements = table.foldLeft(emptyElements) { _ ++ _.elements }
override def toString = "FullNode(" + table.foldLeft("") { _.toString + ", " + _.toString } + ")"
private lazy val emptyElements: Iterator[(K, V)] = new Iterator[(K, V)] {
val hasNext = false
val next = null
}
}

View file

@ -0,0 +1,150 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.UUID
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticRefFactoryBuilder.build
def createRef[T] = factory.atomicCreateRef[T]()
def createRef[T](value: T) = factory.atomicCreateRef(value)
}
/**
* Ref.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Ref {
def apply[T]() = new Ref[T]
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
/**
* An implicit conversion that converts a Ref to an Iterable value.
*/
implicit def ref2Iterable[T](ref: Ref[T]): Iterable[T] = ref.toList
}
/**
* Implements a transactional managed reference.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class Ref[T](initialOpt: Option[T] = None) extends Transactional {
self =>
def this() = this(None) // Java compatibility
import org.multiverse.api.ThreadLocalTransaction._
val uuid = UUID.newUuid.toString
private[this] val ref = {
if (initialOpt.isDefined) RefFactory.createRef(initialOpt.get)
else RefFactory.createRef[T]
}
def swap(elem: T) = {
ensureIsInTransaction
ref.set(elem)
}
def alter(f: T => T): T = {
ensureIsInTransaction
ensureNotNull
ref.set(f(ref.get))
ref.get
}
def get: Option[T] = {
ensureIsInTransaction
if (ref.isNull) None
else Some(ref.get)
}
def getOrWait: T = {
ensureIsInTransaction
ref.getOrAwait
}
def getOrElse(default: => T): T = {
ensureIsInTransaction
if (ref.isNull) default
else ref.get
}
def isDefined: Boolean = {
ensureIsInTransaction
!ref.isNull
}
def isEmpty: Boolean = {
ensureIsInTransaction
ref.isNull
}
def map[B](f: T => B): Ref[B] = {
ensureIsInTransaction
if (isEmpty) Ref[B] else Ref(f(ref.get))
}
def flatMap[B](f: T => Ref[B]): Ref[B] = {
ensureIsInTransaction
if (isEmpty) Ref[B] else f(ref.get)
}
def filter(p: T => Boolean): Ref[T] = {
ensureIsInTransaction
if (isDefined && p(ref.get)) Ref(ref.get) else Ref[T]
}
/**
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
*/
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
class WithFilter(p: T => Boolean) {
def map[B](f: T => B): Ref[B] = self filter p map f
def flatMap[B](f: T => Ref[B]): Ref[B] = self filter p flatMap f
def foreach[U](f: T => U): Unit = self filter p foreach f
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
}
def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {
ensureIsInTransaction
if (isEmpty) Iterator.empty else Iterator(ref.get)
}
def toList: List[T] = {
ensureIsInTransaction
if (isEmpty) List() else List(ref.get)
}
def toRight[X](left: => X) = {
ensureIsInTransaction
if (isEmpty) Left(left) else Right(ref.get)
}
def toLeft[X](right: => X) = {
ensureIsInTransaction
if (isEmpty) Right(right) else Left(ref.get)
}
private def ensureIsInTransaction =
if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
private def ensureNotNull =
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
}

View file

@ -1,58 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
/**
* Reference that can hold either a typed value or an exception.
*
* Usage:
* <pre>
* scala> ResultOrFailure(1)
* res0: ResultOrFailure[Int] = ResultOrFailure@a96606
*
* scala> res0()
* res1: Int = 1
*
* scala> res0() = 3
*
* scala> res0()
* res3: Int = 3
*
* scala> res0() = { println("Hello world"); 3}
* Hello world
*
* scala> res0()
* res5: Int = 3
*
* scala> res0() = error("Lets see what happens here...")
*
* scala> res0()
* java.lang.RuntimeException: Lets see what happens here...
* at ResultOrFailure.apply(RefExcept.scala:11)
* at .<init>(<console>:6)
* at .<clinit>(<console>)
* at Re...
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ResultOrFailure[Payload](payload: Payload, val tx: Option[Transaction]) {
private[this] var contents: Either[Throwable, Payload] = Right(payload)
def update(value: => Payload) = {
contents = try { Right(value) } catch { case (e : Throwable) => Left(e) }
}
def apply() = contents match {
case Right(payload) => payload
case Left(e) => throw e
}
override def toString(): String = "ResultOrFailure[" + contents + "]"
}
object ResultOrFailure {
def apply[Payload](payload: Payload, tx: Option[Transaction]) = new ResultOrFailure(payload, tx)
def apply[AnyRef](tx: Option[Transaction]) = new ResultOrFailure(new Object, tx)
}

View file

@ -6,7 +6,6 @@ package se.scalablesolutions.akka.stm
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.TimeUnit
import javax.transaction.{TransactionManager, UserTransaction, Status, TransactionSynchronizationRegistry}
@ -17,283 +16,51 @@ import se.scalablesolutions.akka.config.Config._
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.api.lifecycle.{TransactionLifecycleListener, TransactionLifecycleEvent}
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.templates.{TransactionTemplate, OrElseTemplate}
import org.multiverse.api.backoff.ExponentialBackoffPolicy
import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.api.{TraceLevel => MultiverseTraceLevel}
class NoTransactionInScopeException extends RuntimeException
class TransactionRetryException(message: String) extends RuntimeException(message)
class StmConfigurationException(message: String) extends RuntimeException(message)
/**
* FIXDOC: document AtomicTemplate
* AtomicTemplate can be used to create atomic blocks from Java code.
* <pre>
* User newUser = new AtomicTemplate[User]() {
* User atomic() {
* ... // create user atomically
* return user;
* }
* }.execute();
* </pre>
*/
trait AtomicTemplate[T] {
def atomic: T
def execute: T = Transaction.Local.atomic {
atomic
}
}
object Transaction {
val idFactory = new AtomicLong(-1L)
/**
* Creates a STM atomic transaction and by-passes all transactions hooks
* such as persistence etc.
*
* Only for internal usage.
*/
private[akka] def atomic0[T](body: => T): T = new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
}.execute()
@deprecated("Use the se.scalablesolutions.akka.stm.local package object instead.")
object Local extends LocalStm
@deprecated("Use the se.scalablesolutions.akka.stm.global package object instead.")
object Global extends GlobalStm
object Util extends StmUtil
/**
* Module for "local" transaction management, local in the context of threads.
* You should only use these if you do <b>not</b> need to have one transaction span
* multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
*
* atomic {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomically-orElse transaction management.
* Which is a good way to reduce contention and transaction collisions.
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
*
* atomically {
* .. // try to do something
* } orElse {
* .. // if transaction clashes try do do something else to minimize contention
* }
* </pre>
*
* Example of atomic transaction management using for comprehensions (monadic):
*
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Local._
* for (tx <- Transaction.Local) {
* ... // do transactional stuff
* }
*
* val result = for (tx <- Transaction.Local) yield {
* ... // do transactional stuff yielding a result
* }
* </pre>
*
* Example of using Transaction and TransactionalRef in for comprehensions (monadic):
*
* <pre>
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction.Local
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
* val result = for {
* tx <- Transaction.Local
* ref <- refs
* } yield {
* ... // use the ref inside a transaction, yield a result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
* Attach an Akka-specific Transaction to the current Multiverse transaction.
* Must be called within a Multiverse transaction. Used by TransactionFactory.addHooks
*/
object Local extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Local class.
*/
def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def foreach(f: => Unit): Unit = atomic {f}
/**
* See ScalaDoc on Transaction.Local class.
*/
def atomic[T](body: => T): T = {
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = body
override def onStart(mtx: MultiverseTransaction) = {
val tx = new Transaction
tx.transaction = Some(mtx)
setTransaction(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" => tx.commit
case "postAbort" => tx.abort
case _ => {}
}
})
}
}.execute()
}
/**
* See ScalaDoc on Transaction.Local class.
*/
def atomically[A](firstBody: => A) = elseBody(firstBody)
/**
* Should only be used together with <code>atomically</code> to form atomically-orElse constructs.
* See ScalaDoc on class.
*/
def elseBody[A](firstBody: => A) = new {
def orElse(secondBody: => A) = new OrElseTemplate[A] {
def run(t: MultiverseTransaction) = firstBody
def orelserun(t: MultiverseTransaction) = secondBody
}.execute()
}
}
/**
* Module for "global" transaction management, global in the context of multiple threads.
* You have to use these if you do need to have one transaction span multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* Here are some examples (assuming implicit transaction family name in scope):
* <pre>
* import se.scalablesolutions.akka.stm.Transaction.Global._
*
* atomic {
* .. // do something within a transaction
* }
* </pre>
*
* Example of atomic transaction management using for comprehensions (monadic):
*
* <pre>
* import se.scalablesolutions.akka.stm.Transaction
* for (tx <- Transaction.Global) {
* ... // do transactional stuff
* }
*
* val result = for (tx <- Transaction.Global) yield {
* ... // do transactional stuff yielding a result
* }
* </pre>
*
* Example of using Transaction and TransactionalRef in for comprehensions (monadic):
*
* <pre>
* // For example, if you have a List with TransactionalRef
* val refs: List[TransactionalRef] = ...
*
* // You can use them together with Transaction in a for comprehension since
* // TransactionalRef is also monadic
* for {
* tx <- Transaction.Global
* ref <- refs
* } {
* ... // use the ref inside a transaction
* }
*
* val result = for {
* tx <- Transaction.Global
* ref <- refs
* } yield {
* ... // use the ref inside a transaction, yield a result
* }
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Global extends TransactionManagement with Logging {
/**
* See ScalaDoc on Transaction.Global class.
*/
def map[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Global class.
*/
def flatMap[T](f: => T): T = atomic {f}
/**
* See ScalaDoc on Transaction.Global class.
*/
def foreach(f: => Unit): Unit = atomic {f}
// FIXME tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
//getTransactionSetInScope.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
/**
* See ScalaDoc on Transaction.Global class.
*/
def atomic[T](body: => T): T = {
var isTopLevelTransaction = false
new TransactionTemplate[T]() {
def execute(mtx: MultiverseTransaction): T = {
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
txSet.joinCommit(mtx)
clearTransaction
result
}
override def onStart(mtx: MultiverseTransaction) = {
val txSet =
if (!isTransactionSetInScope) {
isTopLevelTransaction = true
createNewTransactionSet
} else getTransactionSetInScope
private[akka] def attach = {
val mtx = getRequiredThreadLocalTransaction
val tx = new Transaction
tx.begin
tx.transaction = Some(mtx)
setTransaction(Some(tx))
TransactionManagement.transaction.set(Some(tx))
mtx.registerLifecycleListener(new TransactionLifecycleListener() {
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event.name match {
case "postCommit" =>
log.trace("Committing transaction [%s]", mtx)
tx.commit
case "postAbort" =>
log.trace("Aborting transaction [%s]", mtx)
tx.abort
def notify(mtx: MultiverseTransaction, event: TransactionLifecycleEvent) = event match {
case TransactionLifecycleEvent.PostCommit => tx.commit
case TransactionLifecycleEvent.PostAbort => tx.abort
case _ => {}
}
})
}
}.execute()
}
/**
* Mapping to Multiverse TraceLevel.
*/
object TraceLevel {
val None = MultiverseTraceLevel.none
val Coarse = MultiverseTraceLevel.course // mispelling?
val Course = MultiverseTraceLevel.course
val Fine = MultiverseTraceLevel.fine
}
}
@ -412,3 +179,25 @@ object TransactionStatus {
case object Completed extends TransactionStatus
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable
trait Transactional {
val uuid: String
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Committable {
def commit: Unit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Abortable {
def abort: Unit
}

View file

@ -0,0 +1,187 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import java.lang.{Boolean => JBoolean}
import se.scalablesolutions.akka.config.Config._
import se.scalablesolutions.akka.util.Duration
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
import org.multiverse.stms.alpha.AlphaStm
import org.multiverse.templates.TransactionBoilerplate
import org.multiverse.api.TraceLevel
/**
* For configuring multiverse transactions.
*/
object TransactionConfig {
// note: null values are so that we can default to Multiverse inference when not set
val FAMILY_NAME = "DefaultTransaction"
val READONLY = null.asInstanceOf[JBoolean]
val MAX_RETRIES = config.getInt("akka.stm.max-retries", 1000)
val TIMEOUT = config.getLong("akka.stm.timeout", Long.MaxValue)
val TIME_UNIT = config.getString("akka.stm.time-unit", "seconds")
val TRACK_READS = null.asInstanceOf[JBoolean]
val WRITE_SKEW = config.getBool("akka.stm.write-skew", true)
val EXPLICIT_RETRIES = config.getBool("akka.stm.explicit-retries", false)
val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", false)
val SPECULATIVE = config.getBool("akka.stm.speculative", true)
val QUICK_RELEASE = config.getBool("akka.stm.quick-release", true)
val TRACE_LEVEL = traceLevel(config.getString("akka.stm.trace-level", "none"))
val HOOKS = config.getBool("akka.stm.hooks", true)
val DefaultTimeout = Duration(TIMEOUT, TIME_UNIT)
def traceLevel(level: String) = level.toLowerCase match {
case "coarse" | "course" => Transaction.TraceLevel.Coarse
case "fine" => Transaction.TraceLevel.Fine
case _ => Transaction.TraceLevel.None
}
/**
* For configuring multiverse transactions.
*
* @param familyName Family name for transactions. Useful for debugging.
* @param readonly Sets transaction as readonly. Readonly transactions are cheaper.
* @param maxRetries The maximum number of times a transaction will retry.
* @param timeout The maximum time a transaction will block for.
* @param trackReads Whether all reads should be tracked. Needed for blocking operations.
* @param writeSkew Whether writeskew is allowed. Disable with care.
* @param explicitRetries Whether explicit retries are allowed.
* @param interruptible Whether a blocking transaction can be interrupted.
* @param speculative Whether speculative configuration should be enabled.
* @param quickRelease Whether locks should be released as quickly as possible (before whole commit).
* @param traceLevel Transaction trace level.
* @param hooks Whether hooks for persistence modules and JTA should be added to the transaction.
*/
def apply(familyName: String = FAMILY_NAME,
readonly: JBoolean = READONLY,
maxRetries: Int = MAX_RETRIES,
timeout: Duration = DefaultTimeout,
trackReads: JBoolean = TRACK_READS,
writeSkew: Boolean = WRITE_SKEW,
explicitRetries: Boolean = EXPLICIT_RETRIES,
interruptible: Boolean = INTERRUPTIBLE,
speculative: Boolean = SPECULATIVE,
quickRelease: Boolean = QUICK_RELEASE,
traceLevel: TraceLevel = TRACE_LEVEL,
hooks: Boolean = HOOKS) = {
new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
}
}
/**
* For configuring multiverse transactions.
*
* <p>familyName - Family name for transactions. Useful for debugging.
* <p>readonly - Sets transaction as readonly. Readonly transactions are cheaper.
* <p>maxRetries - The maximum number of times a transaction will retry.
* <p>timeout - The maximum time a transaction will block for.
* <p>trackReads - Whether all reads should be tracked. Needed for blocking operations.
* <p>writeSkew - Whether writeskew is allowed. Disable with care.
* <p>explicitRetries - Whether explicit retries are allowed.
* <p>interruptible - Whether a blocking transaction can be interrupted.
* <p>speculative - Whether speculative configuration should be enabled.
* <p>quickRelease - Whether locks should be released as quickly as possible (before whole commit).
* <p>traceLevel - Transaction trace level.
* <p>hooks - Whether hooks for persistence modules and JTA should be added to the transaction.
*/
class TransactionConfig(val familyName: String = TransactionConfig.FAMILY_NAME,
val readonly: JBoolean = TransactionConfig.READONLY,
val maxRetries: Int = TransactionConfig.MAX_RETRIES,
val timeout: Duration = TransactionConfig.DefaultTimeout,
val trackReads: JBoolean = TransactionConfig.TRACK_READS,
val writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
val explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
val interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
val speculative: Boolean = TransactionConfig.SPECULATIVE,
val quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
val traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
val hooks: Boolean = TransactionConfig.HOOKS)
object DefaultTransactionConfig extends TransactionConfig
/**
* Wrapper for transaction config, factory, and boilerplate. Used by atomic.
*/
object TransactionFactory {
def apply(config: TransactionConfig) = new TransactionFactory(config)
def apply(config: TransactionConfig, defaultName: String) = new TransactionFactory(config, defaultName)
def apply(familyName: String = TransactionConfig.FAMILY_NAME,
readonly: JBoolean = TransactionConfig.READONLY,
maxRetries: Int = TransactionConfig.MAX_RETRIES,
timeout: Duration = TransactionConfig.DefaultTimeout,
trackReads: JBoolean = TransactionConfig.TRACK_READS,
writeSkew: Boolean = TransactionConfig.WRITE_SKEW,
explicitRetries: Boolean = TransactionConfig.EXPLICIT_RETRIES,
interruptible: Boolean = TransactionConfig.INTERRUPTIBLE,
speculative: Boolean = TransactionConfig.SPECULATIVE,
quickRelease: Boolean = TransactionConfig.QUICK_RELEASE,
traceLevel: TraceLevel = TransactionConfig.TRACE_LEVEL,
hooks: Boolean = TransactionConfig.HOOKS) = {
val config = new TransactionConfig(familyName, readonly, maxRetries, timeout, trackReads, writeSkew,
explicitRetries, interruptible, speculative, quickRelease, traceLevel, hooks)
new TransactionFactory(config)
}
}
/**
* Wrapper for transaction config, factory, and boilerplate. Used by atomic.
* Can be passed to atomic implicitly or explicitly.
* <p/>
* <pre>
* implicit val txFactory = TransactionFactory(readonly = true)
* ...
* atomic {
* // do something within a readonly transaction
* }
* </pre>
* <p/>
* Can be created at different levels as needed. For example: as an implicit object
* used throughout a package, as a static implicit val within a singleton object and
* imported where needed, or as an implicit val within each instance of a class.
* <p/>
* If no explicit transaction factory is passed to atomic and there is no implicit
* transaction factory in scope, then a default transaction factory is used.
*
* @see TransactionConfig for configuration options.
*/
class TransactionFactory(val config: TransactionConfig = DefaultTransactionConfig, defaultName: String = TransactionConfig.FAMILY_NAME) {
self =>
// use the config family name if it's been set, otherwise defaultName - used by actors to set class name as default
val familyName = if (config.familyName != TransactionConfig.FAMILY_NAME) config.familyName else defaultName
val factory = {
var builder = (getGlobalStmInstance().asInstanceOf[AlphaStm].getTransactionFactoryBuilder()
.setFamilyName(familyName)
.setMaxRetries(config.maxRetries)
.setTimeoutNs(config.timeout.toNanos)
.setWriteSkewAllowed(config.writeSkew)
.setExplicitRetryAllowed(config.explicitRetries)
.setInterruptible(config.interruptible)
.setSpeculativeConfigurationEnabled(config.speculative)
.setQuickReleaseEnabled(config.quickRelease)
.setTraceLevel(config.traceLevel))
if (config.readonly ne null) {
builder = builder.setReadonly(config.readonly.booleanValue)
} // otherwise default to Multiverse inference
if (config.trackReads ne null) {
builder = builder.setReadTrackingEnabled(config.trackReads.booleanValue)
} // otherwise default to Multiverse inference
builder.build()
}
val boilerplate = new TransactionBoilerplate(factory)
def addHooks = if (config.hooks) Transaction.attach
}

View file

@ -8,8 +8,11 @@ import se.scalablesolutions.akka.util.Logging
import java.util.concurrent.atomic.AtomicBoolean
import org.multiverse.api.{StmUtils => MultiverseStmUtils}
import org.multiverse.api.ThreadLocalTransaction._
import org.multiverse.api.{Transaction => MultiverseTransaction}
import org.multiverse.commitbarriers.CountDownCommitBarrier
import org.multiverse.templates.{TransactionalCallable, OrElseTemplate}
class StmException(msg: String) extends RuntimeException(msg)
@ -20,15 +23,8 @@ class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Tran
object TransactionManagement extends TransactionManagement {
import se.scalablesolutions.akka.config.Config._
val TRANSACTION_ENABLED = new AtomicBoolean(config.getBool("akka.stm.service", false))
// move to stm.global.fair?
val FAIR_TRANSACTIONS = config.getBool("akka.stm.fair", true)
val INTERRUPTIBLE = config.getBool("akka.stm.interruptible", true)
val MAX_NR_OF_RETRIES = config.getInt("akka.stm.max-nr-of-retries", 1000)
val TRANSACTION_TIMEOUT = config.getInt("akka.stm.timeout", 10000)
val SMART_TX_LENGTH_SELECTOR = config.getBool("akka.stm.smart-tx-length-selector", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false)
private[akka] val transactionSet = new ThreadLocal[Option[CountDownCommitBarrier]]() {
override protected def initialValue: Option[CountDownCommitBarrier] = None
@ -88,3 +84,105 @@ trait TransactionManagement {
(option ne null) && option.isDefined
}
}
/**
* Local transaction management, local in the context of threads.
* Use this if you do <b>not</b> need to have one transaction span
* multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block.
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.local._
*
* atomic {
* // do something within a transaction
* }
* </pre>
*/
class LocalStm extends TransactionManagement with Logging {
val DefaultLocalTransactionConfig = TransactionConfig()
val DefaultLocalTransactionFactory = TransactionFactory(DefaultLocalTransactionConfig, "DefaultLocalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultLocalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
factory.addHooks
body
}
})
}
}
/**
* Global transaction management, global in the context of multiple threads.
* Use this if you need to have one transaction span multiple threads (or Actors).
* <p/>
* Example of atomic transaction management using the atomic block:
* <p/>
* <pre>
* import se.scalablesolutions.akka.stm.global._
*
* atomic {
* // do something within a transaction
* }
* </pre>
*/
class GlobalStm extends TransactionManagement with Logging {
val DefaultGlobalTransactionConfig = TransactionConfig()
val DefaultGlobalTransactionFactory = TransactionFactory(DefaultGlobalTransactionConfig, "DefaultGlobalTransaction")
def atomic[T](body: => T)(implicit factory: TransactionFactory = DefaultGlobalTransactionFactory): T = atomic(factory)(body)
def atomic[T](factory: TransactionFactory)(body: => T): T = {
factory.boilerplate.execute(new TransactionalCallable[T]() {
def call(mtx: MultiverseTransaction): T = {
if (!isTransactionSetInScope) createNewTransactionSet
factory.addHooks
val result = body
val txSet = getTransactionSetInScope
log.trace("Committing transaction [%s]\n\tby joining transaction set [%s]", mtx, txSet)
// FIXME ? txSet.tryJoinCommit(mtx, TransactionManagement.TRANSACTION_TIMEOUT, TimeUnit.MILLISECONDS)
txSet.joinCommit(mtx)
clearTransaction
result
}
})
}
}
trait StmUtil {
/**
* Schedule a deferred task on the thread local transaction (use within an atomic).
* This is executed when the transaction commits.
*/
def deferred[T](body: => T): Unit = MultiverseStmUtils.scheduleDeferredTask(new Runnable { def run = body })
/**
* Schedule a compensating task on the thread local transaction (use within an atomic).
* This is executed when the transaction aborts.
*/
def compensating[T](body: => T): Unit = MultiverseStmUtils.scheduleCompensatingTask(new Runnable { def run = body })
/**
* STM retry for blocking transactions (use within an atomic).
* Can be used to wait for a condition.
*/
def retry = MultiverseStmUtils.retry
/**
* Use either-orElse to combine two blocking transactions.
*/
def either[T](firstBody: => T) = new {
def orElse(secondBody: => T) = new OrElseTemplate[T] {
def either(mtx: MultiverseTransaction) = firstBody
def orelse(mtx: MultiverseTransaction) = secondBody
}.execute()
}
}

View file

@ -1,105 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
/*
import kernel.util.Logging
import org.apache.zookeeper.jmx.ManagedUtil
import org.apache.zookeeper.server.persistence.FileTxnSnapLog
import org.apache.zookeeper.server.{ServerConfig, NIOServerCnxn}
import org.apache.zookeeper.{KeeperException, WatchedEvent, Watcher, ZooKeeper, DataMonitor}
*/
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*
class TransactionWatcher extends Logging with Watcher {
val SERVER_URL = "localhost"
val ZOO_KEEPER_URL = SERVER_URL
val ZOO_KEEPER_PORT = 2181
val znode = "master"
private[this] val db = new scala.collection.mutable.HashMap[String, String]
private[this] val zk = new ZooKeeper(ZOO_KEEPER_URL + ":" + ZOO_KEEPER_PORT, 3000, this)
private[this] val dm = new DataMonitor(zk, znode, null, this)
override def process(event: WatchedEvent) = {
log.debug("New ZooKeeper event: %s", event)
val path = event.getPath();
if (event.getType == Event.EventType.None) {
// We are are being told that the state of the connection has changed
event.getState match {
case SyncConnected =>
// In this particular example we don't need to do anything
// here - watches are automatically re-registered with
// server and any watches triggered while the client was
// disconnected will be delivered (in order of course)
case Expired =>
dead = true
listener.closing(KeeperException.Code.SessionExpired)
}
} else {
if (path != null && path.equals(znode)) {
// Something has changed on the node, let's find out
zk.exists(znode, true, this, null)
}
}
if (chainedWatcher ne null) chainedWatcher.process(event);
}
def run: Unit = synchronized {
try {
while (!dm.dead) wait
} catch {
case e: InterruptedException => Thread.currentThread.interrupt
}
}
def closing(rc: Int): Unit = synchronized { notifyAll() }
}
*/
object TransactionWatcher {
def main(args: Array[String]): Unit = {
println("Connecting to ZooKeeper...")
//new TransactionWatcher
}
}
// private[akka] def startZooKeeper = {
// try {
// ManagedUtil.registerLog4jMBeans
// ServerConfig.parse(args)
// } catch {
// case e: JMException => log.warning("Unable to register log4j JMX control: s%", e)
// case e => log.fatal("Error in ZooKeeper config: s%", e)
// }
// val factory = new ZooKeeperServer.Factory() {
// override def createConnectionFactory = new NIOServerCnxn.Factory(ServerConfig.getClientPort)
// override def createServer = {
// val server = new ZooKeeperServer
// val txLog = new FileTxnSnapLog(
// new File(ServerConfig.getDataLogDir),
// new File(ServerConfig.getDataDir))
// server.setTxnLogFactory(txLog)
// server
// }
// }
// try {
// val zooKeeper = factory.createServer
// zooKeeper.startup
// log.info("ZooKeeper started")
// // TODO: handle clean shutdown as below in separate thread
// // val cnxnFactory = serverFactory.createConnectionFactory
// // cnxnFactory.setZooKeeperServer(zooKeeper)
// // cnxnFactory.join
// // if (zooKeeper.isRunning) zooKeeper.shutdown
// } catch { case e => log.fatal("Unexpected exception: s%",e) }
// }

View file

@ -0,0 +1,84 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import scala.collection.immutable.HashMap
import se.scalablesolutions.akka.util.UUID
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
object TransactionalMap {
def apply[K, V]() = new TransactionalMap[K, V]
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashMap(pairs: _*)))
}
/**
* Transactional map that implements the mutable map interface with an underlying ref and hash map.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalMap[K, V](initialOpt: Option[HashMap[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
def this() = this(None) // Java compatibility
val uuid = UUID.newUuid.toString
protected[this] val ref = new Ref(initialOpt.orElse(Some(HashMap[K, V]())))
def -=(key: K) = {
remove(key)
this
}
def +=(key: K, value: V) = put(key, value)
def +=(kv: (K, V)) = {
put(kv._1,kv._2)
this
}
override def remove(key: K) = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(ref.get.get - key)
oldValue
}
def get(key: K): Option[V] = ref.get.get.get(key)
override def put(key: K, value: V): Option[V] = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(map.updated(key, value))
oldValue
}
override def update(key: K, value: V) = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(map.updated(key, value))
}
def iterator = ref.get.get.iterator
override def elements: Iterator[(K, V)] = ref.get.get.iterator
override def contains(key: K): Boolean = ref.get.get.contains(key)
override def clear = ref.swap(HashMap[K, V]())
override def size: Int = ref.get.get.size
override def hashCode: Int = System.identityHashCode(this);
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalMap[_, _]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalMap>" else super.toString
def outsideTransaction = getThreadLocalTransaction eq null
}

View file

@ -1,343 +0,0 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.util.UUID
import org.multiverse.api.GlobalStmInstance.getGlobalStmInstance
/**
* Example Scala usage:
* <pre>
* val myMap = TransactionalState.newMap
* val myVector = TransactionalState.newVector
* val myRef = TransactionalState.newRef
* </pre>
* Or:
* <pre>
* val myMap = TransactionalMap()
* val myVector = TransactionalVector()
* val myRef = TransactionalRef()
* </pre>
*
* <p/>
* Example Java usage:
* <pre>
* TransactionalMap myMap = TransactionalState.newMap();
* </pre>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionalState {
def newMap[K, V] = TransactionalMap[K, V]()
def newMap[K, V](pairs: (K, V)*) = TransactionalMap(pairs: _*)
def newVector[T] = TransactionalVector[T]()
def newVector[T](elems: T*) = TransactionalVector(elems :_*)
def newRef[T] = TransactionalRef[T]()
def newRef[T](initialValue: T) = TransactionalRef(initialValue)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable
trait Transactional {
val uuid: String
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Committable {
def commit: Unit
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Abortable {
def abort: Unit
}
object RefFactory {
private val factory = getGlobalStmInstance.getProgrammaticReferenceFactoryBuilder.build
def createRef[T] = factory.atomicCreateReference[T]()
def createRef[T](value: T) = factory.atomicCreateReference(value)
}
/**
* Alias to TransactionalRef.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Ref {
type Ref[T] = TransactionalRef[T]
def apply[T]() = new Ref[T]
def apply[T](initialValue: T) = new Ref[T](Some(initialValue))
}
/**
* Alias to Ref.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object TransactionalRef {
/**
* An implicit conversion that converts a TransactionalRef to an Iterable value.
*/
implicit def ref2Iterable[T](ref: TransactionalRef[T]): Iterable[T] = ref.toList
def apply[T]() = new TransactionalRef[T]
def apply[T](initialValue: T) = new TransactionalRef[T](Some(initialValue))
}
/**
* Implements a transactional managed reference.
* Alias to Ref.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalRef[T](initialOpt: Option[T] = None) extends Transactional {
self =>
import org.multiverse.api.ThreadLocalTransaction._
implicit val txInitName = "TransactionalRef:Init"
val uuid = UUID.newUuid.toString
private[this] val ref = {
if (initialOpt.isDefined) RefFactory.createRef(initialOpt.get)
else RefFactory.createRef[T]
}
def swap(elem: T) = {
ensureIsInTransaction
ref.set(elem)
}
def alter(f: T => T): T = {
ensureIsInTransaction
ensureNotNull
ref.set(f(ref.get))
ref.get
}
def get: Option[T] = {
ensureIsInTransaction
if (ref.isNull) None
else Some(ref.get)
}
def getOrWait: T = {
ensureIsInTransaction
ref.getOrAwait
}
def getOrElse(default: => T): T = {
ensureIsInTransaction
if (ref.isNull) default
else ref.get
}
def isDefined: Boolean = {
ensureIsInTransaction
!ref.isNull
}
def isEmpty: Boolean = {
ensureIsInTransaction
ref.isNull
}
def map[B](f: T => B): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) TransactionalRef[B] else TransactionalRef(f(ref.get))
}
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = {
ensureIsInTransaction
if (isEmpty) TransactionalRef[B] else f(ref.get)
}
def filter(p: T => Boolean): TransactionalRef[T] = {
ensureIsInTransaction
if (isDefined && p(ref.get)) TransactionalRef(ref.get) else TransactionalRef[T]
}
/**
* Necessary to keep from being implicitly converted to Iterable in for comprehensions.
*/
def withFilter(p: T => Boolean): WithFilter = new WithFilter(p)
class WithFilter(p: T => Boolean) {
def map[B](f: T => B): TransactionalRef[B] = self filter p map f
def flatMap[B](f: T => TransactionalRef[B]): TransactionalRef[B] = self filter p flatMap f
def foreach[U](f: T => U): Unit = self filter p foreach f
def withFilter(q: T => Boolean): WithFilter = new WithFilter(x => p(x) && q(x))
}
def foreach[U](f: T => U): Unit = {
ensureIsInTransaction
if (isDefined) f(ref.get)
}
def elements: Iterator[T] = {
ensureIsInTransaction
if (isEmpty) Iterator.empty else Iterator(ref.get)
}
def toList: List[T] = {
ensureIsInTransaction
if (isEmpty) List() else List(ref.get)
}
def toRight[X](left: => X) = {
ensureIsInTransaction
if (isEmpty) Left(left) else Right(ref.get)
}
def toLeft[X](right: => X) = {
ensureIsInTransaction
if (isEmpty) Right(right) else Left(ref.get)
}
private def ensureIsInTransaction =
()// if (getThreadLocalTransaction eq null) throw new NoTransactionInScopeException
private def ensureNotNull =
if (ref.isNull) throw new RuntimeException("Cannot alter Ref's value when it is null")
}
object TransactionalMap {
def apply[K, V]() = new TransactionalMap[K, V]
def apply[K, V](pairs: (K, V)*) = new TransactionalMap(Some(HashTrie(pairs: _*)))
}
/**
* Implements an in-memory transactional Map based on Clojure's PersistentMap.
*
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalMap[K, V](initialOpt: Option[HashTrie[K, V]] = None) extends Transactional with scala.collection.mutable.Map[K, V] {
val uuid = UUID.newUuid.toString
protected[this] val ref = new TransactionalRef(initialOpt.orElse(Some(new HashTrie[K, V])))
def -=(key: K) = {
remove(key)
this
}
def +=(key: K, value: V) = put(key, value)
def +=(kv: (K, V)) = {
put(kv._1,kv._2)
this
}
override def remove(key: K) = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(ref.get.get - key)
oldValue
}
def get(key: K): Option[V] = ref.get.get.get(key)
override def put(key: K, value: V): Option[V] = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(map.update(key, value))
oldValue
}
override def update(key: K, value: V) = {
val map = ref.get.get
val oldValue = map.get(key)
ref.swap(map.update(key, value))
}
def iterator = ref.get.get.iterator
override def elements: Iterator[(K, V)] = ref.get.get.iterator
override def contains(key: K): Boolean = ref.get.get.contains(key)
override def clear = ref.swap(new HashTrie[K, V])
override def size: Int = ref.get.get.size
override def hashCode: Int = System.identityHashCode(this);
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalMap[_, _]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalMap>" else super.toString
def outsideTransaction =
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
}
object TransactionalVector {
def apply[T]() = new TransactionalVector[T]
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
}
/**
* Implements an in-memory transactional Vector based on Clojure's PersistentVector.
*
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
val uuid = UUID.newUuid.toString
private[this] val ref = new TransactionalRef(initialOpt.orElse(Some(EmptyVector)))
def clear = ref.swap(EmptyVector)
def +(elem: T) = add(elem)
def add(elem: T) = ref.swap(ref.get.get + elem)
def get(index: Int): T = ref.get.get.apply(index)
/**
* Removes the <i>tail</i> element of this vector.
*/
def pop = ref.swap(ref.get.get.pop)
def update(index: Int, elem: T) = ref.swap(ref.get.get.update(index, elem))
def length: Int = ref.get.get.length
def apply(index: Int): T = ref.get.get.apply(index)
override def hashCode: Int = System.identityHashCode(this);
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalVector[_]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalVector>" else super.toString
def outsideTransaction =
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
}

View file

@ -0,0 +1,61 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
import scala.collection.immutable.Vector
import se.scalablesolutions.akka.util.UUID
import org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction
object TransactionalVector {
def apply[T]() = new TransactionalVector[T]
def apply[T](elems: T*) = new TransactionalVector(Some(Vector(elems: _*)))
}
/**
* Transactional vector that implements the indexed seq interface with an underlying ref and vector.
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class TransactionalVector[T](initialOpt: Option[Vector[T]] = None) extends Transactional with IndexedSeq[T] {
def this() = this(None) // Java compatibility
val uuid = UUID.newUuid.toString
private[this] val ref = new Ref(initialOpt.orElse(Some(Vector[T]())))
def clear = ref.swap(Vector[T]())
def +(elem: T) = add(elem)
def add(elem: T) = ref.swap(ref.get.get :+ elem)
def get(index: Int): T = ref.get.get.apply(index)
/**
* Removes the <i>tail</i> element of this vector.
*/
def pop = ref.swap(ref.get.get.dropRight(1))
def update(index: Int, elem: T) = ref.swap(ref.get.get.updated(index, elem))
def length: Int = ref.get.get.length
def apply(index: Int): T = ref.get.get.apply(index)
override def hashCode: Int = System.identityHashCode(this);
override def equals(other: Any): Boolean =
other.isInstanceOf[TransactionalVector[_]] &&
other.hashCode == hashCode
override def toString = if (outsideTransaction) "<TransactionalVector>" else super.toString
def outsideTransaction =
org.multiverse.api.ThreadLocalTransaction.getThreadLocalTransaction eq null
}

View file

@ -1,353 +0,0 @@
/**
Copyright (c) 2007-2008, Rich Hickey
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions
are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above
copyright notice, this list of conditions and the following
disclaimer in the documentation and/or other materials provided
with the distribution.
* Neither the name of Clojure nor the names of its contributors
may be used to endorse or promote products derived from this
software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
"AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
POSSIBILITY OF SUCH DAMAGE.
**/
package se.scalablesolutions.akka.stm
import Vector._
/**
* A straight port of Clojure's <code>PersistentVector</code> class.
*
* @author Daniel Spiewak
* @author Rich Hickey
*/
@serializable
class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail: Array[AnyRef])
extends IndexedSeq[T] with PersistentDataStructure { outer =>
private val tailOff = length - tail.length
/*
* The design of this data structure inherantly requires heterogenous arrays.
* It is *possible* to design around this, but the result is comparatively
* quite inefficient. With respect to this fact, I have left the original
* (somewhat dynamically-typed) implementation in place.
*/
private[stm] def this() = this(0, 5, EmptyArray, EmptyArray)
def apply(i: Int): T = {
if (i >= 0 && i < length) {
if (i >= tailOff) {
tail(i & 0x01f).asInstanceOf[T]
} else {
var arr = root
var level = shift
while (level > 0) {
arr = arr((i >>> level) & 0x01f).asInstanceOf[Array[AnyRef]]
level -= 5
}
arr(i & 0x01f).asInstanceOf[T]
}
} else throw new IndexOutOfBoundsException(i.toString)
}
def update[A >: T](i: Int, obj: A): Vector[A] = {
if (i >= 0 && i < length) {
if (i >= tailOff) {
val newTail = new Array[AnyRef](tail.length)
Array.copy(tail, 0, newTail, 0, tail.length)
newTail(i & 0x01f) = obj.asInstanceOf[AnyRef]
new Vector[A](length, shift, root, newTail)
} else {
new Vector[A](length, shift, doAssoc(shift, root, i, obj), tail)
}
} else if (i == length) {
this + obj
} else throw new IndexOutOfBoundsException(i.toString)
}
private def doAssoc[A >: T](level: Int, arr: Array[AnyRef], i: Int, obj: A): Array[AnyRef] = {
val ret = new Array[AnyRef](arr.length)
Array.copy(arr, 0, ret, 0, arr.length)
if (level == 0) {
ret(i & 0x01f) = obj.asInstanceOf[AnyRef]
} else {
val subidx = (i >>> level) & 0x01f
ret(subidx) = doAssoc(level - 5, arr(subidx).asInstanceOf[Array[AnyRef]], i, obj)
}
ret
}
def ++[A >: T](other: Iterable[A]) = other.foldLeft(this:Vector[A]) { _ + _ }
def +[A >: T](obj: A): Vector[A] = {
if (tail.length < 32) {
val newTail = new Array[AnyRef](tail.length + 1)
Array.copy(tail, 0, newTail, 0, tail.length)
newTail(tail.length) = obj.asInstanceOf[AnyRef]
new Vector[A](length + 1, shift, root, newTail)
} else {
var (newRoot, expansion) = pushTail(shift - 5, root, tail, null)
var newShift = shift
if (expansion ne null) {
newRoot = array(newRoot, expansion)
newShift += 5
}
new Vector[A](length + 1, newShift, newRoot, array(obj.asInstanceOf[AnyRef]))
}
}
private def pushTail(level: Int, arr: Array[AnyRef], tailNode: Array[AnyRef], expansion: AnyRef): (Array[AnyRef], AnyRef) = {
val newChild = if (level == 0) tailNode else {
val (newChild, subExpansion) = pushTail(level - 5, arr(arr.length - 1).asInstanceOf[Array[AnyRef]], tailNode, expansion)
if (subExpansion eq null) {
val ret = new Array[AnyRef](arr.length)
Array.copy(arr, 0, ret, 0, arr.length)
ret(arr.length - 1) = newChild
return (ret, null)
} else subExpansion
}
// expansion
if (arr.length == 32) {
(arr, array(newChild))
} else {
val ret = new Array[AnyRef](arr.length + 1)
Array.copy(arr, 0, ret, 0, arr.length)
ret(arr.length) = newChild
(ret, null)
}
}
/**
* Removes the <i>tail</i> element of this vector.
*/
def pop: Vector[T] = {
if (length == 0) {
throw new IllegalStateException("Can't pop empty vector")
} else if (length == 1) {
EmptyVector
} else if (tail.length > 1) {
val newTail = new Array[AnyRef](tail.length - 1)
Array.copy(tail, 0, newTail, 0, newTail.length)
new Vector[T](length - 1, shift, root, newTail)
} else {
var (newRoot, pTail) = popTail(shift - 5, root, null)
var newShift = shift
if (newRoot eq null) {
newRoot = EmptyArray
}
if (shift > 5 && newRoot.length == 1) {
newRoot = newRoot(0).asInstanceOf[Array[AnyRef]]
newShift -= 5
}
new Vector[T](length - 1, newShift, newRoot, pTail.asInstanceOf[Array[AnyRef]])
}
}
private def popTail(shift: Int, arr: Array[AnyRef], pTail: AnyRef): (Array[AnyRef], AnyRef) = {
val newPTail = if (shift > 0) {
val (newChild, subPTail) = popTail(shift - 5, arr(arr.length - 1).asInstanceOf[Array[AnyRef]], pTail)
if (newChild ne null) {
val ret = new Array[AnyRef](arr.length)
Array.copy(arr, 0, ret, 0, arr.length)
ret(arr.length - 1) = newChild
return (ret, subPTail)
}
subPTail
} else if (shift == 0) {
arr(arr.length - 1)
} else pTail
// contraction
if (arr.length == 1) {
(null, newPTail)
} else {
val ret = new Array[AnyRef](arr.length - 1)
Array.copy(arr, 0, ret, 0, ret.length)
(ret, newPTail)
}
}
override def filter(p: (T)=>Boolean) = {
var back = new Vector[T]
var i = 0
while (i < length) {
val e = apply(i)
if (p(e)) back += e
i += 1
}
back
}
def flatMap[A](f: (T)=>Iterable[A]): Vector[A] = {
var back = new Vector[A]
var i = 0
while (i < length) {
f(apply(i)) foreach { back += _ }
i += 1
}
back
}
def map[A](f: (T)=>A): Vector[A] = {
var back = new Vector[A]
var i = 0
while (i < length) {
back += f(apply(i))
i += 1
}
back
}
override def reverse: Vector[T] = new VectorProjection[T] {
override val length = outer.length
override def apply(i: Int) = outer.apply(length - i - 1)
}
def subseq(from: Int, end: Int) = subVector(from, end)
def subVector(from: Int, end: Int): Vector[T] = {
if (from < 0) {
throw new IndexOutOfBoundsException(from.toString)
} else if (end >= length) {
throw new IndexOutOfBoundsException(end.toString)
} else if (end <= from) {
throw new IllegalArgumentException("Invalid range: " + from + ".." + end)
} else {
new VectorProjection[T] {
override val length = end - from
override def apply(i: Int) = outer.apply(i + from)
}
}
}
def zip[A](that: Vector[A]) = {
var back = new Vector[(T, A)]
var i = 0
val limit = math.min(length, that.length)
while (i < limit) {
back += (apply(i), that(i))
i += 1
}
back
}
def zipWithIndex = {
var back = new Vector[(T, Int)]
var i = 0
while (i < length) {
back += (apply(i), i)
i += 1
}
back
}
override def equals(other: Any) = other match {
case vec: Vector[_] => {
var back = length == vec.length
var i = 0
while (i < length) {
back &&= apply(i) == vec.apply(i)
i += 1
}
back
}
case _ => false
}
override def hashCode = foldLeft(0) { _ ^ _.hashCode }
}
object Vector {
private[stm] val EmptyArray = new Array[AnyRef](0)
def apply[T](elems: T*) = elems.foldLeft(EmptyVector:Vector[T]) { _ + _ }
def unapplySeq[T](vec: Vector[T]): Option[Seq[T]] = Some(vec)
@inline
private[stm] def array(elems: AnyRef*) = {
val back = new Array[AnyRef](elems.length)
Array.copy(elems.toArray, 0, back, 0, back.length)
back
}
}
object EmptyVector extends Vector[Nothing]
private[stm] abstract class VectorProjection[+T] extends Vector[T] {
override val length: Int
override def apply(i: Int): T
override def +[A >: T](e: A) = innerCopy + e
override def update[A >: T](i: Int, e: A) = {
if (i < 0) {
throw new IndexOutOfBoundsException(i.toString)
} else if (i > length) {
throw new IndexOutOfBoundsException(i.toString)
} else innerCopy(i) = e
}
private lazy val innerCopy = foldLeft(EmptyVector:Vector[T]) { _ + _ }
}

View file

@ -0,0 +1,56 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.stm
/**
* For importing 'local' STM.
*/
package object local extends LocalStm with StmUtil with StmCommon
/**
* For importing 'global' STM.
*/
package object global extends GlobalStm with StmUtil with StmCommon
trait StmCommon {
type TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
val TransactionConfig = se.scalablesolutions.akka.stm.TransactionConfig
type TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
val TransactionFactory = se.scalablesolutions.akka.stm.TransactionFactory
type Ref[T] = se.scalablesolutions.akka.stm.Ref[T]
val Ref = se.scalablesolutions.akka.stm.Ref
}
/**
* For importing the transactional data structures, including the primitive refs
* and transactional data structures from Multiverse.
*/
package object transactional {
type TransactionalMap[K,V] = se.scalablesolutions.akka.stm.TransactionalMap[K,V]
val TransactionalMap = se.scalablesolutions.akka.stm.TransactionalMap
type TransactionalVector[T] = se.scalablesolutions.akka.stm.TransactionalVector[T]
val TransactionalVector = se.scalablesolutions.akka.stm.TransactionalVector
type BooleanRef = org.multiverse.transactional.refs.BooleanRef
type ByteRef = org.multiverse.transactional.refs.ByteRef
type CharRef = org.multiverse.transactional.refs.CharRef
type DoubleRef = org.multiverse.transactional.refs.DoubleRef
type FloatRef = org.multiverse.transactional.refs.FloatRef
type IntRef = org.multiverse.transactional.refs.IntRef
type LongRef = org.multiverse.transactional.refs.LongRef
type ShortRef = org.multiverse.transactional.refs.ShortRef
type TransactionalReferenceArray[T] = org.multiverse.transactional.arrays.TransactionalReferenceArray[T]
// These won't compile - something to do with vararg constructors? Check for Scala bug.
// type TransactionalArrayList[T] = org.multiverse.transactional.collections.TransactionalArrayList[T]
// type TransactionalLinkedList[T] = org.multiverse.transactional.collections.TransactionalLinkedList[T]
type TransactionalThreadPoolExecutor = org.multiverse.transactional.executors.TransactionalThreadPoolExecutor
}

View file

@ -0,0 +1,93 @@
/**
* Copyright (C) 2009-2010 Scalable Solutions AB <http://scalablesolutions.se>
*/
package se.scalablesolutions.akka.util
import java.util.concurrent.TimeUnit
object Duration {
def apply(length: Long, unit: TimeUnit) = new Duration(length, unit)
def apply(length: Long, unit: String) = new Duration(length, timeUnit(unit))
def timeUnit(unit: String) = unit.toLowerCase match {
case "nanoseconds" | "nanos" | "nanosecond" | "nano" => TimeUnit.NANOSECONDS
case "microseconds" | "micros" | "microsecond" | "micro" => TimeUnit.MICROSECONDS
case "milliseconds" | "millis" | "millisecond" | "milli" => TimeUnit.MILLISECONDS
case _ => TimeUnit.SECONDS
}
}
/**
* Utility for working with java.util.concurrent.TimeUnit durations.
* <p/>
* Example:
* <pre>
* import se.scalablesolutions.akka.util.Duration
* import java.util.concurrent.TimeUnit
*
* val duration = Duration(100, TimeUnit.MILLISECONDS)
* val duration = Duration(100, "millis")
*
* duration.toNanos
* </pre>
* <p/>
* Implicits are also provided for Int and Long. Example usage:
* <pre>
* import se.scalablesolutions.akka.util.duration._
*
* val duration = 100.millis
* </pre>
*/
class Duration(val length: Long, val unit: TimeUnit) {
def toNanos = unit.toNanos(length)
def toMicros = unit.toMicros(length)
def toMillis = unit.toMillis(length)
def toSeconds = unit.toSeconds(length)
override def toString = "Duration(" + length + ", " + unit + ")"
}
package object duration {
implicit def intToDurationInt(n: Int) = new DurationInt(n)
implicit def longToDurationLong(n: Long) = new DurationLong(n)
}
class DurationInt(n: Int) {
def nanoseconds = Duration(n, TimeUnit.NANOSECONDS)
def nanos = Duration(n, TimeUnit.NANOSECONDS)
def nanosecond = Duration(n, TimeUnit.NANOSECONDS)
def nano = Duration(n, TimeUnit.NANOSECONDS)
def microseconds = Duration(n, TimeUnit.MICROSECONDS)
def micros = Duration(n, TimeUnit.MICROSECONDS)
def microsecond = Duration(n, TimeUnit.MICROSECONDS)
def micro = Duration(n, TimeUnit.MICROSECONDS)
def milliseconds = Duration(n, TimeUnit.MILLISECONDS)
def millis = Duration(n, TimeUnit.MILLISECONDS)
def millisecond = Duration(n, TimeUnit.MILLISECONDS)
def milli = Duration(n, TimeUnit.MILLISECONDS)
def seconds = Duration(n, TimeUnit.SECONDS)
def second = Duration(n, TimeUnit.SECONDS)
}
class DurationLong(n: Long) {
def nanoseconds = Duration(n, TimeUnit.NANOSECONDS)
def nanos = Duration(n, TimeUnit.NANOSECONDS)
def nanosecond = Duration(n, TimeUnit.NANOSECONDS)
def nano = Duration(n, TimeUnit.NANOSECONDS)
def microseconds = Duration(n, TimeUnit.MICROSECONDS)
def micros = Duration(n, TimeUnit.MICROSECONDS)
def microsecond = Duration(n, TimeUnit.MICROSECONDS)
def micro = Duration(n, TimeUnit.MICROSECONDS)
def milliseconds = Duration(n, TimeUnit.MILLISECONDS)
def millis = Duration(n, TimeUnit.MILLISECONDS)
def millisecond = Duration(n, TimeUnit.MILLISECONDS)
def milli = Duration(n, TimeUnit.MILLISECONDS)
def seconds = Duration(n, TimeUnit.SECONDS)
def second = Duration(n, TimeUnit.SECONDS)
}

View file

@ -4,7 +4,7 @@ import java.util.concurrent.{TimeUnit, CountDownLatch}
import org.scalatest.junit.JUnitSuite
import org.junit.Test
import se.scalablesolutions.akka.stm.{TransactionalState, TransactionalMap, TransactionalRef, TransactionalVector}
import se.scalablesolutions.akka.stm.{Ref, TransactionalMap, TransactionalVector}
import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
@ -36,9 +36,9 @@ class InMemStatefulActor(expectedInvocationCount: Int) extends Transactor {
val notifier = new CountDownLatch(expectedInvocationCount)
private lazy val mapState = TransactionalState.newMap[String, String]
private lazy val vectorState = TransactionalState.newVector[String]
private lazy val refState = TransactionalState.newRef[String]
private lazy val mapState = TransactionalMap[String, String]()
private lazy val vectorState = TransactionalVector[String]()
private lazy val refState = Ref[String]()
def receive = {
case GetNotifier =>

View file

@ -8,10 +8,10 @@ import org.junit.runner.RunWith
import se.scalablesolutions.akka.actor.Actor._
@RunWith(classOf[JUnitRunner])
class TransactionalRefSpec extends Spec with ShouldMatchers {
class RefSpec extends Spec with ShouldMatchers {
describe("A TransactionalRef") {
import Transaction.Local._
describe("A Ref") {
import local._
it("should optionally accept an initial value") {
val emptyRef = Ref[Int]
@ -29,7 +29,7 @@ class TransactionalRefSpec extends Spec with ShouldMatchers {
val ref = Ref(3)
try {
atomic {
atomic(DefaultLocalTransactionFactory) {
ref.swap(5)
throw new Exception
}

View file

@ -1,8 +1,7 @@
package se.scalablesolutions.akka.actor
package se.scalablesolutions.akka.stm
import se.scalablesolutions.akka.stm._
import se.scalablesolutions.akka.actor.{Actor, Transactor}
import se.scalablesolutions.akka.util.Helpers.narrow
import Actor._
import org.scalatest.Spec
@ -18,11 +17,11 @@ class StmSpec extends
ShouldMatchers with
BeforeAndAfterAll {
describe("Transaction.Local") {
describe("Local STM") {
it("should be able to do multiple consecutive atomic {..} statements") {
import Transaction.Local._
import local._
lazy val ref = TransactionalState.newRef[Int]
lazy val ref = Ref[Int]()
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
@ -39,9 +38,9 @@ class StmSpec extends
}
it("should be able to do nested atomic {..} statements") {
import Transaction.Local._
import local._
lazy val ref = TransactionalState.newRef[Int]
lazy val ref = Ref[Int]()
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
@ -61,9 +60,9 @@ class StmSpec extends
}
it("should roll back failing nested atomic {..} statements") {
import Transaction.Local._
import local._
lazy val ref = TransactionalState.newRef[Int]
lazy val ref = Ref[Int]()
def increment = atomic {
ref.swap(ref.get.getOrElse(0) + 1)
@ -72,7 +71,7 @@ class StmSpec extends
ref.get.getOrElse(0)
}
try {
atomic {
atomic(DefaultLocalTransactionFactory) {
increment
increment
throw new Exception
@ -84,7 +83,7 @@ class StmSpec extends
}
}
describe("Transaction.Global") {
describe("Global STM") {
it("should be able to initialize with atomic {..} block inside actor constructor") {
import GlobalTransactionVectorTestActor._
try {
@ -183,17 +182,17 @@ object GlobalTransactionVectorTestActor {
}
class GlobalTransactionVectorTestActor extends Actor {
import GlobalTransactionVectorTestActor._
import se.scalablesolutions.akka.stm.Transaction.Global
import se.scalablesolutions.akka.stm.global._
private val vector: TransactionalVector[Int] = Global.atomic { TransactionalVector(1) }
private val vector: TransactionalVector[Int] = atomic { TransactionalVector(1) }
def receive = {
case Add(value) =>
Global.atomic { vector + value}
atomic { vector + value}
self.reply(Success)
case Size =>
val size = Global.atomic { vector.size }
val size = atomic { vector.size }
self.reply(size)
}
}
@ -214,7 +213,7 @@ class NestedTransactorLevelOneActor extends Actor {
}
}
class NestedTransactorLevelTwoActor extends Actor {
class NestedTransactorLevelTwoActor extends Transactor {
import GlobalTransactionVectorTestActor._
private val ref = Ref(0)

View file

@ -1,17 +0,0 @@
package se.scalablesolutions.akka.stm
import org.scalatest.FunSuite
import Transaction.Global._
class TransactionalVectorBugTestSuite extends FunSuite {
test("adding more than 32 items to a Vector shouldn't blow it up") {
atomic {
var v1 = new Vector[Int]()
for (i <- 0 to 31) {
v1 = v1 + i
}
v1 = v1 + 32
}
}
}

View file

@ -82,9 +82,9 @@ trait Storage {
*/
trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
with Transactional with Committable with Abortable with Logging {
protected val newAndUpdatedEntries = TransactionalState.newMap[K, V]
protected val removedEntries = TransactionalState.newVector[K]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
protected val newAndUpdatedEntries = TransactionalMap[K, V]()
protected val removedEntries = TransactionalVector[K]()
protected val shouldClearOnCommit = Ref[Boolean]()
// to be concretized in subclasses
val storage: MapStorageBackend[K, V]
@ -195,10 +195,10 @@ trait PersistentMap[K, V] extends scala.collection.mutable.Map[K, V]
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newVector[T]
protected val updatedElems = TransactionalState.newMap[Int, T]
protected val removedElems = TransactionalState.newVector[T]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
protected val newElems = TransactionalVector[T]()
protected val updatedElems = TransactionalMap[Int, T]()
protected val removedElems = TransactionalVector[T]()
protected val shouldClearOnCommit = Ref[Boolean]()
val storage: VectorStorageBackend[T]
@ -276,7 +276,7 @@ trait PersistentVector[T] extends IndexedSeq[T] with Transactional with Committa
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait PersistentRef[T] extends Transactional with Committable with Abortable {
protected val ref = new TransactionalRef[T]
protected val ref = Ref[T]()
val storage: RefStorageBackend[T]
@ -343,14 +343,14 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
import scala.collection.immutable.Queue
// current trail that will be played on commit to the underlying store
protected val enqueuedNDequeuedEntries = TransactionalState.newVector[(Option[A], QueueOp)]
protected val shouldClearOnCommit = TransactionalRef[Boolean]()
protected val enqueuedNDequeuedEntries = TransactionalVector[(Option[A], QueueOp)]()
protected val shouldClearOnCommit = Ref[Boolean]()
// local queue that will record all enqueues and dequeues in the current txn
protected val localQ = TransactionalRef[Queue[A]]()
protected val localQ = Ref[Queue[A]]()
// keeps a pointer to the underlying storage for the enxt candidate to be dequeued
protected val pickMeForDQ = TransactionalRef[Int]()
protected val pickMeForDQ = Ref[Int]()
localQ.swap(Queue.empty)
pickMeForDQ.swap(0)
@ -481,8 +481,8 @@ trait PersistentQueue[A] extends scala.collection.mutable.Queue[A]
*/
trait PersistentSortedSet[A] extends Transactional with Committable with Abortable {
protected val newElems = TransactionalState.newMap[A, Float]
protected val removedElems = TransactionalState.newVector[A]
protected val newElems = TransactionalMap[A, Float]()
protected val removedElems = TransactionalVector[A]()
val storage: SortedSetStorageBackend[A]

View file

@ -8,7 +8,7 @@ import se.scalablesolutions.akka.actor.{Actor, ActorRef}
import se.scalablesolutions.akka.config.OneForOneStrategy
import Actor._
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging

View file

@ -7,15 +7,13 @@ package sample.ants
import java.util.concurrent.TimeUnit
import scala.util.Random.{nextInt => randomInt}
import se.scalablesolutions.akka
import akka.actor.{ActorRef, Transactor, Scheduler}
import akka.actor.{Actor, ActorRef, Scheduler}
import akka.actor.Actor.actorOf
import akka.stm.{Vector => _, _}
import akka.stm.Ref.Ref
import akka.stm.Transaction.Local._
import akka.stm.local._
object Config {
val Dim = 80 // dimensions of square world
val AntsSqrt = 7 // number of ants = AntsSqrt^2
val AntsSqrt = 20 // number of ants = AntsSqrt^2
val FoodPlaces = 35 // number of places with food
val FoodRange = 100 // range of amount of food at a place
val PherScale = 10 // scale factor for pheromone drawing
@ -43,7 +41,7 @@ case class Cell(food: Int = 0, pher: Float = 0, ant: Option[Ant] = None, home: B
object EmptyCell extends Cell
class Place(initCell: Cell = EmptyCell) extends Ref(Some(initCell)) {
def cell: Cell = get.get
def cell: Cell = getOrElse(EmptyCell)
def food: Int = cell.food
def food(i: Int) = alter(_.addFood(i))
def hasFood = food > 0
@ -60,6 +58,8 @@ class Place(initCell: Cell = EmptyCell) extends Ref(Some(initCell)) {
def home: Boolean = cell.home
}
case object Ping
object World {
import Config._
@ -68,6 +68,10 @@ object World {
lazy val ants = setup
lazy val evaporator = actorOf[Evaporator].start
private val snapshotFactory = TransactionFactory(readonly = true, familyName = "snapshot")
def snapshot = atomic(snapshotFactory) { Array.tabulate(Dim, Dim)(place(_, _).get) }
def place(loc: (Int, Int)) = places(loc._1)(loc._2)
private def setup = atomic {
@ -83,12 +87,12 @@ object World {
}
def start = {
ants foreach (pingEvery(_, AntMillis))
pingEvery(evaporator, EvapMillis)
ants foreach pingEvery(AntMillis)
pingEvery(EvapMillis)(evaporator)
}
private def pingEvery(actor: ActorRef, millis: Long) =
Scheduler.schedule(actor, "ping", Config.StartDelay, millis, TimeUnit.MILLISECONDS)
private def pingEvery(millis: Long)(actor: ActorRef) =
Scheduler.schedule(actor, Ping, Config.StartDelay, millis, TimeUnit.MILLISECONDS)
}
object Util {
@ -123,9 +127,9 @@ object Util {
}
}
trait WorldActor extends Transactor {
trait WorldActor extends Actor {
def act
def receive = { case "ping" => act }
def receive = { case Ping => act }
}
class AntActor(initLoc: (Int, Int)) extends WorldActor {
@ -133,13 +137,17 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor {
import Util._
val locRef = Ref(initLoc)
val name = "ant-from-" + initLoc._1 + "-" + initLoc._2
implicit val txFactory = TransactionFactory(familyName = name)
val homing = (p: Place) => p.pher + (100 * (if (p.home) 0 else 1))
val foraging = (p: Place) => p.pher + p.food
def loc = locRef.get.getOrElse(initLoc)
def loc = locRef.getOrElse(initLoc)
def newLoc(l: (Int, Int)) = locRef swap l
def act = {
def act = atomic {
val (x, y) = loc
val current = place(x, y)
for (ant <- current.ant) {
@ -202,6 +210,11 @@ class AntActor(initLoc: (Int, Int)) extends WorldActor {
class Evaporator extends WorldActor {
import Config._
import World._
implicit val txFactory = TransactionFactory(familyName = "evaporator")
val evaporate = (pher: Float) => pher * EvapRate
def act = for (x <- 0 until Dim; y <- 0 until Dim) place(x, y) pher evaporate
def act = for (x <- 0 until Dim; y <- 0 until Dim) {
atomic { place(x, y) pher evaporate }
}
}

View file

@ -1,6 +1,6 @@
import sample.ants._
import sample.ants.Config._
import se.scalablesolutions.akka.stm.Transaction.Local._
import se.scalablesolutions.akka.stm.local._
val scale = 5
@ -13,8 +13,8 @@ override def setup() {
}
def draw() {
for (x <- 0 until Dim; y <- 0 until Dim) {
val cell = atomic { World.place(x, y).cell }
val world = World.snapshot
for (x <- 0 until Dim; y <- 0 until Dim; cell <- world(x)(y)) {
val (rx, ry, rw, rh) = (x * scale, y * scale, scale, scale)
noStroke()
fill(255)

View file

@ -7,6 +7,6 @@ http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
http://www.akkasource.org/schema/akka
http://scalablesolutions.se/akka/akka.xsd">
<akka:active-object id="blah2" target="sample.camel.BeanImpl" timeout="1000" />
<akka:active-object id="pojo3" target="sample.camel.BeanImpl" timeout="1000" />
</beans>

View file

@ -32,8 +32,6 @@ class Producer1 extends Actor with Producer {
override def oneway = false // default
override def async = true // default
protected def receive = produce
}
class Consumer1 extends Actor with Consumer with Logging {
@ -102,7 +100,6 @@ class Publisher(name: String, uri: String) extends Actor with Producer {
self.id = name
def endpointUri = uri
override def oneway = true
protected def receive = produce
}
class PublisherBridge(uri: String, publisher: ActorRef) extends Actor with Consumer {

View file

@ -27,14 +27,18 @@ class Boot {
CamelContextManager.context.addRoutes(new CustomRouteBuilder)
// -----------------------------------------------------------------------
// Basic example (using a supervisor for consumer actors)
// Basic example
// -----------------------------------------------------------------------
val supervisor = Supervisor(
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
Supervise(actorOf[Consumer1], LifeCycle(Permanent)) ::
Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
actorOf[Consumer1].start
actorOf[Consumer2].start
// Alternatively, use a supervisor for these actors
//val supervisor = Supervisor(
// SupervisorConfig(
// RestartStrategy(OneForOne, 3, 100, List(classOf[Exception])),
// Supervise(actorOf[Consumer1], LifeCycle(Permanent)) ::
// Supervise(actorOf[Consumer2], LifeCycle(Permanent)) :: Nil))
// -----------------------------------------------------------------------
// Routing example

View file

@ -14,8 +14,7 @@ object ServerApplication {
//
def main(args: Array[String]) {
val camelService = CamelService.newInstance
camelService.load
val camelService = CamelService.newInstance.load
RemoteNode.start("localhost", 7777)
RemoteNode.register("remote2", actorOf[RemoteActor2].start)
}

View file

@ -2,6 +2,8 @@ package sample.camel
import org.apache.camel.impl.{DefaultCamelContext, SimpleRegistry}
import org.apache.camel.builder.RouteBuilder
import org.apache.camel.spring.spi.ApplicationContextRegistry
import org.springframework.context.support.ClassPathXmlApplicationContext
import se.scalablesolutions.akka.camel.{CamelService, CamelContextManager}
import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
@ -9,7 +11,7 @@ import se.scalablesolutions.akka.actor.{ActorRegistry, ActiveObject}
/**
* @author Martin Krasser
*/
object PlainApplication {
object StandaloneApplication {
def main(args: Array[String]) {
import CamelContextManager.context
@ -20,19 +22,18 @@ object PlainApplication {
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new PlainApplicationRoute)
CamelContextManager.context.addRoutes(new StandaloneApplicationRoute)
// start CamelService
val camelService = CamelService.newInstance
camelService.load
val camelService = CamelService.newInstance.load
// access 'externally' registered active objects
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test1", "msg1"))
assert("hello msg2" == context.createProducerTemplate.requestBody("direct:test2", "msg2"))
// 'internally' register active object (requires CamelService)
ActiveObject.newInstance(classOf[ConsumerPojo2])
// access 'externally' registered active objects with active-object component
assert("hello msg1" == context.createProducerTemplate.requestBody("direct:test1", "msg1"))
assert("hello msg2" == context.createProducerTemplate.requestBody("direct:test2", "msg2"))
// internal registration is done in background. Wait a bit ...
Thread.sleep(1000)
@ -48,13 +49,43 @@ object PlainApplication {
}
}
class PlainApplicationRoute extends RouteBuilder {
class StandaloneApplicationRoute extends RouteBuilder {
def configure = {
// routes to active objects (in SimpleRegistry)
from("direct:test1").to("active-object:pojo1?method=foo")
from("direct:test2").to("active-object:pojo2?method=foo")
}
}
object SpringApplication {
// TODO
object StandaloneSpringApplication {
def main(args: Array[String]) {
import CamelContextManager.context
// use Spring application context as active object registry
val springctx = new ClassPathXmlApplicationContext("/context-standalone.xml")
val registry = new ApplicationContextRegistry(springctx)
// customize CamelContext
CamelContextManager.init(new DefaultCamelContext(registry))
CamelContextManager.context.addRoutes(new StandaloneSpringApplicationRoute)
// start CamelService
val camelService = CamelService.newInstance.load
// access 'externally' registered active objects with active-object component
assert("hello msg3" == context.createProducerTemplate.requestBody("direct:test3", "msg3"))
// shutdown CamelService
camelService.unload
// shutdown all (internally) created actors
ActorRegistry.shutdownAll
}
}
class StandaloneSpringApplicationRoute extends RouteBuilder {
def configure = {
// routes to active object (in ApplicationContextRegistry)
from("direct:test3").to("active-object:pojo3?method=foo")
}
}

View file

@ -10,7 +10,7 @@ import se.scalablesolutions.akka.actor.{SupervisorFactory, Actor, ActorRef, Remo
import se.scalablesolutions.akka.remote.{RemoteNode, RemoteClient}
import se.scalablesolutions.akka.persistence.common.PersistentVector
import se.scalablesolutions.akka.persistence.redis.RedisStorage
import se.scalablesolutions.akka.stm.Transaction.Global._
import se.scalablesolutions.akka.stm.global._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.config.OneForOneStrategy
import se.scalablesolutions.akka.util.Logging

View file

@ -2,7 +2,7 @@ package sample.lift
import se.scalablesolutions.akka.actor.{Transactor, Actor}
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.stm.TransactionalState
import se.scalablesolutions.akka.stm.TransactionalMap
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
import Actor._
@ -22,7 +22,7 @@ class SimpleService extends Transactor {
case object Tick
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
private lazy val storage = TransactionalMap[String, Integer]()
@GET
@Produces(Array("text/html"))

View file

@ -9,7 +9,6 @@ import se.scalablesolutions.akka.actor.ActiveObjectContext;
import se.scalablesolutions.akka.actor.annotation.transactionrequired;
import se.scalablesolutions.akka.actor.annotation.prerestart;
import se.scalablesolutions.akka.actor.annotation.postrestart;
import se.scalablesolutions.akka.stm.TransactionalState;
import se.scalablesolutions.akka.stm.TransactionalMap;
@transactionrequired
@ -21,7 +20,7 @@ public class SimpleService {
private Receiver receiver = ActiveObject.newInstance(Receiver.class);
public String count() {
if (storage == null) storage = TransactionalState.newMap();
if (storage == null) storage = new TransactionalMap<String, Integer>();
if (!hasStartedTicking) {
storage.put(KEY, 0);
hasStartedTicking = true;

View file

@ -6,7 +6,7 @@ package sample.rest.scala
import se.scalablesolutions.akka.actor.{Transactor, SupervisorFactory, Actor}
import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.stm.TransactionalState
import se.scalablesolutions.akka.stm.TransactionalMap
import se.scalablesolutions.akka.persistence.cassandra.CassandraStorage
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
@ -63,7 +63,7 @@ class SimpleService {
class SimpleServiceActor extends Transactor {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
private lazy val storage = TransactionalMap[String, Integer]()
def receive = {
case "Tick" => if (hasStartedTicking) {

View file

@ -9,7 +9,7 @@ import se.scalablesolutions.akka.actor.Actor._
import se.scalablesolutions.akka.config.ScalaConfig._
import se.scalablesolutions.akka.util.Logging
import se.scalablesolutions.akka.security.{BasicAuthenticationActor,BasicCredentials,SpnegoAuthenticationActor,DigestAuthenticationActor, UserInfo}
import se.scalablesolutions.akka.stm.TransactionalState
import se.scalablesolutions.akka.stm.TransactionalMap
import se.scalablesolutions.akka.actor.ActorRegistry.actorsFor
class Boot {
@ -135,7 +135,7 @@ class SecureTickService {
class SecureTickActor extends Transactor with Logging {
private val KEY = "COUNTER"
private var hasStartedTicking = false
private lazy val storage = TransactionalState.newMap[String, Integer]
private lazy val storage = TransactionalMap[String, Integer]()
def receive = {
case "Tick" => if (hasStartedTicking) {
val counter = storage.get(KEY).get.intValue

View file

@ -3,21 +3,20 @@ package se.scalablesolutions.akka.spring.foo;
import se.scalablesolutions.akka.actor.annotation.inittransactionalstate;
import se.scalablesolutions.akka.stm.TransactionalMap;
import se.scalablesolutions.akka.stm.TransactionalVector;
import se.scalablesolutions.akka.stm.TransactionalRef;
import se.scalablesolutions.akka.stm.TransactionalState;
import se.scalablesolutions.akka.stm.Ref;
public class StatefulPojo {
private TransactionalMap<String, String> mapState;
private TransactionalVector<String> vectorState;
private TransactionalRef<String> refState;
private Ref<String> refState;
private boolean isInitialized = false;
@inittransactionalstate
public void init() {
if (!isInitialized) {
mapState = TransactionalState.newMap();
vectorState = TransactionalState.newVector();
refState = TransactionalState.newRef();
mapState = new TransactionalMap();
vectorState = new TransactionalVector();
refState = new Ref();
isInitialized = true;
}
}

View file

@ -31,10 +31,7 @@
</actor>
<stm>
service = on
fair = on # should transactions be fair or non-fair (non fair yield better performance)
max-nr-of-retries = 1000 # max nr of retries of a failing transaction before giving up
timeout = 10000 # transaction timeout; if transaction has not committed within the timeout then it is aborted
fair = on # should global transactions be fair or non-fair (non fair yield better performance)
jta-aware = off # 'on' means that if there JTA Transaction Manager available then the STM will
# begin (or join), commit or rollback the JTA transaction. Default is 'off'.
</stm>

View file

@ -19,7 +19,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val CASSANDRA_VERSION = "0.6.1"
val LIFT_VERSION = "2.0-scala280-SNAPSHOT"
val SCALATEST_VERSION = "1.2-for-scala-2.8.0.RC3-SNAPSHOT"
val MULTIVERSE_VERSION = "0.5.2"
val MULTIVERSE_VERSION = "0.6-SNAPSHOT"
// ------------------------------------------------------------
lazy val deployPath = info.projectPath / "deploy"
@ -65,6 +65,8 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
val grizzlyModuleConfig = ModuleConfiguration("com.sun.grizzly", javaNetRepo)
// val atmosphereModuleConfig = ModuleConfiguration("org.atmosphere", sonatypeSnapshotRepo)
val liftModuleConfig = ModuleConfiguration("net.liftweb", ScalaToolsSnapshots)
def codehausSnapshotRepo = "Codehaus Snapshots" at "http://snapshots.repository.codehaus.org"
val multiverseModuleConfig = ModuleConfiguration("org.multiverse", codehausSnapshotRepo)
// ------------------------------------------------------------
// project defintions
@ -174,6 +176,7 @@ class AkkaParent(info: ProjectInfo) extends DefaultProject(info) {
// subprojects
class AkkaCoreProject(info: ProjectInfo) extends AkkaDefaultProject(info, distPath) {
val netty = "org.jboss.netty" % "netty" % "3.2.0.CR1" % "compile"
val commons_codec = "commons-codec" % "commons-codec" % "1.4" % "compile"
val commons_io = "commons-io" % "commons-io" % "1.4" % "compile"
val dispatch_json = "net.databinder" % "dispatch-json_2.8.0.RC3" % "0.7.4" % "compile"
val dispatch_htdisttp = "net.databinder" % "dispatch-http_2.8.0.RC3" % "0.7.4" % "compile"