=act,tes Initial draft of internal MetricsKit

Note: This is NOT aimed to provide an micro-benchmarking solution.
The goal is to provide data for broad trend analysis. For techniques
that fight the inliner and other specialised techniques, refer to JMH.

+ custom console and graphite reporters
  - had to be custom because it's not possible to add custom metric
    types to the existing reporters
+ initial hdr.Histogram histogram() provider, see
  http://latencyutils.github.io/LatencyUtils/
+ Not using timers provided by Metrics, instead use the above histogram
+ Added average Actor size measurement
+ Measuring the "blocking time" when an actor is created, before we fire
  of the async part of this process; Measures in loop and will fluctuate
  a lot. Times are in `us` -- System.nanoTime should provide good enough
  resolution.
+ Measuring total actor creation time by using
  `KnownOpsInTimespanTimer`, which given a known number of ops, in a
  large amount of time, roughtly estimates time per one operation.
  // Yes, we are aware of the possibility of GC pauses and other horrors
+ All classes are `private[akka]`, we should not encourage people to use
  this yet
+ Counters use Java 8's `LongAdder`, which is metric's private;
  The new trend in Java land will be copy paste-ing this class ;)
+ Metrics are logged to Graphite, so we can long-term analyse these
+ Reporters are configurable using typesafe-config

! I'm not very happy about how I work around Metrics not being too open
  for adding additional custom metrics. Seems like a hack at places.
  I will consider removing the Metrics dependency all together.
  numbers

Example output:

```
-- KnownOpsInTimespanTimer-------------------------------------------
  actor-creation.total.creating-100000-actors.Props|new-EmptyArgsActor|…||-same
               ops = 100000
              time = 1.969 s
             ops/s = 50782.22
               avg = 19.69 μs

-- AveragingGauge---------------------------------------------------
  actor-creation.Props|new-EmptyArgsActor|…||-same.avg-mem-per-actor
                avg = 439.67
```
This commit is contained in:
Konrad 'ktoso' Malawski 2014-04-29 10:50:36 +02:00
parent 3cc84f11fc
commit 684c0279ec
22 changed files with 1869 additions and 156 deletions

View file

@ -0,0 +1,223 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import scala.language.postfixOps
import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec }
import scala.concurrent.duration._
import akka.TestUtils
import akka.testkit.metrics._
import org.scalatest.BeforeAndAfterAll
import java.io.PrintStream
import java.util.concurrent.TimeUnit
import akka.testkit.metrics.HeapMemoryUsage
object ActorCreationPerfSpec {
final case class Create(number: Int, props: () Props)
case object Created
case object IsAlive
case object Alive
case object WaitForChildren
case object Waited
class EmptyActor extends Actor {
def receive = {
case IsAlive sender() ! Alive
}
}
class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor {
def receive = {
case IsAlive sender() ! Alive
}
}
class TimingDriver(hist: HdrHistogram) extends Actor {
def receive = {
case IsAlive
sender() ! Alive
case Create(number, propsCreator)
for (i 1 to number) {
val s = System.nanoTime()
context.actorOf(propsCreator.apply())
hist.update(System.nanoTime - s)
}
sender() ! Created
case WaitForChildren
context.children.foreach(_ ! IsAlive)
context.become(waiting(context.children.size, sender()), discardOld = false)
}
def waiting(number: Int, replyTo: ActorRef): Receive = {
var current = number
{
case Alive
current -= 1
if (current == 0) {
replyTo ! Waited
context.unbecome()
}
}
}
}
class Driver extends Actor {
def receive = {
case IsAlive
sender() ! Alive
case Create(number, propsCreator)
for (i 1 to number) {
context.actorOf(propsCreator.apply())
}
sender() ! Created
case WaitForChildren
context.children.foreach(_ ! IsAlive)
context.become(waiting(context.children.size, sender()), discardOld = false)
}
def waiting(number: Int, replyTo: ActorRef): Receive = {
var current = number
{
case Alive
current -= 1
if (current == 0) {
replyTo ! Waited
context.unbecome()
}
}
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender
with MetricsKit with BeforeAndAfterAll {
import ActorCreationPerfSpec._
def metricsConfig = system.settings.config
val ActorCreationKey = MetricKey.fromString("actor-creation")
val BlockingTimeKey = ActorCreationKey / "synchronous-part"
val TotalTimeKey = ActorCreationKey / "total"
val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000)
val nrOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000)
val nrOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 3)
def runWithCounterInside(metricName: String, scenarioName: String, number: Int, propsCreator: () Props) {
val hist = histogram(BlockingTimeKey / metricName, 100.millis.toNanos, 5, "ns")
val driver = system.actorOf(Props(classOf[TimingDriver], hist), scenarioName)
driver ! IsAlive
expectMsg(Alive)
driver ! Create(number, propsCreator)
expectMsgPF(15 seconds, s"$scenarioName waiting for Created") { case Created }
driver ! WaitForChildren
expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited }
driver ! PoisonPill
TestUtils.verifyActorTermination(driver, 15 seconds)
gc()
}
def runWithoutCounter(scenarioName: String, number: Int, propsCreator: () Props): HeapMemoryUsage = {
val mem = measureMemory(TotalTimeKey / scenarioName)
val driver = system.actorOf(Props(classOf[Driver]), scenarioName)
driver ! IsAlive
expectMsg(Alive)
gc()
val before = mem.getHeapSnapshot
driver ! Create(number, propsCreator)
expectMsgPF(15 seconds, s"$scenarioName waiting for Created") { case Created }
driver ! WaitForChildren
expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited }
gc()
val after = mem.getHeapSnapshot
driver ! PoisonPill
TestUtils.verifyActorTermination(driver, 15 seconds)
after diff before
}
def registerTests(name: String, propsCreator: () Props) {
val scenarioName = name.replaceAll("""[^\w]""", "")
s"warm-up before: $name" taggedAs PerformanceTest in {
if (warmUp > 0) {
runWithoutCounter(s"${scenarioName}_warmup", warmUp, propsCreator)
}
clearMetrics()
}
s"measure synchronous blocked time for $name" taggedAs PerformanceTest in {
// note: measuring per-actor-memory-use in this scenario is skewed as the Actor contains references to counters etc!
// for measuring actor size use refer to the `runWithoutCounter` method
for (i 1 to nrOfRepeats) {
runWithCounterInside(name, s"${scenarioName}_driver_inside_$i", nrOfActors, propsCreator)
}
reportAndClearMetrics()
}
s"measure total creation time for $name" taggedAs PerformanceTest in {
val avgMem = averageGauge(ActorCreationKey / name / "avg-mem-per-actor")
for (i 1 to nrOfRepeats) {
val heapUsed = timedWithKnownOps(TotalTimeKey / s"creating-$nrOfActors-actors" / name, ops = nrOfActors) {
runWithoutCounter(s"${scenarioName}_driver_outside_$i", nrOfActors, propsCreator)
}
avgMem.add(heapUsed.used / nrOfActors) // average actor size, over nrOfRepeats
// time is handled by the histogram already
}
reportAndClearMetrics()
}
}
"Actor creation with actorOf" must {
registerTests("Props[EmptyActor] with new Props", () Props[EmptyActor])
val props1 = Props[EmptyActor]
registerTests("Props[EmptyActor] with same Props", () props1)
registerTests("Props(new EmptyActor) new", () { Props(new EmptyActor) })
val props2 = Props(new EmptyActor)
registerTests("Props(new EmptyActor) same", () { props2 })
registerTests("Props(classOf[EmptyArgsActor], ...) new", () { Props(classOf[EmptyArgsActor], 4711, 1729) })
val props3 = Props(classOf[EmptyArgsActor], 4711, 1729)
registerTests("Props(classOf[EmptyArgsActor], ...) same", () { props3 })
registerTests("Props(new EmptyArgsActor(...)) new", () { Props(new EmptyArgsActor(4711, 1729)) })
val props4 = Props(new EmptyArgsActor(4711, 1729))
registerTests("Props(new EmptyArgsActor(...)) same", () { props4 })
}
override def afterTermination() = shutdownMetrics()
override def expectedTestDuration = 2 minutes
}

View file

