added remote active objects configuration + remote tx semantics

This commit is contained in:
Jonas Boner 2009-06-25 13:07:58 +02:00
parent 47abc143a4
commit 10a0c16cb2
22 changed files with 1080 additions and 354 deletions

View file

@ -171,7 +171,8 @@
<component name="IdProvider" IDEtalkID="0E3A0445954D4D390C337AFB20B2746C" /> <component name="IdProvider" IDEtalkID="0E3A0445954D4D390C337AFB20B2746C" />
<component name="InspectionProjectProfileManager"> <component name="InspectionProjectProfileManager">
<option name="PROJECT_PROFILE" value="Project Default" /> <option name="PROJECT_PROFILE" value="Project Default" />
<version value="1.0" /> <option name="USE_PROJECT_LEVEL_SETTINGS" value="false" />
<scopes />
<profiles> <profiles>
<profile version="1.0" is_locked="false"> <profile version="1.0" is_locked="false">
<option name="myName" value="Project Default" /> <option name="myName" value="Project Default" />

1029
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -1,9 +1,7 @@
package se.scalablesolutions.akka.api; package se.scalablesolutions.akka.api;
import java.io.Serializable;
public class InMemFailer { public class InMemFailer {
public void fail() { public int fail() {
throw new RuntimeException("expected"); throw new RuntimeException("expected");
} }
} }

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.config.*; import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.actor.*;
import junit.framework.TestCase; import junit.framework.TestCase;
@ -13,6 +14,7 @@ public class InMemoryStateTest extends TestCase {
static String messageLog = ""; static String messageLog = "";
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() { protected void setUp() {
conf.configureActiveObjects( conf.configureActiveObjects(
@ -30,16 +32,16 @@ public class InMemoryStateTest extends TestCase {
} }
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() { public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class); InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess")); assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
} }
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() { public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class); InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class); InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class);
try { try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception"); fail("should have thrown an exception");

View file

@ -2,6 +2,5 @@ package se.scalablesolutions.akka.api;
public class NettyClient { public class NettyClient {
public static void main(String[] args) { public static void main(String[] args) {
new se.scalablesolutions.akka.kernel.nio.NettyClient();
} }
} }

View file

@ -1,7 +1,7 @@
package se.scalablesolutions.akka.api; package se.scalablesolutions.akka.api;
public class PersistentFailer { public class PersistentFailer {
public void fail() { public int fail() {
throw new RuntimeException("expected"); throw new RuntimeException("expected");
} }
} }

View file

@ -50,11 +50,12 @@ public class PersistentStateful {
} }
@transactional @transactional
public void failure(String key, String msg, PersistentFailer failer) { public String failure(String key, String msg, PersistentFailer failer) {
mapState.put(key, msg); mapState.put(key, msg);
vectorState.add(msg); vectorState.add(msg);
refState.swap(msg); refState.swap(msg);
failer.fail(); failer.fail();
return msg;
} }
@transactional @transactional

View file

@ -4,7 +4,7 @@
<facet type="web" name="Web"> <facet type="web" name="Web">
<configuration> <configuration>
<descriptors> <descriptors>
<deploymentDescriptor name="web.xml" url="file://$MODULE_DIR$/web.xml" /> <deploymentDescriptor name="web.xml" url="file://$MODULE_DIR$/web.xml" optional="false" version="2.5" />
</descriptors> </descriptors>
<webroots> <webroots>
<root url="file://$MODULE_DIR$/src/main/resources" relative="/" /> <root url="file://$MODULE_DIR$/src/main/resources" relative="/" />

View file

