added configuration for remote active objects and services

This commit is contained in:
Jonas Boner 2009-07-02 13:23:03 +02:00
parent a4f1092659
commit 45bd6ebe5c
15 changed files with 1081 additions and 412 deletions

846
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -5,11 +5,11 @@
package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.annotation.*;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
import se.scalablesolutions.akka.annotation.*;
import se.scalablesolutions.akka.kernel.config.*;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
import se.scalablesolutions.akka.kernel.config.*;
import se.scalablesolutions.akka.kernel.reactor.*;
import se.scalablesolutions.akka.kernel.nio.RemoteServer;
import se.scalablesolutions.akka.kernel.state.TransactionalMap;
import se.scalablesolutions.akka.kernel.state.InMemoryTransactionalMap;
@ -19,27 +19,51 @@ import com.google.inject.Scopes;
import junit.framework.TestCase;
import java.util.concurrent.Executor;
import java.util.concurrent.ThreadPoolExecutor;
public class ActiveObjectGuiceConfiguratorTest extends TestCase {
static String messageLog = "";
static {
new Thread(new Runnable() {
public void run() {
RemoteServer server = new RemoteServer();
server.start();
}
}).start();
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
protected void setUp() {
ThreadPoolBuilder builder = new ThreadPoolBuilder();
MessageDispatcher dispatcher = new EventBasedThreadPoolDispatcher(builder
.newThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)
.setMaxPoolSize(128)
.setKeepAliveTimeInMillis(60000)
.setRejectionPolicy(new ThreadPoolExecutor.CallerRunsPolicy())
.build());
conf.addExternalGuiceModule(new AbstractModule() {
protected void configure() {
bind(Ext.class).to(ExtImpl.class).in(Scopes.SINGLETON);
}
}).configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000), new Component[]{
new Component(
new Component(
Foo.class,
new LifeCycle(new Permanent(), 1000),
10000),
1000,
dispatcher),
//new RemoteAddress("localhost", 9999)),
new Component(
Bar.class,
BarImpl.class,
new LifeCycle(new Permanent(), 1000),
10000)
1000,
dispatcher)
}).inject().supervise();
}

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.nio.RemoteServer;
import junit.framework.TestCase;
public class RemoteInMemoryStateTest extends TestCase {
@ -15,8 +16,8 @@ public class RemoteInMemoryStateTest extends TestCase {
static {
new Thread(new Runnable() {
public void run() {
se.scalablesolutions.akka.kernel.nio.RemoteServer server = new se.scalablesolutions.akka.kernel.nio.RemoteServer();
server.connect();
RemoteServer server = new RemoteServer();
server.start();
}
}).start();
try { Thread.currentThread().sleep(1000); } catch (Exception e) {}
@ -24,33 +25,21 @@ public class RemoteInMemoryStateTest extends TestCase {
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
final private ActiveObjectFactory factory = new ActiveObjectFactory();
protected void setUp() {
new se.scalablesolutions.akka.kernel.nio.RemoteServer();
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[]{
// FIXME: remove string-name, add ctor to only accept target class
new Component(InMemStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
new Component(InMemFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
//new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000)
}).inject().supervise();
}
protected void tearDown() {
conf.stop();
}
public void testMapShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
assertEquals("new state", stateful.getMapState("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess"));
}
public void testMapShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setMapState("testShouldRollbackStateForStatefulServerInCaseOfFailure", "init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000); //conf.getActiveObject(InMemFailer.class);
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
@ -60,7 +49,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setVectorState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
@ -68,9 +57,9 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testVectorShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setVectorState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000); //conf.getActiveObject(InMemFailer.class);
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");
@ -80,7 +69,7 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldNotRollbackStateForStatefulServerInCaseOfSuccess() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setRefState("init"); // set init state
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // transactional
stateful.success("testShouldNotRollbackStateForStatefulServerInCaseOfSuccess", "new state"); // to trigger commit
@ -88,9 +77,9 @@ public class RemoteInMemoryStateTest extends TestCase {
}
public void testRefShouldRollbackStateForStatefulServerInCaseOfFailure() {
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000);
InMemStateful stateful = factory.newRemoteInstance(InMemStateful.class, 1000, "localhost", 9999);
stateful.setRefState("init"); // set init state
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000); //conf.getActiveObject(InMemFailer.class);
InMemFailer failer = factory.newRemoteInstance(InMemFailer.class, 1000, "localhost", 9999); //conf.getActiveObject(InMemFailer.class);
try {
stateful.failure("testShouldRollbackStateForStatefulServerInCaseOfFailure", "new state", failer); // call failing transactional method
fail("should have thrown an exception");

View file

@ -7,6 +7,7 @@ package se.scalablesolutions.akka.api;
import se.scalablesolutions.akka.kernel.actor.ActiveObjectFactory;
import se.scalablesolutions.akka.kernel.config.ActiveObjectGuiceConfiguratorForJava;
import static se.scalablesolutions.akka.kernel.config.JavaConfig.*;
import se.scalablesolutions.akka.kernel.nio.RemoteServer;
import se.scalablesolutions.akka.kernel.Kernel;
import junit.framework.TestCase;
@ -17,12 +18,7 @@ public class RemotePersistentStateTest extends TestCase {
static {
System.setProperty("storage-config", "config");
Kernel.startCassandra();
new Thread(new Runnable() {
public void run() {
se.scalablesolutions.akka.kernel.nio.RemoteServer server = new se.scalablesolutions.akka.kernel.nio.RemoteServer();
server.connect();
}
}).start();
Kernel.startRemoteService();
}
final private ActiveObjectGuiceConfiguratorForJava conf = new ActiveObjectGuiceConfiguratorForJava();
@ -32,9 +28,8 @@ public class RemotePersistentStateTest extends TestCase {
conf.configureActiveObjects(
new RestartStrategy(new AllForOne(), 3, 5000),
new Component[] {
new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 10000000),
new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000)
//new Component(PersistentClasher.class, new LifeCycle(new Permanent(), 1000), 100000)
new Component(PersistentStateful.class, new LifeCycle(new Permanent(), 1000), 1000, new RemoteAddress("localhost", 9999)),
new Component(PersistentFailer.class, new LifeCycle(new Permanent(), 1000), 1000, new RemoteAddress("localhost", 9999))
}).supervise();
}

View file

@ -55,7 +55,7 @@ object Kernel extends Logging {
log.info("Starting Akka kernel...")
startRemoteService
startCassandra
cassandraBenchmark
//cassandraBenchmark
//val threadSelector = startJersey
// TODO: handle shutdown of Jersey in separate thread
@ -72,14 +72,12 @@ object Kernel extends Logging {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
def run = {
val server = new RemoteServer
server.connect
RemoteServer.start
}
})
remoteServerThread.start
Thread.sleep(1000) // wait for server to start up
RemoteClient.connect
}
private[akka] def startJersey: SelectorThread = {

View file

@ -4,28 +4,16 @@
package se.scalablesolutions.akka.kernel.actor
import java.util.{List => JList, ArrayList}
import java.lang.reflect.{Method, Field}
import java.lang.annotation.Annotation
import kernel.config.ActiveObjectGuiceConfigurator
import java.net.InetSocketAddress
import kernel.config.ScalaConfig._
import kernel.camel.{MessageDriven, ActiveObjectProducer}
import kernel.nio.{RemoteRequest, RemoteClient}
import kernel.stm.{TransactionManagement, TransactionAwareWrapperException, ChangeSet, Transaction}
import kernel.util.Helpers.ReadWriteLock
import kernel.util.{HashCode, ResultOrFailure}
import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector}
import kernel.reactor.FutureResult
import kernel.util.HashCode
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.apache.camel.{Processor, Exchange}
import scala.collection.mutable.HashMap
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
@ -43,13 +31,28 @@ object Annotations {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ActiveObjectFactory {
def newInstance[T](target: Class[T], timeout: Long): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), false, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), false, timeout)
// FIXME add versions with a MessageDispatcher -- How to pass the current on???????
def newRemoteInstance[T](target: Class[T], timeout: Long): T = ActiveObject.newInstance(target, new Dispatcher(target.getName), true, timeout)
// FIXME call backs to @prerestart @postrestart methods
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), true, timeout)
// FIXME dispatcher.newThreadPoolWith....build
// FIXME JMX enable configuration
// FIXME Configgy for config
def newInstance[T](target: Class[T], timeout: Long): T =
ActiveObject.newInstance(target, new Dispatcher(target.getName), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
ActiveObject.newInstance(target, new Dispatcher(target.getName), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
ActiveObject.newInstance(intf, target, new Dispatcher(intf.getName), Some(new InetSocketAddress(hostname, port)), timeout)
/*
def newInstanceAndLink[T](target: Class[T], supervisor: AnyRef): T = {
@ -62,16 +65,15 @@ class ActiveObjectFactory {
ActiveObject.newInstance(intf, target, actor)
}
*/
// ================================================
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean, timeout: Long): T = {
ActiveObject.newInstance(target, actor, remote, timeout)
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(target, actor, remoteAddress, timeout)
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean, timeout: Long): T = {
ActiveObject.newInstance(intf, target, actor, remote, timeout)
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
ActiveObject.newInstance(intf, target, actor, remoteAddress, timeout)
}
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Worker]): Supervisor =
ActiveObject.supervise(restartStrategy, components)
}
@ -85,30 +87,32 @@ object ActiveObject {
val MATCH_ALL = "execution(* *.*(..))"
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
def newInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(target.getName), false, timeout)
def newInstance[T](target: Class[T], timeout: Long): T =
newInstance(target, new Dispatcher(target.getName), None, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = newInstance(intf, target, new Dispatcher(intf.getName), false, timeout)
def newInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T =
newInstance(intf, target, new Dispatcher(intf.getName), None, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long): T = newInstance(target, new Dispatcher(target.getName), true, timeout)
def newRemoteInstance[T](target: Class[T], timeout: Long, hostname: String, port: Int): T =
newInstance(target, new Dispatcher(target.getName), Some(new InetSocketAddress(hostname, port)), timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long): T = newInstance(intf, target, new Dispatcher(intf.getName), true, timeout)
def newRemoteInstance[T](intf: Class[T], target: AnyRef, timeout: Long, hostname: String, port: Int): T =
newInstance(intf, target, new Dispatcher(intf.getName), Some(new InetSocketAddress(hostname, port)), timeout)
// ================================================
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remote: Boolean, timeout: Long): T = {
if (remote) RemoteClient.connect
private[kernel] def newInstance[T](target: Class[T], actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true)
// FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remote, timeout))
MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remote: Boolean, timeout: Long): T = {
if (remote) RemoteClient.connect
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Actor, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remote, timeout))
MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
@ -129,22 +133,16 @@ object ActiveObject {
sealed class ActorAroundAdvice(val target: Class[_],
val targetInstance: AnyRef,
val actor: Actor,
val isRemote: Boolean,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice {
val id = target.getName
actor.timeout = timeout
actor.start
import kernel.reactor._
// FIXME make configurable!!!!!! MUST
private[this] var dispatcher = new ProxyMessageDispatcher
private[this] var mailbox = dispatcher.messageQueue
dispatcher.start
def invoke(joinpoint: JoinPoint): AnyRef = dispatch(joinpoint)
private def dispatch(joinpoint: JoinPoint) = {
if (isRemote) remoteDispatch(joinpoint)
if (remoteAddress.isDefined) remoteDispatch(joinpoint)
else localDispatch(joinpoint)
}
@ -161,7 +159,7 @@ sealed class ActorAroundAdvice(val target: Class[_],
private def remoteDispatch(joinpoint: JoinPoint): AnyRef = {
val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti]
val oneWay = isOneWay(rtti)
val future = RemoteClient.send(
val future = RemoteClient.clientFor(remoteAddress.get).send(
new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName,
timeout, None, oneWay, false, actor.registerSupervisorAsRemoteActor))
if (oneWay) null // for void methods

View file

@ -4,12 +4,14 @@
package se.scalablesolutions.akka.kernel.actor
import java.net.InetSocketAddress
import java.util.concurrent.CopyOnWriteArraySet
import kernel.nio.{RemoteServer, RemoteClient, RemoteRequest}
import kernel.reactor._
import kernel.config.ScalaConfig._
import kernel.nio.{RemoteClient, RemoteRequest}
import kernel.stm.{TransactionAwareWrapperException, TransactionManagement, Transaction}
import kernel.stm.{TransactionAwareWrapperException, TransactionManagement}
import kernel.util.Helpers.ReadWriteLock
import kernel.util.Logging
import org.codehaus.aspectwerkz.proxy.Uuid
@ -35,18 +37,22 @@ class ActorMessageHandler(val actor: Actor) extends MessageHandler {
trait Actor extends Logging with TransactionManagement {
val id: String = this.getClass.toString
val uuid = Uuid.newUuid.toString
@volatile private[this] var isRunning: Boolean = false
protected[Actor] var mailbox: MessageQueue = _
protected[this] var senderFuture: Option[CompletableFutureResult] = None
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[kernel] var supervisor: Option[Actor] = None
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock
private var hotswap: Option[PartialFunction[Any, Unit]] = None
private var config: Option[AnyRef] = None
@volatile protected[this] var isTransactional = false
@volatile protected[this] var remoteAddress: Option[InetSocketAddress] = None
@volatile protected[kernel] var supervisor: Option[Actor] = None
protected[Actor] var mailbox: MessageQueue = _
protected[this] var senderFuture: Option[CompletableFutureResult] = None
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
@ -58,27 +64,6 @@ trait Actor extends Logging with TransactionManagement {
*/
@volatile var timeout: Long = 5000L
/**
* User overridable callback/setting.
*
* Setting 'isRemote' to true means that an actor will be moved to and invoked on a remote host.
* See also 'makeRemote'.
*/
@volatile private[this] var isRemote = false
/**
* User overridable callback/setting.
*
* Setting 'isTransactional' to true means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
* See also 'makeTransactional'.
*/
@volatile private[this] var isTransactional = false
/**
* User overridable callback/setting.
*
@ -98,7 +83,7 @@ trait Actor extends Logging with TransactionManagement {
* new EventBasedSingleThreadDispatcher
* </pre>
*/
protected[Actor] var dispatcher: MessageDispatcher = {
protected[kernel] var dispatcher: MessageDispatcher = {
val threadPool = ThreadPoolBuilder.newBuilder.newThreadPoolWithLinkedBlockingQueueWithCapacity(0).build
val dispatcher = new EventBasedThreadPoolDispatcher(threadPool)
mailbox = dispatcher.messageQueue
@ -206,7 +191,7 @@ trait Actor extends Logging with TransactionManagement {
}
/**
* TODO: document
* Sends a one-way asynchronous message. E.g. fire-and-forget semantics.
*/
def !(message: AnyRef): Unit = if (isRunning) {
if (TransactionManagement.isTransactionalityEnabled) transactionalDispatch(message, timeout, false, true)
@ -214,7 +199,13 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
* Sends a message asynchronously and waits on a future for a reply message.
* It waits on the reply either until it receives it (returns Some(replyMessage) or until the timeout expires (returns None).
* E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* If you are sending messages using '!!' then you *have to* use reply(..) sending a reply message to the original sender.
* If not then the sender will unessecary block until the timeout expires.
*/
def !![T](message: AnyRef, timeout: Long): Option[T] = if (isRunning) {
if (TransactionManagement.isTransactionalityEnabled) {
@ -227,12 +218,19 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
* Sends a message asynchronously and waits on a future for a reply message.
* It waits on the reply either until it receives it (returns Some(replyMessage) or until the actor default timeout expires (returns None).
* E.g. send-and-receive-eventually semantics.
* <p/>
* <b>NOTE:</b>
* If you are sending messages using '!!' then you *have to* use reply(..) sending a reply message to the original sender.
* If not then the sender will unessecary block until the timeout expires.
*/
def !![T](message: AnyRef): Option[T] = !![T](message, timeout)
/**
* TODO: document
* Sends a message asynchronously, but waits on a future indefinitely. E.g. emulates a synchronous call.
* E.g. send-and-receive-eventually semantics.
*/
def !?[T](message: AnyRef): T = if (isRunning) {
if (TransactionManagement.isTransactionalityEnabled) {
@ -245,21 +243,47 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
/**
* TODO: document
* Use reply(..) to reply with a message to the original sender of the message currently being processed.
*/
protected[this] def reply(message: AnyRef) = senderFuture match {
case None => throw new IllegalStateException("No sender in scope, can't reply. Have you used '!' (async, fire-and-forget)? If so, switch to '!!' which will return a future to wait on." )
case Some(future) => future.completeWithResult(message)
}
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(hostname: String, port: Int): Unit = remoteFlagLock.withWriteLock {
makeRemote(new InetSocketAddress(hostname, port))
}
/**
* Invoking 'makeRemote' means that an actor will be moved to and invoked on a remote host.
*/
def makeRemote(address: InetSocketAddress): Unit = remoteFlagLock.withWriteLock {
remoteAddress = Some(address)
}
/**
* Invoking 'makeTransactional' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactional = synchronized {
if (isRunning) throw new IllegalArgumentException("Can not make actor transactional after it has been started")
else isTransactional = true
}
/**
* Links an other actor to this actor. Links are unidirectional and means that a the linking actor will receive a notification nif the linked actor has crashed.
* If the 'trapExit' flag has been set then it will 'trap' the failure and automatically restart the linked actors according to the restart strategy defined by the 'faultHandler'.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def link(actor: Actor) = synchronized {
protected[this] def link(actor: Actor) = {
if (isRunning) {
linkedActors.add(actor)
if (actor.supervisor.isDefined) throw new IllegalStateException("Actor can only have one supervisor [" + actor + "], e.g. link(actor) fails")
@ -268,13 +292,12 @@ trait Actor extends Logging with TransactionManagement {
} else throw new IllegalStateException("Actor has not been started, you need to invoke 'actor.start' before using it")
}
// FIXME can be deadlock prone if cyclic linking? - HOWTO?
/**
* Unlink the actor.
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def unlink(actor: Actor) = synchronized {
protected[this] def unlink(actor: Actor) = {
if (isRunning) {
if (!linkedActors.contains(actor)) throw new IllegalStateException("Actor [" + actor + "] is not a linked actor, can't unlink")
linkedActors.remove(actor)
@ -288,7 +311,7 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def startLink(actor: Actor) = synchronized {
protected[this] def startLink(actor: Actor) = {
actor.start
link(actor)
}
@ -298,8 +321,8 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def startLinkRemote(actor: Actor) = synchronized {
actor.makeRemote
protected[this] def startLinkRemote(actor: Actor) = {
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
link(actor)
}
@ -309,7 +332,7 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawn(actorClass: Class[_]): Actor = synchronized {
protected[this] def spawn(actorClass: Class[_]): Actor = {
val actor = actorClass.newInstance.asInstanceOf[Actor]
actor.dispatcher = dispatcher
actor.mailbox = mailbox
@ -322,9 +345,9 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnRemote(actorClass: Class[_]): Actor = synchronized {
protected[this] def spawnRemote(actorClass: Class[_]): Actor = {
val actor = actorClass.newInstance.asInstanceOf[Actor]
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.dispatcher = dispatcher
actor.mailbox = mailbox
actor.start
@ -336,7 +359,7 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnLink(actorClass: Class[_]): Actor = synchronized {
protected[this] def spawnLink(actorClass: Class[_]): Actor = {
val actor = spawn(actorClass)
link(actor)
actor
@ -347,45 +370,28 @@ trait Actor extends Logging with TransactionManagement {
* <p/>
* To be invoked from within the actor itself.
*/
protected[this] def spawnLinkRemote(actorClass: Class[_]): Actor = synchronized {
protected[this] def spawnLinkRemote(actorClass: Class[_]): Actor = {
val actor = spawn(actorClass)
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
link(actor)
actor
}
/**
* Invoking 'makeRemote' means that an actor will be moved to andinvoked on a remote host
*/
def makeRemote = isRemote = true
/**
* Invoking 'makeTransactional' means that the actor will **start** a new transaction if non exists.
* However, it will always participate in an existing transaction.
* If transactionality want to be completely turned off then do it by invoking:
* <pre/>
* TransactionManagement.disableTransactions
* </pre>
*/
def makeTransactional = synchronized {
if (isRunning) throw new IllegalArgumentException("Can not make actor transactional after it has been started")
else isTransactional = true
}
// ================================
// ==== IMPLEMENTATION DETAILS ====
// ================================
private def postMessageToMailbox(message: AnyRef): Unit =
if (isRemote) {
private def postMessageToMailbox(message: AnyRef): Unit = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (remoteAddress.isDefined) {
val supervisorUuid = registerSupervisorAsRemoteActor
RemoteClient.send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid))
RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, true, false, supervisorUuid))
} else mailbox.append(new MessageHandle(this, message, None, activeTx))
}
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult =
if (isRemote) {
private def postMessageToMailboxAndCreateFutureResultWithTimeout(message: AnyRef, timeout: Long): CompletableFutureResult = remoteFlagLock.withReadLock { // the price you pay for being able to make an actor remote at runtime
if (remoteAddress.isDefined) {
val supervisorUuid = registerSupervisorAsRemoteActor
val future = RemoteClient.send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, false, false, supervisorUuid))
val future = RemoteClient.clientFor(remoteAddress.get).send(new RemoteRequest(true, message, null, this.getClass.getName, timeout, null, false, false, supervisorUuid))
if (future.isDefined) future.get
else throw new IllegalStateException("Expected a future from remote call to actor " + toString)
}
@ -394,6 +400,7 @@ trait Actor extends Logging with TransactionManagement {
mailbox.append(new MessageHandle(this, message, Some(future), TransactionManagement.threadBoundTx.get))
future
}
}
private def transactionalDispatch[T](message: AnyRef, timeout: Long, blocking: Boolean, oneWay: Boolean): Option[T] = {
tryToCommitTransaction
@ -432,6 +439,9 @@ trait Actor extends Logging with TransactionManagement {
future.result.asInstanceOf[Option[T]]
}
/**
* Callback for the dispatcher. E.g. single entry point to the user code and all protected[this] methods
*/
private[kernel] def handle(messageHandle: MessageHandle) = synchronized {
val message = messageHandle.message
val future = messageHandle.future
@ -461,14 +471,15 @@ trait Actor extends Logging with TransactionManagement {
case Exit(dead, reason) => handleTrapExit(dead, reason)
}
private[kernel] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (trapExit) {
if (faultHandler.isDefined) {
faultHandler.get match {
// FIXME: implement support for maxNrOfRetries and withinTimeRange in RestartStrategy
case AllForOneStrategy(maxNrOfRetries, withinTimeRange) => restartLinkedActors(reason)
case OneForOneStrategy(maxNrOfRetries, withinTimeRange) => dead.restart(reason)
}
} else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true " + toString)
} else throw new IllegalStateException("No 'faultHandler' defined for actor with the 'trapExit' flag set to true - can't proceed " + toString)
} else {
if (supervisor.isDefined) supervisor.get ! Exit(dead, reason) // if 'trapExit' is not defined then pass the Exit on
}
@ -480,6 +491,8 @@ trait Actor extends Logging with TransactionManagement {
private[Actor] def restart(reason: AnyRef) = synchronized {
lifeCycleConfig match {
case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")
// FIXME implement support for shutdown time
case Some(LifeCycle(scope, shutdownTime)) => {
scope match {
case Permanent => {
@ -489,11 +502,12 @@ trait Actor extends Logging with TransactionManagement {
}
case Temporary =>
// FIXME handle temporary actors
// FIXME handle temporary actors correctly - restart if exited normally
// if (reason == 'normal) {
// log.debug("Restarting actor [%s] configured as TEMPORARY (since exited naturally).", id)
// scheduleRestart
// } else log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id)
// } else
log.info("Server [%s] configured as TEMPORARY will not be restarted (received unnatural exit message).", id)
case Transient =>
log.info("Server [%s] configured as TRANSIENT will not be restarted.", id)
@ -502,9 +516,9 @@ trait Actor extends Logging with TransactionManagement {
}
}
private[kernel] def registerSupervisorAsRemoteActor: Option[String] = {
private[kernel] def registerSupervisorAsRemoteActor: Option[String] = synchronized {
if (supervisor.isDefined) {
RemoteClient.registerSupervisorForActor(this)
RemoteClient.clientFor(remoteAddress.get).registerSupervisorForActor(this)
Some(supervisor.get.uuid)
} else None
}

View file

@ -5,22 +5,18 @@
package se.scalablesolutions.akka.kernel.config
import com.google.inject._
import com.google.inject.jsr250.ResourceProviderFactory
import ScalaConfig._
import kernel.actor.{Actor, Supervisor, ActiveObjectFactory, Dispatcher}
import kernel.camel.ActiveObjectComponent
import kernel.actor.{Supervisor, ActiveObjectFactory, Dispatcher}
import kernel.util.Logging
import org.codehaus.aspectwerkz.intercept.Advisable
import org.apache.camel.impl.{JndiRegistry, DefaultCamelContext}
import org.apache.camel.impl.{DefaultCamelContext}
import org.apache.camel.{CamelContext, Endpoint, Routes}
import scala.collection.mutable.HashMap
import java.net.InetSocketAddress
import java.lang.reflect.Method
import javax.servlet.ServletContext
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -29,7 +25,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val AKKA_CAMEL_ROUTING_SCHEME = "akka"
private var injector: Injector = _
private var supervisor: Supervisor = _
private var supervisor: Option[Supervisor] = None
private var restartStrategy: RestartStrategy = _
private var components: List[Component] = _
private var workers: List[Worker] = Nil
@ -99,7 +95,12 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
private def newSubclassingProxy(component: Component): DependencyBinding = {
val targetClass = component.target
val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, actor, false, component.timeout).asInstanceOf[AnyRef]
actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = activeObjectFactory.newInstance(targetClass, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, proxy, component))
new DependencyBinding(targetClass, proxy)
@ -110,7 +111,12 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
val targetInstance = component.target.newInstance.asInstanceOf[AnyRef] // TODO: perhaps need to put in registry
component.target.getConstructor(Array[Class[_]]()).setAccessible(true)
val actor = new Dispatcher(targetClass.getName)
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, false, component.timeout).asInstanceOf[AnyRef]
actor.start
if (component.dispatcher.isDefined) actor.dispatcher = component.dispatcher.get
val remoteAddress =
if (component.remoteAddress.isDefined) Some(new InetSocketAddress(component.remoteAddress.get.hostname, component.remoteAddress.get.port))
else None
val proxy = activeObjectFactory.newInstance(targetClass, targetInstance, actor, remoteAddress, component.timeout).asInstanceOf[AnyRef]
workers ::= Worker(actor, component.lifeCycle)
activeObjectRegistry.put(targetClass, (proxy, targetInstance, component))
new DependencyBinding(targetClass, proxy)
@ -124,10 +130,10 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
override def supervise: ActiveObjectConfigurator = synchronized {
if (injector == null) inject
supervisor = activeObjectFactory.supervise(restartStrategy, workers)
supervisor = Some(activeObjectFactory.supervise(restartStrategy, workers))
//camelContext.addComponent(AKKA_CAMEL_ROUTING_SCHEME, new ActiveObjectComponent(this))
//camelContext.start
supervisor.startSupervisor
supervisor.get.startSupervisor
ActiveObjectConfigurator.registerConfigurator(this)
this
}
@ -170,7 +176,7 @@ class ActiveObjectGuiceConfigurator extends ActiveObjectConfigurator with CamelC
def stop = synchronized {
camelContext.stop
supervisor.stop
if (supervisor.isDefined) supervisor.get.stop
}
def registerMethodForUri(method: Method, componentName: String) =

View file

@ -4,9 +4,11 @@
package se.scalablesolutions.akka.kernel.config
import kernel.actor.Actor
import reflect.BeanProperty
import kernel.actor.Actor
import kernel.reactor.MessageDispatcher
/**
* Configuration classes - not to be used as messages.
*
@ -32,17 +34,43 @@ object ScalaConfig {
case object Transient extends Scope
case object Temporary extends Scope
case class RemoteAddress(hostname: String, port: Int)
class Component(_intf: Class[_],
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Int) extends Server {
val target: Class[_],
val lifeCycle: LifeCycle,
val timeout: Int,
_dispatcher: MessageDispatcher, // optional
_remoteAddress: RemoteAddress // optional
) extends Server {
val intf: Option[Class[_]] = if (_intf == null) None else Some(_intf)
val dispatcher: Option[MessageDispatcher] = if (_dispatcher == null) None else Some(_dispatcher)
val remoteAddress: Option[RemoteAddress] = if (_remoteAddress == null) None else Some(_remoteAddress)
}
object Component {
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
new Component(intf, target, lifeCycle, timeout)
new Component(intf, target, lifeCycle, timeout, null, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
new Component(null, target, lifeCycle, timeout)
new Component(null, target, lifeCycle, timeout, null, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
new Component(intf, target, lifeCycle, timeout, dispatcher, null)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
new Component(null, target, lifeCycle, timeout, dispatcher, null)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, null, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, null, remoteAddress)
def apply(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(intf, target, lifeCycle, timeout, dispatcher, remoteAddress)
def apply(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
new Component(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
}
}
@ -86,15 +114,42 @@ object JavaConfig {
override def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.OneForOne
}
class RemoteAddress(@BeanProperty val hostname: String, @BeanProperty val port: Int)
abstract class Server extends ConfigElement
class Component(@BeanProperty val intf: Class[_],
@BeanProperty val target: Class[_],
@BeanProperty val lifeCycle: LifeCycle,
@BeanProperty val timeout: Int) extends Server {
@BeanProperty val timeout: Int,
@BeanProperty val dispatcher: MessageDispatcher, // optional
@BeanProperty val remoteAddress: RemoteAddress // optional
) extends Server {
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
this(intf, target, lifeCycle, timeout, null, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int) =
this(null, target, lifeCycle, timeout)
def transform = se.scalablesolutions.akka.kernel.config.ScalaConfig.Component(
intf, target, lifeCycle.transform, timeout)
this(null, target, lifeCycle, timeout, null, null)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
this(intf, target, lifeCycle, timeout, null, remoteAddress)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, null, remoteAddress)
def this(intf: Class[_], target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
this(intf, target, lifeCycle, timeout, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher) =
this(null, target, lifeCycle, timeout, dispatcher, null)
def this(target: Class[_], lifeCycle: LifeCycle, timeout: Int, dispatcher: MessageDispatcher, remoteAddress: RemoteAddress) =
this(null, target, lifeCycle, timeout, dispatcher, remoteAddress)
def transform =
se.scalablesolutions.akka.kernel.config.ScalaConfig.Component(intf, target, lifeCycle.transform, timeout, dispatcher,
if (remoteAddress != null) se.scalablesolutions.akka.kernel.config.ScalaConfig.RemoteAddress(remoteAddress.hostname, remoteAddress.port) else null)
def newWorker(actor: Actor) =
se.scalablesolutions.akka.kernel.config.ScalaConfig.Worker(actor, lifeCycle.transform)
}

View file

@ -16,12 +16,25 @@ import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory
import org.jboss.netty.handler.codec.serialization.{ObjectEncoder, ObjectDecoder}
import org.jboss.netty.bootstrap.ClientBootstrap
// FIXME need to be able support multiple remote clients
object RemoteClient extends Logging {
// FIXME make host options configurable
private val HOSTNAME = "localhost"
private val PORT = 9999
import scala.collection.mutable.HashMap
object RemoteClient extends Logging {
private val clients = new HashMap[String, RemoteClient]
def clientFor(address: InetSocketAddress): RemoteClient = synchronized {
val hostname = address.getHostName
val port = address.getPort
val hash = hostname + ":" + port
if (clients.contains(hash)) clients(hash)
else {
val client = new RemoteClient(hostname, port)
client.connect
clients + hash -> client
client
}
}
}
class RemoteClient(hostname: String, port: Int) extends Logging {
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@ -42,13 +55,13 @@ object RemoteClient extends Logging {
def connect = synchronized {
if (!isRunning) {
connection = bootstrap.connect(new InetSocketAddress(HOSTNAME, PORT))
log.info("Starting remote client connection to [%s:%s]", HOSTNAME, PORT)
connection = bootstrap.connect(new InetSocketAddress(hostname, port))
log.info("Starting remote client connection to [%s:%s]", hostname, port)
// Wait until the connection attempt succeeds or fails.
connection.awaitUninterruptibly
if (!connection.isSuccess) {
log.error("Remote connection to [%s:%s] has failed due to [%s]", HOSTNAME, PORT, connection.getCause)
log.error("Remote connection to [%s:%s] has failed due to [%s]", hostname, port, connection.getCause)
connection.getCause.printStackTrace
}
isRunning = true

View file

@ -7,17 +7,11 @@ package se.scalablesolutions.akka.kernel.nio
import java.lang.reflect.InvocationTargetException
import java.net.InetSocketAddress
import java.util.concurrent.{ConcurrentHashMap, Executors}
import kernel.actor._
import kernel.reactor.{MessageHandle, DefaultCompletableFutureResult, CompletableFutureResult}
import kernel.stm.TransactionManagement
import kernel.util.Logging
import java.util.ArrayList
import java.util.List
import java.util.concurrent.atomic.AtomicLong
import java.util.logging.Level
import java.util.logging.Logger
import org.codehaus.aspectwerkz.intercept.Advisable
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
@ -25,14 +19,14 @@ import org.jboss.netty.handler.codec.serialization.ObjectDecoder
import org.jboss.netty.handler.codec.serialization.ObjectEncoder
class RemoteServer extends Logging {
def connect = RemoteServer.connect
def start = RemoteServer.start
}
object RemoteServer extends Logging {
// FIXME make all remote server option configurable
private val HOSTNAME = "localhost"
private val PORT = 9999
private val CONNECTION_TIMEOUT_MILLIS = 100
val HOSTNAME = "localhost"
val PORT = 9999
val CONNECTION_TIMEOUT_MILLIS = 100
@volatile private var isRunning = false
@ -52,7 +46,7 @@ object RemoteServer extends Logging {
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
def connect = synchronized {
def start = synchronized {
if (!isRunning) {
log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
@ -183,8 +177,7 @@ class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging {
if (activeObjectOrNull == null) {
val clazz = Class.forName(name)
try {
val actor = new Dispatcher(clazz.getName)
val newInstance = activeObjectFactory.newInstance(clazz, actor, false, timeout).asInstanceOf[AnyRef]
val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
} catch {

View file

@ -1,92 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
*
* Based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
*/
package se.scalablesolutions.akka.kernel.reactor
import kernel.actor.Invocation
class ProxyMessageDispatcher extends MessageDispatcherBase {
import java.util.concurrent.Executors
import java.util.HashSet
import org.codehaus.aspectwerkz.joinpoint.JoinPoint
// FIXME: make configurable using configgy + JMX
// FIXME: create one executor per invocation to dispatch(..), grab config settings for specific actor (set in registerHandler)
private val threadPoolSize: Int = 100
private val handlerExecutor = Executors.newCachedThreadPool()
def start = if (!active) {
active = true
val messageDemultiplexer = new ProxyDemultiplexer(messageQueue)
selectorThread = new Thread {
override def run = {
while (active) {
try {
guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf]
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val queue = messageDemultiplexer.acquireSelectedQueue
for (index <- 0 until queue.size) {
val handle = queue.remove
handlerExecutor.execute(new Runnable {
val invocation = handle.message.asInstanceOf[Invocation]
override def run = {
val future = handle.future
try {
val result = invocation.joinpoint.proceed
if (future.isDefined) future.get.completeWithResult(result)
} catch {
case e: Exception =>
if (future.isDefined) future.get.completeWithException(invocation.joinpoint.getTarget, e)
}
messageDemultiplexer.wakeUp
}
})
}
} finally {
messageDemultiplexer.releaseSelectedQueue
}
}
}
};
selectorThread.start
}
override protected def doShutdown = handlerExecutor.shutdownNow
}
class ProxyDemultiplexer(private val messageQueue: MessageQueue) extends MessageDemultiplexer {
import java.util.concurrent.locks.ReentrantLock
import java.util.{LinkedList, Queue}
private val selectedQueue: Queue[MessageHandle] = new LinkedList[MessageHandle]
private val selectedQueueLock = new ReentrantLock
def select = try {
selectedQueueLock.lock
messageQueue.read(selectedQueue)
} finally {
selectedQueueLock.unlock
}
def acquireSelectedQueue: Queue[MessageHandle] = {
selectedQueueLock.lock
selectedQueue
}
def releaseSelectedQueue = {
selectedQueue.clear
selectedQueueLock.unlock
}
def wakeUp = messageQueue.interrupt
}

View file

@ -186,10 +186,16 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
def submit[T](callable: Callable[T]) = executor.submit(callable)
def submit[T](runnable: Runnable, t: T) = executor.submit(runnable, t)
def submit(runnable: Runnable) = executor.submit(runnable)
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
/*
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
}
/**

View file

@ -30,11 +30,10 @@ class RemoteActorSpec extends TestCase {
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
server.connect
server.start
}
}).start
Thread.sleep(1000)
RemoteClient.connect
private val unit = TimeUnit.MILLISECONDS
@ -42,7 +41,7 @@ class RemoteActorSpec extends TestCase {
def testSendOneWay = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorUnidirectional
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
val result = actor ! "OneWay"
Thread.sleep(100)
@ -54,7 +53,7 @@ class RemoteActorSpec extends TestCase {
def testSendReplySync = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
val result: String = actor !? "Hello"
assertEquals("World", result)
@ -65,7 +64,7 @@ class RemoteActorSpec extends TestCase {
def testSendReplyAsync = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
val result = actor !! "Hello"
assertEquals("World", result.get.asInstanceOf[String])
@ -76,7 +75,7 @@ class RemoteActorSpec extends TestCase {
def testSendReceiveException = {
implicit val timeout = 5000L
val actor = new RemoteActorSpecActorBidirectional
actor.makeRemote
actor.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
actor.start
try {
actor !! "Failure"

View file

@ -25,11 +25,10 @@ class RemoteSupervisorSpec extends Suite {
new Thread(new Runnable() {
def run = {
val server = new RemoteServer
server.connect
server.start
}
}).start
Thread.sleep(1000)
RemoteClient.connect
var pingpong1: RemotePingPong1Actor = _
var pingpong2: RemotePingPong2Actor = _
@ -466,7 +465,7 @@ class RemoteSupervisorSpec extends Suite {
// implementation of the Actors we want to use.
pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
@ -483,7 +482,7 @@ class RemoteSupervisorSpec extends Suite {
def getSingleActorOneForOneSupervisor: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
@ -500,11 +499,11 @@ class RemoteSupervisorSpec extends Suite {
def getMultipleActorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
@ -529,11 +528,11 @@ class RemoteSupervisorSpec extends Suite {
def getMultipleActorsOneForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
@ -558,11 +557,11 @@ class RemoteSupervisorSpec extends Suite {
def getNestedSupervisorsAllForOneConf: Supervisor = {
pingpong1 = new RemotePingPong1Actor
pingpong1.makeRemote
pingpong1.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong2 = new RemotePingPong2Actor
pingpong2.makeRemote
pingpong2.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
pingpong3 = new RemotePingPong3Actor
pingpong3.makeRemote
pingpong3.makeRemote(RemoteServer.HOSTNAME, RemoteServer.PORT)
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {