Merge branch 'master' of git@github.com:jboner/akka

This commit is contained in:
debasishg 2009-08-14 16:09:20 +05:30
commit 05908f9273
109 changed files with 4788 additions and 5174 deletions

3655
akka.ipr

File diff suppressed because it is too large Load diff

4257
akka.iws

File diff suppressed because it is too large Load diff

View file

@ -1,95 +0,0 @@
#!/bin/bash
# ---------------------------------------------------------------------------
# Starts the Akka Kernel from a distribution laid out as:
#   <base>/bin/<this script>   <base>/config   <base>/lib/*.jar
# Wipes and re-creates <base>/storage (persistent state) on every run.
# The optional first argument is passed through to the Kernel main class.
# Requires JAVA_HOME to point at a Java installation.
# ---------------------------------------------------------------------------
VERSION=0.5
# Quote all path expansions so the script survives directories with spaces.
BASE_DIR="$(dirname "$0")/.."
echo 'Starting Akka Kernel from directory' "$BASE_DIR"
echo 'Resetting persistent storage in' "$BASE_DIR/storage"
rm -rf "$BASE_DIR/storage"
# mkdir -p creates parent directories and does not fail if one already exists.
for dir in bootstrap callouts commitlog data system; do
  mkdir -p "$BASE_DIR/storage/$dir"
done
LIB_DIR="$BASE_DIR/lib"
CLASSPATH="$BASE_DIR/config"
# Build the classpath from the explicit jar list (order preserved).
# The Akka jars derive their name from $VERSION instead of hard-coding it.
for jar in \
  "akka-kernel-${VERSION}.jar" \
  "akka-util-java-${VERSION}.jar" \
  antlr-3.1.3.jar \
  aopalliance-1.0.jar \
  asm-3.1.jar \
  aspectwerkz-nodeps-jdk5-2.1.jar \
  atmosphere-core-0.3.jar \
  atmosphere-portable-runtime-0.3.jar \
  camel-core-2.0-SNAPSHOT.jar \
  atmosphere-compat-0.3.jar \
  cassandra-0.4.0-dev.jar \
  cglib-2.2.jar \
  commons-cli-1.1.jar \
  commons-collections-3.2.1.jar \
  commons-io-1.3.2.jar \
  commons-javaflow-1.0-SNAPSHOT.jar \
  commons-lang-2.4.jar \
  commons-logging-1.0.4.jar \
  commons-math-1.1.jar \
  configgy-1.3.jar \
  fscontext.jar \
  google-collect-snapshot-20090211.jar \
  grizzly-comet-webserver-1.8.6.3.jar \
  guice-core-2.0-SNAPSHOT.jar \
  guice-jsr250-2.0-SNAPSHOT.jar \
  high-scale-lib.jar \
  jackson-core-asl-1.1.0.jar \
  jackson-mapper-asl-1.1.0.jar \
  javautils-2.7.4-0.1.jar \
  jersey-client-1.1.1-ea.jar \
  jersey-core-1.1.1-ea.jar \
  jersey-json-1.1.1-ea.jar \
  jersey-server-1.1.1-ea.jar \
  jersey-scala-1.1.2-ea-SNAPSHOT.jar \
  JSAP-2.1.jar \
  jsr250-api-1.0.jar \
  jsr311-api-1.0.jar \
  libfb303.jar \
  libthrift.jar \
  log4j-1.2.15.jar \
  lucene-core-2.2.0.jar \
  netty-3.1.0.GA.jar \
  providerutil.jar \
  protobuf-java-2.1.0.jar \
  scala-library-2.7.5.jar \
  servlet-api-2.5.jar \
  slf4j-api-1.4.3.jar \
  slf4j-log4j12-1.4.3.jar \
  stringtemplate-3.0.jar \
  zookeeper-3.1.0.jar
do
  CLASSPATH="$CLASSPATH:$LIB_DIR/$jar"
done
# Add for debugging: -Xdebug -Xrunjdwp:transport=dt_socket,server=y,suspend=y,address=5005 \
# To have Akka dump the generated classes, add the '-Daspectwerkz.transform.dump=*' option and it will dump classes to $BASE_DIR/_dump
JVM_OPTS=" \
-server \
-Xms128M \
-Xmx2G \
-XX:SurvivorRatio=8 \
-XX:TargetSurvivorRatio=90 \
-XX:+AggressiveOpts \
-XX:+UseParNewGC \
-XX:+UseConcMarkSweepGC \
-XX:CMSInitiatingOccupancyFraction=1 \
-XX:+CMSParallelRemarkEnabled \
-XX:+HeapDumpOnOutOfMemoryError \
-Dcom.sun.management.jmxremote.port=8080 \
-Dcom.sun.management.jmxremote.ssl=false \
-Djava.naming.factory.initial=com.sun.jndi.fscontext.RefFSContextFactory \
-Dcom.sun.grizzly.cometSupport=true \
-Dcom.sun.management.jmxremote.authenticate=false"
#$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
# JVM_OPTS is deliberately unquoted so each option is passed as its own word.
# ${1:+"$1"} forwards the first argument only when it was actually supplied.
"$JAVA_HOME/bin/java" $JVM_OPTS -cp "$CLASSPATH" se.scalablesolutions.akka.kernel.Kernel ${1:+"$1"}

119
buildfile
View file

@ -1,119 +0,0 @@
require 'buildr/scala'

# Buildr build file for the Akka Actor Kernel (version 0.1).
# Defines the 'util-java', 'kernel' and 'api-java' sub-projects, the
# distribution zip, and a :run task that boots the kernel.
#options.test = false
VERSION_NUMBER = '0.1'
ENV['AKKA_HOME'] ||= '.'
repositories.remote << 'http://www.ibiblio.org/maven2'
repositories.remote << 'http://scala-tools.org/repo-releases'
repositories.remote << 'http://scala-tools.org/repo-snapshots'
repositories.remote << 'http://www.lag.net/repo'

# Dependency coordinates (group:artifact:type:version).
AKKA_KERNEL = 'se.scalablesolutions.akka:akka-kernel:jar:0.1'
AKKA_UTIL_JAVA = 'se.scalablesolutions.akka:akka-util-java:jar:0.1'
AKKA_API_JAVA = 'se.scalablesolutions.akka:akka-api-java:jar:0.1'
GUICEYFRUIT = ['org.guiceyfruit:guice-core:jar:2.0-SNAPSHOT',
'org.guiceyfruit:guice-jsr250:jar:2.0-SNAPSHOT']
JERSEY = ['com.sun.jersey:jersey-core:jar:1.0.1',
'com.sun.jersey:jersey-server:jar:1.0.1',
'com.sun.jersey:jersey-json:jar:1.0.1',
'com.sun.jersey:jersey-atom:jar:1.0.1',
'javax.ws.rs:jsr311-api:jar:1.0']
GRIZZLY = 'com.sun.grizzly:grizzly-servlet-webserver:jar:1.8.6.3'
NETTY = 'org.jboss.netty:netty:jar:3.1.0.BETA2'
CASSANDRA = 'org.apache.cassandra:cassandra:jar:0.3.0-dev'
CAMEL = 'org.apache.camel:camel-core:jar:2.0-SNAPSHOT'
THRIFT = 'com.facebook:thrift:jar:1.0'
FB303 = 'com.facebook:fb303:jar:1.0'
CONFIGGY = 'net.lag:configgy:jar:1.2'
JSR_250 = 'javax.annotation:jsr250-api:jar:1.0'
SLF4J = ['org.slf4j:slf4j-log4j12:jar:1.4.3',
'org.slf4j:slf4j-api:jar:1.4.3',
'log4j:log4j:jar:1.2.13']
COMMONS_LOGGING = 'commons-logging:commons-logging:jar:1.1.1'
JDOM = 'jdom:jdom:jar:1.0'
CGLIB = 'cglib:cglib-nodep:jar:2.1_3'
AOPALLIANCE = 'aopalliance:aopalliance:jar:1.0'
GOOGLE_COLLECT = 'com.google.code.google-collections:google-collect:jar:snapshot-20080530'
SCALA = 'org.scala-lang:scala-library:jar:2.7.3'
SCALATEST = 'org.scala-tools.testing:scalatest:jar:0.9.5'
JUNIT4 = 'junit:junit:jar:4.5'
JUNIT4RUNNER = 'com.jteigen.scalatest:junit4runner:jar:1.0'
#VOLDEMORT = ['voldemort:voldemort:jar:0.4a',
# 'voldemort:voldemort-contrib:jar:0.4a']
#MINA_CORE = 'com.assembla.scala.mina:mina-core:jar:2.0.0-M2-SNAPSHOT'
#MINA_SCALA = 'com.assembla.scala.mina:mina-integration-scala:jar:2.0.0-M2-SNAPSHOT'
#ZOOKEEPER = 'org.apache:zookeeper:jar:3.1.0'

