diff --git a/akka.ipr b/akka.ipr index 1cf265f7a2..c313f8bddf 100644 --- a/akka.ipr +++ b/akka.ipr @@ -747,17 +747,6 @@ - - - - - - - - - - - @@ -1187,6 +1176,17 @@ + + + + + + + + + + + diff --git a/akka.iws b/akka.iws index 5c5f96ec8a..916c141f70 100644 --- a/akka.iws +++ b/akka.iws @@ -1,14 +1,33 @@ + + + + + + + + + + - - - + + + + + + + + + + + + @@ -31,7 +50,7 @@ - + - + + + + + - @@ -1497,35 +1823,35 @@ - + - + - - + - - + + - - - - - + + + + + - - + + - - + + + - + @@ -1572,74 +1898,58 @@ - + - + - - - - - - - - + - + - + - + - + - + - - - - - - - - - - + - + - + - + - + - + - + @@ -1651,41 +1961,51 @@ - + - + - + - - - - + + + + + + + + + + + + + + + + + + + + + + + - + - + - - - - - - - - - - + diff --git a/fun-test-java/akka-funtest-java.iml b/fun-test-java/akka-funtest-java.iml index d8c959959d..f8ae03bd9d 100644 --- a/fun-test-java/akka-funtest-java.iml +++ b/fun-test-java/akka-funtest-java.iml @@ -4,7 +4,6 @@ - @@ -21,10 +20,6 @@ - - - - @@ -52,7 +47,7 @@ - + diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java index 1af3300300..2f55363956 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemFailer.java @@ -1,5 +1,7 @@ package se.scalablesolutions.akka.api; +import java.io.Serializable; + public class InMemFailer { public void fail() { throw new RuntimeException("expected"); diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java index b02ed15169..8bd9db4c76 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemStateful.java @@ -5,9 +5,10 @@ import se.scalablesolutions.akka.annotation.transactional; import se.scalablesolutions.akka.kernel.state.*; public class InMemStateful { - @state private TransactionalMap mapState = new InMemoryTransactionalMap(); - @state private TransactionalVector vectorState = new InMemoryTransactionalVector(); - @state private TransactionalRef refState = new TransactionalRef(); + private TransactionalState factory = new TransactionalState(); + private TransactionalMap mapState = factory.newMap(new InMemoryMapConfig()); + private TransactionalVector vectorState = factory.newVector(new InMemoryVectorConfig());; + private TransactionalRef refState = factory.newRef(new InMemoryRefConfig()); @transactional public String getMapState(String key) { @@ -47,11 +48,12 @@ public class InMemStateful { } @transactional - public void failure(String key, String msg, InMemFailer failer) { + public String failure(String key, String msg, InMemFailer failer) { mapState.put(key, msg); vectorState.add(msg); refState.swap(msg); failer.fail(); + return msg; } @transactional diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java index 0a0a921e21..0d2da0060d 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/InMemoryStateTest.java @@ -24,7 +24,7 @@ public class InMemoryStateTest extends TestCase { //new Component("inmem-clasher", InMemClasher.class, InMemClasherImpl.class, new LifeCycle(new Permanent(), 1000), 100000) }).inject().supervise(); } - + protected void tearDown() { conf.stop(); } @@ -87,7 +87,6 @@ public class InMemoryStateTest extends TestCase { } // expected assertEquals("init", stateful.getRefState()); // check that state is == init state } - /* public void testNestedNonTransactionalMethodHangs() { InMemStateful stateful = conf.getActiveObject(InMemStateful.class); diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyClient.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyClient.java new file mode 100644 index 0000000000..15f6a9d113 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyClient.java @@ -0,0 +1,7 @@ +package se.scalablesolutions.akka.api; + +public class NettyClient { + public static void main(String[] args) { + new se.scalablesolutions.akka.kernel.nio.NettyClient(); + } +} diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyServer.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyServer.java new file mode 100644 index 0000000000..793d53af74 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NettyServer.java @@ -0,0 +1,7 @@ +package se.scalablesolutions.akka.api; + +public class NettyServer { + public static void main(String[] args) { + new se.scalablesolutions.akka.kernel.nio.NettyServer(); + } +} diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java new file mode 100644 index 0000000000..ae7bf0ebf2 --- /dev/null +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/NioTest.java @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.api; + +import org.junit.*; +import static org.junit.Assert.*; + +import junit.framework.TestSuite; +import se.scalablesolutions.akka.kernel.nio.ProxyServer; + +public class NioTest extends TestSuite { + + @BeforeClass + public static void initialize() { + } + + @AfterClass + public static void cleanup() { + } + + @Test + public void simpleRequestReply() { + ProxyServer server = new ProxyServer(); + server.start(); + + + } + +} \ No newline at end of file diff --git a/kernel/akka-kernel.iml b/kernel/akka-kernel.iml index 8e3e83eaa6..b67e97ac49 100644 --- a/kernel/akka-kernel.iml +++ b/kernel/akka-kernel.iml @@ -116,7 +116,7 @@ - + diff --git a/kernel/pom.xml b/kernel/pom.xml old mode 100755 new mode 100644 index 6dfd9e8ff7..17c49efd7f --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -58,7 +58,7 @@ org.jboss.netty netty - 3.1.0.BETA2 + 3.1.0.CR1 diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala index f07d9fc61d..92b31568ac 100644 --- a/kernel/src/main/scala/actor/ActiveObject.scala +++ b/kernel/src/main/scala/actor/ActiveObject.scala @@ -11,10 +11,11 @@ import java.lang.annotation.Annotation import kernel.config.ActiveObjectGuiceConfigurator import kernel.config.ScalaConfig._ import kernel.camel.{MessageDriven, ActiveObjectProducer} +import kernel.nio.{RemoteRequest, NettyClient} +import kernel.stm.{ChangeSet, Transaction} import kernel.util.Helpers.ReadWriteLock import kernel.util.{HashCode, ResultOrFailure} import kernel.state.{Transactional, TransactionalMap, TransactionalRef, TransactionalVector} -import kernel.stm.Transaction import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice} import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint} @@ -61,6 +62,8 @@ class ActiveObjectFactory { object ActiveObject { val AKKA_CAMEL_ROUTING_SCHEME = "akka" + val RemoteClient = new NettyClient + private[kernel] val threadBoundTx: ThreadLocal[Option[Transaction]] = { val tl = new ThreadLocal[Option[Transaction]] tl.set(None) @@ -95,7 +98,7 @@ object ActiveObject { */ // FIXME: STM that allows concurrent updates, detects collision, rolls back and restarts -sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, actor: Actor) extends AroundAdvice { +@serializable sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstance: AnyRef, actor: Actor) extends AroundAdvice { private val changeSet = new ChangeSet(target.getName) private val (maps, vectors, refs) = getTransactionalItemsFor(targetInstance) @@ -116,31 +119,60 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc val rtti = joinpoint.getRtti.asInstanceOf[MethodRtti] val method = rtti.getMethod - tryToCommitTransaction - - if (isInExistingTransaction) { - joinExistingTransaction - } else { - if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction - } - - val result: AnyRef = try { - incrementTransaction -// if (rtti.getMethod.isAnnotationPresent(Annotations.oneway)) sendOneWay(joinpoint) // FIXME put in 2 different aspects -// else handleResult(sendAndReceiveEventually(joinpoint)) - val result = actor !! Invocation(joinpoint, activeTx) - val resultOrFailure = - if (result.isDefined) result.get.asInstanceOf[ResultOrFailure[AnyRef]] - else throw new ActiveObjectInvocationTimeoutException("TIMED OUT") - handleResult(resultOrFailure) - } finally { - decrementTransaction - if (isTransactionAborted) removeTransactionIfTopLevel - else tryToPrecommitTransaction - } + val remoteCall = true + val isOneWay = rtti.getMethod.getReturnType == java.lang.Void.TYPE + val result: AnyRef = + if (remoteCall) { + // FIXME: Make part of TX semantics?? + val future = ActiveObject.RemoteClient.send(new RemoteRequest(false, rtti.getParameterValues, rtti.getMethod.getName, target.getName, isOneWay)) + if (isOneWay) null // for void methods + else { + future.await_? + val resultOrFailure = getResultOrThrowException(future) + handleResult(resultOrFailure) + } + + } else { + // FIXME join TX with same id, do not COMMIT + tryToCommitTransaction + if (isInExistingTransaction) { + joinExistingTransaction + } else { + if (method.isAnnotationPresent(Annotations.transactional)) startNewTransaction + } + try { + incrementTransaction + if (isOneWay) actor ! Invocation(joinpoint, activeTx) + else { + val result = actor !! Invocation(joinpoint, activeTx) + val resultOrFailure = + if (result.isDefined) result.get.asInstanceOf[ResultOrFailure[AnyRef]] + else throw new ActiveObjectInvocationTimeoutException("TIMED OUT") + handleResult(resultOrFailure) + } + } finally { + decrementTransaction + if (isTransactionAborted) removeTransactionIfTopLevel + else tryToPrecommitTransaction + } + } result } + // TODO: create a method setCallee/setCaller to the joinpoint interface and compiler + private def nullOutTransientFieldsInJoinpoint(joinpoint: JoinPoint) = { + val clazz = joinpoint.getClass + val callee = clazz.getDeclaredField("CALLEE") + callee.setAccessible(true) + callee.set(joinpoint, null) + val caller = clazz.getDeclaredField("CALLER") + caller.setAccessible(true) + caller.set(joinpoint, null) + val interceptors = clazz.getDeclaredField("AROUND_INTERCEPTORS") + interceptors.setAccessible(true) + interceptors.set(joinpoint, null) + } + private def startNewTransaction = { val newTx = new Transaction newTx.begin(changeSet) @@ -167,7 +199,9 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc true } else false - private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { + + + private def handleResult(result: ResultOrFailure[AnyRef]): AnyRef = { try { result() } catch { @@ -265,41 +299,6 @@ sealed class SequentialTransactionalAroundAdvice(target: Class[_], targetInstanc } } -class ChangeSet(val id: String) { - private val lock = new ReadWriteLock - - private[kernel] def full: List[Transactional] = lock.withReadLock { - _maps ::: _vectors ::: _refs - } - - // TX Maps - private[kernel] var _maps: List[TransactionalMap[_, _]] = Nil - private[kernel] def maps_=(maps: List[TransactionalMap[_, _]]) = lock.withWriteLock { - _maps = maps - } - private[kernel] def maps: List[TransactionalMap[_, _]] = lock.withReadLock { - _maps - } - - // TX Vectors - private[kernel] var _vectors: List[TransactionalVector[_]] = Nil - private[kernel] def vectors_=(vectors: List[TransactionalVector[_]]) = lock.withWriteLock { - _vectors = vectors - } - private[kernel] def vectors: List[TransactionalVector[_]] = lock.withReadLock { - _vectors - } - - // TX Refs - private[kernel] var _refs: List[TransactionalRef[_]] = Nil - private[kernel] def refs_=(refs: List[TransactionalRef[_]]) = lock.withWriteLock { - _refs = refs - } - private[kernel] def refs: List[TransactionalRef[_]] = lock.withReadLock { - _refs - } -} - /** * Represents a snapshot of the current invocation. * @@ -333,6 +332,11 @@ class ChangeSet(val id: String) { */ private[kernel] class Dispatcher(val targetName: String) extends Actor { id = targetName + + // FIXME implement the pre/post restart methods and call annotated methods on the POJO + + // FIXME create new POJO on creation and swap POJO at restart - joinpoint.setTarget(new POJO) + override def receive: PartialFunction[Any, Unit] = { case Invocation(joinpoint: JoinPoint, tx: Option[Transaction]) => @@ -353,7 +357,7 @@ private[kernel] class Dispatcher(val targetName: String) extends Actor { /* ublic class CamelInvocationHandler implements InvocationHandler { - private final Endpoint endpoint; + private final Endpoint endpoint; private final Producer producer; private final MethodInfoCache methodInfoCache; diff --git a/kernel/src/main/scala/nio/NettyClient.scala b/kernel/src/main/scala/nio/NettyClient.scala new file mode 100644 index 0000000000..fa3fdc8ec2 --- /dev/null +++ b/kernel/src/main/scala/nio/NettyClient.scala @@ -0,0 +1,129 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.nio + +import java.net.InetSocketAddress +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.{Executors, ConcurrentMap, ConcurrentHashMap} + +import kernel.reactor.{NullFutureResult, DefaultCompletableFutureResult, CompletableFutureResult} +import kernel.util.{HashCode, Logging}; + +import org.jboss.netty.handler.codec.serialization.{ObjectEncoder, ObjectDecoder} +import org.jboss.netty.bootstrap.ClientBootstrap +import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory +import org.jboss.netty.channel._ + +class NettyClient extends Logging { + private val HOSTNAME = "localhost" + private val PORT = 9999 + + private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] + + private val channelFactory = new NioClientSocketChannelFactory( + Executors.newCachedThreadPool, + Executors.newCachedThreadPool) + + private val bootstrap = new ClientBootstrap(channelFactory) + private val handler = new ObjectClientHandler(futures) + + bootstrap.getPipeline.addLast("handler", handler) + bootstrap.setOption("tcpNoDelay", true) + bootstrap.setOption("keepAlive", true) + + private val connection = bootstrap.connect(new InetSocketAddress(HOSTNAME, PORT)) + log.info("Starting NIO client at [%s:%s]", HOSTNAME, PORT) + + // Wait until the connection attempt succeeds or fails. + connection.awaitUninterruptibly + if (!connection.isSuccess) { + log.error("Connection has failed due to [%s]", connection.getCause) + connection.getCause.printStackTrace + } + + def shutdown = { + connection.getChannel.getCloseFuture.awaitUninterruptibly + channelFactory.releaseExternalResources + } + + def send(request: RemoteRequest): CompletableFutureResult = { + val escapedRequest = escapeRequest(request) + if (escapedRequest.isOneWay) { + connection.getChannel.write(escapedRequest) + new NullFutureResult + } else { + futures.synchronized { + val futureResult = new DefaultCompletableFutureResult(100000) + futures.put(request.id, futureResult) + connection.getChannel.write(escapedRequest) + futureResult + } + } + } + + private def escapeRequest(request: RemoteRequest) = { + if (request.message.isInstanceOf[Array[Object]]) { + val args = request.message.asInstanceOf[Array[Object]].toList.asInstanceOf[scala.List[Object]] + val escapedArgs = for (arg <- args) yield { + val clazz = arg.getClass + if (clazz.getName.contains("$$ProxiedByAW")) { + new ProxyWrapper(clazz.getSuperclass.getName) + } else arg + } + request.cloneWithNewMessage(escapedArgs) + } else request + } +} + +@ChannelPipelineCoverage { val value = "all" } +class ObjectClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult]) extends SimpleChannelUpstreamHandler with Logging { + + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { + if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + log.debug(event.toString) + } + super.handleUpstream(ctx, event) + } + + override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + // Add encoder and decoder as soon as a new channel is created so that + // a Java object is serialized and deserialized. + event.getChannel.getPipeline.addFirst("encoder", new ObjectEncoder) + event.getChannel.getPipeline.addFirst("decoder", new ObjectDecoder) + } + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) { + // Send the first message if this handler is a client-side handler. + // if (!firstMessage.isEmpty) e.getChannel.write(firstMessage) + } + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) { + try { + val result = event.getMessage + if (result.isInstanceOf[RemoteReply]) { + val reply = result.asInstanceOf[RemoteReply] + val future = futures.get(reply.id) + if (reply.successful) { + future.completeWithResult(reply.message) + } else { + future.completeWithException(null, reply.exception) + } + } else throw new IllegalArgumentException("Unknown message received in NIO client handler: " + result) + } catch { + case e: Exception => log.error("Unexpected exception in NIO client handler: %s", e); throw e + } + } + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) { + log.error("Unexpected exception from downstream: %s", event.getCause) + event.getChannel.close + } +} + +object NettyClientRunner { + def main(args: Array[String]) = { + new NettyClient + } +} \ No newline at end of file diff --git a/kernel/src/main/scala/nio/NettyServer.scala b/kernel/src/main/scala/nio/NettyServer.scala new file mode 100644 index 0000000000..c02b74cd93 --- /dev/null +++ b/kernel/src/main/scala/nio/NettyServer.scala @@ -0,0 +1,156 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.nio + +import java.lang.reflect.InvocationTargetException +import java.net.InetSocketAddress +import java.util.concurrent.{ConcurrentHashMap, Executors} +import kernel.actor.{ActiveObjectFactory, Dispatcher, ActiveObject, Invocation} +import kernel.util.Logging +import java.util.ArrayList +import java.util.List +import java.util.concurrent.atomic.AtomicLong +import java.util.logging.Level +import java.util.logging.Logger + +import org.jboss.netty.bootstrap.ServerBootstrap +import org.jboss.netty.channel._ +import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory +import org.jboss.netty.handler.codec.serialization.ObjectDecoder +import org.jboss.netty.handler.codec.serialization.ObjectEncoder + +class NettyServer extends Logging { + val HOSTNAME = "localhost" + val PORT = 9999 + val CONNECTION_TIMEOUT_MILLIS = 100 + + log.info("Starting NIO server at [%s:%s]", HOSTNAME, PORT) + + val factory = new NioServerSocketChannelFactory( + Executors.newCachedThreadPool, + Executors.newCachedThreadPool) + + val bootstrap = new ServerBootstrap(factory) + + // FIXME provide different codecs (Thrift, Avro, Protobuf, JSON) + val handler = new ObjectServerHandler + + bootstrap.getPipeline.addLast("handler", handler) + + bootstrap.setOption("child.tcpNoDelay", true) + bootstrap.setOption("child.keepAlive", true) + bootstrap.setOption("child.reuseAddress", true) + bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS) + + bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) +} + +@ChannelPipelineCoverage {val value = "all"} +class ObjectServerHandler extends SimpleChannelUpstreamHandler with Logging { + private val activeObjectFactory = new ActiveObjectFactory + private val activeObjects = new ConcurrentHashMap[String, AnyRef] + + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { + if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { + log.debug(event.toString) + } + super.handleUpstream(ctx, event) + } + + override def channelOpen(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + event.getChannel.getPipeline.addFirst("encoder", new ObjectEncoder) + event.getChannel.getPipeline.addFirst("decoder", new ObjectDecoder) + } + + override def channelConnected(ctx: ChannelHandlerContext, event: ChannelStateEvent) = { + // Send the first message if this handler is a client-side handler. + // if (!firstMessage.isEmpty) e.getChannel.write(firstMessage) + } + + override def messageReceived(ctx: ChannelHandlerContext, event: MessageEvent) ={ + val message = event.getMessage + if (message == null) throw new IllegalStateException("Message in MessageEvent is null: " + event) + if (message.isInstanceOf[RemoteRequest]) handleRemoteRequest(message.asInstanceOf[RemoteRequest], event.getChannel) + } + + override def exceptionCaught(ctx: ChannelHandlerContext, event: ExceptionEvent) = { + event.getCause.printStackTrace + log.error("Unexpected exception from downstream: %s", event.getCause) + event.getChannel.close + } + + private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { + try { + log.debug(request.toString) + if (request.isActor) { + log.debug("Dispatching to [receive :: %s]", request.target) + throw new UnsupportedOperationException("TODO: remote actors") + } else { + log.debug("Dispatching to [%s :: %s]", request.method, request.target) + val activeObject = createActiveObject(request.target) +// val args = request.message.asInstanceOf[scala.List[Object]] +// val argClassesList = args.map(_.getClass) +// val argClasses = argClassesList.map(_.getClass).toArray +// val method = activeObject.getClass.getDeclaredMethod(request.method, argClasses) + + val args = request.message.asInstanceOf[scala.List[AnyRef]] + val argClazzes = args.map(_.getClass)//.toArray.asInstanceOf[Array[Class[_]]] + val (unescapedArgs, unescapedArgClasses) = unescapeArgs(args, argClazzes) + val method = activeObject.getClass.getDeclaredMethod(request.method, unescapedArgClasses) + try { + if (request.isOneWay) method.invoke(activeObject, unescapedArgs) + else { + val result = method.invoke(activeObject, unescapedArgs) + log.debug("Returning result [%s]", result) + channel.write(request.newReplyWithMessage(result)) + } + } catch { + case e: InvocationTargetException => + log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e.getCause) + e.getCause.printStackTrace + channel.write(request.newReplyWithException(e.getCause)) + } + } + } catch { + case e: Exception => + log.error("Could not invoke remote active object or actor [%s :: %s] due to: %s", request.method, request.target, e) + e.printStackTrace + } + } + + private def unescapeArgs(args: scala.List[AnyRef], argClasses: scala.List[Class[_]]) = { + val unescapedArgs = new Array[AnyRef](args.size) + val unescapedArgClasses = new Array[Class[_]](args.size) + + val escapedArgs = for (i <- 0 until args.size) { + if (args(i).isInstanceOf[ProxyWrapper]) { + val proxyName = args(i).asInstanceOf[ProxyWrapper].proxyName + val activeObject = createActiveObject(proxyName) + unescapedArgs(i) = activeObject + unescapedArgClasses(i) = Class.forName(proxyName) + } else { + unescapedArgs(i) = args(i) + unescapedArgClasses(i) = argClasses(i) + } + } + (unescapedArgs, unescapedArgClasses) + } + + private def createActiveObject(name: String) = { + val activeObjectOrNull = activeObjects.get(name) + if (activeObjectOrNull == null) { + val clazz = Class.forName(name) + val newInstance = clazz.newInstance.asInstanceOf[AnyRef] // activeObjectFactory.newInstance(clazz, new Dispatcher(invocation.target)).asInstanceOf[AnyRef] + activeObjects.put(name, newInstance) + newInstance + } else activeObjectOrNull + } +} + +object NettyServerRunner { + def main(args: Array[String]) = { + new NettyServer + } +} diff --git a/kernel/src/main/scala/nio/ProxyServer.scala b/kernel/src/main/scala/nio/ProxyServer.scala index e69de29bb2..934f1cf37c 100644 --- a/kernel/src/main/scala/nio/ProxyServer.scala +++ b/kernel/src/main/scala/nio/ProxyServer.scala @@ -0,0 +1,97 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.nio + +import java.io.PrintWriter +import java.net.{Socket, InetAddress, InetSocketAddress} + +import java.util.concurrent.Executors +import java.util.{HashSet, Date} +import java.nio.channels.{Selector, ServerSocketChannel, SelectionKey} +import java.nio.channels.spi.SelectorProvider +import kernel.actor.Invocation +import kernel.reactor.{MessageQueue, MessageDemultiplexer, MessageHandle, MessageDispatcherBase} + +class ProxyServer extends MessageDispatcherBase { + val port = 9999 + val host = InetAddress.getLocalHost + + // Selector for incoming time requests + val acceptSelector = SelectorProvider.provider.openSelector + + // Create a new server socket and set to non blocking mode + val ssc = ServerSocketChannel.open + ssc.configureBlocking(true) + + // Bind the server socket to the local host and port + val address = new InetSocketAddress(host, port) + ssc.socket.bind(address) + + // Register accepts on the server socket with the selector. This + // step tells the selector that the socket wants to be put on the + // ready list when accept operations occur, so allowing multiplexed + // non-blocking I/O to take place. + val acceptKey = ssc.register(acceptSelector, SelectionKey.OP_ACCEPT) + + // FIXME: make configurable using configgy + JMX + // FIXME: create one executor per invocation to dispatch(..), grab config settings for specific actor (set in registerHandler) + private val threadPoolSize: Int = 100 + private val handlerExecutor = Executors.newCachedThreadPool() + + def start = if (!active) { + active = true + selectorThread = new Thread { + override def run = { + while (active) { + try { + guard.synchronized { /* empty */ } // prevents risk for deadlock as described in [http://developers.sun.com/learning/javaoneonline/2006/coreplatform/TS-1315.pdf] + + val keysAdded = acceptSelector.select + val readyKeys = acceptSelector.selectedKeys + val iter = readyKeys.iterator + while (iter.hasNext) { + val key = iter.next.asInstanceOf[SelectionKey] + iter.remove +/* + if (key.isValid && key.isReadable) { + eventHandler.onReadableEvent(key.channel) + } + if (key.isValid && key.isWritable) { + key.interestOps(SelectionKey.OP_READ) // reset to read only + eventHandler.onWriteableEvent(key.channel) + } +*/ + val channel = key.channel.asInstanceOf[ServerSocketChannel] + val socket = channel.accept.socket + socket.setKeepAlive(true) + + + val in = socket.getInputStream + val out = new PrintWriter(socket.getOutputStream, true) + out.println(new Date) + out.close + + /* handlerExecutor.execute(new Runnable { + override def run = { + try { + val result = handle.message.asInstanceOf[Invocation].joinpoint.proceed + handle.future.completeWithResult(result) + } catch { + case e: Exception => handle.future.completeWithException(e) + } + } + }) + */ + } + } finally { + } + } + } + } + selectorThread.start + } + + override protected def doShutdown = handlerExecutor.shutdownNow +} diff --git a/kernel/src/main/scala/nio/RequestReply.scala b/kernel/src/main/scala/nio/RequestReply.scala new file mode 100644 index 0000000000..6e0f5cab53 --- /dev/null +++ b/kernel/src/main/scala/nio/RequestReply.scala @@ -0,0 +1,76 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.nio + +import java.util.concurrent.atomic.AtomicLong +import kernel.util.HashCode + +object IdFactory { + private val id = new AtomicLong + def nextId = id.getAndIncrement +} + +@serializable class ProxyWrapper(val proxyName: String) + +@serializable class RemoteRequest(val isActor: Boolean, val message: AnyRef, val method: String, val target: String, val isOneWay: Boolean) { + private[RemoteRequest] var _id = IdFactory.nextId + def id = _id + + override def toString: String = synchronized { + "RemoteRequest[isActor: " + isActor + " | message: " + message + " | method: " + method + " | target: " + target + " | isOneWay: " + isOneWay + "]" + } + + override def hashCode(): Int = synchronized { + var result = HashCode.SEED + result = HashCode.hash(result, isActor) + result = HashCode.hash(result, message) + result = HashCode.hash(result, method) + result = HashCode.hash(result, target) + result + } + + override def equals(that: Any): Boolean = synchronized { + that != null && + that.isInstanceOf[RemoteRequest] && + that.asInstanceOf[RemoteRequest].isActor == isActor && + that.asInstanceOf[RemoteRequest].message == message && + that.asInstanceOf[RemoteRequest].method == method && + that.asInstanceOf[RemoteRequest].target == target + } + + def newReplyWithMessage(message: AnyRef) = synchronized { new RemoteReply(true, id, message, null) } + + def newReplyWithException(error: Throwable) = synchronized { new RemoteReply(false, id, null, error) } + + def cloneWithNewMessage(message: AnyRef) = synchronized { + val request = new RemoteRequest(isActor, message, method, target, isOneWay) + request._id = id + request + } +} + +@serializable class RemoteReply(val successful: Boolean, val id: Long, val message: AnyRef, val exception: Throwable) { + override def toString: String = synchronized { + "RemoteReply[successful: " + successful + " | id: " + id + " | message: " + message + " | exception: " + exception + "]" + } + + override def hashCode(): Int = synchronized { + var result = HashCode.SEED + result = HashCode.hash(result, successful) + result = HashCode.hash(result, id) + result = HashCode.hash(result, message) + result = HashCode.hash(result, exception) + result + } + + override def equals(that: Any): Boolean = synchronized { + that != null && + that.isInstanceOf[RemoteReply] && + that.asInstanceOf[RemoteReply].successful == successful && + that.asInstanceOf[RemoteReply].id == id && + that.asInstanceOf[RemoteReply].message == message && + that.asInstanceOf[RemoteReply].exception == exception + } +} \ No newline at end of file diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index ec4f7711af..42b2aa9f37 100644 --- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -10,15 +10,29 @@ */ package se.scalablesolutions.akka.kernel.reactor + +import java.util.concurrent.{LinkedBlockingQueue, TimeUnit, RejectedExecutionHandler, ThreadPoolExecutor} + class EventBasedThreadPoolDispatcher extends MessageDispatcherBase { import java.util.concurrent.Executors import java.util.HashSet // FIXME: make configurable using configgy + JMX - // FIXME: create one executor per invocation to dispatch(..), grab config settings for specific actor (set in registerHandler) - private val threadPoolSize: Int = 100 private val busyHandlers = new HashSet[AnyRef] - private val handlerExecutor = Executors.newCachedThreadPool() + + private val minNrThreads, maxNrThreads = 10 + private val timeOut = 1000L // ???? + private val timeUnit = TimeUnit.MILLISECONDS + private val threadFactory = new MonitorableThreadFactory("akka:kernel") + private val rejectedExecutionHandler = new RejectedExecutionHandler() { + def rejectedExecution(runnable: Runnable, threadPoolExecutor: ThreadPoolExecutor) { + + } + } + private val queue = new LinkedBlockingQueue[Runnable] + private val handlerExecutor = new ThreadPoolExecutor(minNrThreads, maxNrThreads, timeOut, timeUnit, queue, threadFactory, rejectedExecutionHandler) + + //private val handlerExecutor = Executors.newCachedThreadPool() def start = if (!active) { active = true diff --git a/kernel/src/main/scala/reactor/Reactor.scala b/kernel/src/main/scala/reactor/Reactor.scala index 461b7a8deb..86a9a73981 100644 --- a/kernel/src/main/scala/reactor/Reactor.scala +++ b/kernel/src/main/scala/reactor/Reactor.scala @@ -10,9 +10,10 @@ */ package se.scalablesolutions.akka.kernel.reactor +import java.util.concurrent.atomic.AtomicInteger +import java.util.concurrent.ThreadFactory import java.util.{LinkedList, Queue} -import kernel.util.HashCode - +import kernel.util.{Logging, HashCode} trait MessageHandler { def handle(message: MessageHandle) } @@ -75,3 +76,34 @@ class MessageQueue { queue.notifyAll } } + +class MonitorableThreadFactory(val name: String) extends ThreadFactory { + def newThread(runnable: Runnable) = + //new MonitorableThread(runnable, name) + new Thread(runnable) +} + +object MonitorableThread { + val DEFAULT_NAME = "MonitorableThread" + val created = new AtomicInteger + val alive = new AtomicInteger + @volatile val debugLifecycle = false +} +class MonitorableThread(runnable: Runnable, name: String) + extends Thread(runnable, name + "-" + MonitorableThread.created.incrementAndGet) {//with Logging { + setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { + def uncaughtException(thread: Thread, cause: Throwable) = {} //log.error("UNCAUGHT in thread [%s] cause [%s]", thread.getName, cause) + }) + + override def run = { + val debug = MonitorableThread.debugLifecycle + //if (debug) log.debug("Created %s", getName) + try { + MonitorableThread.alive.incrementAndGet + super.run + } finally { + MonitorableThread.alive.decrementAndGet + //if (debug) log.debug("Exiting %s", getName) + } + } +} diff --git a/kernel/src/main/scala/state/State.scala b/kernel/src/main/scala/state/State.scala index fbd2eb05dd..fec5b0871c 100644 --- a/kernel/src/main/scala/state/State.scala +++ b/kernel/src/main/scala/state/State.scala @@ -18,17 +18,17 @@ abstract class TransactionalRefConfig extends TransactionalStateConfig abstract class PersistentStorageConfig extends TransactionalStateConfig case class CassandraStorageConfig extends PersistentStorageConfig -case object TerracottaStorageConfig extends PersistentStorageConfig -case object TokyoCabinetStorageConfig extends PersistentStorageConfig +case class TerracottaStorageConfig extends PersistentStorageConfig +case class TokyoCabinetStorageConfig extends PersistentStorageConfig case class PersistentMapConfig(storage: PersistentStorageConfig) extends TransactionalMapConfig -case object InMemoryMapConfig extends TransactionalMapConfig +case class InMemoryMapConfig extends TransactionalMapConfig case class PersistentVectorConfig(storage: PersistentStorageConfig) extends TransactionalVectorConfig -case object InMemoryVectorConfig extends TransactionalVectorConfig +case class InMemoryVectorConfig extends TransactionalVectorConfig case class PersistentRefConfig(storage: PersistentStorageConfig) extends TransactionalRefConfig -case object InMemoryRefConfig extends TransactionalRefConfig +case class InMemoryRefConfig extends TransactionalRefConfig object TransactionalState extends TransactionalState class TransactionalState { @@ -39,13 +39,13 @@ class TransactionalState { * val myMap = TransactionalState.newMap(PersistentMapConfig(CassandraStorageConfig)) * */ - def newMap(config: TransactionalMapConfig): TransactionalMap = config match { + def newMap(config: TransactionalMapConfig) = config match { case PersistentMapConfig(storage) => storage match { case CassandraStorageConfig() => new CassandraPersistentTransactionalMap - case TerracottaStorageConfig => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig => throw new UnsupportedOperationException + case TerracottaStorageConfig() => throw new UnsupportedOperationException + case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - case InMemoryMapConfig => new InMemoryTransactionalMap + case InMemoryMapConfig() => new InMemoryTransactionalMap } /** @@ -54,13 +54,13 @@ class TransactionalState { * val myVector = TransactionalState.newVector(PersistentVectorConfig(CassandraStorageConfig)) * */ - def newVector(config: TransactionalVectorConfig): TransactionalVector = config match { + def newVector(config: TransactionalVectorConfig) = config match { case PersistentVectorConfig(storage) => storage match { case CassandraStorageConfig() => new CassandraPersistentTransactionalVector - case TerracottaStorageConfig => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig => throw new UnsupportedOperationException + case TerracottaStorageConfig() => throw new UnsupportedOperationException + case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - case InMemoryVectorConfig => new InMemoryTransactionalVector + case InMemoryVectorConfig() => new InMemoryTransactionalVector } /** @@ -69,13 +69,13 @@ class TransactionalState { * val myRef = TransactionalState.newRef(PersistentRefConfig(CassandraStorageConfig)) * */ - def newRef(config: TransactionalRefConfig): TransactionalRef = config match { + def newRef(config: TransactionalRefConfig) = config match { case PersistentRefConfig(storage) => storage match { case CassandraStorageConfig() => new CassandraPersistentTransactionalRef - case TerracottaStorageConfig => throw new UnsupportedOperationException - case TokyoCabinetStorageConfig => throw new UnsupportedOperationException + case TerracottaStorageConfig() => throw new UnsupportedOperationException + case TokyoCabinetStorageConfig() => throw new UnsupportedOperationException } - case InMemoryRefConfig => new TransactionalRef + case InMemoryRefConfig() => new TransactionalRef } } diff --git a/kernel/src/main/scala/stm/ChangeSet.scala b/kernel/src/main/scala/stm/ChangeSet.scala new file mode 100644 index 0000000000..e7ba12974b --- /dev/null +++ b/kernel/src/main/scala/stm/ChangeSet.scala @@ -0,0 +1,44 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.stm + +import kernel.state.{Transactional, TransactionalMap, TransactionalVector, TransactionalRef} +import kernel.util.Helpers.ReadWriteLock + +class ChangeSet(val id: String) { + private val lock = new ReadWriteLock + + private[kernel] def full: List[Transactional] = lock.withReadLock { + _maps ::: _vectors ::: _refs + } + + // TX Maps + private[kernel] var _maps: List[TransactionalMap[_, _]] = Nil + private[kernel] def maps_=(maps: List[TransactionalMap[_, _]]) = lock.withWriteLock { + _maps = maps + } + private[kernel] def maps: List[TransactionalMap[_, _]] = lock.withReadLock { + _maps + } + + // TX Vectors + private[kernel] var _vectors: List[TransactionalVector[_]] = Nil + private[kernel] def vectors_=(vectors: List[TransactionalVector[_]]) = lock.withWriteLock { + _vectors = vectors + } + private[kernel] def vectors: List[TransactionalVector[_]] = lock.withReadLock { + _vectors + } + + // TX Refs + private[kernel] var _refs: List[TransactionalRef[_]] = Nil + private[kernel] def refs_=(refs: List[TransactionalRef[_]]) = lock.withWriteLock { + _refs = refs + } + private[kernel] def refs: List[TransactionalRef[_]] = lock.withReadLock { + _refs + } +} + diff --git a/kernel/src/main/scala/stm/Transaction.scala b/kernel/src/main/scala/stm/Transaction.scala index ef7d7290b5..8fa7384d00 100644 --- a/kernel/src/main/scala/stm/Transaction.scala +++ b/kernel/src/main/scala/stm/Transaction.scala @@ -5,7 +5,6 @@ package se.scalablesolutions.akka.kernel.stm import java.util.concurrent.atomic.{AtomicInteger, AtomicLong} -import kernel.actor.ChangeSet import kernel.util.Logging import scala.collection.mutable.{HashSet, HashMap} diff --git a/lib/aspectwerkz-nodeps-jdk5-2.1.jar b/lib/aspectwerkz-nodeps-jdk5-2.1.jar index c655479cbe..62725b9769 100644 Binary files a/lib/aspectwerkz-nodeps-jdk5-2.1.jar and b/lib/aspectwerkz-nodeps-jdk5-2.1.jar differ