@ -27,6 +27,7 @@ import scala.collection.mutable.HashMap
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg) sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg) class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
class TransactionAwareException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause)
object Annotations { object Annotations {
import se.scalablesolutions.akka.annotation._ import se.scalablesolutions.akka.annotation._
@ -42,15 +43,36 @@ object Annotations {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a> * @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/ */
class ActiveObjectFactory { class ActiveObjectFactory {
def newInstance[T](target: Class[T], actor: Actor): T = { def newInstance[T](target: Class[T]): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), false)
def newInstance[T](intf: Class[T], target: AnyRef): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), false)
def newRemoteInstance[T](target: Class[T]): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), true)
def newRemoteInstance[T](intf: Class[T], target: AnyRef): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), true)
/*
def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = {
val actor = new Dispatcher(target.getName)
ActiveObject.newInstance(target, actor) ActiveObject.newInstance(target, actor)
} }
def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = { def newInstanceAndLink[T](intf: Class[T], target: AnyRef, supervisor: AnyRef): T = {
val actor = new Dispatcher(target.getName)
ActiveObject.newInstance(intf, target, actor) ActiveObject.newInstance(intf, target, actor)
} }
*/
// ================================================
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean): T = {
ActiveObject.newInstance(target, actor, remote)
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean): T = {
ActiveObject.newInstance(intf, target, actor, remote)
}
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components) ActiveObject.supervise(restartStrategy, components)
} }
@ -62,28 +84,40 @@ class ActiveObjectFactory {
object ActiveObject { object ActiveObject {
val AKKA_CAMEL_ROUTING_SCHEME = "akka" val AKKA_CAMEL_ROUTING_SCHEME = "akka"
val RemoteClient = new NettyClient
private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = { private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = {
val tl = new ThreadLocal[Option[Transaction]] val tl = new ThreadLocal[Option[Transaction]]
tl.set(None) tl.set(None)
tl tl
} }
def newInstance[T](target: Class[T], actor: Actor): T = { def newInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), false)
def newInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), false)
def newRemoteInstance[T](target: Class[T]): T = newInstance(target, new Dispatcher(target.getName), true)
def newRemoteInstance[T](intf: Class[T], target: AnyRef): T = newInstance(intf, target, new Dispatcher(intf.getName), true)
// ================================================
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean): T = {
if (remote) NettyClient.connect
val proxy = Proxy.newInstance(target, false, true) val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time // FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor)) proxy.asInstanceOf[Advisable].aw_addAdvice(
"execution(* *.*(..))", new SequentialTransactionalAroundAdvice(target, proxy, actor, remote))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor): T = { private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean): T = {
if (remote) NettyClient.connect
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true) val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice("execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor)) proxy.asInstanceOf[Advisable].aw_addAdvice(
"execution(* *.*(..))", new SequentialTransactionalAroundAdvice(intf, target, actor, remote))
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = { private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = {
object factory extends SupervisorFactory { object factory extends SupervisorFactory {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
} }
@ -98,7 +132,8 @@ object ActiveObject {
*/ */
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
@serializable sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, actor: Actor) extends AroundAdvice { @serializable sealed class SequentialTransactionalAroundAdvice(
target: Class[_], targetInstance: AnyRef, actor: Actor, val remote: Boolean) extends AroundAdvice {
private val changeSet = new ChangeSet(target.getName) private val changeSet = new ChangeSet(target.getName)
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
@ -119,44 +154,43 @@ object ActiveObject {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val method = rtti.getMethod val method = rtti.getMethod
val remoteCall = true
val isOneWay = rtti.getMethod.getReturnType == java.lang.Void.TYPE val isOneWay = rtti.getMethod.getReturnType == java.lang.Void.TYPE
val result: AnyRef = // FIXME join TX with same id, do not COMMIT
if (remoteCall) { tryToCommitTransaction
// FIXME: Make part of TX semantics?? if (isInExistingTransaction) {
val future = ActiveObject.RemoteClient.send(new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, isOneWay)) joinExistingTransaction
} else {
if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
}
try {
incrementTransaction
if (remote) {
val future = NettyClient.send(
new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, isOneWay, false))
if (isOneWay) null // for void methods if (isOneWay) null // for void methods
else { else {
future.await_? future.await_?
val resultOrFailure = getResultOrThrowException(future) val result = getResultOrThrowException(future)
handleResult(resultOrFailure) if (result.isDefined) result.get
else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
} }
} else { } else {
// FIXME join TX with same id, do not COMMIT if (isOneWay) actor !! Invocation(joinpoint, activeTx) // FIXME investigate why ! causes TX to race
tryToCommitTransaction else {
if (isInExistingTransaction) { val result = actor !! Invocation(joinpoint, activeTx)
joinExistingTransaction if (result.isDefined) result.get
} else { else throw new IllegalStateException("No result defined for invocation [" + joinpoint + "]")
if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction
}
try {
incrementTransaction
if (isOneWay) actor ! Invocation(joinpoint, activeTx)
else {
val result = actor !! Invocation(joinpoint, activeTx)
val resultOrFailure =
if (result.isDefined) result.get.asInstanceOf[ResultOrFailure[AnyRef]]
else throw new ActiveObjectInvocationTimeoutException("TIMED OUT")
handleResult(resultOrFailure)
}
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
} }
} }
result } catch {
case e: TransactionAwareException =>
rollback(e.tx)
throw e.cause
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
}
} }
// TODO: create a method setCallee/setCaller to the joinpoint interface and compiler // TODO: create a method setCallee/setCaller to the joinpoint interface and compiler
@ -201,7 +235,7 @@ object ActiveObject {
private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
try { try {
result() result()
} catch { } catch {
@ -237,6 +271,7 @@ object ActiveObject {
else true else true
} else true } else true
/*
private def sendOneWay(joinpoint: JoinPoint) = private def sendOneWay(joinpoint: JoinPoint) =
mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult)) mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult))
@ -245,21 +280,27 @@ object ActiveObject {
future.await_? future.await_?
getResultOrThrowException(future) getResultOrThrowException(future)
} }
*/
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = { private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout) val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future)) mailbox.append(new MessageHandle(this, message, future))
future future
} }
private def getResultOrThrowException[T](future: FutureResult): ResultOrFailure[AnyRef] = private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw new TransactionAwareException(cause, activeTx)
} else future.result.asInstanceOf[Option[T]]
/*
if (future.exception.isDefined) { if (future.exception.isDefined) {
var resultOrFailure = ResultOrFailure(activeTx) var resultOrFailure = ResultOrFailure(activeTx)
val (toBlame, cause) = future.exception.get val (toBlame, cause) = future.exception.get
resultOrFailure() = throw cause resultOrFailure() = throw cause
resultOrFailure resultOrFailure
} else ResultOrFailure(future.result.get, activeTx) } else ResultOrFailure(future.result.get, activeTx)
*/
/** /**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top. * Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/ */
@ -342,13 +383,15 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) => case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) =>
ActiveObject.threadBoundTx.set(tx) ActiveObject.threadBoundTx.set(tx)
try { try {
reply(ResultOrFailure(joinpoint.proceed, tx)) reply(joinpoint.proceed)
} catch { } catch {
case e => case e =>
throw new TransactionAwareException(e, tx)
/*
val resultOrFailure = ResultOrFailure(tx) val resultOrFailure = ResultOrFailure(tx)
resultOrFailure() = throw e resultOrFailure() = throw e
reply(resultOrFailure) reply(resultOrFailure)
} */ }
case unexpected => case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]") throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")

