transactional actors and remote actors implemented

This commit is contained in:
Jonas Boner 2009-06-29 15:01:20 +02:00
parent a585e0ce38
commit 0a915eaff9
20 changed files with 1419 additions and 472 deletions

View file

@ -30,6 +30,9 @@
</value> </value>
</option> </option>
<option name="LINE_SEPARATOR" value="&#10;" /> <option name="LINE_SEPARATOR" value="&#10;" />
<ScalaCodeStyleSettings>
<option name="CHECK_IMPLICITS" value="false" />
</ScalaCodeStyleSettings>
<ADDITIONAL_INDENT_OPTIONS fileType=""> <ADDITIONAL_INDENT_OPTIONS fileType="">
<option name="INDENT_SIZE" value="2" /> <option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
@ -39,6 +42,15 @@
<option name="LABEL_INDENT_SIZE" value="0" /> <option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" /> <option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS> </ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="clj">
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="2" />
<option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="groovy"> <ADDITIONAL_INDENT_OPTIONS fileType="groovy">
<option name="INDENT_SIZE" value="2" /> <option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
@ -84,6 +96,15 @@
<option name="LABEL_INDENT_SIZE" value="0" /> <option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" /> <option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS> </ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="scala">
<option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="2" />
<option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="sql"> <ADDITIONAL_INDENT_OPTIONS fileType="sql">
<option name="INDENT_SIZE" value="2" /> <option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />

