+
diff --git a/embedded-repo/com/twitter/scala-json/1.0/scala-json-1.0.pom b/embedded-repo/com/twitter/scala-json/1.0/scala-json-1.0.pom
index dd4add411e..ce3784907e 100644
--- a/embedded-repo/com/twitter/scala-json/1.0/scala-json-1.0.pom
+++ b/embedded-repo/com/twitter/scala-json/1.0/scala-json-1.0.pom
@@ -3,6 +3,6 @@
4.0.0
com.twitter
scala-json
- 0.1
+ 1.0
jar
\ No newline at end of file
diff --git a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar
new file mode 100755
index 0000000000..6b1b43bf7b
Binary files /dev/null and b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.jar differ
diff --git a/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom
new file mode 100755
index 0000000000..d35560379a
--- /dev/null
+++ b/embedded-repo/com/twitter/scala-stats/1.0/scala-stats-1.0.pom
@@ -0,0 +1,8 @@
+
+
+ 4.0.0
+ com.twitter
+ scala-stats
+ 1.0
+ jar
+
\ No newline at end of file
diff --git a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-trunk/cassandra-0.4.0-trunk.jar
old mode 100644
new mode 100755
similarity index 100%
rename from embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.jar
rename to embedded-repo/org/apache/cassandra/cassandra/0.4.0-trunk/cassandra-0.4.0-trunk.jar
diff --git a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.pom b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-trunk/cassandra-0.4.0-trunk.pom
old mode 100644
new mode 100755
similarity index 90%
rename from embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.pom
rename to embedded-repo/org/apache/cassandra/cassandra/0.4.0-trunk/cassandra-0.4.0-trunk.pom
index 8b158c6808..9ef43c31cd
--- a/embedded-repo/org/apache/cassandra/cassandra/0.4.0-dev/cassandra-0.4.0-dev.pom
+++ b/embedded-repo/org/apache/cassandra/cassandra/0.4.0-trunk/cassandra-0.4.0-trunk.pom
@@ -1,8 +1,8 @@
-
-
- 4.0.0
- org.apache.cassandra
- cassandra
- 0.4.0-dev
- jar
+
+
+ 4.0.0
+ org.apache.cassandra
+ cassandra
+ 0.4.0-trunk
+ jar
\ No newline at end of file
diff --git a/embedded-repo/org/apache/zookeeper/3.1.0/zookeeper-3.1.0.pom b/embedded-repo/org/apache/zookeeper/3.1.0/zookeeper-3.1.0.pom
old mode 100644
new mode 100755
diff --git a/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.jar b/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.jar
new file mode 100755
index 0000000000..c258335f67
Binary files /dev/null and b/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.jar differ
diff --git a/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.pom b/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.pom
new file mode 100755
index 0000000000..5324772a47
--- /dev/null
+++ b/embedded-repo/org/codehaus/aspectwerkz/aspectwerkz-jdk5/2.1/aspectwerkz-jdk5-2.1.pom
@@ -0,0 +1,8 @@
+
+
+ 4.0.0
+ org.codehaus.aspectwerkz
+ aspectwerkz-jdk5
+ 2.1
+ jar
+
\ No newline at end of file
diff --git a/fun-test-java/pom.xml b/fun-test-java/pom.xml
index 7e5a4fb729..36a4890ec7 100644
--- a/fun-test-java/pom.xml
+++ b/fun-test-java/pom.xml
@@ -65,7 +65,7 @@
- src/main
+ src/main/java
src/test/java
@@ -84,7 +84,6 @@
maven-surefire-plugin
- **/Abstract*
**/*Persistent*
diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
index d2cc9b7d0a..fdc2a48c98 100644
--- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
+++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java
@@ -21,7 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {
protected void setUp() {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
- EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
+ EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher("name");
dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)
diff --git a/kernel/Makefile b/kernel/Makefile
deleted file mode 100755
index 3c8c4cce60..0000000000
--- a/kernel/Makefile
+++ /dev/null
@@ -1,6 +0,0 @@
-# For Unix:
-# mvn -o |sed -e s/\\[WARNING\\][[:space:]]//g |grep -v "Finished at"
-
-install:
- mvn -o compile |sed -e 's/\[INFO\] //g' |sed -e 's/\[WARNING\] //g' |grep -v "Finished at" |grep -v "Total time"
-
diff --git a/kernel/pom.xml b/kernel/pom.xml
index 5b218b3daf..41c3fc05f3 100644
--- a/kernel/pom.xml
+++ b/kernel/pom.xml
@@ -11,7 +11,8 @@
akka
se.scalablesolutions.akka
- 0.5
+ 0.6
+ ../pom.xml
@@ -19,7 +20,7 @@
akka-util-java
se.scalablesolutions.akka
- 0.5
+ 0.6
org.scala-lang
@@ -31,16 +32,21 @@
aspectwerkz-nodeps-jdk5
2.1
+
+ org.codehaus.aspectwerkz
+ aspectwerkz-jdk5
+ 2.1
+
+
+ com.twitter
+ scala-stats
+ 1.0
+
net.lag
configgy
1.3
-
- org.guiceyfruit
- guiceyfruit-core
- 2.0
-
org.guiceyfruit
guice-core
@@ -105,7 +111,7 @@
org.apache.cassandra
cassandra
- 0.4.0-dev
+ 0.4.0-trunk
com.facebook
@@ -117,33 +123,13 @@
fb303
1.0
-
- commons-collections
- commons-collections
- 3.2.1
-
-
- high-scale-lib
- high-scale-lib
- 1.0
-
-
- commons-lang
- commons-lang
- 2.4
-
-
- se.foldleft
- cassidy
- 0.1
-
commons-pool
commons-pool
1.5.1
-
+
com.sun.grizzly
grizzly-comet-webserver
@@ -214,73 +200,40 @@
0.9.5
test
-
junit
junit
4.5
test
-
- com.sun.jersey
- jersey-client
- 1.1.0-ea
-
- src/main/scala
- src/test/scala
- org.scala-tools
- maven-scala-plugin
+ maven-assembly-plugin
+ 2.2-beta-2
+ create-executable-jar
+ install
- compile
- testCompile
+ single
+
+
+
+ jar-with-dependencies
+
+
+
+
+ se.scalablesolutions.akka.kernel.Kernel
+
+
+
-
-
- -target:jvm-1.5
-
-
- 2.7.5
- 1.1
-
-
-
- org.apache.maven.plugins
- maven-eclipse-plugin
-
- true
-
-
- ch.epfl.lamp.sdt.core.scalabuilder
-
-
-
-
- ch.epfl.lamp.sdt.core.scalanature
-
-
-
-
- org.eclipse.jdt.launching.JRE_CONTAINER
-
-
- ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
-
-
-
maven-antrun-plugin
@@ -289,8 +242,8 @@
install
-
+
@@ -303,30 +256,19 @@
false
- src/main/resources
+ ../config
+
+ akka.conf
+ akka-reference.conf
+
false
- src/main/scala
+ src/main/resources
- **
+ META-INF/*
-
- **/*.scala
-
-
-
-
- org.scala-tools
- maven-scala-plugin
-
- 1.1
- ${scala.version}
-
-
-
-
diff --git a/kernel/src/main/resources/META-INF/aop.xml b/kernel/src/main/resources/META-INF/aop.xml
new file mode 100755
index 0000000000..23bb4575ff
--- /dev/null
+++ b/kernel/src/main/resources/META-INF/aop.xml
@@ -0,0 +1,8 @@
+
+
+
+
+
+
+
+
diff --git a/kernel/src/main/resources/aop.xml b/kernel/src/main/resources/aop.xml
deleted file mode 100644
index fe844fa481..0000000000
--- a/kernel/src/main/resources/aop.xml
+++ /dev/null
@@ -1,9 +0,0 @@
-
-
-
-
-
-
-
diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala
old mode 100644
new mode 100755
index 7e07c35a41..7ecfd61141
--- a/kernel/src/main/scala/Kernel.scala
+++ b/kernel/src/main/scala/Kernel.scala
@@ -14,40 +14,44 @@ import java.net.URLClassLoader
import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException}
-import kernel.jersey.AkkaCometServlet
+import kernel.rest.AkkaCometServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging
+import kernel.management.Management
/**
* @author Jonas Bonér
*/
object Kernel extends Logging {
- @volatile private var hasBooted = false
-
+ val VERSION = "0.6"
val HOME = {
val home = System.getenv("AKKA_HOME")
- if (home == null || home == "") None
+ if (home == null) None
else Some(home)
}
val config = setupConfig
+
+ val CONFIG_VERSION = config.getString("akka.version", "0")
+ if (VERSION != CONFIG_VERSION) throw new IllegalStateException("Akka JAR version [" + VERSION + "] is different than the provided config ('akka.conf') version [" + CONFIG_VERSION + "]")
val BOOT_CLASSES = config.getList("akka.boot")
-
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
+ val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")
-
val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val REST_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost")
val REST_URL = "http://" + REST_HOSTNAME
val REST_PORT = kernel.Kernel.config.getInt("akka.rest.port", 9998)
// FIXME add API to shut server down gracefully
+ @volatile private var hasBooted = false
private var remoteServer: RemoteServer = _
private var jerseySelectorThread: SelectorThread = _
private val startTime = System.currentTimeMillis
-
+ private var applicationLoader: Option[ClassLoader] = None
+
def main(args: Array[String]) = boot
def boot = synchronized {
@@ -55,21 +59,24 @@ object Kernel extends Logging {
printBanner
log.info("Starting Akka...")
+ runApplicationBootClasses
+
if (RUN_REMOTE_SERVICE) startRemoteService
+ if (RUN_MANAGEMENT_SERVICE) startManagementService
STORAGE_SYSTEM match {
case "cassandra" => startCassandra
case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
+ case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")
case _ => throw new UnsupportedOperationException("Unknown storage system [" + STORAGE_SYSTEM + "]")
}
- if (RUN_REST_SERVICE) startJersey
-
- runApplicationBootClasses
+ if (RUN_REST_SERVICE) startREST
+ Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
log.info("Akka started successfully")
hasBooted = true
}
@@ -78,54 +85,64 @@ object Kernel extends Logging {
def uptime = (System.currentTimeMillis - startTime) / 1000
def setupConfig: Config = {
- try {
- Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
- log.info("Config loaded from the application classpath.")
- } catch {
- case e: ParseException =>
+ if (HOME.isDefined) {
try {
- if (HOME.isDefined) {
- val configFile = HOME.get + "/config/akka.conf"
- log.info("AKKA_HOME is defined to [%s], loading config from [%s].", HOME.get, configFile)
- Configgy.configure(configFile)
- } else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ val configFile = HOME.get + "/config/akka.conf"
+ Configgy.configure(configFile)
+ log.info("AKKA_HOME is defined to [%s], config loaded from [%s].", HOME.get, configFile)
} catch {
- case e: ParseException => throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
+ case e: ParseException => throw new IllegalStateException("'akka.conf' config file can not be found in [" + HOME + "/config/akka.conf] - aborting. Either add it in the 'config' directory or add it to the classpath.")
}
- }
- //val runtime = new RuntimeEnvironment(getClass)
- //runtime.load(args)
+ } else {
+ try {
+ Configgy.configureFromResource("akka.conf", getClass.getClassLoader)
+ log.info("Config loaded from the application classpath.")
+ } catch {
+ case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
+ }
+ }
val config = Configgy.config
- config.registerWithJmx("com.scalablesolutions.akka.config")
+ config.registerWithJmx("com.scalablesolutions.akka")
// FIXME fix Configgy JMX subscription to allow management
// config.subscribe { c => configure(c.getOrElse(new Config)) }
config
}
- private[akka] def runApplicationBootClasses: Unit = {
+ private[akka] def runApplicationBootClasses = {
+ new management.RestfulJMXBoot // add the REST/JMX service
val loader =
- if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) getClass.getClassLoader
- else if (HOME.isDefined) {
+ if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
val DEPLOY = HOME.get + "/deploy"
val DEPLOY_DIR = new File(DEPLOY)
if (!DEPLOY_DIR.exists) { log.error("Could not find a deploy directory at [" + DEPLOY + "]"); System.exit(-1) }
val toDeploy = for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL
+ //val toDeploy = DEPLOY_DIR.toURL :: (for (f <- DEPLOY_DIR.listFiles().toArray.toList.asInstanceOf[List[File]]) yield f.toURL)
log.info("Deploying applications from [%s]: [%s]", DEPLOY, toDeploy.toArray.toList)
new URLClassLoader(toDeploy.toArray, getClass.getClassLoader)
+ } else if (getClass.getClassLoader.getResourceAsStream("akka.conf") != null) {
+ getClass.getClassLoader
} else throw new IllegalStateException("AKKA_HOME is not defined and no 'akka.conf' can be found on the classpath, aborting")
for (clazz <- BOOT_CLASSES) {
log.info("Loading boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
+ applicationLoader = Some(loader)
}
private[akka] def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
- val remoteServerThread = new Thread(new Runnable() { def run = RemoteServer.start }, "akka remote service")
+ val remoteServerThread = new Thread(new Runnable() {
+ def run = RemoteServer.start(applicationLoader)
+ }, "Akka Remote Service")
remoteServerThread.start
}
+ private[akka] def startManagementService = {
+ Management("se.scalablesolutions.akka.management")
+ log.info("Management service started successfully.")
+ }
+
private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "")
if (HOME.isDefined) System.setProperty("storage-config", HOME.get + "/config/")
@@ -133,7 +150,7 @@ object Kernel extends Logging {
CassandraStorage.start
}
- private[akka] def startJersey = {
+ private[akka] def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()
val scheme = uri.getScheme
@@ -169,7 +186,7 @@ object Kernel extends Logging {
(____ /__|_ \__|_ \(____ /
\/ \/ \/ \/
""")
- log.info(" Running version " + config.getString("akka.version", "Awesome"))
+ log.info(" Running version " + VERSION)
log.info("==============================")
}
diff --git a/kernel/src/main/scala/actor/ActiveObject.scala b/kernel/src/main/scala/actor/ActiveObject.scala
index 3dac74252c..cb22fecbc1 100644
--- a/kernel/src/main/scala/actor/ActiveObject.scala
+++ b/kernel/src/main/scala/actor/ActiveObject.scala
@@ -14,9 +14,11 @@ import kernel.config.ScalaConfig._
import kernel.util._
import serialization.Serializer
-import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice}
+import org.codehaus.aspectwerkz.intercept.{Advisable, AroundAdvice, Advice}
import org.codehaus.aspectwerkz.joinpoint.{MethodRtti, JoinPoint}
import org.codehaus.aspectwerkz.proxy.Proxy
+import org.codehaus.aspectwerkz.annotation.{Aspect, Around}
+import org.codehaus.aspectwerkz.aspect.management.Aspects
sealed class ActiveObjectException(msg: String) extends RuntimeException(msg)
class ActiveObjectInvocationTimeoutException(msg: String) extends ActiveObjectException(msg)
@@ -218,24 +220,28 @@ object ActiveObject {
}
private[kernel] def newInstance[T](target: Class[T], actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(target, false, true)
actor.initialize(target, proxy)
- // FIXME switch to weaving in the aspect at compile time
- proxy.asInstanceOf[Advisable].aw_addAdvice(
- MATCH_ALL, new ActorAroundAdvice(target, proxy, actor, remoteAddress, timeout))
+ actor.timeout = timeout
+ actor.start
+ AspectInitRegistry.register(proxy, AspectInit(target, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
private[kernel] def newInstance[T](intf: Class[T], target: AnyRef, actor: Dispatcher, remoteAddress: Option[InetSocketAddress], timeout: Long): T = {
+ //if (getClass.getClassLoader.getResourceAsStream("META-INF/aop.xml") != null) println("000000000000000000000 FOUND AOP")
if (remoteAddress.isDefined) actor.makeRemote(remoteAddress.get)
val proxy = Proxy.newInstance(Array(intf), Array(target), false, true)
actor.initialize(target.getClass, target)
- proxy.asInstanceOf[Advisable].aw_addAdvice(
- MATCH_ALL, new ActorAroundAdvice(intf, target, actor, remoteAddress, timeout))
+ actor.timeout = timeout
+ actor.start
+ AspectInitRegistry.register(proxy, AspectInit(intf, actor, remoteAddress, timeout))
proxy.asInstanceOf[T]
}
+
private[kernel] def supervise(restartStrategy: RestartStrategy, components: List[Supervise]): Supervisor = {
object factory extends SupervisorFactory {
override def getSupervisorConfig = SupervisorConfig(restartStrategy, components)
@@ -246,20 +252,46 @@ object ActiveObject {
}
}
+object AspectInitRegistry {
+ private val inits = new java.util.concurrent.ConcurrentHashMap[AnyRef, AspectInit]
+ def initFor(target: AnyRef) = {
+ val init = inits.get(target)
+ inits.remove(target)
+ init
+ }
+ def register(target: AnyRef, init: AspectInit) = inits.put(target, init)
+}
+
+sealed case class AspectInit(
+ val target: Class[_],
+ val actor: Dispatcher,
+ val remoteAddress: Option[InetSocketAddress],
+ val timeout: Long)
+
/**
* @author Jonas Bonér
*/
@serializable
-sealed class ActorAroundAdvice(val target: Class[_],
- val targetInstance: AnyRef,
- val actor: Dispatcher,
- val remoteAddress: Option[InetSocketAddress],
- val timeout: Long) extends AroundAdvice {
- val id = target.getName
- actor.timeout = timeout
- actor.start
-
- def invoke(joinpoint: JoinPoint): AnyRef = dispatch(joinpoint)
+@Aspect("perInstance")
+sealed class ActiveObjectAspect {
+ @volatile var isInitialized = false
+ var target: Class[_] = _
+ var actor: Dispatcher = _
+ var remoteAddress: Option[InetSocketAddress] = _
+ var timeout: Long = _
+
+ @Around("execution(* *..*(..))")
+ def invoke(joinpoint: JoinPoint): AnyRef = {
+ if (!isInitialized) {
+ val init = AspectInitRegistry.initFor(joinpoint.getThis)
+ target = init.target
+ actor = init.actor
+ remoteAddress = init.remoteAddress
+ timeout = init.timeout
+ isInitialized = true
+ }
+ dispatch(joinpoint)
+ }
private def dispatch(joinpoint: JoinPoint) = {
if (remoteAddress.isDefined) remoteDispatch(joinpoint)
diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala
index ea4145a4f3..5e21048fd4 100644
--- a/kernel/src/main/scala/actor/Actor.scala
+++ b/kernel/src/main/scala/actor/Actor.scala
@@ -8,14 +8,17 @@ import com.google.protobuf.ByteString
import java.net.InetSocketAddress
import java.util.concurrent.CopyOnWriteArraySet
-import kernel.reactor._
-import kernel.config.ScalaConfig._
-import kernel.stm.TransactionManagement
-import kernel.util.Helpers.ReadWriteLock
-import kernel.nio.protobuf.RemoteProtocol.RemoteRequest
-import kernel.util.Logging
+import reactor._
+import config.ScalaConfig._
+import stm.TransactionManagement
+import util.Helpers.ReadWriteLock
+import nio.protobuf.RemoteProtocol.RemoteRequest
+import util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
+import management.Management
+
+import com.twitter.service.Stats
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
@@ -42,14 +45,17 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
* @author Jonas Bonér
*/
object Actor {
- val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
- val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false)
+ val TIMEOUT = Kernel.config.getInt("akka.actor.timeout", 5000)
+ val SERIALIZE_MESSAGES = Kernel.config.getBool("akka.actor.serialize-messages", false)
}
/**
* @author Jonas Bonér
*/
-@serializable trait Actor extends Logging with TransactionManagement {
+trait Actor extends Logging with TransactionManagement {
+ Stats.getCounter("NrOfActors").incr
+ ActorRegistry.register(this)
+
@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock
@@ -64,6 +70,8 @@ object Actor {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None
+ val name = this.getClass.getName
+
// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================
@@ -96,7 +104,7 @@ object Actor {
*
*/
protected[kernel] var dispatcher: MessageDispatcher = {
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher
@@ -529,6 +537,8 @@ object Actor {
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
+ if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr
+
if (trapExit) {
if (faultHandler.isDefined) {
faultHandler.get match {
@@ -546,6 +556,7 @@ object Actor {
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
private[Actor] def restart(reason: AnyRef) = synchronized {
+ if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr
lifeCycleConfig match {
case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")
diff --git a/kernel/src/main/scala/actor/ActorRegistry.scala b/kernel/src/main/scala/actor/ActorRegistry.scala
new file mode 100755
index 0000000000..3f4275cf9e
--- /dev/null
+++ b/kernel/src/main/scala/actor/ActorRegistry.scala
@@ -0,0 +1,33 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.actor
+
+import kernel.util.Logging
+
+import scala.collection.jcl.HashMap
+
+/**
+ * Registry holding all actor instances, mapped by class..
+ *
+ * @author Jonas Bonér
+ */
+object ActorRegistry extends Logging {
+ private val actors = new HashMap[String, List[Actor]]
+
+ def actorsFor(clazz: Class[_]): List[Actor] = synchronized {
+ actors.get(clazz.getName) match {
+ case None => Nil
+ case Some(instances) => instances
+ }
+ }
+
+ def register(actor: Actor) = synchronized {
+ val name = actor.getClass.getName
+ actors.get(name) match {
+ case Some(instances) => actors + (name -> (actor :: instances))
+ case None => actors + (name -> (actor :: Nil))
+ }
+ }
+}
diff --git a/kernel/src/main/scala/management/JMX.scala b/kernel/src/main/scala/management/JMX.scala
new file mode 100755
index 0000000000..bdeea75324
--- /dev/null
+++ b/kernel/src/main/scala/management/JMX.scala
@@ -0,0 +1,187 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.management
+
+import com.twitter.service.Stats
+
+import scala.collection.jcl
+import scala.collection.mutable.ArrayBuffer
+
+import java.util.concurrent.ThreadPoolExecutor
+import java.lang.management.ManagementFactory
+import javax.{management => jmx}
+import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL}
+
+import kernel.Kernel.config
+import kernel.util.Logging
+
+/**
+ * @author Jonas Bonér
+ */
+object Management extends Logging {
+ val RECORD_STATS = config.getBool("akka.management.record-stats", true)
+ private var name = "se.scalablesolutions.akka"
+ private val mbeanServer = ManagementFactory.getPlatformMBeanServer
+
+ def apply() = {}
+ def apply(packageName: String) = name = packageName
+
+ java.rmi.registry.LocateRegistry.createRegistry(1099)
+ JMXConnectorServerFactory.newJMXConnectorServer(
+ new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"),
+ null,
+ mbeanServer).start
+
+ registerMBean(new StatisticsMBean, "Stats")
+
+ def registerMBean(mbean: jmx.DynamicMBean, mbeanType: String) = {
+ val objectName = new jmx.ObjectName(name + ":type=" + mbeanType)
+ try { mbeanServer.getMBeanInfo(objectName) } catch {
+ case e: jmx.InstanceNotFoundException =>
+ mbeanServer.registerMBean(mbean, objectName)
+ }
+ }
+
+ def getStats(reset: Boolean) = {
+ var statistics = new ArrayBuffer[Tuple2[String, String]]
+ statistics += (("current time", (System.currentTimeMillis / 1000).toString))
+ statistics += (("akka version", Kernel.VERSION))
+ statistics += (("uptime", Kernel.uptime.toString))
+ for ((key, value) <- Stats.getJvmStats) statistics += (key, value.toString)
+ for ((key, value) <- Stats.getCounterStats) statistics += (key, value.toString)
+ for ((key, value) <- Stats.getTimingStats(reset)) statistics += (key, value.toString)
+ for ((key, value) <- Stats.getGaugeStats(reset)) statistics += (key, value.toString)
+ val report = {for ((key, value) <- statistics) yield "STAT %s %s".format(key, value)}.mkString("", "\r\n", "\r\n")
+ log.info("=========================================\n\t--- Statistics Report ---\n%s=========================================", report)
+ report
+ }
+}
+
+/**
+ * @author Jonas Bonér
+ */
+class StatisticsMBean extends jmx.DynamicMBean {
+ def getMBeanInfo = new jmx.MBeanInfo(
+ "se.scalablesolutions.akka.kernel.management.StatisticsMBean",
+ "runtime statistics",
+ getAttributeInfo,
+ null, null, null,
+ new jmx.ImmutableDescriptor("immutableInfo=false"))
+
+ def getAttribute(name: String): AnyRef = {
+ val segments = name.split("_", 2)
+ segments(0) match {
+ case "counter" =>
+ Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long]
+ case "timing" =>
+ val prefix = segments(1).split("_", 2)
+ val timing = Stats.getTimingStats(false)(prefix(1))
+ val x = prefix(0) match {
+ case "min" => timing.minimum
+ case "max" => timing.maximum
+ case "count" => timing.count
+ case "average" => timing.average
+ }
+ x.asInstanceOf[java.lang.Integer]
+ case "gauge" =>
+ Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double]
+ }
+ }
+
+ def getAttributes(names: Array[String]): jmx.AttributeList = {
+ val rv = new jmx.AttributeList
+ for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
+ rv
+ }
+
+ def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException
+ def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
+ def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
+
+ private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
+ (Stats.getCounterStats.keys.map { name =>
+ List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false))
+ } ++ Stats.getTimingStats(false).keys.map { name =>
+ List("min", "max", "average", "count") map { prefix =>
+ new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false)
+ }
+ } ++ Stats.getGaugeStats(false).keys.map { name =>
+ List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false))
+ }).toList.flatten[jmx.MBeanAttributeInfo].toArray
+ }
+}
+
+/**
+ * @author Jonas Bonér
+ */
+class ThreadPoolMBean(threadPool: ThreadPoolExecutor) extends jmx.DynamicMBean {
+ val operations: Array[jmx.MBeanOperationInfo] = Array(
+ new jmx.MBeanOperationInfo("purge", "",
+ Array(), "void", jmx.MBeanOperationInfo.ACTION),
+ new jmx.MBeanOperationInfo("shutdown", "",
+ Array(), "void", jmx.MBeanOperationInfo.ACTION),
+ new jmx.MBeanOperationInfo("setCorePoolSize", "",
+ Array(new jmx.MBeanParameterInfo("corePoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
+ new jmx.MBeanOperationInfo("setMaximumPoolSize", "",
+ Array(new jmx.MBeanParameterInfo("maximumPoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
+ )
+
+ def getMBeanInfo = new jmx.MBeanInfo(
+ "se.scalablesolutions.akka.kernel.management.ThreadPoolMBean",
+ "runtime management",
+ getAttributeInfo,
+ null, operations, null,
+ new jmx.ImmutableDescriptor("immutableInfo=false"))
+
+ def getAttribute(name: String): AnyRef = name match {
+ case "getActiveCount" => threadPool.getActiveCount.asInstanceOf[AnyRef]
+ case "getCompletedTaskCount" => threadPool.getCompletedTaskCount.asInstanceOf[AnyRef]
+ case "getCorePoolSize" => threadPool.getCorePoolSize.asInstanceOf[AnyRef]
+ case "getLargestPoolSize" => threadPool.getLargestPoolSize.asInstanceOf[AnyRef]
+ case "getMaximumPoolSize" => threadPool.getMaximumPoolSize.asInstanceOf[AnyRef]
+ case "getPoolSize" => threadPool.getPoolSize.asInstanceOf[AnyRef]
+ case "getTaskCount" => threadPool.getTaskCount.asInstanceOf[AnyRef]
+ }
+
+ private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
+ Array(
+ new jmx.MBeanAttributeInfo("getCorePoolSize", "java.lang.Int", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getMaximumPoolSize", "java.lang.Int", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getActiveCount", "java.lang.Int", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getCompletedTaskCount", "java.lang.Long", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getLargestPoolSize", "java.lang.Int", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getPoolSize", "java.lang.Int", "", true, false, false),
+ new jmx.MBeanAttributeInfo("getTaskCount", "java.lang.Long", "", true, false, false))
+ }
+
+ def getAttributes(names: Array[String]): jmx.AttributeList = {
+ val rv = new jmx.AttributeList
+ for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
+ rv
+ }
+
+ def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = {
+ try {
+ actionName match {
+ case "purge" => threadPool.purge
+ case "shutdown" => threadPool.shutdown
+ case "setCorePoolSize" =>
+ params match {
+ case Array(corePoolSize: java.lang.Integer) => threadPool.setCorePoolSize(corePoolSize.intValue)
+ case _ => throw new Exception("Bad signature " + params.toList.toString)
+ }
+ case "setMaximumPoolSize" =>
+ params match {
+ case Array(maximumPoolSize: java.lang.Integer) => threadPool.setMaximumPoolSize(maximumPoolSize.intValue)
+ case _ => throw new Exception("Bad signature " + params.toList.toString)
+ }
+ }
+ } catch { case e: Exception => throw new jmx.MBeanException(e) }
+ "Success"
+ }
+
+ def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
+ def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
+}
diff --git a/kernel/src/main/scala/management/RestfulJMX.scala b/kernel/src/main/scala/management/RestfulJMX.scala
new file mode 100755
index 0000000000..05a5aac2a5
--- /dev/null
+++ b/kernel/src/main/scala/management/RestfulJMX.scala
@@ -0,0 +1,96 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.management
+
+import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
+import se.scalablesolutions.akka.kernel.config.ScalaConfig._
+import se.scalablesolutions.akka.kernel.util.Logging
+
+import javax.ws.rs.core.MultivaluedMap
+import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
+import javax.management._
+import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
+import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
+import java.util.concurrent.ConcurrentHashMap
+
+/**
+ * REST interface to Akka's JMX service.
+ *
+ * Here is an example that retrieves the current number of Actors.
+ *
+ * http://localhost:9998/jmx
+ * ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
+ * &component=se.scalablesolutions.akka:type=Stats
+ * &attribute=counter_NrOfActors
+ *
+ */
+@Path("/jmx")
+class RestfulJMX extends Actor with Logging {
+ private case class Request(service: String, component: String, attribute: String)
+
+ private val connectors = new ConcurrentHashMap[String, JMXConnector]
+
+ @GET
+ @Produces(Array("text/plain"))
+ def queryJMX(
+ @QueryParam("service") service: String,
+ @QueryParam("component") component: String,
+ @QueryParam("attribute") attribute: String): String=
+ (this !! Request(service, component, attribute)).getOrElse("Error in REST JMX management service")
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute))
+ }
+
+ private def retrieveAttribute(service: String, component: String, attribute: String): String = {
+ try {
+ var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service)))
+ connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString
+ } catch {
+ case e: Exception =>
+ if (connectors.contains(service)) connectors.remove(service)
+ throw e
+ }
+ }
+}
+
+/**
+ * REST interface to Akka's statistics recorder.
+ *
+ * Here is an example that retrieves a statistics report.
+ *
+ * http://localhost:9998/stats?reset=true
+ *
+ */
+@Path("/stats")
+class StatisticsReporter extends Actor with Logging {
+ private case class Stats(reset: Boolean)
+ @GET
+ @Produces(Array("text/html"))
+ def stats(@QueryParam("reset") reset: String): scala.xml.Elem =
+    (this !! Stats(java.lang.Boolean.valueOf(reset).booleanValue)).getOrElse(<error>Error in REST JMX management service</error>)
+
+ override def receive: PartialFunction[Any, Unit] = {
+ case Stats(reset) => reply({Management.getStats(reset)})
+ }
+}
+
+class RestfulJMXBoot extends Logging {
+ log.info("Booting Restful JMX servivce")
+ object factory extends SupervisorFactory {
+ override def getSupervisorConfig: SupervisorConfig = {
+ SupervisorConfig(
+ RestartStrategy(OneForOne, 3, 100),
+ Supervise(
+ new RestfulJMX,
+ LifeCycle(Permanent, 100)) ::
+ Supervise(
+ new StatisticsReporter,
+ LifeCycle(Permanent, 100)) ::
+ Nil)
+ }
+ }
+ factory.newSupervisor.startSupervisor
+}
diff --git a/kernel/src/main/scala/management/ScalaJMX.scala b/kernel/src/main/scala/management/ScalaJMX.scala
new file mode 100755
index 0000000000..b4a7800f88
--- /dev/null
+++ b/kernel/src/main/scala/management/ScalaJMX.scala
@@ -0,0 +1,171 @@
+/**
+ * Copyright (C) 2009 Scalable Solutions.
+ */
+
+package se.scalablesolutions.akka.kernel.management
+
+import javax.management._
+import java.lang.management._
+
+/*
+object ScalaJMX {
+
+ val mbeanServer = ManagementFactory.getPlatformMBeanServer
+
+ def register(t: AnyRef, i: Class, name: ObjectName) = mbeanServer.registerMBean(new StandardMBean(t, i), name)
+ def registerBean(bean: DynamicMBean, name: ObjectName): ObjectInstance = mbeanServer.registerMBean(bean, name)
+ def register(t: AnyRef, name: String): ObjectInstance = register(t, beanClass(t), name)
+
+ def info(name: ObjectName): SBean = mbeanServer.getMBeanInfo(name)
+ def bean(name: ObjectName): SBeanInfo = convBeanInfo(name, mbeanServer.getMBeanInfo(name))
+ def invoke(name: ObjectName, operationName: String, params: Array[Object], signature: Array[String]): Object =
+ mbeanServer.invoke(name, operationName, params, signature)
+ def call(name: ObjectName, operationName: String): Object = invoke(name, operationName, Array[Object](), Array[String]())
+
+ def get(name: ObjectName, attribute: String) = mbeanServer.getAttribute(name, attribute)
+ def set(name: ObjectName, attribute: String, value: Object) = mbeanServer.setAttribute(name, new Attribute(attribute, value))
+
+ implicit def instanceToName(oi: ObjectInstance) = oi.getObjectName()
+ implicit def stringToName(name: String) = ObjectName.getInstance(name)
+ implicit def convBean(bi: MBeanInfo):SBean = SBean(bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
+ implicit def seqToArr(seq: Seq[AnyRef]): Array[Object] = seq.toArray
+
+ def convBeanInfo(name: ObjectName, bi: MBeanInfo):SBeanInfo = new SBeanInfo(name, bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
+
+ implicit def convAttrs(attrs: Array[MBeanAttributeInfo]): Seq[SAttr] =
+ for (val a <- attrs) yield a
+ implicit def convParams(params: Array[MBeanParameterInfo]): Seq[SParameter] =
+ for (val p <- params) yield p
+ implicit def convNotes(notes: Array[MBeanNotificationInfo]): Seq[SNotification] =
+ for (val p <- notes) yield p
+ implicit def convCons(cons: Array[MBeanConstructorInfo]): Seq[SConstructor] =
+ for (val p <- cons) yield p
+ implicit def convOps(cons: Array[MBeanOperationInfo]): Seq[SOperation] =
+ for (val p <- cons) yield p
+
+ implicit def convAttr(attr: MBeanAttributeInfo) = SAttr(attr.getName(), attr.getDescription(), attr.getType(), attr.isIs(), attr.isReadable(), attr.isWritable())
+ implicit def convNote(note: MBeanNotificationInfo) = SNotification(note.getName(), note.getDescription(), note.getNotifTypes())
+ implicit def convOp(op: MBeanOperationInfo):SOperation = SOperation(op.getName(), op.getDescription(), op.getImpact(), op.getReturnType(), op.getSignature())
+ implicit def convCon(con: MBeanConstructorInfo):SConstructor = SConstructor(con getName, con getDescription, con getSignature)
+ implicit def convParam(p: MBeanParameterInfo) = SParameter(p getName, p getDescription, p getType)
+
+ private def beanClass(t: AnyRef) = Class.forName(t.getClass().getName() + "MBean")
+}
+
+class MBean(mbeanInterface: String) extends StandardMBean(Class.forName(mbeanInterface))
+
+abstract class SFeature(val name: String, val description: String)
+
+case class SBean(className: String, description: String,
+ attrs: Seq[SAttr], notes: Seq[SNotification],
+ ops: Seq[SOperation], cons: Seq[SConstructor]) {
+ def writable = attrs.toList.filter(sa => sa.writable)
+}
+
+class SBeanInfo(name: ObjectName, className: String, description: String,
+ attrs: Seq[SAttr], notes: Seq[SNotification],
+ ops: Seq[SOperation], cons: Seq[SConstructor])
+extends SBean(className, description, attrs, notes, ops, cons) {
+
+ def get(attribute: String) = SJMX.get(name, attribute)
+ def set(attribute: String, value: Object) = SJMX.set(name, attribute, value)
+ def call(opName: String) = SJMX.call(name, opName)
+}
+
+case class SAttr(
+ override val name: String,
+ override val description: String,
+ jmxType: String, isIs: boolean, readable: boolean, writable: boolean
+) extends SFeature(name, description)
+
+case class SNotification(
+ override val name: String,
+ override val description: String,
+ notifTypes: Array[String]) extends SFeature(name, description)
+
+case class SOperation(
+ override val name: String,
+ override val description: String,
+ impact: int,
+ returnType: String,
+ signature: Seq[SParameter]) extends SFeature(name, description)
+
+case class SParameter(
+ override val name: String,
+ override val description: String,
+ jmxType: String) extends SFeature(name, description)
+
+case class SConstructor(
+ override val name: String,
+ override val description: String,
+ signature: Seq[SParameter]) extends SFeature(name, description)
+
+*/
+
+/*
+package com.soletta.spipe;
+
+import javax.management.{StandardMBean,ObjectName,MBeanInfo};
+
+class SPipe extends MBean("com.soletta.spipe.SPipeMBean") with SPipeMBean {
+
+ import Console.println;
+ import SJMX._;
+
+ private var desc: String = "Yipe!";
+
+ def go = {
+ val oname: ObjectName = "default:name=SPipe";
+ val instance = SJMX.registerBean(this, oname);
+
+ set(oname, "Factor", "Hello!");
+ println(get(oname, "Factor"));
+
+ val SBean(n, d, Seq(_, a2, a3, _*), _, ops, _) = info(oname);
+ println("Bean name is " + n + ", description is " + d);
+ println("Second attribute is " + a2);
+ println("Third attribute is " + a3);
+ println("Writable attributes are " + info(oname).writable);
+ println("Ops: " + ops);
+
+ val x =
+
+ {ops.toList.map(o => )}
+ ;
+
+ println(x);
+
+ val inf = bean(oname);
+ inf.call("start");
+ println(inf.get("Factor"));
+
+ }
+
+ def getName = "SPipe!";
+ def setDescription(d: String) = desc = d;
+ override def getDescription() = desc;
+ def getFactor = desc;
+ def setFactor(s: String) = desc = s;
+ def isHappy = true;
+
+ override def getDescription(info: MBeanInfo) = desc;
+
+}
+
+object PipeMain {
+ def main(args: Array[String]): unit = {
+ (new SPipe) go;
+ }
+}
+
+trait SPipeMBean {
+ def getName: String;
+ def getDescription: String = getName;
+ def setDescription(d: String): unit;
+ def getFactor: String;
+ def setFactor(s: String): unit;
+ def isHappy: boolean;
+
+ def start() = { Console.println("Starting"); }
+ def stop() = { }
+*/
diff --git a/kernel/src/main/scala/nio/RemoteClient.scala b/kernel/src/main/scala/nio/RemoteClient.scala
index 2e62d4dbcf..e91173c4f7 100644
--- a/kernel/src/main/scala/nio/RemoteClient.scala
+++ b/kernel/src/main/scala/nio/RemoteClient.scala
@@ -12,6 +12,7 @@ import kernel.actor.{Exit, Actor}
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.util.Logging
+import kernel.management.Management
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
@@ -21,6 +22,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import scala.collection.mutable.HashMap
+import com.twitter.service.Stats
+
/**
* @author Jonas Bonér
*/
@@ -44,6 +47,10 @@ object RemoteClient extends Logging {
* @author Jonas Bonér
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
+ val name = "RemoteClient@" + hostname
+ val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
+ val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
+
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]
@@ -55,7 +62,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
private val bootstrap = new ClientBootstrap(channelFactory)
- bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(futures, supervisors))
+ bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)
@@ -84,6 +91,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
+ if (Management.RECORD_STATS) {
+ NR_OF_BYTES_SENT.incr(request.getSerializedSize)
+ NR_OF_MESSAGES_SENT.incr
+ }
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
@@ -111,15 +122,16 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
/**
* @author Jonas Bonér
*/
-class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFutureResult],
- supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
+class RemoteClientPipelineFactory(name: String,
+ futures: ConcurrentMap[Long, CompletableFutureResult],
+ supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance));
p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("protobufEncoder", new ProtobufEncoder());
- p.addLast("handler", new RemoteClientHandler(futures, supervisors))
+ p.addLast("handler", new RemoteClientHandler(name, futures, supervisors))
p
}
}
@@ -128,10 +140,14 @@ class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFuture
* @author Jonas Bonér
*/
@ChannelPipelineCoverage { val value = "all" }
-class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult],
+class RemoteClientHandler(val name: String,
+ val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor])
extends SimpleChannelUpstreamHandler with Logging {
+ val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
+ val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
+
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
@@ -144,6 +160,10 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_RECEIVED.incr
+ NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize)
+ }
log.debug("Received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
if (reply.getIsSuccessful) {
@@ -159,7 +179,7 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
}
future.completeWithException(null, parseException(reply))
}
- futures.remove(reply.getId)
+ futures.remove(reply.getId)
} else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result)
} catch {
case e: Exception =>
diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala
index b2f1931879..bbc223247e 100644
--- a/kernel/src/main/scala/nio/RemoteServer.scala
+++ b/kernel/src/main/scala/nio/RemoteServer.scala
@@ -13,6 +13,7 @@ import kernel.util._
import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import serialization.{Serializer, Serializable, SerializationProtocol}
+import kernel.management.Management
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
@@ -20,22 +21,28 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
+import com.twitter.service.Stats
+
/**
* @author Jonas Bonér
*/
class RemoteServer extends Logging {
- def start = RemoteServer.start
+ def start = RemoteServer.start(None)
}
/**
* @author Jonas Bonér
*/
object RemoteServer extends Logging {
- val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
- val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
- val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000)
+ import kernel.Kernel.config
+ val HOSTNAME = config.getString("akka.remote.hostname", "localhost")
+ val PORT = config.getInt("akka.remote.port", 9999)
+ val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000)
+
+ val name = "RemoteServer@" + HOSTNAME
@volatile private var isRunning = false
+ @volatile private var isConfigured = false
private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,
@@ -44,18 +51,15 @@ object RemoteServer extends Logging {
private val activeObjectFactory = new ActiveObjectFactory
private val bootstrap = new ServerBootstrap(factory)
- // FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)
- private val handler = new RemoteServerHandler
- bootstrap.setPipelineFactory(new RemoteServerPipelineFactory)
- bootstrap.setOption("child.tcpNoDelay", true)
- bootstrap.setOption("child.keepAlive", true)
- bootstrap.setOption("child.reuseAddress", true)
- bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
-
- def start = synchronized {
+ def start(loader: Option[ClassLoader]) = synchronized {
if (!isRunning) {
log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT)
+ bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
+ bootstrap.setOption("child.tcpNoDelay", true)
+ bootstrap.setOption("child.keepAlive", true)
+ bootstrap.setOption("child.reuseAddress", true)
+ bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
isRunning = true
}
@@ -65,14 +69,14 @@ object RemoteServer extends Logging {
/**
* @author Jonas Bonér
*/
-class RemoteServerPipelineFactory extends ChannelPipelineFactory {
+class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance))
p.addLast("frameEncoder", new LengthFieldPrepender(4))
p.addLast("protobufEncoder", new ProtobufEncoder)
- p.addLast("handler", new RemoteServerHandler)
+ p.addLast("handler", new RemoteServerHandler(name, loader))
p
}
}
@@ -81,7 +85,12 @@ class RemoteServerPipelineFactory extends ChannelPipelineFactory {
* @author Jonas Bonér
*/
@ChannelPipelineCoverage { val value = "all" }
-class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
+class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
+ val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
+ val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
+ val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
+ val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
+
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@@ -106,6 +115,10 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
}
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_RECEIVED.incr
+ NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize)
+ }
log.debug("Received RemoteRequest[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
@@ -128,7 +141,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
- channel.write(replyBuilder.build)
+ val replyMessage = replyBuilder.build
+ channel.write(replyMessage)
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_SENT.incr
+ NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
+ }
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
@@ -139,7 +157,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
- channel.write(replyBuilder.build)
+ val replyMessage = replyBuilder.build
+ channel.write(replyMessage)
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_SENT.incr
+ NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
+ }
}
}
}
@@ -165,7 +188,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
- channel.write(replyBuilder.build)
+ val replyMessage = replyBuilder.build
+ channel.write(replyMessage)
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_SENT.incr
+ NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
+ }
}
} catch {
case e: InvocationTargetException =>
@@ -176,8 +204,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
- if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
- channel.write(replyBuilder.build)
+ if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
+ val replyMessage = replyBuilder.build
+ channel.write(replyMessage)
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_SENT.incr
+ NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
+ }
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
e.printStackTrace
@@ -186,8 +219,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
- if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
- channel.write(replyBuilder.build)
+ if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
+ val replyMessage = replyBuilder.build
+ channel.write(replyMessage)
+ if (Management.RECORD_STATS) {
+ NR_OF_MESSAGES_SENT.incr
+ NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
+ }
}
}
@@ -223,8 +261,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActiveObject(name: String, timeout: Long): AnyRef = {
val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull == null) {
- val clazz = Class.forName(name)
try {
+ val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
+ else Class.forName(name)
val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance
@@ -240,8 +279,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActor(name: String, timeout: Long): Actor = {
val actorOrNull = actors.get(name)
if (actorOrNull == null) {
- val clazz = Class.forName(name)
try {
+ val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
+ else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance.timeout = timeout
actors.put(name, newInstance)
diff --git a/kernel/src/main/scala/reactor/Dispatchers.scala b/kernel/src/main/scala/reactor/Dispatchers.scala
index d7697e965c..5c4935bbd5 100644
--- a/kernel/src/main/scala/reactor/Dispatchers.scala
+++ b/kernel/src/main/scala/reactor/Dispatchers.scala
@@ -49,17 +49,17 @@ class DispatcherFactory {
* Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
* Has a fluent builder interface for configuring its semantics.
*/
- def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
- def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true)
+ def newEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name)
+ def newConcurrentEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name, true)
/**
* Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
*/
- def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher
+ def newEventBasedSingleThreadDispatcher(name: String) = new EventBasedSingleThreadDispatcher(name)
/**
* Creates an thread based dispatcher serving a single actor through the same single thread.
* E.g. each actor consumes its own thread.
*/
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
-}
\ No newline at end of file
+}
diff --git a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
index c3d52e777e..a4c7a0fc80 100644
--- a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala
@@ -10,9 +10,14 @@
*/
package se.scalablesolutions.akka.kernel.reactor
+import kernel.management.Management
+
import java.util.{LinkedList, Queue, List}
-class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
+import com.twitter.service.Stats
+
+class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) {
+ val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name)
def start = if (!active) {
active = true
val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)
@@ -22,12 +27,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
- val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator
- while (selectedInvocations.hasNext) {
- val invocation = selectedInvocations.next
+ val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
+ if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size)
+ val iter = selectedInvocations.iterator
+ while (iter.hasNext) {
+ val invocation = iter.next
val invoker = messageHandlers.get(invocation.sender)
if (invoker != null) invoker.invoke(invocation)
- selectedInvocations.remove
+ iter.remove
}
}
}
diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
index a3b9b4dbe4..1f96769374 100644
--- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
+++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala
@@ -4,12 +4,16 @@
package se.scalablesolutions.akka.kernel.reactor
+import kernel.management.{Management, ThreadPoolMBean}
+
import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
+import com.twitter.service.Stats
+
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350].
@@ -56,16 +60,17 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List}
*
* @author Jonas Bonér
*/
-class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase {
- def this() = this(false)
+class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) {
+ def this(name: String) = this(name, false)
+ val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
private var inProcessOfBuilding = false
private var executor: ExecutorService = _
private var threadPoolBuilder: ThreadPoolExecutor = _
- private val threadFactory = new MonitorableThreadFactory("akka")
+ private val threadFactory = new MonitorableThreadFactory("akka:" + name)
private var boundedExecutorBound = -1
private val busyInvokers = new HashSet[AnyRef]
@@ -74,6 +79,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def start = if (!active) {
active = true
+ Management.registerMBean(new ThreadPoolMBean(threadPoolBuilder), "ThreadPool_" + name)
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
@@ -89,6 +95,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val reservedInvocations = reserve(selectedInvocations)
+ if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size)
val it = reservedInvocations.entrySet.iterator
while (it.hasNext) {
val entry = it.next
@@ -157,6 +164,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
ensureNotActive
verifyNotInConstructionPhase
inProcessOfBuilding = false
+ blockingQueue = queue
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
this
}
@@ -169,7 +177,8 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
- threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
+ blockingQueue = new LinkedBlockingQueue[Runnable]
+ threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
boundedExecutorBound = bound
this
}
@@ -177,28 +186,32 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
- threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity), threadFactory, new CallerRunsPolicy)
+ blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
+ threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
- threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new CallerRunsPolicy)
+ blockingQueue = new LinkedBlockingQueue[Runnable]
+ threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
- threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy)
+ blockingQueue = new SynchronousQueue[Runnable](fair)
+ threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
ensureNotActive
verifyNotInConstructionPhase
- threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy)
+ blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
+ threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
this
}
@@ -311,13 +324,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
- def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
-/*
- def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
- def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
- def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
- def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
- */
+ def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
}
/**
diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
index 9dd21f2f42..d47db197a5 100644
--- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
+++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala
@@ -4,20 +4,31 @@
package se.scalablesolutions.akka.kernel.reactor
+import kernel.management.Management
+
import java.util.{LinkedList, Queue, List}
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeUnit, BlockingQueue}
import java.util.HashMap
-trait MessageDispatcherBase extends MessageDispatcher {
+import com.twitter.service.Stats
+
+abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher {
+
//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
- val queue = new ReactiveMessageQueue
-
+ val queue = new ReactiveMessageQueue(name)
+ var blockingQueue: BlockingQueue[Runnable] = _
@volatile protected var active: Boolean = false
protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object
+ if (Management.RECORD_STATS) {
+ Stats.makeGauge("SizeOfBlockingQueue_" + name) {
+ guard.synchronized { blockingQueue.size.toDouble }
+ }
+ }
+
def messageQueue = queue
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
@@ -40,10 +51,16 @@ trait MessageDispatcherBase extends MessageDispatcher {
protected def doShutdown = {}
}
-class ReactiveMessageQueue extends MessageQueue {
+class ReactiveMessageQueue(name: String) extends MessageQueue {
private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false
+ if (Management.RECORD_STATS) {
+ Stats.makeGauge("SizeOfReactiveQueue_" + name) {
+ queue.synchronized { queue.size.toDouble }
+ }
+ }
+
def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll
@@ -64,4 +81,4 @@ class ReactiveMessageQueue extends MessageQueue {
interrupted = true
queue.notifyAll
}
-}
\ No newline at end of file
+}
diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
index b01f373dc9..fbae2d8c99 100644
--- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
+++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala
@@ -4,18 +4,24 @@
package se.scalablesolutions.akka.kernel.reactor
+import com.twitter.service.Stats
+
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
+
import kernel.actor.{Actor, ActorMessageInvoker}
+import kernel.management.Management
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
* @author Jonas Bonér
*/
-class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher {
- def this(actor: Actor) = this(new ActorMessageInvoker(actor))
+class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher {
+ def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor))
- private val queue = new BlockingMessageQueue
+ val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
+
+ private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@@ -27,6 +33,7 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
override def run = {
while (active) {
try {
+ if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr
messageHandler.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
@@ -44,7 +51,13 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
}
-class BlockingMessageQueue extends MessageQueue {
+class BlockingMessageQueue(name: String) extends MessageQueue {
+ if (Management.RECORD_STATS) {
+ Stats.makeGauge("SizeOfBlockingQueue_" + name) {
+ queue.size.toDouble
+ }
+ }
+
// FIXME: configure the LBQ
private val queue = new LinkedBlockingQueue[MessageInvocation]
def append(handle: MessageInvocation) = queue.put(handle)
@@ -52,4 +65,4 @@ class BlockingMessageQueue extends MessageQueue {
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
-}
\ No newline at end of file
+}
diff --git a/kernel/src/main/scala/jersey/ActorComponentProvider.scala b/kernel/src/main/scala/rest/ActorComponentProvider.scala
old mode 100644
new mode 100755
similarity index 95%
rename from kernel/src/main/scala/jersey/ActorComponentProvider.scala
rename to kernel/src/main/scala/rest/ActorComponentProvider.scala
index 0f908d6856..f7a577f61f
--- a/kernel/src/main/scala/jersey/ActorComponentProvider.scala
+++ b/kernel/src/main/scala/rest/ActorComponentProvider.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.kernel.jersey
+package se.scalablesolutions.akka.kernel.rest
import com.sun.jersey.core.spi.component.ioc.IoCFullyManagedComponentProvider
diff --git a/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala b/kernel/src/main/scala/rest/ActorComponentProviderFactory.scala
old mode 100644
new mode 100755
similarity index 73%
rename from kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala
rename to kernel/src/main/scala/rest/ActorComponentProviderFactory.scala
index 82d0f1e76b..1a46e44ff8
--- a/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala
+++ b/kernel/src/main/scala/rest/ActorComponentProviderFactory.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.kernel.jersey
+package se.scalablesolutions.akka.kernel.rest
import kernel.Kernel
import util.Logging
@@ -19,7 +19,6 @@ extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)
override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
- //log.info("ProviderFactory: resolve => " + clazz.getName)
- configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
+ configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
-}
\ No newline at end of file
+}
diff --git a/kernel/src/main/scala/jersey/AkkaServlet.scala b/kernel/src/main/scala/rest/AkkaServlet.scala
old mode 100644
new mode 100755
similarity index 98%
rename from kernel/src/main/scala/jersey/AkkaServlet.scala
rename to kernel/src/main/scala/rest/AkkaServlet.scala
index 2f8356e29e..885d3e6321
--- a/kernel/src/main/scala/jersey/AkkaServlet.scala
+++ b/kernel/src/main/scala/rest/AkkaServlet.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.kernel.jersey
+package se.scalablesolutions.akka.kernel.rest
import kernel.Kernel
import config.ConfiguratorRepository
diff --git a/kernel/src/main/scala/jersey/NodeWriter.scala b/kernel/src/main/scala/rest/NodeWriter.scala
old mode 100644
new mode 100755
similarity index 95%
rename from kernel/src/main/scala/jersey/NodeWriter.scala
rename to kernel/src/main/scala/rest/NodeWriter.scala
index 7ec65248f6..c301d9c2b5
--- a/kernel/src/main/scala/jersey/NodeWriter.scala
+++ b/kernel/src/main/scala/rest/NodeWriter.scala
@@ -2,7 +2,7 @@
* Copyright (C) 2009 Scalable Solutions.
*/
-package se.scalablesolutions.akka.kernel.jersey
+package se.scalablesolutions.akka.kernel.rest
import java.io.OutputStream
import java.lang.annotation.Annotation
diff --git a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
index dc272893f5..3ac4eee51a 100644
--- a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala
@@ -55,7 +55,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val key = "key"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
@@ -69,7 +69,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(2)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start
@@ -83,7 +83,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = new EventBasedSingleThreadDispatcher
+ val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
index 89b908015e..c0b205d6f6 100644
--- a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
+++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala
@@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
@@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
- val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
+ val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)
diff --git a/kernel/src/test/scala/JerseySpec.scala b/kernel/src/test/scala/JerseySpec.scala
deleted file mode 100644
index 5f0147ea02..0000000000
--- a/kernel/src/test/scala/JerseySpec.scala
+++ /dev/null
@@ -1,91 +0,0 @@
-/**
- * Copyright (C) 2009 Scalable Solutions.
- */
-
-package se.scalablesolutions.akka.kernel
-
-import akka.kernel.config.ActiveObjectGuiceConfigurator
-import kernel.config.ScalaConfig._
-
-import com.sun.grizzly.http.SelectorThread
-import com.sun.jersey.api.client.Client
-import com.sun.jersey.core.header.MediaTypes
-import com.sun.jersey.api.container.grizzly.GrizzlyWebContainerFactory
-import javax.ws.rs.core.UriBuilder
-import javax.ws.rs.{Produces, Path, GET}
-
-import com.google.inject.{AbstractModule, Scopes}
-
-import org.scalatest.Spec
-import org.scalatest.matchers.ShouldMatchers
-
-//simport com.jteigen.scalatest.JUnit4Runner
-import org.junit.runner.RunWith
-import org.junit.Test
-import org.junit.Assert._
-
-/**
- * @author Jonas Bonér
- */
-//@RunWith(classOf[JUnit4Runner])
-class JerseySpec extends Spec with ShouldMatchers {
-
- describe("A Jersey REST service") {
- it("should ...") {
- /*
- val selector = startJersey
- selector.start
- val conf = new ActiveObjectGuiceConfigurator
- conf.configure(
- RestartStrategy(AllForOne, 3, 5000),
- Component(
- classOf[resource.JerseyFoo],
- LifeCycle(Permanent, 1000),
- 1000) ::
- Nil).supervise
-
- conf.getInstance(classOf[resource.JerseyFoo])
- */
-
- /*
- val client = Client.create
- val webResource = client.resource(UriBuilder.fromUri("http://localhost/").port(9998).build)
- //val webResource = client.resource("http://localhost:9998/foo")
- val responseMsg = webResource.get(classOf[String])
- responseMsg should equal ("Hello World")
- selector.stopEndpoint
- */
- }
- }
-
- def startJersey: SelectorThread = {
- val initParams = new java.util.HashMap[String, String]
- initParams.put("com.sun.jersey.config.property.packages", "se.scalablesolutions.akka.kernel")
- GrizzlyWebContainerFactory.create(UriBuilder.fromUri("http://localhost/").port(9998).build(), initParams)
- }
-}
-
-// @GET
-// @Produces("application/json")
-// @Path("/network/{id: [0-9]+}/{nid}")
-// def getUserByNetworkId(@PathParam {val value = "id"} id: Int, @PathParam {val value = "nid"} networkId: String): User = {
-// val q = em.createQuery("SELECT u FROM User u WHERE u.networkId = :id AND u.networkUserId = :nid")
-// q.setParameter("id", id)
-// q.setParameter("nid", networkId)
-// q.getSingleResult.asInstanceOf[User]
-// }
-
-package resource {
- import javax.ws.rs.{Produces, Path, GET}
-
- class JerseyFoo {
- @GET
- @Produces(Array("application/json"))
- def foo: String = { val ret = "JerseyFoo.foo"; println(ret); ret }
- }
- @Path("/foo")
- class JerseyFooSub extends JerseyFoo
- class JerseyBar {
- def bar(msg: String) = msg + "return_bar "
- }
-}
diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala
index 61397e6018..3387aa8eb0 100644
--- a/kernel/src/test/scala/RemoteActorSpec.scala
+++ b/kernel/src/test/scala/RemoteActorSpec.scala
@@ -26,7 +26,6 @@ class RemoteActorSpecActorBidirectional extends Actor {
}
class RemoteActorSpec extends TestCase {
-
kernel.Kernel.config
new Thread(new Runnable() {
def run = {
diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
index 621e3dccfd..1ad5c0b733 100644
--- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
+++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala
@@ -49,7 +49,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
+ val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))
@@ -60,7 +60,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(100)
- val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
+ val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return
diff --git a/lib/JSAP-2.1.jar b/lib/JSAP-2.1.jar
deleted file mode 100644
index 15d1f37a78..0000000000
Binary files a/lib/JSAP-2.1.jar and /dev/null differ
diff --git a/lib/antlr-3.1.3.jar b/lib/antlr-3.1.3.jar
deleted file mode 100644
index d3bd9cf942..0000000000
Binary files a/lib/antlr-3.1.3.jar and /dev/null differ
diff --git a/lib/aopalliance-1.0.jar b/lib/aopalliance-1.0.jar
deleted file mode 100644
index 578b1a0c35..0000000000
Binary files a/lib/aopalliance-1.0.jar and /dev/null differ
diff --git a/lib/asm-3.1.jar b/lib/asm-3.1.jar
deleted file mode 100644
index 8217cae0a1..0000000000
Binary files a/lib/asm-3.1.jar and /dev/null differ
diff --git a/lib/aspectwerkz-nodeps-jdk5-2.1.jar b/lib/aspectwerkz-nodeps-jdk5-2.1.jar
deleted file mode 100644
index 3a65f7b3de..0000000000
Binary files a/lib/aspectwerkz-nodeps-jdk5-2.1.jar and /dev/null differ
diff --git a/lib/atmosphere-compat-0.3.jar b/lib/atmosphere-compat-0.3.jar
deleted file mode 100644
index edf4424071..0000000000
Binary files a/lib/atmosphere-compat-0.3.jar and /dev/null differ
diff --git a/lib/atmosphere-core-0.3.jar b/lib/atmosphere-core-0.3.jar
deleted file mode 100644
index aef71c00ba..0000000000
Binary files a/lib/atmosphere-core-0.3.jar and /dev/null differ
diff --git a/lib/atmosphere-core-0.3.jar~HEAD b/lib/atmosphere-core-0.3.jar~HEAD
deleted file mode 100644
index aef71c00ba..0000000000
Binary files a/lib/atmosphere-core-0.3.jar~HEAD and /dev/null differ
diff --git a/lib/atmosphere-portable-runtime-0.3.jar b/lib/atmosphere-portable-runtime-0.3.jar
deleted file mode 100644
index 19875c8c51..0000000000
Binary files a/lib/atmosphere-portable-runtime-0.3.jar and /dev/null differ
diff --git a/lib/atmosphere-portable-runtime-0.3.jar~HEAD b/lib/atmosphere-portable-runtime-0.3.jar~HEAD
deleted file mode 100644
index 19875c8c51..0000000000
Binary files a/lib/atmosphere-portable-runtime-0.3.jar~HEAD and /dev/null differ
diff --git a/lib/camel-core-2.0-SNAPSHOT.jar b/lib/camel-core-2.0-SNAPSHOT.jar
deleted file mode 100644
index 61b8015a70..0000000000
Binary files a/lib/camel-core-2.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/cassandra-0.4.0-dev.jar b/lib/cassandra-0.4.0-dev.jar
deleted file mode 100644
index 706336669f..0000000000
Binary files a/lib/cassandra-0.4.0-dev.jar and /dev/null differ
diff --git a/lib/cassidy-0.1.jar b/lib/cassidy-0.1.jar
deleted file mode 100644
index 43fdb1952f..0000000000
Binary files a/lib/cassidy-0.1.jar and /dev/null differ
diff --git a/lib/cglib-2.2.jar b/lib/cglib-2.2.jar
deleted file mode 100644
index 084ef6e54b..0000000000
Binary files a/lib/cglib-2.2.jar and /dev/null differ
diff --git a/lib/commons-cli-1.1.jar b/lib/commons-cli-1.1.jar
deleted file mode 100644
index e633afbe68..0000000000
Binary files a/lib/commons-cli-1.1.jar and /dev/null differ
diff --git a/lib/commons-collections-3.2.1.jar b/lib/commons-collections-3.2.1.jar
deleted file mode 100644
index c35fa1fee1..0000000000
Binary files a/lib/commons-collections-3.2.1.jar and /dev/null differ
diff --git a/lib/commons-io-1.3.2.jar b/lib/commons-io-1.3.2.jar
deleted file mode 100644
index 865c9e41ce..0000000000
Binary files a/lib/commons-io-1.3.2.jar and /dev/null differ
diff --git a/lib/commons-javaflow-1.0-SNAPSHOT.jar b/lib/commons-javaflow-1.0-SNAPSHOT.jar
deleted file mode 100644
index 199b853307..0000000000
Binary files a/lib/commons-javaflow-1.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/commons-lang-2.4.jar b/lib/commons-lang-2.4.jar
deleted file mode 100644
index 532939ecab..0000000000
Binary files a/lib/commons-lang-2.4.jar and /dev/null differ
diff --git a/lib/commons-logging-1.0.4.jar b/lib/commons-logging-1.0.4.jar
deleted file mode 100644
index b73a80fab6..0000000000
Binary files a/lib/commons-logging-1.0.4.jar and /dev/null differ
diff --git a/lib/commons-math-1.1.jar b/lib/commons-math-1.1.jar
deleted file mode 100644
index 6888813fc1..0000000000
Binary files a/lib/commons-math-1.1.jar and /dev/null differ
diff --git a/lib/configgy-1.3.jar b/lib/configgy-1.3.jar
deleted file mode 100644
index ebbaea502e..0000000000
Binary files a/lib/configgy-1.3.jar and /dev/null differ
diff --git a/lib/fscontext.jar b/lib/fscontext.jar
deleted file mode 100644
index 0efc25071a..0000000000
Binary files a/lib/fscontext.jar and /dev/null differ
diff --git a/lib/google-collect-snapshot-20090211.jar b/lib/google-collect-snapshot-20090211.jar
deleted file mode 100644
index f5281988e9..0000000000
Binary files a/lib/google-collect-snapshot-20090211.jar and /dev/null differ
diff --git a/lib/grizzly-comet-webserver-1.8.6.3.jar b/lib/grizzly-comet-webserver-1.8.6.3.jar
deleted file mode 100644
index 965c623bac..0000000000
Binary files a/lib/grizzly-comet-webserver-1.8.6.3.jar and /dev/null differ
diff --git a/lib/guice-core-2.0-SNAPSHOT.jar b/lib/guice-core-2.0-SNAPSHOT.jar
deleted file mode 100644
index 2acfd950fa..0000000000
Binary files a/lib/guice-core-2.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/guice-jsr250-2.0-SNAPSHOT.jar b/lib/guice-jsr250-2.0-SNAPSHOT.jar
deleted file mode 100644
index f2be8ad48d..0000000000
Binary files a/lib/guice-jsr250-2.0-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/high-scale-lib.jar b/lib/high-scale-lib.jar
deleted file mode 100644
index 421a436eed..0000000000
Binary files a/lib/high-scale-lib.jar and /dev/null differ
diff --git a/lib/jackson-core-asl-1.1.0.jar b/lib/jackson-core-asl-1.1.0.jar
deleted file mode 100644
index 6b561dd2be..0000000000
Binary files a/lib/jackson-core-asl-1.1.0.jar and /dev/null differ
diff --git a/lib/jackson-mapper-asl-1.1.0.jar b/lib/jackson-mapper-asl-1.1.0.jar
deleted file mode 100644
index 1b37ad3772..0000000000
Binary files a/lib/jackson-mapper-asl-1.1.0.jar and /dev/null differ
diff --git a/lib/javautils-2.7.4-0.1.jar b/lib/javautils-2.7.4-0.1.jar
deleted file mode 100644
index a0c51bf7da..0000000000
Binary files a/lib/javautils-2.7.4-0.1.jar and /dev/null differ
diff --git a/lib/jersey-client-1.1.1-ea.jar b/lib/jersey-client-1.1.1-ea.jar
deleted file mode 100755
index fae00c4665..0000000000
Binary files a/lib/jersey-client-1.1.1-ea.jar and /dev/null differ
diff --git a/lib/jersey-core-1.1.1-ea.jar b/lib/jersey-core-1.1.1-ea.jar
deleted file mode 100755
index d269d80aa7..0000000000
Binary files a/lib/jersey-core-1.1.1-ea.jar and /dev/null differ
diff --git a/lib/jersey-json-1.1.1-ea.jar b/lib/jersey-json-1.1.1-ea.jar
deleted file mode 100644
index ed88d9cde7..0000000000
Binary files a/lib/jersey-json-1.1.1-ea.jar and /dev/null differ
diff --git a/lib/jersey-scala-1.1.2-ea-SNAPSHOT.jar b/lib/jersey-scala-1.1.2-ea-SNAPSHOT.jar
deleted file mode 100644
index a24bb19a7d..0000000000
Binary files a/lib/jersey-scala-1.1.2-ea-SNAPSHOT.jar and /dev/null differ
diff --git a/lib/jersey-server-1.1.1-ea.jar b/lib/jersey-server-1.1.1-ea.jar
deleted file mode 100755
index ed5654e9d5..0000000000
Binary files a/lib/jersey-server-1.1.1-ea.jar and /dev/null differ
diff --git a/lib/jsr250-api-1.0.jar b/lib/jsr250-api-1.0.jar
deleted file mode 100644
index c1f29bf844..0000000000
Binary files a/lib/jsr250-api-1.0.jar and /dev/null differ
diff --git a/lib/jsr311-api-1.0.jar b/lib/jsr311-api-1.0.jar
deleted file mode 100644
index 2bede66c1f..0000000000
Binary files a/lib/jsr311-api-1.0.jar and /dev/null differ
diff --git a/lib/junit-4.5.jar b/lib/junit-4.5.jar
deleted file mode 100644
index 733921623d..0000000000
Binary files a/lib/junit-4.5.jar and /dev/null differ
diff --git a/lib/junit4runner-1.0.jar b/lib/junit4runner-1.0.jar
deleted file mode 100644
index 6f91bd8044..0000000000
Binary files a/lib/junit4runner-1.0.jar and /dev/null differ
diff --git a/lib/libfb303.jar b/lib/libfb303.jar
deleted file mode 100644
index 129328deff..0000000000
Binary files a/lib/libfb303.jar and /dev/null differ
diff --git a/lib/libthrift.jar b/lib/libthrift.jar
deleted file mode 100644
index 9782f261ae..0000000000
Binary files a/lib/libthrift.jar and /dev/null differ
diff --git a/lib/log4j-1.2.15.jar b/lib/log4j-1.2.15.jar
deleted file mode 100644
index c930a6ab4d..0000000000
Binary files a/lib/log4j-1.2.15.jar and /dev/null differ
diff --git a/lib/lucene-core-2.2.0.jar b/lib/lucene-core-2.2.0.jar
deleted file mode 100644
index 2469481c38..0000000000
Binary files a/lib/lucene-core-2.2.0.jar and /dev/null differ
diff --git a/lib/netty-3.1.0.GA.jar b/lib/netty-3.1.0.GA.jar
deleted file mode 100644
index 94c9b24902..0000000000
Binary files a/lib/netty-3.1.0.GA.jar and /dev/null differ
diff --git a/lib/protobuf-java-2.1.0.jar b/lib/protobuf-java-2.1.0.jar
deleted file mode 100644
index 961d55122a..0000000000
Binary files a/lib/protobuf-java-2.1.0.jar and /dev/null differ
diff --git a/lib/providerutil.jar b/lib/providerutil.jar
deleted file mode 100644
index 0d6e48f388..0000000000
Binary files a/lib/providerutil.jar and /dev/null differ
diff --git a/lib/sbinary-0.3.jar b/lib/sbinary-0.3.jar
deleted file mode 100644
index e7839cfb24..0000000000
Binary files a/lib/sbinary-0.3.jar and /dev/null differ
diff --git a/lib/scala-json-1.0.jar b/lib/scala-json-1.0.jar
deleted file mode 100644
index df0ec2eaa6..0000000000
Binary files a/lib/scala-json-1.0.jar and /dev/null differ
diff --git a/lib/scala-library-2.7.5.jar b/lib/scala-library-2.7.5.jar
deleted file mode 100644
index 07d2e1381a..0000000000
Binary files a/lib/scala-library-2.7.5.jar and /dev/null differ
diff --git a/lib/scalatest-0.9.5.jar b/lib/scalatest-0.9.5.jar
deleted file mode 100644
index adb241a55a..0000000000
Binary files a/lib/scalatest-0.9.5.jar and /dev/null differ
diff --git a/lib/servlet-api-2.5.jar b/lib/servlet-api-2.5.jar
deleted file mode 100644
index fb52493468..0000000000
Binary files a/lib/servlet-api-2.5.jar and /dev/null differ
diff --git a/lib/slf4j-api-1.4.3.jar b/lib/slf4j-api-1.4.3.jar
deleted file mode 100644
index ec7050cec3..0000000000
Binary files a/lib/slf4j-api-1.4.3.jar and /dev/null differ
diff --git a/lib/slf4j-log4j12-1.4.3.jar b/lib/slf4j-log4j12-1.4.3.jar
deleted file mode 100644
index c8466de59b..0000000000
Binary files a/lib/slf4j-log4j12-1.4.3.jar and /dev/null differ
diff --git a/lib/stringtemplate-3.0.jar b/lib/stringtemplate-3.0.jar
deleted file mode 100644
index df5e6e517f..0000000000
Binary files a/lib/stringtemplate-3.0.jar and /dev/null differ
diff --git a/lib/zookeeper-3.1.0.jar b/lib/zookeeper-3.1.0.jar
deleted file mode 100644
index b7e639e63a..0000000000
Binary files a/lib/zookeeper-3.1.0.jar and /dev/null differ
diff --git a/pom.xml b/pom.xml
index f321ff0e88..abb46463bc 100644
--- a/pom.xml
+++ b/pom.xml
@@ -7,12 +7,12 @@
Akka Actor Kernel
akka
se.scalablesolutions.akka
- 0.5
+ 0.6
2009
pom
- 0.5
+ 0.6
se.scalablesolutions.akka
2.7.5
@@ -117,6 +117,11 @@
false
+
+ guice-maven
+ guice maven
+ http://guice-maven.googlecode.com/svn/trunk
+
google-maven-repository
Google Maven Repository
@@ -141,6 +146,10 @@
+
+ onejar-maven-plugin.googlecode.com
+ http://onejar-maven-plugin.googlecode.com/svn/mavenrepo
+
scala-tools.org
Scala-Tools Maven2 Repository
@@ -149,41 +158,213 @@
+ src/main/scala
+ src/test/scala
+
+ org.mortbay.jetty
+ maven-jetty-plugin
+
+ /
+ 5
+
+
+
+ org.apache.maven.plugins
+ maven-compiler-plugin
+ 2.0.2
+
+ 1.5
+ 1.5
+
+
+
+ org.scala-tools
+ maven-scala-plugin
+ 2.10.1
+
+
+
+ compile
+ testCompile
+
+
+
+
+
+ -Xmx1024m
+
+
+
+
+ ${scala.version}
+
+
+
+ true
+ maven-source-plugin
+
+
+ attach-sources
+
+ jar
+
+
+
+
+
+ org.apache.maven.plugins
+ maven-changes-plugin
+ 2.0
+
+
+ org.apache.maven.plugins
+ maven-jar-plugin
+ 2.2
+
+
+
+ ${akka.version}
+
+
+
+
org.apache.maven.plugins
maven-antrun-plugin
-
-
-
-
-
-
+
+
+
+ org.apache.felix
+ maven-bundle-plugin
+ 2.0.0
+ true
+
+
+ J2SE-1.5
+ <_versionpolicy>[$(@),$(version;=+;$(@)))
+
+
+
+
+ create-bundle
+ package
+
+ bundle
+
+
+
+ bundle-install
+ install
+
+ install
+
+
+
+
+
+
-
+
+
+
+ org.codehaus.mojo
+ taglist-maven-plugin
+ 2.3
+
+
+ FIXME
+ TODO
+ XXX
+ @todo
+ @deprecated
+
+
+
+
+ maven-project-info-reports-plugin
+
+
+ org.scala-tools
+ maven-scala-plugin
+ 2.9
+
+
+ -Xmx1024m
+ -DpackageLinkDefs=file://${basedir}/../vscaladocs-packageLinkDefs.properties
+
+
+ -unchecked
+
+ 1.2-SNAPSHOT
+ ${scala.version}
+
+
+
+ org.apache.maven.plugins
+ maven-changes-plugin
+ 2.0-beta-3
+
+
+
+ changes-report
+
+
+
+
+ ${basedir}/changes.xml
+
+
+
+ maven-surefire-report-plugin
+
+
+
+
+ report-only
+
+
+
+
+
+
+
+
+
+
+ release
+
+
+ scala-tools.org
+ http://nexus.scala-tools.org/content/repositories/releases
+
+
+ scala-tools.org
+ file://${user.home}/.m2/mvnsites/akka
+
+
+
+
+ hudson
+
+
+ hudson.scala-tools.org
+ file:///home/scala-tools.org/www/repo-snapshots
+
+
+ hudson.scala-tools.org
+ file:///home/scala-tools.org/www/repo-snapshots
+ false
+
+
+ hudson.scala-tools.org
+ file:///home/scala-tools.org/www/mvnsites-snapshots/akka
+
+
+
+
diff --git a/samples-java/pom.xml b/samples-java/pom.xml
index 9a230ab94a..fc6c4840f9 100644
--- a/samples-java/pom.xml
+++ b/samples-java/pom.xml
@@ -11,6 +11,7 @@
akka
se.scalablesolutions.akka
0.5
+ ../pom.xml
diff --git a/samples-lift/pom.xml b/samples-lift/pom.xml
index 220dd37cd5..a26ade154e 100644
--- a/samples-lift/pom.xml
+++ b/samples-lift/pom.xml
@@ -11,108 +11,14 @@
akka
se.scalablesolutions.akka
0.5
+ ../pom.xml
1.1-SNAPSHOT
-
-
- repo1.maven
- Maven Main Repository
- http://repo1.maven.org/maven2
-
-
- project.embedded.module
- Project Embedded Repository
- file://${basedir}/../embedded-repo
-
-
- scala-tools-snapshots
- Scala-Tools Maven2 Snapshot Repository
- http://scala-tools.org/repo-snapshots
-
-
- scala-tools
- Scala-Tools Maven2 Repository
- http://scala-tools.org/repo-releases
-
-
- lag
- Configgy's' Repository
- http://www.lag.net/repo
-
-
- maven2-repository.dev.java.net
- Java.net Repository for Maven
- http://download.java.net/maven/2
-
-
- java.net
- Java.net Legacy Repository for Maven
- http://download.java.net/maven/1
- legacy
-
-
- guiceyfruit.release
- GuiceyFruit Release Repository
- http://guiceyfruit.googlecode.com/svn/repo/releases/
-
- false
-
-
- true
-
-
-
- guiceyfruit.snapshot
- GuiceyFruit Snapshot Repository
- http://guiceyfruit.googlecode.com/svn/repo/snapshots/
-
- true
-
-
- false
-
-
-
- google-maven-repository
- Google Maven Repository
- http://google-maven-repository.googlecode.com/svn/repository/
-
-
- repository.codehaus.org
- Codehaus Maven Repository
- http://repository.codehaus.org
-
- true
-
-
-
- repository.jboss.org
- JBoss Repository for Maven
- http://repository.jboss.org/maven2
-
- false
-
-
-
-
-
-
- scala-tools.org
- Scala-Tools Maven2 Repository
- http://scala-tools.org/repo-releases
-
-
-
-
- org.codehaus.aspectwerkz
- aspectwerkz-nodeps-jdk5
- 2.1
-
se.scalablesolutions.akka
akka-kernel
@@ -161,7 +67,6 @@
[6.1.6,)
test
-
org.scala-lang
scala-compiler
@@ -169,68 +74,4 @@
test
-
-
- src/main/scala
- src/test/scala
-
-
- org.scala-tools
- maven-scala-plugin
-
-
-
- compile
- testCompile
-
-
-
-
- ${scala.version}
-
-
-
- org.mortbay.jetty
- maven-jetty-plugin
-
- /
- 5
-
-
-
- net.sf.alchim
- yuicompressor-maven-plugin
-
-
-
- compress
-
-
-
-
- true
-
-
-
-
-
- false
- config
-
- akka.conf
-
-
-
-
-
-
-
- org.scala-tools
- maven-scala-plugin
-
- ${scala.version}
-
-
-
-
diff --git a/samples-lift/src/main/webapp/WEB-INF/web.xml b/samples-lift/src/main/webapp/WEB-INF/web.xml
old mode 100644
new mode 100755
index f8ee2157a1..d474da1ca1
--- a/samples-lift/src/main/webapp/WEB-INF/web.xml
+++ b/samples-lift/src/main/webapp/WEB-INF/web.xml
@@ -13,7 +13,7 @@
AkkaServlet
- se.scalablesolutions.akka.kernel.jersey.AkkaServlet
+ se.scalablesolutions.akka.kernel.rest.AkkaServlet
AkkaServlet
diff --git a/samples-scala/pom.xml b/samples-scala/pom.xml
index 7e02e9af1c..e14760003d 100644
--- a/samples-scala/pom.xml
+++ b/samples-scala/pom.xml
@@ -11,6 +11,7 @@
akka
se.scalablesolutions.akka
0.5
+ ../pom.xml
@@ -33,53 +34,7 @@
src/main/scala
- src/test/scala
-
- org.scala-tools
- maven-scala-plugin
-
-
-
- compile
- testCompile
-
-
-
-
-
- -target:jvm-1.5
- -unchecked
-
- 2.7.5
- 1.1
-
-
-
- org.apache.maven.plugins
- maven-eclipse-plugin
-
- true
-
-
- ch.epfl.lamp.sdt.core.scalabuilder
-
-
-
-
- ch.epfl.lamp.sdt.core.scalanature
-
-
-
-
- org.eclipse.jdt.launching.JRE_CONTAINER
-
-
- ch.epfl.lamp.sdt.launching.SCALA_CONTAINER
-
-
-
-
maven-antrun-plugin
@@ -98,21 +53,5 @@
-
-
- false
- src/main/resources
-
-
- false
- src/main/scala
-
- **
-
-
- **/*.scala
-
-
-
diff --git a/scripts/remove_trailing_whitespace.sh b/scripts/remove_trailing_whitespace.sh
index 7ea7480cf3..35038ae44d 100755
--- a/scripts/remove_trailing_whitespace.sh
+++ b/scripts/remove_trailing_whitespace.sh
@@ -1,5 +1,4 @@
#!/bin/sh
echo "removing all trailing whitespace from all *.scala, *.html and *.xml files"
-# find . -type f -name '*.scala' -exec sed -i 's/[ \t]*$//' {} \;
-
+find . -type f -name '*.scala' -exec sed -i 's/[ \t]*$//' {} \;
diff --git a/util-java/pom.xml b/util-java/pom.xml
index 658003e5e9..fdcbbb5113 100644
--- a/util-java/pom.xml
+++ b/util-java/pom.xml
@@ -10,7 +10,8 @@
akka
se.scalablesolutions.akka
- 0.5
+ 0.6
+ ../pom.xml
@@ -19,11 +20,6 @@
guice-core
2.0-SNAPSHOT
-
- org.guiceyfruit
- guice-jsr250
- 2.0-SNAPSHOT
-
com.google.protobuf
protobuf-java
@@ -46,23 +42,6 @@
-
- maven-antrun-plugin
-
-
- install
-
-
-
-
-
-
- run
-
-
-
-
diff --git a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java
index 89632f5d79..052409d5fd 100644
--- a/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java
+++ b/util-java/src/main/java/se/scalablesolutions/akka/kernel/config/ActiveObjectGuiceModule.java
@@ -8,7 +8,7 @@ import java.util.List;
import com.google.inject.AbstractModule;
import com.google.inject.Singleton;
-import com.google.inject.jsr250.ResourceProviderFactory;
+//import com.google.inject.jsr250.ResourceProviderFactory;
/**
* @author Jonas Bonér
@@ -21,7 +21,7 @@ public class ActiveObjectGuiceModule extends AbstractModule {
}
protected void configure() {
- bind(ResourceProviderFactory.class);
+ //bind(ResourceProviderFactory.class);
for (int i = 0; i < bindings.size(); i++) {
final DependencyBinding db = bindings.get(i);
//if (db.getInterface() != null) bind((Class) db.getInterface()).to((Class) db.getTarget()).in(Singleton.class);