View file

@ -31,7 +31,7 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
} }
trait Actor extends Logging { trait Actor extends Logging {
var timeout: Long = 1000L var timeout: Long = 5000L
@volatile private[this] var isRunning: Boolean = false @volatile private[this] var isRunning: Boolean = false
protected[this] var id: String = super.toString protected[this] var id: String = super.toString
@ -115,7 +115,7 @@ trait Actor extends Logging {
def !(message: AnyRef) = def !(message: AnyRef) =
if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult)) if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult))
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout) val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
future.await_? future.await_?
@ -200,6 +200,7 @@ trait Actor extends Logging {
else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]") else throw new IllegalArgumentException("No handler matching message [" + message + "] in actor [" + this.getClass.getName + "]")
} catch { } catch {
case e => case e =>
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
future.completeWithException(this, e) future.completeWithException(this, e)
} }
} }
@ -213,8 +214,7 @@ trait Actor extends Logging {
private def getResultOrThrowException[T](future: FutureResult): Option[T] = private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) { if (future.exception.isDefined) {
val (toBlame, cause) = future.exception.get val (_, cause) = future.exception.get
if (supervisor.isDefined) supervisor.get ! Exit(toBlame.asInstanceOf[Actor], cause)
throw cause throw cause
} else future.result.asInstanceOf[Option[T]] } else future.result.asInstanceOf[Option[T]]

View file