@ -1,150 +0,0 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.actor
import scala.language.postfixOps
import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec }
import java.lang.management.ManagementFactory
import scala.concurrent.duration._
import akka.TestUtils
import scala.util.Try
object ActorPerfSpec {
final case class Create(number: Int, props: () Props)
case object Created
case object IsAlive
case object Alive
final case class WaitForChildren(number: Int)
case object Waited
class EmptyActor extends Actor {
def receive = {
case IsAlive sender() ! Alive
}
}
class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor {
def receive = {
case IsAlive sender() ! Alive
}
}
class Driver extends Actor {
def receive = {
case IsAlive
sender() ! Alive
case Create(number, propsCreator)
for (i 1 to number) {
context.actorOf(propsCreator.apply())
}
sender() ! Created
case WaitForChildren(number)
context.children.foreach(_ ! IsAlive)
context.become(waiting(number, sender()), false)
}
def waiting(number: Int, replyTo: ActorRef): Receive = {
var current = number
{
case Alive
current -= 1
if (current == 0) {
replyTo ! Waited
context.unbecome()
}
}
}
}
}
@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner])
class ActorPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender {
import ActorPerfSpec._
val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000)
val numberOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000)
val numberOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 2)
def testActorCreation(name: String, propsCreator: () Props): Unit = {
val actorName = name.replaceAll("[ #\\?/!\\*%\\(\\)\\[\\]]", "_")
if (warmUp > 0)
measure(s"${actorName}_warmup", warmUp, propsCreator)
val results = for (i 1 to numberOfRepeats) yield measure(s"${actorName}_driver_$i", numberOfActors, propsCreator)
results.foreach {
case (duration, memory)
val micros = duration.toMicros
val avgMicros = micros.toDouble / numberOfActors
val avgMemory = memory.toDouble / numberOfActors
println(s"$name Created $numberOfActors");
println(s"In $micros us, avg: ${avgMicros}")
println(s"Footprint ${memory / 1024} KB, avg: ${avgMemory} B")
}
}
def measure(name: String, number: Int, propsCreator: () Props): (Duration, Long) = {
val memMx = ManagementFactory.getMemoryMXBean()
val driver = system.actorOf(Props[Driver], name)
driver ! IsAlive
expectMsg(Alive)
System.gc()
val memBefore = memMx.getHeapMemoryUsage
val start = System.nanoTime()
driver ! Create(number, propsCreator)
expectMsgPF(15 seconds, s"$name waiting for Created") { case Created }
val stop = System.nanoTime()
val duration = Duration.fromNanos(stop - start)
driver ! WaitForChildren(number)
expectMsgPF(15 seconds, s"$name waiting for Waited") { case Waited }
System.gc()
val memAfter = memMx.getHeapMemoryUsage
driver ! PoisonPill
TestUtils.verifyActorTermination(driver, 15 seconds)
(duration, memAfter.getUsed - memBefore.getUsed)
}
"Actor creation with actorFor" must {
"measure time for Props[EmptyActor] with new Props" taggedAs PerformanceTest in {
testActorCreation("Props[EmptyActor] new", () { Props[EmptyActor] })
}
"measure time for Props[EmptyActor] with same Props" taggedAs PerformanceTest in {
val props = Props[EmptyActor]
testActorCreation("Props[EmptyActor] same", () { props })
}
"measure time for Props(new EmptyActor) with new Props" taggedAs PerformanceTest in {
testActorCreation("Props(new EmptyActor) new", () { Props(new EmptyActor) })
}
"measure time for Props(new EmptyActor) with same Props" taggedAs PerformanceTest in {
val props = Props(new EmptyActor)
testActorCreation("Props(new EmptyActor) same", () { props })
}
"measure time for Props(classOf[EmptyArgsActor], ...) with new Props" taggedAs PerformanceTest in {
testActorCreation("Props(classOf[EmptyArgsActor], ...) new", () { Props(classOf[EmptyArgsActor], 4711, 1729) })
}
"measure time for Props(classOf[EmptyArgsActor], ...) with same Props" taggedAs PerformanceTest in {
val props = Props(classOf[EmptyArgsActor], 4711, 1729)
testActorCreation("Props(classOf[EmptyArgsActor], ...) same", () { props })
}
"measure time for Props(new EmptyArgsActor(...)) with new Props" taggedAs PerformanceTest in {
testActorCreation("Props(new EmptyArgsActor(...)) new", () { Props(new EmptyArgsActor(4711, 1729)) })
}
"measure time for Props(new EmptyArgsActor(...)) with same Props" taggedAs PerformanceTest in {
val props = Props(new EmptyArgsActor(4711, 1729))
testActorCreation("Props(new EmptyArgsActor(...)) same", () { props })
}
}
override def expectedTestDuration = 2 minutes
}

View file

@ -73,7 +73,7 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit
private var processor: ActorRef = _
override protected def beforeEach: Unit = {
override protected def beforeEach(): Unit = {
super.beforeEach()
setupTestProcessorData()
processor = createTestProcessor()

View file

@ -0,0 +1,197 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?revision=1.14&view=markup
*/
package akka.testkit.metrics;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicLong;
// CHECKSTYLE:OFF
/**
* One or more variables that together maintain an initially zero {@code long} sum. When updates
* (method {@link #add}) are contended across threads, the set of variables may grow dynamically to
* reduce contention. Method {@link #sum} (or, equivalently, {@link #longValue}) returns the current
* total combined across the variables maintaining the sum.
* <p/>
* <p>This class is usually preferable to {@link AtomicLong} when multiple threads update a common
* sum that is used for purposes such as collecting statistics, not for fine-grained synchronization
* control. Under low update contention, the two classes have similar characteristics. But under
* high contention, expected throughput of this class is significantly higher, at the expense of
* higher space consumption.
* <p/>
* <p>This class extends {@link Number}, but does <em>not</em> define methods such as {@code
* equals}, {@code hashCode} and {@code compareTo} because instances are expected to be mutated, and
* so are not useful as collection keys.
* <p/>
* <p><em>jsr166e note: This class is targeted to be placed in java.util.concurrent.atomic.</em>
*
* @author Doug Lea
* @since 1.8
*/
@SuppressWarnings("all")
class LongAdder extends Striped64 implements Serializable {
private static final long serialVersionUID = 7249069246863182397L;
/**
* Version of plus for use in retryUpdate
*/
final long fn(long v, long x) {
return v + x;
}
/**
* Creates a new adder with initial sum of zero.
*/
LongAdder() {
}
/**
* Adds the given value.
*
* @param x the value to add
*/
public void add(long x) {
Cell[] as;
long b, v;
HashCode hc;
Cell a;
int n;
if ((as = cells) != null || !casBase(b = base, b + x)) {
boolean uncontended = true;
int h = (hc = threadHashCode.get()).code;
if (as == null || (n = as.length) < 1 ||
(a = as[(n - 1) & h]) == null ||
!(uncontended = a.cas(v = a.value, v + x)))
retryUpdate(x, hc, uncontended);
}
}
/**
* Equivalent to {@code add(1)}.
*/
public void increment() {
add(1L);
}
/**
* Equivalent to {@code add(-1)}.
*/
public void decrement() {
add(-1L);
}
/**
* Returns the current sum. The returned value is <em>NOT</em> an atomic snapshot; invocation
* in the absence of concurrent updates returns an accurate result, but concurrent updates that
* occur while the sum is being calculated might not be incorporated.
*
* @return the sum
*/
public long sum() {
long sum = base;
Cell[] as = cells;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null)
sum += a.value;
}
}
return sum;
}
/**
* Resets variables maintaining the sum to zero. This method may be a useful alternative to
* creating a new adder, but is only effective if there are no concurrent updates. Because this
* method is intrinsically racy, it should only be used when it is known that no threads are
* concurrently updating.
*/
public void reset() {
internalReset(0L);
}
/**
* Equivalent in effect to {@link #sum} followed by {@link #reset}. This method may apply for
* example during quiescent points between multithreaded computations. If there are updates
* concurrent with this method, the returned value is <em>not</em> guaranteed to be the final
* value occurring before the reset.
*
* @return the sum
*/
public long sumThenReset() {
long sum = base;
Cell[] as = cells;
base = 0L;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null) {
sum += a.value;
a.value = 0L;
}
}
}
return sum;
}
/**
* Returns the String representation of the {@link #sum}.
*
* @return the String representation of the {@link #sum}
*/
public String toString() {
return Long.toString(sum());
}
/**
* Equivalent to {@link #sum}.
*
* @return the sum
*/
public long longValue() {
return sum();
}
/**
* Returns the {@link #sum} as an {@code int} after a narrowing primitive conversion.
*/
public int intValue() {
return (int) sum();
}
/**
* Returns the {@link #sum} as a {@code float} after a widening primitive conversion.
*/
public float floatValue() {
return (float) sum();
}
/**
* Returns the {@link #sum} as a {@code double} after a widening primitive conversion.
*/
public double doubleValue() {
return (double) sum();
}
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
s.defaultWriteObject();
s.writeLong(sum());
}
private void readObject(java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
busy = 0;
cells = null;
base = s.readLong();
}
}
// CHECKSTYLE:ON

