fixed last stm issues and failing tests + added new thread-based dispatcher (plus test)

This commit is contained in:
Jonas Boner 2009-07-06 23:45:15 +02:00
parent 3830aed805
commit ff969047cc
40 changed files with 1063 additions and 1837 deletions

View file

@ -9,19 +9,19 @@
<component name="CodeStyleSettingsManager"> <component name="CodeStyleSettingsManager">
<option name="PER_PROJECT_SETTINGS"> <option name="PER_PROJECT_SETTINGS">
<value> <value>
<ADDITIONAL_INDENT_OPTIONS fileType="java"> <ADDITIONAL_INDENT_OPTIONS fileType="">
<option name="INDENT_SIZE" value="4" /> <option name="INDENT_SIZE" value="2" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="4" /> <option name="TAB_SIZE" value="2" />
<option name="USE_TAB_CHARACTER" value="false" /> <option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" /> <option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" /> <option name="LABEL_INDENT_SIZE" value="0" />
<option name="LABEL_INDENT_ABSOLUTE" value="false" /> <option name="LABEL_INDENT_ABSOLUTE" value="false" />
</ADDITIONAL_INDENT_OPTIONS> </ADDITIONAL_INDENT_OPTIONS>
<ADDITIONAL_INDENT_OPTIONS fileType="scala"> <ADDITIONAL_INDENT_OPTIONS fileType="java">
<option name="INDENT_SIZE" value="2" /> <option name="INDENT_SIZE" value="4" />
<option name="CONTINUATION_INDENT_SIZE" value="8" /> <option name="CONTINUATION_INDENT_SIZE" value="8" />
<option name="TAB_SIZE" value="2" /> <option name="TAB_SIZE" value="4" />
<option name="USE_TAB_CHARACTER" value="false" /> <option name="USE_TAB_CHARACTER" value="false" />
<option name="SMART_TABS" value="false" /> <option name="SMART_TABS" value="false" />
<option name="LABEL_INDENT_SIZE" value="0" /> <option name="LABEL_INDENT_SIZE" value="0" />
@ -735,17 +735,6 @@
<root url="jar://$MAVEN_REPOSITORY$/org/guiceyfruit/guice-jsr250/2.0-beta-4/guice-jsr250-2.0-beta-4-sources.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/org/guiceyfruit/guice-jsr250/2.0-beta-4/guice-jsr250-2.0-beta-4-sources.jar!/" />
</SOURCES> </SOURCES>
</library> </library>
<library name="Maven: org.codehaus.aspectwerkz:aspectwerkz-nodeps-jdk5:2.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1-sources.jar!/" />
</SOURCES>
</library>
<library name="Maven: net.lag:configgy:1.3"> <library name="Maven: net.lag:configgy:1.3">
<CLASSES> <CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/net/lag/configgy/1.3/configgy-1.3.jar!/" /> <root url="jar://$MAVEN_REPOSITORY$/net/lag/configgy/1.3/configgy-1.3.jar!/" />
@ -1431,6 +1420,17 @@
<root url="jar://$PROJECT_DIR$/util-java/pom-sources.jar!/" /> <root url="jar://$PROJECT_DIR$/util-java/pom-sources.jar!/" />
</SOURCES> </SOURCES>
</library> </library>
<library name="Maven: org.codehaus.aspectwerkz:aspectwerkz-nodeps-jdk5:2.1">
<CLASSES>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1.jar!/" />
</CLASSES>
<JAVADOC>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1-javadoc.jar!/" />
</JAVADOC>
<SOURCES>
<root url="jar://$MAVEN_REPOSITORY$/org/codehaus/aspectwerkz/aspectwerkz-nodeps-jdk5/2.1/aspectwerkz-nodeps-jdk5-2.1-sources.jar!/" />
</SOURCES>
</library>
</component> </component>
<UsedPathMacros> <UsedPathMacros>
<macro name="MAVEN_REPOSITORY" description="Maven Local Repostiry" /> <macro name="MAVEN_REPOSITORY" description="Maven Local Repostiry" />

1744
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,24 @@
package se.scalablesolutions.akka.api;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
public class AllTest extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite("All Java tests");
suite.addTestSuite(InMemoryStateTest.class);
suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(PersistentStateTest.class);
suite.addTestSuite(PersistentNestedStateTest.class);
suite.addTestSuite(RemoteInMemoryStateTest.class);
suite.addTestSuite(RemotePersistentStateTest.class);
suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
//suite.addTestSuite(RestTest.class);
return suite;
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View file

@ -1,38 +0,0 @@
package se.scalablesolutions.akka.api;
import junit.framework.TestCase;
import junit.framework.Test;
import junit.framework.TestSuite;
public class AllTests extends TestCase {
public static Test suite() {
TestSuite suite = new TestSuite("All tests");
// Java tests
suite.addTestSuite(InMemoryStateTest.class);
suite.addTestSuite(InMemNestedStateTest.class);
suite.addTestSuite(PersistentStateTest.class);
suite.addTestSuite(PersistentNestedStateTest.class);
suite.addTestSuite(RemoteInMemoryStateTest.class);
suite.addTestSuite(RemotePersistentStateTest.class);
suite.addTestSuite(ActiveObjectGuiceConfiguratorTest.class);
suite.addTestSuite(RestTest.class);
// Scala tests
//suite.addTestSuite(se.scalablesolutions.akka.kernel.SupervisorSpec.class);
/*
suite.addTestSuite(se.scalablesolutions.akka.kernel.RemoteSupervisorSpec.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.ActorSpec.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.RemoteActorSpec.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.InMemStatefulActor.class);
suite.addTestSuite(se.scalablesolutions.akka.kernel.actor.PersistentActor.class);
*/
return suite;
}
public static void main(String[] args) {
junit.textui.TestRunner.run(suite());
}
}

View file

@ -0,0 +1,13 @@
package se.scalablesolutions.akka.api;
public class PersistenceManager {
private static volatile boolean isRunning = false;
public static void init() {
if (!isRunning) {
se.scalablesolutions.akka.kernel.Kernel.config();
se.scalablesolutions.akka.kernel.Kernel.startCassandra();
se.scalablesolutions.akka.kernel.Kernel.startRemoteService();
isRunning = true;
}
}
}

View file