@ -109,7 +109,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = { private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target val targetClass = component.target
val actor = new Dispatcher(targetClass.getName) val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, actor).asInstanceOf[AnyRef] val proxy = activeObjectFactory.newInstance(targetClass, actor, false).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle) workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component)) activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
@ -120,7 +120,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true) component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val actor = new Dispatcher(targetClass.getName) val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor).asInstanceOf[AnyRef] val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, false).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle) workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
@ -137,6 +137,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
supervisor = activeObjectFactory.supervise(restartStrategy, workers) supervisor = activeObjectFactory.supervise(restartStrategy, workers)
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start //camelContext.start
supervisor.startSupervisor
ActiveObjectConfigurator.registerConfigurator(this) ActiveObjectConfigurator.registerConfigurator(this)
this this
} }

View file

@ -16,10 +16,11 @@ import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.channel._ import org.jboss.netty.channel._
class NettyClient extends Logging { object NettyClient extends Logging {
private val HOSTNAME = "localhost" private val HOSTNAME = "localhost"
private val PORT = 9999 private val PORT = 9999
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val channelFactory = new NioClientSocketChannelFactory( private val channelFactory = new NioClientSocketChannelFactory(
@ -33,22 +34,31 @@ class NettyClient extends Logging {
bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true) bootstrap.setOption("keepAlive", true)
private val connection = bootstrap.connect(new InetSocketAddress(HOSTNAME, PORT)) private var connection: ChannelFuture = _
log.info("Starting NIO client at [%s:%s]", HOSTNAME, PORT)
// Wait until the connection attempt succeeds or fails. def connect = synchronized {
connection.awaitUninterruptibly if (!isRunning) {
if (!connection.isSuccess) { connection = bootstrap.connect(new InetSocketAddress(HOSTNAME, PORT))
log.error("Connection has failed due to [%s]", connection.getCause) log.info("Starting NIO client at [%s:%s]", HOSTNAME, PORT)
connection.getCause.printStackTrace
// Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly
if (!connection.isSuccess) {
log.error("Connection has failed due to [%s]", connection.getCause)
connection.getCause.printStackTrace
}
isRunning = true
}
} }
def shutdown = { def shutdown = synchronized {
connection.getChannel.getCloseFuture.awaitUninterruptibly if (!isRunning) {
channelFactory.releaseExternalResources connection.getChannel.getCloseFuture.awaitUninterruptibly
channelFactory.releaseExternalResources
}
} }
def send(request: RemoteRequest): CompletableFutureResult = { def send(request: RemoteRequest): CompletableFutureResult = if (isRunning) {
val escapedRequest = escapeRequest(request) val escapedRequest = escapeRequest(request)
if (escapedRequest.isOneWay) { if (escapedRequest.isOneWay) {
connection.getChannel.write(escapedRequest) connection.getChannel.write(escapedRequest)
@ -61,18 +71,20 @@ class NettyClient extends Logging {
futureResult futureResult
} }
} }
} } else throw new IllegalStateException("Netty client is not running, make sure you have invoked 'connect' before using the client")
private def escapeRequest(request: RemoteRequest) = { private def escapeRequest(request: RemoteRequest) = {
if (request.message.isInstanceOf[Array[Object]]) { if (request.message.isInstanceOf[Array[Object]]) {
val args = request.message.asInstanceOf[Array[Object]].toList.asInstanceOf[scala.List[Object]] val args = request.message.asInstanceOf[Array[Object]].toList.asInstanceOf[scala.List[Object]]
var isEscaped = false
val escapedArgs = for (arg <- args) yield { val escapedArgs = for (arg <- args) yield {
val clazz = arg.getClass val clazz = arg.getClass
if (clazz.getName.contains("$$ProxiedByAW")) { if (clazz.getName.contains("$$ProxiedByAW")) {
isEscaped = true
new ProxyWrapper(clazz.getSuperclass.getName) new ProxyWrapper(clazz.getSuperclass.getName)
} else arg } else arg
} }
request.cloneWithNewMessage(escapedArgs) request.cloneWithNewMessage(escapedArgs, isEscaped)
} else request } else request
} }
} }
@ -105,11 +117,8 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
if (result.isInstanceOf[RemoteReply]) { if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply] val reply = result.asInstanceOf[RemoteReply]
val future = futures.get(reply.id) val future = futures.get(reply.id)
if (reply.successful) { if (reply.successful) future.completeWithResult(reply.message)
future.completeWithResult(reply.message) else future.completeWithException(null, reply.exception)
} else {
future.completeWithException(null, reply.exception)
}
} else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result)
} catch { } catch {
case e: Exception => log.error("Unexpected exception in NIO client handler: %s", e); throw e case e: Exception => log.error("Unexpected exception in NIO client handler: %s", e); throw e
@ -121,9 +130,3 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
event.getChannel.close event.getChannel.close
} }
} }
object NettyClientRunner {
def main(args: Array[String]) = {
new NettyClient
}
}