View file

@ -0,0 +1,354 @@
/*
* Written by Doug Lea with assistance from members of JCP JSR-166
* Expert Group and released to the public domain, as explained at
* http://creativecommons.org/publicdomain/zero/1.0/
*
* http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.8&view=markup
*/
package akka.testkit.metrics;
import java.util.Random;
// CHECKSTYLE:OFF
/**
* A package-local class holding common representation and mechanics for classes supporting dynamic
* striping on 64bit values. The class extends Number so that concrete subclasses must publicly do
* so.
*/
@SuppressWarnings("all")
abstract class Striped64 extends Number {
/*
* This class maintains a lazily-initialized table of atomically
* updated variables, plus an extra "base" field. The table size
* is a power of two. Indexing uses masked per-thread hash codes.
* Nearly all declarations in this class are package-private,
* accessed directly by subclasses.
*
* Table entries are of class Cell; a variant of AtomicLong padded
* to reduce cache contention on most processors. Padding is
* overkill for most Atomics because they are usually irregularly
* scattered in memory and thus don't interfere much with each
* other. But Atomic objects residing in arrays will tend to be
* placed adjacent to each other, and so will most often share
* cache lines (with a huge negative performance impact) without
* this precaution.
*
* In part because Cells are relatively large, we avoid creating
* them until they are needed. When there is no contention, all
* updates are made to the base field. Upon first contention (a
* failed CAS on base update), the table is initialized to size 2.
* The table size is doubled upon further contention until
* reaching the nearest power of two greater than or equal to the
* number of CPUS. Table slots remain empty (null) until they are
* needed.
*
* A single spinlock ("busy") is used for initializing and
* resizing the table, as well as populating slots with new Cells.
* There is no need for a blocking lock; when the lock is not
* available, threads try other slots (or the base). During these
* retries, there is increased contention and reduced locality,
* which is still better than alternatives.
*
* Per-thread hash codes are initialized to random values.
* Contention and/or table collisions are indicated by failed
* CASes when performing an update operation (see method
* retryUpdate). Upon a collision, if the table size is less than
* the capacity, it is doubled in size unless some other thread
* holds the lock. If a hashed slot is empty, and lock is
* available, a new Cell is created. Otherwise, if the slot
* exists, a CAS is tried. Retries proceed by "double hashing",
* using a secondary hash (Marsaglia XorShift) to try to find a
* free slot.
*
* The table size is capped because, when there are more threads
* than CPUs, supposing that each thread were bound to a CPU,
* there would exist a perfect hash function mapping threads to
* slots that eliminates collisions. When we reach capacity, we
* search for this mapping by randomly varying the hash codes of
* colliding threads. Because search is random, and collisions
* only become known via CAS failures, convergence can be slow,
* and because threads are typically not bound to CPUS forever,
* may not occur at all. However, despite these limitations,
* observed contention rates are typically low in these cases.
*
* It is possible for a Cell to become unused when threads that
* once hashed to it terminate, as well as in the case where
* doubling the table causes no thread to hash to it under
* expanded mask. We do not try to detect or remove such cells,
* under the assumption that for long-running instances, observed
* contention levels will recur, so the cells will eventually be
* needed again; and for short-lived ones, it does not matter.
*/
/**
* Padded variant of AtomicLong supporting only raw accesses plus CAS. The value field is placed
* between pads, hoping that the JVM doesn't reorder them.
* <p/>
* JVM intrinsics note: It would be possible to use a release-only form of CAS here, if it were
* provided.
*/
static final class Cell {
volatile long p0, p1, p2, p3, p4, p5, p6;
volatile long value;
volatile long q0, q1, q2, q3, q4, q5, q6;
Cell(long x) {
value = x;
}
final boolean cas(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long valueOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> ak = Cell.class;
valueOffset = UNSAFE.objectFieldOffset
(ak.getDeclaredField("value"));
} catch (Exception e) {
throw new Error(e);
}
}
}
/**
* Holder for the thread-local hash code. The code is initially random, but may be set to a
* different value upon collisions.
*/
static final class HashCode {
static final Random rng = new Random();
int code;
HashCode() {
int h = rng.nextInt(); // Avoid zero to allow xorShift rehash
code = (h == 0) ? 1 : h;
}
}
/**
* The corresponding ThreadLocal class
*/
static final class ThreadHashCode extends ThreadLocal<HashCode> {
public HashCode initialValue() {
return new HashCode();
}
}
/**
* Static per-thread hash codes. Shared across all instances to reduce ThreadLocal pollution and
* because adjustments due to collisions in one table are likely to be appropriate for others.
*/
static final ThreadHashCode threadHashCode = new ThreadHashCode();
/**
* Number of CPUS, to place bound on table size
*/
static final int NCPU = Runtime.getRuntime().availableProcessors();
/**
* Table of cells. When non-null, size is a power of 2.
*/
transient volatile Cell[] cells;
/**
* Base value, used mainly when there is no contention, but also as a fallback during table
* initialization races. Updated via CAS.
*/
transient volatile long base;
/**
* Spinlock (locked via CAS) used when resizing and/or creating Cells.
*/
transient volatile int busy;
/**
* Package-private default constructor
*/
Striped64() {
}
/**
* CASes the base field.
*/
final boolean casBase(long cmp, long val) {
return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val);
}
/**
* CASes the busy field from 0 to 1 to acquire lock.
*/
final boolean casBusy() {
return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1);
}
/**
* Computes the function of current and new value. Subclasses should open-code this update
* function for most uses, but the virtualized form is needed within retryUpdate.
*
* @param currentValue the current value (of either base or a cell)
* @param newValue the argument from a user update call
* @return result of the update function
*/
abstract long fn(long currentValue, long newValue);
/**
* Handles cases of updates involving initialization, resizing, creating new Cells, and/or
* contention. See above for explanation. This method suffers the usual non-modularity problems
* of optimistic retry code, relying on rechecked sets of reads.
*
* @param x the value
* @param hc the hash code holder
* @param wasUncontended false if CAS failed before call
*/
final void retryUpdate(long x, HashCode hc, boolean wasUncontended) {
int h = hc.code;
boolean collide = false; // True if last slot nonempty
for (; ; ) {
Cell[] as;
Cell a;
int n;
long v;
if ((as = cells) != null && (n = as.length) > 0) {
if ((a = as[(n - 1) & h]) == null) {
if (busy == 0) { // Try to attach new Cell
Cell r = new Cell(x); // Optimistically create
if (busy == 0 && casBusy()) {
boolean created = false;
try { // Recheck under lock
Cell[] rs;
int m, j;
if ((rs = cells) != null &&
(m = rs.length) > 0 &&
rs[j = (m - 1) & h] == null) {
rs[j] = r;
created = true;
}
} finally {
busy = 0;
}
if (created)
break;
continue; // Slot is now non-empty
}
}
collide = false;
} else if (!wasUncontended) // CAS already known to fail
wasUncontended = true; // Continue after rehash
else if (a.cas(v = a.value, fn(v, x)))
break;
else if (n >= NCPU || cells != as)
collide = false; // At max size or stale
else if (!collide)
collide = true;
else if (busy == 0 && casBusy()) {
try {
if (cells == as) { // Expand table unless stale
Cell[] rs = new Cell[n << 1];
for (int i = 0; i < n; ++i)
rs[i] = as[i];
cells = rs;
}
} finally {
busy = 0;
}
collide = false;
continue; // Retry with expanded table
}
h ^= h << 13; // Rehash
h ^= h >>> 17;
h ^= h << 5;
} else if (busy == 0 && cells == as && casBusy()) {
boolean init = false;
try { // Initialize table
if (cells == as) {
Cell[] rs = new Cell[2];
rs[h & 1] = new Cell(x);
cells = rs;
init = true;
}
} finally {
busy = 0;
}
if (init)
break;
} else if (casBase(v = base, fn(v, x)))
break; // Fall back on using base
}
hc.code = h; // Record index for next time
}
/**
* Sets base and all cells to the given value.
*/
final void internalReset(long initialValue) {
Cell[] as = cells;
base = initialValue;
if (as != null) {
int n = as.length;
for (int i = 0; i < n; ++i) {
Cell a = as[i];
if (a != null)
a.value = initialValue;
}
}
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long baseOffset;
private static final long busyOffset;
static {
try {
UNSAFE = getUnsafe();
Class<?> sk = Striped64.class;
baseOffset = UNSAFE.objectFieldOffset
(sk.getDeclaredField("base"));
busyOffset = UNSAFE.objectFieldOffset
(sk.getDeclaredField("busy"));
} catch (Exception e) {
throw new Error(e);
}
}
/**
* Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. Replace with a simple
* call to Unsafe.getUnsafe when integrating into a jdk.
*
* @return a sun.misc.Unsafe
*/
private static sun.misc.Unsafe getUnsafe() {
try {
return sun.misc.Unsafe.getUnsafe();
} catch (SecurityException ignored) {
}
try {
return java.security.AccessController.doPrivileged
(new java.security.PrivilegedExceptionAction<sun.misc.Unsafe>() {
public sun.misc.Unsafe run() throws Exception {
Class<sun.misc.Unsafe> k = sun.misc.Unsafe.class;
for (java.lang.reflect.Field f : k.getDeclaredFields()) {
f.setAccessible(true);
Object x = f.get(null);
if (k.isInstance(x))
return k.cast(x);
}
throw new NoSuchFieldError("the Unsafe");
}
});
} catch (java.security.PrivilegedActionException e) {
throw new RuntimeException("Could not initialize intrinsics",
e.getCause());
}
}
}
// CHECKSTYLE:ON

