diff --git a/akka.ipr b/akka.ipr index 226c8db6c4..e9a88e3c2a 100644 --- a/akka.ipr +++ b/akka.ipr @@ -1869,17 +1869,6 @@ - - - - - - - - - - - diff --git a/akka.iws b/akka.iws index bdf018e8e4..1793f4a646 100644 --- a/akka.iws +++ b/akka.iws @@ -6,13 +6,21 @@ - - - - + + + + + + + + + + + + @@ -94,15 +102,89 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + - + + + + + + + + + + + + @@ -116,22 +198,22 @@ @@ -186,6 +268,58 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -318,7 +456,67 @@ - + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + @@ -384,84 +582,6 @@ - @@ -507,7 +645,7 @@ - + @@ -535,7 +673,7 @@ - + - - - - - - - - + + + + + + + + + + + - - + + + @@ -1584,7 +1733,7 @@ - + @@ -1592,27 +1741,28 @@ - - - - - - - - - - - - - - - - - - - - - + + + + + + + + + + + + + + + + + + + + + + localhost @@ -1689,20 +1839,19 @@ - + + - - + - + - - + @@ -1713,8 +1862,9 @@ - + + @@ -1756,142 +1906,111 @@ - + - + - - - - - - - - + - - - - - - - - - - - - - - - - - - - - - + - + - + - - - - - - - - - - + - + - + - + - + + + + + + + + + + - + - + - + - + - + - + + + + + + + + + + + + + + + - - - - - - - - - - - - + - + - - - - - - - - + - + diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index a77daaa63b..5c05d16193 100644 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -18,7 +18,6 @@ import kernel.rest.AkkaCometServlet import kernel.nio.RemoteServer import kernel.state.CassandraStorage import kernel.util.Logging -import kernel.management.Management /** * @author Jonas Bonér @@ -38,7 +37,6 @@ object Kernel extends Logging { val BOOT_CLASSES = config.getList("akka.boot") val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true) - val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true) val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra") val RUN_REST_SERVICE = config.getBool("akka.rest.service", true) val REST_HOSTNAME = kernel.Kernel.config.getString("akka.rest.hostname", "localhost") @@ -62,7 +60,6 @@ object Kernel extends Logging { runApplicationBootClasses if (RUN_REMOTE_SERVICE) startRemoteService - if (RUN_MANAGEMENT_SERVICE) startManagementService if (RUN_REST_SERVICE) startREST Thread.currentThread.setContextClassLoader(getClass.getClassLoader) @@ -90,15 +87,10 @@ object Kernel extends Logging { case e: ParseException => throw new IllegalStateException("'$AKKA_HOME/config/akka.conf' could not be found and no 'akka.conf' can be found on the classpath - aborting. . Either add it in the '$AKKA_HOME/config' directory or add it to the classpath.") } } - val config = Configgy.config - config.registerWithJmx("com.scalablesolutions.akka") - // FIXME fix Configgy JMX subscription to allow management - // config.subscribe { c => configure(c.getOrElse(new Config)) } - config + Configgy.config } private[akka] def runApplicationBootClasses = { - new management.RestfulJMXBoot // add the REST/JMX service val loader = if (HOME.isDefined) { val CONFIG = HOME.get + "/config" @@ -127,11 +119,6 @@ object Kernel extends Logging { remoteServerThread.start } - private[akka] def startManagementService = { - Management("se.scalablesolutions.akka.management") - log.info("Management service started successfully.") - } - def startREST = { val uri = UriBuilder.fromUri(REST_URL).port(REST_PORT).build() diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index 5e21048fd4..a6dd905da2 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -16,9 +16,6 @@ import nio.protobuf.RemoteProtocol.RemoteRequest import util.Logging import serialization.{Serializer, Serializable, SerializationProtocol} import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} -import management.Management - -import com.twitter.service.Stats sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage @@ -53,7 +50,6 @@ object Actor { * @author Jonas Bonér */ trait Actor extends Logging with TransactionManagement { - Stats.getCounter("NrOfActors").incr ActorRegistry.register(this) @volatile private[this] var isRunning: Boolean = false @@ -537,8 +533,6 @@ trait Actor extends Logging with TransactionManagement { } private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { - if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr - if (trapExit) { if (faultHandler.isDefined) { faultHandler.get match { @@ -556,7 +550,6 @@ trait Actor extends Logging with TransactionManagement { linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason)) private[Actor] def restart(reason: AnyRef) = synchronized { - if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr lifeCycleConfig match { case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.") diff --git a/kernel/src/main/scala/management/JMX.scala b/kernel/src/main/scala/management/JMX.scala deleted file mode 100755 index bdeea75324..0000000000 --- a/kernel/src/main/scala/management/JMX.scala +++ /dev/null @@ -1,187 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel.management - -import com.twitter.service.Stats - -import scala.collection.jcl -import scala.collection.mutable.ArrayBuffer - -import java.util.concurrent.ThreadPoolExecutor -import java.lang.management.ManagementFactory -import javax.{management => jmx} -import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL} - -import kernel.Kernel.config -import kernel.util.Logging - -/** - * @author Jonas Bonér - */ -object Management extends Logging { - val RECORD_STATS = config.getBool("akka.management.record-stats", true) - private var name = "se.scalablesolutions.akka" - private val mbeanServer = ManagementFactory.getPlatformMBeanServer - - def apply() = {} - def apply(packageName: String) = name = packageName - - java.rmi.registry.LocateRegistry.createRegistry(1099) - JMXConnectorServerFactory.newJMXConnectorServer( - new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"), - null, - mbeanServer).start - - registerMBean(new StatisticsMBean, "Stats") - - def registerMBean(mbean: jmx.DynamicMBean, mbeanType: String) = { - val objectName = new jmx.ObjectName(name + ":type=" + mbeanType) - try { mbeanServer.getMBeanInfo(objectName) } catch { - case e: jmx.InstanceNotFoundException => - mbeanServer.registerMBean(mbean, objectName) - } - } - - def getStats(reset: Boolean) = { - var statistics = new ArrayBuffer[Tuple2[String, String]] - statistics += (("current time", (System.currentTimeMillis / 1000).toString)) - statistics += (("akka version", Kernel.VERSION)) - statistics += (("uptime", Kernel.uptime.toString)) - for ((key, value) <- Stats.getJvmStats) statistics += (key, value.toString) - for ((key, value) <- Stats.getCounterStats) statistics += (key, value.toString) - for ((key, value) <- Stats.getTimingStats(reset)) statistics += (key, value.toString) - for ((key, value) <- Stats.getGaugeStats(reset)) statistics += (key, value.toString) - val report = {for ((key, value) <- statistics) yield "STAT %s %s".format(key, value)}.mkString("", "\r\n", "\r\n") - log.info("=========================================\n\t--- Statistics Report ---\n%s=========================================", report) - report - } -} - -/** - * @author Jonas Bonér - */ -class StatisticsMBean extends jmx.DynamicMBean { - def getMBeanInfo = new jmx.MBeanInfo( - "se.scalablesolutions.akka.kernel.management.StatisticsMBean", - "runtime statistics", - getAttributeInfo, - null, null, null, - new jmx.ImmutableDescriptor("immutableInfo=false")) - - def getAttribute(name: String): AnyRef = { - val segments = name.split("_", 2) - segments(0) match { - case "counter" => - Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long] - case "timing" => - val prefix = segments(1).split("_", 2) - val timing = Stats.getTimingStats(false)(prefix(1)) - val x = prefix(0) match { - case "min" => timing.minimum - case "max" => timing.maximum - case "count" => timing.count - case "average" => timing.average - } - x.asInstanceOf[java.lang.Integer] - case "gauge" => - Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double] - } - } - - def getAttributes(names: Array[String]): jmx.AttributeList = { - val rv = new jmx.AttributeList - for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name))) - rv - } - - def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException - def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException - def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException - - private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = { - (Stats.getCounterStats.keys.map { name => - List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false)) - } ++ Stats.getTimingStats(false).keys.map { name => - List("min", "max", "average", "count") map { prefix => - new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false) - } - } ++ Stats.getGaugeStats(false).keys.map { name => - List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false)) - }).toList.flatten[jmx.MBeanAttributeInfo].toArray - } -} - -/** - * @author Jonas Bonér - */ -class ThreadPoolMBean(threadPool: ThreadPoolExecutor) extends jmx.DynamicMBean { - val operations: Array[jmx.MBeanOperationInfo] = Array( - new jmx.MBeanOperationInfo("purge", "", - Array(), "void", jmx.MBeanOperationInfo.ACTION), - new jmx.MBeanOperationInfo("shutdown", "", - Array(), "void", jmx.MBeanOperationInfo.ACTION), - new jmx.MBeanOperationInfo("setCorePoolSize", "", - Array(new jmx.MBeanParameterInfo("corePoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION), - new jmx.MBeanOperationInfo("setMaximumPoolSize", "", - Array(new jmx.MBeanParameterInfo("maximumPoolSize", "java.lang.Integer", "")), "void", jmx.MBeanOperationInfo.ACTION), - ) - - def getMBeanInfo = new jmx.MBeanInfo( - "se.scalablesolutions.akka.kernel.management.ThreadPoolMBean", - "runtime management", - getAttributeInfo, - null, operations, null, - new jmx.ImmutableDescriptor("immutableInfo=false")) - - def getAttribute(name: String): AnyRef = name match { - case "getActiveCount" => threadPool.getActiveCount.asInstanceOf[AnyRef] - case "getCompletedTaskCount" => threadPool.getCompletedTaskCount.asInstanceOf[AnyRef] - case "getCorePoolSize" => threadPool.getCorePoolSize.asInstanceOf[AnyRef] - case "getLargestPoolSize" => threadPool.getLargestPoolSize.asInstanceOf[AnyRef] - case "getMaximumPoolSize" => threadPool.getMaximumPoolSize.asInstanceOf[AnyRef] - case "getPoolSize" => threadPool.getPoolSize.asInstanceOf[AnyRef] - case "getTaskCount" => threadPool.getTaskCount.asInstanceOf[AnyRef] - } - - private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = { - Array( - new jmx.MBeanAttributeInfo("getCorePoolSize", "java.lang.Int", "", true, false, false), - new jmx.MBeanAttributeInfo("getMaximumPoolSize", "java.lang.Int", "", true, false, false), - new jmx.MBeanAttributeInfo("getActiveCount", "java.lang.Int", "", true, false, false), - new jmx.MBeanAttributeInfo("getCompletedTaskCount", "java.lang.Long", "", true, false, false), - new jmx.MBeanAttributeInfo("getLargestPoolSize", "java.lang.Int", "", true, false, false), - new jmx.MBeanAttributeInfo("getPoolSize", "java.lang.Int", "", true, false, false), - new jmx.MBeanAttributeInfo("getTaskCount", "java.lang.Long", "", true, false, false)) - } - - def getAttributes(names: Array[String]): jmx.AttributeList = { - val rv = new jmx.AttributeList - for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name))) - rv - } - - def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = { - try { - actionName match { - case "purge" => threadPool.purge - case "shutdown" => threadPool.shutdown - case "setCorePoolSize" => - params match { - case Array(corePoolSize: java.lang.Integer) => threadPool.setCorePoolSize(corePoolSize.intValue) - case _ => throw new Exception("Bad signature " + params.toList.toString) - } - case "setMaximumPoolSize" => - params match { - case Array(maximumPoolSize: java.lang.Integer) => threadPool.setMaximumPoolSize(maximumPoolSize.intValue) - case _ => throw new Exception("Bad signature " + params.toList.toString) - } - } - } catch { case e: Exception => throw new jmx.MBeanException(e) } - "Success" - } - - def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException - def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException -} diff --git a/kernel/src/main/scala/management/RestfulJMX.scala b/kernel/src/main/scala/management/RestfulJMX.scala deleted file mode 100755 index 05a5aac2a5..0000000000 --- a/kernel/src/main/scala/management/RestfulJMX.scala +++ /dev/null @@ -1,96 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel.management - -import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor} -import se.scalablesolutions.akka.kernel.config.ScalaConfig._ -import se.scalablesolutions.akka.kernel.util.Logging - -import javax.ws.rs.core.MultivaluedMap -import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes} -import javax.management._ -import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL} -import javax.servlet.http.{HttpServletRequest, HttpServletResponse} -import java.util.concurrent.ConcurrentHashMap - -/** - * REST interface to Akka's JMX service. - *

- * Here is an example that retreives the current number of Actors. - *

- * http://localhost:9998/jmx
- *   ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
- *   &component=se.scalablesolutions.akka:type=Stats
- *   &attribute=counter_NrOfActors
- * 
- */ -@Path("/jmx") -class RestfulJMX extends Actor with Logging { - private case class Request(service: String, component: String, attribute: String) - - private val connectors = new ConcurrentHashMap[String, JMXConnector] - - @GET - @Produces(Array("text/plain")) - def queryJMX( - @QueryParam("service") service: String, - @QueryParam("component") component: String, - @QueryParam("attribute") attribute: String): String= - (this !! Request(service, component, attribute)).getOrElse("Error in REST JMX management service") - - override def receive: PartialFunction[Any, Unit] = { - case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute)) - } - - private def retrieveAttribute(service: String, component: String, attribute: String): String = { - try { - var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service))) - connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString - } catch { - case e: Exception => - if (connectors.contains(service)) connectors.remove(service) - throw e - } - } -} - -/** - * REST interface to Akka's statistics recorder. - *

- * Here is an example that retreives a statistics report. - *

- * http://localhost:9998/stats?reset=true
- * 
- */ -@Path("/stats") -class StatisticsReporter extends Actor with Logging { - private case class Stats(reset: Boolean) - @GET - @Produces(Array("text/html")) - def stats(@QueryParam("reset") reset: String): scala.xml.Elem = - (this !! Stats(java.lang.Boolean.valueOf(reset).booleanValue)).getOrElse(

Error in REST JMX management service

) - - override def receive: PartialFunction[Any, Unit] = { - case Stats(reset) => reply(
{Management.getStats(reset)}
) - } -} - -class RestfulJMXBoot extends Logging { - log.info("Booting Restful JMX servivce") - object factory extends SupervisorFactory { - override def getSupervisorConfig: SupervisorConfig = { - SupervisorConfig( - RestartStrategy(OneForOne, 3, 100), - Supervise( - new RestfulJMX, - LifeCycle(Permanent, 100)) :: - Supervise( - new StatisticsReporter, - LifeCycle(Permanent, 100)) :: - Nil) - } - } - factory.newSupervisor.startSupervisor -} diff --git a/kernel/src/main/scala/management/ScalaJMX.scala b/kernel/src/main/scala/management/ScalaJMX.scala deleted file mode 100755 index b4a7800f88..0000000000 --- a/kernel/src/main/scala/management/ScalaJMX.scala +++ /dev/null @@ -1,171 +0,0 @@ -/** - * Copyright (C) 2009 Scalable Solutions. - */ - -package se.scalablesolutions.akka.kernel.management - -import javax.management._ -import java.lang.management._ - -/* -object ScalaJMX { - - val mbeanServer = ManagementFactory.getPlatformMBeanServer - - def register(t: AnyRef, i: Class, name: ObjectName) = mbeanServer.registerMBean(new StandardMBean(t, i), name) - def registerBean(bean: DynamicMBean, name: ObjectName): ObjectInstance = mbeanServer.registerMBean(bean, name) - def register(t: AnyRef, name: String): ObjectInstance = register(t, beanClass(t), name) - - def info(name: ObjectName): SBean = mbeanServer.getMBeanInfo(name) - def bean(name: ObjectName): SBeanInfo = convBeanInfo(name, mbeanServer.getMBeanInfo(name)) - def invoke(name: ObjectName, operationName: String, params: Array[Object], signature: Array[String]): Object = - mbeanServer.invoke(name, operationName, params, signature) - def call(name: ObjectName, operationName: String): Object = invoke(name, operationName, Array[Object](), Array[String]()) - - def get(name: ObjectName, attribute: String) = mbeanServer.getAttribute(name, attribute) - def set(name: ObjectName, attribute: String, value: Object) = mbeanServer.setAttribute(name, new Attribute(attribute, value)) - - implicit def instanceToName(oi: ObjectInstance) = oi.getObjectName() - implicit def stringToName(name: String) = ObjectName.getInstance(name) - implicit def convBean(bi: MBeanInfo):SBean = SBean(bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors()) - implicit def seqToArr(seq: Seq[AnyRef]): Array[Object] = seq.toArray - - def convBeanInfo(name: ObjectName, bi: MBeanInfo):SBeanInfo = new SBeanInfo(name, bi.getClassName(), bi.getDescription(), bi.getAttributes(), bi.getNotifications(), bi.getOperations(), bi.getConstructors()) - - implicit def convAttrs(attrs: Array[MBeanAttributeInfo]): Seq[SAttr] = - for (val a <- attrs) yield a - implicit def convParams(params: Array[MBeanParameterInfo]): Seq[SParameter] = - for (val p <- params) yield p - implicit def convNotes(notes: Array[MBeanNotificationInfo]): Seq[SNotification] = - for (val p <- notes) yield p - implicit def convCons(cons: Array[MBeanConstructorInfo]): Seq[SConstructor] = - for (val p <- cons) yield p - implicit def convOps(cons: Array[MBeanOperationInfo]): Seq[SOperation] = - for (val p <- cons) yield p - - implicit def convAttr(attr: MBeanAttributeInfo) = SAttr(attr.getName(), attr.getDescription(), attr.getType(), attr.isIs(), attr.isReadable(), attr.isWritable()) - implicit def convNote(note: MBeanNotificationInfo) = SNotification(note.getName(), note.getDescription(), note.getNotifTypes()) - implicit def convOp(op: MBeanOperationInfo):SOperation = SOperation(op.getName(), op.getDescription(), op.getImpact(), op.getReturnType(), op.getSignature()) - implicit def convCon(con: MBeanConstructorInfo):SConstructor = SConstructor(con getName, con getDescription, con getSignature) - implicit def convParam(p: MBeanParameterInfo) = SParameter(p getName, p getDescription, p getType) - - private def beanClass(t: AnyRef) = Class.forName(t.getClass().getName() + "MBean") -} - -class MBean(mbeanInterface: String) extends StandardMBean(Class.forName(mbeanInterface)) - -abstract class SFeature(val name: String, val description: String) - -case class SBean(className: String, description: String, - attrs: Seq[SAttr], notes: Seq[SNotification], - ops: Seq[SOperation], cons: Seq[SConstructor]) { - def writable = attrs.toList.filter(sa => sa.writable) -} - -class SBeanInfo(name: ObjectName, className: String, description: String, - attrs: Seq[SAttr], notes: Seq[SNotification], - ops: Seq[SOperation], cons: Seq[SConstructor]) -extends SBean(className, description, attrs, notes, ops, cons) { - - def get(attribute: String) = SJMX.get(name, attribute) - def set(attribute: String, value: Object) = SJMX.set(name, attribute, value) - def call(opName: String) = SJMX.call(name, opName) -} - -case class SAttr( - override val name: String, - override val description: String, - jmxType: String, isIs: boolean, readable: boolean, writable: boolean -) extends SFeature(name, description) - -case class SNotification( - override val name: String, - override val description: String, - notifTypes: Array[String]) extends SFeature(name, description) - -case class SOperation( - override val name: String, - override val description: String, - impact: int, - returnType: String, - signature: Seq[SParameter]) extends SFeature(name, description) - -case class SParameter( - override val name: String, - override val description: String, - jmxType: String) extends SFeature(name, description) - -case class SConstructor( - override val name: String, - override val description: String, - signature: Seq[SParameter]) extends SFeature(name, description) - -*/ - -/* -package com.soletta.spipe; - -import javax.management.{StandardMBean,ObjectName,MBeanInfo}; - -class SPipe extends MBean("com.soletta.spipe.SPipeMBean") with SPipeMBean { - - import Console.println; - import SJMX._; - - private var desc: String = "Yipe!"; - - def go = { - val oname: ObjectName = "default:name=SPipe"; - val instance = SJMX.registerBean(this, oname); - - set(oname, "Factor", "Hello!"); - println(get(oname, "Factor")); - - val SBean(n, d, Seq(_, a2, a3, _*), _, ops, _) = info(oname); - println("Bean name is " + n + ", description is " + d); - println("Second attribute is " + a2); - println("Third attribute is " + a3); - println("Writable attributes are " + info(oname).writable); - println("Ops: " + ops); - - val x = - - {ops.toList.map(o => )} - ; - - println(x); - - val inf = bean(oname); - inf.call("start"); - println(inf.get("Factor")); - - } - - def getName = "SPipe!"; - def setDescription(d: String) = desc = d; - override def getDescription() = desc; - def getFactor = desc; - def setFactor(s: String) = desc = s; - def isHappy = true; - - override def getDescription(info: MBeanInfo) = desc; - -} - -object PipeMain { - def main(args: Array[String]): unit = { - (new SPipe) go; - } -} - -trait SPipeMBean { - def getName: String; - def getDescription: String = getName; - def setDescription(d: String): unit; - def getFactor: String; - def setFactor(s: String): unit; - def isHappy: boolean; - - def start() = { Console.println("Starting"); } - def stop() = { } -*/ diff --git a/kernel/src/main/scala/nio/RemoteClient.scala b/kernel/src/main/scala/nio/RemoteClient.scala index e91173c4f7..0ee67950e2 100644 --- a/kernel/src/main/scala/nio/RemoteClient.scala +++ b/kernel/src/main/scala/nio/RemoteClient.scala @@ -12,7 +12,6 @@ import kernel.actor.{Exit, Actor} import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import serialization.{Serializer, Serializable, SerializationProtocol} import kernel.util.Logging -import kernel.management.Management import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ @@ -22,8 +21,6 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import scala.collection.mutable.HashMap -import com.twitter.service.Stats - /** * @author Jonas Bonér */ @@ -48,8 +45,6 @@ object RemoteClient extends Logging { */ class RemoteClient(hostname: String, port: Int) extends Logging { val name = "RemoteClient@" + hostname - val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name) - val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name) @volatile private var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] @@ -91,10 +86,6 @@ class RemoteClient(hostname: String, port: Int) extends Logging { } def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) { - if (Management.RECORD_STATS) { - NR_OF_BYTES_SENT.incr(request.getSerializedSize) - NR_OF_MESSAGES_SENT.incr - } if (request.getIsOneWay) { connection.getChannel.write(request) None @@ -145,9 +136,6 @@ class RemoteClientHandler(val name: String, val supervisors: ConcurrentMap[String, Actor]) extends SimpleChannelUpstreamHandler with Logging { - val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name) - val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name) - override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) @@ -160,10 +148,6 @@ class RemoteClientHandler(val name: String, val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_RECEIVED.incr - NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize) - } log.debug("Received RemoteReply[\n%s]", reply.toString) val future = futures.get(reply.getId) if (reply.getIsSuccessful) { diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala index bbc223247e..76a7be6859 100644 --- a/kernel/src/main/scala/nio/RemoteServer.scala +++ b/kernel/src/main/scala/nio/RemoteServer.scala @@ -13,7 +13,6 @@ import kernel.util._ import protobuf.RemoteProtocol import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} import serialization.{Serializer, Serializable, SerializationProtocol} -import kernel.management.Management import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ @@ -21,8 +20,6 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} -import com.twitter.service.Stats - /** * @author Jonas Bonér */ @@ -86,11 +83,6 @@ class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) ext */ @ChannelPipelineCoverage { val value = "all" } class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging { - val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name) - val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name) - val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name) - val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name) - private val activeObjectFactory = new ActiveObjectFactory private val activeObjects = new ConcurrentHashMap[String, AnyRef] private val actors = new ConcurrentHashMap[String, Actor] @@ -115,10 +107,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL } private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_RECEIVED.incr - NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize) - } log.debug("Received RemoteRequest[\n%s]", request.toString) if (request.getIsActor) dispatchToActor(request, channel) else dispatchToActiveObject(request, channel) @@ -143,10 +131,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) val replyMessage = replyBuilder.build channel.write(replyMessage) - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_SENT.incr - NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) - } } catch { case e: Throwable => log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e) @@ -159,10 +143,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) val replyMessage = replyBuilder.build channel.write(replyMessage) - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_SENT.incr - NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) - } } } } @@ -190,10 +170,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) val replyMessage = replyBuilder.build channel.write(replyMessage) - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_SENT.incr - NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) - } } } catch { case e: InvocationTargetException => @@ -207,10 +183,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) val replyMessage = replyBuilder.build channel.write(replyMessage) - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_SENT.incr - NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) - } case e: Throwable => log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e) e.printStackTrace @@ -222,10 +194,6 @@ class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassL if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) val replyMessage = replyBuilder.build channel.write(replyMessage) - if (Management.RECORD_STATS) { - NR_OF_MESSAGES_SENT.incr - NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) - } } } diff --git a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala index a4c7a0fc80..65c71da8f8 100644 --- a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala @@ -10,14 +10,9 @@ */ package se.scalablesolutions.akka.kernel.reactor -import kernel.management.Management - -import java.util.{LinkedList, Queue, List} - -import com.twitter.service.Stats +import java.util.{LinkedList, List} class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) { - val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name) def start = if (!active) { active = true val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue) @@ -28,7 +23,6 @@ class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBa messageDemultiplexer.select } catch { case e: InterruptedException => active = false } val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations - if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size) val iter = selectedInvocations.iterator while (iter.hasNext) { val invocation = iter.next diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index 1f96769374..101f7cc0dd 100644 --- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -4,16 +4,12 @@ package se.scalablesolutions.akka.kernel.reactor -import kernel.management.{Management, ThreadPoolMBean} - import java.util.concurrent._ import locks.ReentrantLock import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import java.util.{Collection, HashSet, HashMap, LinkedList, List} -import com.twitter.service.Stats - /** * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350]. @@ -63,7 +59,6 @@ import com.twitter.service.Stats class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) { def this(name: String) = this(name, false) - val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name) private val NR_START_THREADS = 16 private val NR_MAX_THREADS = 128 private val KEEP_ALIVE_TIME = 60000L // default is one minute @@ -79,7 +74,6 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B def start = if (!active) { active = true - Management.registerMBean(new ThreadPoolMBean(threadPoolBuilder), "ThreadPool_" + name) /** * This dispatcher code is based on code from the actorom actor framework by Sergio Bossa [http://code.google.com/p/actorom/]. @@ -95,7 +89,6 @@ class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: B } catch { case e: InterruptedException => active = false } val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations val reservedInvocations = reserve(selectedInvocations) - if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size) val it = reservedInvocations.entrySet.iterator while (it.hasNext) { val entry = it.next diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala index d47db197a5..27ed887c7a 100644 --- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala +++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala @@ -4,14 +4,10 @@ package se.scalablesolutions.akka.kernel.reactor -import kernel.management.Management - import java.util.{LinkedList, Queue, List} import java.util.concurrent.{TimeUnit, BlockingQueue} import java.util.HashMap -import com.twitter.service.Stats - abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher { //val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false) @@ -23,12 +19,6 @@ abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher protected var selectorThread: Thread = _ protected val guard = new Object - if (Management.RECORD_STATS) { - Stats.makeGauge("SizeOfBlockingQueue_" + name) { - guard.synchronized { blockingQueue.size.toDouble } - } - } - def messageQueue = queue def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized { @@ -55,12 +45,6 @@ class ReactiveMessageQueue(name: String) extends MessageQueue { private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation] @volatile private var interrupted = false - if (Management.RECORD_STATS) { - Stats.makeGauge("SizeOfReactiveQueue_" + name) { - queue.synchronized { queue.size.toDouble } - } - } - def append(handle: MessageInvocation) = queue.synchronized { queue.offer(handle) queue.notifyAll diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala index fbae2d8c99..28f9ca2761 100644 --- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -4,13 +4,10 @@ package se.scalablesolutions.akka.kernel.reactor -import com.twitter.service.Stats - import java.util.concurrent.LinkedBlockingQueue import java.util.Queue import kernel.actor.{Actor, ActorMessageInvoker} -import kernel.management.Management /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. @@ -19,8 +16,6 @@ import kernel.management.Management class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher { def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor)) - val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name) - private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -33,7 +28,6 @@ class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandle override def run = { while (active) { try { - if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr messageHandler.invoke(queue.take) } catch { case e: InterruptedException => active = false } } @@ -52,12 +46,6 @@ class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandle } class BlockingMessageQueue(name: String) extends MessageQueue { - if (Management.RECORD_STATS) { - Stats.makeGauge("SizeOfBlockingQueue_" + name) { - queue.size.toDouble - } - } - // FIXME: configure the LBQ private val queue = new LinkedBlockingQueue[MessageInvocation] def append(handle: MessageInvocation) = queue.put(handle) diff --git a/samples-java/akka-samples-java.iml b/samples-java/akka-samples-java.iml index 243de784f2..f2620f9234 100644 --- a/samples-java/akka-samples-java.iml +++ b/samples-java/akka-samples-java.iml @@ -9,29 +9,30 @@ - - + + - - + + - - - - + + + + + @@ -39,13 +40,10 @@ - + + - - - - @@ -63,10 +61,6 @@ - - - - @@ -76,7 +70,6 @@ -
diff --git a/samples-scala/akka-samples-scala.iml b/samples-scala/akka-samples-scala.iml index b864b09ba5..2dbee0009e 100644 --- a/samples-scala/akka-samples-scala.iml +++ b/samples-scala/akka-samples-scala.iml @@ -5,7 +5,7 @@
- + @@ -14,24 +14,20 @@ - - + + - - + + - - - - @@ -49,13 +45,10 @@ - + + - - - - @@ -82,7 +75,6 @@ -