View file

@ -22,28 +22,27 @@ import org.jboss.netty.handler.codec.serialization.ObjectDecoder
import org.jboss.netty.handler.codec.serialization.ObjectEncoder import org.jboss.netty.handler.codec.serialization.ObjectEncoder
class NettyServer extends Logging { class NettyServer extends Logging {
val HOSTNAME = "localhost" private val HOSTNAME = "localhost"
val PORT = 9999 private val PORT = 9999
val CONNECTION_TIMEOUT_MILLIS = 100 private val CONNECTION_TIMEOUT_MILLIS = 100
log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT) private val factory = new NioServerSocketChannelFactory(
val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool, Executors.newCachedThreadPool,
Executors.newCachedThreadPool) Executors.newCachedThreadPool)
val bootstrap = new ServerBootstrap(factory) private val activeObjectFactory = new ActiveObjectFactory
private val bootstrap = new ServerBootstrap(factory)
// FIXME provide different codecs (Thrift, Avro, Protobuf, JSON) // FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)
val handler = new ObjectServerHandler
private val handler = new ObjectServerHandler
bootstrap.getPipeline.addLast("handler", handler) bootstrap.getPipeline.addLast("handler", handler)
bootstrap.setOption("child.tcpNoDelay", true) bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true) bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true) bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS) bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
} }
@ -65,8 +64,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
} }
override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = {
// Send the first message if this handler is a client-side handler. //e.getChannel.write(firstMessage)
// if (!firstMessage.isEmpty) e.getChannel.write(firstMessage)
} }
override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={ override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={
@ -90,13 +88,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
} else { } else {
log.debug("Dispatching to [%s :: %s]", request.method, request.target) log.debug("Dispatching to [%s :: %s]", request.method, request.target)
val activeObject = createActiveObject(request.target) val activeObject = createActiveObject(request.target)
// val args = request.message.asInstanceOf[scala.List[Object]]
// val argClassesList = args.map(_.getClass)
// val argClasses = argClassesList.map(_.getClass).toArray
// val method = activeObject.getClass.getDeclaredMethod(request.method, argClasses)
val args = request.message.asInstanceOf[scala.List[AnyRef]] val args = request.message.asInstanceOf[scala.List[AnyRef]]
val argClazzes = args.map(_.getClass)//.toArray.asInstanceOf[Array[Class[_]]] val argClazzes = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes) val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes)
val method = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses) val method = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses)
try { try {
@ -142,15 +135,18 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val activeObjectOrNull = activeObjects.get(name) val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull == null) { if (activeObjectOrNull == null) {
val clazz = Class.forName(name) val clazz = Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[AnyRef] // activeObjectFactory.newInstance(clazz, new Dispatcher(invocation.target)).asInstanceOf[AnyRef] try {
activeObjects.put(name, newInstance) val actor = new Dispatcher(clazz.getName)
newInstance actor.start
val newInstance = activeObjectFactory.newInstance(clazz, actor, false).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
} catch {
case e =>
log.debug("Could not create remote active object instance due to: %s", e)
e.printStackTrace
throw e
}
} else activeObjectOrNull } else activeObjectOrNull
} }
} }
object NettyServerRunner {
def main(args: Array[String]) = {
new NettyServer
}
}

View file