View file

@ -0,0 +1,31 @@
akka {
# Configures MetricsKit
test.metrics {
# Available reporters are: console, graphite
# In order to configure from the command line, use the alternative list syntax:
# -Dakka.test.metrics.reporters.0=console -Dakka.test.metrics.reporters.1=graphite
reporters = [console]
reporter {
console {
# Automatically print metrics to the console at scheduled interval.
# To disable, set to `0`.
scheduled-report-interval = 0 ms
# enable for very verbose / detailed printouts
verbose = false
}
graphite {
prefix = "local"
host = ""
port = 2003
# Automatically print metrics to the console at scheduled interval.
# To disable, set to `0`.
scheduled-report-interval = 1 s
}
}
}
}

View file

@ -0,0 +1,30 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics.Gauge
/**
* Gauge which exposes the Arithmetic Mean of values given to it.
*
* Can be used to expose average of a series of values to [[com.codahale.metrics.ScheduledReporter]]s.
*/
class AveragingGauge extends Gauge[Double] {
private val sum = new LongAdder
private val count = new LongAdder
def add(n: Long) {
count.increment()
sum add n
}
def add(ns: Seq[Long]) {
// takes a mutable Seq on order to allow use with Array's
count add ns.length
sum add ns.sum
}
override def getValue: Double = sum.sum().toDouble / count.sum()
}

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import java.util
import collection.JavaConverters._
import java.lang.management.{ OperatingSystemMXBean, ManagementFactory }
import com.codahale.metrics.{ Gauge, Metric, MetricSet }
import com.codahale.metrics.MetricRegistry._
import com.codahale.metrics.jvm.FileDescriptorRatioGauge
/**
* MetricSet exposing number of open and maximum file descriptors used by the JVM process.
*/
private[akka] class FileDescriptorMetricSet(os: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean) extends MetricSet {
override def getMetrics: util.Map[String, Metric] = {
Map[String, Metric](
name("file-descriptors", "open") -> new Gauge[Long] {
override def getValue: Long = invoke("getOpenFileDescriptorCount")
},
name("file-descriptors", "max") -> new Gauge[Long] {
override def getValue: Long = invoke("getMaxFileDescriptorCount")
},
name("file-descriptors", "ratio") -> new FileDescriptorRatioGauge(os)).asJava
}
private def invoke(name: String): Long = {
val method = os.getClass.getDeclaredMethod(name)
method.setAccessible(true)
method.invoke(os).asInstanceOf[Long]
}
}

View file

@ -0,0 +1,43 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics.{ Snapshot, Sampling, Metric }
import org.{ HdrHistogram hdr }
import java.util.{ Arrays, Collections }
import java.lang.Math._
import java.io.{ OutputStream, OutputStreamWriter, PrintWriter }
/**
* Adapts Gil Tene's HdrHistogram to Metric's Metric interface.
*
* @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive
* integer that is >= 2.
* @param numberOfSignificantValueDigits The number of significant decimal digits to which the histogram will
* maintain value resolution and separation. Must be a non-negative
* integer between 0 and 5.
*/
private[akka] class HdrHistogram(
highestTrackableValue: Long,
numberOfSignificantValueDigits: Int,
val unit: String = "")
extends Metric {
private val hist = new hdr.Histogram(highestTrackableValue, numberOfSignificantValueDigits)
def update(value: Long) {
hist.recordValue(value)
}
def updateWithExpectedInterval(value: Long, expectedIntervalBetweenValueSamples: Long) {
hist.recordValueWithExpectedInterval(value, expectedIntervalBetweenValueSamples)
}
def updateWithCount(value: Long, count: Long) {
hist.recordValueWithCount(value, count)
}
def getData = hist.copy().getHistogramData
}

View file

@ -0,0 +1,37 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics._
import java.util.concurrent.atomic.AtomicLong
import java.util.concurrent.TimeUnit._
/**
* Specialised "one-shot" Timer.
* Given a known number of operations performed within a time span (to be measured) it displays the average time one operation took.
*
* Please note that this is a *very coarse* estimation; The gain though is that we do not have to perform the counting inside of the measured thing (we can adding in tight loops).
*/
class KnownOpsInTimespanTimer(expectedOps: Long) extends Metric with Counting {
val startTime = System.nanoTime
val stopTime = new AtomicLong(0)
/**
* Stops the Timer.
* Can be called multiple times, though only the first call will be taken into account.
*
* @return true if this was the first call to `stop`, false otherwise
*/
def stop(): Boolean = stopTime.compareAndSet(0, System.nanoTime)
override def getCount: Long = expectedOps
def elapsedTime: Long = stopTime.get - startTime
def avgDuration: Long = (elapsedTime.toDouble / expectedOps).toLong
def opsPerSecond: Double = expectedOps.toDouble / (elapsedTime.toDouble / NANOSECONDS.convert(1, SECONDS))
}

View file

@ -0,0 +1,78 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics._
import com.codahale.metrics.jvm
private[akka] trait MemoryUsageSnapshotting extends MetricsPrefix {
this: jvm.MemoryUsageGaugeSet
// accessing metrics in order to not to duplicate mxBean access too much
def getHeapSnapshot = {
val metrics = getMetrics
HeapMemoryUsage(
metrics.get(key("heap-init")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("heap-used")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("heap-max")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("heap-committed")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("heap-usage")).asInstanceOf[RatioGauge].getValue)
}
def getTotalSnapshot = {
val metrics = getMetrics
TotalMemoryUsage(
metrics.get(key("total-init")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("total-used")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("total-max")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("total-committed")).asInstanceOf[Gauge[Long]].getValue)
}
def getNonHeapSnapshot = {
val metrics = getMetrics
NonHeapMemoryUsage(
metrics.get(key("non-heap-init")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("non-heap-used")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("non-heap-max")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("non-heap-committed")).asInstanceOf[Gauge[Long]].getValue,
metrics.get(key("non-heap-usage")).asInstanceOf[RatioGauge].getValue)
}
private def key(k: String) = prefix + "." + k
}
private[akka] case class TotalMemoryUsage(init: Long, used: Long, max: Long, comitted: Long) {
def diff(other: TotalMemoryUsage): TotalMemoryUsage =
TotalMemoryUsage(
this.init - other.init,
this.used - other.used,
this.max - other.max,
this.comitted - other.comitted)
}
private[akka] case class HeapMemoryUsage(init: Long, used: Long, max: Long, comitted: Long, usage: Double) {
def diff(other: HeapMemoryUsage): HeapMemoryUsage =
HeapMemoryUsage(
this.init - other.init,
this.used - other.used,
this.max - other.max,
this.comitted - other.comitted,
this.usage - other.usage)
}
private[akka] case class NonHeapMemoryUsage(init: Long, used: Long, max: Long, comitted: Long, usage: Double) {
def diff(other: NonHeapMemoryUsage): NonHeapMemoryUsage =
NonHeapMemoryUsage(
this.init - other.init,
this.used - other.used,
this.max - other.max,
this.comitted - other.comitted,
this.usage - other.usage)
}