desc 'The Akka Actor Kernel'
define 'akka' do
  project.version = VERSION_NUMBER
  project.group = 'se.scalablesolutions.akka'
  manifest['Copyright'] = 'Scalable Solutions (C) 2009'
  compile.options.target = '1.5'

  desc 'Akka Java Utilities (annotations and guice module)'
  define 'util-java' do
    compile.with(GUICEYFRUIT, AOPALLIANCE)
    package :jar
  end

  desc 'Akka Actor Kernel core implementation'
  define 'kernel' do
    compile.with(
      AKKA_UTIL_JAVA, GUICEYFRUIT, AOPALLIANCE, NETTY, JERSEY, GRIZZLY,
      CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, GOOGLE_COLLECT, CGLIB, JSR_250,
      COMMONS_LOGGING, CONFIGGY, JUNIT4RUNNER, JUNIT4, SCALATEST)
    test.using :junit
    package :jar
  end

  desc 'Akka Java API'
  define 'api-java' do
    compile.with(AKKA_KERNEL, AKKA_UTIL_JAVA, NETTY, JERSEY, GRIZZLY,
      CASSANDRA, THRIFT, FB303, CAMEL, SLF4J, CONFIGGY, GUICEYFRUIT, SCALA,
      GOOGLE_COLLECT, AOPALLIANCE, CGLIB, JSR_250)
    test.using :junit
    package :jar
  end

  #desc 'Akka DB'
  #define 'db' do
  # compile.with(AKKA_KERNEL, MINA_CORE, MINA_SCALA, ZOOKEEPER, CONFIGGY, SLF4J)
  # test.using :scalatest
  # package :jar
  #end

  package(:zip).include 'README'
  package(:zip).include 'bin/*', :path=>'bin'
  package(:zip).include 'config/*', :path=>'config'
  package(:zip).include 'kernel/lib/*', :path=>'lib'
  package(:zip).include 'kernel/target/*.jar', :path=>'lib'
  package(:zip).include 'api-java/target/*.jar', :path=>'lib'
  package(:zip).include 'util-java/target/*.jar', :path=>'lib'

  # Packages everything and boots the kernel with the assembled classpath.
  task :run => [:package] do |t|
    puts "-------------------------"
    puts "Running Akka Actor Kernel"
    puts "-------------------------"
    puts "\n"
    # uri = URI("file://./lib")
    # uri.upload file('kernel')
    # NOTE: ZOOKEEPER was removed from this list because its constant is
    # commented out above; referencing it raised a NameError when :run executed.
    cp = [SCALA, GUICEYFRUIT, JERSEY, CASSANDRA, GOOGLE_COLLECT, JDOM, SLF4J, GRIZZLY, CONFIGGY, project('kernel').package(:jar)]
    # Java.java('se.scalablesolutions.akka.kernel.Kernel', {:classpath => '-cp ' + cp})
    # cp = FileList[_('lib/*')].join(File::PATH_SEPARATOR)
    # Flatten the nested dependency lists and join into a displayable string:
    # concatenating the Array onto a String directly raised a TypeError.
    puts "Running with classpath:\n" + cp.flatten.map(&:to_s).join(File::PATH_SEPARATOR)
    Java.java('se.scalablesolutions.akka.Boot', 'se.scalablesolutions.akka.kernel.Kernel', {:classpath => cp})
  end
end

47
changes.xml Normal file
View file

@ -0,0 +1,47 @@
<!-- mini guide
<action
invoke with 'mvn changes:changes-report'
dev="Name of developer who committed the change"
type="add|fix|remove|update"
issue="(optional) Id of the issue related to this change"
due-to="(optional) Name of the person 'non-committer' to be credited for this change"
>
description
</action>
see http://maven.apache.org/plugins/maven-changes-plugin/usage.html for full guide
-->
<document>
<properties>
<title>Akka Release Notes</title>
<author></author>
</properties>
<body>
<release version="0.6" date="" description="">
<action dev="Debasish Ghosh" type="add">MongoDB as Akka storage backend </action>
<action dev="Tim Perrett" type="add">Support for using the Lift Web framework with Actors</action>
<action dev="Viktor Klang" type="add">Support for using Scala XML tags in RESTful Actors (scala-jersey)</action>
<action dev="Viktor Klang" type="add">Support for Comet Actors using Atmosphere</action>
<action dev="Jonas Bon&#233;r" type="add">Statistics recorder with JMX and REST APIs</action>
<action dev="Jonas Bon&#233;r" type="add">Management service with JMX and REST APIs</action>
<action dev="Jonas Bon&#233;r" type="add">JSON serialization for Java objects (using Jackson)</action>
<action dev="Jonas Bon&#233;r" type="add">JSON serialization for Scala objects (using scala-json)</action>
<action dev="Jonas Bon&#233;r" type="add">Protobuf serialization for Java and Scala objects</action>
<action dev="Jonas Bon&#233;r" type="add">SBinary serialization for Scala objects</action>
<action dev="Jonas Bon&#233;r" type="add">Protobuf as remote protocol</action>
<action dev="Jonas Bon&#233;r" type="add">Added CassandraSession API (with socket pooling) wrapping Cassandra's Thrift API in Scala and Java APIs</action>
<action dev="Jonas Bon&#233;r" type="fix">CassandraStorage now works with an external Cassandra cluster</action>
<action dev="Jonas Bon&#233;r" type="remove">Removed embedded Cassandra mode</action>
<action dev="Jonas Bon&#233;r" type="remove">Removed startup scripts and lib dir</action>
<action dev="Jonas Bon&#233;r" type="add">ActorRegistry for retrieving Actor instances</action>
<action dev="Jonas Bon&#233;r" type="add">Now start up kernel with 'java -jar dist/akka-0.6.jar'</action>
<action dev="Jonas Bon&#233;r" type="fix">Concurrent mode is now per actor basis</action>
<action dev="Jonas Bon&#233;r" type="fix">Made Akka Web App aware, does not require AKKA_HOME when using it as a library</action>
<action dev="Jonas Bon&#233;r" type="fix">Fixed dispatcher bug</action>
<action dev="Jonas Bon&#233;r" type="fix">Cleaned up Maven scripts and distribution in general</action>
<action dev="Jonas Bon&#233;r" type="add">Added mailing list: akka-user@googlegroups.com</action>
<action dev="Jonas Bon&#233;r" type="add">Improved and restructured documentation</action>
<action dev="Jonas Bon&#233;r" type="add">New URL: http://akkasource.org</action>
</release>
<release version="0.5" date="2009-07-12" description="First public release" />
</body>
</document>

