finished remote actors (with tests) plus half-baked distributed transactions (not complete)

This commit is contained in:
Jonas Boner 2009-06-25 23:47:30 +02:00
parent 10a0c16cb2
commit a585e0ce38
19 changed files with 1055 additions and 817 deletions

View file

@ -142,6 +142,8 @@
<component name="Encoding" useUTFGuessing="true" native2AsciiForPropertiesFiles="false">
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java" charset="MacRoman" />
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/PersistentStateTest.java" charset="windows-1252" />
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemoteInMemoryStateTest.java" charset="MacRoman" />
<file url="file://$PROJECT_DIR$/fun-test-java/src/test/java/se/scalablesolutions/akka/api/RemotePersistentStateTest.java" charset="windows-1252" />
<file url="file://$PROJECT_DIR$/kernel/src/main/scala/Kernel.scala" charset="UTF-8" />
<file url="file://$PROJECT_DIR$/kernel/src/main/scala/collection/HashTrie.scala" charset="windows-1252" />
<file url="file://$PROJECT_DIR$/kernel/src/main/scala/collection/Vector.scala" charset="windows-1252" />

1102
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -17,6 +17,7 @@ public class InMemoryStateTest extends TestCase {
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() {
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
@ -32,16 +33,16 @@ public class InMemoryStateTest extends TestCase {
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class);
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class); // conf.getActiveObject(InMemStateful.class);
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class);
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");

View file

@ -0,0 +1,143 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.actor.*;
import se.scalablesolutions.akka.kernel.nio.NettyServer;
import junit.framework.TestCase;
public class RemoteInMemoryStateTest extends TestCase {
static String messageLog = "";
static {
new Thread(new Runnable() {
public void run() {
NettyServer server = new NettyServer();
server.connect();
}
}).start();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory factory = new se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory();
protected void setUp() {
new se.scalablesolutions.akka.kernel.nio.NettyServer();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
//new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000)
}).inject().supervise();
}
protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getVectorState());
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setVectorState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getVectorState()); // check that state is == init state
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class);
stateful.setRefState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
/*
public void testNestedNonTransactionalMethodHangs() {
InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = conf.getActiveObject(InMemFailer.class);
try {
stateful.thisMethodHangs("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
*/
// public void testShouldRollbackStateForStatefulServerInCaseOfMessageClash()
// {
// InMemStateful stateful = conf.getActiveObject(InMemStateful.class);
// stateful.setState("stateful", "init"); // set init state
//
// InMemClasher clasher = conf.getActiveObject(InMemClasher.class);
// clasher.setState("clasher", "init"); // set init state
//
// // try {
// // stateful.clashOk("stateful", "new state", clasher);
// // } catch (RuntimeException e) { } // expected
// // assertEquals("new state", stateful.getState("stateful")); // check that
// // state is == init state
// // assertEquals("was here", clasher.getState("clasher")); // check that
// // state is == init state
//
// try {
// stateful.clashNotOk("stateful", "new state", clasher);
// fail("should have thrown an exception");
// } catch (RuntimeException e) {
// System.out.println(e);
// } // expected
// assertEquals("init", stateful.getState("stateful")); // check that state is
// // == init state
// // assertEquals("init", clasher.getState("clasher")); // check that state
// is
// // == init state
// }
}

View file

@ -0,0 +1,105 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.*;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.Kernel;
import se.scalablesolutions.akka.kernel.state.TransactionalMap;
import se.scalablesolutions.akka.kernel.state.CassandraPersistentTransactionalMap;
import se.scalablesolutions.akka.kernel.actor.*;
import se.scalablesolutions.akka.kernel.nio.NettyServer;
import junit.framework.TestCase;
public class RemotePersistentStateTest extends TestCase {
static String messageLog = "";
static {
System.setProperty("storage-config", "config");
Kernel.startCassandra();
new Thread(new Runnable() {
public void run() {
NettyServer server = new NettyServer();
server.connect();
}
}).start();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() {
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
//new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000)
}).supervise();
}
protected void tearDown() {
conf.stop();
}
public void testShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure")); // check that state is == init state
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("init", stateful.getVectorState(0));
assertEquals("new state", stateful.getVectorState(1));
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setVectorState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getVectorState(0)); // check that state is == init state
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getRefState());
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
PersistentStateful stateful = conf.getActiveObject(PersistentStateful.class);
stateful.setRefState("init"); // set init state
PersistentFailer failer = conf.getActiveObject(PersistentFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
} catch (RuntimeException e) {
} // expected
assertEquals("init", stateful.getRefState()); // check that state is == init state
}
}