View file

@ -0,0 +1,32 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
trait MetricKeyDSL {
case class MetricKey private[MetricKeyDSL] (path: String) {
import MetricKey._
def /(key: String): MetricKey = MetricKey(path + "." + sanitizeMetricKeyPart(key))
override def toString = path
}
object MetricKey {
def fromString(root: String) = MetricKey(sanitizeMetricKeyPart(root))
// todo not sure what else needs replacing, while keeping key as readable as can be
private def sanitizeMetricKeyPart(keyPart: String) =
keyPart
.replaceAll("""\.\.\.""", "\u2026") // ... =>
.replaceAll("""\.""", "-")
.replaceAll("""[\]\[\(\)\<\>]""", "|")
.replaceAll(" ", "-")
.replaceAll("/", "-")
}
}
object MetricKeyDSL extends MetricKeyDSL

View file

@ -0,0 +1,232 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics._
import com.codahale.metrics.graphite.Graphite // todo impl our own
import java.net.InetSocketAddress
import java.util.concurrent.TimeUnit
import scala.concurrent.duration._
import com.typesafe.config.Config
import java.util
import scala.util.matching.Regex
import scala.collection.mutable
import akka.testkit.metrics.reporter.{ AkkaGraphiteReporter, AkkaConsoleReporter }
import org.scalatest.Notifying
import scala.reflect.ClassTag
/**
* Allows to easily measure performance / memory / file descriptor use in tests.
*
* WARNING: This trait should not be seen as utility for micro-benchmarking,
* please refer to <a href="http://openjdk.java.net/projects/code-tools/jmh/">JMH</a> if that's what you're writing.
* This trait instead aims to give an high level overview as well as data for trend-analysis of long running tests.
*
* Reporting defaults to [[ConsoleReporter]].
* In order to send registry to Graphite run sbt with the following property: `-Dakka.registry.reporting.0=graphite`.
*/
private[akka] trait MetricsKit extends MetricsKitOps {
this: Notifying
import MetricsKit._
import collection.JavaConverters._
private var reporters: List[ScheduledReporter] = Nil
/**
* A configuration containing [[MetricsKitSettings]] under the key `akka.test.registry` must be provided.
* This can be the ActorSystems config.
*
* The reason this is not handled by an Extension is thatwe do not want to enforce having to start an ActorSystem,
* since code measured using this Kit may not need one (e.g. measuring plain Queue implementations).
*/
def metricsConfig: Config
private[metrics] val registry = new MetricRegistry() with AkkaMetricRegistry
initMetricReporters()
def initMetricReporters() {
val settings = new MetricsKitSettings(metricsConfig)
def configureConsoleReporter() {
if (settings.Reporters.contains("console")) {
val akkaConsoleReporter = new AkkaConsoleReporter(registry, settings.ConsoleReporter.Verbose)
if (settings.ConsoleReporter.ScheduledReportInterval > Duration.Zero)
akkaConsoleReporter.start(settings.ConsoleReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS)
reporters ::= akkaConsoleReporter
}
}
def configureGraphiteReporter() {
if (settings.Reporters.contains("graphite")) {
note(s"MetricsKit: Graphite reporter enabled, sending metrics to: ${settings.GraphiteReporter.Host}:${settings.GraphiteReporter.Port}")
val graphite = new Graphite(new InetSocketAddress(settings.GraphiteReporter.Host, settings.GraphiteReporter.Port))
val akkaGraphiteReporter = new AkkaGraphiteReporter(registry, settings.GraphiteReporter.Prefix, graphite)
if (settings.GraphiteReporter.ScheduledReportInterval > Duration.Zero) {
akkaGraphiteReporter.start(settings.GraphiteReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS)
}
reporters ::= akkaGraphiteReporter
}
}
configureConsoleReporter()
configureGraphiteReporter()
}
/**
* Schedule metric reports execution iterval. Should not be used multiple times
*/
def scheduleMetricReports(every: FiniteDuration) {
reporters foreach { _.start(every.toMillis, TimeUnit.MILLISECONDS) }
}
def registeredMetrics = registry.getMetrics.asScala
/**
* Causes immediate flush of metrics, using all registered reporters.
* Afterwards all metrics are removed from the registry.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportAndClearMetrics() {
reporters foreach { _.report() }
clearMetrics()
}
/**
* Causes immediate flush of metrics, using all registered reporters.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportMetrics() {
reporters foreach { _.report() }
}
/**
* Causes immediate flush of only memory related metrics, using all registered reporters.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportMemoryMetrics() {
val gauges = registry.getGauges(MemMetricsFilter)
reporters foreach { _.report(gauges, empty, empty, empty, empty) }
}
/**
* Causes immediate flush of only memory related metrics, using all registered reporters.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportGcMetrics() {
val gauges = registry.getGauges(GcMetricsFilter)
reporters foreach { _.report(gauges, empty, empty, empty, empty) }
}
/**
* Causes immediate flush of only file descriptor metrics, using all registered reporters.
*
* HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting.
*/
def reportFileDescriptorMetrics() {
val gauges = registry.getGauges(FileDescriptorMetricsFilter)
reporters foreach { _.report(gauges, empty, empty, empty, empty) }
}
/**
* Removes registered registry from registry.
* You should call this method then you're done measuring something - usually at the end of your test case,
* otherwise the registry from different tests would influence each others results (avg, min, max, ...).
*
* Please note that, if you have registered a `timer("thing")` previously, you will need to call `timer("thing")` again,
* in order to register a new timer.
*/
def clearMetrics(matching: MetricFilter = MetricFilter.ALL) {
registry.removeMatching(matching)
}
/**
* MUST be called after all tests have finished.
*/
def shutdownMetrics() {
reporters foreach { _.stop() }
}
private[metrics] def getOrRegister[M <: Metric](key: String, metric: M)(implicit tag: ClassTag[M]): M = {
import collection.JavaConverters._
registry.getMetrics.asScala.find(_._1 == key).map(_._2) match {
case Some(existing: M) existing
case Some(existing) throw new IllegalArgumentException("Key: [%s] is already for different kind of metric! Was [%s], expected [%s]".format(key, metric.getClass.getSimpleName, tag.runtimeClass.getSimpleName))
case _ registry.register(key, metric)
}
}
private val emptySortedMap = new util.TreeMap[String, Nothing]()
private def empty[T] = emptySortedMap.asInstanceOf[util.TreeMap[String, T]]
}
private[akka] object MetricsKit {
class RegexMetricFilter(regex: Regex) extends MetricFilter {
override def matches(name: String, metric: Metric) = regex.pattern.matcher(name).matches()
}
val MemMetricsFilter = new RegexMetricFilter(""".*\.mem\..*""".r)
val FileDescriptorMetricsFilter = new RegexMetricFilter(""".*\.file-descriptors\..*""".r)
val KnownOpsInTimespanCounterFilter = new MetricFilter {
override def matches(name: String, metric: Metric) = classOf[KnownOpsInTimespanTimer].isInstance(metric)
}
val GcMetricsFilter = new MetricFilter {
val keyPattern = """.*\.gc\..*""".r.pattern
override def matches(name: String, metric: Metric) = keyPattern.matcher(name).matches()
}
}
/** Provides access to custom Akka [[Metric]]s, with named methods. */
trait AkkaMetricRegistry {
this: MetricRegistry
def getKnownOpsInTimespanCounters = filterFor(classOf[KnownOpsInTimespanTimer])
def getHdrHistograms = filterFor(classOf[HdrHistogram])
def getAveragingGauges = filterFor(classOf[AveragingGauge])
import collection.JavaConverters._
private def filterFor[T](clazz: Class[T]): mutable.Iterable[(String, T)] =
for {
(key, metric) getMetrics.asScala
if clazz.isInstance(metric)
} yield key -> metric.asInstanceOf[T]
}
private[akka] class MetricsKitSettings(config: Config) {
import akka.util.Helpers._
val Reporters = config.getStringList("akka.test.metrics.reporters")
object GraphiteReporter {
val Prefix = config.getString("akka.test.metrics.reporter.graphite.prefix")
lazy val Host = config.getString("akka.test.metrics.reporter.graphite.host").requiring(v !v.trim.isEmpty, "akka.test.metrics.reporter.graphite.host was used but was empty!")
val Port = config.getInt("akka.test.metrics.reporter.graphite.port")
val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.graphite.scheduled-report-interval")
}
object ConsoleReporter {
val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.console.scheduled-report-interval")
val Verbose = config.getBoolean("akka.test.metrics.reporter.console.verbose")
}
}