View file

@ -1,46 +1,45 @@
####################
# Akka Config File #
####################
# This file has all the default settings, so all these could be removed with no visible effect.
# Modify as needed.
<log>
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
filename = "./logs/akka.log"
roll = "daily" # Options: never, hourly, daily, sunday/monday/...
level = "debug" # Options: fatal, critical, error, warning, info, debug, trace
console = on
# syslog_host = ""
# syslog_server_name = ""
</log>
<akka>
version = "v0.5"
#boot = ["sample.scala.Boot"] # FQN to the class doing initial active object/actor
boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
version = "0.6"
boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
# supervisor bootstrap, should be defined in default constructor
<actor>
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
timeout = 5000 # default timeout for future based invocations
serialize-messages = off # does a deep clone of (non-primitive) messages to ensure immutability
</actor>
<stm>
service = on
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
# if 'off' then throws an exception or rollback for user to handle
wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
distributed = off # not implemented yet
restart-on-collision = off # (not implemented yet) if 'on' then it reschedules the transaction,
# if 'off' then throws an exception or rollback for user to handle
wait-for-completion = 100 # how long time in millis a transaction should be given time to complete when a collision is detected
wait-nr-of-times = 3 # the number of times it should check for completion of a pending transaction upon collision
distributed = off # not implemented yet
</stm>
<remote>
service = on
service = on
hostname = "localhost"
port = 9999
connection-timeout = 1000 # in millis
connection-timeout = 1000 # in millis
</remote>
<rest>
service = on
hostname = "localhost"
@ -48,26 +47,22 @@
</rest>
<storage>
system = "cassandra" # Options: cassandra (coming: terracotta, mongodb, redis, tokyo-cabinet, voldemort, memcached)
system = "cassandra" # Options: cassandra, mongodb
<cassandra>
service = on
hostname = "127.0.0.1" # ip address or hostname of one of the Cassandra cluster's seeds
hostname = "127.0.0.1" # IP address or hostname of one of the Cassandra cluster's seeds
port = 9160
storage-format = "java" # Options: java, scala-json, java-json, protobuf
consistency-level = 1 #
storage-format = "java" # Options: java, scala-json, java-json, protobuf
consistency-level = 1
</cassandra>
</storage>
<storage>
system = "mongodb"
<mongodb>
service = on
hostname = "127.0.0.1" # ip address or hostname of one of the Cassandra cluster's seeds
hostname = "127.0.0.1" # IP address or hostname of the MongoDB DB instance
port = 27017
dbname = "mydb"
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
storage-format = "scala-json" # Options: java, scala-json, java-json, protobuf
</mongodb>
</storage>
</akka>

View file

@ -15,6 +15,6 @@ log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%5p [%t] %d{ISO8601} %F (line %L) %m%n
# Edit the next line to point to your logs directory
log4j.appender.R.File=./logs/cassandra.log
log4j.appender.R.File=./logs/akka.log
log4j.logger.org.atmosphere=DEBUG

View file

@ -2,7 +2,7 @@
<!-- Associate a context-root (servlet-mapping) to an AtmosphereHandler.
Request sent using that context-root will be mapped to its associated AtmosphereHandler
-->
<atmosphere-handler context-root="" class-name="se.scalablesolutions.akka.kernel.jersey.AkkaServlet" broadcaster="org.atmosphere.core.JerseyBroadcaster">
<atmosphere-handler context-root="" class-name="se.scalablesolutions.akka.kernel.rest.AkkaServlet" broadcaster="org.atmosphere.core.JerseyBroadcaster">
<!-- Define some AtmosphereHandler properties -->
<property name="com.sun.jersey.spi.container.ResourceFilters" value="org.atmosphere.core.AtmosphereFilter"/>
</atmosphere-handler>

View file

@ -3,6 +3,6 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.twitter</groupId>
<artifactId>scala-json</artifactId>
<version>0.1</version>
<version>1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.twitter</groupId>
<artifactId>scala-stats</artifactId>
<version>1.0</version>
<packaging>jar</packaging>
</project>

View file

@ -1,8 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra</artifactId>
<version>0.4.0-dev</version>
<packaging>jar</packaging>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra</artifactId>
<version>0.4.0-trunk</version>
<packaging>jar</packaging>
</project>

View file

View file

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-jdk5</artifactId>
<version>2.1</version>
<packaging>jar</packaging>
</project>

View file

@ -65,7 +65,7 @@
</dependencies>
<build>
<sourceDirectory>src/main</sourceDirectory>
<sourceDirectory>src/main/java</sourceDirectory>
<testSourceDirectory>src/test/java</testSourceDirectory>
<plugins>
<plugin>
@ -84,7 +84,6 @@
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<excludes>
<exclude>**/Abstract*</exclude>
<exclude>**/*Persistent*</exclude>
</excludes>
</configuration>

View file

@ -21,7 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
protected void setUp() {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher("name");
dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)

View file

@ -1,6 +0,0 @@
# For Unix:
# mvn -o <target> |sed -e s/\\[WARNING\\][[:space:]]//g |grep -v "Finished at"
install:
mvn -o compile |sed -e 's/\[INFO\] //g' |sed -e 's/\[WARNING\] //g' |grep -v "Finished at" |grep -v "Total time"

View file

@ -11,7 +11,8 @@
<parent>
<artifactId>akka</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.5</version>
<version>0.6</version>
<relativePath>../pom.xml</relativePath>
</parent>
<!-- Core deps -->
@ -19,7 +20,7 @@
<dependency>
<artifactId>akka-util-java</artifactId>
<groupId>se.scalablesolutions.akka</groupId>
<version>0.5</version>
<version>0.6</version>
</dependency>
<dependency>
<groupId>org.scala-lang</groupId>
@ -31,16 +32,21 @@
<artifactId>aspectwerkz-nodeps-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>org.codehaus.aspectwerkz</groupId>
<artifactId>aspectwerkz-jdk5</artifactId>
<version>2.1</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scala-stats</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>net.lag</groupId>
<artifactId>configgy</artifactId>
<version>1.3</version>
</dependency>
<dependency>
<groupId>org.guiceyfruit</groupId>
<artifactId>guiceyfruit-core</artifactId>
<version>2.0</version>
</dependency>
<dependency>
<groupId>org.guiceyfruit</groupId>
<artifactId>guice-core</artifactId>
@ -105,7 +111,7 @@
<dependency>
<groupId>org.apache.cassandra</groupId>
<artifactId>cassandra</artifactId>
<version>0.4.0-dev</version>
<version>0.4.0-trunk</version>
</dependency>
<dependency>
<groupId>com.facebook</groupId>
@ -117,33 +123,13 @@
<artifactId>fb303</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>commons-collections</groupId>
<artifactId>commons-collections</artifactId>
<version>3.2.1</version>
</dependency>
<dependency>
<groupId>high-scale-lib</groupId>
<artifactId>high-scale-lib</artifactId>
<version>1.0</version>
</dependency>
<dependency>
<groupId>commons-lang</groupId>
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>se.foldleft</groupId>
<artifactId>cassidy</artifactId>
<version>0.1</version>
</dependency>
<dependency>
<groupId>commons-pool</groupId>
<artifactId>commons-pool</artifactId>
<version>1.5.1</version>
</dependency>
<!-- For Jersey -->
<!-- For Jersey & Atmosphere -->
<dependency>
<groupId>com.sun.grizzly</groupId>
<artifactId>grizzly-comet-webserver</artifactId>
@ -214,73 +200,40 @@
<version>0.9.5</version>
<scope>test</scope>
</dependency>
<!--dependency>
<groupId>com.jteigen.scalatest</groupId>
<artifactId>junit4runner</artifactId>
<version>1.0</version>
<scope>test</scope>
</dependency-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.5</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.sun.jersey</groupId>
<artifactId>jersey-client</artifactId>
<version>1.1.0-ea</version>
</dependency>
</dependencies>
<build>
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<artifactId>maven-assembly-plugin</artifactId>
<version>2.2-beta-2</version>
<executions>
<execution>
<id>create-executable-jar</id>
<phase>install</phase>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
<goal>single</goal>
</goals>
<configuration>
<descriptorRefs>
<descriptorRef>
jar-with-dependencies
</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>se.scalablesolutions.akka.kernel.Kernel</mainClass>
</manifest>
</archive>
</configuration>
</execution>
</executions>
<configuration>
<args>
<arg>-target:jvm-1.5</arg>
<!--<arg>-unchecked</arg>-->
</args>
<scalaVersion>2.7.5</scalaVersion>
<vscaladocVersion>1.1</vscaladocVersion>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>
ch.epfl.lamp.sdt.core.scalabuilder
</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>
ch.epfl.lamp.sdt.core.scalanature
</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>
org.eclipse.jdt.launching.JRE_CONTAINER
</classpathContainer>
<classpathContainer>
ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<artifactId>maven-antrun-plugin</artifactId>
@ -289,8 +242,8 @@
<phase>install</phase>
<configuration>
<tasks>
<copy file="target/akka-kernel-${akka.version}.jar"
tofile="../lib/akka-kernel-${akka.version}.jar"/>
<copy file="target/akka-kernel-${akka.version}-jar-with-dependencies.jar"
tofile="../dist/akka-${akka.version}.jar"/>
</tasks>
</configuration>
<goals>
@ -303,30 +256,19 @@
<resources>
<resource>
<filtering>false</filtering>
<directory>src/main/resources</directory>
<directory>../config</directory>
<includes>
<include>akka.conf</include>
<include>akka-reference.conf</include>
</includes>
</resource>
<resource>
<filtering>false</filtering>
<directory>src/main/scala</directory>
<directory>src/main/resources</directory>
<includes>
<include>**</include>
<include>META-INF/*</include>
</includes>
<excludes>
<exclude>**/*.scala</exclude>
</excludes>
</resource>
</resources>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<vscaladocVersion>1.1</vscaladocVersion>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>

