implemented statistics recording with JMX and REST APIs (based on scala-stats)
parent 8316c90278
commit 9d2aaebe0b
28 changed files with 446 additions and 109 deletions
@@ -3,7 +3,6 @@
VERSION=0.5

BASE_DIR=$(dirname $0)/..

echo 'Starting Akka Kernel from directory' $BASE_DIR

echo 'Resetting persistent storage in' $BASE_DIR/storage

@@ -20,7 +19,6 @@ LIB_DIR=$BASE_DIR/lib
CLASSPATH=$BASE_DIR/config
CLASSPATH=$CLASSPATH:$LIB_DIR/akka-kernel-0.5.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/akka-util-java-0.5.jar

CLASSPATH=$CLASSPATH:$LIB_DIR/antlr-3.1.3.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/aopalliance-1.0.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/asm-3.1.jar

@@ -63,6 +61,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/netty-3.1.0.CR1.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/providerutil.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/protobuf-java-2.1.0.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/scala-library-2.7.5.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/scala-stats-1.0.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/servlet-api-2.5.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/slf4j-api-1.4.3.jar
CLASSPATH=$CLASSPATH:$LIB_DIR/slf4j-log4j12-1.4.3.jar

@@ -74,7 +73,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/zookeeper-3.1.0.jar
JVM_OPTS=" \
-server \
-Xms128M \
-Xmx2G \
-Xmx1G \
-XX:SurvivorRatio=8 \
-XX:TargetSurvivorRatio=90 \
-XX:+AggressiveOpts \

@@ -91,4 +90,5 @@ JVM_OPTS=" \

#$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1}
echo $JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.kernel.Kernel ${1}
$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.kernel.Kernel ${1}

@@ -1,6 +1,6 @@
#################################
# Akka Actor Kernel Config File #
#################################
#####################
# Akka Config File #
###################

# This file has all the default settings, so all these could be remove with no visible effect.
# Modify as needed.

@@ -19,11 +19,16 @@

boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor
                                                 # supervisor bootstrap, should be defined in default constructor
<management>
service = on
record-stats = on
</management>

<actor>
timeout = 5000 # default timeout for future based invocations
concurrent-mode = off # if turned on, then the same actor instance is allowed to execute concurrently -
                      # e.g. departing from the actor model for better performance
serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability
serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability
</actor>

<stm>

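The new <management> block above is read through Configgy like the other settings touched by this commit; as a minimal sketch, both keys are looked up with a default of true, matching RUN_MANAGEMENT_SERVICE in Kernel.scala and RECORD_STATS in Management (JMX.scala) further down:

// Sketch only: mirrors the Configgy lookups added in Kernel.scala and JMX.scala.
import net.lag.configgy.Configgy

object ManagementSettingsSketch {
  // Assumes Configgy has already loaded akka.conf (the kernel does this at boot).
  val config = Configgy.config
  val runManagementService = config.getBool("akka.management.service", true)      // <management> service = on
  val recordStats          = config.getBool("akka.management.record-stats", true) // <management> record-stats = on
}
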
@@ -21,7 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase {

protected void setUp() {
se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config();
EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher();
EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher("name");
dispatcher
.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(16)

@@ -125,11 +125,6 @@
<artifactId>commons-lang</artifactId>
<version>2.4</version>
</dependency>
<dependency>
<groupId>se.foldleft</groupId>
<artifactId>cassidy</artifactId>
<version>0.1</version>
</dependency>

<!-- For Jersey -->
<dependency>

@@ -12,12 +12,13 @@ import javax.ws.rs.core.UriBuilder
import java.io.File
import java.net.URLClassLoader

import net.lag.configgy.{Config, Configgy, RuntimeEnvironment}
import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException}

import kernel.jersey.AkkaCometServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging
import kernel.management.Management

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>

@@ -32,6 +33,7 @@ object Kernel extends Logging {
val BOOT_CLASSES = config.getList("akka.boot")

val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")

val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)

@@ -43,6 +45,7 @@ object Kernel extends Logging {
private var remoteServer: RemoteServer = _
private var jerseySelectorThread: SelectorThread = _
private val startTime = System.currentTimeMillis
private var applicationLoader: Option[ClassLoader] = None

def main(args: Array[String]) = boot

@@ -51,11 +54,15 @@ object Kernel extends Logging {
printBanner
log.info("Starting Akka kernel...")

runApplicationBootClasses

if (RUN_REMOTE_SERVICE) startRemoteService
if (RUN_MANAGEMENT_SERVICE) startManagementService

STORAGE_SYSTEM match {
case "cassandra" => startCassandra
case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported")
case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported")
case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported")
case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported")
case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported")

@@ -64,8 +71,6 @@ object Kernel extends Logging {

if (RUN_REST_SERVICE) startJersey

runApplicationBootClasses

log.info("Akka kernel started successfully")
hasBooted = true
}

@@ -79,17 +84,18 @@ object Kernel extends Logging {
val runtime = new RuntimeEnvironment(getClass)
//runtime.load(args)
val config = Configgy.config
config.registerWithJmx("com.scalablesolutions.akka.config")
config.registerWithJmx("se.scalablesolutions.akka")

// FIXME fix Configgy JMX subscription to allow management
// config.subscribe { c => configure(c.getOrElse(new Config)) }
config
} catch {
case e: net.lag.configgy.ParseException => throw new Error("Could not retreive the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.")
case e: ParseException => throw new Error("Could not retreive the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.")
}
}

private[akka] def runApplicationBootClasses = {
new management.RestfulJMXBoot // add the REST/JMX service
val HOME = try { System.getenv("AKKA_HOME") } catch { case e: NullPointerException => throw new IllegalStateException("AKKA_HOME system variable needs to be set. Should point to the root of the Akka distribution.") }
//val CLASSES = HOME + "/kernel/target/classes" // FIXME remove for dist
//val LIB = HOME + "/lib"

@@ -105,16 +111,22 @@ object Kernel extends Logging {
log.info("Booting with boot class [%s]", clazz)
loader.loadClass(clazz).newInstance
}
applicationLoader = Some(loader)
}

private[akka] def startRemoteService = {
// FIXME manage remote serve thread for graceful shutdown
val remoteServerThread = new Thread(new Runnable() {
def run = RemoteServer.start
}, "akka remote service")
def run = RemoteServer.start(applicationLoader)
}, "Akka Remote Service")
remoteServerThread.start
}

private[akka] def startManagementService = {
Management.startJMX("se.scalablesolutions.akka")
log.info("Management service started successfully.")
}

private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) {
System.setProperty("cassandra", "")
System.setProperty("storage-config", akka.Boot.CONFIG + "/")

@@ -8,14 +8,17 @@ import com.google.protobuf.ByteString
import java.net.InetSocketAddress
import java.util.concurrent.CopyOnWriteArraySet

import kernel.reactor._
import kernel.config.ScalaConfig._
import kernel.stm.TransactionManagement
import kernel.util.Helpers.ReadWriteLock
import kernel.nio.protobuf.RemoteProtocol.RemoteRequest
import kernel.util.Logging
import reactor._
import config.ScalaConfig._
import stm.TransactionManagement
import util.Helpers.ReadWriteLock
import nio.protobuf.RemoteProtocol.RemoteRequest
import util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
import management.Management

import com.twitter.service.Stats

sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage

@@ -42,14 +45,16 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker {
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Actor {
val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false)
val TIMEOUT = Kernel.config.getInt("akka.actor.timeout", 5000)
val SERIALIZE_MESSAGES = Kernel.config.getBool("akka.actor.serialize-messages", false)
}

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
@serializable trait Actor extends Logging with TransactionManagement {
trait Actor extends Logging with TransactionManagement {
Stats.getCounter("NrOfActors").incr

@volatile private[this] var isRunning: Boolean = false
private[this] val remoteFlagLock = new ReadWriteLock
private[this] val transactionalFlagLock = new ReadWriteLock

@@ -64,6 +69,8 @@ object Actor {
protected[this] val linkedActors = new CopyOnWriteArraySet[Actor]
protected[actor] var lifeCycleConfig: Option[LifeCycle] = None

val name = this.getClass.getName

// ====================================
// ==== USER CALLBACKS TO OVERRIDE ====
// ====================================

@@ -96,7 +103,7 @@ object Actor {
 * </pre>
 */
protected[kernel] var dispatcher: MessageDispatcher = {
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName)
mailbox = dispatcher.messageQueue
dispatcher.registerHandler(this, new ActorMessageInvoker(this))
dispatcher

@@ -529,6 +536,8 @@ object Actor {
}

private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr

if (trapExit) {
if (faultHandler.isDefined) {
faultHandler.get match {

@@ -546,6 +555,7 @@ object Actor {
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))

private[Actor] def restart(reason: AnyRef) = synchronized {
if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr
lifeCycleConfig match {
case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")

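The instrumentation added above uses only a small surface of the scala-stats API: named counters bumped on actor creation, failure and restart, plus (further down) gauges over queue sizes. A minimal sketch of those calls, with hypothetical counter and gauge names; the real names are built from actor, client and dispatcher names as shown in the diff:

// Sketch only: the calls below (getCounter, incr, makeGauge, getCounterStats) are the ones used in this commit.
import com.twitter.service.Stats

object StatsUsageSketch {
  val counter = Stats.getCounter("NrOfSomething")     // hypothetical name; counters are created on first use
  counter.incr                                        // increment by one
  counter.incr(42)                                    // or by an arbitrary amount

  def watch(queue: java.util.Queue[AnyRef]) =
    Stats.makeGauge("SizeOfSomeQueue") {              // gauges are re-evaluated when read
      queue.size.toDouble
    }

  def dump = for (name <- Stats.getCounterStats.keys) // snapshot of all recorded counters
    println(name + " = " + Stats.getCounterStats()(name))
}
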
@@ -19,7 +19,6 @@ extends IoCComponentProviderFactory with Logging {
override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz)

override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = {
//log.info("ProviderFactory: resolve => " + clazz.getName)
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null)
}
}
}

kernel/src/main/scala/management/JMX.scala (new executable file, 143 lines)

@@ -0,0 +1,143 @@
/**
 * Copyright (C) 2009 Scalable Solutions.
 */

package se.scalablesolutions.akka.kernel.management

import com.twitter.service.Stats

import scala.collection.jcl

import java.lang.management.ManagementFactory
import javax.{management => jmx}
import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL}

import kernel.Kernel.config
import kernel.util.Logging

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object Management {
  val RECORD_STATS = config.getBool("akka.management.record-stats", true)

  def startJMX(packageName: String): Unit =
    startJMX(packageName, ManagementFactory.getPlatformMBeanServer)

  def startJMX(packageName: String, mbeanServer: jmx.MBeanServer): Unit = {
    mbeanServer.registerMBean(new StatisticsMBean, new jmx.ObjectName(packageName + ":type=Stats"))
    mbeanServer.registerMBean(new ManagementMBean, new jmx.ObjectName(packageName + ":type=Management"))
    java.rmi.registry.LocateRegistry.createRegistry(1099)
    JMXConnectorServerFactory.newJMXConnectorServer(
      new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"),
      null,
      mbeanServer).start
  }
}

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class StatisticsMBean extends jmx.DynamicMBean with Logging {
  def getMBeanInfo = new jmx.MBeanInfo(
    "se.scalablesolutions.akka.management.Stats",
    "runtime statistics",
    getAttributeInfo,
    null, null, null,
    new jmx.ImmutableDescriptor("immutableInfo=false"))

  def getAttribute(name: String): AnyRef = {
    val segments = name.split("_", 2)
    segments(0) match {
      case "counter" =>
        Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long]
      case "timing" =>
        val prefix = segments(1).split("_", 2)
        val timing = Stats.getTimingStats(false)(prefix(1))
        val x = prefix(0) match {
          case "min" => timing.minimum
          case "max" => timing.maximum
          case "count" => timing.count
          case "average" => timing.average
        }
        x.asInstanceOf[java.lang.Integer]
      case "gauge" =>
        Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double]
    }
  }

  def getAttributes(names: Array[String]): jmx.AttributeList = {
    val rv = new jmx.AttributeList
    for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
    rv
  }

  def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException
  def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
  def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException

  private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
    (Stats.getCounterStats.keys.map { name =>
      List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false))
    } ++ Stats.getTimingStats(false).keys.map { name =>
      List("min", "max", "average", "count") map { prefix =>
        new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false)
      }
    } ++ Stats.getGaugeStats(false).keys.map { name =>
      List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false))
    }).toList.flatten[jmx.MBeanAttributeInfo].toArray
  }
}

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class ManagementMBean extends jmx.DynamicMBean with Logging {
  val operations: Array[jmx.MBeanOperationInfo] = Array(
    new jmx.MBeanOperationInfo("dosomething", "TODO",
      Array(
        new jmx.MBeanParameterInfo("key", "java.lang.String", "TODO"),
        new jmx.MBeanParameterInfo("value", "java.lang.String", "TODO")
      ), "void", jmx.MBeanOperationInfo.ACTION)
  )

  def getMBeanInfo = new jmx.MBeanInfo(
    "se.scalablesolutions.akka.management.Management",
    "runtime management",
    Array[jmx.MBeanAttributeInfo](),
    null, operations, null,
    new jmx.ImmutableDescriptor("immutableInfo=false"))

  def getAttribute(name: String): AnyRef = throw new UnsupportedOperationException

  def getAttributes(names: Array[String]): jmx.AttributeList = throw new UnsupportedOperationException

  def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = {
    actionName match {
      case "dosomething" =>
        params match {
          case Array(name: String, value: String) =>
            try {
              {}
            } catch {
              case e: Exception =>
                log.warning("Exception: %s", e.getMessage)
                throw e
            }
          case _ => throw new jmx.MBeanException(new Exception("Bad signature " + params.toList.toString))
        }
      case _ => throw new jmx.MBeanException(new Exception("No such method"))
    }
    null
  }

  def setAttribute(attr: jmx.Attribute): Unit = attr.getValue match {
    case s: String => println("Setting: " + s)
    case _ => throw new jmx.InvalidAttributeValueException()
  }

  def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = {
    for (attr <- jcl.Buffer(attrs.asList)) setAttribute(attr)
    attrs
  }
}

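Once the kernel has called Management.startJMX("se.scalablesolutions.akka"), the counters, timings and gauges are exposed as attributes of the Stats MBean and can be read by any JMX client through the RMI connector registered above. A minimal sketch; the ObjectName and the counter_NrOfActors attribute follow the naming conventions of StatisticsMBean, and a locally running kernel with the default registry port 1099 is assumed:

import javax.management.ObjectName
import javax.management.remote.{JMXConnectorFactory, JMXServiceURL}

object JmxStatsClientSketch {
  def main(args: Array[String]) = {
    val url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi")
    val connector = JMXConnectorFactory.connect(url)   // same factory RestfulJMX uses below
    val connection = connector.getMBeanServerConnection
    // "counter_NrOfActors" is how StatisticsMBean exposes the counter incremented in Actor
    val nrOfActors = connection.getAttribute(new ObjectName("se.scalablesolutions.akka:type=Stats"), "counter_NrOfActors")
    println("NrOfActors = " + nrOfActors)
    connector.close
  }
}
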
kernel/src/main/scala/management/RestfulJMX.scala (new executable file, 71 lines)

@@ -0,0 +1,71 @@
/**
 * Copyright (C) 2009 Scalable Solutions.
 */

package se.scalablesolutions.akka.kernel.management

import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging

import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
import javax.management._
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import java.util.concurrent.ConcurrentHashMap

/**
 * REST interface to Akka's JMX service.
 * <p/>
 * Here is an example that retreives the current number of Actors.
 * <pre>
 * http://localhost:9998/management
 * ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
 * &component=se.scalablesolutions.akka:type=Stats
 * &attribute=counter_NrOfActors
 * </pre>
 */
@Path("/management")
class RestfulJMX extends Actor with Logging {
  private case class Request(service: String, component: String, attribute: String)
  private val connectors = new ConcurrentHashMap[String, JMXConnector]

  @GET
  @Produces(Array("text/html"))
  def queryJMX(
    @QueryParam("service") service: String,
    @QueryParam("component") component: String,
    @QueryParam("attribute") attribute: String) =
    (this !! Request(service, component, attribute)).getOrElse(<error>Error in REST JMX management service</error>)

  override def receive: PartialFunction[Any, Unit] = {
    case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute))
  }

  private def retrieveAttribute(service: String, component: String, attribute: String): scala.xml.Elem = {
    try {
      var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service)))
      <div>{connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString}</div>
    } catch {
      case e: Exception =>
        if (connectors.contains(service)) connectors.remove(service)
        throw e
    }
  }
}

class RestfulJMXBoot extends Logging {
  log.info("Booting Restful JMX servivce")
  object factory extends SupervisorFactory {
    override def getSupervisorConfig: SupervisorConfig = {
      SupervisorConfig(
        RestartStrategy(OneForOne, 3, 100),
        Supervise(
          new RestfulJMX,
          LifeCycle(Permanent, 100))
        :: Nil)
    }
  }
  factory.newSupervisor.startSupervisor
}

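The scaladoc above already gives the query-string form of the REST call. As a quick usage sketch, the same attribute can be fetched programmatically; the host, port 9998 and parameter names are taken from that example, and URL-encoding the service parameter is an assumption on my part:

import java.io.{BufferedReader, InputStreamReader}
import java.net.{URL, URLEncoder}

object RestfulJmxQuerySketch {
  def main(args: Array[String]) = {
    val service = URLEncoder.encode("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi", "UTF-8")
    val component = URLEncoder.encode("se.scalablesolutions.akka:type=Stats", "UTF-8")
    val url = new URL("http://localhost:9998/management" +
      "?service=" + service + "&component=" + component + "&attribute=counter_NrOfActors")
    val in = new BufferedReader(new InputStreamReader(url.openStream))   // plain HTTP GET
    var line = in.readLine
    while (line != null) { println(line); line = in.readLine }
    in.close
  }
}
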
@@ -12,6 +12,7 @@ import kernel.actor.{Exit, Actor}
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.util.Logging
import kernel.management.Management

import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._

@@ -21,6 +22,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}

import scala.collection.mutable.HashMap

import com.twitter.service.Stats

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */

@@ -44,6 +47,10 @@ object RemoteClient extends Logging {
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)

@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
private val supervisors = new ConcurrentHashMap[String, Actor]

@@ -55,7 +62,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging {

private val bootstrap = new ClientBootstrap(channelFactory)

bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(futures, supervisors))
bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors))
bootstrap.setOption("tcpNoDelay", true)
bootstrap.setOption("keepAlive", true)

@@ -84,6 +91,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}

def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
if (Management.RECORD_STATS) {
NR_OF_BYTES_SENT.incr(request.getSerializedSize)
NR_OF_MESSAGES_SENT.incr
}
if (request.getIsOneWay) {
connection.getChannel.write(request)
None

@@ -111,15 +122,16 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
class RemoteClientPipelineFactory(name: String,
futures: ConcurrentMap[Long, CompletableFutureResult],
supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4));
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance));
p.addLast("frameEncoder", new LengthFieldPrepender(4));
p.addLast("protobufEncoder", new ProtobufEncoder());
p.addLast("handler", new RemoteClientHandler(futures, supervisors))
p.addLast("handler", new RemoteClientHandler(name, futures, supervisors))
p
}
}

@@ -128,10 +140,14 @@ class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFuture
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
@ChannelPipelineCoverage { val value = "all" }
class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult],
class RemoteClientHandler(val name: String,
val futures: ConcurrentMap[Long, CompletableFutureResult],
val supervisors: ConcurrentMap[String, Actor])
extends SimpleChannelUpstreamHandler with Logging {

val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)

override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)

@@ -144,6 +160,10 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize)
}
log.debug("Received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
if (reply.getIsSuccessful) {

@@ -159,7 +179,7 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu
}
future.completeWithException(null, parseException(reply))
}
futures.remove(reply.getId)
futures.remove(reply.getId)
} else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result)
} catch {
case e: Exception =>

@@ -13,6 +13,7 @@ import kernel.util._
import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.management.Management

import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._

@@ -20,22 +21,28 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}

import com.twitter.service.Stats

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteServer extends Logging {
def start = RemoteServer.start
def start = RemoteServer.start(None)
}

/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
object RemoteServer extends Logging {
val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost")
val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000)
import kernel.Kernel.config
val HOSTNAME = config.getString("akka.remote.hostname", "localhost")
val PORT = config.getInt("akka.remote.port", 9999)
val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000)

val name = "RemoteServer@" + HOSTNAME

@volatile private var isRunning = false
@volatile private var isConfigured = false

private val factory = new NioServerSocketChannelFactory(
Executors.newCachedThreadPool,

@@ -44,18 +51,15 @@ object RemoteServer extends Logging {
private val activeObjectFactory = new ActiveObjectFactory

private val bootstrap = new ServerBootstrap(factory)
// FIXME provide different codecs (Thrift, Avro, Protobuf, JSON)

private val handler = new RemoteServerHandler
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory)
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)

def start = synchronized {
def start(loader: Option[ClassLoader]) = synchronized {
if (!isRunning) {
log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT)
bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader))
bootstrap.setOption("child.tcpNoDelay", true)
bootstrap.setOption("child.keepAlive", true)
bootstrap.setOption("child.reuseAddress", true)
bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS)
bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT))
isRunning = true
}

@@ -65,14 +69,14 @@ object RemoteServer extends Logging {
/**
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class RemoteServerPipelineFactory extends ChannelPipelineFactory {
class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory {
def getPipeline: ChannelPipeline = {
val p = Channels.pipeline()
p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4))
p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance))
p.addLast("frameEncoder", new LengthFieldPrepender(4))
p.addLast("protobufEncoder", new ProtobufEncoder)
p.addLast("handler", new RemoteServerHandler)
p.addLast("handler", new RemoteServerHandler(name, loader))
p
}
}

@@ -81,7 +85,12 @@ class RemoteServerPipelineFactory extends ChannelPipelineFactory {
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)

private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]

@@ -106,6 +115,10 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
}

private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize)
}
log.debug("Received RemoteRequest[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)

@@ -128,7 +141,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(true)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)

@@ -139,7 +157,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsSuccessful(false)
.setIsActor(true)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}
}

@@ -165,7 +188,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setIsActor(false)
RemoteProtocolBuilder.setMessage(result, replyBuilder)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
} catch {
case e: InvocationTargetException =>

@@ -176,8 +204,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
e.printStackTrace

@@ -186,8 +219,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
.setException(e.getClass.getName + "$" + e.getMessage)
.setIsSuccessful(false)
.setIsActor(false)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
channel.write(replyBuilder.build)
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}

@@ -223,8 +261,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActiveObject(name: String, timeout: Long): AnyRef = {
val activeObjectOrNull = activeObjects.get(name)
if (activeObjectOrNull == null) {
val clazz = Class.forName(name)
try {
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef]
activeObjects.put(name, newInstance)
newInstance

@@ -240,8 +279,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging {
private def createActor(name: String, timeout: Long): Actor = {
val actorOrNull = actors.get(name)
if (actorOrNull == null) {
val clazz = Class.forName(name)
try {
val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name)
else Class.forName(name)
val newInstance = clazz.newInstance.asInstanceOf[Actor]
newInstance.timeout = timeout
actors.put(name, newInstance)

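RemoteServer.start now takes an Option[ClassLoader] so that remotely requested actors and active objects can be loaded through the application's boot class loader; the kernel passes its applicationLoader, while the RemoteServer wrapper class defaults to None. A small sketch of the two call styles; the URLClassLoader is only a stand-in for the kernel's real loader:

import java.net.{URL, URLClassLoader}
import se.scalablesolutions.akka.kernel.nio.RemoteServer

object RemoteServerStartSketch {
  // Without an application class loader (what the RemoteServer wrapper class does):
  RemoteServer.start(None)

  // With one, as the kernel does after running the application boot classes.
  // start is guarded by isRunning, so a second call in the same JVM is a no-op.
  val loader: Option[ClassLoader] = Some(new URLClassLoader(Array[URL]()))
  RemoteServer.start(loader)
}
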
@@ -49,17 +49,17 @@ class DispatcherFactory {
 * Creates an event based dispatcher serving multiple (millions) of actors through a thread pool.
 * Has a fluent builder interface for configuring its semantics.
 */
def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher
def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true)
def newEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name)
def newConcurrentEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name, true)

/**
 * Creates an event based dispatcher serving multiple (millions) of actors through a single thread.
 */
def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher
def newEventBasedSingleThreadDispatcher(name: String) = new EventBasedSingleThreadDispatcher(name)

/**
 * Creates an thread based dispatcher serving a single actor through the same single thread.
 * E.g. each actor consumes its own thread.
 */
def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor)
}
}

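Every dispatcher is now constructed with a name, and that name is baked into the statistics it records ("NrOfProcessedMessages_<name>", "SizeOfBlockingQueue_<name>", the "akka:<name>" thread factory). A small usage sketch of the fluent builder, mirroring the test setups further below; the pool sizes are illustrative and the import path of the Dispatchers factory object is assumed:

import se.scalablesolutions.akka.kernel.reactor.Dispatchers   // assumed location of the factory object

object NamedDispatcherSketch {
  // The name ends up in the scala-stats counters and gauges the dispatcher registers.
  val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("sample-dispatcher")
  dispatcher
    .withNewThreadPoolWithBoundedBlockingQueue(100)   // bounded executor, bound = 100
    .setCorePoolSize(2)
    .setMaxPoolSize(4)
}
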
@@ -10,9 +10,14 @@
 */
package se.scalablesolutions.akka.kernel.reactor

import kernel.management.Management

import java.util.{LinkedList, Queue, List}

class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
import com.twitter.service.Stats

class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) {
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name)
def start = if (!active) {
active = true
val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)

@@ -22,12 +27,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase {
try {
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator
while (selectedInvocations.hasNext) {
val invocation = selectedInvocations.next
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size)
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next
val invoker = messageHandlers.get(invocation.sender)
if (invoker != null) invoker.invoke(invocation)
selectedInvocations.remove
iter.remove
}
}
}

|
|||
|
||||
package se.scalablesolutions.akka.kernel.reactor
|
||||
|
||||
import kernel.management.Management
|
||||
|
||||
import java.util.concurrent._
|
||||
import locks.ReentrantLock
|
||||
import atomic.{AtomicLong, AtomicInteger}
|
||||
import ThreadPoolExecutor.CallerRunsPolicy
|
||||
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
|
||||
|
||||
import com.twitter.service.Stats
|
||||
|
||||
/**
|
||||
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
|
||||
* See also this article: [http://today.java.net/cs/user/print/a/350].
|
||||
|
|
@ -56,16 +60,17 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List}
|
|||
*
|
||||
* @author <a href="http://jonasboner.com">Jonas Bonér</a>
|
||||
*/
|
||||
class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase {
|
||||
def this() = this(false)
|
||||
class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) {
|
||||
def this(name: String) = this(name, false)
|
||||
|
||||
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
|
||||
private val NR_START_THREADS = 16
|
||||
private val NR_MAX_THREADS = 128
|
||||
private val KEEP_ALIVE_TIME = 60000L // default is one minute
|
||||
private var inProcessOfBuilding = false
|
||||
private var executor: ExecutorService = _
|
||||
private var threadPoolBuilder: ThreadPoolExecutor = _
|
||||
private val threadFactory = new MonitorableThreadFactory("akka")
|
||||
private val threadFactory = new MonitorableThreadFactory("akka:" + name)
|
||||
private var boundedExecutorBound = -1
|
||||
private val busyInvokers = new HashSet[AnyRef]
|
||||
|
||||
|
|
@ -89,6 +94,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
|
|||
} catch { case e: InterruptedException => active = false }
|
||||
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
|
||||
val reservedInvocations = reserve(selectedInvocations)
|
||||
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size)
|
||||
val it = reservedInvocations.entrySet.iterator
|
||||
while (it.hasNext) {
|
||||
val entry = it.next
|
||||
|
|
@ -157,6 +163,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
|
|||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
inProcessOfBuilding = false
|
||||
blockingQueue = queue
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue)
|
||||
this
|
||||
}
|
||||
|
|
@ -169,7 +176,8 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
|
|||
def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): EventBasedThreadPoolDispatcher = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory)
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory)
|
||||
boundedExecutorBound = bound
|
||||
this
|
||||
}
|
||||
|
|
@ -177,28 +185,32 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend
|
|||
def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): EventBasedThreadPoolDispatcher = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity), threadFactory, new CallerRunsPolicy)
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable](capacity)
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: EventBasedThreadPoolDispatcher = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new CallerRunsPolicy)
|
||||
blockingQueue = new LinkedBlockingQueue[Runnable]
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy)
|
||||
blockingQueue = new SynchronousQueue[Runnable](fair)
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
}
|
||||
|
||||
def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): EventBasedThreadPoolDispatcher = synchronized {
|
||||
ensureNotActive
|
||||
verifyNotInConstructionPhase
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy)
|
||||
blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair)
|
||||
threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy)
|
||||
this
|
||||
}
|
||||
|
||||
|
|
@ -311,13 +323,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend
|
|||
def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables)
|
||||
def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
|
||||
def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables)
|
||||
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||
/*
|
||||
def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables)
|
||||
def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit)
|
||||
def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables)
|
||||
def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||
*/
|
||||
def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit)
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
|
|||
|
|
@@ -4,20 +4,31 @@

package se.scalablesolutions.akka.kernel.reactor

import kernel.management.Management

import java.util.{LinkedList, Queue, List}
import java.util.concurrent.TimeUnit
import java.util.concurrent.{TimeUnit, BlockingQueue}
import java.util.HashMap

trait MessageDispatcherBase extends MessageDispatcher {
import com.twitter.service.Stats

abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher {

//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
val MILLISECONDS = TimeUnit.MILLISECONDS
val queue = new ReactiveMessageQueue

val queue = new ReactiveMessageQueue(name)
var blockingQueue: BlockingQueue[Runnable] = _
@volatile protected var active: Boolean = false
protected val messageHandlers = new HashMap[AnyRef, MessageInvoker]
protected var selectorThread: Thread = _
protected val guard = new Object

if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
guard.synchronized { blockingQueue.size.toDouble }
}
}

def messageQueue = queue

def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {

@@ -40,10 +51,16 @@ trait MessageDispatcherBase extends MessageDispatcher {
protected def doShutdown = {}
}

class ReactiveMessageQueue extends MessageQueue {
class ReactiveMessageQueue(name: String) extends MessageQueue {
private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false

if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfReactiveQueue_" + name) {
queue.synchronized { queue.size.toDouble }
}
}

def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll

@@ -64,4 +81,4 @@ class ReactiveMessageQueue extends MessageQueue {
interrupted = true
queue.notifyAll
}
}
}

@@ -4,18 +4,24 @@

package se.scalablesolutions.akka.kernel.reactor

import com.twitter.service.Stats

import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue

import kernel.actor.{Actor, ActorMessageInvoker}
import kernel.management.Management

/**
 * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
 * @author <a href="http://jonasboner.com">Jonas Bonér</a>
 */
class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(new ActorMessageInvoker(actor))
class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor))

private val queue = new BlockingMessageQueue
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)

private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false

@@ -27,6 +33,7 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
override def run = {
while (active) {
try {
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr
messageHandler.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}

@@ -44,7 +51,13 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker)
def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException
}

class BlockingMessageQueue extends MessageQueue {
class BlockingMessageQueue(name: String) extends MessageQueue {
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
queue.size.toDouble
}
}

// FIXME: configure the LBQ
private val queue = new LinkedBlockingQueue[MessageInvocation]
def append(handle: MessageInvocation) = queue.put(handle)

@@ -52,4 +65,4 @@ class BlockingMessageQueue extends MessageQueue {
def take: MessageInvocation = queue.take
def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException
def interrupt = throw new UnsupportedOperationException
}
}

@@ -55,7 +55,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val key = "key"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key, new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {

@@ -69,7 +69,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(2)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch))
dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch))
dispatcher.start

@@ -83,7 +83,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = new EventBasedSingleThreadDispatcher
val dispatcher = new EventBasedSingleThreadDispatcher("name")
dispatcher.registerHandler(key1, new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {

@@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(10)
val key = "key"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)

@@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handlersBarrier = new CyclicBarrier(3)
val key1 = "key1"
val key2 = "key2"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)

@@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase {
val handleLatch = new CountDownLatch(200)
val key1 = "key1"
val key2 = "key2"
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher
val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name")
dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100)
.setCorePoolSize(2)
.setMaxPoolSize(4)

@@ -26,7 +26,6 @@ class RemoteActorSpecActorBidirectional extends Actor {
}

class RemoteActorSpec extends TestCase {

kernel.Kernel.config
new Thread(new Runnable() {
def run = {

@@ -49,7 +49,7 @@ class ThreadBasedDispatcherTest extends TestCase {
private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = {
val guardLock = new ReentrantLock
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch))
val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch))
dispatcher.start
for (i <- 0 until 100) {
dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None))

@@ -60,7 +60,7 @@ class ThreadBasedDispatcherTest extends TestCase {

private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = {
val handleLatch = new CountDownLatch(100)
val dispatcher = new ThreadBasedDispatcher(new MessageInvoker {
val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker {
var currentValue = -1;
def invoke(message: MessageInvocation) {
if (threadingIssueDetected.get) return

Binary and mode-only changes (contents not shown):
lib/cassidy-0.1.jar (0 changes, executable file → normal file)
lib/jersey-client-1.1.1-ea.jar (0 changes, executable file → normal file)
lib/jersey-core-1.1.1-ea.jar (0 changes, executable file → normal file)
lib/jersey-server-1.1.1-ea.jar (0 changes, executable file → normal file)
lib/scala-stats-1.0.jar (new binary file)

@@ -114,4 +114,4 @@ class JsonpFilter extends BroadcastFilter[String] with Logging {
message +
"\" }); \n</script>\n")
}
}
}