@ -17,13 +17,8 @@ public class PersistentNestedStateTest extends TestCase {
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory(); final private ActiveObjectFactory factory = new ActiveObjectFactory();
static {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
System.setProperty("storage-config", "config");
Kernel.startCassandra();
}
protected void setUp() { protected void setUp() {
PersistenceManager.init();
conf.configureActiveObjects( conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{ new Component[]{

View file

@ -14,14 +14,10 @@ import junit.framework.TestCase;
public class PersistentStateTest extends TestCase { public class PersistentStateTest extends TestCase {
static String messageLog = ""; static String messageLog = "";
static {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
System.setProperty("storage-config", "config");
Kernel.startCassandra();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
protected void setUp() { protected void setUp() {
PersistenceManager.init();
conf.configureActiveObjects( conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] { new Component[] {

View file

@ -52,7 +52,6 @@ public class RemoteInMemoryStateTest extends TestCase {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setVectorState("init"); // set init state stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState()); assertEquals("new state", stateful.getVectorState());
} }
@ -72,7 +71,6 @@ public class RemoteInMemoryStateTest extends TestCase {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999); InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setRefState("init"); // set init state stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactionrequired
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState()); assertEquals("new state", stateful.getRefState());
} }

View file

@ -14,15 +14,10 @@ import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase { public class RemotePersistentStateTest extends TestCase {
static String messageLog = ""; static String messageLog = "";
static {
Kernel.startCassandra();
Kernel.startRemoteService();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() { protected void setUp() {
PersistenceManager.init();
conf.configureActiveObjects( conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] { new Component[] {

View file

@ -28,6 +28,7 @@ import java.util.HashMap;
import se.scalablesolutions.akka.kernel.config.*; import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*; import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
/*
public class RestTest extends TestCase { public class RestTest extends TestCase {
private static int PORT = 9998; private static int PORT = 9998;
@ -36,8 +37,8 @@ public class RestTest extends TestCase {
private static ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava(); private static ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
@BeforeClass @BeforeClass
public static void initialize() throws IOException { protected void setUp() {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config(); se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
conf.configureActiveObjects( conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] { new Component[] {
@ -50,8 +51,8 @@ public class RestTest extends TestCase {
} }
@Test @Test
public void simpleRequest() throws IOException, InstantiationException { public void testSimpleRequest() throws IOException, InstantiationException {
//selector.start(); selector.listen();
Client client = Client.create(); Client client = Client.create();
WebResource webResource = client.resource(URI); WebResource webResource = client.resource(URI);
String responseMsg = webResource.path("/foo").get(String.class); String responseMsg = webResource.path("/foo").get(String.class);
@ -79,7 +80,7 @@ public class RestTest extends TestCase {
selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName()); selectorThread.setAlgorithmClassName(StaticStreamAlgorithm.class.getName());
selectorThread.setPort(port); selectorThread.setPort(port);
selectorThread.setAdapter(adapter); selectorThread.setAdapter(adapter);
selectorThread.listen();
return selectorThread; return selectorThread;
} }
} }
*/

View file

@ -245,16 +245,6 @@
</classpathContainers> </classpathContainers>
</configuration> </configuration>
</plugin> </plugin>
<!--plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<includes>
<include>*Spec.scala</include>
</includes>
<junitArtifactName>junit:junit</junitArtifactName>
</configuration>
</plugin-->
</plugins> </plugins>
<resources> <resources>
<resource> <resource>

View file

@ -0,0 +1,9 @@
<!DOCTYPE aspectwerkz PUBLIC
"-//AspectWerkz//DTD 2.0//EN"
"http://aspectwerkz.codehaus.org/dtd/aspectwerkz_2_0.dtd">
<aspectwerkz>
<system id="akka">
<aspect class="se.scalablesolutions.akka.kernel.config.ConfigurationAspect"/>
</system>
</aspectwerkz>

View file

@ -80,7 +80,7 @@ class ActiveObjectFactory {
ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout) ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
} }
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor =
ActiveObject.supervise(restartStrategy, components) ActiveObject.supervise(restartStrategy, components)
/* /*
@ -160,7 +160,7 @@ object ActiveObject {
proxy.asInstanceOf[T] proxy.asInstanceOf[T]
} }
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor = { private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
object factory extends SupervisorFactory { object factory extends SupervisorFactory {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components) override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
} }
@ -176,7 +176,7 @@ object ActiveObject {
@serializable @serializable
sealed class ActorAroundAdvice(val target: Class[_], sealed class ActorAroundAdvice(val target: Class[_],
val targetInstance: AnyRef, val targetInstance: AnyRef,
val actor: Dispatcher, val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress], val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice { val timeout: Long) extends AroundAdvice {
val id = target.getName val id = target.getName
@ -293,11 +293,11 @@ private[kernel] class Dispatcher extends Actor {
} }
override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) { override protected def preRestart(reason: AnyRef, config: Option[AnyRef]) {
if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY) if (preRestart.isDefined) preRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*)
} }
override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) { override protected def postRestart(reason: AnyRef, config: Option[AnyRef]) {
if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY) if (postRestart.isDefined) postRestart.get.invoke(target.get, ZERO_ITEM_CLASS_ARRAY: _*)
} }
} }

View file

@ -30,8 +30,8 @@ object DispatcherType {
case object ThreadBasedDispatcher extends DispatcherType case object ThreadBasedDispatcher extends DispatcherType
} }
class ActorMessageHandler(val actor: Actor) extends MessageHandler { class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
def handle(handle: MessageHandle) = actor.handle(handle) def invoke(handle: MessageInvocation) = actor.invoke(handle)
} }
object Actor { object Actor {
@ -53,9 +53,6 @@ trait Actor extends Logging with TransactionManagement {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
protected[this] var latestMessage: Option[MessageHandle] = None
protected[this] var messageToReschedule: Option[MessageHandle] = None
// ==================================== // ====================================
// ==== USER CALLBACKS TO OVERRIDE ==== // ==== USER CALLBACKS TO OVERRIDE ====
// ==================================== // ====================================
@ -89,7 +86,7 @@ trait Actor extends Logging with TransactionManagement {
protected[kernel] var dispatcher: MessageDispatcher = { protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = new EventBasedThreadPoolDispatcher val dispatcher = new EventBasedThreadPoolDispatcher
mailbox = dispatcher.messageQueue mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this)) dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher dispatcher
} }
@ -202,10 +199,9 @@ trait Actor extends Logging with TransactionManagement {
/** /**
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics. * Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
*/ */
def !(message: AnyRef): Unit = if (isRunning) { def !(message: AnyRef): Unit =
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(message, timeout, false, true) if (isRunning) postMessageToMailbox(message)
else postMessageToMailbox(message) else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/** /**
* Sends a message asynchronously and waits on a future for a reply message. * Sends a message asynchronously and waits on a future for a reply message.
@ -217,13 +213,9 @@ trait Actor extends Logging with TransactionManagement {
* If not then the sender will unessecary block until the timeout expires. * If not then the sender will unessecary block until the timeout expires.
*/ */
def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) { def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) {
if (TransactionManagement.isTransactionalityEnabled) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
transactionalDispatch(message, timeout, false, false) future.await
} else { getResultOrThrowException(future)
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
future.await
getResultOrThrowException(future)
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/** /**
@ -242,13 +234,9 @@ trait Actor extends Logging with TransactionManagement {
* E.g. send-and-receive-eventually semantics. * E.g. send-and-receive-eventually semantics.
*/ */
def !?[T](message: AnyRef): T = if (isRunning) { def !?[T](message: AnyRef): T = if (isRunning) {
if (TransactionManagement.isTransactionalityEnabled) { val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
transactionalDispatch(message, 0, true, false).get future.awaitBlocking
} else { getResultOrThrowException(future).get
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, 0)
future.awaitBlocking
getResultOrThrowException(future).get
}
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it") } else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/** /**
@ -395,7 +383,7 @@ trait Actor extends Logging with TransactionManagement {
val supervisorUuid = registerSupervisorAsRemoteActor val supervisorUuid = registerSupervisorAsRemoteActor
RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid)) RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid))
} else { } else {
val handle = new MessageHandle(this, message, None, activeTx) val handle = new MessageInvocation(this, message, None, TransactionManagement.threadBoundTx.get)
mailbox.append(handle) mailbox.append(handle)
latestMessage = Some(handle) latestMessage = Some(handle)
} }
@ -409,87 +397,32 @@ trait Actor extends Logging with TransactionManagement {
else throw new IllegalStateException("Expected a future from remote call to actor " + toString) else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
} else { } else {
val future = new DefaultCompletableFutureResult(timeout) val future = new DefaultCompletableFutureResult(timeout)
val handle = new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get) val handle = new MessageInvocation(this, message, Some(future), TransactionManagement.threadBoundTx.get)
mailbox.append(handle) mailbox.append(handle)
latestMessage = Some(handle) latestMessage = Some(handle)
future future
} }
} }
private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean, oneWay: Boolean): Option[T] = {
import TransactionManagement._
if (!tryToCommitTransaction) {
var nrRetries = 0 // FIXME only if top-level
var failed = true
do {
Thread.sleep(TIME_WAITING_FOR_COMPLETION)
nrRetries += 1
log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get, TIME_WAITING_FOR_COMPLETION, nrRetries)
failed = !tryToCommitTransaction
} while(nrRetries < NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
if (failed) {
log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get, latestMessage)
rollback(activeTx)
if (RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
}
}
if (isInExistingTransaction) joinExistingTransaction
else if (isTransactional) startNewTransaction
incrementTransaction
try {
if (oneWay) {
postMessageToMailbox(message)
None
} else {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
if (blocking) future.awaitBlocking
else future.await
getResultOrThrowException(future)
}
} catch {
case e: TransactionAwareWrapperException =>
e.cause.printStackTrace
rollback(e.tx)
throw e.cause
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
TransactionManagement.threadBoundTx.set(None)
if (messageToReschedule.isDefined) {
val handle = messageToReschedule.get
val newTx = startNewTransaction
val clone = new MessageHandle(handle.sender, handle.message, handle.future, newTx)
log.debug("Rescheduling message %s", clone)
mailbox.append(clone) // FIXME append or prepend rescheduled messages?
}
}
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
if (TransactionManagement.isTransactionalityEnabled) throw new TransactionAwareWrapperException(cause, activeTx)
else throw cause
} else {
future.result.asInstanceOf[Option[T]]
}
/** /**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods * Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/ */
private[kernel] def handle(messageHandle: MessageHandle) = synchronized { private[kernel] def invoke(messageHandle: MessageInvocation) = synchronized {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(messageHandle)
else dispatch(messageHandle)
}
private def dispatch[T](messageHandle: MessageInvocation) = {
if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
val message = messageHandle.message val message = messageHandle.message
val future = messageHandle.future val future = messageHandle.future
try { try {
if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
senderFuture = future senderFuture = future
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString) else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch { } catch {
case e => case e =>
// FIXME to fix supervisor restart of actor for oneway calls, inject a supervisor proxy that can send notification back to client // FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e) if (supervisor.isDefined) supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e) if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace else e.printStackTrace
@ -498,6 +431,51 @@ trait Actor extends Logging with TransactionManagement {
} }
} }
private def transactionalDispatch[T](messageHandle: MessageInvocation) = {
if (messageHandle.tx.isDefined) TransactionManagement.threadBoundTx.set(messageHandle.tx)
val message = messageHandle.message
val future = messageHandle.future
try {
if (!tryToCommitTransaction && isTransactionTopLevel) handleCollision
if (isInExistingTransaction) joinExistingTransaction
else if (isTransactional) startNewTransaction
incrementTransaction
senderFuture = future
if (base.isDefinedAt(message)) base(message) // invoke user actor's receive partial function
else throw new IllegalArgumentException("No handler matching message [" + message + "] in " + toString)
} catch {
case e =>
rollback(activeTx)
TransactionManagement.threadBoundTx.set(None) // need to clear threadBoundTx before call to supervisor
// FIXME to fix supervisor restart of remote actor for oneway calls, inject a supervisor proxy that can send notification back to client
if (supervisor.isDefined) supervisor.get ! Exit(this, e)
if (future.isDefined) future.get.completeWithException(this, e)
else e.printStackTrace
} finally {
decrementTransaction
if (isTransactionAborted) removeTransactionIfTopLevel
else tryToPrecommitTransaction
rescheduleClashedMessages
TransactionManagement.threadBoundTx.set(None)
}
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause
} else future.result.asInstanceOf[Option[T]]
private def rescheduleClashedMessages = if (messageToReschedule.isDefined) {
val handle = messageToReschedule.get
val newTx = startNewTransaction
val clone = new MessageInvocation(handle.sender, handle.message, handle.future, newTx)
log.debug("Rescheduling message %s", clone)
mailbox.append(clone) // FIXME append or prepend rescheduled messages?
}
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive) private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)
private val lifeCycle: PartialFunction[Any, Unit] = { private val lifeCycle: PartialFunction[Any, Unit] = {
@ -564,7 +542,7 @@ trait Actor extends Logging with TransactionManagement {
private[kernel] def swapDispatcher(disp: MessageDispatcher) = { private[kernel] def swapDispatcher(disp: MessageDispatcher) = {
dispatcher = disp dispatcher = disp
mailbox = dispatcher.messageQueue mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageHandler(this)) dispatcher.registerHandler(this, new ActorMessageInvoker(this))
} }
override def toString(): String = "Actor[" + uuid + ":" + id + "]" override def toString(): String = "Actor[" + uuid + ":" + id + "]"

View file

@ -34,11 +34,11 @@ case class OneForOneStrategy(maxNrOfRetries: Int, withinTimeRange: Int) extends
* override protected def getSupervisorConfig: SupervisorConfig = { * override protected def getSupervisorConfig: SupervisorConfig = {
* SupervisorConfig( * SupervisorConfig(
* RestartStrategy(OneForOne, 3, 10), * RestartStrategy(OneForOne, 3, 10),
* Worker( * Supervise(
* myFirstActor, * myFirstActor,
* LifeCycle(Permanent, 1000)) * LifeCycle(Permanent, 1000))
* :: * ::
* Worker( * Supervise(
* mySecondActor, * mySecondActor,
* LifeCycle(Permanent, 1000)) * LifeCycle(Permanent, 1000))
* :: Nil) * :: Nil)
@ -123,7 +123,7 @@ class Supervisor(handler: FaultHandlingStrategy) extends Actor with Logging {
case SupervisorConfig(_, servers) => case SupervisorConfig(_, servers) =>
servers.map(server => servers.map(server =>
server match { server match {
case Worker(actor, lifecycle) => case Supervise(actor, lifecycle) =>
actor.lifeCycleConfig = Some(lifecycle) actor.lifeCycleConfig = Some(lifecycle)
startLink(actor) startLink(actor)

10
kernel/src/main/scala/collection/Vector.scala Executable file → Normal file
View file

@ -300,18 +300,18 @@ class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail
case vec: Vector[T] => { case vec: Vector[T] => {
var back = length == vec.length var back = length == vec.length
var i = 0 var i = 0
while (i < length) { while (i < length) {
back &&= apply(i) == vec.apply(i) back &&= apply(i) == vec.apply(i)
i += 1 i += 1
} }
back back
} }
case _ => false case _ => false
} }
override def hashCode = foldLeft(0) { _ ^ _.hashCode } override def hashCode = foldLeft(0) { _ ^ _.hashCode }
} }
@ -326,7 +326,7 @@ object Vector {
private[collection] def array(elems: AnyRef*) = { private[collection] def array(elems: AnyRef*) = {
val back = new Array[AnyRef](elems.length) val back = new Array[AnyRef](elems.length)
Array.copy(elems, 0, back, 0, back.length) Array.copy(elems, 0, back, 0, back.length)
back back
} }
} }

View file

@ -28,7 +28,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private var supervisor: Option[Supervisor] = None private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _ private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _ private var components: List[Component] = _
private var workers: List[Worker] = Nil private var supervised: List[Supervise] = Nil
private var bindings: List[DependencyBinding] = Nil private var bindings: List[DependencyBinding] = Nil
private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed? private var configRegistry = new HashMap[Class[_], Component] // TODO is configRegistry needed?
private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]] private var activeObjectRegistry = new HashMap[Class[_], Tuple3[AnyRef, AnyRef, Component]]
@ -104,7 +104,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None else None
val proxy = activeObjectFactory.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] val proxy = activeObjectFactory.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle) supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component)) activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
} }
@ -112,14 +112,14 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newDelegatingProxy(component: Component): DependencyBinding = { private def newDelegatingProxy(component: Component): DependencyBinding = {
val targetClass = component.intf.get val targetClass = component.intf.get
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true) component.target.getConstructor(Array[Class[_]](): _*).setAccessible(true)
val actor = new Dispatcher val actor = new Dispatcher
if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get) if (component.dispatcher.isDefined) actor.swapDispatcher(component.dispatcher.get)
val remoteAddress = val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port)) if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None else None
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef] val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle) supervised ::= Supervise(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component)) activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy) new DependencyBinding(targetClass, proxy)
} }
@ -132,7 +132,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
override def supervise: ActiveObjectConfigurator = synchronized { override def supervise: ActiveObjectConfigurator = synchronized {
if (injector == null) inject if (injector == null) inject
supervisor = Some(activeObjectFactory.supervise(restartStrategy, workers)) supervisor = Some(activeObjectFactory.supervise(restartStrategy, supervised))
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this)) //camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start //camelContext.start
supervisor.get.startSupervisor supervisor.get.startSupervisor