@ -14,7 +14,12 @@ object IdFactory {
@serializable class ProxyWrapper(val proxyName: String) @serializable class ProxyWrapper(val proxyName: String)
@serializable class RemoteRequest(val isActor: Boolean, val message: AnyRef, val method: String, val target: String, val isOneWay: Boolean) { @serializable class RemoteRequest(val isActor: Boolean,
val message: AnyRef,
val method: String,
val target: String,
val isOneWay: Boolean,
val isEscaped: Boolean) {
private[RemoteRequest] var _id = IdFactory.nextId private[RemoteRequest] var _id = IdFactory.nextId
def id = _id def id = _id
@ -44,8 +49,8 @@ object IdFactory {
def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error) } def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error) }
def cloneWithNewMessage(message: AnyRef) = synchronized { def cloneWithNewMessage(message: AnyRef, isEscaped: Boolean) = synchronized {
val request = new RemoteRequest(isActor, message, method, target, isOneWay) val request = new RemoteRequest(isActor, message, method, target, isOneWay, isEscaped)
request._id = id request._id = id
request request
} }

View file

@ -17,7 +17,6 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
selectorThread = new Thread { selectorThread = new Thread {
override def run = { override def run = {
while (active) { while (active) {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
try { try {
messageDemultiplexer.select messageDemultiplexer.select
} catch { case e: InterruptedException => active = false } } catch { case e: InterruptedException => active = false }

View file

@ -32,23 +32,18 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private val queue = new LinkedBlockingQueue[Runnable] private val queue = new LinkedBlockingQueue[Runnable]
private val handlerExecutor = new ThreadPoolExecutor(minNrThreads, maxNrThreads, timeOut, timeUnit, queue, threadFactory, rejectedExecutionHandler) private val handlerExecutor = new ThreadPoolExecutor(minNrThreads, maxNrThreads, timeOut, timeUnit, queue, threadFactory, rejectedExecutionHandler)
//private val handlerExecutor = Executors.newCachedThreadPool()
def start = if (!active) { def start = if (!active) {
active = true active = true
val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue) val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue)
selectorThread = new Thread { selectorThread = new Thread {
//val enqued = new LinkedList[MessageHandle]
override def run = { override def run = {
while (active) { while (active) {
try { try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
try { try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select messageDemultiplexer.select
} catch {case e: InterruptedException => active = false} } catch {case e: InterruptedException => active = false}
val queue = messageDemultiplexer.acquireSelectedQueue val queue = messageDemultiplexer.acquireSelectedQueue
// while (!queue.isEmpty) {
for (index <- 0 until queue.size) { for (index <- 0 until queue.size) {
val message = queue.peek val message = queue.peek
val messageHandler = getIfNotBusy(message.sender) val messageHandler = getIfNotBusy(message.sender)
@ -63,10 +58,6 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
queue.remove queue.remove
} }
} }
// }
if (!queue.isEmpty) {
for (index <- 0 until queue.size) messageQueue.append(queue.remove)
}
} finally { } finally {
messageDemultiplexer.releaseSelectedQueue messageDemultiplexer.releaseSelectedQueue
} }

View file

@ -39,7 +39,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
private var _result: Option[AnyRef] = None private var _result: Option[AnyRef] = None
private var _exception: Option[Tuple2[AnyRef, Throwable]] = None private var _exception: Option[Tuple2[AnyRef, Throwable]] = None
override def await_? = try { def await_? = try {
_lock.lock _lock.lock
var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) var wait = timeoutInNanos - (currentTimeInNanos - _startTimeInNanos)
while (!_completed && wait > 0) { while (!_completed && wait > 0) {
@ -56,7 +56,7 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock _lock.unlock
} }
override def await_! = try { def await_! = try {
_lock.lock _lock.lock
while (!_completed) { while (!_completed) {
_signal.await _signal.await
@ -65,35 +65,35 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock _lock.unlock
} }
override def isCompleted: Boolean = try { def isCompleted: Boolean = try {
_lock.lock _lock.lock
_completed _completed
} finally { } finally {
_lock.unlock _lock.unlock
} }
override def isExpired: Boolean = try { def isExpired: Boolean = try {
_lock.lock _lock.lock
timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0 timeoutInNanos - (currentTimeInNanos - _startTimeInNanos) <= 0
} finally { } finally {
_lock.unlock _lock.unlock
} }
override def result: Option[AnyRef] = try { def result: Option[AnyRef] = try {
_lock.lock _lock.lock
_result _result
} finally { } finally {
_lock.unlock _lock.unlock
} }
override def exception: Option[Tuple2[AnyRef, Throwable]] = try { def exception: Option[Tuple2[AnyRef, Throwable]] = try {
_lock.lock _lock.lock
_exception _exception
} finally { } finally {
_lock.unlock _lock.unlock
} }
override def completeWithResult(result: AnyRef) = try { def completeWithResult(result: AnyRef) = try {
_lock.lock _lock.lock
if (!_completed) { if (!_completed) {
_completed = true _completed = true
@ -104,13 +104,12 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
_lock.unlock _lock.unlock
} }
override def completeWithException(toBlame: AnyRef, exception: Throwable) = try { def completeWithException(toBlame: AnyRef, exception: Throwable) = try {
_lock.lock _lock.lock
if (!_completed) { if (!_completed) {
_completed = true _completed = true
_exception = Some((toBlame, exception)) _exception = Some((toBlame, exception))
} }
} finally { } finally {
_signal.signalAll _signal.signalAll
_lock.unlock _lock.unlock
@ -120,13 +119,13 @@ class DefaultCompletableFutureResult(timeout: Long) extends CompletableFutureRes
} }
class NullFutureResult extends CompletableFutureResult { class NullFutureResult extends CompletableFutureResult {
override def completeWithResult(result: AnyRef) = {} def completeWithResult(result: AnyRef) = {}
override def completeWithException(toBlame: AnyRef, exception: Throwable) = {} def completeWithException(toBlame: AnyRef, exception: Throwable) = {}
override def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def await_? = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def await_! = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def isCompleted: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def isExpired: Boolean = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult") def timeoutInNanos: Long = throw new UnsupportedOperationException("Not implemented for NullFutureResult")
override def result: Option[AnyRef] = None def result: Option[AnyRef] = None
override def exception: Option[Tuple2[AnyRef, Throwable]] = None def exception: Option[Tuple2[AnyRef, Throwable]] = None
} }

View file

@ -9,9 +9,9 @@ import java.util.concurrent.{ConcurrentMap, ConcurrentHashMap}
trait MessageDispatcherBase extends MessageDispatcher { trait MessageDispatcherBase extends MessageDispatcher {
val messageQueue = new MessageQueue val messageQueue = new MessageQueue
@volatile protected var active: Boolean = false
protected val messageHandlers = new ConcurrentHashMap[AnyRef, MessageHandler] protected val messageHandlers = new ConcurrentHashMap[AnyRef, MessageHandler]
protected var selectorThread: Thread = _ protected var selectorThread: Thread = _
@volatile protected var active: Boolean = false
protected val guard = new Object protected val guard = new Object
def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized { def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized {

View file

@ -88,7 +88,9 @@ final object CassandraNode extends Logging {
val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index) val column = server.get_column(TABLE_NAME, name, VECTOR_COLUMN_FAMILY + ":" + index)
serializer.in(column.value) serializer.in(column.value)
} catch { } catch {
case e => throw new Predef.NoSuchElementException(e.getMessage) case e =>
e.printStackTrace
throw new Predef.NoSuchElementException(e.getMessage)
} }
} }
@ -132,7 +134,11 @@ final object CassandraNode extends Logging {
try { try {
val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key) val column = server.get_column(TABLE_NAME, name, MAP_COLUMN_FAMILY + ":" + key)
Some(serializer.in(column.value)) Some(serializer.in(column.value))
} catch { case e => None } } catch {
case e =>
e.printStackTrace
None
}
} }
def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = { def getMapStorageFor(name: String): List[Tuple2[String, AnyRef]] = {

View file

@ -50,7 +50,7 @@ object TransactionIdFactory {
def begin(changeSet: ChangeSet) = synchronized { def begin(changeSet: ChangeSet) = synchronized {
ensureIsActiveOrNew ensureIsActiveOrNew
if (status == TransactionStatus.New) log.debug("Server [%s] is starting NEW transaction [%s]", changeSet.id, this) if (status == TransactionStatus.New) log.debug("TX BEGIN - Server [%s] is starting NEW transaction [%s]", changeSet.id, this)
else log.debug("Server [%s] is participating in transaction", changeSet.id) else log.debug("Server [%s] is participating in transaction", changeSet.id)
changeSet.full.foreach(_.begin) changeSet.full.foreach(_.begin)
participants + changeSet participants + changeSet
@ -59,14 +59,14 @@ object TransactionIdFactory {
def precommit(changeSet: ChangeSet) = synchronized { def precommit(changeSet: ChangeSet) = synchronized {
if (status == TransactionStatus.Active) { if (status == TransactionStatus.Active) {
log.debug("Pre-committing transaction [%s] for server [%s]", this, changeSet.id) log.debug("TX PRECOMMIT - Pre-committing transaction [%s] for server [%s]", this, changeSet.id)
precommitted + changeSet precommitted + changeSet
} }
} }
def commit(changeSet: ChangeSet) = synchronized { def commit(changeSet: ChangeSet) = synchronized {
if (status == TransactionStatus.Active) { if (status == TransactionStatus.Active) {
log.debug("Committing transaction [%s] for server [%s]", this, changeSet.id) log.debug("TX COMMIT - Committing transaction [%s] for server [%s]", this, changeSet.id)
val haveAllPreCommitted = val haveAllPreCommitted =
if (participants.size == precommitted.size) {{ if (participants.size == precommitted.size) {{
for (server <- participants) yield { for (server <- participants) yield {
@ -85,7 +85,7 @@ object TransactionIdFactory {
def rollback(changeSet: ChangeSet) = synchronized { def rollback(changeSet: ChangeSet) = synchronized {
ensureIsActiveOrAborted ensureIsActiveOrAborted
log.debug("Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", changeSet.id, this, participants) log.debug("TX ROLLBACK - Server [%s] has initiated transaction rollback for [%s], rolling back [%s]", changeSet.id, this, participants)
participants.foreach(_.full.foreach(_.rollback)) participants.foreach(_.full.foreach(_.rollback))
status = TransactionStatus.Aborted status = TransactionStatus.Aborted
participants.clear participants.clear
@ -94,7 +94,7 @@ object TransactionIdFactory {
def join(changeSet: ChangeSet) = synchronized { def join(changeSet: ChangeSet) = synchronized {
ensureIsActive ensureIsActive
log.debug("Server [%s] is joining transaction [%s]" , changeSet.id, this) log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , changeSet.id, this)
changeSet.full.foreach(_.begin) changeSet.full.foreach(_.begin)
participants + changeSet participants + changeSet
} }

View file

@ -20,7 +20,6 @@ class EventBasedDispatcherTest {
try { try {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
if (guardLock.tryLock) { if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown handleLatch.countDown
} else { } else {
threadingIssueDetected.set(true) threadingIssueDetected.set(true)
@ -55,12 +54,12 @@ class EventBasedDispatcherTest {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10) val handleLatch = new CountDownLatch(100)
val key = "key" val key = "key"
val dispatcher = new EventBasedSingleThreadDispatcher val dispatcher = new EventBasedSingleThreadDispatcher
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
for (i <- 0 until 10) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))

View file

@ -30,38 +30,42 @@ class ThreadBasedDispatcherTest {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
} }
//@Test @Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = { def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
} }
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler {
val guardLock: Lock = new ReentrantLock
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100) val handleLatch = new CountDownLatch(100)
val key = "key" val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher val dispatcher = new EventBasedSingleThreadDispatcher
dispatcher.registerHandler(key, new MessageHandler { dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else threadingIssueDetected.set(true)
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
})
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key, new Object, new NullFutureResult))
} }
assertTrue(handleLatch.await(5000, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
//dispatcher.shutdown
} }
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = { private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
@ -93,7 +97,7 @@ class ThreadBasedDispatcherTest {
val handleLatch = new CountDownLatch(200) val handleLatch = new CountDownLatch(200)
val key1 = "key1" val key1 = "key1"
val key2 = "key2" val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher val dispatcher = new EventBasedSingleThreadDispatcher
dispatcher.registerHandler(key1, new MessageHandler { dispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1; var currentValue = -1;
def handle(message: MessageHandle) { def handle(message: MessageHandle) {
@ -111,7 +115,7 @@ class ThreadBasedDispatcherTest {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int] val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) { if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue .intValue currentValue = messageValue.intValue
handleLatch.countDown handleLatch.countDown
} else threadingIssueDetected.set(true) } else threadingIssueDetected.set(true)
} }
@ -121,7 +125,7 @@ class ThreadBasedDispatcherTest {
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), new NullFutureResult))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult)) dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), new NullFutureResult))
} }
assertTrue(handleLatch.await(10, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
dispatcher.shutdown dispatcher.shutdown
} }