diff --git a/bin/start-akka-server.sh b/bin/start-akka-server.sh index 04814b7b11..6127220fe8 100755 --- a/bin/start-akka-server.sh +++ b/bin/start-akka-server.sh @@ -3,7 +3,6 @@ VERSION=0.5 BASE_DIR=$(dirname $0)/.. - echo 'Starting Akka Kernel from directory' $BASE_DIR echo 'Resetting persistent storage in' $BASE_DIR/storage @@ -20,7 +19,6 @@ LIB_DIR=$BASE_DIR/lib CLASSPATH=$BASE_DIR/config CLASSPATH=$CLASSPATH:$LIB_DIR/akka-kernel-0.5.jar CLASSPATH=$CLASSPATH:$LIB_DIR/akka-util-java-0.5.jar - CLASSPATH=$CLASSPATH:$LIB_DIR/antlr-3.1.3.jar CLASSPATH=$CLASSPATH:$LIB_DIR/aopalliance-1.0.jar CLASSPATH=$CLASSPATH:$LIB_DIR/asm-3.1.jar @@ -63,6 +61,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/netty-3.1.0.CR1.jar CLASSPATH=$CLASSPATH:$LIB_DIR/providerutil.jar CLASSPATH=$CLASSPATH:$LIB_DIR/protobuf-java-2.1.0.jar CLASSPATH=$CLASSPATH:$LIB_DIR/scala-library-2.7.5.jar +CLASSPATH=$CLASSPATH:$LIB_DIR/scala-stats-1.0.jar CLASSPATH=$CLASSPATH:$LIB_DIR/servlet-api-2.5.jar CLASSPATH=$CLASSPATH:$LIB_DIR/slf4j-api-1.4.3.jar CLASSPATH=$CLASSPATH:$LIB_DIR/slf4j-log4j12-1.4.3.jar @@ -74,7 +73,7 @@ CLASSPATH=$CLASSPATH:$LIB_DIR/zookeeper-3.1.0.jar JVM_OPTS=" \ -server \ -Xms128M \ - -Xmx2G \ + -Xmx1G \ -XX:SurvivorRatio=8 \ -XX:TargetSurvivorRatio=90 \ -XX:+AggressiveOpts \ @@ -91,4 +90,5 @@ JVM_OPTS=" \ #$JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.Boot se.scalablesolutions.akka.kernel.Kernel ${1} +echo $JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.kernel.Kernel ${1} $JAVA_HOME/bin/java $JVM_OPTS -cp $CLASSPATH se.scalablesolutions.akka.kernel.Kernel ${1} diff --git a/config/akka-reference.conf b/config/akka-reference.conf index 3f672d1803..1cbd12cee6 100644 --- a/config/akka-reference.conf +++ b/config/akka-reference.conf @@ -1,6 +1,6 @@ -################################# -# Akka Actor Kernel Config File # -################################# +##################### +# Akka Config File # +################### # This file has all the default settings, so all these could be remove with no visible effect. # Modify as needed. @@ -19,11 +19,16 @@ boot = ["sample.java.Boot", "sample.scala.Boot"] # FQN to the class doing initial active object/actor # supervisor bootstrap, should be defined in default constructor + + service = on + record-stats = on + + timeout = 5000 # default timeout for future based invocations concurrent-mode = off # if turned on, then the same actor instance is allowed to execute concurrently - # e.g. departing from the actor model for better performance - serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability + serialize-messages = on # does a deep clone of (non-primitive) messages to ensure immutability diff --git a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java index d2cc9b7d0a..fdc2a48c98 100644 --- a/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java +++ b/fun-test-java/src/test/java/se/scalablesolutions/akka/api/ActiveObjectGuiceConfiguratorTest.java @@ -21,7 +21,7 @@ public class ActiveObjectGuiceConfiguratorTest extends TestCase { protected void setUp() { se.scalablesolutions.akka.kernel.Kernel$.MODULE$.config(); - EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher(); + EventBasedThreadPoolDispatcher dispatcher = new EventBasedThreadPoolDispatcher("name"); dispatcher .withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(16) diff --git a/kernel/pom.xml b/kernel/pom.xml index dcc5bc9b67..5faf1864e4 100644 --- a/kernel/pom.xml +++ b/kernel/pom.xml @@ -125,11 +125,6 @@ commons-lang 2.4 - - se.foldleft - cassidy - 0.1 - diff --git a/kernel/src/main/scala/Kernel.scala b/kernel/src/main/scala/Kernel.scala index 6e3641208e..57bb697079 100644 --- a/kernel/src/main/scala/Kernel.scala +++ b/kernel/src/main/scala/Kernel.scala @@ -12,12 +12,13 @@ import javax.ws.rs.core.UriBuilder import java.io.File import java.net.URLClassLoader -import net.lag.configgy.{Config, Configgy, RuntimeEnvironment} +import net.lag.configgy.{Config, Configgy, RuntimeEnvironment, ParseException} import kernel.jersey.AkkaCometServlet import kernel.nio.RemoteServer import kernel.state.CassandraStorage import kernel.util.Logging +import kernel.management.Management /** * @author Jonas Bonér @@ -32,6 +33,7 @@ object Kernel extends Logging { val BOOT_CLASSES = config.getList("akka.boot") val RUN_REMOTE_SERVICE = config.getBool("akka.remote.service", true) + val RUN_MANAGEMENT_SERVICE = config.getBool("akka.management.service", true) val STORAGE_SYSTEM = config.getString("akka.storage.system", "cassandra") val RUN_REST_SERVICE = config.getBool("akka.rest.service", true) @@ -43,6 +45,7 @@ object Kernel extends Logging { private var remoteServer: RemoteServer = _ private var jerseySelectorThread: SelectorThread = _ private val startTime = System.currentTimeMillis + private var applicationLoader: Option[ClassLoader] = None def main(args: Array[String]) = boot @@ -51,11 +54,15 @@ object Kernel extends Logging { printBanner log.info("Starting Akka kernel...") + runApplicationBootClasses + if (RUN_REMOTE_SERVICE) startRemoteService + if (RUN_MANAGEMENT_SERVICE) startManagementService STORAGE_SYSTEM match { case "cassandra" => startCassandra case "terracotta" => throw new UnsupportedOperationException("terracotta storage backend is not yet supported") + case "mongodb" => throw new UnsupportedOperationException("mongodb storage backend is not yet supported") case "redis" => throw new UnsupportedOperationException("redis storage backend is not yet supported") case "voldemort" => throw new UnsupportedOperationException("voldemort storage backend is not yet supported") case "tokyo-cabinet" => throw new UnsupportedOperationException("tokyo-cabinet storage backend is not yet supported") @@ -64,8 +71,6 @@ object Kernel extends Logging { if (RUN_REST_SERVICE) startJersey - runApplicationBootClasses - log.info("Akka kernel started successfully") hasBooted = true } @@ -79,17 +84,18 @@ object Kernel extends Logging { val runtime = new RuntimeEnvironment(getClass) //runtime.load(args) val config = Configgy.config - config.registerWithJmx("com.scalablesolutions.akka.config") + config.registerWithJmx("se.scalablesolutions.akka") // FIXME fix Configgy JMX subscription to allow management // config.subscribe { c => configure(c.getOrElse(new Config)) } config } catch { - case e: net.lag.configgy.ParseException => throw new Error("Could not retreive the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.") + case e: ParseException => throw new Error("Could not retreive the akka.conf config file. Make sure you have set the AKKA_HOME environment variable to the root of the distribution.") } } private[akka] def runApplicationBootClasses = { + new management.RestfulJMXBoot // add the REST/JMX service val HOME = try { System.getenv("AKKA_HOME") } catch { case e: NullPointerException => throw new IllegalStateException("AKKA_HOME system variable needs to be set. Should point to the root of the Akka distribution.") } //val CLASSES = HOME + "/kernel/target/classes" // FIXME remove for dist //val LIB = HOME + "/lib" @@ -105,16 +111,22 @@ object Kernel extends Logging { log.info("Booting with boot class [%s]", clazz) loader.loadClass(clazz).newInstance } + applicationLoader = Some(loader) } private[akka] def startRemoteService = { // FIXME manage remote serve thread for graceful shutdown val remoteServerThread = new Thread(new Runnable() { - def run = RemoteServer.start - }, "akka remote service") + def run = RemoteServer.start(applicationLoader) + }, "Akka Remote Service") remoteServerThread.start } + private[akka] def startManagementService = { + Management.startJMX("se.scalablesolutions.akka") + log.info("Management service started successfully.") + } + private[akka] def startCassandra = if (config.getBool("akka.storage.cassandra.service", true)) { System.setProperty("cassandra", "") System.setProperty("storage-config", akka.Boot.CONFIG + "/") diff --git a/kernel/src/main/scala/actor/Actor.scala b/kernel/src/main/scala/actor/Actor.scala index ea4145a4f3..fab6adf40d 100644 --- a/kernel/src/main/scala/actor/Actor.scala +++ b/kernel/src/main/scala/actor/Actor.scala @@ -8,14 +8,17 @@ import com.google.protobuf.ByteString import java.net.InetSocketAddress import java.util.concurrent.CopyOnWriteArraySet -import kernel.reactor._ -import kernel.config.ScalaConfig._ -import kernel.stm.TransactionManagement -import kernel.util.Helpers.ReadWriteLock -import kernel.nio.protobuf.RemoteProtocol.RemoteRequest -import kernel.util.Logging +import reactor._ +import config.ScalaConfig._ +import stm.TransactionManagement +import util.Helpers.ReadWriteLock +import nio.protobuf.RemoteProtocol.RemoteRequest +import util.Logging import serialization.{Serializer, Serializable, SerializationProtocol} import nio.{RemoteProtocolBuilder, RemoteClient, RemoteServer, RemoteRequestIdFactory} +import management.Management + +import com.twitter.service.Stats sealed abstract class LifecycleMessage case class Init(config: AnyRef) extends LifecycleMessage @@ -42,14 +45,16 @@ class ActorMessageInvoker(val actor: Actor) extends MessageInvoker { * @author Jonas Bonér */ object Actor { - val TIMEOUT = kernel.Kernel.config.getInt("akka.actor.timeout", 5000) - val SERIALIZE_MESSAGES = kernel.Kernel.config.getBool("akka.actor.serialize-messages", false) + val TIMEOUT = Kernel.config.getInt("akka.actor.timeout", 5000) + val SERIALIZE_MESSAGES = Kernel.config.getBool("akka.actor.serialize-messages", false) } /** * @author Jonas Bonér */ -@serializable trait Actor extends Logging with TransactionManagement { +trait Actor extends Logging with TransactionManagement { + Stats.getCounter("NrOfActors").incr + @volatile private[this] var isRunning: Boolean = false private[this] val remoteFlagLock = new ReadWriteLock private[this] val transactionalFlagLock = new ReadWriteLock @@ -64,6 +69,8 @@ object Actor { protected[this] val linkedActors = new CopyOnWriteArraySet[Actor] protected[actor] var lifeCycleConfig: Option[LifeCycle] = None + val name = this.getClass.getName + // ==================================== // ==== USER CALLBACKS TO OVERRIDE ==== // ==================================== @@ -96,7 +103,7 @@ object Actor { * */ protected[kernel] var dispatcher: MessageDispatcher = { - val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher + val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher(getClass.getName) mailbox = dispatcher.messageQueue dispatcher.registerHandler(this, new ActorMessageInvoker(this)) dispatcher @@ -529,6 +536,8 @@ object Actor { } private[this] def handleTrapExit(dead: Actor, reason: Throwable): Unit = { + if (Management.RECORD_STATS) Stats.getCounter("NrOfFailures_" + dead.name).incr + if (trapExit) { if (faultHandler.isDefined) { faultHandler.get match { @@ -546,6 +555,7 @@ object Actor { linkedActors.toArray.toList.asInstanceOf[List[Actor]].foreach(_.restart(reason)) private[Actor] def restart(reason: AnyRef) = synchronized { + if (Management.RECORD_STATS) Stats.getCounter("NrOfRestarts_" + name).incr lifeCycleConfig match { case None => throw new IllegalStateException("Server [" + id + "] does not have a life-cycle defined.") diff --git a/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala b/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala index 82d0f1e76b..8f9ca19bd2 100644 --- a/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala +++ b/kernel/src/main/scala/jersey/ActorComponentProviderFactory.scala @@ -19,7 +19,6 @@ extends IoCComponentProviderFactory with Logging { override def getComponentProvider(clazz: Class[_]): IoCComponentProvider = getComponentProvider(null, clazz) override def getComponentProvider(context: ComponentContext, clazz: Class[_]): IoCComponentProvider = { - //log.info("ProviderFactory: resolve => " + clazz.getName) - configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null) + configurators.find(_.isDefined(clazz)).map(_ => new ActorComponentProvider(clazz, configurators)).getOrElse(null) } -} \ No newline at end of file +} diff --git a/kernel/src/main/scala/management/JMX.scala b/kernel/src/main/scala/management/JMX.scala new file mode 100755 index 0000000000..988a123668 --- /dev/null +++ b/kernel/src/main/scala/management/JMX.scala @@ -0,0 +1,143 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.management + +import com.twitter.service.Stats + +import scala.collection.jcl + +import java.lang.management.ManagementFactory +import javax.{management => jmx} +import javax.management.remote.{JMXConnectorServerFactory, JMXServiceURL} + +import kernel.Kernel.config +import kernel.util.Logging + +/** + * @author Jonas Bonér + */ +object Management { + val RECORD_STATS = config.getBool("akka.management.record-stats", true) + + def startJMX(packageName: String): Unit = + startJMX(packageName, ManagementFactory.getPlatformMBeanServer) + + def startJMX(packageName: String, mbeanServer: jmx.MBeanServer): Unit = { + mbeanServer.registerMBean(new StatisticsMBean, new jmx.ObjectName(packageName + ":type=Stats")) + mbeanServer.registerMBean(new ManagementMBean, new jmx.ObjectName(packageName + ":type=Management")) + java.rmi.registry.LocateRegistry.createRegistry(1099) + JMXConnectorServerFactory.newJMXConnectorServer( + new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi"), + null, + mbeanServer).start + } +} + +/** + * @author Jonas Bonér + */ +class StatisticsMBean extends jmx.DynamicMBean with Logging { + def getMBeanInfo = new jmx.MBeanInfo( + "se.scalablesolutions.akka.management.Stats", + "runtime statistics", + getAttributeInfo, + null, null, null, + new jmx.ImmutableDescriptor("immutableInfo=false")) + + def getAttribute(name: String): AnyRef = { + val segments = name.split("_", 2) + segments(0) match { + case "counter" => + Stats.getCounterStats()(segments(1)).asInstanceOf[java.lang.Long] + case "timing" => + val prefix = segments(1).split("_", 2) + val timing = Stats.getTimingStats(false)(prefix(1)) + val x = prefix(0) match { + case "min" => timing.minimum + case "max" => timing.maximum + case "count" => timing.count + case "average" => timing.average + } + x.asInstanceOf[java.lang.Integer] + case "gauge" => + Stats.getGaugeStats(false)(segments(1)).asInstanceOf[java.lang.Double] + } + } + + def getAttributes(names: Array[String]): jmx.AttributeList = { + val rv = new jmx.AttributeList + for (name <- names) rv.add(new jmx.Attribute(name, getAttribute(name))) + rv + } + + def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = throw new UnsupportedOperationException + def setAttribute(attr: jmx.Attribute): Unit = throw new UnsupportedOperationException + def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = throw new UnsupportedOperationException + + private def getAttributeInfo: Array[jmx.MBeanAttributeInfo] = { + (Stats.getCounterStats.keys.map { name => + List(new jmx.MBeanAttributeInfo("counter_" + name, "java.lang.Long", "counter", true, false, false)) + } ++ Stats.getTimingStats(false).keys.map { name => + List("min", "max", "average", "count") map { prefix => + new jmx.MBeanAttributeInfo("timing_" + prefix + "_" + name, "java.lang.Integer", "timing", true, false, false) + } + } ++ Stats.getGaugeStats(false).keys.map { name => + List(new jmx.MBeanAttributeInfo("gauge_" + name, "java.lang.Long", "gauge", true, false, false)) + }).toList.flatten[jmx.MBeanAttributeInfo].toArray + } +} + +/** + * @author Jonas Bonér + */ +class ManagementMBean extends jmx.DynamicMBean with Logging { + val operations: Array[jmx.MBeanOperationInfo] = Array( + new jmx.MBeanOperationInfo("dosomething", "TODO", + Array( + new jmx.MBeanParameterInfo("key", "java.lang.String", "TODO"), + new jmx.MBeanParameterInfo("value", "java.lang.String", "TODO") + ), "void", jmx.MBeanOperationInfo.ACTION) + ) + + def getMBeanInfo = new jmx.MBeanInfo( + "se.scalablesolutions.akka.management.Management", + "runtime management", + Array[jmx.MBeanAttributeInfo](), + null, operations, null, + new jmx.ImmutableDescriptor("immutableInfo=false")) + + def getAttribute(name: String): AnyRef = throw new UnsupportedOperationException + + def getAttributes(names: Array[String]): jmx.AttributeList = throw new UnsupportedOperationException + + def invoke(actionName: String, params: Array[Object], signature: Array[String]): AnyRef = { + actionName match { + case "dosomething" => + params match { + case Array(name: String, value: String) => + try { + {} + } catch { + case e: Exception => + log.warning("Exception: %s", e.getMessage) + throw e + } + case _ => throw new jmx.MBeanException(new Exception("Bad signature " + params.toList.toString)) + } + case _ => throw new jmx.MBeanException(new Exception("No such method")) + } + null + } + + def setAttribute(attr: jmx.Attribute): Unit = attr.getValue match { + case s: String => println("Setting: " + s) + case _ => throw new jmx.InvalidAttributeValueException() + } + + def setAttributes(attrs: jmx.AttributeList): jmx.AttributeList = { + for (attr <- jcl.Buffer(attrs.asList)) setAttribute(attr) + attrs + } +} diff --git a/kernel/src/main/scala/management/RestfulJMX.scala b/kernel/src/main/scala/management/RestfulJMX.scala new file mode 100755 index 0000000000..5726e57764 --- /dev/null +++ b/kernel/src/main/scala/management/RestfulJMX.scala @@ -0,0 +1,71 @@ +/** + * Copyright (C) 2009 Scalable Solutions. + */ + +package se.scalablesolutions.akka.kernel.management + +import se.scalablesolutions.akka.kernel.actor.{SupervisorFactory, Actor} +import se.scalablesolutions.akka.kernel.config.ScalaConfig._ +import se.scalablesolutions.akka.kernel.util.Logging + +import javax.ws.rs.core.MultivaluedMap +import javax.ws.rs.{GET, POST, Path, QueryParam, Produces, WebApplicationException, Consumes} +import javax.management._ +import javax.management.remote.{JMXConnector, JMXConnectorFactory, JMXServiceURL} +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import java.util.concurrent.ConcurrentHashMap + +/** + * REST interface to Akka's JMX service. + *