View file

@ -0,0 +1,8 @@
<!DOCTYPE aspectwerkz PUBLIC "-//AspectWerkz//DTD//EN" "http://aspectwerkz.codehaus.org/dtd/aspectwerkz2.dtd">
<aspectwerkz>
<system id="akka">
<package name="se.scalablesolutions.akka.kernel.actor">
<aspect class="ActiveObjectAspect" />
</package>
</system>
</aspectwerkz>

View file

@ -1,9 +0,0 @@
<!DOCTYPE aspectwerkz PUBLIC
"-//AspectWerkz//DTD 2.0//EN"
"http://aspectwerkz.codehaus.org/dtd/aspectwerkz_2_0.dtd">
<aspectwerkz>
<system id="akka">
<aspect class="se.scalablesolutions.akka.kernel.config.ConfigurationAspect"/>
</system>
</aspectwerkz>

79
kernel/src/main/scala/Kernel.scala Normal file → Executable file
View file

@ -14,40 +14,44 @@ import java.net.URLClassLoader
import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException}
import kernel.jersey.AkkaCometServlet
import kernel.rest.AkkaCometServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging
import kernel.management.Management
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Kernel extends Logging {
@volatile private var hasBooted = false
val VERSION = "0.6"
val HOME = {
val home = System.getenv("AKKA_HOME")
if (home == null || home == "") None
if (home == null) None
else Some(home)
}
val config = setupConfig
val CONFIG_VERSION = config.getString("akka.version", "0")
if (VERSION != CONFIG_VERSION) throw new IllegalStateException("Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
val BOOT_CLASSES = config.getList("akka.boot")
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")
val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val REST_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost")
val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = kernel.Kernel.config.getInt("akka.rest.port", 9998)
// FIXME add API to shut server down gracefully
@volatile private var hasBooted = false
private var remoteServer: RemoteServer = _
private var jerseySelectorThread: SelectorThread = _
private val startTime = System.currentTimeMillis
private var applicationLoader: Option[ClassLoader] = None
def main(args: Array[String]) = boot
def boot = synchronized {
@ -55,21 +59,24 @@ object Kernel extends Logging {
printBanner
log.info("Starting Akka...")
runApplicationBootClasses
if (RUN_REMOTE_SERVICE) startRemoteService
if (RUN_MANAGEMENT_SERVICE) startManagementService
STORAGE_SYSTEM match {
case "cassandra" => startCassandra
case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
}
if (RUN_REST_SERVICE) startJersey
runApplicationBootClasses
if (RUN_REST_SERVICE) startREST
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
log.info("Akka started successfully")
hasBooted = true
}
@ -78,54 +85,64 @@ object Kernel extends Logging {
def uptime = (System.currentTimeMillis - startTime) / 1000
def setupConfig: Config = {
try {
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
log.info("Config loaded from the application classpath.")
} catch {
case e: ParseException =>
if (HOME.isDefined) {
try {
if (HOME.isDefined) {
val configFile = HOME.get + "/config/akka.conf"
log.info("AKKA_HOME is defined to [%s], loading config from [%s].", HOME.get, configFile)
Configgy.configure(configFile)
} else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
val configFile = HOME.get + "/config/akka.conf"
Configgy.configure(configFile)
log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile)
} catch {
case e: ParseException => throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.")
}
}
//val runtime = new RuntimeEnvironment(getClass)
//runtime.load(args)
} else {
try {
Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
log.info("Config loaded from the application classpath.")
} catch {
case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
}
}
val config = Configgy.config
config.registerWithJmx("com.scalablesolutions.akka.config")
config.registerWithJmx("com.scalablesolutions.akka")
// FIXME fix Configgy JMX subscription to allow management
// config.subscribe { c => configure(c.getOrElse(new Config)) }
config
}
private[akka] def runApplicationBootClasses: Unit = {
private[akka] def runApplicationBootClasses = {
new management.RestfulJMXBoot // add the REST/JMX service
val loader =
if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) getClass.getClassLoader
else if (HOME.isDefined) {
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) { log.error("Could not find a deploy directory at [" + DEPLOY + "]"); System.exit(-1) }
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
//val toDeploy = DEPLOY_DIR.toURL :: (for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL)
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
} else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
getClass.getClassLoader
} else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
applicationLoader = Some(loader)
}
private[akka] def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() { def run = RemoteServer.start }, "akka remote service")
val remoteServerThread = new Thread(new Runnable() {
def run = RemoteServer.start(applicationLoader)
}, "Akka Remote Service")
remoteServerThread.start
}
private[akka] def startManagementService = {
Management("se.scalablesolutions.akka.management")
log.info("Management service started successfully.")
}
private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "")
if (HOME.isDefined) System.setProperty("storage-config", HOME.get + "/config/")
@ -133,7 +150,7 @@ object Kernel extends Logging {
CassandraStorage.start
}
private[akka] def startJersey = {
private[akka] def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
@ -169,7 +186,7 @@ object Kernel extends Logging {
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
log.info(" Running version " + config.getString("akka.version", "Awesome"))
log.info(" Running version " + VERSION)
log.info("==============================")
}

View file

@ -14,9 +14,11 @@ import kernel.config.ScalaConfig._
import kernel.util._
import serialization.Serializer
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice, Advice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
import org.codehaus.aspectwerkz.aspect.management.Aspects
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
@ -218,24 +220,28 @@ object ActiveObject {
}
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
//if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true)
actor.initialize(target, proxy)
// FIXME switch to weaving in the aspect at compile time
proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remoteAddress, timeout))
actor.timeout = timeout
actor.start
AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
//if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
actor.initialize(target.getClass, target)
proxy.asInstanceOf[Advisable].aw_addAdvice(
MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remoteAddress, timeout))
actor.timeout = timeout
actor.start
AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
object factory extends SupervisorFactory {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
@ -246,20 +252,46 @@ object ActiveObject {
}
}
object AspectInitRegistry {
private val inits = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
def initFor(target: AnyRef) = {
val init = inits.get(target)
inits.remove(target)
init
}
def register(target: AnyRef, init: AspectInit) = inits.put(target, init)
}
sealed case class AspectInit(
val target: Class[_],
val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long)
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable
sealed class ActorAroundAdvice(val target: Class[_],
val targetInstance: AnyRef,
val actor: Dispatcher,
val remoteAddress: Option[InetSocketAddress],
val timeout: Long) extends AroundAdvice {
val id = target.getName
actor.timeout = timeout
actor.start
def invoke(joinpoint: JoinPoint): AnyRef = dispatch(joinpoint)
@Aspect("perInstance")
sealed class ActiveObjectAspect {
@volatile var isInitialized = false
var target: Class[_] = _
var actor: Dispatcher = _
var remoteAddress: Option[InetSocketAddress] = _
var timeout: Long = _
@Around("execution(* *..*(..))")
def invoke(joinpoint: JoinPoint): AnyRef = {
if (!isInitialized) {
val init = AspectInitRegistry.initFor(joinpoint.getThis)
target = init.target
actor = init.actor
remoteAddress = init.remoteAddress
timeout = init.timeout
isInitialized = true
}
dispatch(joinpoint)
}
private def dispatch(joinpoint: JoinPoint) = {
if (remoteAddress.isDefined) remoteDispatch(joinpoint)

View file

@ -8,14 +8,17 @@ import com.google.protobuf.ByteString
import java.net.InetSocketAddress
import java.util.concurrent.CopyOnWriteArraySet
import kernel.reactor._
import kernel.config.ScalaConfig._
import kernel.stm.TransactionManagement
import kernel.util.Helpers.ReadWriteLock
import kernel.nio.protobuf.RemoteProtocol.RemoteRequest
import kernel.util.Logging
import reactor._
import config.ScalaConfig._
import stm.TransactionManagement
import util.Helpers.ReadWriteLock
import nio.protobuf.RemoteProtocol.RemoteRequest
import util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
import management.Management
import com.twitter.service.Stats
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
@ -42,14 +45,17 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Actor {
val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false)
val TIMEOUT = Kernel.config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = Kernel.config.getBool("akka.actor.serialize-messages", false)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@serializable trait Actor extends Logging with TransactionManagement {
trait Actor extends Logging with TransactionManagement {
Stats.getCounter("NrOfActors").incr
ActorRegistry.register(this)
@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock
@ -64,6 +70,8 @@ object Actor {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
val name = this.getClass.getName
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
@ -96,7 +104,7 @@ object Actor {
* </pre>
*/
protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher
@ -529,6 +537,8 @@ object Actor {
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr
if (trapExit) {
if (faultHandler.isDefined) {
faultHandler.get match {
@ -546,6 +556,7 @@ object Actor {
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
private[Actor] def restart(reason: AnyRef) = synchronized {
if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr
lifeCycleConfig match {
case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")

View file

@ -0,0 +1,33 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.actor
import kernel.util.Logging
import scala.collection.jcl.HashMap
/**
* Registry holding all actor instances, mapped by class..
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object ActorRegistry extends Logging {
private val actors = new HashMap[String, List[Actor]]
def actorsFor(clazz: Class[_]): List[Actor] = synchronized {
actors.get(clazz.getName) match {
case None => Nil
case Some(instances) => instances
}
}
def register(actor: Actor) = synchronized {
val name = actor.getClass.getName
actors.get(name) match {
case Some(instances) => actors + (name -> (actor :: instances))
case None => actors + (name -> (actor :: Nil))
}
}
}

View file

@ -0,0 +1,187 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import com.twitter.service.Stats
import scala.collection.jcl
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ThreadPoolExecutor
import java.lang.management.ManagementFactory
import javax.{management => jmx}
import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL}
import kernel.Kernel.config
import kernel.util.Logging
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Management extends Logging {
val RECORD_STATS = config.getBool("akka.management.record-stats", true)
private var name = "se.scalablesolutions.akka"
private val mbeanServer = ManagementFactory.getPlatformMBeanServer
def apply() = {}
def apply(packageName: String) = name = packageName
java.rmi.registry.LocateRegistry.createRegistry(1099)
JMXConnectorServerFactory.newJMXConnectorServer(
new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"),
null,
mbeanServer).start
registerMBean(new StatisticsMBean, "Stats")
def registerMBean(mbean: jmx.DynamicMBean, mbeanType: String) = {
val objectName = new jmx.ObjectName(name + ":type=" + mbeanType)
try { mbeanServer.getMBeanInfo(objectName) } catch {
case e: jmx.InstanceNotFoundException =>
mbeanServer.registerMBean(mbean, objectName)
}
}
def getStats(reset: Boolean) = {
var statistics = new ArrayBuffer[Tuple2[String, String]]
statistics += (("current time", (System.currentTimeMillis / 1000).toString))
statistics += (("akka version", Kernel.VERSION))
statistics += (("uptime", Kernel.uptime.toString))
for ((key, value) <- Stats.getJvmStats) statistics += (key, value.toString)
for ((key, value) <- Stats.getCounterStats) statistics += (key, value.toString)
for ((key, value) <- Stats.getTimingStats(reset)) statistics += (key, value.toString)
for ((key, value) <- Stats.getGaugeStats(reset)) statistics += (key, value.toString)
val report = {for ((key, value) <- statistics) yield "STAT %s %s".format(key, value)}.mkString("", "\r\n", "\r\n")
log.info("=========================================\n\t--- Statistics Report ---\n%s=========================================", report)
report
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class StatisticsMBean extends jmx.DynamicMBean {
def getMBeanInfo = new jmx.MBeanInfo(
"se.scalablesolutions.akka.kernel.management.StatisticsMBean",
"runtime statistics",
getAttributeInfo,
null, null, null,
new jmx.ImmutableDescriptor("immutableInfo=false"))
def getAttribute(name: String): AnyRef = {
val segments = name.split("_", 2)
segments(0) match {
case "counter" =>
Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long]
case "timing" =>
val prefix = segments(1).split("_", 2)
val timing = Stats.getTimingStats(false)(prefix(1))
val x = prefix(0) match {
case "min" => timing.minimum
case "max" => timing.maximum
case "count" => timing.count
case "average" => timing.average
}
x.asInstanceOf[java.lang.Integer]
case "gauge" =>
Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double]
}
}
def getAttributes(names: Array[String]): jmx.AttributeList = {
val rv = new jmx.AttributeList
for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
rv
}
def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException
def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
(Stats.getCounterStats.keys.map { name =>
List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false))
} ++ Stats.getTimingStats(false).keys.map { name =>
List("min", "max", "average", "count") map { prefix =>
new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false)
}
} ++ Stats.getGaugeStats(false).keys.map { name =>
List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false))
}).toList.flatten[jmx.MBeanAttributeInfo].toArray
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadPoolMBean(threadPool: ThreadPoolExecutor) extends jmx.DynamicMBean {
val operations: Array[jmx.MBeanOperationInfo] = Array(
new jmx.MBeanOperationInfo("purge", "",
Array(), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("shutdown", "",
Array(), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("setCorePoolSize", "",
Array(new jmx.MBeanParameterInfo("corePoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("setMaximumPoolSize", "",
Array(new jmx.MBeanParameterInfo("maximumPoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
)
def getMBeanInfo = new jmx.MBeanInfo(
"se.scalablesolutions.akka.kernel.management.ThreadPoolMBean",
"runtime management",
getAttributeInfo,
null, operations, null,
new jmx.ImmutableDescriptor("immutableInfo=false"))
def getAttribute(name: String): AnyRef = name match {
case "getActiveCount" => threadPool.getActiveCount.asInstanceOf[AnyRef]
case "getCompletedTaskCount" => threadPool.getCompletedTaskCount.asInstanceOf[AnyRef]
case "getCorePoolSize" => threadPool.getCorePoolSize.asInstanceOf[AnyRef]
case "getLargestPoolSize" => threadPool.getLargestPoolSize.asInstanceOf[AnyRef]
case "getMaximumPoolSize" => threadPool.getMaximumPoolSize.asInstanceOf[AnyRef]
case "getPoolSize" => threadPool.getPoolSize.asInstanceOf[AnyRef]
case "getTaskCount" => threadPool.getTaskCount.asInstanceOf[AnyRef]
}
private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
Array(
new jmx.MBeanAttributeInfo("getCorePoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getMaximumPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getActiveCount", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getCompletedTaskCount", "java.lang.Long", "", true, false, false),
new jmx.MBeanAttributeInfo("getLargestPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getTaskCount", "java.lang.Long", "", true, false, false))
}
def getAttributes(names: Array[String]): jmx.AttributeList = {
val rv = new jmx.AttributeList
for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
rv
}
def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = {
try {
actionName match {
case "purge" => threadPool.purge
case "shutdown" => threadPool.shutdown
case "setCorePoolSize" =>
params match {
case Array(corePoolSize: java.lang.Integer) => threadPool.setCorePoolSize(corePoolSize.intValue)
case _ => throw new Exception("Bad signature " + params.toList.toString)
}
case "setMaximumPoolSize" =>
params match {
case Array(maximumPoolSize: java.lang.Integer) => threadPool.setMaximumPoolSize(maximumPoolSize.intValue)
case _ => throw new Exception("Bad signature " + params.toList.toString)
}
}
} catch { case e: Exception => throw new jmx.MBeanException(e) }
"Success"
}
def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
}

View file

@ -0,0 +1,96 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
import javax.management._
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import java.util.concurrent.ConcurrentHashMap
/**
* REST interface to Akka's JMX service.
* <p/>
* Here is an example that retreives the current number of Actors.
* <pre>
* http://localhost:9998/jmx
* ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
* &component=se.scalablesolutions.akka:type=Stats
* &attribute=counter_NrOfActors
* </pre>
*/
@Path("/jmx")
class RestfulJMX extends Actor with Logging {
private case class Request(service: String, component: String, attribute: String)
private val connectors = new ConcurrentHashMap[String, JMXConnector]
@GET
@Produces(Array("text/plain"))
def queryJMX(
@QueryParam("service") service: String,
@QueryParam("component") component: String,
@QueryParam("attribute") attribute: String): String=
(this !! Request(service, component, attribute)).getOrElse("Error in REST JMX management service")
override def receive: PartialFunction[Any, Unit] = {
case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute))
}
private def retrieveAttribute(service: String, component: String, attribute: String): String = {
try {
var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service)))
connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString
} catch {
case e: Exception =>
if (connectors.contains(service)) connectors.remove(service)
throw e
}
}
}
/**
* REST interface to Akka's statistics recorder.
* <p/>
* Here is an example that retreives a statistics report.
* <pre>
* http://localhost:9998/stats?reset=true
* </pre>
*/
@Path("/stats")
class StatisticsReporter extends Actor with Logging {
private case class Stats(reset: Boolean)
@GET
@Produces(Array("text/html"))
def stats(@QueryParam("reset") reset: String): scala.xml.Elem =
(this !! Stats(java.lang.Boolean.valueOf(reset).booleanValue)).getOrElse(<h3>Error in REST JMX management service</h3>)
override def receive: PartialFunction[Any, Unit] = {
case Stats(reset) => reply(<pre>{Management.getStats(reset)}</pre>)
}
}
class RestfulJMXBoot extends Logging {
log.info("Booting Restful JMX servivce")
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
Supervise(
new RestfulJMX,
LifeCycle(Permanent, 100)) ::
Supervise(
new StatisticsReporter,
LifeCycle(Permanent, 100)) ::
Nil)
}
}
factory.newSupervisor.startSupervisor
}

View file

@ -0,0 +1,171 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import javax.management._
import java.lang.management._
/*
object ScalaJMX {
val mbeanServer = ManagementFactory.getPlatformMBeanServer
def register(t: AnyRef, i: Class, name: ObjectName) = mbeanServer.registerMBean(new StandardMBean(t, i), name)
def registerBean(bean: DynamicMBean, name: ObjectName): ObjectInstance = mbeanServer.registerMBean(bean, name)
def register(t: AnyRef, name: String): ObjectInstance = register(t, beanClass(t), name)
def info(name: ObjectName): SBean = mbeanServer.getMBeanInfo(name)
def bean(name: ObjectName): SBeanInfo = convBeanInfo(name, mbeanServer.getMBeanInfo(name))
def invoke(name: ObjectName, operationName: String, params: Array[Object], signature: Array[String]): Object =
mbeanServer.invoke(name, operationName, params, signature)
def call(name: ObjectName, operationName: String): Object = invoke(name, operationName, Array[Object](), Array[String]())
def get(name: ObjectName, attribute: String) = mbeanServer.getAttribute(name, attribute)
def set(name: ObjectName, attribute: String, value: Object) = mbeanServer.setAttribute(name, new Attribute(attribute, value))
implicit def instanceToName(oi: ObjectInstance) = oi.getObjectName()
implicit def stringToName(name: String) = ObjectName.getInstance(name)
implicit def convBean(bi: MBeanInfo):SBean = SBean(bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
implicit def seqToArr(seq: Seq[AnyRef]): Array[Object] = seq.toArray
def convBeanInfo(name: ObjectName, bi: MBeanInfo):SBeanInfo = new SBeanInfo(name, bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
implicit def convAttrs(attrs: Array[MBeanAttributeInfo]): Seq[SAttr] =
for (val a <- attrs) yield a
implicit def convParams(params: Array[MBeanParameterInfo]): Seq[SParameter] =
for (val p <- params) yield p
implicit def convNotes(notes: Array[MBeanNotificationInfo]): Seq[SNotification] =
for (val p <- notes) yield p
implicit def convCons(cons: Array[MBeanConstructorInfo]): Seq[SConstructor] =
for (val p <- cons) yield p
implicit def convOps(cons: Array[MBeanOperationInfo]): Seq[SOperation] =
for (val p <- cons) yield p
implicit def convAttr(attr: MBeanAttributeInfo) = SAttr(attr.getName(), attr.getDescription(), attr.getType(), attr.isIs(), attr.isReadable(), attr.isWritable())
implicit def convNote(note: MBeanNotificationInfo) = SNotification(note.getName(), note.getDescription(), note.getNotifTypes())
implicit def convOp(op: MBeanOperationInfo):SOperation = SOperation(op.getName(), op.getDescription(), op.getImpact(), op.getReturnType(), op.getSignature())
implicit def convCon(con: MBeanConstructorInfo):SConstructor = SConstructor(con getName, con getDescription, con getSignature)
implicit def convParam(p: MBeanParameterInfo) = SParameter(p getName, p getDescription, p getType)
private def beanClass(t: AnyRef) = Class.forName(t.getClass().getName() + "MBean")
}
class MBean(mbeanInterface: String) extends StandardMBean(Class.forName(mbeanInterface))
abstract class SFeature(val name: String, val description: String)
case class SBean(className: String, description: String,
attrs: Seq[SAttr], notes: Seq[SNotification],
ops: Seq[SOperation], cons: Seq[SConstructor]) {
def writable = attrs.toList.filter(sa => sa.writable)
}
class SBeanInfo(name: ObjectName, className: String, description: String,
attrs: Seq[SAttr], notes: Seq[SNotification],
ops: Seq[SOperation], cons: Seq[SConstructor])
extends SBean(className, description, attrs, notes, ops, cons) {
def get(attribute: String) = SJMX.get(name, attribute)
def set(attribute: String, value: Object) = SJMX.set(name, attribute, value)
def call(opName: String) = SJMX.call(name, opName)
}
case class SAttr(
override val name: String,
override val description: String,
jmxType: String, isIs: boolean, readable: boolean, writable: boolean
) extends SFeature(name, description)
case class SNotification(
override val name: String,
override val description: String,
notifTypes: Array[String]) extends SFeature(name, description)
case class SOperation(
override val name: String,
override val description: String,
impact: int,
returnType: String,
signature: Seq[SParameter]) extends SFeature(name, description)
case class SParameter(
override val name: String,
override val description: String,
jmxType: String) extends SFeature(name, description)
case class SConstructor(
override val name: String,
override val description: String,
signature: Seq[SParameter]) extends SFeature(name, description)
*/
/*
package com.soletta.spipe;
import javax.management.{StandardMBean,ObjectName,MBeanInfo};
class SPipe extends MBean("com.soletta.spipe.SPipeMBean") with SPipeMBean {
import Console.println;
import SJMX._;
private var desc: String = "Yipe!";
def go = {
val oname: ObjectName = "default:name=SPipe";
val instance = SJMX.registerBean(this, oname);
set(oname, "Factor", "Hello!");
println(get(oname, "Factor"));
val SBean(n, d, Seq(_, a2, a3, _*), _, ops, _) = info(oname);
println("Bean name is " + n + ", description is " + d);
println("Second attribute is " + a2);
println("Third attribute is " + a3);
println("Writable attributes are " + info(oname).writable);
println("Ops: " + ops);
val x =
<bean name={n} description={d}>
{ops.toList.map(o => <operation name={o.name} description={o.description}/>)}
</bean> ;
println(x);
val inf = bean(oname);
inf.call("start");
println(inf.get("Factor"));
}
def getName = "SPipe!";
def setDescription(d: String) = desc = d;
override def getDescription() = desc;
def getFactor = desc;
def setFactor(s: String) = desc = s;
def isHappy = true;
override def getDescription(info: MBeanInfo) = desc;
}
object PipeMain {
def main(args: Array[String]): unit = {
(new SPipe) go;
}
}
trait SPipeMBean {
def getName: String;
def getDescription: String = getName;
def setDescription(d: String): unit;
def getFactor: String;
def setFactor(s: String): unit;
def isHappy: boolean;
def start() = { Console.println("Starting"); }
def stop() = { }
*/

View file

@ -12,6 +12,7 @@ import kernel.actor.{Exit, Actor}
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.util.Logging
import kernel.management.Management
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
@ -21,6 +22,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import scala.collection.mutable.HashMap
import com.twitter.service.Stats
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -44,6 +47,10 @@ object RemoteClient extends Logging {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@ -55,7 +62,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val bootstrap = new ClientBootstrap(channelFactory)
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(futures, supervisors))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@ -84,6 +91,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
if (Management.RECORD_STATS) {
NR_OF_BYTES_SENT.incr(request.getSerializedSize)
NR_OF_MESSAGES_SENT.incr
}
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
@ -111,15 +122,16 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance));
p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("protobufEncoder", new ProtobufEncoder());
p.addLast("handler", new RemoteClientHandler(futures, supervisors))
p.addLast("handler", new RemoteClientHandler(name, futures, supervisors))
p
}
}
@ -128,10 +140,14 @@ class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFuture
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult],
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor])
extends SimpleChannelUpstreamHandler with Logging {
val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
@ -144,6 +160,10 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize)
}
log.debug("Received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
if (reply.getIsSuccessful) {
@ -159,7 +179,7 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
}
future.completeWithException(null, parseException(reply))
}
futures.remove(reply.getId)
futures.remove(reply.getId)
} else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result)
} catch {
case e: Exception =>

View file

@ -13,6 +13,7 @@ import kernel.util._
import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.management.Management
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
@ -20,22 +21,28 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import com.twitter.service.Stats
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServer extends Logging {
def start = RemoteServer.start
def start = RemoteServer.start(None)
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object RemoteServer extends Logging {
val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000)
import kernel.Kernel.config
val HOSTNAME = config.getString("akka.remote.hostname", "localhost")
val PORT = config.getInt("akka.remote.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000)
val name = "RemoteServer@" + HOSTNAME
@volatile private var isRunning = false
@volatile private var isConfigured = false
private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,
@ -44,18 +51,15 @@ object RemoteServer extends Logging {
private val activeObjectFactory = new ActiveObjectFactory
private val bootstrap = new ServerBootstrap(factory)
// FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)
private val handler = new RemoteServerHandler
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
def start = synchronized {
def start(loader: Option[ClassLoader]) = synchronized {
if (!isRunning) {
log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT)
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
isRunning = true
}
@ -65,14 +69,14 @@ object RemoteServer extends Logging {
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class RemoteServerPipelineFactory extends ChannelPipelineFactory {
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance))
p.addLast("frameEncoder", new LengthFieldPrepender(4))
p.addLast("protobufEncoder", new ProtobufEncoder)
p.addLast("handler", new RemoteServerHandler)
p.addLast("handler", new RemoteServerHandler(name, loader))
p
}
}
@ -81,7 +85,12 @@ class RemoteServerPipelineFactory extends ChannelPipelineFactory {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@ -106,6 +115,10 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize)
}
log.debug("Received RemoteRequest[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
@ -128,7 +141,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
@ -139,7 +157,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}
}
@ -165,7 +188,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
} catch {
case e: InvocationTargetException =>
@ -176,8 +204,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
e.printStackTrace
@ -186,8 +219,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}
@ -223,8 +261,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActiveObject(name: String, timeout: Long): AnyRef = {
val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull == null) {
val clazz = Class.forName(name)
try {
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
@ -240,8 +279,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActor(name: String, timeout: Long): Actor = {
val actorOrNull = actors.get(name)
if (actorOrNull == null) {
val clazz = Class.forName(name)
try {
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance.timeout = timeout
actors.put(name, newInstance)

View file

@ -49,17 +49,17 @@ class DispatcherFactory {
* Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
* Has a fluent builder interface for configuring its semantics.
*/
def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true)
def newEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name)
def newConcurrentEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name, true)
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
*/
def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher
def newEventBasedSingleThreadDispatcher(name: String) = new EventBasedSingleThreadDispatcher(name)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
}
}

View file

@ -10,9 +10,14 @@
*/
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.Management
import java.util.{LinkedList, Queue, List}
class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
import com.twitter.service.Stats
class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) {
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name)
def start = if (!active) {
active = true
val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)
@ -22,12 +27,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator
while (selectedInvocations.hasNext) {
val invocation = selectedInvocations.next
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size)
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
val invoker = messageHandlers.get(invocation.sender)
if (invoker != null) invoker.invoke(invocation)
selectedInvocations.remove
iter.remove
}
}
}

