removed monitoring, statistics and management

This commit is contained in:
jboner 2009-08-20 14:05:00 +02:00
parent 5d41b79b27
commit 07eecc548a
15 changed files with 364 additions and 834 deletions

View file

@ -18,7 +18,6 @@ import kernel.rest.AkkaCometServlet
import kernel.nio.RemoteServer
import kernel.state.CassandraStorage
import kernel.util.Logging
import kernel.management.Management
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
@ -38,7 +37,6 @@ object Kernel extends Logging {
val BOOT_CLASSES = config.getList("akka.boot")
val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true)
val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true)
val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra")
val RUN_REST_SERVICE = config.getBool("akka.rest.service", true)
val REST_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost")
@ -62,7 +60,6 @@ object Kernel extends Logging {
runApplicationBootClasses
if (RUN_REMOTE_SERVICE) startRemoteService
if (RUN_MANAGEMENT_SERVICE) startManagementService
if (RUN_REST_SERVICE) startREST
Thread.currentThread.setContextClassLoader(getClass.getClassLoader)
@ -90,15 +87,10 @@ object Kernel extends Logging {
case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.")
}
}
val config = Configgy.config
config.registerWithJmx("com.scalablesolutions.akka")
// FIXME fix Configgy JMX subscription to allow management
// config.subscribe { c => configure(c.getOrElse(new Config)) }
config
Configgy.config
}
private[akka] def runApplicationBootClasses = {
new management.RestfulJMXBoot // add the REST/JMX service
val loader =
if (HOME.isDefined) {
val CONFIG = HOME.get + "/config"
@ -127,11 +119,6 @@ object Kernel extends Logging {
remoteServerThread.start
}
private[akka] def startManagementService = {
Management("se.scalablesolutions.akka.management")
log.info("Management service started successfully.")
}
def startREST = {
val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build()

View file

@ -16,9 +16,6 @@ import nio.protobuf.RemoteProtocol.RemoteRequest
import util.Logging
import serialization.{Serializer, Serializable, SerializationProtocol}
import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory}
import management.Management
import com.twitter.service.Stats
sealed abstract class LifecycleMessage
case class Init(config: AnyRef) extends LifecycleMessage
@ -53,7 +50,6 @@ object Actor {
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
trait Actor extends Logging with TransactionManagement {
Stats.getCounter("NrOfActors").incr
ActorRegistry.register(this)
@volatile private[this] var isRunning: Boolean = false
@ -537,8 +533,6 @@ trait Actor extends Logging with TransactionManagement {
}
private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = {
if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr
if (trapExit) {
if (faultHandler.isDefined) {
faultHandler.get match {
@ -556,7 +550,6 @@ trait Actor extends Logging with TransactionManagement {
linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason))
private[Actor] def restart(reason: AnyRef) = synchronized {
if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr
lifeCycleConfig match {
case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.")

View file

@ -1,187 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import com.twitter.service.Stats
import scala.collection.jcl
import scala.collection.mutable.ArrayBuffer
import java.util.concurrent.ThreadPoolExecutor
import java.lang.management.ManagementFactory
import javax.{management => jmx}
import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL}
import kernel.Kernel.config
import kernel.util.Logging
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
object Management extends Logging {
val RECORD_STATS = config.getBool("akka.management.record-stats", true)
private var name = "se.scalablesolutions.akka"
private val mbeanServer = ManagementFactory.getPlatformMBeanServer
def apply() = {}
def apply(packageName: String) = name = packageName
java.rmi.registry.LocateRegistry.createRegistry(1099)
JMXConnectorServerFactory.newJMXConnectorServer(
new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"),
null,
mbeanServer).start
registerMBean(new StatisticsMBean, "Stats")
def registerMBean(mbean: jmx.DynamicMBean, mbeanType: String) = {
val objectName = new jmx.ObjectName(name + ":type=" + mbeanType)
try { mbeanServer.getMBeanInfo(objectName) } catch {
case e: jmx.InstanceNotFoundException =>
mbeanServer.registerMBean(mbean, objectName)
}
}
def getStats(reset: Boolean) = {
var statistics = new ArrayBuffer[Tuple2[String, String]]
statistics += (("current time", (System.currentTimeMillis / 1000).toString))
statistics += (("akka version", Kernel.VERSION))
statistics += (("uptime", Kernel.uptime.toString))
for ((key, value) <- Stats.getJvmStats) statistics += (key, value.toString)
for ((key, value) <- Stats.getCounterStats) statistics += (key, value.toString)
for ((key, value) <- Stats.getTimingStats(reset)) statistics += (key, value.toString)
for ((key, value) <- Stats.getGaugeStats(reset)) statistics += (key, value.toString)
val report = {for ((key, value) <- statistics) yield "STAT %s %s".format(key, value)}.mkString("", "\r\n", "\r\n")
log.info("=========================================\n\t--- Statistics Report ---\n%s=========================================", report)
report
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class StatisticsMBean extends jmx.DynamicMBean {
def getMBeanInfo = new jmx.MBeanInfo(
"se.scalablesolutions.akka.kernel.management.StatisticsMBean",
"runtime statistics",
getAttributeInfo,
null, null, null,
new jmx.ImmutableDescriptor("immutableInfo=false"))
def getAttribute(name: String): AnyRef = {
val segments = name.split("_", 2)
segments(0) match {
case "counter" =>
Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long]
case "timing" =>
val prefix = segments(1).split("_", 2)
val timing = Stats.getTimingStats(false)(prefix(1))
val x = prefix(0) match {
case "min" => timing.minimum
case "max" => timing.maximum
case "count" => timing.count
case "average" => timing.average
}
x.asInstanceOf[java.lang.Integer]
case "gauge" =>
Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double]
}
}
def getAttributes(names: Array[String]): jmx.AttributeList = {
val rv = new jmx.AttributeList
for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
rv
}
def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException
def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
(Stats.getCounterStats.keys.map { name =>
List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false))
} ++ Stats.getTimingStats(false).keys.map { name =>
List("min", "max", "average", "count") map { prefix =>
new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false)
}
} ++ Stats.getGaugeStats(false).keys.map { name =>
List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false))
}).toList.flatten[jmx.MBeanAttributeInfo].toArray
}
}
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
class ThreadPoolMBean(threadPool: ThreadPoolExecutor) extends jmx.DynamicMBean {
val operations: Array[jmx.MBeanOperationInfo] = Array(
new jmx.MBeanOperationInfo("purge", "",
Array(), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("shutdown", "",
Array(), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("setCorePoolSize", "",
Array(new jmx.MBeanParameterInfo("corePoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
new jmx.MBeanOperationInfo("setMaximumPoolSize", "",
Array(new jmx.MBeanParameterInfo("maximumPoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION),
)
def getMBeanInfo = new jmx.MBeanInfo(
"se.scalablesolutions.akka.kernel.management.ThreadPoolMBean",
"runtime management",
getAttributeInfo,
null, operations, null,
new jmx.ImmutableDescriptor("immutableInfo=false"))
def getAttribute(name: String): AnyRef = name match {
case "getActiveCount" => threadPool.getActiveCount.asInstanceOf[AnyRef]
case "getCompletedTaskCount" => threadPool.getCompletedTaskCount.asInstanceOf[AnyRef]
case "getCorePoolSize" => threadPool.getCorePoolSize.asInstanceOf[AnyRef]
case "getLargestPoolSize" => threadPool.getLargestPoolSize.asInstanceOf[AnyRef]
case "getMaximumPoolSize" => threadPool.getMaximumPoolSize.asInstanceOf[AnyRef]
case "getPoolSize" => threadPool.getPoolSize.asInstanceOf[AnyRef]
case "getTaskCount" => threadPool.getTaskCount.asInstanceOf[AnyRef]
}
private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = {
Array(
new jmx.MBeanAttributeInfo("getCorePoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getMaximumPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getActiveCount", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getCompletedTaskCount", "java.lang.Long", "", true, false, false),
new jmx.MBeanAttributeInfo("getLargestPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getPoolSize", "java.lang.Int", "", true, false, false),
new jmx.MBeanAttributeInfo("getTaskCount", "java.lang.Long", "", true, false, false))
}
def getAttributes(names: Array[String]): jmx.AttributeList = {
val rv = new jmx.AttributeList
for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name)))
rv
}
def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = {
try {
actionName match {
case "purge" => threadPool.purge
case "shutdown" => threadPool.shutdown
case "setCorePoolSize" =>
params match {
case Array(corePoolSize: java.lang.Integer) => threadPool.setCorePoolSize(corePoolSize.intValue)
case _ => throw new Exception("Bad signature " + params.toList.toString)
}
case "setMaximumPoolSize" =>
params match {
case Array(maximumPoolSize: java.lang.Integer) => threadPool.setMaximumPoolSize(maximumPoolSize.intValue)
case _ => throw new Exception("Bad signature " + params.toList.toString)
}
}
} catch { case e: Exception => throw new jmx.MBeanException(e) }
"Success"
}
def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException
def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException
}

View file

@ -1,96 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor}
import se.scalablesolutions.akka.kernel.config.ScalaConfig._
import se.scalablesolutions.akka.kernel.util.Logging
import javax.ws.rs.core.MultivaluedMap
import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes}
import javax.management._
import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL}
import javax.servlet.http.{HttpServletRequest, HttpServletResponse}
import java.util.concurrent.ConcurrentHashMap
/**
* REST interface to Akka's JMX service.
* <p/>
* Here is an example that retreives the current number of Actors.
* <pre>
* http://localhost:9998/jmx
* ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
* &component=se.scalablesolutions.akka:type=Stats
* &attribute=counter_NrOfActors
* </pre>
*/
@Path("/jmx")
class RestfulJMX extends Actor with Logging {
private case class Request(service: String, component: String, attribute: String)
private val connectors = new ConcurrentHashMap[String, JMXConnector]
@GET
@Produces(Array("text/plain"))
def queryJMX(
@QueryParam("service") service: String,
@QueryParam("component") component: String,
@QueryParam("attribute") attribute: String): String=
(this !! Request(service, component, attribute)).getOrElse("Error in REST JMX management service")
override def receive: PartialFunction[Any, Unit] = {
case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute))
}
private def retrieveAttribute(service: String, component: String, attribute: String): String = {
try {
var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service)))
connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString
} catch {
case e: Exception =>
if (connectors.contains(service)) connectors.remove(service)
throw e
}
}
}
/**
* REST interface to Akka's statistics recorder.
* <p/>
* Here is an example that retreives a statistics report.
* <pre>
* http://localhost:9998/stats?reset=true
* </pre>
*/
@Path("/stats")
class StatisticsReporter extends Actor with Logging {
private case class Stats(reset: Boolean)
@GET
@Produces(Array("text/html"))
def stats(@QueryParam("reset") reset: String): scala.xml.Elem =
(this !! Stats(java.lang.Boolean.valueOf(reset).booleanValue)).getOrElse(<h3>Error in REST JMX management service</h3>)
override def receive: PartialFunction[Any, Unit] = {
case Stats(reset) => reply(<pre>{Management.getStats(reset)}</pre>)
}
}
class RestfulJMXBoot extends Logging {
log.info("Booting Restful JMX servivce")
object factory extends SupervisorFactory {
override def getSupervisorConfig: SupervisorConfig = {
SupervisorConfig(
RestartStrategy(OneForOne, 3, 100),
Supervise(
new RestfulJMX,
LifeCycle(Permanent, 100)) ::
Supervise(
new StatisticsReporter,
LifeCycle(Permanent, 100)) ::
Nil)
}
}
factory.newSupervisor.startSupervisor
}