View file

@ -0,0 +1,87 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import com.codahale.metrics
import com.codahale.metrics._
import java.util
import com.codahale.metrics.jvm
import com.codahale.metrics.jvm.MemoryUsageGaugeSet
/**
* User Land operations provided by the [[MetricsKit]].
*
* Extracted to give easy overview of user-API detached from MetricsKit internals.
*/
private[akka] trait MetricsKitOps extends MetricKeyDSL {
this: MetricsKit
type MetricKey = MetricKeyDSL#MetricKey
/** Simple thread-safe counter, backed by [[LongAdder]] so can pretty efficiently work even when hit by multiple threads */
def counter(key: MetricKey): Counter = registry.counter(key.toString)
/** Simple averaging Gauge, which exposes an arithmetic mean of the values added to it. */
def averageGauge(key: MetricKey): AveragingGauge = getOrRegister(key.toString, new AveragingGauge)
/**
* Used to measure timing of known number of operations over time.
* While not being the most percise, it allows to measure a coarse op/s without injecting counters to the measured operation (potentially hot-loop).
*
* Do not use for short running pieces of code.
*/
def timedWithKnownOps[T](key: MetricKey, ops: Long)(run: T): T = {
val c = getOrRegister(key.toString, new KnownOpsInTimespanTimer(expectedOps = ops))
try run finally c.stop()
}
/**
* Use when measuring for 9x'th percentiles as well as min / max / mean values.
*
* Backed by [[HdrHistogram]].
*
* @param unitString just for human readable output, during console printing
*/
def histogram(key: MetricKey, highestTrackableValue: Long, numberOfSignificantValueDigits: Int, unitString: String = ""): HdrHistogram =
getOrRegister((key / "hdr-histogram").toString, new HdrHistogram(highestTrackableValue, numberOfSignificantValueDigits, unitString))
/** Yet another delegate to `System.gc()` */
def gc() {
System.gc()
}
/**
* Enable memory measurements - will be logged by [[ScheduledReporter]]s if enabled.
* Must not be triggered multiple times - pass around the `MemoryUsageSnapshotting` if you need to measure different points.
*
* Also allows to [[MemoryUsageSnapshotting.getHeapSnapshot]] to obtain memory usage numbers at given point in time.
*/
def measureMemory(key: MetricKey): MemoryUsageGaugeSet with MemoryUsageSnapshotting = {
val gaugeSet = new jvm.MemoryUsageGaugeSet() with MemoryUsageSnapshotting {
val prefix = key / "mem"
}
registry.registerAll(gaugeSet)
gaugeSet
}
/** Enable GC measurements */
def measureGc(key: MetricKey) =
registry.registerAll(new jvm.GarbageCollectorMetricSet() with MetricsPrefix { val prefix = key / "gc" })
/** Enable File Descriptor measurements */
def measureFileDescriptors(key: MetricKey) =
registry.registerAll(new FileDescriptorMetricSet() with MetricsPrefix { val prefix = key / "file-descriptors" })
}
private[metrics] trait MetricsPrefix extends MetricSet {
def prefix: MetricKeyDSL#MetricKey
abstract override def getMetrics: util.Map[String, Metric] = {
// does not have to be fast, is only called once during registering registry
import collection.JavaConverters._
(super.getMetrics.asScala.map { case (k, v) (prefix / k).toString -> v }).asJava
}
}

View file

@ -0,0 +1,50 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics
import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfter, MustMatchers, WordSpec }
import com.typesafe.config.ConfigFactory
class MetricsKitSpec extends WordSpec with MustMatchers with BeforeAndAfter with BeforeAndAfterAll
with MetricsKit {
override def metricsConfig = ConfigFactory.load()
val KitKey = MetricKey.fromString("metrics-kit")
after {
clearMetrics()
}
override def afterAll() {
shutdownMetrics()
}
"MetricsKit" must {
"allow measuring file descriptor usage" in {
measureFileDescriptors(KitKey / "file-desc")
registeredMetrics.count(_._1 contains "file-descriptor") must be > 0
}
"allow to measure time, on known number of operations" in {
timedWithKnownOps(KitKey, ops = 10) {
2 + 2
}
}
"allow to measure average value using Gauge, given multiple values" in {
val sizes = List(1L, 2L, 3L)
val avg = averageGauge(KitKey / "avg-size")
avg.add(sizes)
avg.add(4)
avg.getValue must equal(2.5)
}
}
}

View file