View file

@ -4,12 +4,16 @@
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.{Management, ThreadPoolMBean}
import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
import com.twitter.service.Stats
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
* See also this article: [http://today.java.net/cs/user/print/a/350].
@ -56,16 +60,17 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List}
*
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase {
def this() = this(false)
class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) {
def this(name: String) = this(name, false)
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
private var threadPoolBuilder: ThreadPoolExecutor = _
private val threadFactory = new MonitorableThreadFactory("akka")
private val threadFactory = new MonitorableThreadFactory("akka:" + name)
private var boundedExecutorBound = -1
private val busyInvokers = new HashSet[AnyRef]
@ -74,6 +79,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def start = if (!active) {
active = true
Management.registerMBean(new ThreadPoolMBean(threadPoolBuilder), "ThreadPool_" + name)
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
@ -89,6 +95,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val reservedInvocations = reserve(selectedInvocations)
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size)
val it = reservedInvocations.entrySet.iterator
while (it.hasNext) {
val entry = it.next
@ -157,6 +164,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
ensureNotActive
verifyNotInConstructionPhase
inProcessOfBuilding = false
blockingQueue = queue
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this
}
@ -169,7 +177,8 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
boundedExecutorBound = bound
this
}
@ -177,28 +186,32 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity), threadFactory, new CallerRunsPolicy)
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new CallerRunsPolicy)
blockingQueue = new LinkedBlockingQueue[Runnable]
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy)
blockingQueue = new SynchronousQueue[Runnable](fair)
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy)
blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
@ -311,13 +324,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
/*
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
*/
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**