+ * Here is an example that retreives the current number of Actors. + *

+ * http://localhost:9998/management
+ *   ?service=service:jmx:rmi:///jndi/rmi://localhost:1099/jmxrmi
+ *   &component=se.scalablesolutions.akka:type=Stats
+ *   &attribute=counter_NrOfActors
+ * 
+ */ +@Path("/management") +class RestfulJMX extends Actor with Logging { + private case class Request(service: String, component: String, attribute: String) + private val connectors = new ConcurrentHashMap[String, JMXConnector] + + @GET + @Produces(Array("text/html")) + def queryJMX( + @QueryParam("service") service: String, + @QueryParam("component") component: String, + @QueryParam("attribute") attribute: String) = + (this !! Request(service, component, attribute)).getOrElse(Error in REST JMX management service) + + override def receive: PartialFunction[Any, Unit] = { + case Request(service, component, attribute) => reply(retrieveAttribute(service, component, attribute)) + } + + private def retrieveAttribute(service: String, component: String, attribute: String): scala.xml.Elem = { + try { + var connector = connectors.putIfAbsent(service, JMXConnectorFactory.connect(new JMXServiceURL(service))) +
{connector.getMBeanServerConnection.getAttribute(new ObjectName(component), attribute).toString}
+ } catch { + case e: Exception => + if (connectors.contains(service)) connectors.remove(service) + throw e + } + } +} + +class RestfulJMXBoot extends Logging { + log.info("Booting Restful JMX servivce") + object factory extends SupervisorFactory { + override def getSupervisorConfig: SupervisorConfig = { + SupervisorConfig( + RestartStrategy(OneForOne, 3, 100), + Supervise( + new RestfulJMX, + LifeCycle(Permanent, 100)) + :: Nil) + } + } + factory.newSupervisor.startSupervisor +} diff --git a/kernel/src/main/scala/nio/RemoteClient.scala b/kernel/src/main/scala/nio/RemoteClient.scala index 2e62d4dbcf..e91173c4f7 100644 --- a/kernel/src/main/scala/nio/RemoteClient.scala +++ b/kernel/src/main/scala/nio/RemoteClient.scala @@ -12,6 +12,7 @@ import kernel.actor.{Exit, Actor} import kernel.reactor.{DefaultCompletableFutureResult, CompletableFutureResult} import serialization.{Serializer, Serializable, SerializationProtocol} import kernel.util.Logging +import kernel.management.Management import org.jboss.netty.bootstrap.ClientBootstrap import org.jboss.netty.channel._ @@ -21,6 +22,8 @@ import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} import scala.collection.mutable.HashMap +import com.twitter.service.Stats + /** * @author Jonas Bonér */ @@ -44,6 +47,10 @@ object RemoteClient extends Logging { * @author Jonas Bonér */ class RemoteClient(hostname: String, port: Int) extends Logging { + val name = "RemoteClient@" + hostname + val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name) + val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name) + @volatile private var isRunning = false private val futures = new ConcurrentHashMap[Long, CompletableFutureResult] private val supervisors = new ConcurrentHashMap[String, Actor] @@ -55,7 +62,7 @@ class RemoteClient(hostname: String, port: Int) extends Logging { private val bootstrap = new ClientBootstrap(channelFactory) - bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(futures, supervisors)) + bootstrap.setPipelineFactory(new RemoteClientPipelineFactory(name, futures, supervisors)) bootstrap.setOption("tcpNoDelay", true) bootstrap.setOption("keepAlive", true) @@ -84,6 +91,10 @@ class RemoteClient(hostname: String, port: Int) extends Logging { } def send(request: RemoteRequest): Option[CompletableFutureResult] = if (isRunning) { + if (Management.RECORD_STATS) { + NR_OF_BYTES_SENT.incr(request.getSerializedSize) + NR_OF_MESSAGES_SENT.incr + } if (request.getIsOneWay) { connection.getChannel.write(request) None @@ -111,15 +122,16 @@ class RemoteClient(hostname: String, port: Int) extends Logging { /** * @author Jonas Bonér */ -class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFutureResult], - supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory { +class RemoteClientPipelineFactory(name: String, + futures: ConcurrentMap[Long, CompletableFutureResult], + supervisors: ConcurrentMap[String, Actor]) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val p = Channels.pipeline() p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)); p.addLast("protobufDecoder", new ProtobufDecoder(RemoteReply.getDefaultInstance)); p.addLast("frameEncoder", new LengthFieldPrepender(4)); p.addLast("protobufEncoder", new ProtobufEncoder()); - p.addLast("handler", new RemoteClientHandler(futures, supervisors)) + p.addLast("handler", new RemoteClientHandler(name, futures, supervisors)) p } } @@ -128,10 +140,14 @@ class RemoteClientPipelineFactory(futures: ConcurrentMap[Long, CompletableFuture * @author Jonas Bonér */ @ChannelPipelineCoverage { val value = "all" } -class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResult], +class RemoteClientHandler(val name: String, + val futures: ConcurrentMap[Long, CompletableFutureResult], val supervisors: ConcurrentMap[String, Actor]) extends SimpleChannelUpstreamHandler with Logging { + val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name) + val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name) + override def handleUpstream(ctx: ChannelHandlerContext, event: ChannelEvent) = { if (event.isInstanceOf[ChannelStateEvent] && event.asInstanceOf[ChannelStateEvent].getState != ChannelState.INTEREST_OPS) { log.debug(event.toString) @@ -144,6 +160,10 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu val result = event.getMessage if (result.isInstanceOf[RemoteReply]) { val reply = result.asInstanceOf[RemoteReply] + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_RECEIVED.incr + NR_OF_BYTES_RECEIVED.incr(reply.getSerializedSize) + } log.debug("Received RemoteReply[\n%s]", reply.toString) val future = futures.get(reply.getId) if (reply.getIsSuccessful) { @@ -159,7 +179,7 @@ class RemoteClientHandler(val futures: ConcurrentMap[Long, CompletableFutureResu } future.completeWithException(null, parseException(reply)) } - futures.remove(reply.getId) + futures.remove(reply.getId) } else throw new IllegalArgumentException("Unknown message received in remote client handler: " + result) } catch { case e: Exception => diff --git a/kernel/src/main/scala/nio/RemoteServer.scala b/kernel/src/main/scala/nio/RemoteServer.scala index b2f1931879..bbc223247e 100644 --- a/kernel/src/main/scala/nio/RemoteServer.scala +++ b/kernel/src/main/scala/nio/RemoteServer.scala @@ -13,6 +13,7 @@ import kernel.util._ import protobuf.RemoteProtocol import protobuf.RemoteProtocol.{RemoteReply, RemoteRequest} import serialization.{Serializer, Serializable, SerializationProtocol} +import kernel.management.Management import org.jboss.netty.bootstrap.ServerBootstrap import org.jboss.netty.channel._ @@ -20,22 +21,28 @@ import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory import org.jboss.netty.handler.codec.frame.{LengthFieldBasedFrameDecoder, LengthFieldPrepender} import org.jboss.netty.handler.codec.protobuf.{ProtobufDecoder, ProtobufEncoder} +import com.twitter.service.Stats + /** * @author Jonas Bonér */ class RemoteServer extends Logging { - def start = RemoteServer.start + def start = RemoteServer.start(None) } /** * @author Jonas Bonér */ object RemoteServer extends Logging { - val HOSTNAME = kernel.Kernel.config.getString("akka.remote.hostname", "localhost") - val PORT = kernel.Kernel.config.getInt("akka.remote.port", 9999) - val CONNECTION_TIMEOUT_MILLIS = kernel.Kernel.config.getInt("akka.remote.connection-timeout", 1000) + import kernel.Kernel.config + val HOSTNAME = config.getString("akka.remote.hostname", "localhost") + val PORT = config.getInt("akka.remote.port", 9999) + val CONNECTION_TIMEOUT_MILLIS = config.getInt("akka.remote.connection-timeout", 1000) + + val name = "RemoteServer@" + HOSTNAME @volatile private var isRunning = false + @volatile private var isConfigured = false private val factory = new NioServerSocketChannelFactory( Executors.newCachedThreadPool, @@ -44,18 +51,15 @@ object RemoteServer extends Logging { private val activeObjectFactory = new ActiveObjectFactory private val bootstrap = new ServerBootstrap(factory) - // FIXME provide different codecs (Thrift, Avro, Protobuf, JSON) - private val handler = new RemoteServerHandler - bootstrap.setPipelineFactory(new RemoteServerPipelineFactory) - bootstrap.setOption("child.tcpNoDelay", true) - bootstrap.setOption("child.keepAlive", true) - bootstrap.setOption("child.reuseAddress", true) - bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS) - - def start = synchronized { + def start(loader: Option[ClassLoader]) = synchronized { if (!isRunning) { log.info("Starting remote server at [%s:%s]", HOSTNAME, PORT) + bootstrap.setPipelineFactory(new RemoteServerPipelineFactory(name, loader)) + bootstrap.setOption("child.tcpNoDelay", true) + bootstrap.setOption("child.keepAlive", true) + bootstrap.setOption("child.reuseAddress", true) + bootstrap.setOption("child.connectTimeoutMillis", CONNECTION_TIMEOUT_MILLIS) bootstrap.bind(new InetSocketAddress(HOSTNAME, PORT)) isRunning = true } @@ -65,14 +69,14 @@ object RemoteServer extends Logging { /** * @author Jonas Bonér */ -class RemoteServerPipelineFactory extends ChannelPipelineFactory { +class RemoteServerPipelineFactory(name: String, loader: Option[ClassLoader]) extends ChannelPipelineFactory { def getPipeline: ChannelPipeline = { val p = Channels.pipeline() p.addLast("frameDecoder", new LengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4)) p.addLast("protobufDecoder", new ProtobufDecoder(RemoteProtocol.RemoteRequest.getDefaultInstance)) p.addLast("frameEncoder", new LengthFieldPrepender(4)) p.addLast("protobufEncoder", new ProtobufEncoder) - p.addLast("handler", new RemoteServerHandler) + p.addLast("handler", new RemoteServerHandler(name, loader)) p } } @@ -81,7 +85,12 @@ class RemoteServerPipelineFactory extends ChannelPipelineFactory { * @author Jonas Bonér */ @ChannelPipelineCoverage { val value = "all" } -class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { +class RemoteServerHandler(val name: String, val applicationLoader: Option[ClassLoader]) extends SimpleChannelUpstreamHandler with Logging { + val NR_OF_BYTES_SENT = Stats.getCounter("NrOfBytesSent_" + name) + val NR_OF_BYTES_RECEIVED = Stats.getCounter("NrOfBytesReceived_" + name) + val NR_OF_MESSAGES_SENT = Stats.getCounter("NrOfMessagesSent_" + name) + val NR_OF_MESSAGES_RECEIVED = Stats.getCounter("NrOfMessagesReceived_" + name) + private val activeObjectFactory = new ActiveObjectFactory private val activeObjects = new ConcurrentHashMap[String, AnyRef] private val actors = new ConcurrentHashMap[String, Actor] @@ -106,6 +115,10 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { } private def handleRemoteRequest(request: RemoteRequest, channel: Channel) = { + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_RECEIVED.incr + NR_OF_BYTES_RECEIVED.incr(request.getSerializedSize) + } log.debug("Received RemoteRequest[\n%s]", request.toString) if (request.getIsActor) dispatchToActor(request, channel) else dispatchToActiveObject(request, channel) @@ -128,7 +141,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { .setIsActor(true) RemoteProtocolBuilder.setMessage(result, replyBuilder) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + val replyMessage = replyBuilder.build + channel.write(replyMessage) + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_SENT.incr + NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) + } } catch { case e: Throwable => log.error("Could not invoke remote actor [%s] due to: %s", request.getTarget, e) @@ -139,7 +157,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { .setIsSuccessful(false) .setIsActor(true) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + val replyMessage = replyBuilder.build + channel.write(replyMessage) + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_SENT.incr + NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) + } } } } @@ -165,7 +188,12 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { .setIsActor(false) RemoteProtocolBuilder.setMessage(result, replyBuilder) if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + val replyMessage = replyBuilder.build + channel.write(replyMessage) + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_SENT.incr + NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) + } } } catch { case e: InvocationTargetException => @@ -176,8 +204,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { .setException(e.getCause.getClass.getName + "$" + e.getCause.getMessage) .setIsSuccessful(false) .setIsActor(false) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) + val replyMessage = replyBuilder.build + channel.write(replyMessage) + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_SENT.incr + NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) + } case e: Throwable => log.error("Could not invoke remote active object [%s :: %s] due to: %s", request.getMethod, request.getTarget, e) e.printStackTrace @@ -186,8 +219,13 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { .setException(e.getClass.getName + "$" + e.getMessage) .setIsSuccessful(false) .setIsActor(false) - if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) - channel.write(replyBuilder.build) + if (request.hasSupervisorUuid) replyBuilder.setSupervisorUuid(request.getSupervisorUuid) + val replyMessage = replyBuilder.build + channel.write(replyMessage) + if (Management.RECORD_STATS) { + NR_OF_MESSAGES_SENT.incr + NR_OF_BYTES_SENT.incr(replyMessage.getSerializedSize) + } } } @@ -223,8 +261,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { private def createActiveObject(name: String, timeout: Long): AnyRef = { val activeObjectOrNull = activeObjects.get(name) if (activeObjectOrNull == null) { - val clazz = Class.forName(name) try { + val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) + else Class.forName(name) val newInstance = activeObjectFactory.newInstance(clazz, timeout).asInstanceOf[AnyRef] activeObjects.put(name, newInstance) newInstance @@ -240,8 +279,9 @@ class RemoteServerHandler extends SimpleChannelUpstreamHandler with Logging { private def createActor(name: String, timeout: Long): Actor = { val actorOrNull = actors.get(name) if (actorOrNull == null) { - val clazz = Class.forName(name) try { + val clazz = if (applicationLoader.isDefined) applicationLoader.get.loadClass(name) + else Class.forName(name) val newInstance = clazz.newInstance.asInstanceOf[Actor] newInstance.timeout = timeout actors.put(name, newInstance) diff --git a/kernel/src/main/scala/reactor/Dispatchers.scala b/kernel/src/main/scala/reactor/Dispatchers.scala index d7697e965c..5c4935bbd5 100644 --- a/kernel/src/main/scala/reactor/Dispatchers.scala +++ b/kernel/src/main/scala/reactor/Dispatchers.scala @@ -49,17 +49,17 @@ class DispatcherFactory { * Creates an event based dispatcher serving multiple (millions) of actors through a thread pool. * Has a fluent builder interface for configuring its semantics. */ - def newEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher - def newConcurrentEventBasedThreadPoolDispatcher = new EventBasedThreadPoolDispatcher(true) + def newEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name) + def newConcurrentEventBasedThreadPoolDispatcher(name: String) = new EventBasedThreadPoolDispatcher(name, true) /** * Creates an event based dispatcher serving multiple (millions) of actors through a single thread. */ - def newEventBasedSingleThreadDispatcher = new EventBasedSingleThreadDispatcher + def newEventBasedSingleThreadDispatcher(name: String) = new EventBasedSingleThreadDispatcher(name) /** * Creates an thread based dispatcher serving a single actor through the same single thread. * E.g. each actor consumes its own thread. */ def newThreadBasedDispatcher(actor: Actor) = new ThreadBasedDispatcher(actor) -} \ No newline at end of file +} diff --git a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala index c3d52e777e..a4c7a0fc80 100644 --- a/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedSingleThreadDispatcher.scala @@ -10,9 +10,14 @@ */ package se.scalablesolutions.akka.kernel.reactor +import kernel.management.Management + import java.util.{LinkedList, Queue, List} -class EventBasedSingleThreadDispatcher extends MessageDispatcherBase { +import com.twitter.service.Stats + +class EventBasedSingleThreadDispatcher(name: String) extends MessageDispatcherBase(name) { + val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessage_" + name) def start = if (!active) { active = true val messageDemultiplexer = new EventBasedSingleThreadDemultiplexer(queue) @@ -22,12 +27,14 @@ class EventBasedSingleThreadDispatcher extends MessageDispatcherBase { try { messageDemultiplexer.select } catch { case e: InterruptedException => active = false } - val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations.iterator - while (selectedInvocations.hasNext) { - val invocation = selectedInvocations.next + val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations + if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(selectedInvocations.size) + val iter = selectedInvocations.iterator + while (iter.hasNext) { + val invocation = iter.next val invoker = messageHandlers.get(invocation.sender) if (invoker != null) invoker.invoke(invocation) - selectedInvocations.remove + iter.remove } } } diff --git a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala index a3b9b4dbe4..939fe8ccbe 100644 --- a/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala +++ b/kernel/src/main/scala/reactor/EventBasedThreadPoolDispatcher.scala @@ -4,12 +4,16 @@ package se.scalablesolutions.akka.kernel.reactor +import kernel.management.Management + import java.util.concurrent._ import locks.ReentrantLock import atomic.{AtomicLong, AtomicInteger} import ThreadPoolExecutor.CallerRunsPolicy import java.util.{Collection, HashSet, HashMap, LinkedList, List} +import com.twitter.service.Stats + /** * Implements the Reactor pattern as defined in: [http://www.cs.wustl.edu/~schmidt/PDF/reactor-siemens.pdf].
* See also this article: [http://today.java.net/cs/user/print/a/350]. @@ -56,16 +60,17 @@ import java.util.{Collection, HashSet, HashMap, LinkedList, List} * * @author Jonas Bonér */ -class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extends MessageDispatcherBase { - def this() = this(false) +class EventBasedThreadPoolDispatcher(name: String, private val concurrentMode: Boolean) extends MessageDispatcherBase(name) { + def this(name: String) = this(name, false) + val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name) private val NR_START_THREADS = 16 private val NR_MAX_THREADS = 128 private val KEEP_ALIVE_TIME = 60000L // default is one minute private var inProcessOfBuilding = false private var executor: ExecutorService = _ private var threadPoolBuilder: ThreadPoolExecutor = _ - private val threadFactory = new MonitorableThreadFactory("akka") + private val threadFactory = new MonitorableThreadFactory("akka:" + name) private var boundedExecutorBound = -1 private val busyInvokers = new HashSet[AnyRef] @@ -89,6 +94,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend } catch { case e: InterruptedException => active = false } val selectedInvocations = messageDemultiplexer.acquireSelectedInvocations val reservedInvocations = reserve(selectedInvocations) + if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr(reservedInvocations.size) val it = reservedInvocations.entrySet.iterator while (it.hasNext) { val entry = it.next @@ -157,6 +163,7 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend ensureNotActive verifyNotInConstructionPhase inProcessOfBuilding = false + blockingQueue = queue threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, queue) this } @@ -169,7 +176,8 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend def withNewThreadPoolWithBoundedBlockingQueue(bound: Int): EventBasedThreadPoolDispatcher = synchronized { ensureNotActive verifyNotInConstructionPhase - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory) + blockingQueue = new LinkedBlockingQueue[Runnable] + threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory) boundedExecutorBound = bound this } @@ -177,28 +185,32 @@ class EventBasedThreadPoolDispatcher(private val concurrentMode: Boolean) extend def withNewThreadPoolWithLinkedBlockingQueueWithCapacity(capacity: Int): EventBasedThreadPoolDispatcher = synchronized { ensureNotActive verifyNotInConstructionPhase - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable](capacity), threadFactory, new CallerRunsPolicy) + blockingQueue = new LinkedBlockingQueue[Runnable](capacity) + threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this } def withNewThreadPoolWithLinkedBlockingQueueWithUnboundedCapacity: EventBasedThreadPoolDispatcher = synchronized { ensureNotActive verifyNotInConstructionPhase - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new LinkedBlockingQueue[Runnable], threadFactory, new CallerRunsPolicy) + blockingQueue = new LinkedBlockingQueue[Runnable] + threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this } def withNewThreadPoolWithSynchronousQueueWithFairness(fair: Boolean): EventBasedThreadPoolDispatcher = synchronized { ensureNotActive verifyNotInConstructionPhase - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new SynchronousQueue[Runnable](fair), threadFactory, new CallerRunsPolicy) + blockingQueue = new SynchronousQueue[Runnable](fair) + threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this } def withNewThreadPoolWithArrayBlockingQueueWithCapacityAndFairness(capacity: Int, fair: Boolean): EventBasedThreadPoolDispatcher = synchronized { ensureNotActive verifyNotInConstructionPhase - threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, new ArrayBlockingQueue[Runnable](capacity, fair), threadFactory, new CallerRunsPolicy) + blockingQueue = new ArrayBlockingQueue[Runnable](capacity, fair) + threadPoolBuilder = new ThreadPoolExecutor(NR_START_THREADS, NR_MAX_THREADS, KEEP_ALIVE_TIME, MILLISECONDS, blockingQueue, threadFactory, new CallerRunsPolicy) this } @@ -311,13 +323,7 @@ class BoundedExecutorDecorator(val executor: ExecutorService, bound: Int) extend def invokeAll[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAll(callables) def invokeAll[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) def invokeAny[T](callables: Collection[_ <: Callable[T]]) = executor.invokeAny(callables) - def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) -/* - def invokeAll[T](callables: Collection[Callable[T]]) = executor.invokeAll(callables) - def invokeAll[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAll(callables, l, timeUnit) - def invokeAny[T](callables: Collection[Callable[T]]) = executor.invokeAny(callables) - def invokeAny[T](callables: Collection[Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) - */ + def invokeAny[T](callables: Collection[_ <: Callable[T]], l: Long, timeUnit: TimeUnit) = executor.invokeAny(callables, l, timeUnit) } /** diff --git a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala index 9dd21f2f42..d47db197a5 100644 --- a/kernel/src/main/scala/reactor/MessageDispatcherBase.scala +++ b/kernel/src/main/scala/reactor/MessageDispatcherBase.scala @@ -4,20 +4,31 @@ package se.scalablesolutions.akka.kernel.reactor +import kernel.management.Management + import java.util.{LinkedList, Queue, List} -import java.util.concurrent.TimeUnit +import java.util.concurrent.{TimeUnit, BlockingQueue} import java.util.HashMap -trait MessageDispatcherBase extends MessageDispatcher { +import com.twitter.service.Stats + +abstract class MessageDispatcherBase(val name: String) extends MessageDispatcher { + //val CONCURRENT_MODE = kernel.Kernel.config.getBool("akka.actor.concurrent-mode", false) val MILLISECONDS = TimeUnit.MILLISECONDS - val queue = new ReactiveMessageQueue - + val queue = new ReactiveMessageQueue(name) + var blockingQueue: BlockingQueue[Runnable] = _ @volatile protected var active: Boolean = false protected val messageHandlers = new HashMap[AnyRef, MessageInvoker] protected var selectorThread: Thread = _ protected val guard = new Object + if (Management.RECORD_STATS) { + Stats.makeGauge("SizeOfBlockingQueue_" + name) { + guard.synchronized { blockingQueue.size.toDouble } + } + } + def messageQueue = queue def registerHandler(key: AnyRef, handler: MessageInvoker) = guard.synchronized { @@ -40,10 +51,16 @@ trait MessageDispatcherBase extends MessageDispatcher { protected def doShutdown = {} } -class ReactiveMessageQueue extends MessageQueue { +class ReactiveMessageQueue(name: String) extends MessageQueue { private[kernel] val queue: Queue[MessageInvocation] = new LinkedList[MessageInvocation] @volatile private var interrupted = false + if (Management.RECORD_STATS) { + Stats.makeGauge("SizeOfReactiveQueue_" + name) { + queue.synchronized { queue.size.toDouble } + } + } + def append(handle: MessageInvocation) = queue.synchronized { queue.offer(handle) queue.notifyAll @@ -64,4 +81,4 @@ class ReactiveMessageQueue extends MessageQueue { interrupted = true queue.notifyAll } -} \ No newline at end of file +} diff --git a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala index b01f373dc9..fbae2d8c99 100644 --- a/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala +++ b/kernel/src/main/scala/reactor/ThreadBasedDispatcher.scala @@ -4,18 +4,24 @@ package se.scalablesolutions.akka.kernel.reactor +import com.twitter.service.Stats + import java.util.concurrent.LinkedBlockingQueue import java.util.Queue + import kernel.actor.{Actor, ActorMessageInvoker} +import kernel.management.Management /** * Dedicates a unique thread for each actor passed in as reference. Served through its messageQueue. * @author Jonas Bonér */ -class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) extends MessageDispatcher { - def this(actor: Actor) = this(new ActorMessageInvoker(actor)) +class ThreadBasedDispatcher private[kernel] (val name: String, val messageHandler: MessageInvoker) extends MessageDispatcher { + def this(actor: Actor) = this(actor.getClass.getName, new ActorMessageInvoker(actor)) - private val queue = new BlockingMessageQueue + val NR_OF_PROCESSED_MESSAGES = Stats.getCounter("NrOfProcessedMessages_" + name) + + private val queue = new BlockingMessageQueue(name) private var selectorThread: Thread = _ @volatile private var active: Boolean = false @@ -27,6 +33,7 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) override def run = { while (active) { try { + if (Management.RECORD_STATS) NR_OF_PROCESSED_MESSAGES.incr messageHandler.invoke(queue.take) } catch { case e: InterruptedException => active = false } } @@ -44,7 +51,13 @@ class ThreadBasedDispatcher private[kernel] (val messageHandler: MessageInvoker) def unregisterHandler(key: AnyRef) = throw new UnsupportedOperationException } -class BlockingMessageQueue extends MessageQueue { +class BlockingMessageQueue(name: String) extends MessageQueue { + if (Management.RECORD_STATS) { + Stats.makeGauge("SizeOfBlockingQueue_" + name) { + queue.size.toDouble + } + } + // FIXME: configure the LBQ private val queue = new LinkedBlockingQueue[MessageInvocation] def append(handle: MessageInvocation) = queue.put(handle) @@ -52,4 +65,4 @@ class BlockingMessageQueue extends MessageQueue { def take: MessageInvocation = queue.take def read(destination: Queue[MessageInvocation]) = throw new UnsupportedOperationException def interrupt = throw new UnsupportedOperationException -} \ No newline at end of file +} diff --git a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala index dc272893f5..3ac4eee51a 100644 --- a/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedSingleThreadDispatcherTest.scala @@ -55,7 +55,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(100) val key = "key" - val dispatcher = new EventBasedSingleThreadDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher("name") dispatcher.registerHandler(key, new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { @@ -69,7 +69,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { val handleLatch = new CountDownLatch(2) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedSingleThreadDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher("name") dispatcher.registerHandler(key1, new TestMessageHandle(handleLatch)) dispatcher.registerHandler(key2, new TestMessageHandle(handleLatch)) dispatcher.start @@ -83,7 +83,7 @@ class EventBasedSingleThreadDispatcherTest extends TestCase { val handleLatch = new CountDownLatch(200) val key1 = "key1" val key2 = "key2" - val dispatcher = new EventBasedSingleThreadDispatcher + val dispatcher = new EventBasedSingleThreadDispatcher("name") dispatcher.registerHandler(key1, new MessageInvoker { var currentValue = -1; def invoke(message: MessageInvocation) { diff --git a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala index 89b908015e..c0b205d6f6 100644 --- a/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala +++ b/kernel/src/test/scala/EventBasedThreadPoolDispatcherTest.scala @@ -37,7 +37,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(10) val key = "key" - val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher + val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) .setMaxPoolSize(4) @@ -76,7 +76,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { val handlersBarrier = new CyclicBarrier(3) val key1 = "key1" val key2 = "key2" - val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher + val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) .setMaxPoolSize(4) @@ -121,7 +121,7 @@ class EventBasedThreadPoolDispatcherTest extends TestCase { val handleLatch = new CountDownLatch(200) val key1 = "key1" val key2 = "key2" - val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher + val dispatcher = Dispatchers.newEventBasedThreadPoolDispatcher("name") dispatcher.withNewThreadPoolWithBoundedBlockingQueue(100) .setCorePoolSize(2) .setMaxPoolSize(4) diff --git a/kernel/src/test/scala/RemoteActorSpec.scala b/kernel/src/test/scala/RemoteActorSpec.scala index 61397e6018..3387aa8eb0 100644 --- a/kernel/src/test/scala/RemoteActorSpec.scala +++ b/kernel/src/test/scala/RemoteActorSpec.scala @@ -26,7 +26,6 @@ class RemoteActorSpecActorBidirectional extends Actor { } class RemoteActorSpec extends TestCase { - kernel.Kernel.config new Thread(new Runnable() { def run = { diff --git a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala index 621e3dccfd..1ad5c0b733 100644 --- a/kernel/src/test/scala/ThreadBasedDispatcherTest.scala +++ b/kernel/src/test/scala/ThreadBasedDispatcherTest.scala @@ -49,7 +49,7 @@ class ThreadBasedDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToTheSameHandlerAreExecutedSequentially: Unit = { val guardLock = new ReentrantLock val handleLatch = new CountDownLatch(100) - val dispatcher = new ThreadBasedDispatcher(new TestMessageHandle(handleLatch)) + val dispatcher = new ThreadBasedDispatcher("name", new TestMessageHandle(handleLatch)) dispatcher.start for (i <- 0 until 100) { dispatcher.messageQueue.append(new MessageInvocation("id", new Object, None, None)) @@ -60,7 +60,7 @@ class ThreadBasedDispatcherTest extends TestCase { private def internalTestMessagesDispatchedToHandlersAreExecutedInFIFOOrder: Unit = { val handleLatch = new CountDownLatch(100) - val dispatcher = new ThreadBasedDispatcher(new MessageInvoker { + val dispatcher = new ThreadBasedDispatcher("name", new MessageInvoker { var currentValue = -1; def invoke(message: MessageInvocation) { if (threadingIssueDetected.get) return diff --git a/lib/atmosphere-core-0.3.jar~HEAD b/lib/atmosphere-core-0.3.jar~HEAD deleted file mode 100644 index aef71c00ba..0000000000 Binary files a/lib/atmosphere-core-0.3.jar~HEAD and /dev/null differ diff --git a/lib/atmosphere-portable-runtime-0.3.jar~HEAD b/lib/atmosphere-portable-runtime-0.3.jar~HEAD deleted file mode 100644 index 19875c8c51..0000000000 Binary files a/lib/atmosphere-portable-runtime-0.3.jar~HEAD and /dev/null differ diff --git a/lib/cassidy-0.1.jar b/lib/cassidy-0.1.jar old mode 100755 new mode 100644 diff --git a/lib/jersey-client-1.1.1-ea.jar b/lib/jersey-client-1.1.1-ea.jar old mode 100755 new mode 100644 diff --git a/lib/jersey-core-1.1.1-ea.jar b/lib/jersey-core-1.1.1-ea.jar old mode 100755 new mode 100644 diff --git a/lib/jersey-server-1.1.1-ea.jar b/lib/jersey-server-1.1.1-ea.jar old mode 100755 new mode 100644 diff --git a/lib/scala-stats-1.0.jar b/lib/scala-stats-1.0.jar new file mode 100644 index 0000000000..6b1b43bf7b Binary files /dev/null and b/lib/scala-stats-1.0.jar differ diff --git a/samples-scala/src/main/scala/SimpleService.scala b/samples-scala/src/main/scala/SimpleService.scala index 79eaaad468..59d28e52e6 100644 --- a/samples-scala/src/main/scala/SimpleService.scala +++ b/samples-scala/src/main/scala/SimpleService.scala @@ -114,4 +114,4 @@ class JsonpFilter extends BroadcastFilter[String] with Logging { message + "\" }); \n\n") } -} \ No newline at end of file +}