@ -0,0 +1,153 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics.reporter
import java.io.PrintStream
import java.util
import java.util.concurrent.TimeUnit
import com.codahale.metrics._
import akka.testkit.metrics._
import scala.reflect.ClassTag
/**
* Used to report [[Metric]] types that the original [[ConsoleReporter]] is unaware of (cannot re-use directly because of private constructor).
*/
class AkkaConsoleReporter(
registry: AkkaMetricRegistry,
verbose: Boolean,
output: PrintStream = System.out)
extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-console-reporter", MetricsKit.KnownOpsInTimespanCounterFilter, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) {
private final val ConsoleWidth = 80
override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
import collection.JavaConverters._
// default Metrics types
printMetrics(gauges.asScala, printGauge)
printMetrics(counters.asScala, printCounter)
printMetrics(histograms.asScala, printHistogram)
printMetrics(meters.asScala, printMeter)
printMetrics(timers.asScala, printTimer)
// custom Akka types
printMetrics(registry.getKnownOpsInTimespanCounters, printKnownOpsInTimespanCounter)
printMetrics(registry.getHdrHistograms, printHdrHistogram)
printMetrics(registry.getAveragingGauges, printAveragingGauge)
output.println()
output.flush()
}
def printMetrics[T <: Metric](metrics: Iterable[(String, T)], printer: T Unit)(implicit clazz: ClassTag[T]) {
if (!metrics.isEmpty) {
printWithBanner(s"-- ${simpleName(metrics.head._2.getClass)}", '-')
for ((key, metric) metrics) {
output.println(" " + key)
printer(metric)
}
output.println()
}
}
private def printMeter(meter: Meter) {
output.print(" count = %d%n".format(meter.getCount))
output.print(" mean rate = %2.2f events/%s%n".format(convertRate(meter.getMeanRate), getRateUnit))
output.print(" 1-minute rate = %2.2f events/%s%n".format(convertRate(meter.getOneMinuteRate), getRateUnit))
output.print(" 5-minute rate = %2.2f events/%s%n".format(convertRate(meter.getFiveMinuteRate), getRateUnit))
output.print(" 15-minute rate = %2.2f events/%s%n".format(convertRate(meter.getFifteenMinuteRate), getRateUnit))
}
private def printCounter(entry: Counter) {
output.print(" count = %d%n".format(entry.getCount))
}
private def printGauge(entry: Gauge[_]) {
output.print(" value = %s%n".format(entry.getValue))
}
private def printHistogram(histogram: Histogram) {
val snapshot = histogram.getSnapshot
output.print(" count = %d%n".format(histogram.getCount))
output.print(" min = %d%n".format(snapshot.getMin))
output.print(" max = %d%n".format(snapshot.getMax))
output.print(" mean = %2.2f%n".format(snapshot.getMean))
output.print(" stddev = %2.2f%n".format(snapshot.getStdDev))
output.print(" median = %2.2f%n".format(snapshot.getMedian))
output.print(" 75%% <= %2.2f%n".format(snapshot.get75thPercentile))
output.print(" 95%% <= %2.2f%n".format(snapshot.get95thPercentile))
output.print(" 98%% <= %2.2f%n".format(snapshot.get98thPercentile))
output.print(" 99%% <= %2.2f%n".format(snapshot.get99thPercentile))
output.print(" 99.9%% <= %2.2f%n".format(snapshot.get999thPercentile))
}
private def printTimer(timer: Timer) {
val snapshot = timer.getSnapshot
output.print(" count = %d%n".format(timer.getCount))
output.print(" mean rate = %2.2f calls/%s%n".format(convertRate(timer.getMeanRate), getRateUnit))
output.print(" 1-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getOneMinuteRate), getRateUnit))
output.print(" 5-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getFiveMinuteRate), getRateUnit))
output.print(" 15-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getFifteenMinuteRate), getRateUnit))
output.print(" min = %2.2f %s%n".format(convertDuration(snapshot.getMin), getDurationUnit))
output.print(" max = %2.2f %s%n".format(convertDuration(snapshot.getMax), getDurationUnit))
output.print(" mean = %2.2f %s%n".format(convertDuration(snapshot.getMean), getDurationUnit))
output.print(" stddev = %2.2f %s%n".format(convertDuration(snapshot.getStdDev), getDurationUnit))
output.print(" median = %2.2f %s%n".format(convertDuration(snapshot.getMedian), getDurationUnit))
output.print(" 75%% <= %2.2f %s%n".format(convertDuration(snapshot.get75thPercentile), getDurationUnit))
output.print(" 95%% <= %2.2f %s%n".format(convertDuration(snapshot.get95thPercentile), getDurationUnit))
output.print(" 98%% <= %2.2f %s%n".format(convertDuration(snapshot.get98thPercentile), getDurationUnit))
output.print(" 99%% <= %2.2f %s%n".format(convertDuration(snapshot.get99thPercentile), getDurationUnit))
output.print(" 99.9%% <= %2.2f %s%n".format(convertDuration(snapshot.get999thPercentile), getDurationUnit))
}
private def printKnownOpsInTimespanCounter(counter: KnownOpsInTimespanTimer) {
import concurrent.duration._
import PrettyDuration._
output.print(" ops = %d%n".format(counter.getCount))
output.print(" time = %s%n".format(counter.elapsedTime.nanos.pretty))
output.print(" ops/s = %2.2f%n".format(counter.opsPerSecond))
output.print(" avg = %s%n".format(counter.avgDuration.nanos.pretty))
}
private def printHdrHistogram(hist: HdrHistogram) {
val data = hist.getData
val unit = hist.unit
output.print(" min = %d %s%n".format(data.getMinValue, unit))
output.print(" max = %d %s%n".format(data.getMaxValue, unit))
output.print(" mean = %2.2f %s%n".format(data.getMean, unit))
output.print(" stddev = %2.2f%n".format(data.getStdDeviation))
output.print(" 75%% <= %d %s%n".format(data.getValueAtPercentile(75.0), unit))
output.print(" 95%% <= %d %s%n".format(data.getValueAtPercentile(95.0), unit))
output.print(" 98%% <= %d %s%n".format(data.getValueAtPercentile(98.0), unit))
output.print(" 99%% <= %d %s%n".format(data.getValueAtPercentile(99.0), unit))
output.print(" 99.9%% <= %d %s%n".format(data.getValueAtPercentile(99.9), unit))
if (verbose)
data.outputPercentileDistribution(output, 1)
}
private def printAveragingGauge(gauge: AveragingGauge) {
output.print(" avg = %2.2f%n".format(gauge.getValue))
}
private def printWithBanner(s: String, c: Char) {
output.print(s)
output.print(' ')
var i: Int = 0
while (i < (ConsoleWidth - s.length - 1)) {
output.print(c)
i += 1
}
output.println()
}
/** Required for getting simple names of refined instances */
private def simpleName(clazz: Class[_]): String = {
val n = clazz.getName
val i = n.lastIndexOf('.')
n.substring(i + 1)
}
}

View file

@ -0,0 +1,190 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics.reporter
import java.io.IOException
import java.text.DateFormat
import java.util
import java.util.concurrent.TimeUnit
import com.codahale.metrics._
import java.util.{ Locale, Date }
import akka.testkit.metrics._
import com.codahale.metrics.graphite.Graphite
/**
* Used to report [[Metric]] types that the original [[com.codahale.metrics.graphite.GraphiteReporter]] is unaware of (cannot re-use directly because of private constructor).
*/
class AkkaGraphiteReporter(
registry: AkkaMetricRegistry,
prefix: String,
graphite: Graphite)
extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-graphite-reporter", MetricsKit.KnownOpsInTimespanCounterFilter, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) {
// todo get rid of ScheduledReporter (would mean removing codahale metrics)?
private final val ConsoleWidth = 80
val locale = Locale.getDefault
val dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM, locale)
val clock = Clock.defaultClock()
override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
val dateTime = dateFormat.format(new Date(clock.getTime))
// akka-custom metrics
val knownOpsInTimespanCounters = registry.getKnownOpsInTimespanCounters
val hdrHistograms = registry.getHdrHistograms
val averagingGauges = registry.getAveragingGauges
val metricsCount = List(gauges, counters, histograms, meters, timers).map(_.size).sum + List(knownOpsInTimespanCounters, hdrHistograms).map(_.size).sum
sendWithBanner("== AkkaGraphiteReporter @ " + dateTime + " == (" + metricsCount + " metrics)", '=')
try {
graphite.connect()
// graphite takes timestamps in seconds
val now = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis)
// default Metrics types
import collection.JavaConverters._
sendMetrics(now, gauges.asScala, sendGauge)
sendMetrics(now, counters.asScala, sendCounter)
sendMetrics(now, histograms.asScala, sendHistogram)
sendMetrics(now, meters.asScala, sendMetered)
sendMetrics(now, timers.asScala, sendTimer)
sendMetrics(now, knownOpsInTimespanCounters, sendKnownOpsInTimespanCounter)
sendMetrics(now, hdrHistograms, sendHdrHistogram)
sendMetrics(now, averagingGauges, sendAveragingGauge)
} finally {
try {
graphite.close()
} catch {
case ex: IOException
System.err.println(s"Unable to close connection to graphite!")
}
}
}
def sendMetrics[T <: Metric](now: Long, metrics: Iterable[(String, T)], send: (Long, String, T) Unit) {
for ((key, metric) metrics) {
println(" " + key)
send(now, key, metric)
}
}
private def sendHistogram(now: Long, key: String, histogram: Histogram) {
val snapshot = histogram.getSnapshot
send(key + ".count", histogram.getCount, now)
send(key + ".max", snapshot.getMax, now)
send(key + ".mean", snapshot.getMean, now)
send(key + ".min", snapshot.getMin, now)
send(key + ".stddev", snapshot.getStdDev, now)
send(key + ".p50", snapshot.getMedian, now)
send(key + ".p75", snapshot.get75thPercentile, now)
send(key + ".p95", snapshot.get95thPercentile, now)
send(key + ".p98", snapshot.get98thPercentile, now)
send(key + ".p99", snapshot.get99thPercentile, now)
send(key + ".p999", snapshot.get999thPercentile, now)
}
private def sendTimer(now: Long, key: String, timer: Timer) {
val snapshot = timer.getSnapshot
send(key + ".max", convertDuration(snapshot.getMax), now)
send(key + ".mean", convertDuration(snapshot.getMean), now)
send(key + ".min", convertDuration(snapshot.getMin), now)
send(key + ".stddev", convertDuration(snapshot.getStdDev), now)
send(key + ".p50", convertDuration(snapshot.getMedian), now)
send(key + ".p75", convertDuration(snapshot.get75thPercentile), now)
send(key + ".p95", convertDuration(snapshot.get95thPercentile), now)
send(key + ".p98", convertDuration(snapshot.get98thPercentile), now)
send(key + ".p99", convertDuration(snapshot.get99thPercentile), now)
send(key + ".p999", convertDuration(snapshot.get999thPercentile), now)
sendMetered(now, key, timer)
}
private def sendMetered(now: Long, key: String, meter: Metered) {
send(key + ".count", meter.getCount, now)
send(key + ".m1_rate", convertRate(meter.getOneMinuteRate), now)
send(key + ".m5_rate", convertRate(meter.getFiveMinuteRate), now)
send(key + ".m15_rate", convertRate(meter.getFifteenMinuteRate), now)
send(key + ".mean_rate", convertRate(meter.getMeanRate), now)
}
private def sendGauge(now: Long, key: String, gauge: Gauge[_]) {
sendNumericOrIgnore(key + ".gauge", gauge.getValue, now)
}
private def sendCounter(now: Long, key: String, counter: Counter) {
sendNumericOrIgnore(key + ".count", counter.getCount, now)
}
private def sendKnownOpsInTimespanCounter(now: Long, key: String, counter: KnownOpsInTimespanTimer) {
send(key + ".ops", counter.getCount, now)
send(key + ".time", counter.elapsedTime, now)
send(key + ".opsPerSec", counter.opsPerSecond, now)
send(key + ".avg", counter.avgDuration, now)
}
private def sendHdrHistogram(now: Long, key: String, hist: HdrHistogram) {
val snapshot = hist.getData
send(key + ".min", snapshot.getMinValue, now)
send(key + ".max", snapshot.getMaxValue, now)
send(key + ".mean", snapshot.getMean, now)
send(key + ".stddev", snapshot.getStdDeviation, now)
send(key + ".p75", snapshot.getValueAtPercentile(75.0), now)
send(key + ".p95", snapshot.getValueAtPercentile(95.0), now)
send(key + ".p98", snapshot.getValueAtPercentile(98.0), now)
send(key + ".p99", snapshot.getValueAtPercentile(99.0), now)
send(key + ".p999", snapshot.getValueAtPercentile(99.9), now)
}
private def sendAveragingGauge(now: Long, key: String, gauge: AveragingGauge) {
sendNumericOrIgnore(key + ".avg-gauge", gauge.getValue, now)
}
override def stop(): Unit = try {
super.stop()
graphite.close()
} catch {
case ex: Exception System.err.println("Was unable to close Graphite connection: " + ex.getMessage)
}
private def sendNumericOrIgnore(key: String, value: Any, now: Long) {
// seriously nothing better than this? (without Any => String => Num)
value match {
case v: Int send(key, v, now)
case v: Long send(key, v, now)
case v: Byte send(key, v, now)
case v: Short send(key, v, now)
case v: Float send(key, v, now)
case v: Double send(key, v, now)
case _ // ignore non-numeric metric...
}
}
private def send(key: String, value: Double, now: Long) {
if (value >= 0)
graphite.send(s"$prefix.$key", "%2.2f".format(value), now)
}
private def send(key: String, value: Long, now: Long) {
if (value >= 0) //
graphite.send(s"$prefix.$key", value.toString, now)
}
private def sendWithBanner(s: String, c: Char) {
print(s)
print(' ')
var i: Int = 0
while (i < (ConsoleWidth - s.length - 1)) {
print(c)
i += 1
}
println()
}
}