View file

@ -1,171 +0,0 @@
/**
* Copyright (C) 2009 Scalable Solutions.
*/
package se.scalablesolutions.akka.kernel.management
import javax.management._
import java.lang.management._
/*
object ScalaJMX {
val mbeanServer = ManagementFactory.getPlatformMBeanServer
def register(t: AnyRef, i: Class, name: ObjectName) = mbeanServer.registerMBean(new StandardMBean(t, i), name)
def registerBean(bean: DynamicMBean, name: ObjectName): ObjectInstance = mbeanServer.registerMBean(bean, name)
def register(t: AnyRef, name: String): ObjectInstance = register(t, beanClass(t), name)
def info(name: ObjectName): SBean = mbeanServer.getMBeanInfo(name)
def bean(name: ObjectName): SBeanInfo = convBeanInfo(name, mbeanServer.getMBeanInfo(name))
def invoke(name: ObjectName, operationName: String, params: Array[Object], signature: Array[String]): Object =
mbeanServer.invoke(name, operationName, params, signature)
def call(name: ObjectName, operationName: String): Object = invoke(name, operationName, Array[Object](), Array[String]())
def get(name: ObjectName, attribute: String) = mbeanServer.getAttribute(name, attribute)
def set(name: ObjectName, attribute: String, value: Object) = mbeanServer.setAttribute(name, new Attribute(attribute, value))
implicit def instanceToName(oi: ObjectInstance) = oi.getObjectName()
implicit def stringToName(name: String) = ObjectName.getInstance(name)
implicit def convBean(bi: MBeanInfo):SBean = SBean(bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
implicit def seqToArr(seq: Seq[AnyRef]): Array[Object] = seq.toArray
def convBeanInfo(name: ObjectName, bi: MBeanInfo):SBeanInfo = new SBeanInfo(name, bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors())
implicit def convAttrs(attrs: Array[MBeanAttributeInfo]): Seq[SAttr] =
for (val a <- attrs) yield a
implicit def convParams(params: Array[MBeanParameterInfo]): Seq[SParameter] =
for (val p <- params) yield p
implicit def convNotes(notes: Array[MBeanNotificationInfo]): Seq[SNotification] =
for (val p <- notes) yield p
implicit def convCons(cons: Array[MBeanConstructorInfo]): Seq[SConstructor] =
for (val p <- cons) yield p
implicit def convOps(cons: Array[MBeanOperationInfo]): Seq[SOperation] =
for (val p <- cons) yield p
implicit def convAttr(attr: MBeanAttributeInfo) = SAttr(attr.getName(), attr.getDescription(), attr.getType(), attr.isIs(), attr.isReadable(), attr.isWritable())
implicit def convNote(note: MBeanNotificationInfo) = SNotification(note.getName(), note.getDescription(), note.getNotifTypes())
implicit def convOp(op: MBeanOperationInfo):SOperation = SOperation(op.getName(), op.getDescription(), op.getImpact(), op.getReturnType(), op.getSignature())
implicit def convCon(con: MBeanConstructorInfo):SConstructor = SConstructor(con getName, con getDescription, con getSignature)
implicit def convParam(p: MBeanParameterInfo) = SParameter(p getName, p getDescription, p getType)
private def beanClass(t: AnyRef) = Class.forName(t.getClass().getName() + "MBean")
}
class MBean(mbeanInterface: String) extends StandardMBean(Class.forName(mbeanInterface))
abstract class SFeature(val name: String, val description: String)
case class SBean(className: String, description: String,
attrs: Seq[SAttr], notes: Seq[SNotification],
ops: Seq[SOperation], cons: Seq[SConstructor]) {
def writable = attrs.toList.filter(sa => sa.writable)
}
class SBeanInfo(name: ObjectName, className: String, description: String,
attrs: Seq[SAttr], notes: Seq[SNotification],
ops: Seq[SOperation], cons: Seq[SConstructor])
extends SBean(className, description, attrs, notes, ops, cons) {
def get(attribute: String) = SJMX.get(name, attribute)
def set(attribute: String, value: Object) = SJMX.set(name, attribute, value)
def call(opName: String) = SJMX.call(name, opName)
}
case class SAttr(
override val name: String,
override val description: String,
jmxType: String, isIs: boolean, readable: boolean, writable: boolean
) extends SFeature(name, description)
case class SNotification(
override val name: String,
override val description: String,
notifTypes: Array[String]) extends SFeature(name, description)
case class SOperation(
override val name: String,
override val description: String,
impact: int,
returnType: String,
signature: Seq[SParameter]) extends SFeature(name, description)
case class SParameter(
override val name: String,
override val description: String,
jmxType: String) extends SFeature(name, description)
case class SConstructor(
override val name: String,
override val description: String,
signature: Seq[SParameter]) extends SFeature(name, description)
*/
/*
package com.soletta.spipe;
import javax.management.{StandardMBean,ObjectName,MBeanInfo};
class SPipe extends MBean("com.soletta.spipe.SPipeMBean") with SPipeMBean {
import Console.println;
import SJMX._;
private var desc: String = "Yipe!";
def go = {
val oname: ObjectName = "default:name=SPipe";
val instance = SJMX.registerBean(this, oname);
set(oname, "Factor", "Hello!");
println(get(oname, "Factor"));
val SBean(n, d, Seq(_, a2, a3, _*), _, ops, _) = info(oname);
println("Bean name is " + n + ", description is " + d);
println("Second attribute is " + a2);
println("Third attribute is " + a3);
println("Writable attributes are " + info(oname).writable);
println("Ops: " + ops);
val x =
<bean name={n} description={d}>
{ops.toList.map(o => <operation name={o.name} description={o.description}/>)}
</bean> ;
println(x);
val inf = bean(oname);
inf.call("start");
println(inf.get("Factor"));
}
def getName = "SPipe!";
def setDescription(d: String) = desc = d;
override def getDescription() = desc;
def getFactor = desc;
def setFactor(s: String) = desc = s;
def isHappy = true;
override def getDescription(info: MBeanInfo) = desc;
}
object PipeMain {
def main(args: Array[String]): unit = {
(new SPipe) go;
}
}
trait SPipeMBean {
def getName: String;
def getDescription: String = getName;
def setDescription(d: String): unit;
def getFactor: String;
def setFactor(s: String): unit;
def isHappy: boolean;
def start() = { Console.println("Starting"); }
def stop() = { }
*/