View file

@ -4,20 +4,31 @@
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.Management
import java.util.{LinkedList, Queue, List}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{TimeUnit, BlockingQueue}
import java.util.HashMap
trait MessageDispatcherBase extends MessageDispatcher {
import com.twitter.service.Stats
abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher {
//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
val queue = new ReactiveMessageQueue
val queue = new ReactiveMessageQueue(name)
var blockingQueue: BlockingQueue[Runnable] = _
@volatile protected var active: Boolean = false
protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
guard.synchronized { blockingQueue.size.toDouble }
}
}
def messageQueue = queue
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
@ -40,10 +51,16 @@ trait MessageDispatcherBase extends MessageDispatcher {
protected def doShutdown = {}
}
class ReactiveMessageQueue extends MessageQueue {
class ReactiveMessageQueue(name: String) extends MessageQueue {
private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfReactiveQueue_" + name) {
queue.synchronized { queue.size.toDouble }
}
}
def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
@ -64,4 +81,4 @@ class ReactiveMessageQueue extends MessageQueue {
interrupted = true
queue.notifyAll
}
}
}

View file

@ -4,18 +4,24 @@
package se.scalablesolutions.akka.kernel.reactor
import com.twitter.service.Stats
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import kernel.actor.{Actor, ActorMessageInvoker}
import kernel.management.Management
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(new ActorMessageInvoker(actor))
class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor))
private val queue = new BlockingMessageQueue
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -27,6 +33,7 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
override def run = {
while (active) {
try {
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr
messageHandler.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
@ -44,7 +51,13 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
}
class BlockingMessageQueue extends MessageQueue {
class BlockingMessageQueue(name: String) extends MessageQueue {
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
queue.size.toDouble
}
}
// FIXME: configure the LBQ
private val queue = new LinkedBlockingQueue[MessageInvocation]
def append(handle: MessageInvocation) = queue.put(handle)
@ -52,4 +65,4 @@ class BlockingMessageQueue extends MessageQueue {
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
}
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.jersey
package se.scalablesolutions.akka.kernel.rest
import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.jersey
package se.scalablesolutions.akka.kernel.rest
import kernel.Kernel
import util.Logging
@ -19,7 +19,6 @@ extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
//log.info("ProviderFactory: resolve => " + clazz.getName)
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
}
}

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.jersey
package se.scalablesolutions.akka.kernel.rest
import kernel.Kernel
import config.ConfiguratorRepository