867
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -6,9 +6,9 @@ import se.scalablesolutions.akka.kernel.state.*;
public class InMemStateful { public class InMemStateful {
private TransactionalState factory = new TransactionalState(); private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newMap(new InMemoryMapConfig()); private TransactionalMap<String, String> mapState = factory.newInMemoryMap();
private TransactionalVector vectorState = factory.newVector(new InMemoryVectorConfig());; private TransactionalVector<String> vectorState = factory.newInMemoryVector();
private TransactionalRef refState = factory.newRef(new InMemoryRefConfig()); private TransactionalRef<String> refState = factory.newInMemoryRef();
@transactional @transactional
public String getMapState(String key) { public String getMapState(String key) {

View file

@ -17,7 +17,6 @@ public class InMemoryStateTest extends TestCase {
final private ActiveObjectFactory factory = new ActiveObjectFactory(); final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() { protected void setUp() {
conf.configureActiveObjects( conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{ new Component[]{

View file

@ -6,9 +6,9 @@ import se.scalablesolutions.akka.annotation.state;
public class PersistentStateful { public class PersistentStateful {
private TransactionalState factory = new TransactionalState(); private TransactionalState factory = new TransactionalState();
private TransactionalMap mapState = factory.newMap(new PersistentMapConfig(new CassandraStorageConfig())); private TransactionalMap mapState = factory.newPersistentMap(new CassandraStorageConfig());
private TransactionalVector vectorState = factory.newVector(new PersistentVectorConfig(new CassandraStorageConfig()));; private TransactionalVector vectorState = factory.newPersistentVector(new CassandraStorageConfig());;
private TransactionalRef refState = factory.newRef(new PersistentRefConfig(new CassandraStorageConfig())); private TransactionalRef refState = factory.newPersistentRef(new CassandraStorageConfig());
@transactional @transactional
public String getMapState(String key) { public String getMapState(String key) {

View file

@ -21,6 +21,7 @@ public class RemoteInMemoryStateTest extends TestCase {
server.connect(); server.connect();
} }
}).start(); }).start();
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
} }
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory factory = new se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory(); final private se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory factory = new se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory();

View file

@ -12,7 +12,8 @@ import kernel.config.ActiveObjectGuiceConfigurator
import kernel.config.ScalaConfig._ import kernel.config.ScalaConfig._
import kernel.camel.{MessageDriven, ActiveObjectProducer} import kernel.camel.{MessageDriven, ActiveObjectProducer}
import kernel.nio.{RemoteRequest, NettyClient} import kernel.nio.{RemoteRequest, NettyClient}
import kernel.stm.{ChangeSet, Transaction} import kernel.stm.{TransactionManagement, TransactionAwareWrapperException, ChangeSet, Transaction}
import kernel.util.Helpers.ReadWriteLock import kernel.util.Helpers.ReadWriteLock
import kernel.util.{HashCode, ResultOrFailure} import kernel.util.{HashCode, ResultOrFailure}
import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector} import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector}
@ -27,7 +28,6 @@ import scala.collection.mutable.HashMap
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg) sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg) class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
class TransactionAwareException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause)
object Annotations { object Annotations {
import se.scalablesolutions.akka.annotation._ import se.scalablesolutions.akka.annotation._
@ -84,12 +84,6 @@ class ActiveObjectFactory {
object ActiveObject { object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka" val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
}
def newInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), false) def newInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), false)
def newInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), false) def newInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), false)
@ -105,7 +99,7 @@ object ActiveObject {
val proxy = Proxy.newInstance(target, false, true) val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time // FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice( proxy.asInstanceOf[Advisable].aw_addAdvice(
"execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor, remote)) "execution(* *.*(..))", new TransactionalAroundAdvice(target, proxy, actor, remote))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
@ -113,7 +107,7 @@ object ActiveObject {
if (remote) NettyClient.connect if (remote) NettyClient.connect
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice( proxy.asInstanceOf[Advisable].aw_addAdvice(
"execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor, remote)) "execution(* *.*(..))", new TransactionalAroundAdvice(intf, target, actor, remote))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
@ -130,60 +124,37 @@ object ActiveObject {
/** /**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
@serializable sealed class TransactionalAroundAdvice(
val target: Class[_], val targetInstance: AnyRef, actor: Actor, val isRemote: Boolean)
extends AroundAdvice with TransactionManagement {
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts val transactionalInstance = targetInstance
@serializable sealed class SequentialTransactionalAroundAdvice(
target: Class[_], targetInstance: AnyRef, actor: Actor, val isRemote: Boolean) extends AroundAdvice {
private val changeSet = new ChangeSet(target.getName)
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
changeSet.refs = refs
changeSet.maps = maps
changeSet.vectors = vectors
import kernel.reactor._ import kernel.reactor._
private[this] var dispatcher = new ProxyMessageDispatcher private[this] var dispatcher = new ProxyMessageDispatcher
private[this] var mailbox = dispatcher.messageQueue private[this] var mailbox = dispatcher.messageQueue
dispatcher.start dispatcher.start
import ActiveObject.threadBoundTx def invoke(joinpoint: JoinPoint): AnyRef =
private[kernel] var activeTx: Option[Transaction] = None if (TransactionManagement.isTransactionsEnabled) transactionalDispatch(joinpoint)
else
dispatch(joinpoint)
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint private def dispatch(joinpoint: JoinPoint) = {
def invoke(joinpoint: JoinPoint): AnyRef = { if (isRemote) remoteDispatch(joinpoint)
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] else localDispatch(joinpoint)
val method = rtti.getMethod }
val isOneWay = rtti.getMethod.getReturnType == java.lang.Void.TYPE private def transactionalDispatch(joinpoint: JoinPoint) = {
// FIXME join TX with same id, do not COMMIT // FIXME join TX with same id, do not COMMIT
tryToCommitTransaction tryToCommitTransaction
if (isInExistingTransaction) { if (isInExistingTransaction) joinExistingTransaction
joinExistingTransaction else if (isTransactional(joinpoint)) startNewTransaction
} else { incrementTransaction
if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
}
try { try {
incrementTransaction dispatch(joinpoint)
if (isRemote) {
val future = NettyClient.send(
new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, isOneWay, false))
if (isOneWay) null // for void methods
else {
future.await_?
val result = getResultOrThrowException(future)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
}
} else {
if (isOneWay) actor !! Invocation(joinpoint, activeTx) // FIXME investigate why ! causes TX to race
else {
val result = actor !! Invocation(joinpoint, activeTx)
if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
}
}
} catch { } catch {
case e: TransactionAwareException => case e: TransactionAwareWrapperException =>
rollback(e.tx) rollback(e.tx)
throw e.cause throw e.cause
} finally { } finally {
@ -193,136 +164,41 @@ object ActiveObject {
} }
} }
// TODO: create a method setCallee/setCaller to the joinpoint interface and compiler private def localDispatch(joinpoint: JoinPoint): AnyRef = {
private def nullOutTransientFieldsInJoinpoint(joinpoint: JoinPoint) = { val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val clazz = joinpoint.getClass if (isOneWay(rtti)) actor !! Invocation(joinpoint, activeTx) // FIXME investigate why ! causes TX to race
val callee = clazz.getDeclaredField("CALLEE") else {
callee.setAccessible(true) val result = actor !! Invocation(joinpoint, activeTx)
callee.set(joinpoint, null) if (result.isDefined) result.get
val caller = clazz.getDeclaredField("CALLER") else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
caller.setAccessible(true)
caller.set(joinpoint, null)
val interceptors = clazz.getDeclaredField("AROUND_INTERCEPTORS")
interceptors.setAccessible(true)
interceptors.set(joinpoint, null)
}
private def startNewTransaction = {
val newTx = new Transaction
newTx.begin(changeSet)
val tx = Some(newTx)
activeTx = tx
threadBoundTx.set(tx)
}
private def joinExistingTransaction = {
val cflowTx = threadBoundTx.get
if (!activeTx.isDefined && cflowTx.isDefined) {
val currentTx = cflowTx.get
currentTx.join(changeSet)
activeTx = Some(currentTx)
} }
} }
private def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet) private def remoteDispatch(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
private def tryToCommitTransaction: Boolean = if (activeTx.isDefined) { val oneWay = isOneWay(rtti)
val tx = activeTx.get val future = NettyClient.send(
tx.commit(changeSet) new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, oneWay, false))
removeTransactionIfTopLevel if (oneWay) null // for void methods
true else {
} else false future.await
val result = getResultOrThrowException(future)
private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { if (result.isDefined) result.get
try { else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
result()
} catch {
case e =>
rollback(result.tx)
throw e
} }
} }
private def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
tx.rollback(changeSet)
}
private def isInExistingTransaction = ActiveObject.threadBoundTx.get.isDefined
private def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
private def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
private def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
private def removeTransactionIfTopLevel =
if (activeTx.isDefined && activeTx.get.topLevel_?) {
activeTx = None
threadBoundTx.set(None)
}
private def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
else true
} else true
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
future
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] = private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (_, cause) = future.exception.get val (_, cause) = future.exception.get
throw new TransactionAwareException(cause, activeTx) if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
} else { else throw cause
if (future.result.isDefined) { } else future.result.asInstanceOf[Option[T]]
val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]]
Some(res).asInstanceOf[Option[T]]
} else None
}
/** private def isTransactional(joinpoint: JoinPoint) =
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top. joinpoint.getRtti.asInstanceOf[MethodRtti].getMethod.isAnnotationPresent(Annotations.transactional)
*/
private def getTransactionalItemsFor(targetInstance: AnyRef):
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
require(targetInstance != null)
var maps: List[TransactionalMap[_, _]] = Nil
var refs: List[TransactionalRef[_]] = Nil
var vectors: List[TransactionalVector[_]] = Nil
def getTransactionalItemsFor(target: Class[_]): private def isOneWay(rtti: MethodRtti) = rtti.getMethod.getReturnType == java.lang.Void.TYPE
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]].foreach(println)
for {
field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
fieldType = field.getType
if (fieldType == classOf[TransactionalMap[_, _]]) ||
(fieldType == classOf[TransactionalVector[_]]) ||
(fieldType == classOf[TransactionalRef[_]])
txItem = {
field.setAccessible(true)
field.get(targetInstance)
}
if txItem != null
} {
if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
}
val parent = target.getSuperclass
if (parent == classOf[Object]) (maps, vectors, refs)
else getTransactionalItemsFor(parent)
}
// start the search for transactional items, crawl the class hierarchy up until we reach Object
getTransactionalItemsFor(targetInstance.getClass)
}
} }
/** /**
@ -357,6 +233,7 @@ object ActiveObject {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
private[kernel] class Dispatcher(val targetName: String) extends Actor { private[kernel] class Dispatcher(val targetName: String) extends Actor {
//makeTransactional
id = targetName id = targetName
// FIXME implement the pre/post restart methods and call annotated methods on the POJO // FIXME implement the pre/post restart methods and call annotated methods on the POJO
@ -366,12 +243,12 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
override def receive: PartialFunction[Any, Unit] = { override def receive: PartialFunction[Any, Unit] = {
case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) => case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
ActiveObject.threadBoundTx.set(tx) TransactionManagement.threadBoundTx.set(tx)
try { try {
reply(joinpoint.proceed) reply(joinpoint.proceed)
} catch { } catch {
case e => case e =>
throw new TransactionAwareException(e, tx) throw new TransactionAwareWrapperException(e, tx)
} }
case unexpected => case unexpected =>
@ -379,6 +256,22 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
} }
} }
// TODO: create a method setCallee/setCaller to the joinpoint interface and compiler
/*
private def nullOutTransientFieldsInJoinpoint(joinpoint: JoinPoint) = {
val clazz = joinpoint.getClass
val callee = clazz.getDeclaredField("CALLEE")
callee.setAccessible(true)
callee.set(joinpoint, null)
val caller = clazz.getDeclaredField("CALLER")
caller.setAccessible(true)
caller.set(joinpoint, null)
val interceptors = clazz.getDeclaredField("AROUND_INTERCEPTORS")
interceptors.setAccessible(true)
interceptors.set(joinpoint, null)
}
*/
/* /*
ublic class CamelInvocationHandler implements InvocationHandler { ublic class CamelInvocationHandler implements InvocationHandler {
private final Endpoint endpoint; private final Endpoint endpoint;

View file

@ -9,7 +9,7 @@ import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit}
import kernel.reactor._ import kernel.reactor._
import kernel.config.ScalaConfig._ import kernel.config.ScalaConfig._
import kernel.nio.{NettyClient, RemoteRequest} import kernel.nio.{NettyClient, RemoteRequest}
import kernel.stm.Transaction import kernel.stm.{TransactionAwareWrapperException, TransactionManagement, Transaction}
import kernel.util.Logging import kernel.util.Logging
import kernel.util.Helpers._ import kernel.util.Helpers._
@ -29,13 +29,13 @@ object DispatcherType {
} }
class ActorMessageHandler(val actor: Actor) extends MessageHandler { class ActorMessageHandler(val actor: Actor) extends MessageHandler {
def handle(handle: MessageHandle) = actor.handle(handle.message, handle.future) def handle(handle: MessageHandle) = actor.handle(handle)
} }
trait Actor extends Logging { trait Actor extends Logging with TransactionManagement {
var timeout: Long = 5000L val transactionalInstance = this
var isRemote = false @volatile private[this] var isRemote = false
@volatile private[this] var isTransactional = false
@volatile private[this] var isRunning: Boolean = false @volatile private[this] var isRunning: Boolean = false
protected[this] var id: String = super.toString protected[this] var id: String = super.toString
protected[this] var dispatcher: MessageDispatcher = _ protected[this] var dispatcher: MessageDispatcher = _
@ -118,31 +118,62 @@ trait Actor extends Logging {
// ==== API ==== // ==== API ====
// ============= // =============
/**
* TODO: document
*/
@volatile var timeout: Long = 5000L
/**
* TODO: document
*/
def !(message: AnyRef) = if (isRunning) { def !(message: AnyRef) = if (isRunning) {
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false)) if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false))
else mailbox.append(new MessageHandle(this, message, new NullFutureResult)) else mailbox.append(new MessageHandle(this, message, new NullFutureResult, activeTx))
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
*/
def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) if (TransactionManagement.isTransactionsEnabled) {
future.await_? transactionalDispatch(message, timeout, false)
getResultOrThrowException(future) } else {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
future.await
getResultOrThrowException(future)
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
*/
def !![T](message: AnyRef): Option[T] = !![T](message, timeout) def !![T](message: AnyRef): Option[T] = !![T](message, timeout)
/**
* TODO: document
*/
def !?[T](message: AnyRef): T = if (isRunning) { def !?[T](message: AnyRef): T = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0) if (TransactionManagement.isTransactionsEnabled) {
future.await_! transactionalDispatch(message, 0, true).get
getResultOrThrowException(future).get } else {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
future.awaitBlocking
getResultOrThrowException(future).get
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
*/
protected[this] def reply(message: AnyRef) = senderFuture match { protected[this] def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." ) case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." )
case Some(future) => future.completeWithResult(message) case Some(future) => future.completeWithResult(message)
} }
// FIXME can be deadlock prone if cyclic linking? - HOWTO? // FIXME can be deadlock prone if cyclic linking? - HOWTO?
/**
* TODO: document
*/
protected[this] def link(actor: Actor) = synchronized { actor.synchronized { protected[this] def link(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) { if (isRunning) {
linkedActors.add(actor) linkedActors.add(actor)
@ -153,6 +184,9 @@ trait Actor extends Logging {
}} }}
// FIXME can be deadlock prone if cyclic linking? - HOWTO? // FIXME can be deadlock prone if cyclic linking? - HOWTO?
/**
* TODO: document
*/
protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized { protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) { if (isRunning) {
if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink") if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
@ -162,6 +196,9 @@ trait Actor extends Logging {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}} }}
/**
* TODO: document
*/
def start = synchronized { def start = synchronized {
if (!isRunning) { if (!isRunning) {
dispatcherType match { dispatcherType match {
@ -181,6 +218,9 @@ trait Actor extends Logging {
} }
} }
/**
* TODO: document
*/
def stop = synchronized { def stop = synchronized {
if (isRunning) { if (isRunning) {
dispatcher.unregisterHandler(this) dispatcher.unregisterHandler(this)
@ -205,30 +245,57 @@ trait Actor extends Logging {
link(actor) link(actor)
} }
/**
* TODO: document
*/
def spawn(actorClass: Class[_]) = { def spawn(actorClass: Class[_]) = {
// FIXME: should pass in dispatcher etc. - inherit // FIXME: should pass in dispatcher etc. - inherit
} }
/**
* TODO: document
*/
def spawnRemote(actorClass: Class[_]) = { def spawnRemote(actorClass: Class[_]) = {
} }
/**
* TODO: document
*/
def spawnLink(actorClass: Class[_]) = { def spawnLink(actorClass: Class[_]) = {
} }
/**
* TODO: document
*/
def spawnLinkRemote(actorClass: Class[_]) = { def spawnLinkRemote(actorClass: Class[_]) = {
} }
/**
* TODO: document
*/
def makeRemote = isRemote = true def makeRemote = isRemote = true
/**
* TODO: document
*/
def makeTransactional = synchronized {
if (isRunning) throw new IllegalArgumentException("Can not make actor transactional after it has been started")
else isTransactional = true
}
// ================================ // ================================
// ==== IMPLEMENTATION DETAILS ==== // ==== IMPLEMENTATION DETAILS ====
// ================================ // ================================
private[kernel] def handle(message: AnyRef, future: CompletableFutureResult) = synchronized { private[kernel] def handle(messageHandle: MessageHandle) = synchronized {
val message = messageHandle.message
val future = messageHandle.future
try { try {
if (messageHandle.tx.isDefined)
TransactionManagement.threadBoundTx.set(messageHandle.tx)
senderFuture = Some(future) senderFuture = Some(future)
if (base.isDefinedAt(message)) base(message) if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]") else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
} catch { } catch {
case e => case e =>
@ -237,21 +304,49 @@ trait Actor extends Logging {
} }
} }
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = { private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult =
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false)) if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false))
else { else {
val future = new DefaultCompletableFutureResult(timeout) val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future)) mailbox.append(new MessageHandle(this, message, future, TransactionManagement.threadBoundTx.get))
future future
} }
private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean): Option[T] = {
// FIXME join TX with same id, do not COMMIT
println("------ Actor1: " + this)
tryToCommitTransaction
println("------ Actor2: " + this)
if (isInExistingTransaction) joinExistingTransaction
else if (isTransactional) startNewTransaction
println("------ Actor3: " + this)
incrementTransaction
try {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
if (blocking) future.awaitBlocking
else future.await
getResultOrThrowException(future)
} catch {
case e: TransactionAwareWrapperException =>
e.cause.printStackTrace
rollback(e.tx)
throw e.cause
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
TransactionManagement.threadBoundTx.set(None)
}
} }
private def getResultOrThrowException[T](future: FutureResult): Option[T] = private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (isRemote) getRemoteResultOrThrowException(future) if (isRemote) getRemoteResultOrThrowException(future)
else { else {
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (_, cause) = future.exception.get val (_, cause) = future.exception.get
throw cause if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
else throw cause
} else { } else {
future.result.asInstanceOf[Option[T]] future.result.asInstanceOf[Option[T]]
} }
@ -262,7 +357,8 @@ trait Actor extends Logging {
private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] = private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (_, cause) = future.exception.get val (_, cause) = future.exception.get
throw cause // throw new TransactionAwareException(cause, activeTx) if (TransactionManagement.isTransactionsEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
else throw cause
} else { } else {
if (future.result.isDefined) { if (future.result.isDefined) {
val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]] val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]]