View file

@ -22,14 +22,16 @@ object ScalaConfig {
abstract class Scope extends ConfigElement abstract class Scope extends ConfigElement
case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server case class SupervisorConfig(restartStrategy: RestartStrategy, worker: List[Server]) extends Server
case class Worker(actor: Actor, lifeCycle: LifeCycle) extends Server case class Supervise(actor: Actor, lifeCycle: LifeCycle) extends Server
case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement case class RestartStrategy(scheme: FailOverScheme, maxNrOfRetries: Int, withinTimeRange: Int) extends ConfigElement
case object AllForOne extends FailOverScheme case object AllForOne extends FailOverScheme
case object OneForOne extends FailOverScheme case object OneForOne extends FailOverScheme
case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement case class LifeCycle(scope: Scope, shutdownTime: Int) extends ConfigElement {
def this(scope: Scope) = this(scope, 0)
}
case object Permanent extends Scope case object Permanent extends Scope
case object Transient extends Scope case object Transient extends Scope
case object Temporary extends Scope case object Temporary extends Scope
@ -87,10 +89,13 @@ object JavaConfig {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy( def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.RestartStrategy(
scheme.transform, maxNrOfRetries, withinTimeRange) scheme.transform, maxNrOfRetries, withinTimeRange)
} }
// class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int, val callbacks: RestartCallbacks) extends ConfigElement {
class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement { class LifeCycle(@BeanProperty val scope: Scope, @BeanProperty val shutdownTime: Int) extends ConfigElement {
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime) def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.LifeCycle(scope.transform, shutdownTime)
} }
class RestartCallbacks(val preRestart: String, val postRestart: String)
abstract class Scope extends ConfigElement { abstract class Scope extends ConfigElement {
def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope def transform: se.scalablesolutions.akka.kernel.config.ScalaConfig.Scope
} }
@ -115,7 +120,7 @@ object JavaConfig {
} }
class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int) class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int)
abstract class Server extends ConfigElement abstract class Server extends ConfigElement
class Component(@BeanProperty val intf: Class[_], class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_], @BeanProperty val target: Class[_],
@ -150,8 +155,8 @@ object JavaConfig {
se.scalablesolutions.akka.kernel.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher, se.scalablesolutions.akka.kernel.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher,
if (remoteAddress != null) se.scalablesolutions.akka.kernel.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null) if (remoteAddress != null) se.scalablesolutions.akka.kernel.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newWorker(actor: Actor) = def newSupervised(actor: Actor) =
se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(actor, lifeCycle.transform) se.scalablesolutions.akka.kernel.config.ScalaConfig.Supervise(actor, lifeCycle.transform)
} }
} }