View file

@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.jersey
package se.scalablesolutions.akka.kernel.rest
import java.io.OutputStream
import java.lang.annotation.Annotation

View file

@ -55,7 +55,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val key = "key"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
@ -69,7 +69,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(2)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
@ -83,7 +83,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {

View file

@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)

View file

@ -1,91 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel
import akka.kernel.config.ActiveObjectGuiceConfigurator
import kernel.config.ScalaConfig._
import com.sun.grizzly.http.SelectorThread
import com.sun.jersey.api.client.Client
import com.sun.jersey.core.header.MediaTypes
import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
import javax.ws.rs.core.UriBuilder
import javax.ws.rs.{Produces, Path, GET}
import com.google.inject.{AbstractModule, Scopes}
import org.scalatest.Spec
import org.scalatest.matchers.ShouldMatchers
//simport com.jteigen.scalatest.JUnit4Runner
import org.junit.runner.RunWith
import org.junit.Test
import org.junit.Assert._
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
//@RunWith(classOf[JUnit4Runner])
class JerseySpec extends Spec with ShouldMatchers {
describe("A Jersey REST service") {
it("should ...") {
/*
val selector = startJersey
selector.start
val conf = new ActiveObjectGuiceConfigurator
conf.configure(
RestartStrategy(AllForOne, 3, 5000),
Component(
classOf[resource.JerseyFoo],
LifeCycle(Permanent, 1000),
1000) ::
Nil).supervise
conf.getInstance(classOf[resource.JerseyFoo])
*/
/*
val client = Client.create
val webResource = client.resource(UriBuilder.fromUri("http://localhost/").port(9998).build)
//val webResource = client.resource("http://localhost:9998/foo")
val responseMsg = webResource.get(classOf[String])
responseMsg should equal ("Hello World")
selector.stopEndpoint
*/
}
}
def startJersey: SelectorThread = {
val initParams = new java.util.HashMap[String, String]
initParams.put("com.sun.jersey.config.property.packages", "se.scalablesolutions.akka.kernel")
GrizzlyWebContainerFactory.create(UriBuilder.fromUri("http://localhost/").port(9998).build(), initParams)
}
}
// @GET
// @Produces("application/json")
// @Path("/network/{id: [0-9]+}/{nid}")
// def getUserByNetworkId(@PathParam {val value = "id"} id: Int, @PathParam {val value = "nid"} networkId: String): User = {
// val q = em.createQuery("SELECT u FROM User u WHERE u.networkId = :id AND u.networkUserId = :nid")
// q.setParameter("id", id)
// q.setParameter("nid", networkId)
// q.getSingleResult.asInstanceOf[User]
// }
package resource {
import javax.ws.rs.{Produces, Path, GET}
class JerseyFoo {
@GET
@Produces(Array("application/json"))
def foo: String = { val ret = "JerseyFoo.foo"; println(ret); ret }
}
@Path("/foo")
class JerseyFooSub extends JerseyFoo
class JerseyBar {
def bar(msg: String) = msg + "return_bar "
}
}

View file

@ -26,7 +26,6 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
class RemoteActorSpec extends TestCase {
kernel.Kernel.config
new Thread(new Runnable() {
def run = {

View file

@ -49,7 +49,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
@ -60,7 +60,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Some files were not shown because too many files have changed in this diff Show more