View file

@ -133,7 +133,7 @@ object ActiveObject {
// FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts
@serializable sealed class SequentialTransactionalAroundAdvice(
target: Class[_], targetInstance: AnyRef, actor: Actor, val remote: Boolean) extends AroundAdvice {
target: Class[_], targetInstance: AnyRef, actor: Actor, val isRemote: Boolean) extends AroundAdvice {
private val changeSet = new ChangeSet(target.getName)
private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance)
@ -147,7 +147,7 @@ object ActiveObject {
dispatcher.start
import ActiveObject.threadBoundTx
private[this] var activeTx: Option[Transaction] = None
private[kernel] var activeTx: Option[Transaction] = None
// FIXME: switch to using PCD annotation matching, break out into its own aspect + switch to StaticJoinPoint
def invoke(joinpoint: JoinPoint): AnyRef = {
@ -164,9 +164,9 @@ object ActiveObject {
}
try {
incrementTransaction
if (remote) {
if (isRemote) {
val future = NettyClient.send(
new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, isOneWay, false))
new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, activeTx, isOneWay, false))
if (isOneWay) null // for void methods
else {
future.await_?
@ -233,8 +233,6 @@ object ActiveObject {
true
} else false
private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = {
try {
result()
@ -271,17 +269,6 @@ object ActiveObject {
else true
} else true
/*
private def sendOneWay(joinpoint: JoinPoint) =
mailbox.append(new MessageHandle(this, Invocation(joinpoint, activeTx), new NullFutureResult))
private def sendAndReceiveEventually(joinpoint: JoinPoint): ResultOrFailure[AnyRef] = {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(Invocation(joinpoint, activeTx), 1000) // FIXME configure
future.await_?
getResultOrThrowException(future)
}
*/
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
@ -292,15 +279,13 @@ object ActiveObject {
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw new TransactionAwareException(cause, activeTx)
} else future.result.asInstanceOf[Option[T]]
/*
if (future.exception.isDefined) {
var resultOrFailure = ResultOrFailure(activeTx)
val (toBlame, cause) = future.exception.get
resultOrFailure() = throw cause
resultOrFailure
} else ResultOrFailure(future.result.get, activeTx)
*/
} else {
if (future.result.isDefined) {
val (res, tx) = future.result.get.asInstanceOf[Tuple2[AnyRef, Option[Transaction]]]
Some(res).asInstanceOf[Option[T]]
} else None
}
/**
* Search for transactional items for a specific target instance, crawl the class hierarchy recursively up to the top.
*/
@ -387,11 +372,7 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor {
} catch {
case e =>
throw new TransactionAwareException(e, tx)
/*
val resultOrFailure = ResultOrFailure(tx)
resultOrFailure() = throw e
reply(resultOrFailure)
*/ }
}
case unexpected =>
throw new ActiveObjectException("Unexpected message [" + unexpected + "] sent to [" + this + "]")

View file

@ -8,6 +8,8 @@ import java.util.concurrent.{CopyOnWriteArraySet, TimeUnit}
import kernel.reactor._
import kernel.config.ScalaConfig._
import kernel.nio.{NettyClient, RemoteRequest}
import kernel.stm.Transaction
import kernel.util.Logging
import kernel.util.Helpers._
@ -32,6 +34,7 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
trait Actor extends Logging {
var timeout: Long = 5000L
var isRemote = false
@volatile private[this] var isRunning: Boolean = false
protected[this] var id: String = super.toString
@ -50,6 +53,9 @@ trait Actor extends Logging {
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
/**
* TODO: document
*/
protected var faultHandler: Option[FaultHandlingStrategy] = None
/**
@ -61,7 +67,7 @@ trait Actor extends Logging {
/**
* Set trapExit to true if actor should be able to trap linked actors exit messages.
*/
@volatile protected[this] var trapExit: Boolean = false
protected[this] var trapExit: Boolean = false
/**
* Partial function implementing the server logic.
@ -112,9 +118,10 @@ trait Actor extends Logging {
// ==== API ====
// =============
def !(message: AnyRef) =
if (isRunning) mailbox.append(new MessageHandle(this, message, new NullFutureResult))
else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def !(message: AnyRef) = if (isRunning) {
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, true, false))
else mailbox.append(new MessageHandle(this, message, new NullFutureResult))
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) {
val future = postMessageToMailboxAndCreateFutureResultWithTimeout(message, timeout)
@ -130,8 +137,13 @@ trait Actor extends Logging {
getResultOrThrowException(future).get
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
// FIXME can be deadlock prone - HOWTO?
def link(actor: Actor) = synchronized { actor.synchronized {
protected[this] def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." )
case Some(future) => future.completeWithResult(message)
}
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
protected[this] def link(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) {
linkedActors.add(actor)
if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
@ -140,8 +152,8 @@ trait Actor extends Logging {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}}
// FIXME can be deadlock prone - HOWTO?
def unlink(actor: Actor) = synchronized { actor.synchronized {
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
protected[this] def unlink(actor: Actor) = synchronized { actor.synchronized {
if (isRunning) {
if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
linkedActors.remove(actor)
@ -150,14 +162,6 @@ trait Actor extends Logging {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}}
/**
* Atomically start and link actor.
*/
def spawnLink(actor: Actor) = actor.synchronized {
actor.start
link(actor)
}
def start = synchronized {
if (!isRunning) {
dispatcherType match {
@ -184,11 +188,39 @@ trait Actor extends Logging {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
protected def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender future in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so switch to '!!' to wait using a future." )
case Some(future) => future.completeWithResult(message)
/**
* Atomically start and link actor.
*/
def spawnLink(actor: Actor) = actor.synchronized {
actor.start
link(actor)
}
/**
* Atomically start and link a remote actor.
*/
def spawnLinkRemote(actor: Actor) = actor.synchronized {
actor.makeRemote
actor.start
link(actor)
}
def spawn(actorClass: Class[_]) = {
// FIXME: should pass in dispatcher etc. - inherit
}
def spawnRemote(actorClass: Class[_]) = {
}
def spawnLink(actorClass: Class[_]) = {
}
def spawnLinkRemote(actorClass: Class[_]) = {
}
def makeRemote = isRemote = true
// ================================
// ==== IMPLEMENTATION DETAILS ====
// ================================
@ -205,18 +237,38 @@ trait Actor extends Logging {
}
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(
message: AnyRef, timeout: Long): CompletableFutureResult = {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = {
if (isRemote) NettyClient.send(new RemoteRequest(true, message, null, this.getClass.getName, null, false, false))
else {
val future = new DefaultCompletableFutureResult(timeout)
mailbox.append(new MessageHandle(this, message, future))
future
}
}
private def getResultOrThrowException[T](future: FutureResult): Option[T] =
if (isRemote) getRemoteResultOrThrowException(future)
else {
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause
} else future.result.asInstanceOf[Option[T]]
} else {
future.result.asInstanceOf[Option[T]]
}
}
// FIXME: UGLY - Either: make the remote tx work
// FIXME: OR: remove this method along with the tx tuple making in NettyClient.messageReceived and ActiveObject.getResultOrThrowException
private def getRemoteResultOrThrowException[T](future: FutureResult): Option[T] =
if (future.exception.isDefined) {
val (_, cause) = future.exception.get
throw cause // throw new TransactionAwareException(cause, activeTx)
} else {
if (future.result.isDefined) {
val (res, tx) = future.result.get.asInstanceOf[Tuple2[Option[T], Option[Transaction]]]
res
} else None
}
private def base: PartialFunction[Any, Unit] = lifeCycle orElse (hotswap getOrElse receive)

View file

@ -42,6 +42,7 @@ package se.scalablesolutions.akka.collection
* @author Daniel Spiewak
* @author Rich Hickey
*/
@serializable
final class HashTrie[K, +V] private (root: Node[K, V]) extends Map[K, V] {
lazy val size = root.size
@ -73,6 +74,7 @@ object HashTrie {
// ============================================================================
// nodes
@serializable
private[collection] sealed trait Node[K, +V] {
val size: Int
@ -85,7 +87,7 @@ private[collection] sealed trait Node[K, +V] {
def elements: Iterator[(K, V)]
}
@serializable
private[collection] class EmptyNode[K] extends Node[K, Nothing] {
val size = 0

View file

@ -42,6 +42,7 @@ import Vector._
* @author Daniel Spiewak
* @author Rich Hickey
*/
@serializable
class Vector[+T] private (val length: Int, shift: Int, root: Array[AnyRef], tail: Array[AnyRef]) extends RandomAccessSeq[T] { outer =>
private val tailOff = length - tail.length

View file

@ -116,8 +116,9 @@ class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
val tx = reply.tx
val future = futures.get(reply.id)
if (reply.successful) future.completeWithResult(reply.message)
if (reply.successful) future.completeWithResult((reply.message, tx))
else future.completeWithException(null, reply.exception)
} else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result)
} catch {

View file

@ -7,7 +7,8 @@ package se.scalablesolutions.akka.kernel.nio
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor.{ActiveObjectFactory, Dispatcher, ActiveObject, Invocation}
import kernel.actor._
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import kernel.util.Logging
import java.util.ArrayList
import java.util.List
@ -15,6 +16,7 @@ import java.util.concurrent.atomic.AtomicLong
import java.util.logging.Level
import java.util.logging.Logger
import org.codehaus.aspectwerkz.intercept.Advisable
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
@ -22,10 +24,16 @@ import org.jboss.netty.handler.codec.serialization.ObjectDecoder
import org.jboss.netty.handler.codec.serialization.ObjectEncoder
class NettyServer extends Logging {
def connect = NettyServer.connect
}
object NettyServer extends Logging {
private val HOSTNAME = "localhost"
private val PORT = 9999
private val CONNECTION_TIMEOUT_MILLIS = 100
@volatile private var isRunning = false
private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,
Executors.newCachedThreadPool)
@ -42,14 +50,23 @@ class NettyServer extends Logging {
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
def connect = synchronized {
if (!isRunning) {
log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
isRunning = true
}
}
}
@ChannelPipelineCoverage {val value = "all"}
class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
private val MESSAGE_HANDLE = classOf[Actor].getDeclaredMethod(
"handle", Array[Class[_]](classOf[AnyRef], classOf[CompletableFutureResult]))
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
@ -82,30 +99,8 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
try {
log.debug(request.toString)
if (request.isActor) {
log.debug("Dispatching to [receive :: %s]", request.target)
throw new UnsupportedOperationException("TODO: remote actors")
} else {
log.debug("Dispatching to [%s :: %s]", request.method, request.target)
val activeObject = createActiveObject(request.target)
val args = request.message.asInstanceOf[scala.List[AnyRef]]
val argClazzes = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes)
val method = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses)
try {
if (request.isOneWay) method.invoke(activeObject, unescapedArgs)
else {
val result = method.invoke(activeObject, unescapedArgs)
log.debug("Returning result [%s]", result)
channel.write(request.newReplyWithMessage(result))
}
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e.getCause)
e.getCause.printStackTrace
channel.write(request.newReplyWithException(e.getCause))
}
}
if (request.isActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
} catch {
case e: Exception =>
log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e)
@ -113,6 +108,59 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
}
private def dispatchToActor(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to actor [%s]", request.target)
val actor = createActor(request.target)
actor.start
if (request.isOneWay) actor ! request.message
else {
try {
val resultOrNone = actor !! request.message
val result = if (resultOrNone.isDefined) resultOrNone else null
log.debug("Returning result from actor invocation [%s]", result)
channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get))
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote actor [%s] due to: %s", request.target, e.getCause)
e.getCause.printStackTrace
channel.write(request.newReplyWithException(e.getCause))
}
}
}
private def dispatchToActiveObject(request: RemoteRequest, channel: Channel) = {
log.debug("Dispatching to [%s :: %s]", request.method, request.target)
val activeObject = createActiveObject(request.target)
val args = request.message.asInstanceOf[scala.List[AnyRef]]
val argClazzes = args.map(_.getClass)
val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes)
continueTransaction(request)
try {
val messageReceiver = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses)
if (request.isOneWay) messageReceiver.invoke(activeObject, unescapedArgs)
else {
val result = messageReceiver.invoke(activeObject, unescapedArgs)
log.debug("Returning result from active object invocation [%s]", result)
channel.write(request.newReplyWithMessage(result, ActiveObject.threadBoundTx.get))
}
} catch {
case e: InvocationTargetException =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.method, request.target, e.getCause)
e.getCause.printStackTrace
channel.write(request.newReplyWithException(e.getCause))
}
}
private def continueTransaction(request: RemoteRequest) = {
val tx = request.tx
if (tx.isDefined) {
tx.get.reinit
ActiveObject.threadBoundTx.set(tx)
} else ActiveObject.threadBoundTx.set(None)
}
private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = {
val unescapedArgs = new Array[AnyRef](args.size)
val unescapedArgClasses = new Array[Class[_]](args.size)
@ -131,7 +179,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
(unescapedArgs, unescapedArgClasses)
}
private def createActiveObject(name: String) = {
private def createActiveObject(name: String): AnyRef = {
val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull == null) {
val clazz = Class.forName(name)
@ -149,4 +197,21 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
} else activeObjectOrNull
}
private def createActor(name: String): Actor = {
val actorOrNull = actors.get(name)
if (actorOrNull == null) {
val clazz = Class.forName(name)
try {
val newInstance = clazz.newInstance.asInstanceOf[Actor]
actors.put(name, newInstance)
newInstance
} catch {
case e =>
log.debug("Could not create remote actor instance due to: %s", e)
e.printStackTrace
throw e
}
} else actorOrNull
}
}

View file

@ -5,6 +5,7 @@
package se.scalablesolutions.akka.kernel.nio
import java.util.concurrent.atomic.AtomicLong
import kernel.stm.Transaction
import kernel.util.HashCode
object IdFactory {
@ -18,13 +19,15 @@ object IdFactory {
val message: AnyRef,
val method: String,
val target: String,
val tx: Option[Transaction],
val isOneWay: Boolean,
val isEscaped: Boolean) {
private[RemoteRequest] var _id = IdFactory.nextId
def id = _id
override def toString: String = synchronized {
"RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method + " | target: " + target + " | isOneWay: " + isOneWay + "]"
"RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method +
" | target: " + target + " | tx: " + tx + " | isOneWay: " + isOneWay + "]"
}
override def hashCode(): Int = synchronized {
@ -45,20 +48,25 @@ object IdFactory {
that.asInstanceOf[RemoteRequest].target == target
}
def newReplyWithMessage(message: AnyRef) = synchronized { new RemoteReply(true, id, message, null) }
def newReplyWithMessage(message: AnyRef, tx: Option[Transaction]) = synchronized { new RemoteReply(true, id, message, null, tx) }
def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error) }
def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error, None) }
def cloneWithNewMessage(message: AnyRef, isEscaped: Boolean) = synchronized {
val request = new RemoteRequest(isActor, message, method, target, isOneWay, isEscaped)
val request = new RemoteRequest(isActor, message, method, target, tx, isOneWay, isEscaped)
request._id = id
request
}
}
@serializable class RemoteReply(val successful: Boolean, val id: Long, val message: AnyRef, val exception: Throwable) {
@serializable class RemoteReply(val successful: Boolean,
val id: Long,
val message: AnyRef,
val exception: Throwable,
val tx: Option[Transaction]) {
override def toString: String = synchronized {
"RemoteReply[successful: " + successful + " | id: " + id + " | message: " + message + " | exception: " + exception + "]"
"RemoteReply[successful: " + successful + " | id: " + id + " | message: " +
message + " | exception: " + exception + " | tx: " + tx + "]"
}
override def hashCode(): Int = synchronized {

View file

@ -82,6 +82,7 @@ class TransactionalState {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable
trait Transactional {
val uuid = Uuid.newUuid.toString

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.kernel.stm
import kernel.state.{Transactional, TransactionalMap, TransactionalVector, TransactionalRef}
import kernel.util.Helpers.ReadWriteLock
@serializable
class ChangeSet(val id: String) {
private val lock = new ReadWriteLock

View file

@ -6,6 +6,7 @@ package se.scalablesolutions.akka.kernel.stm
import java.util.concurrent.atomic.{AtomicInteger, AtomicLong}
import kernel.util.Logging
import scala.collection.mutable.{HashSet, HashMap}
@serializable sealed abstract class TransactionStatus
@ -38,7 +39,7 @@ object TransactionIdFactory {
log.debug("Creating a new transaction with id [%s]", id)
// FIXME: add support for nested transactions
private[this] var parent: Option[Transaction] = None
//private[this] var parent: Option[Transaction] = None
private[this] val participants = new HashSet[ChangeSet]
private[this] val precommitted = new HashSet[ChangeSet]
private[this] val depth = new AtomicInteger(0)
@ -94,6 +95,8 @@ object TransactionIdFactory {
def join(changeSet: ChangeSet) = synchronized {
ensureIsActive
println(" --- log ;" + log)
println(" --- changeset ;" + changeSet)
log.debug("TX JOIN - Server [%s] is joining transaction [%s]" , changeSet.id, this)
changeSet.full.foreach(_.begin)
participants + changeSet
@ -113,6 +116,14 @@ object TransactionIdFactory {
private def ensureIsActiveOrNew = if (!(status == TransactionStatus.Active || status == TransactionStatus.New))
throw new IllegalStateException("Expected ACTIVE or NEW transaction - current status [" + status + "]")
private[kernel] def reinit = {
import net.lag.logging.{Logger, Level}
if (log == null) {
log = Logger.get(this.getClass.getName)
log.setLevel(Level.ALL)
}
}
override def equals(that: Any): Boolean = synchronized {
that != null &&
that.isInstanceOf[Transaction] &&

View file

@ -19,6 +19,7 @@ class SystemFailure(cause: Throwable) extends RuntimeException(cause)
object Helpers extends Logging {
// ================================================
@serializable
class ReadWriteLock {
private val rwl = new ReentrantReadWriteLock
private val readLock = rwl.readLock

View file

@ -20,12 +20,11 @@ import java.net.UnknownHostException;
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Logging {
@transient val log = {
@transient var log = {
val log = Logger.get(this.getClass.getName)
log.setLevel(Level.ALL)
log
}
}
/**

View file

@ -18,7 +18,6 @@ class ActorTest {
case "Failure" =>
throw new RuntimeException("expected")
}
def restart(config: Option[AnyRef]) = {}
}
@Test
@ -29,7 +28,6 @@ class ActorTest {
def receive: PartialFunction[Any, Unit] = {
case "OneWay" => oneWay = "received"
}
def restart(config: Option[AnyRef]) = {}
}
actor.start
val result = actor ! "OneWay"

View file

@ -0,0 +1,92 @@
package se.scalablesolutions.akka.kernel.actor
import concurrent.Lock
import java.util.concurrent.locks.ReentrantLock
import java.util.concurrent.TimeUnit
import kernel.nio.{NettyClient, NettyServer}
import reactor._
import org.junit.{Test, Before}
import org.junit.Assert._
object Global {
var oneWay = "nada"
}
class RemoteActorSpecActorUnidirectional extends Actor {
def receive: PartialFunction[Any, Unit] = {
case "OneWay" =>
Global.oneWay = "received"
}
}
class RemoteActorSpecActorBidirectional extends Actor {
def receive: PartialFunction[Any, Unit] = {
case "Hello" =>
reply("World")
case "Failure" =>
throw new RuntimeException("expected")
}
}
class RemoteActorSpec {
new Thread(new Runnable() {
def run = {
val server = new NettyServer
server.connect
}
}).start
NettyClient.connect
private val unit = TimeUnit.MILLISECONDS
@Test
def sendOneWay = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorUnidirectional
actor.makeRemote
actor.start
val result = actor ! "OneWay"
Thread.sleep(100)
assertEquals("received", Global.oneWay)
actor.stop
}
@Test
def sendReplySync = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote
actor.start
val result: String = actor !? "Hello"
assertEquals("World", result)
actor.stop
}
@Test
def sendReplyAsync = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote
actor.start
val result = actor !! "Hello"
assertEquals("World", result.get.asInstanceOf[String])
actor.stop
}
@Test
def sendReceiveException = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.isRemote = true
actor.start
try {
actor !! "Failure"
fail("Should have thrown an exception")
} catch {
case e =>
assertEquals("expected", e.getMessage())
}
actor.stop
}
}