2
kernel/src/main/scala/config/Configuration.scala Executable file → Normal file
View file

@ -55,6 +55,6 @@ class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_], @BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle, @BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server { @BeanProperty val timeout: Int) extends Server {
def newWorker(proxy: ActiveObjectProxy) = se.scalablesolutions.akka.kernel.Worker(proxy.server, lifeCycle.transform) def newWorker(proxy: ActiveObjectProxy) = se.scalablesolutions.akka.kernel.Supervise(proxy.server, lifeCycle.transform)
} }
*/ */

View file

@ -123,12 +123,12 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
val argClasses = args.map(_.getClass) val argClasses = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.timeout) val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClasses, request.timeout)
continueTransaction(request) //continueTransaction(request)
try { try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses) val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses: _*)
if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs) if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs: _*)
else { else {
val result = messageReceiver.invoke(activeObject, unescapedArgs) val result = messageReceiver.invoke(activeObject, unescapedArgs: _*)
log.debug("Returning result from remote active object invocation [%s]", result) log.debug("Returning result from remote active object invocation [%s]", result)
//channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get)) //channel.write(request.newReplyWithMessage(result, TransactionManagement.threadBoundTx.get))
channel.write(request.newReplyWithMessage(result, null)) channel.write(request.newReplyWithMessage(result, null))

View file

@ -0,0 +1,44 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.reactor
import kernel.actor.Actor
/**
* Dispatcher factory.
* <p/>
* Example usage:
* <pre/>
* val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
* .withNewThreadPoolWithBoundedBlockingQueue(100)
* .setCorePoolSize(16)
* .setMaxPoolSize(128)
* .setKeepAliveTimeInMillis(60000)
* .setRejectionPolicy(new CallerRunsPolicy)
* .buildThreadPool
* </pre>
* <p/>
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Dispatchers {
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
* Has a fluent builder interface for configuring its semantics.
*/
def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
*/
def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
}

View file

@ -13,18 +13,18 @@ package se.scalablesolutions.akka.kernel.reactor
class EventBasedSingleThreadDispatcher extends MessageDispatcherBase { class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
def start = if (!active) { def start = if (!active) {
active = true active = true
val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(messageQueue) val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)
selectorThread = new Thread { selectorThread = new Thread {
override def run = { override def run = {
while (active) { while (active) {
try { try {
messageDemultiplexer.select messageDemultiplexer.select
} catch { case e: InterruptedException => active = false } } catch { case e: InterruptedException => active = false }
val queue = messageDemultiplexer.acquireSelectedQueue val selectedQueue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until queue.size) { for (index <- 0 until selectedQueue.size) {
val handle = queue.remove val handle = selectedQueue.remove
val handler = messageHandlers.get(handle.sender) val handler = messageHandlers.get(handle.sender)
if (handler != null) handler.handle(handle) if (handler != null) handler.invoke(handle)
} }
} }
} }
@ -33,14 +33,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
} }
} }
class EventBasedSingleThreadDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { class EventBasedSingleThreadDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
import java.util.{LinkedList, Queue} import java.util.{LinkedList, Queue}
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
def select = messageQueue.read(selectedQueue) def select = messageQueue.read(selectedQueue)
def acquireSelectedQueue: Queue[MessageHandle] = selectedQueue def acquireSelectedQueue: Queue[MessageInvocation] = selectedQueue
def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue") def releaseSelectedQueue = throw new UnsupportedOperationException("EventBasedSingleThreadDemultiplexer can't release its queue")

View file

@ -77,7 +77,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
/** /**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/ */
val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(messageQueue) val messageDemultiplexer = new EventBasedThreadPoolDemultiplexer(queue)
selectorThread = new Thread { selectorThread = new Thread {
override def run = { override def run = {
while (active) { while (active) {
@ -86,19 +86,19 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
messageDemultiplexer.select messageDemultiplexer.select
} catch {case e: InterruptedException => active = false} } catch {case e: InterruptedException => active = false}
val queue = messageDemultiplexer.acquireSelectedQueue val selectedQueue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until queue.size) { for (index <- 0 until selectedQueue.size) {
val message = queue.peek val message = selectedQueue.peek
val messageHandler = getIfNotBusy(message.sender) val messageHandler = getIfNotBusy(message.sender)
if (messageHandler.isDefined) { if (messageHandler.isDefined) {
executor.execute(new Runnable { executor.execute(new Runnable {
override def run = { override def run = {
messageHandler.get.handle(message) messageHandler.get.invoke(message)
free(message.sender) free(message.sender)
messageDemultiplexer.wakeUp messageDemultiplexer.wakeUp
} }
}) })
queue.remove selectedQueue.remove
} }
} }
} finally { } finally {
@ -112,7 +112,7 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
override protected def doShutdown = executor.shutdownNow override protected def doShutdown = executor.shutdownNow
private def getIfNotBusy(key: AnyRef): Option[MessageHandler] = guard.synchronized { private def getIfNotBusy(key: AnyRef): Option[MessageInvoker] = guard.synchronized {
if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key)) if (CONCURRENT_MODE && messageHandlers.containsKey(key)) Some(messageHandlers.get(key))
else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) { else if (!busyHandlers.contains(key) && messageHandlers.containsKey(key)) {
busyHandlers.add(key) busyHandlers.add(key)
@ -240,8 +240,8 @@ class EventBasedThreadPoolDispatcher extends MessageDispatcherBase {
private def ensureNotActive = if (active) throw new IllegalStateException("Can't build a new thread pool for a dispatcher that is already up and running") private def ensureNotActive = if (active) throw new IllegalStateException("Can't build a new thread pool for a dispatcher that is already up and running")
} }
class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer { class EventBasedThreadPoolDemultiplexer(private val messageQueue: ReactiveMessageQueue) extends MessageDemultiplexer {
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle] private val selectedQueue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
private val selectedQueueLock = new ReentrantLock private val selectedQueueLock = new ReentrantLock
def select = try { def select = try {
@ -251,7 +251,7 @@ class EventBasedThreadPoolDemultiplexer(private val messageQueue: MessageQueue)
selectedQueueLock.unlock selectedQueueLock.unlock
} }
def acquireSelectedQueue: Queue[MessageHandle] = { def acquireSelectedQueue: Queue[MessageInvocation] = {
selectedQueueLock.lock selectedQueueLock.lock
selectedQueue selectedQueue
} }

View file

@ -4,6 +4,7 @@
package se.scalablesolutions.akka.kernel.reactor package se.scalablesolutions.akka.kernel.reactor
import java.util.{LinkedList, Queue}
import java.util.concurrent.TimeUnit import java.util.concurrent.TimeUnit
import java.util.HashMap import java.util.HashMap
@ -11,14 +12,16 @@ trait MessageDispatcherBase extends MessageDispatcher {
val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false) val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS val MILLISECONDS = TimeUnit.MILLISECONDS
val messageQueue = new MessageQueue val queue = new ReactiveMessageQueue
@volatile protected var active: Boolean = false @volatile protected var active: Boolean = false
protected val messageHandlers = new HashMap[AnyRef, MessageHandler] protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _ protected var selectorThread: Thread = _
protected val guard = new Object protected val guard = new Object
def registerHandler(key: AnyRef, handler: MessageHandler) = guard.synchronized { def messageQueue = queue
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
messageHandlers.put(key, handler) messageHandlers.put(key, handler)
} }
@ -37,3 +40,29 @@ trait MessageDispatcherBase extends MessageDispatcher {
*/ */
protected def doShutdown = {} protected def doShutdown = {}
} }
class ReactiveMessageQueue extends MessageQueue {
private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false
def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
}
def prepend(handle: MessageInvocation) = queue.synchronized {
queue.add(handle)
queue.notifyAll
}
def read(destination: Queue[MessageInvocation]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
else interrupted = false
}
def interrupt = queue.synchronized {
interrupted = true
queue.notifyAll
}
}

View file

@ -2,26 +2,24 @@
* Copyright (C) 2009 Scalable Solutions. * Copyright (C) 2009 Scalable Solutions.
*/ */
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.kernel.reactor package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.atomic.AtomicInteger import java.util.Queue
import java.util.concurrent.ThreadFactory
import java.util.{LinkedList, Queue}
import kernel.stm.Transaction import kernel.stm.Transaction
import kernel.util.{Logging, HashCode} import kernel.util.HashCode
trait MessageHandler {
def handle(message: MessageHandle) trait MessageQueue {
def append(handle: MessageInvocation)
def prepend(handle: MessageInvocation)
}
trait MessageInvoker {
def invoke(message: MessageInvocation)
} }
trait MessageDispatcher { trait MessageDispatcher {
def messageQueue: MessageQueue def messageQueue: MessageQueue
def registerHandler(key: AnyRef, handler: MessageHandler) def registerHandler(key: AnyRef, handler: MessageInvoker)
def unregisterHandler(key: AnyRef) def unregisterHandler(key: AnyRef)
def start def start
def shutdown def shutdown
@ -29,15 +27,15 @@ trait MessageDispatcher {
trait MessageDemultiplexer { trait MessageDemultiplexer {
def select def select
def acquireSelectedQueue: Queue[MessageHandle] def acquireSelectedQueue: Queue[MessageInvocation]
def releaseSelectedQueue def releaseSelectedQueue
def wakeUp def wakeUp
} }
class MessageHandle(val sender: AnyRef, class MessageInvocation(val sender: AnyRef,
val message: AnyRef, val message: AnyRef,
val future: Option[CompletableFutureResult], val future: Option[CompletableFutureResult],
val tx: Option[Transaction]) { val tx: Option[Transaction]) {
override def hashCode(): Int = { override def hashCode(): Int = {
var result = HashCode.SEED var result = HashCode.SEED
@ -50,39 +48,13 @@ class MessageHandle(val sender: AnyRef,
override def equals(that: Any): Boolean = override def equals(that: Any): Boolean =
that != null && that != null &&
that.isInstanceOf[MessageHandle] && that.isInstanceOf[MessageInvocation] &&
that.asInstanceOf[MessageHandle].sender == sender && that.asInstanceOf[MessageInvocation].sender == sender &&
that.asInstanceOf[MessageHandle].message == message && that.asInstanceOf[MessageInvocation].message == message &&
that.asInstanceOf[MessageHandle].future.isDefined == future.isDefined && that.asInstanceOf[MessageInvocation].future.isDefined == future.isDefined &&
that.asInstanceOf[MessageHandle].future.get == future.get && that.asInstanceOf[MessageInvocation].future.get == future.get &&
that.asInstanceOf[MessageHandle].tx.isDefined == tx.isDefined && that.asInstanceOf[MessageInvocation].tx.isDefined == tx.isDefined &&
that.asInstanceOf[MessageHandle].tx.get.id == tx.get.id that.asInstanceOf[MessageInvocation].tx.get.id == tx.get.id
override def toString(): String = "MessageHandle[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]" override def toString(): String = "MessageInvocation[message = " + message + ", sender = " + sender + ", future = " + future + ", tx = " + tx + "]"
}
class MessageQueue {
private[kernel] val queue: Queue[MessageHandle] = new LinkedList[MessageHandle]
@volatile private var interrupted = false
def append(handle: MessageHandle) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
}
def prepend(handle: MessageHandle) = queue.synchronized {
queue.add(handle)
queue.notifyAll
}
def read(destination: Queue[MessageHandle]) = queue.synchronized {
while (queue.isEmpty && !interrupted) queue.wait
if (!interrupted) while (!queue.isEmpty) destination.offer(queue.remove)
else interrupted = false
}
def interrupt = queue.synchronized {
interrupted = true
queue.notifyAll
}
} }

View file

@ -0,0 +1,55 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import kernel.actor.{Actor, ActorMessageInvoker}
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(new ActorMessageInvoker(actor))
private val queue = new BlockingMessageQueue
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
def messageQueue = queue
def start = if (!active) {
active = true
selectorThread = new Thread {
override def run = {
while (active) {
try {
messageHandler.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
}
}
selectorThread.start
}
def shutdown = if (active) {
active = false
selectorThread.interrupt
}
def registerHandler(key: AnyRef, handler: MessageInvoker) = throw new UnsupportedOperationException
def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
}
class BlockingMessageQueue extends MessageQueue {
// FIXME: configure the LBQ
private val queue = new LinkedBlockingQueue[MessageInvocation]
def append(handle: MessageInvocation) = queue.put(handle)
def prepend(handle: MessageInvocation) = queue.add(handle) // FIXME is add prepend???
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
}

View file

@ -53,10 +53,10 @@ final object CassandraStorage extends Logging {
if (!isRunning) { if (!isRunning) {
try { try {
server.start server.start
log.info("Persistent storage has started up successfully"); log.info("Cassandra persistent storage has started up successfully");
} catch { } catch {
case e => case e =>
log.error("Could not start up persistent storage") log.error("Could not start up Cassandra persistent storage")
throw e throw e
} }
if (RUN_THRIFT_SERVICE) { if (RUN_THRIFT_SERVICE) {
@ -214,7 +214,7 @@ class CassandraThriftServer(server: CassandraServer) extends Logging {
options) options)
} catch { } catch {
case e => case e =>
log.error("Could not start up persistent storage node.") log.error("Could not start up Cassandra thrift service")
throw e throw e
} }
@ -222,9 +222,9 @@ class CassandraThriftServer(server: CassandraServer) extends Logging {
private[this] val serverDaemon = actor { private[this] val serverDaemon = actor {
receive { receive {
case Start => case Start =>
log.info("Cassandra thrift service is starting up...")
serverEngine.serve serverEngine.serve
case Stop => log.info("Cassandra thrift service has starting up successfully")
case Stop =>
log.info("Cassandra thrift service is shutting down...") log.info("Cassandra thrift service is shutting down...")
serverEngine.stop serverEngine.stop
} }

View file

@ -39,8 +39,8 @@ object DataFlow {
private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor { private class ReactiveEventBasedThread[MessageType, ReturnType](body: MessageType => ReturnType) extends Actor {
def act = loop { def act = loop {
react { react {
case message: MessageType => sender ! body(message) case 'exit => exit()
case 'exit => exit() case message => sender ! body(message.asInstanceOf[MessageType])
} }
} }
} }

View file

@ -8,8 +8,6 @@ import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import kernel.state.Transactional import kernel.state.Transactional
import kernel.util.Logging import kernel.util.Logging
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
@serializable sealed abstract class TransactionStatus @serializable sealed abstract class TransactionStatus
object TransactionStatus { object TransactionStatus {
case object New extends TransactionStatus case object New extends TransactionStatus
@ -39,22 +37,22 @@ object TransactionIdFactory {
log.debug("Creating a new transaction with id [%s]", id) log.debug("Creating a new transaction with id [%s]", id)
private[this] val transactionals = new ChangeSet @volatile private[this] var status: TransactionStatus = TransactionStatus.New
private[this] val transactionalItems = new ChangeSet
private[this] var participants: List[String] = Nil private[this] var participants: List[String] = Nil
private[this] var precommitted: List[String] = Nil private[this] var precommitted: List[String] = Nil
private[this] val depth = new AtomicInteger(0) private[this] val depth = new AtomicInteger(0)
@volatile private[this] var status: TransactionStatus = TransactionStatus.New
def increment = synchronized { depth.incrementAndGet } def increment = synchronized { depth.incrementAndGet }
def decrement = synchronized { depth.decrementAndGet } def decrement = synchronized { depth.decrementAndGet }
def topLevel_? = synchronized { depth.get == 0 } def isTopLevel = synchronized { depth.get == 0 }
def register(transactional: Transactional) = synchronized { def register(transactional: Transactional) = synchronized {
ensureIsActiveOrNew ensureIsActiveOrNew
transactionals + transactional transactionalItems + transactional
} }
def begin(participant: String) = synchronized { def begin(participant: String) = synchronized {
@ -83,7 +81,7 @@ object TransactionIdFactory {
}}.exists(_ == true) }}.exists(_ == true)
} else false } else false
if (haveAllPreCommitted) { if (haveAllPreCommitted) {
transactionals.items.foreach(_.commit) transactionalItems.items.foreach(_.commit)
status = TransactionStatus.Completed status = TransactionStatus.Completed
reset reset
true true
@ -97,7 +95,7 @@ object TransactionIdFactory {
def rollback(participant: String) = synchronized { def rollback(participant: String) = synchronized {
ensureIsActiveOrAborted ensureIsActiveOrAborted
log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) log.debug("TX ROLLBACK - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
transactionals.items.foreach(_.rollback) transactionalItems.items.foreach(_.rollback)
status = TransactionStatus.Aborted status = TransactionStatus.Aborted
reset reset
} }
@ -105,7 +103,7 @@ object TransactionIdFactory {
def rollbackForRescheduling(participant: String) = synchronized { def rollbackForRescheduling(participant: String) = synchronized {
ensureIsActiveOrAborted ensureIsActiveOrAborted
log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString) log.debug("TX ROLLBACK for recheduling - Server with UUID [%s] has initiated transaction rollback for [%s]", participant, toString)
transactionals.items.foreach(_.rollback) transactionalItems.items.foreach(_.rollback)
reset reset
} }
@ -121,7 +119,7 @@ object TransactionIdFactory {
def isAborted = status == TransactionStatus.Aborted def isAborted = status == TransactionStatus.Aborted
private def reset = { private def reset = {
transactionals.clear transactionalItems.clear
participants = Nil participants = Nil
precommitted = Nil precommitted = Nil
} }
@ -136,7 +134,7 @@ object TransactionIdFactory {
throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString) throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]: " + toString)
// For reinitialize transaction after sending it over the wire // For reinitialize transaction after sending it over the wire
private[kernel] def reinit = { private[kernel] def reinit = synchronized {
import net.lag.logging.{Logger, Level} import net.lag.logging.{Logger, Level}
if (log == null) { if (log == null) {
log = Logger.get(this.getClass.getName) log = Logger.get(this.getClass.getName)

View file

@ -6,8 +6,11 @@ package se.scalablesolutions.akka.kernel.stm
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import kernel.reactor.MessageInvocation
import kernel.util.Logging import kernel.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid import org.codehaus.aspectwerkz.proxy.Uuid // FIXME is java.util.UUID better?
class TransactionRollbackException(msg: String) extends RuntimeException(msg)
class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) { class TransactionAwareWrapperException(val cause: Throwable, val tx: Option[Transaction]) extends RuntimeException(cause) {
override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]" override def toString(): String = "TransactionAwareWrapperException[" + cause + ", " + tx + "]"
@ -17,7 +20,8 @@ object TransactionManagement {
val TIME_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-for-completion", 100) val TIME_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-for-completion", 100)
val NR_OF_TIMES_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-nr-of-times", 3) val NR_OF_TIMES_WAITING_FOR_COMPLETION = kernel.Kernel.config.getInt("akka.stm.wait-nr-of-times", 3)
val TRANSACTION_ENABLED = new AtomicBoolean(kernel.Kernel.config.getBool("akka.stm.service", true)) val TRANSACTION_ENABLED = new AtomicBoolean(kernel.Kernel.config.getBool("akka.stm.service", true))
val RESTART_TRANSACTION_ON_COLLISION = kernel.Kernel.config.getBool("akka.stm.restart-transaction", true) // FIXME reenable 'akka.stm.restart-on-collision' when new STM is in place
val RESTART_TRANSACTION_ON_COLLISION = false //kernel.Kernel.config.getBool("akka.stm.restart-on-collision", true)
def isTransactionalityEnabled = TRANSACTION_ENABLED.get def isTransactionalityEnabled = TRANSACTION_ENABLED.get
def disableTransactions = TRANSACTION_ENABLED.set(false) def disableTransactions = TRANSACTION_ENABLED.set(false)
@ -27,9 +31,11 @@ object TransactionManagement {
} }
} }
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
trait TransactionManagement extends Logging { trait TransactionManagement extends Logging {
val uuid = Uuid.newUuid.toString val uuid = Uuid.newUuid.toString
protected[this] var latestMessage: Option[MessageInvocation] = None
protected[this] var messageToReschedule: Option[MessageInvocation] = None
import TransactionManagement.threadBoundTx import TransactionManagement.threadBoundTx
private[kernel] var activeTx: Option[Transaction] = None private[kernel] var activeTx: Option[Transaction] = None
@ -74,6 +80,25 @@ trait TransactionManagement extends Logging {
tx.rollbackForRescheduling(uuid) tx.rollbackForRescheduling(uuid)
} }
protected def handleCollision = {
var nrRetries = 0
var failed = true
do {
Thread.sleep(TransactionManagement.TIME_WAITING_FOR_COMPLETION)
nrRetries += 1
log.debug("Pending transaction [%s] not completed, waiting %s milliseconds. Attempt %s", activeTx.get, TransactionManagement.TIME_WAITING_FOR_COMPLETION, nrRetries)
failed = !tryToCommitTransaction
} while(nrRetries < TransactionManagement.NR_OF_TIMES_WAITING_FOR_COMPLETION && failed)
if (failed) {
log.debug("Pending transaction [%s] still not completed, aborting and rescheduling message [%s]", activeTx.get, latestMessage)
rollback(activeTx)
if (TransactionManagement.RESTART_TRANSACTION_ON_COLLISION) messageToReschedule = Some(latestMessage.get)
else throw new TransactionRollbackException("Conflicting transactions, rolling back transaction for message [" + latestMessage + "]")
}
}
protected def isTransactionTopLevel = activeTx.isDefined && activeTx.get.isTopLevel
protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined protected def isInExistingTransaction = TransactionManagement.threadBoundTx.get.isDefined
protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted protected def isTransactionAborted = activeTx.isDefined && activeTx.get.isAborted
@ -82,11 +107,10 @@ trait TransactionManagement extends Logging {
protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement protected def decrementTransaction = if (activeTx.isDefined) activeTx.get.decrement
protected def removeTransactionIfTopLevel = protected def removeTransactionIfTopLevel = if (isTransactionTopLevel) {
if (activeTx.isDefined && activeTx.get.topLevel_?) { activeTx = None
activeTx = None threadBoundTx.set(None)
threadBoundTx.set(None) }
}
protected def reenteringExistingTransaction= if (activeTx.isDefined) { protected def reenteringExistingTransaction= if (activeTx.isDefined) {
val cflowTx = threadBoundTx.get val cflowTx = threadBoundTx.get

View file

@ -0,0 +1,26 @@
package se.scalablesolutions.akka.kernel
import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
import kernel.actor.{ActorSpec, RemoteActorSpec, PersistentActorSpec, InMemoryActorSpec}
import kernel.reactor.{EventBasedSingleThreadDispatcherTest, EventBasedThreadPoolDispatcherTest}
object AllTest extends TestCase {
def suite(): Test = {
val suite = new TestSuite("All Scala tests")
suite.addTestSuite(classOf[SupervisorSpec])
suite.addTestSuite(classOf[RemoteSupervisorSpec])
suite.addTestSuite(classOf[EventBasedSingleThreadDispatcherTest])
suite.addTestSuite(classOf[EventBasedThreadPoolDispatcherTest])
suite.addTestSuite(classOf[ActorSpec])
suite.addTestSuite(classOf[RemoteActorSpec])
suite.addTestSuite(classOf[PersistentActorSpec])
suite.addTestSuite(classOf[InMemoryActorSpec])
//suite.addTestSuite(classOf[TransactionClasherSpec])
suite
}
def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
}

View file

@ -1,22 +0,0 @@
package se.scalablesolutions.akka.kernel
import junit.framework.Test
import junit.framework.TestCase
import junit.framework.TestSuite
object AllTests extends TestCase {
def suite(): Test = {
val suite = new TestSuite("All tests")
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.SupervisorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.RemoteSupervisorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.EventBasedDispatcherTest])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.reactor.ThreadBasedDispatcherTest])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.ActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.RemoteActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.PersistentActorSpec])
suite.addTestSuite(classOf[se.scalablesolutions.akka.kernel.actor.InMemoryActorSpec])
suite
}
def main(args: Array[String]) = junit.textui.TestRunner.run(suite)
}

View file

@ -9,13 +9,13 @@ import org.junit.{Test, Before}
import org.junit.Assert._ import org.junit.Assert._
import junit.framework.TestCase import junit.framework.TestCase
class EventBasedDispatcherTest extends TestCase { class EventBasedSingleThreadDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null private var threadingIssueDetected: AtomicBoolean = null
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageHandler { class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock val guardLock: Lock = new ReentrantLock
def handle(message: MessageHandle) { def invoke(message: MessageInvocation) {
try { try {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
if (guardLock.tryLock) { if (guardLock.tryLock) {
@ -59,7 +59,7 @@ class EventBasedDispatcherTest extends TestCase {
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None)) dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
@ -73,8 +73,8 @@ class EventBasedDispatcherTest extends TestCase {
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start dispatcher.start
dispatcher.messageQueue.append(new MessageHandle(key1, new Object, None, None)) dispatcher.messageQueue.append(new MessageInvocation(key1, new Object, None, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Object, None, None)) dispatcher.messageQueue.append(new MessageInvocation(key2, new Object, None, None))
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
} }
@ -84,9 +84,9 @@ class EventBasedDispatcherTest extends TestCase {
val key1 = "key1" val key1 = "key1"
val key2 = "key2" val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher val dispatcher = new EventBasedSingleThreadDispatcher
dispatcher.registerHandler(key1, new MessageHandler { dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1; var currentValue = -1;
def handle(message: MessageHandle) { def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int] val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) { if (messageValue.intValue == currentValue + 1) {
@ -95,9 +95,9 @@ class EventBasedDispatcherTest extends TestCase {
} else threadingIssueDetected.set(true) } else threadingIssueDetected.set(true)
} }
}) })
dispatcher.registerHandler(key2, new MessageHandler { dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1; var currentValue = -1;
def handle(message: MessageHandle) { def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int] val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) { if (messageValue.intValue == currentValue + 1) {
@ -108,8 +108,8 @@ class EventBasedDispatcherTest extends TestCase {
}) })
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None)) dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None)) dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)

View file

@ -0,0 +1,143 @@
package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit}
import org.junit.Before
import org.junit.Test
import org.junit.Assert._
import junit.framework.TestCase
class EventBasedThreadPoolDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null
@Before
override def setUp = {
threadingIssueDetected = new AtomicBoolean(false)
}
@Test
def testMessagesDispatchedToTheSameHandlerAreExecutedSequentially = {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
}
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
}
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageInvoker {
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
}
})
dispatcher.start
for (i <- 0 until 10) {
dispatcher.messageQueue.append(new MessageInvocation(key, new Object, None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
def invoke(message: MessageInvocation) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageInvocation(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, "Sending Message 2", None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation(key1, new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageInvocation(key2, new Integer(i), None, None))
}
assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get)
}
}

View file

@ -140,7 +140,7 @@ class InMemoryActorSpec extends TestCase {
Thread.sleep(100) Thread.sleep(100)
stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired stateful ! SuccessOneWay("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state") // transactionrequired
Thread.sleep(100) Thread.sleep(100)
assertEquals("new state", (stateful !! GetVectorSize).get) assertEquals(2, (stateful !! GetVectorSize).get)
} }
@Test @Test
@ -225,4 +225,5 @@ class InMemoryActorSpec extends TestCase {
} catch {case e: RuntimeException => {}} } catch {case e: RuntimeException => {}}
assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state assertEquals("init", (stateful !! GetRefState).get) // check that state is == init state
} }
} }

View file

@ -474,7 +474,7 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -491,7 +491,7 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100), RestartStrategy(OneForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -512,15 +512,15 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -541,15 +541,15 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100), RestartStrategy(OneForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -570,17 +570,17 @@ class RemoteSupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)

View file

@ -460,7 +460,7 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -476,7 +476,7 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100), RestartStrategy(OneForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -494,15 +494,15 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -520,15 +520,15 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(OneForOne, 3, 100), RestartStrategy(OneForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)
@ -546,17 +546,17 @@ class SupervisorSpec extends junit.framework.TestCase with Suite {
override def getSupervisorConfig: SupervisorConfig = { override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong1, pingpong1,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
SupervisorConfig( SupervisorConfig(
RestartStrategy(AllForOne, 3, 100), RestartStrategy(AllForOne, 3, 100),
Worker( Supervise(
pingpong2, pingpong2,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: ::
Worker( Supervise(
pingpong3, pingpong3,
LifeCycle(Permanent, 100)) LifeCycle(Permanent, 100))
:: Nil) :: Nil)

View file

@ -1,18 +1,36 @@
package se.scalablesolutions.akka.kernel.reactor package se.scalablesolutions.akka.kernel.reactor
import java.util.concurrent.ThreadPoolExecutor.CallerRunsPolicy import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.locks.Lock import java.util.concurrent.locks.Lock
import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.{Executors, CountDownLatch, CyclicBarrier, TimeUnit} import org.junit.{Test, Before}
import org.junit.Before
import org.junit.Test
import org.junit.Assert._ import org.junit.Assert._
import junit.framework.TestCase import junit.framework.TestCase
class ThreadBasedDispatcherTest extends TestCase { class ThreadBasedDispatcherTest extends TestCase {
private var threadingIssueDetected: AtomicBoolean = null private var threadingIssueDetected: AtomicBoolean = null
class TestMessageHandle(handleLatch: CountDownLatch) extends MessageInvoker {
val guardLock: Lock = new ReentrantLock
def invoke(message: MessageInvocation) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true)
} finally {
guardLock.unlock
}
}
}
@Before @Before
override def setUp = { override def setUp = {
threadingIssueDetected = new AtomicBoolean(false) threadingIssueDetected = new AtomicBoolean(false)
@ -23,11 +41,6 @@ class ThreadBasedDispatcherTest extends TestCase {
internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially
} }
@Test
def testMessagesDispatchedToDifferentHandlersAreExecutedConcurrently = {
internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently
}
@Test @Test
def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = { def testMessagesDispatchedToHandlersAreExecutedInFIFOOrder = {
internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder
@ -35,95 +48,21 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10) val handleLatch = new CountDownLatch(100)
val key = "key" val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key, new MessageHandler {
def handle(message: MessageHandle) {
try {
if (threadingIssueDetected.get) return
if (guardLock.tryLock) {
Thread.sleep(100)
handleLatch.countDown
} else {
threadingIssueDetected.set(true)
}
} catch {
case e: Exception => threadingIssueDetected.set(true); e.printStackTrace
} finally {
guardLock.unlock
}
}
})
dispatcher.start dispatcher.start
for (i <- 0 until 10) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key, new Object, None, None)) dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
} }
private def internalTestMessagesDispatchedToDifferentHandlersAreExecutedConcurrently: Unit = {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.registerHandler(key2, new MessageHandler {
def handle(message: MessageHandle) = synchronized {
try {handlersBarrier.await(1, TimeUnit.SECONDS)}
catch {case e: Exception => threadingIssueDetected.set(true)}
}
})
dispatcher.start
dispatcher.messageQueue.append(new MessageHandle(key1, "Sending Message 1", None, None))
dispatcher.messageQueue.append(new MessageHandle(key2, "Sending Message 2", None, None))
handlersBarrier.await(5, TimeUnit.SECONDS)
assertFalse(threadingIssueDetected.get)
}
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(200) val handleLatch = new CountDownLatch(100)
val key1 = "key1" val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
val key2 = "key2"
val dispatcher = new EventBasedThreadPoolDispatcher
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new CallerRunsPolicy)
.buildThreadPool
dispatcher.registerHandler(key1, new MessageHandler {
var currentValue = -1; var currentValue = -1;
def handle(message: MessageHandle) { def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) {
currentValue = messageValue.intValue
handleLatch.countDown
} else threadingIssueDetected.set(true)
}
})
dispatcher.registerHandler(key2, new MessageHandler {
var currentValue = -1;
def handle(message: MessageHandle) {
if (threadingIssueDetected.get) return if (threadingIssueDetected.get) return
val messageValue = message.message.asInstanceOf[Int] val messageValue = message.message.asInstanceOf[Int]
if (messageValue.intValue == currentValue + 1) { if (messageValue.intValue == currentValue + 1) {
@ -134,10 +73,10 @@ class ThreadBasedDispatcherTest extends TestCase {
}) })
dispatcher.start dispatcher.start
for (i <- 0 until 100) { for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageHandle(key1, new Integer(i), None, None)) dispatcher.messageQueue.append(new MessageInvocation("id", new Integer(i), None, None))
dispatcher.messageQueue.append(new MessageHandle(key2, new Integer(i), None, None))
} }
assertTrue(handleLatch.await(5, TimeUnit.SECONDS)) assertTrue(handleLatch.await(5, TimeUnit.SECONDS))
assertFalse(threadingIssueDetected.get) assertFalse(threadingIssueDetected.get)
dispatcher.shutdown
} }
} }