View file

@ -12,7 +12,6 @@ import kernel.actor.{Exit, Actor}
import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.util.Logging
import kernel.management.Management
import org.jboss.netty.bootstrap.ClientBootstrap
import org.jboss.netty.channel._
@ -22,8 +21,6 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import scala.collection.mutable.HashMap
import com.twitter.service.Stats
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -48,8 +45,6 @@ object RemoteClient extends Logging {
*/
class RemoteClient(hostname: String, port: Int) extends Logging {
val name = "RemoteClient@" + hostname
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
@volatile private var isRunning = false
private val futures = new ConcurrentHashMap[Long, CompletableFutureResult]
@ -91,10 +86,6 @@ class RemoteClient(hostname: String, port: Int) extends Logging {
}
def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) {
if (Management.RECORD_STATS) {
NR_OF_BYTES_SENT.incr(request.getSerializedSize)
NR_OF_MESSAGES_SENT.incr
}
if (request.getIsOneWay) {
connection.getChannel.write(request)
None
@ -145,9 +136,6 @@ class RemoteClientHandler(val name: String,
val supervisors: ConcurrentMap[String, Actor])
extends SimpleChannelUpstreamHandler with Logging {
val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = {
if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) {
log.debug(event.toString)
@ -160,10 +148,6 @@ class RemoteClientHandler(val name: String,
val result = event.getMessage
if (result.isInstanceOf[RemoteReply]) {
val reply = result.asInstanceOf[RemoteReply]
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize)
}
log.debug("Received RemoteReply[\n%s]", reply.toString)
val future = futures.get(reply.getId)
if (reply.getIsSuccessful) {

View file

@ -13,7 +13,6 @@ import kernel.util._
import protobuf.RemoteProtocol
import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest}
import serialization.{Serializer, Serializable, SerializationProtocol}
import kernel.management.Management
import org.jboss.netty.bootstrap.ServerBootstrap
import org.jboss.netty.channel._
@ -21,8 +20,6 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory
import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender}
import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder}
import com.twitter.service.Stats
/**
* @author <a href="http://jonasboner.com">Jonas Bon&#233;r</a>
*/
@ -86,11 +83,6 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) ext
*/
@ChannelPipelineCoverage { val value = "all" }
class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging {
val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name)
val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name)
val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name)
val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name)
private val activeObjectFactory = new ActiveObjectFactory
private val activeObjects = new ConcurrentHashMap[String, AnyRef]
private val actors = new ConcurrentHashMap[String, Actor]
@ -115,10 +107,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
}
private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = {
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_RECEIVED.incr
NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize)
}
log.debug("Received RemoteRequest[\n%s]", request.toString)
if (request.getIsActor) dispatchToActor(request, channel)
else dispatchToActiveObject(request, channel)
@ -143,10 +131,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
} catch {
case e: Throwable =>
log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e)
@ -159,10 +143,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}
}
@ -190,10 +170,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
} catch {
case e: InvocationTargetException =>
@ -207,10 +183,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
case e: Throwable =>
log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e)
e.printStackTrace
@ -222,10 +194,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL
if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid)
val replyMessage = replyBuilder.build
channel.write(replyMessage)
if (Management.RECORD_STATS) {
NR_OF_MESSAGES_SENT.incr
NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize)
}
}
}