View file

@ -0,0 +1,48 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics.reporter
import scala.concurrent.duration._
object PrettyDuration {
implicit class PrettyPrintableDuration(val d: Duration) extends AnyVal {
def pretty: String = pretty(includeNanos = false)
/** Selects most apropriate TimeUnit for given duration and formats it accordingly */
def pretty(includeNanos: Boolean, precision: Int = 2): String = {
require(precision > 0, "precision must be > 0")
val nanos = d.toNanos
val unit = chooseUnit(nanos)
val value = nanos.toDouble / NANOSECONDS.convert(1, unit)
s"%.${precision}g %s%s".format(value, abbreviate(unit), if (includeNanos) s" ($nanos ns)" else "")
}
def chooseUnit(nanos: Long): TimeUnit = {
val d = nanos.nanos
if (d.toDays > 0) DAYS
else if (d.toHours > 0) HOURS
else if (d.toMinutes > 0) MINUTES
else if (d.toSeconds > 0) SECONDS
else if (d.toMillis > 0) MILLISECONDS
else if (d.toMicros > 0) MICROSECONDS
else NANOSECONDS
}
def abbreviate(unit: TimeUnit): String = unit match {
case NANOSECONDS "ns"
case MICROSECONDS "μs"
case MILLISECONDS "ms"
case SECONDS "s"
case MINUTES "min"
case HOURS "h"
case DAYS "d"
}
}
}

View file

@ -0,0 +1,31 @@
/**
* Copyright (C) 2009-2014 Typesafe Inc. <http://www.typesafe.com>
*/
package akka.testkit.metrics.reporter
import org.scalatest.{ Matchers, FlatSpec }
class PrettyDurationSpec extends FlatSpec with Matchers {
behavior of "PrettyDuration"
import concurrent.duration._
import PrettyDuration._
val cases =
95.nanos -> "95 ns" ::
9500.nanos -> "9.5 μs" ::
9500.micros -> "9.5 ms" ::
9500.millis -> "9.5 s" ::
95.seconds -> "1.6 min" ::
95.minutes -> "1.6 h" ::
95.hours -> "4.0 d" ::
Nil
cases foreach {
case (d, prettyString)
it should s"print $d seconds as $prettyString" in {
d.pretty should equal(prettyString)
}
}
}

View file

@ -70,6 +70,14 @@ object Dependencies {
// mirrored in OSGi sample
val paxExam = "org.ops4j.pax.exam" % "pax-exam-junit4" % "2.6.0" % "test" // ApacheV2
val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test"
// metrics, measurements, perf testing
val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.1" % "test" // ApacheV2
val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.1" % "test" // ApacheV2
val metricsGraphite = "com.codahale.metrics" % "metrics-graphite" % "3.0.1" % "test" // ApacheV2
val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD
val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0
val metricsAll = Seq(metrics, metricsJvm, metricsGraphite, latencyUtils, hdrHistogram)
}
}
@ -79,7 +87,7 @@ object Dependencies {
val actor = Seq(config)
val testkit = Seq(Test.junit, Test.scalatest)
val testkit = Seq(Test.junit, Test.scalatest) ++ Test.metricsAll
val actorTests = Seq(Test.junit, Test.scalatest, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck, protobuf, Test.junitIntf)

View file

@ -117,7 +117,7 @@ object TestExtras {
object StatsDMetrics {
val statsd = config("statsd") extend Test
val statsd = config("statsd")
val enabled = settingKey[Boolean]("Set to true when you want to send stats to statsd; Enable with `-Dakka.sbt.statsd=true`")
@ -128,7 +128,6 @@ object TestExtras {
val port = settingKey[Int]("Port on which statsd is listening, defaults to 8125")
val settings = Seq(
// configuration
enabled in statsd := sys.props("akka.sbt.statsd") == "true",
@ -202,9 +201,9 @@ object TestExtras {
private def testTimerKey(det: Event): String = s"${det.fullyQualifiedName}.${testSelectorToId(det.selector)}"
private def testSelectorToId(sel: testing.Selector): String = sel.asInstanceOf[TestSelector].testName().replaceAll("""[^\w]""", "_")
private def testSelectorToId(sel: testing.Selector): String = sanitize(sel.asInstanceOf[TestSelector].testName())
private def testCounterKey(det: Event, status: Status): String = s"${det.fullyQualifiedName}.${status.toString.toLowerCase}"
private def testCounterKey(det: Event, status: Status): String = s"${sanitize(det.fullyQualifiedName)}.${status.toString.toLowerCase}"
private def keySuccess(fullyQualifiedName: String): String = fullyQualifiedName + ".success"
@ -212,6 +211,8 @@ object TestExtras {
private def keyError(fullyQualifiedName: String): String = fullyQualifiedName + ".error"
private def sanitize(s: String): String = s.replaceAll("""[^\w]""", "_")
}
}

View file

@ -23,4 +23,5 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1")
addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
// stats reporting
libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0"