View file

@ -48,9 +48,42 @@ class TxClasherActor extends Actor {
} }
} }
class TxActorOneWay(clasher: Actor) extends Actor {
timeout = 1000000
makeTransactional
def receive: PartialFunction[Any, Unit] = {
case msg: AnyRef =>
clasher ! msg
}
}
class TxClasherActorOneWay extends Actor {
val vector = TransactionalState.newInMemoryVector[String]
timeout = 1000000
makeTransactional
var count = 0
def receive: PartialFunction[Any, Unit] = {
case "First" =>
if (count == 0) Thread.sleep(5000)
count += 1
println("FIRST")
vector.add("First")
println("--- VECTOR: " + vector)
case "Second" =>
println("SECOND")
vector.add("Second")
println("--- VECTOR: " + vector)
case "Index0" =>
reply(vector(0))
case "Index1" =>
reply(vector(1))
}
}
class TransactionClasherSpec extends TestCase { class TransactionClasherSpec extends TestCase {
@Test @Test
def testX = { def testBangBangClash = {
val clasher = new TxClasherActor val clasher = new TxClasherActor
clasher.start clasher.start
val txActor1 = new TxActor(clasher) val txActor1 = new TxActor(clasher)
@ -70,6 +103,27 @@ class TransactionClasherSpec extends TestCase {
} catch { case e: TransactionRollbackException => {} } } catch { case e: TransactionRollbackException => {} }
} }
@Test
def testBangClash = {
val clasher = new TxClasherActorOneWay
clasher.start
val txActor1 = new TxActorOneWay(clasher)
txActor1.start
val txActor2 = new TxActorOneWay(clasher)
txActor2.start
val t1 = new Thread(new Runnable() {
def run = {
txActor1 ! "First"
}
}).start
Thread.sleep(1000)
try {
txActor2 ! "Second"
fail("Expected TransactionRollbackException")
} catch { case e: TransactionRollbackException => {} }
}
/* /*
@Test @Test
def testX = { def testX = {

View file

@ -0,0 +1,11 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.annotation;
import java.lang.annotation.*;
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface configuration {}