View file

@ -10,14 +10,9 @@
*/
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.Management
import java.util.{LinkedList, Queue, List}
import com.twitter.service.Stats
import java.util.{LinkedList, List}
class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) {
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name)
def start = if (!active) {
active = true
val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue)
@ -28,7 +23,6 @@ class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBa
messageDemultiplexer.select
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size)
val iter = selectedInvocations.iterator
while (iter.hasNext) {
val invocation = iter.next

View file

@ -4,16 +4,12 @@
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.{Management, ThreadPoolMBean}
import java.util.concurrent._
import locks.ReentrantLock
import atomic.{AtomicLong, AtomicInteger}
import ThreadPoolExecutor.CallerRunsPolicy
import java.util.{Collection, HashSet, HashMap, LinkedList, List}
import com.twitter.service.Stats
/**
* Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].<br/>
* See also this article: [http://today.java.net/cs/user/print/a/350].
@ -63,7 +59,6 @@ import com.twitter.service.Stats
class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) {
def this(name: String) = this(name, false)
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
private val NR_START_THREADS = 16
private val NR_MAX_THREADS = 128
private val KEEP_ALIVE_TIME = 60000L // default is one minute
@ -79,7 +74,6 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B
def start = if (!active) {
active = true
Management.registerMBean(new ThreadPoolMBean(threadPoolBuilder), "ThreadPool_" + name)
/**
* This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/].
@ -95,7 +89,6 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B
} catch { case e: InterruptedException => active = false }
val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations
val reservedInvocations = reserve(selectedInvocations)
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size)
val it = reservedInvocations.entrySet.iterator
while (it.hasNext) {
val entry = it.next

View file

@ -4,14 +4,10 @@
package se.scalablesolutions.akka.kernel.reactor
import kernel.management.Management
import java.util.{LinkedList, Queue, List}
import java.util.concurrent.{TimeUnit, BlockingQueue}
import java.util.HashMap
import com.twitter.service.Stats
abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher {
//val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false)
@ -23,12 +19,6 @@ abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher
protected var selectorThread: Thread = _
protected val guard = new Object
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
guard.synchronized { blockingQueue.size.toDouble }
}
}
def messageQueue = queue
def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized {
@ -55,12 +45,6 @@ class ReactiveMessageQueue(name: String) extends MessageQueue {
private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation]
@volatile private var interrupted = false
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfReactiveQueue_" + name) {
queue.synchronized { queue.size.toDouble }
}
}
def append(handle: MessageInvocation) = queue.synchronized {
queue.offer(handle)
queue.notifyAll

View file

@ -4,13 +4,10 @@
package se.scalablesolutions.akka.kernel.reactor
import com.twitter.service.Stats
import java.util.concurrent.LinkedBlockingQueue
import java.util.Queue
import kernel.actor.{Actor, ActorMessageInvoker}
import kernel.management.Management
/**
* Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue.
@ -19,8 +16,6 @@ import kernel.management.Management
class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher {
def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor))
val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name)
private val queue = new BlockingMessageQueue(name)
private var selectorThread: Thread = _
@volatile private var active: Boolean = false
@ -33,7 +28,6 @@ class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandle
override def run = {
while (active) {
try {
if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr
messageHandler.invoke(queue.take)
} catch { case e: InterruptedException => active = false }
}
@ -52,12 +46,6 @@ class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandle
}
class BlockingMessageQueue(name: String) extends MessageQueue {
if (Management.RECORD_STATS) {
Stats.makeGauge("SizeOfBlockingQueue_" + name) {
queue.size.toDouble
}
}
// FIXME: configure the LBQ
private val queue = new LinkedBlockingQueue[MessageInvocation]
def append(handle: MessageInvocation) = queue.put(handle)