View file

@ -92,6 +92,7 @@ abstract class SupervisorFactory extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging { class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging {
makeTransactional
trapExit = true trapExit = true
faultHandler = Some(handler) faultHandler = Some(handler)

View file

@ -116,9 +116,10 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) { if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply] val reply = result.asInstanceOf[RemoteReply]
val tx = reply.tx // val tx = reply.tx
val future = futures.get(reply.id) val future = futures.get(reply.id)
if (reply.successful) future.completeWithResult((reply.message, tx)) //if (reply.successful) future.completeWithResult((reply.message, tx))
if (reply.successful) future.completeWithResult(reply.message)
else future.completeWithException(null, reply.exception) else future.completeWithException(null, reply.exception)
} else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result)
} catch { } catch {

View file

@ -8,7 +8,8 @@ import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors} import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor._ import kernel.actor._
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import kernel.reactor.{MessageHandle, DefaultCompletableFutureResult, CompletableFutureResult}
import kernel.stm.TransactionManagement
import kernel.util.Logging import kernel.util.Logging
import java.util.ArrayList import java.util.ArrayList
import java.util.List import java.util.List
@ -66,7 +67,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
private val actors = new ConcurrentHashMap[String, Actor] private val actors = new ConcurrentHashMap[String, Actor]
private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod( private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod(
"handle", Array[Class[_]](classOf[AnyRef], classOf[CompletableFutureResult])) "handle", Array[Class[_]](classOf[MessageHandle]))
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@ -118,7 +119,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val resultOrNone = actor !! request.message val resultOrNone = actor !! request.message
val result = if (resultOrNone.isDefined) resultOrNone else null val result = if (resultOrNone.isDefined) resultOrNone else null
log.debug("Returning result from actor invocation [%s]", result) log.debug("Returning result from actor invocation [%s]", result)
channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get)) //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
channel.write(request.newReplyWithMessage(result, null))
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause) log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause)
@ -143,7 +145,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
else { else {
val result = messageReceiver.invoke(activeObject, unescapedArgs) val result = messageReceiver.invoke(activeObject, unescapedArgs)
log.debug("Returning result from active object invocation [%s]", result) log.debug("Returning result from active object invocation [%s]", result)
channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get)) //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
channel.write(request.newReplyWithMessage(result, null))
} }
} catch { } catch {
case e: InvocationTargetException => case e: InvocationTargetException =>
@ -157,8 +160,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val tx = request.tx val tx = request.tx
if (tx.isDefined) { if (tx.isDefined) {
tx.get.reinit tx.get.reinit
ActiveObject.threadBoundTx.set(tx) TransactionManagement.threadBoundTx.set(tx)
} else ActiveObject.threadBoundTx.set(None) } else TransactionManagement.threadBoundTx.set(None)
} }
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = { private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = {

View file

@ -13,8 +13,8 @@ import java.util.concurrent.TimeUnit
class FutureTimeoutException(message: String) extends RuntimeException(message) class FutureTimeoutException(message: String) extends RuntimeException(message)
sealed trait FutureResult { sealed trait FutureResult {
def await_? def await
def await_! def awaitBlocking
def isCompleted: Boolean def isCompleted: Boolean
def isExpired: Boolean def isExpired: Boolean
def timeoutInNanos: Long def timeoutInNanos: Long
@ -39,7 +39,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
private var _result: Option[AnyRef] = None private var _result: Option[AnyRef] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
def await_? = try { def await = try {
_lock.lock _lock.lock
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) { while (!_completed && wait > 0) {
@ -56,7 +56,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock _lock.unlock
} }
def await_! = try { def awaitBlocking = try {
_lock.lock _lock.lock
while (!_completed) { while (!_completed) {
_signal.await _signal.await
@ -121,8 +121,8 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
class NullFutureResult extends CompletableFutureResult { class NullFutureResult extends CompletableFutureResult {
def completeWithResult(result: AnyRef) = {} def completeWithResult(result: AnyRef) = {}
def completeWithException(toBlame: AnyRef, exception: Throwable) = {} def completeWithException(toBlame: AnyRef, exception: Throwable) = {}
def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def await = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def awaitBlocking = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult")

View file

@ -13,6 +13,7 @@ package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.atomic.AtomicInteger import java.util.concurrent.atomic.AtomicInteger
import java.util.concurrent.ThreadFactory import java.util.concurrent.ThreadFactory
import java.util.{LinkedList, Queue} import java.util.{LinkedList, Queue}
import kernel.stm.Transaction
import kernel.util.{Logging, HashCode} import kernel.util.{Logging, HashCode}
trait MessageHandler { trait MessageHandler {
def handle(message: MessageHandle) def handle(message: MessageHandle)
@ -33,13 +34,17 @@ trait MessageDemultiplexer {
def wakeUp def wakeUp
} }
class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: CompletableFutureResult) { class MessageHandle(val sender: AnyRef,
val message: AnyRef,
val future: CompletableFutureResult,
val tx: Option[Transaction]) {
override def hashCode(): Int = { override def hashCode(): Int = {
var result = HashCode.SEED var result = HashCode.SEED
result = HashCode.hash(result, sender) result = HashCode.hash(result, sender)
result = HashCode.hash(result, message) result = HashCode.hash(result, message)
result = HashCode.hash(result, future) result = HashCode.hash(result, future)
result = if (tx.isDefined) HashCode.hash(result, tx.get.id) else result
result result
} }
@ -48,7 +53,9 @@ class MessageHandle(val sender: AnyRef, val message: AnyRef, val future: Complet
that.isInstanceOf[MessageHandle] && that.isInstanceOf[MessageHandle] &&
that.asInstanceOf[MessageHandle].sender == sender && that.asInstanceOf[MessageHandle].sender == sender &&
that.asInstanceOf[MessageHandle].message == message && that.asInstanceOf[MessageHandle].message == message &&
that.asInstanceOf[MessageHandle].future == future that.asInstanceOf[MessageHandle].future == future &&
that.asInstanceOf[MessageHandle].tx.isDefined == tx.isDefined &&
that.asInstanceOf[MessageHandle].tx.get.id == tx.get.id
} }
class MessageQueue { class MessageQueue {

View file

@ -4,6 +4,7 @@
package se.scalablesolutions.akka.kernel.state package se.scalablesolutions.akka.kernel.state
import kernel.stm.TransactionManagement
import org.codehaus.aspectwerkz.proxy.Uuid import org.codehaus.aspectwerkz.proxy.Uuid
import kernel.actor.ActiveObject import kernel.actor.ActiveObject
@ -12,71 +13,54 @@ import se.scalablesolutions.akka.collection._
import scala.collection.mutable.HashMap import scala.collection.mutable.HashMap
sealed abstract class TransactionalStateConfig sealed abstract class TransactionalStateConfig
abstract class TransactionalMapConfig extends TransactionalStateConfig
abstract class TransactionalVectorConfig extends TransactionalStateConfig
abstract class TransactionalRefConfig extends TransactionalStateConfig
abstract class PersistentStorageConfig extends TransactionalStateConfig abstract class PersistentStorageConfig extends TransactionalStateConfig
case class CassandraStorageConfig extends PersistentStorageConfig case class CassandraStorageConfig extends PersistentStorageConfig
case class TerracottaStorageConfig extends PersistentStorageConfig case class TerracottaStorageConfig extends PersistentStorageConfig
case class TokyoCabinetStorageConfig extends PersistentStorageConfig case class TokyoCabinetStorageConfig extends PersistentStorageConfig
case class PersistentMapConfig(storage: PersistentStorageConfig) extends TransactionalMapConfig /**
case class InMemoryMapConfig extends TransactionalMapConfig * Scala API.
* <p/>
case class PersistentVectorConfig(storage: PersistentStorageConfig) extends TransactionalVectorConfig * Example Scala usage:
case class InMemoryVectorConfig extends TransactionalVectorConfig * <pre>
* val myMap = TransactionalState.newPersistentMap(CassandraStorageConfig)
case class PersistentRefConfig(storage: PersistentStorageConfig) extends TransactionalRefConfig * </pre>
case class InMemoryRefConfig extends TransactionalRefConfig */
object TransactionalState extends TransactionalState object TransactionalState extends TransactionalState
/**
* Java API.
* <p/>
* Example Java usage:
* <pre>
* TransactionalState state = new TransactionalState();
* TransactionalMap myMap = state.newPersistentMap(new CassandraStorageConfig());
* </pre>
*/
class TransactionalState { class TransactionalState {
def newPersistentMap(config: PersistentStorageConfig): TransactionalMap[String, AnyRef] = config match {
/** case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
* Usage: case TerracottaStorageConfig() => throw new UnsupportedOperationException
* <pre> case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
* val myMap = TransactionalState.newMap(PersistentMapConfig(CassandraStorageConfig))
* </pre>
*/
def newMap(config: TransactionalMapConfig) = config match {
case PersistentMapConfig(storage) => storage match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalMap
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
case InMemoryMapConfig() => new InMemoryTransactionalMap
} }
/** def newPersistentVector(config: PersistentStorageConfig): TransactionalVector[AnyRef] = config match {
* Usage: case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
* <pre> case TerracottaStorageConfig() => throw new UnsupportedOperationException
* val myVector = TransactionalState.newVector(PersistentVectorConfig(CassandraStorageConfig)) case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
* </pre>
*/
def newVector(config: TransactionalVectorConfig) = config match {
case PersistentVectorConfig(storage) => storage match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalVector
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
case InMemoryVectorConfig() => new InMemoryTransactionalVector
} }
/** def newPersistentRef(config: PersistentStorageConfig): TransactionalRef[AnyRef] = config match {
* Usage: case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
* <pre> case TerracottaStorageConfig() => throw new UnsupportedOperationException
* val myRef = TransactionalState.newRef(PersistentRefConfig(CassandraStorageConfig)) case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
* </pre>
*/
def newRef(config: TransactionalRefConfig) = config match {
case PersistentRefConfig(storage) => storage match {
case CassandraStorageConfig() => new CassandraPersistentTransactionalRef
case TerracottaStorageConfig() => throw new UnsupportedOperationException
case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException
}
case InMemoryRefConfig() => new TransactionalRef
} }
def newInMemoryMap[K, V]: TransactionalMap[K, V] = new InMemoryTransactionalMap[K, V]
def newInMemoryVector[T]: TransactionalVector[T] = new InMemoryTransactionalVector[T]
def newInMemoryRef[T]: TransactionalRef[T] = new TransactionalRef[T]
} }
/** /**
@ -90,15 +74,15 @@ trait Transactional {
private[kernel] def commit private[kernel] def commit
private[kernel] def rollback private[kernel] def rollback
protected def isInTransaction = ActiveObject.threadBoundTx.get.isDefined protected def isInTransaction = TransactionManagement.threadBoundTx.get.isDefined
protected def nonTransactionalCall = throw new IllegalStateException("Can't access transactional map outside the scope of a transaction") protected def nonTransactionalCall = throw new IllegalStateException("Can't access transactional map outside the scope of a transaction")
} }
/** /**
* Base trait for all state implementations (persistent or in-memory). * Base trait for all state implementations (persistent or in-memory).
* *
* TODO: Make this class inherit scala.collection.mutable.Map and/or java.util.Map * TODO: Make this class inherit scala.collection.mutable.Map and/or java.util.Map
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] { trait TransactionalMap[K, V] extends Transactional with scala.collection.mutable.Map[K, V] {
@ -118,7 +102,7 @@ trait TransactionalMapGuard[K, V] extends TransactionalMap[K, V] with Transactio
abstract override def remove(key: K) = abstract override def remove(key: K) =
if (isInTransaction) super.remove(key) if (isInTransaction) super.remove(key)
else nonTransactionalCall else nonTransactionalCall
abstract override def elements: Iterator[(K, V)] = abstract override def elements: Iterator[(K, V)] =
if (isInTransaction) super.elements if (isInTransaction) super.elements
else nonTransactionalCall else nonTransactionalCall
abstract override def get(key: K): Option[V] = abstract override def get(key: K): Option[V] =
@ -149,7 +133,7 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
override def begin = snapshot = state override def begin = snapshot = state
override def commit = snapshot = state override def commit = snapshot = state
override def rollback = state = snapshot override def rollback = state = snapshot
// ---- Overriding scala.collection.mutable.Map behavior ---- // ---- Overriding scala.collection.mutable.Map behavior ----
override def contains(key: K): Boolean = state.contains(key) override def contains(key: K): Boolean = state.contains(key)
override def clear = state = new HashTrie[K, V] override def clear = state = new HashTrie[K, V]
@ -171,16 +155,16 @@ class InMemoryTransactionalMap[K, V] extends TransactionalMap[K, V] {
/** /**
* Base class for all persistent transactional map implementations should extend. * Base class for all persistent transactional map implementations should extend.
* Implements a Unit of Work, records changes into a change set. * Implements a Unit of Work, records changes into a change set.
* *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] { abstract class PersistentTransactionalMap[K, V] extends TransactionalMap[K, V] {
// FIXME: need to handle remove in another changeSet // FIXME: need to handle remove in another changeSet
protected[kernel] val changeSet = new HashMap[K, V] protected[kernel] val changeSet = new HashMap[K, V]
def getRange(start: Int, count: Int) def getRange(start: Int, count: Int)
// ---- For Transactional ---- // ---- For Transactional ----
@ -249,9 +233,9 @@ abstract class TransactionalVector[T] extends Transactional with RandomAccessSeq
/** /**
* Implements an in-memory transactional vector. * Implements an in-memory transactional vector.
* *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class InMemoryTransactionalVector[T] extends TransactionalVector[T] { class InMemoryTransactionalVector[T] extends TransactionalVector[T] {
@ -308,20 +292,27 @@ class CassandraPersistentTransactionalVector extends PersistentTransactionalVect
override def length: Int = CassandraNode.getVectorStorageSizeFor(uuid) override def length: Int = CassandraNode.getVectorStorageSizeFor(uuid)
override def apply(index: Int): AnyRef = get(index) override def apply(index: Int): AnyRef = get(index)
override def first: AnyRef = get(0) override def first: AnyRef = get(0)
override def last: AnyRef = get(length) override def last: AnyRef = {
val l = length
if (l == 0) throw new NoSuchElementException("Vector is empty")
get(length - 1)
}
// ---- For Transactional ---- // ---- For Transactional ----
override def commit = { override def commit = {
// FIXME: should use batch function once the bug is resolved // FIXME: should use batch function once the bug is resolved
for (element <- changeSet) CassandraNode.insertVectorStorageEntryFor(uuid, element) for (element <- changeSet) {
CassandraNode.insertVectorStorageEntryFor(uuid, element)
println("33333333333 " + CassandraNode.getVectorStorageSizeFor(uuid))
}
} }
} }
/** /**
* Implements a transactional reference. * Implements a transactional reference.
* *
* Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time. * Not thread-safe, but should only be using from within an Actor, e.g. one single thread at a time.
* *
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class TransactionalRef[T] extends Transactional { class TransactionalRef[T] extends Transactional {

View file

@ -0,0 +1,153 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.stm
import java.lang.reflect.Field
import java.util.concurrent.atomic.AtomicBoolean
import kernel.state.{TransactionalMap, TransactionalRef, TransactionalVector}
import kernel.util.Logging
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
}
object TransactionManagement {
private val txEnabled = new AtomicBoolean(true)
def isTransactionsEnabled = txEnabled.get
def enableTransactions = txEnabled.set(true)
private[kernel] lazy val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]]
tl.set(None)
tl
}
}
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
trait TransactionManagement extends Logging {
val transactionalInstance: AnyRef
private lazy val changeSet = new ChangeSet(transactionalInstance.getClass.getName)
import TransactionManagement.threadBoundTx
private[kernel] var activeTx: Option[Transaction] = None
protected def startNewTransaction = {
val (maps, vectors, refs) = getTransactionalItemsFor(transactionalInstance)
changeSet.maps = maps
changeSet.refs = refs
changeSet.vectors = vectors
val newTx = new Transaction
newTx.begin(changeSet)
val tx = Some(newTx)
activeTx = tx
threadBoundTx.set(tx)
}
protected def joinExistingTransaction = {
val cflowTx = threadBoundTx.get
if (activeTx.isDefined && cflowTx.isDefined && activeTx.get.id == cflowTx.get.id) {
val currentTx = cflowTx.get
currentTx.join(changeSet)
activeTx = Some(currentTx)
}
}
protected def tryToPrecommitTransaction = if (activeTx.isDefined) activeTx.get.precommit(changeSet)
protected def tryToCommitTransaction: Boolean = if (activeTx.isDefined) {
val tx = activeTx.get
tx.commit(changeSet)
removeTransactionIfTopLevel
true
} else false
protected def rollback(tx: Option[Transaction]) = tx match {
case None => {} // no tx; nothing to do
case Some(tx) =>
tx.rollback(changeSet)
}
protected def isInExistingTransaction = {
println(TransactionManagement)
println(TransactionManagement.threadBoundTx)
println(TransactionManagement.threadBoundTx.get)
println(TransactionManagement.threadBoundTx.get.isDefined)
TransactionManagement.threadBoundTx.get.isDefined
}
protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
protected def incrementTransaction = if (activeTx.isDefined) activeTx.get.increment
protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
protected def removeTransactionIfTopLevel =
if (activeTx.isDefined && activeTx.get.topLevel_?) {
activeTx = None
threadBoundTx.set(None)
}
protected def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get
if (cflowTx.isDefined && cflowTx.get.id == activeTx.get.id) false
else true
} else true
/**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/
protected def getTransactionalItemsFor(targetInstance: AnyRef):
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
require(targetInstance != null)
var maps: List[TransactionalMap[_, _]] = Nil
var refs: List[TransactionalRef[_]] = Nil
var vectors: List[TransactionalVector[_]] = Nil
def getTransactionalItemsFor(target: Class[_]):
Tuple3[List[TransactionalMap[_, _]], List[TransactionalVector[_]], List[TransactionalRef[_]]] = {
for {
field <- target.getDeclaredFields.toArray.toList.asInstanceOf[List[Field]]
fieldType = field.getType
if (fieldType == classOf[TransactionalMap[_, _]]) ||
(fieldType == classOf[TransactionalVector[_]]) ||
(fieldType == classOf[TransactionalRef[_]])
txItem = {
field.setAccessible(true)
field.get(targetInstance)
}
if txItem != null
} {
log.debug("Managing transactional state [%s]", field)
if (txItem.isInstanceOf[TransactionalMap[_, _]]) maps ::= txItem.asInstanceOf[TransactionalMap[_, _]]
else if (txItem.isInstanceOf[TransactionalRef[_]]) refs ::= txItem.asInstanceOf[TransactionalRef[_]]
else if (txItem.isInstanceOf[TransactionalVector[_]]) vectors ::= txItem.asInstanceOf[TransactionalVector[_]]
}
val parent = target.getSuperclass
if (parent == classOf[Object]) (maps, vectors, refs)
else getTransactionalItemsFor(parent)
}
// start the search for transactional items, crawl the class hierarchy up until we reach Object
getTransactionalItemsFor(targetInstance.getClass)
}
/*
protected def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw new TransactionAwareWrapperException(cause, activeTx)
} else {
if (future.result.isDefined) {
val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]]
Some(res).asInstanceOf[Option[T]]
} else None
}
*/
}

View file

@ -60,7 +60,7 @@ class EventBasedDispatcherTest {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
@ -74,8 +74,8 @@ class EventBasedDispatcherTest {
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
dispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key1, new Object, new NullFutureResult, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key2, new Object, new NullFutureResult, None))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
} }
@ -109,8 +109,8 @@ class EventBasedDispatcherTest {
}) })
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)

View file

@ -0,0 +1,136 @@
package se.scalablesolutions.akka.kernel.actor
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kernel.state.TransactionalState
import kernel.reactor._
import org.junit.{Test, Before}
import org.junit.Assert._
case class SetMapState(key: String, value: String)
case class SetVectorState(key: String)
case class SetRefState(key: String)
case class GetMapState(key: String)
case object GetVectorState
case object GetRefState
case class Success(key: String, value: String)
case class Failure(key: String, value: String, failer: Actor)
class InMemStatefulActor extends Actor {
makeTransactional
private val mapState = TransactionalState.newInMemoryMap[String, String]
private val vectorState = TransactionalState.newInMemoryVector[String]
private val refState = TransactionalState.newInMemoryRef[String]
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>
reply(mapState.get(key).get)
case GetVectorState =>
reply(vectorState.last)
case GetRefState =>
reply(refState.get.get)
case SetMapState(key, msg) =>
mapState.put(key, msg)
reply(msg)
case SetVectorState(msg) =>
vectorState.add(msg)
reply(msg)
case SetRefState(msg) =>
refState.swap(msg)
reply(msg)
case Success(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
reply(msg)
case Failure(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer !! "Failure"
reply(msg)
}
}
class InMemFailerActor extends Actor {
makeTransactional
def receive: PartialFunction[Any, Unit] = {
case "Failure" =>
throw new RuntimeException("expected")
}
}
class InMemoryActorSpec {
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@Test
def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@Test
def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit
assertEquals("new state", (stateful !! GetVectorState).get)
}
@Test
def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetVectorState("init") // set init state
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
}
@Test
def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // to trigger commit
assertEquals("new state", (stateful !! GetRefState).get)
}
@Test
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new InMemStatefulActor
stateful.start
stateful !! SetRefState("init") // set init state
val failer = new InMemFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
}

View file

@ -0,0 +1,135 @@
package se.scalablesolutions.akka.kernel.actor
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kernel.reactor._
import kernel.state.{CassandraStorageConfig, TransactionalState}
import org.junit.{Test, Before}
import org.junit.Assert._
class PersistentActor extends Actor {
makeTransactional
private val mapState = TransactionalState.newPersistentMap(CassandraStorageConfig())
private val vectorState = TransactionalState.newPersistentVector(CassandraStorageConfig())
private val refState = TransactionalState.newPersistentRef(CassandraStorageConfig())
def receive: PartialFunction[Any, Unit] = {
case GetMapState(key) =>
reply(mapState.get(key).get)
case GetVectorState =>
reply(vectorState.last)
case GetRefState =>
reply(refState.get.get)
case SetMapState(key, msg) =>
mapState.put(key, msg)
reply(msg)
case SetVectorState(msg) =>
vectorState.add(msg)
reply(msg)
case SetRefState(msg) =>
refState.swap(msg)
reply(msg)
case Success(key, msg) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
reply(msg)
case Failure(key, msg, failer) =>
mapState.put(key, msg)
vectorState.add(msg)
refState.swap(msg)
failer !! "Failure"
reply(msg)
}
}
class PersistentFailerActor extends Actor {
makeTransactional
def receive: PartialFunction[Any, Unit] = {
case "Failure" =>
throw new RuntimeException("expected")
}
}
object PersistenceManager {
@volatile var isRunning = false
def init = if (!isRunning) {
System.setProperty("storage-config", "config")
Kernel.startCassandra
isRunning = true
}
}
class PersistentActorSpec {
PersistenceManager.init
@Test
def testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
stateful.start
stateful !! SetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
assertEquals("new state", (stateful !! GetMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")).get)
}
@Test
def testMapShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
stateful.start
stateful !! SetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init") // set init state
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")).get) // check that state is == init state
}
@Test
def testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
assertEquals("new state", (stateful !! GetVectorState).get)
}
@Test
def testVectorShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
stateful.start
stateful !! SetVectorState("init") // set init state
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetVectorState).get) // check that state is == init state
}
@Test
def testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess = {
val stateful = new PersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
stateful !! Success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactional
assertEquals("new state", (stateful !! GetRefState).get)
}
@Test
def testRefShouldRollbackStateForStatefulServerInCaseOfFailure = {
val stateful = new PersistentActor
stateful.start
stateful !! SetRefState("init") // set init state
val failer = new PersistentFailerActor
failer.start
try {
stateful !! Failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer) // call failing transactional method
fail("should have thrown an exception")
} catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
}
}

View file

@ -36,6 +36,7 @@ class RemoteActorSpec {
server.connect server.connect
} }
}).start }).start
Thread.sleep(1000)
NettyClient.connect NettyClient.connect
private val unit = TimeUnit.MILLISECONDS private val unit = TimeUnit.MILLISECONDS
@ -74,11 +75,11 @@ class RemoteActorSpec {
actor.stop actor.stop
} }
@Test @Test
def sendReceiveException = { def sendReceiveException = {
implicit val timeout = 5000L implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional val actor = new RemoteActorSpecActorBidirectional
actor.isRemote = true actor.makeRemote
actor.start actor.start
try { try {
actor !! "Failure" actor !! "Failure"

View file

@ -62,7 +62,7 @@ class ThreadBasedDispatcherTest {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
@ -86,8 +86,8 @@ class ThreadBasedDispatcherTest {
} }
}) })
dispatcher.start dispatcher.start
dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", new NullFutureResult, None))
dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", new NullFutureResult, None))
handlersBarrier.await(5, TimeUnit.SECONDS) handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
//dispatcher.shutdown //dispatcher.shutdown
@ -122,8 +122,8 @@ class ThreadBasedDispatcherTest {
}) })
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)