diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala new file mode 100644 index 0000000000..a1d9f8495a --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/actor/ActorCreationPerfSpec.scala @@ -0,0 +1,223 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.actor + +import scala.language.postfixOps + +import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec } +import scala.concurrent.duration._ +import akka.TestUtils +import akka.testkit.metrics._ +import org.scalatest.BeforeAndAfterAll +import java.io.PrintStream +import java.util.concurrent.TimeUnit +import akka.testkit.metrics.HeapMemoryUsage + +object ActorCreationPerfSpec { + + final case class Create(number: Int, props: () ⇒ Props) + case object Created + case object IsAlive + case object Alive + case object WaitForChildren + case object Waited + + class EmptyActor extends Actor { + def receive = { + case IsAlive ⇒ sender() ! Alive + } + } + + class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor { + def receive = { + case IsAlive ⇒ sender() ! Alive + } + } + + class TimingDriver(hist: HdrHistogram) extends Actor { + + def receive = { + case IsAlive ⇒ + sender() ! Alive + case Create(number, propsCreator) ⇒ + for (i ← 1 to number) { + val s = System.nanoTime() + + context.actorOf(propsCreator.apply()) + + hist.update(System.nanoTime - s) + } + sender() ! Created + case WaitForChildren ⇒ + context.children.foreach(_ ! IsAlive) + context.become(waiting(context.children.size, sender()), discardOld = false) + } + + def waiting(number: Int, replyTo: ActorRef): Receive = { + var current = number + + { + case Alive ⇒ + current -= 1 + if (current == 0) { + replyTo ! Waited + context.unbecome() + } + } + } + } + + class Driver extends Actor { + + def receive = { + case IsAlive ⇒ + sender() ! Alive + case Create(number, propsCreator) ⇒ + for (i ← 1 to number) { + context.actorOf(propsCreator.apply()) + } + sender() ! Created + case WaitForChildren ⇒ + context.children.foreach(_ ! IsAlive) + context.become(waiting(context.children.size, sender()), discardOld = false) + } + + def waiting(number: Int, replyTo: ActorRef): Receive = { + var current = number + + { + case Alive ⇒ + current -= 1 + if (current == 0) { + replyTo ! Waited + context.unbecome() + } + } + } + } +} + +@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) +class ActorCreationPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender + with MetricsKit with BeforeAndAfterAll { + + import ActorCreationPerfSpec._ + + def metricsConfig = system.settings.config + val ActorCreationKey = MetricKey.fromString("actor-creation") + val BlockingTimeKey = ActorCreationKey / "synchronous-part" + val TotalTimeKey = ActorCreationKey / "total" + + val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000) + val nrOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000) + val nrOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 3) + + def runWithCounterInside(metricName: String, scenarioName: String, number: Int, propsCreator: () ⇒ Props) { + val hist = histogram(BlockingTimeKey / metricName, 100.millis.toNanos, 5, "ns") + + val driver = system.actorOf(Props(classOf[TimingDriver], hist), scenarioName) + driver ! IsAlive + expectMsg(Alive) + + driver ! Create(number, propsCreator) + expectMsgPF(15 seconds, s"$scenarioName waiting for Created") { case Created ⇒ } + + driver ! WaitForChildren + expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited ⇒ } + + driver ! PoisonPill + TestUtils.verifyActorTermination(driver, 15 seconds) + + gc() + } + + def runWithoutCounter(scenarioName: String, number: Int, propsCreator: () ⇒ Props): HeapMemoryUsage = { + val mem = measureMemory(TotalTimeKey / scenarioName) + + val driver = system.actorOf(Props(classOf[Driver]), scenarioName) + driver ! IsAlive + expectMsg(Alive) + + gc() + val before = mem.getHeapSnapshot + + driver ! Create(number, propsCreator) + expectMsgPF(15 seconds, s"$scenarioName waiting for Created") { case Created ⇒ } + + driver ! WaitForChildren + expectMsgPF(15 seconds, s"$scenarioName waiting for Waited") { case Waited ⇒ } + + gc() + val after = mem.getHeapSnapshot + + driver ! PoisonPill + TestUtils.verifyActorTermination(driver, 15 seconds) + + after diff before + } + + def registerTests(name: String, propsCreator: () ⇒ Props) { + val scenarioName = name.replaceAll("""[^\w]""", "") + + s"warm-up before: $name" taggedAs PerformanceTest in { + if (warmUp > 0) { + runWithoutCounter(s"${scenarioName}_warmup", warmUp, propsCreator) + } + + clearMetrics() + } + + s"measure synchronous blocked time for $name" taggedAs PerformanceTest in { + // note: measuring per-actor-memory-use in this scenario is skewed as the Actor contains references to counters etc! + // for measuring actor size use refer to the `runWithoutCounter` method + for (i ← 1 to nrOfRepeats) { + runWithCounterInside(name, s"${scenarioName}_driver_inside_$i", nrOfActors, propsCreator) + } + + reportAndClearMetrics() + } + + s"measure total creation time for $name" taggedAs PerformanceTest in { + val avgMem = averageGauge(ActorCreationKey / name / "avg-mem-per-actor") + + for (i ← 1 to nrOfRepeats) { + val heapUsed = timedWithKnownOps(TotalTimeKey / s"creating-$nrOfActors-actors" / name, ops = nrOfActors) { + runWithoutCounter(s"${scenarioName}_driver_outside_$i", nrOfActors, propsCreator) + } + + avgMem.add(heapUsed.used / nrOfActors) // average actor size, over nrOfRepeats + // time is handled by the histogram already + } + + reportAndClearMetrics() + } + } + + "Actor creation with actorOf" must { + + registerTests("Props[EmptyActor] with new Props", () ⇒ Props[EmptyActor]) + + val props1 = Props[EmptyActor] + registerTests("Props[EmptyActor] with same Props", () ⇒ props1) + + registerTests("Props(new EmptyActor) new", () ⇒ { Props(new EmptyActor) }) + + val props2 = Props(new EmptyActor) + registerTests("Props(new EmptyActor) same", () ⇒ { props2 }) + + registerTests("Props(classOf[EmptyArgsActor], ...) new", () ⇒ { Props(classOf[EmptyArgsActor], 4711, 1729) }) + + val props3 = Props(classOf[EmptyArgsActor], 4711, 1729) + registerTests("Props(classOf[EmptyArgsActor], ...) same", () ⇒ { props3 }) + + registerTests("Props(new EmptyArgsActor(...)) new", () ⇒ { Props(new EmptyArgsActor(4711, 1729)) }) + + val props4 = Props(new EmptyArgsActor(4711, 1729)) + registerTests("Props(new EmptyArgsActor(...)) same", () ⇒ { props4 }) + } + + override def afterTermination() = shutdownMetrics() + + override def expectedTestDuration = 2 minutes +} diff --git a/akka-actor-tests/src/test/scala/akka/actor/ActorPerfSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/ActorPerfSpec.scala deleted file mode 100644 index 9cc7f802b7..0000000000 --- a/akka-actor-tests/src/test/scala/akka/actor/ActorPerfSpec.scala +++ /dev/null @@ -1,150 +0,0 @@ -/** - * Copyright (C) 2009-2014 Typesafe Inc. - */ -package akka.actor - -import scala.language.postfixOps - -import akka.testkit.{ PerformanceTest, ImplicitSender, AkkaSpec } -import java.lang.management.ManagementFactory -import scala.concurrent.duration._ -import akka.TestUtils -import scala.util.Try - -object ActorPerfSpec { - - final case class Create(number: Int, props: () ⇒ Props) - case object Created - case object IsAlive - case object Alive - final case class WaitForChildren(number: Int) - case object Waited - - class EmptyActor extends Actor { - def receive = { - case IsAlive ⇒ sender() ! Alive - } - } - - class EmptyArgsActor(val foo: Int, val bar: Int) extends Actor { - def receive = { - case IsAlive ⇒ sender() ! Alive - } - } - - class Driver extends Actor { - - def receive = { - case IsAlive ⇒ - sender() ! Alive - case Create(number, propsCreator) ⇒ - for (i ← 1 to number) { - context.actorOf(propsCreator.apply()) - } - sender() ! Created - case WaitForChildren(number) ⇒ - context.children.foreach(_ ! IsAlive) - context.become(waiting(number, sender()), false) - } - - def waiting(number: Int, replyTo: ActorRef): Receive = { - var current = number - - { - case Alive ⇒ - current -= 1 - if (current == 0) { - replyTo ! Waited - context.unbecome() - } - } - } - } -} - -@org.junit.runner.RunWith(classOf[org.scalatest.junit.JUnitRunner]) -class ActorPerfSpec extends AkkaSpec("akka.actor.serialize-messages = off") with ImplicitSender { - - import ActorPerfSpec._ - - val warmUp: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.warmUp", 50000) - val numberOfActors: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfActors", 100000) - val numberOfRepeats: Int = Integer.getInteger("akka.test.actor.ActorPerfSpec.numberOfRepeats", 2) - - def testActorCreation(name: String, propsCreator: () ⇒ Props): Unit = { - val actorName = name.replaceAll("[ #\\?/!\\*%\\(\\)\\[\\]]", "_") - if (warmUp > 0) - measure(s"${actorName}_warmup", warmUp, propsCreator) - val results = for (i ← 1 to numberOfRepeats) yield measure(s"${actorName}_driver_$i", numberOfActors, propsCreator) - results.foreach { - case (duration, memory) ⇒ - val micros = duration.toMicros - val avgMicros = micros.toDouble / numberOfActors - val avgMemory = memory.toDouble / numberOfActors - println(s"$name Created $numberOfActors"); - println(s"In $micros us, avg: ${avgMicros}") - println(s"Footprint ${memory / 1024} KB, avg: ${avgMemory} B") - } - } - - def measure(name: String, number: Int, propsCreator: () ⇒ Props): (Duration, Long) = { - val memMx = ManagementFactory.getMemoryMXBean() - val driver = system.actorOf(Props[Driver], name) - driver ! IsAlive - expectMsg(Alive) - System.gc() - val memBefore = memMx.getHeapMemoryUsage - val start = System.nanoTime() - driver ! Create(number, propsCreator) - expectMsgPF(15 seconds, s"$name waiting for Created") { case Created ⇒ } - val stop = System.nanoTime() - val duration = Duration.fromNanos(stop - start) - driver ! WaitForChildren(number) - expectMsgPF(15 seconds, s"$name waiting for Waited") { case Waited ⇒ } - System.gc() - val memAfter = memMx.getHeapMemoryUsage - driver ! PoisonPill - TestUtils.verifyActorTermination(driver, 15 seconds) - (duration, memAfter.getUsed - memBefore.getUsed) - } - - "Actor creation with actorFor" must { - - "measure time for Props[EmptyActor] with new Props" taggedAs PerformanceTest in { - testActorCreation("Props[EmptyActor] new", () ⇒ { Props[EmptyActor] }) - } - - "measure time for Props[EmptyActor] with same Props" taggedAs PerformanceTest in { - val props = Props[EmptyActor] - testActorCreation("Props[EmptyActor] same", () ⇒ { props }) - } - - "measure time for Props(new EmptyActor) with new Props" taggedAs PerformanceTest in { - testActorCreation("Props(new EmptyActor) new", () ⇒ { Props(new EmptyActor) }) - } - - "measure time for Props(new EmptyActor) with same Props" taggedAs PerformanceTest in { - val props = Props(new EmptyActor) - testActorCreation("Props(new EmptyActor) same", () ⇒ { props }) - } - - "measure time for Props(classOf[EmptyArgsActor], ...) with new Props" taggedAs PerformanceTest in { - testActorCreation("Props(classOf[EmptyArgsActor], ...) new", () ⇒ { Props(classOf[EmptyArgsActor], 4711, 1729) }) - } - - "measure time for Props(classOf[EmptyArgsActor], ...) with same Props" taggedAs PerformanceTest in { - val props = Props(classOf[EmptyArgsActor], 4711, 1729) - testActorCreation("Props(classOf[EmptyArgsActor], ...) same", () ⇒ { props }) - } - - "measure time for Props(new EmptyArgsActor(...)) with new Props" taggedAs PerformanceTest in { - testActorCreation("Props(new EmptyArgsActor(...)) new", () ⇒ { Props(new EmptyArgsActor(4711, 1729)) }) - } - - "measure time for Props(new EmptyArgsActor(...)) with same Props" taggedAs PerformanceTest in { - val props = Props(new EmptyArgsActor(4711, 1729)) - testActorCreation("Props(new EmptyArgsActor(...)) same", () ⇒ { props }) - } - } - override def expectedTestDuration = 2 minutes -} diff --git a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala index 0dcc668992..c9822d5df5 100644 --- a/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala +++ b/akka-persistence/src/test/scala/akka/persistence/ProcessorChannelSpec.scala @@ -73,7 +73,7 @@ abstract class ProcessorChannelSpec(config: Config) extends AkkaSpec(config) wit private var processor: ActorRef = _ - override protected def beforeEach: Unit = { + override protected def beforeEach(): Unit = { super.beforeEach() setupTestProcessorData() processor = createTestProcessor() diff --git a/akka-testkit/src/test/java/akka/testkit/metrics/LongAdder.java b/akka-testkit/src/test/java/akka/testkit/metrics/LongAdder.java new file mode 100644 index 0000000000..23232d5933 --- /dev/null +++ b/akka-testkit/src/test/java/akka/testkit/metrics/LongAdder.java @@ -0,0 +1,197 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + * + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/LongAdder.java?revision=1.14&view=markup + */ + +package akka.testkit.metrics; + +import java.io.Serializable; +import java.util.concurrent.atomic.AtomicLong; + +// CHECKSTYLE:OFF +/** + * One or more variables that together maintain an initially zero {@code long} sum. When updates + * (method {@link #add}) are contended across threads, the set of variables may grow dynamically to + * reduce contention. Method {@link #sum} (or, equivalently, {@link #longValue}) returns the current + * total combined across the variables maintaining the sum. + *

+ *

This class is usually preferable to {@link AtomicLong} when multiple threads update a common + * sum that is used for purposes such as collecting statistics, not for fine-grained synchronization + * control. Under low update contention, the two classes have similar characteristics. But under + * high contention, expected throughput of this class is significantly higher, at the expense of + * higher space consumption. + *

+ *

This class extends {@link Number}, but does not define methods such as {@code + * equals}, {@code hashCode} and {@code compareTo} because instances are expected to be mutated, and + * so are not useful as collection keys. + *

+ *

jsr166e note: This class is targeted to be placed in java.util.concurrent.atomic. + * + * @author Doug Lea + * @since 1.8 + */ +@SuppressWarnings("all") +class LongAdder extends Striped64 implements Serializable { + private static final long serialVersionUID = 7249069246863182397L; + + /** + * Version of plus for use in retryUpdate + */ + final long fn(long v, long x) { + return v + x; + } + + /** + * Creates a new adder with initial sum of zero. + */ + LongAdder() { + } + + /** + * Adds the given value. + * + * @param x the value to add + */ + public void add(long x) { + Cell[] as; + long b, v; + HashCode hc; + Cell a; + int n; + if ((as = cells) != null || !casBase(b = base, b + x)) { + boolean uncontended = true; + int h = (hc = threadHashCode.get()).code; + if (as == null || (n = as.length) < 1 || + (a = as[(n - 1) & h]) == null || + !(uncontended = a.cas(v = a.value, v + x))) + retryUpdate(x, hc, uncontended); + } + } + + /** + * Equivalent to {@code add(1)}. + */ + public void increment() { + add(1L); + } + + /** + * Equivalent to {@code add(-1)}. + */ + public void decrement() { + add(-1L); + } + + /** + * Returns the current sum. The returned value is NOT an atomic snapshot; invocation + * in the absence of concurrent updates returns an accurate result, but concurrent updates that + * occur while the sum is being calculated might not be incorporated. + * + * @return the sum + */ + public long sum() { + long sum = base; + Cell[] as = cells; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + sum += a.value; + } + } + return sum; + } + + /** + * Resets variables maintaining the sum to zero. This method may be a useful alternative to + * creating a new adder, but is only effective if there are no concurrent updates. Because this + * method is intrinsically racy, it should only be used when it is known that no threads are + * concurrently updating. + */ + public void reset() { + internalReset(0L); + } + + /** + * Equivalent in effect to {@link #sum} followed by {@link #reset}. This method may apply for + * example during quiescent points between multithreaded computations. If there are updates + * concurrent with this method, the returned value is not guaranteed to be the final + * value occurring before the reset. + * + * @return the sum + */ + public long sumThenReset() { + long sum = base; + Cell[] as = cells; + base = 0L; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) { + sum += a.value; + a.value = 0L; + } + } + } + return sum; + } + + /** + * Returns the String representation of the {@link #sum}. + * + * @return the String representation of the {@link #sum} + */ + public String toString() { + return Long.toString(sum()); + } + + /** + * Equivalent to {@link #sum}. + * + * @return the sum + */ + public long longValue() { + return sum(); + } + + /** + * Returns the {@link #sum} as an {@code int} after a narrowing primitive conversion. + */ + public int intValue() { + return (int) sum(); + } + + /** + * Returns the {@link #sum} as a {@code float} after a widening primitive conversion. + */ + public float floatValue() { + return (float) sum(); + } + + /** + * Returns the {@link #sum} as a {@code double} after a widening primitive conversion. + */ + public double doubleValue() { + return (double) sum(); + } + + private void writeObject(java.io.ObjectOutputStream s) + throws java.io.IOException { + s.defaultWriteObject(); + s.writeLong(sum()); + } + + private void readObject(java.io.ObjectInputStream s) + throws java.io.IOException, ClassNotFoundException { + s.defaultReadObject(); + busy = 0; + cells = null; + base = s.readLong(); + } +} +// CHECKSTYLE:ON diff --git a/akka-testkit/src/test/java/akka/testkit/metrics/Striped64.java b/akka-testkit/src/test/java/akka/testkit/metrics/Striped64.java new file mode 100644 index 0000000000..3bf5e08c20 --- /dev/null +++ b/akka-testkit/src/test/java/akka/testkit/metrics/Striped64.java @@ -0,0 +1,354 @@ +/* + * Written by Doug Lea with assistance from members of JCP JSR-166 + * Expert Group and released to the public domain, as explained at + * http://creativecommons.org/publicdomain/zero/1.0/ + * + * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/src/jsr166e/Striped64.java?revision=1.8&view=markup + */ + +package akka.testkit.metrics; + +import java.util.Random; + +// CHECKSTYLE:OFF +/** + * A package-local class holding common representation and mechanics for classes supporting dynamic + * striping on 64bit values. The class extends Number so that concrete subclasses must publicly do + * so. + */ +@SuppressWarnings("all") +abstract class Striped64 extends Number { + /* + * This class maintains a lazily-initialized table of atomically + * updated variables, plus an extra "base" field. The table size + * is a power of two. Indexing uses masked per-thread hash codes. + * Nearly all declarations in this class are package-private, + * accessed directly by subclasses. + * + * Table entries are of class Cell; a variant of AtomicLong padded + * to reduce cache contention on most processors. Padding is + * overkill for most Atomics because they are usually irregularly + * scattered in memory and thus don't interfere much with each + * other. But Atomic objects residing in arrays will tend to be + * placed adjacent to each other, and so will most often share + * cache lines (with a huge negative performance impact) without + * this precaution. + * + * In part because Cells are relatively large, we avoid creating + * them until they are needed. When there is no contention, all + * updates are made to the base field. Upon first contention (a + * failed CAS on base update), the table is initialized to size 2. + * The table size is doubled upon further contention until + * reaching the nearest power of two greater than or equal to the + * number of CPUS. Table slots remain empty (null) until they are + * needed. + * + * A single spinlock ("busy") is used for initializing and + * resizing the table, as well as populating slots with new Cells. + * There is no need for a blocking lock; when the lock is not + * available, threads try other slots (or the base). During these + * retries, there is increased contention and reduced locality, + * which is still better than alternatives. + * + * Per-thread hash codes are initialized to random values. + * Contention and/or table collisions are indicated by failed + * CASes when performing an update operation (see method + * retryUpdate). Upon a collision, if the table size is less than + * the capacity, it is doubled in size unless some other thread + * holds the lock. If a hashed slot is empty, and lock is + * available, a new Cell is created. Otherwise, if the slot + * exists, a CAS is tried. Retries proceed by "double hashing", + * using a secondary hash (Marsaglia XorShift) to try to find a + * free slot. + * + * The table size is capped because, when there are more threads + * than CPUs, supposing that each thread were bound to a CPU, + * there would exist a perfect hash function mapping threads to + * slots that eliminates collisions. When we reach capacity, we + * search for this mapping by randomly varying the hash codes of + * colliding threads. Because search is random, and collisions + * only become known via CAS failures, convergence can be slow, + * and because threads are typically not bound to CPUS forever, + * may not occur at all. However, despite these limitations, + * observed contention rates are typically low in these cases. + * + * It is possible for a Cell to become unused when threads that + * once hashed to it terminate, as well as in the case where + * doubling the table causes no thread to hash to it under + * expanded mask. We do not try to detect or remove such cells, + * under the assumption that for long-running instances, observed + * contention levels will recur, so the cells will eventually be + * needed again; and for short-lived ones, it does not matter. + */ + + /** + * Padded variant of AtomicLong supporting only raw accesses plus CAS. The value field is placed + * between pads, hoping that the JVM doesn't reorder them. + *

+ * JVM intrinsics note: It would be possible to use a release-only form of CAS here, if it were + * provided. + */ + static final class Cell { + volatile long p0, p1, p2, p3, p4, p5, p6; + volatile long value; + volatile long q0, q1, q2, q3, q4, q5, q6; + + Cell(long x) { + value = x; + } + + final boolean cas(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val); + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long valueOffset; + + static { + try { + UNSAFE = getUnsafe(); + Class ak = Cell.class; + valueOffset = UNSAFE.objectFieldOffset + (ak.getDeclaredField("value")); + } catch (Exception e) { + throw new Error(e); + } + } + + } + + /** + * Holder for the thread-local hash code. The code is initially random, but may be set to a + * different value upon collisions. + */ + static final class HashCode { + static final Random rng = new Random(); + int code; + + HashCode() { + int h = rng.nextInt(); // Avoid zero to allow xorShift rehash + code = (h == 0) ? 1 : h; + } + } + + /** + * The corresponding ThreadLocal class + */ + static final class ThreadHashCode extends ThreadLocal { + public HashCode initialValue() { + return new HashCode(); + } + } + + /** + * Static per-thread hash codes. Shared across all instances to reduce ThreadLocal pollution and + * because adjustments due to collisions in one table are likely to be appropriate for others. + */ + static final ThreadHashCode threadHashCode = new ThreadHashCode(); + + /** + * Number of CPUS, to place bound on table size + */ + static final int NCPU = Runtime.getRuntime().availableProcessors(); + + /** + * Table of cells. When non-null, size is a power of 2. + */ + transient volatile Cell[] cells; + + /** + * Base value, used mainly when there is no contention, but also as a fallback during table + * initialization races. Updated via CAS. + */ + transient volatile long base; + + /** + * Spinlock (locked via CAS) used when resizing and/or creating Cells. + */ + transient volatile int busy; + + /** + * Package-private default constructor + */ + Striped64() { + } + + /** + * CASes the base field. + */ + final boolean casBase(long cmp, long val) { + return UNSAFE.compareAndSwapLong(this, baseOffset, cmp, val); + } + + /** + * CASes the busy field from 0 to 1 to acquire lock. + */ + final boolean casBusy() { + return UNSAFE.compareAndSwapInt(this, busyOffset, 0, 1); + } + + /** + * Computes the function of current and new value. Subclasses should open-code this update + * function for most uses, but the virtualized form is needed within retryUpdate. + * + * @param currentValue the current value (of either base or a cell) + * @param newValue the argument from a user update call + * @return result of the update function + */ + abstract long fn(long currentValue, long newValue); + + /** + * Handles cases of updates involving initialization, resizing, creating new Cells, and/or + * contention. See above for explanation. This method suffers the usual non-modularity problems + * of optimistic retry code, relying on rechecked sets of reads. + * + * @param x the value + * @param hc the hash code holder + * @param wasUncontended false if CAS failed before call + */ + final void retryUpdate(long x, HashCode hc, boolean wasUncontended) { + int h = hc.code; + boolean collide = false; // True if last slot nonempty + for (; ; ) { + Cell[] as; + Cell a; + int n; + long v; + if ((as = cells) != null && (n = as.length) > 0) { + if ((a = as[(n - 1) & h]) == null) { + if (busy == 0) { // Try to attach new Cell + Cell r = new Cell(x); // Optimistically create + if (busy == 0 && casBusy()) { + boolean created = false; + try { // Recheck under lock + Cell[] rs; + int m, j; + if ((rs = cells) != null && + (m = rs.length) > 0 && + rs[j = (m - 1) & h] == null) { + rs[j] = r; + created = true; + } + } finally { + busy = 0; + } + if (created) + break; + continue; // Slot is now non-empty + } + } + collide = false; + } else if (!wasUncontended) // CAS already known to fail + wasUncontended = true; // Continue after rehash + else if (a.cas(v = a.value, fn(v, x))) + break; + else if (n >= NCPU || cells != as) + collide = false; // At max size or stale + else if (!collide) + collide = true; + else if (busy == 0 && casBusy()) { + try { + if (cells == as) { // Expand table unless stale + Cell[] rs = new Cell[n << 1]; + for (int i = 0; i < n; ++i) + rs[i] = as[i]; + cells = rs; + } + } finally { + busy = 0; + } + collide = false; + continue; // Retry with expanded table + } + h ^= h << 13; // Rehash + h ^= h >>> 17; + h ^= h << 5; + } else if (busy == 0 && cells == as && casBusy()) { + boolean init = false; + try { // Initialize table + if (cells == as) { + Cell[] rs = new Cell[2]; + rs[h & 1] = new Cell(x); + cells = rs; + init = true; + } + } finally { + busy = 0; + } + if (init) + break; + } else if (casBase(v = base, fn(v, x))) + break; // Fall back on using base + } + hc.code = h; // Record index for next time + } + + + /** + * Sets base and all cells to the given value. + */ + final void internalReset(long initialValue) { + Cell[] as = cells; + base = initialValue; + if (as != null) { + int n = as.length; + for (int i = 0; i < n; ++i) { + Cell a = as[i]; + if (a != null) + a.value = initialValue; + } + } + } + + // Unsafe mechanics + private static final sun.misc.Unsafe UNSAFE; + private static final long baseOffset; + private static final long busyOffset; + + static { + try { + UNSAFE = getUnsafe(); + Class sk = Striped64.class; + baseOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("base")); + busyOffset = UNSAFE.objectFieldOffset + (sk.getDeclaredField("busy")); + } catch (Exception e) { + throw new Error(e); + } + } + + /** + * Returns a sun.misc.Unsafe. Suitable for use in a 3rd party package. Replace with a simple + * call to Unsafe.getUnsafe when integrating into a jdk. + * + * @return a sun.misc.Unsafe + */ + private static sun.misc.Unsafe getUnsafe() { + try { + return sun.misc.Unsafe.getUnsafe(); + } catch (SecurityException ignored) { + + } + try { + return java.security.AccessController.doPrivileged + (new java.security.PrivilegedExceptionAction() { + public sun.misc.Unsafe run() throws Exception { + Class k = sun.misc.Unsafe.class; + for (java.lang.reflect.Field f : k.getDeclaredFields()) { + f.setAccessible(true); + Object x = f.get(null); + if (k.isInstance(x)) + return k.cast(x); + } + throw new NoSuchFieldError("the Unsafe"); + } + }); + } catch (java.security.PrivilegedActionException e) { + throw new RuntimeException("Could not initialize intrinsics", + e.getCause()); + } + } +} +// CHECKSTYLE:ON diff --git a/akka-testkit/src/test/resources/reference.conf b/akka-testkit/src/test/resources/reference.conf new file mode 100644 index 0000000000..77fcefe089 --- /dev/null +++ b/akka-testkit/src/test/resources/reference.conf @@ -0,0 +1,31 @@ +akka { + # Configures MetricsKit + test.metrics { + + # Available reporters are: console, graphite + # In order to configure from the command line, use the alternative list syntax: + # -Dakka.test.metrics.reporters.0=console -Dakka.test.metrics.reporters.1=graphite + reporters = [console] + + reporter { + console { + # Automatically print metrics to the console at scheduled interval. + # To disable, set to `0`. + scheduled-report-interval = 0 ms + + # enable for very verbose / detailed printouts + verbose = false + } + + graphite { + prefix = "local" + host = "" + port = 2003 + + # Automatically print metrics to the console at scheduled interval. + # To disable, set to `0`. + scheduled-report-interval = 1 s + } + } + } +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/AveragingGauge.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/AveragingGauge.scala new file mode 100644 index 0000000000..b6f9337f40 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/AveragingGauge.scala @@ -0,0 +1,30 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics.Gauge + +/** + * Gauge which exposes the Arithmetic Mean of values given to it. + * + * Can be used to expose average of a series of values to [[com.codahale.metrics.ScheduledReporter]]s. + */ +class AveragingGauge extends Gauge[Double] { + + private val sum = new LongAdder + private val count = new LongAdder + + def add(n: Long) { + count.increment() + sum add n + } + + def add(ns: Seq[Long]) { + // takes a mutable Seq on order to allow use with Array's + count add ns.length + sum add ns.sum + } + + override def getValue: Double = sum.sum().toDouble / count.sum() +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/FileDescriptorMetricSet.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/FileDescriptorMetricSet.scala new file mode 100644 index 0000000000..221d668db1 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/FileDescriptorMetricSet.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import java.util +import collection.JavaConverters._ +import java.lang.management.{ OperatingSystemMXBean, ManagementFactory } +import com.codahale.metrics.{ Gauge, Metric, MetricSet } +import com.codahale.metrics.MetricRegistry._ +import com.codahale.metrics.jvm.FileDescriptorRatioGauge + +/** + * MetricSet exposing number of open and maximum file descriptors used by the JVM process. + */ +private[akka] class FileDescriptorMetricSet(os: OperatingSystemMXBean = ManagementFactory.getOperatingSystemMXBean) extends MetricSet { + + override def getMetrics: util.Map[String, Metric] = { + Map[String, Metric]( + + name("file-descriptors", "open") -> new Gauge[Long] { + override def getValue: Long = invoke("getOpenFileDescriptorCount") + }, + + name("file-descriptors", "max") -> new Gauge[Long] { + override def getValue: Long = invoke("getMaxFileDescriptorCount") + }, + + name("file-descriptors", "ratio") -> new FileDescriptorRatioGauge(os)).asJava + } + + private def invoke(name: String): Long = { + val method = os.getClass.getDeclaredMethod(name) + method.setAccessible(true) + method.invoke(os).asInstanceOf[Long] + } +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala new file mode 100644 index 0000000000..dacc1d11c7 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/HdrHistogram.scala @@ -0,0 +1,43 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics.{ Snapshot, Sampling, Metric } +import org.{ HdrHistogram ⇒ hdr } +import java.util.{ Arrays, Collections } +import java.lang.Math._ +import java.io.{ OutputStream, OutputStreamWriter, PrintWriter } + +/** + * Adapts Gil Tene's HdrHistogram to Metric's Metric interface. + * + * @param highestTrackableValue The highest value to be tracked by the histogram. Must be a positive + * integer that is >= 2. + * @param numberOfSignificantValueDigits The number of significant decimal digits to which the histogram will + * maintain value resolution and separation. Must be a non-negative + * integer between 0 and 5. + */ +private[akka] class HdrHistogram( + highestTrackableValue: Long, + numberOfSignificantValueDigits: Int, + val unit: String = "") + extends Metric { + + private val hist = new hdr.Histogram(highestTrackableValue, numberOfSignificantValueDigits) + + def update(value: Long) { + hist.recordValue(value) + } + + def updateWithExpectedInterval(value: Long, expectedIntervalBetweenValueSamples: Long) { + hist.recordValueWithExpectedInterval(value, expectedIntervalBetweenValueSamples) + } + + def updateWithCount(value: Long, count: Long) { + hist.recordValueWithCount(value, count) + } + + def getData = hist.copy().getHistogramData + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/KnownOpsInTimespanTimer.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/KnownOpsInTimespanTimer.scala new file mode 100644 index 0000000000..82221161c9 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/KnownOpsInTimespanTimer.scala @@ -0,0 +1,37 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics._ +import java.util.concurrent.atomic.AtomicLong +import java.util.concurrent.TimeUnit._ + +/** + * Specialised "one-shot" Timer. + * Given a known number of operations performed within a time span (to be measured) it displays the average time one operation took. + * + * Please note that this is a *very coarse* estimation; The gain though is that we do not have to perform the counting inside of the measured thing (we can adding in tight loops). + */ +class KnownOpsInTimespanTimer(expectedOps: Long) extends Metric with Counting { + + val startTime = System.nanoTime + val stopTime = new AtomicLong(0) + + /** + * Stops the Timer. + * Can be called multiple times, though only the first call will be taken into account. + * + * @return true if this was the first call to `stop`, false otherwise + */ + def stop(): Boolean = stopTime.compareAndSet(0, System.nanoTime) + + override def getCount: Long = expectedOps + + def elapsedTime: Long = stopTime.get - startTime + + def avgDuration: Long = (elapsedTime.toDouble / expectedOps).toLong + + def opsPerSecond: Double = expectedOps.toDouble / (elapsedTime.toDouble / NANOSECONDS.convert(1, SECONDS)) + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MemoryUsageSnapshotting.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MemoryUsageSnapshotting.scala new file mode 100644 index 0000000000..284db38ddd --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MemoryUsageSnapshotting.scala @@ -0,0 +1,78 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics._ +import com.codahale.metrics.jvm + +private[akka] trait MemoryUsageSnapshotting extends MetricsPrefix { + this: jvm.MemoryUsageGaugeSet ⇒ + + // accessing metrics in order to not to duplicate mxBean access too much + + def getHeapSnapshot = { + val metrics = getMetrics + HeapMemoryUsage( + metrics.get(key("heap-init")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("heap-used")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("heap-max")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("heap-committed")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("heap-usage")).asInstanceOf[RatioGauge].getValue) + } + + def getTotalSnapshot = { + val metrics = getMetrics + TotalMemoryUsage( + metrics.get(key("total-init")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("total-used")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("total-max")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("total-committed")).asInstanceOf[Gauge[Long]].getValue) + } + + def getNonHeapSnapshot = { + val metrics = getMetrics + NonHeapMemoryUsage( + metrics.get(key("non-heap-init")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("non-heap-used")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("non-heap-max")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("non-heap-committed")).asInstanceOf[Gauge[Long]].getValue, + metrics.get(key("non-heap-usage")).asInstanceOf[RatioGauge].getValue) + } + + private def key(k: String) = prefix + "." + k + +} + +private[akka] case class TotalMemoryUsage(init: Long, used: Long, max: Long, comitted: Long) { + + def diff(other: TotalMemoryUsage): TotalMemoryUsage = + TotalMemoryUsage( + this.init - other.init, + this.used - other.used, + this.max - other.max, + this.comitted - other.comitted) + +} + +private[akka] case class HeapMemoryUsage(init: Long, used: Long, max: Long, comitted: Long, usage: Double) { + + def diff(other: HeapMemoryUsage): HeapMemoryUsage = + HeapMemoryUsage( + this.init - other.init, + this.used - other.used, + this.max - other.max, + this.comitted - other.comitted, + this.usage - other.usage) +} + +private[akka] case class NonHeapMemoryUsage(init: Long, used: Long, max: Long, comitted: Long, usage: Double) { + + def diff(other: NonHeapMemoryUsage): NonHeapMemoryUsage = + NonHeapMemoryUsage( + this.init - other.init, + this.used - other.used, + this.max - other.max, + this.comitted - other.comitted, + this.usage - other.usage) +} \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricKeyDSL.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricKeyDSL.scala new file mode 100644 index 0000000000..0e1227a380 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricKeyDSL.scala @@ -0,0 +1,32 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +trait MetricKeyDSL { + + case class MetricKey private[MetricKeyDSL] (path: String) { + + import MetricKey._ + + def /(key: String): MetricKey = MetricKey(path + "." + sanitizeMetricKeyPart(key)) + + override def toString = path + } + + object MetricKey { + def fromString(root: String) = MetricKey(sanitizeMetricKeyPart(root)) + + // todo not sure what else needs replacing, while keeping key as readable as can be + private def sanitizeMetricKeyPart(keyPart: String) = + keyPart + .replaceAll("""\.\.\.""", "\u2026") // ... => … + .replaceAll("""\.""", "-") + .replaceAll("""[\]\[\(\)\<\>]""", "|") + .replaceAll(" ", "-") + .replaceAll("/", "-") + } + +} + +object MetricKeyDSL extends MetricKeyDSL \ No newline at end of file diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala new file mode 100644 index 0000000000..ed0b1bc663 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala @@ -0,0 +1,232 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics._ +import com.codahale.metrics.graphite.Graphite // todo impl our own +import java.net.InetSocketAddress +import java.util.concurrent.TimeUnit +import scala.concurrent.duration._ +import com.typesafe.config.Config +import java.util +import scala.util.matching.Regex +import scala.collection.mutable +import akka.testkit.metrics.reporter.{ AkkaGraphiteReporter, AkkaConsoleReporter } +import org.scalatest.Notifying +import scala.reflect.ClassTag + +/** + * Allows to easily measure performance / memory / file descriptor use in tests. + * + * WARNING: This trait should not be seen as utility for micro-benchmarking, + * please refer to JMH if that's what you're writing. + * This trait instead aims to give an high level overview as well as data for trend-analysis of long running tests. + * + * Reporting defaults to [[ConsoleReporter]]. + * In order to send registry to Graphite run sbt with the following property: `-Dakka.registry.reporting.0=graphite`. + */ +private[akka] trait MetricsKit extends MetricsKitOps { + this: Notifying ⇒ + + import MetricsKit._ + import collection.JavaConverters._ + + private var reporters: List[ScheduledReporter] = Nil + + /** + * A configuration containing [[MetricsKitSettings]] under the key `akka.test.registry` must be provided. + * This can be the ActorSystems config. + * + * The reason this is not handled by an Extension is thatwe do not want to enforce having to start an ActorSystem, + * since code measured using this Kit may not need one (e.g. measuring plain Queue implementations). + */ + def metricsConfig: Config + + private[metrics] val registry = new MetricRegistry() with AkkaMetricRegistry + + initMetricReporters() + + def initMetricReporters() { + val settings = new MetricsKitSettings(metricsConfig) + + def configureConsoleReporter() { + if (settings.Reporters.contains("console")) { + val akkaConsoleReporter = new AkkaConsoleReporter(registry, settings.ConsoleReporter.Verbose) + + if (settings.ConsoleReporter.ScheduledReportInterval > Duration.Zero) + akkaConsoleReporter.start(settings.ConsoleReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS) + + reporters ::= akkaConsoleReporter + } + } + + def configureGraphiteReporter() { + if (settings.Reporters.contains("graphite")) { + note(s"MetricsKit: Graphite reporter enabled, sending metrics to: ${settings.GraphiteReporter.Host}:${settings.GraphiteReporter.Port}") + val graphite = new Graphite(new InetSocketAddress(settings.GraphiteReporter.Host, settings.GraphiteReporter.Port)) + val akkaGraphiteReporter = new AkkaGraphiteReporter(registry, settings.GraphiteReporter.Prefix, graphite) + + if (settings.GraphiteReporter.ScheduledReportInterval > Duration.Zero) { + akkaGraphiteReporter.start(settings.GraphiteReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS) + } + + reporters ::= akkaGraphiteReporter + } + } + + configureConsoleReporter() + configureGraphiteReporter() + } + + /** + * Schedule metric reports execution iterval. Should not be used multiple times + */ + def scheduleMetricReports(every: FiniteDuration) { + reporters foreach { _.start(every.toMillis, TimeUnit.MILLISECONDS) } + } + + def registeredMetrics = registry.getMetrics.asScala + + /** + * Causes immediate flush of metrics, using all registered reporters. + * Afterwards all metrics are removed from the registry. + * + * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. + */ + def reportAndClearMetrics() { + reporters foreach { _.report() } + clearMetrics() + } + + /** + * Causes immediate flush of metrics, using all registered reporters. + * + * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. + */ + def reportMetrics() { + reporters foreach { _.report() } + } + + /** + * Causes immediate flush of only memory related metrics, using all registered reporters. + * + * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. + */ + def reportMemoryMetrics() { + val gauges = registry.getGauges(MemMetricsFilter) + + reporters foreach { _.report(gauges, empty, empty, empty, empty) } + } + + /** + * Causes immediate flush of only memory related metrics, using all registered reporters. + * + * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. + */ + def reportGcMetrics() { + val gauges = registry.getGauges(GcMetricsFilter) + + reporters foreach { _.report(gauges, empty, empty, empty, empty) } + } + + /** + * Causes immediate flush of only file descriptor metrics, using all registered reporters. + * + * HINT: this operation can be costy, run outside of your tested code, or rely on scheduled reporting. + */ + def reportFileDescriptorMetrics() { + val gauges = registry.getGauges(FileDescriptorMetricsFilter) + + reporters foreach { _.report(gauges, empty, empty, empty, empty) } + } + + /** + * Removes registered registry from registry. + * You should call this method then you're done measuring something - usually at the end of your test case, + * otherwise the registry from different tests would influence each others results (avg, min, max, ...). + * + * Please note that, if you have registered a `timer("thing")` previously, you will need to call `timer("thing")` again, + * in order to register a new timer. + */ + def clearMetrics(matching: MetricFilter = MetricFilter.ALL) { + registry.removeMatching(matching) + } + + /** + * MUST be called after all tests have finished. + */ + def shutdownMetrics() { + reporters foreach { _.stop() } + } + + private[metrics] def getOrRegister[M <: Metric](key: String, metric: ⇒ M)(implicit tag: ClassTag[M]): M = { + import collection.JavaConverters._ + registry.getMetrics.asScala.find(_._1 == key).map(_._2) match { + case Some(existing: M) ⇒ existing + case Some(existing) ⇒ throw new IllegalArgumentException("Key: [%s] is already for different kind of metric! Was [%s], expected [%s]".format(key, metric.getClass.getSimpleName, tag.runtimeClass.getSimpleName)) + case _ ⇒ registry.register(key, metric) + } + } + + private val emptySortedMap = new util.TreeMap[String, Nothing]() + private def empty[T] = emptySortedMap.asInstanceOf[util.TreeMap[String, T]] +} + +private[akka] object MetricsKit { + + class RegexMetricFilter(regex: Regex) extends MetricFilter { + override def matches(name: String, metric: Metric) = regex.pattern.matcher(name).matches() + } + + val MemMetricsFilter = new RegexMetricFilter(""".*\.mem\..*""".r) + + val FileDescriptorMetricsFilter = new RegexMetricFilter(""".*\.file-descriptors\..*""".r) + + val KnownOpsInTimespanCounterFilter = new MetricFilter { + override def matches(name: String, metric: Metric) = classOf[KnownOpsInTimespanTimer].isInstance(metric) + } + + val GcMetricsFilter = new MetricFilter { + val keyPattern = """.*\.gc\..*""".r.pattern + + override def matches(name: String, metric: Metric) = keyPattern.matcher(name).matches() + } +} + +/** Provides access to custom Akka [[Metric]]s, with named methods. */ +trait AkkaMetricRegistry { + this: MetricRegistry ⇒ + + def getKnownOpsInTimespanCounters = filterFor(classOf[KnownOpsInTimespanTimer]) + def getHdrHistograms = filterFor(classOf[HdrHistogram]) + def getAveragingGauges = filterFor(classOf[AveragingGauge]) + + import collection.JavaConverters._ + private def filterFor[T](clazz: Class[T]): mutable.Iterable[(String, T)] = + for { + (key, metric) ← getMetrics.asScala + if clazz.isInstance(metric) + } yield key -> metric.asInstanceOf[T] +} + +private[akka] class MetricsKitSettings(config: Config) { + + import akka.util.Helpers._ + + val Reporters = config.getStringList("akka.test.metrics.reporters") + + object GraphiteReporter { + val Prefix = config.getString("akka.test.metrics.reporter.graphite.prefix") + lazy val Host = config.getString("akka.test.metrics.reporter.graphite.host").requiring(v ⇒ !v.trim.isEmpty, "akka.test.metrics.reporter.graphite.host was used but was empty!") + val Port = config.getInt("akka.test.metrics.reporter.graphite.port") + + val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.graphite.scheduled-report-interval") + } + + object ConsoleReporter { + val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.console.scheduled-report-interval") + val Verbose = config.getBoolean("akka.test.metrics.reporter.console.verbose") + } + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala new file mode 100644 index 0000000000..f41657d532 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitOps.scala @@ -0,0 +1,87 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import com.codahale.metrics +import com.codahale.metrics._ +import java.util +import com.codahale.metrics.jvm +import com.codahale.metrics.jvm.MemoryUsageGaugeSet + +/** + * User Land operations provided by the [[MetricsKit]]. + * + * Extracted to give easy overview of user-API detached from MetricsKit internals. + */ +private[akka] trait MetricsKitOps extends MetricKeyDSL { + this: MetricsKit ⇒ + + type MetricKey = MetricKeyDSL#MetricKey + + /** Simple thread-safe counter, backed by [[LongAdder]] so can pretty efficiently work even when hit by multiple threads */ + def counter(key: MetricKey): Counter = registry.counter(key.toString) + + /** Simple averaging Gauge, which exposes an arithmetic mean of the values added to it. */ + def averageGauge(key: MetricKey): AveragingGauge = getOrRegister(key.toString, new AveragingGauge) + + /** + * Used to measure timing of known number of operations over time. + * While not being the most percise, it allows to measure a coarse op/s without injecting counters to the measured operation (potentially hot-loop). + * + * Do not use for short running pieces of code. + */ + def timedWithKnownOps[T](key: MetricKey, ops: Long)(run: ⇒ T): T = { + val c = getOrRegister(key.toString, new KnownOpsInTimespanTimer(expectedOps = ops)) + try run finally c.stop() + } + + /** + * Use when measuring for 9x'th percentiles as well as min / max / mean values. + * + * Backed by [[HdrHistogram]]. + * + * @param unitString just for human readable output, during console printing + */ + def histogram(key: MetricKey, highestTrackableValue: Long, numberOfSignificantValueDigits: Int, unitString: String = ""): HdrHistogram = + getOrRegister((key / "hdr-histogram").toString, new HdrHistogram(highestTrackableValue, numberOfSignificantValueDigits, unitString)) + + /** Yet another delegate to `System.gc()` */ + def gc() { + System.gc() + } + + /** + * Enable memory measurements - will be logged by [[ScheduledReporter]]s if enabled. + * Must not be triggered multiple times - pass around the `MemoryUsageSnapshotting` if you need to measure different points. + * + * Also allows to [[MemoryUsageSnapshotting.getHeapSnapshot]] to obtain memory usage numbers at given point in time. + */ + def measureMemory(key: MetricKey): MemoryUsageGaugeSet with MemoryUsageSnapshotting = { + val gaugeSet = new jvm.MemoryUsageGaugeSet() with MemoryUsageSnapshotting { + val prefix = key / "mem" + } + + registry.registerAll(gaugeSet) + gaugeSet + } + + /** Enable GC measurements */ + def measureGc(key: MetricKey) = + registry.registerAll(new jvm.GarbageCollectorMetricSet() with MetricsPrefix { val prefix = key / "gc" }) + + /** Enable File Descriptor measurements */ + def measureFileDescriptors(key: MetricKey) = + registry.registerAll(new FileDescriptorMetricSet() with MetricsPrefix { val prefix = key / "file-descriptors" }) + +} + +private[metrics] trait MetricsPrefix extends MetricSet { + def prefix: MetricKeyDSL#MetricKey + + abstract override def getMetrics: util.Map[String, Metric] = { + // does not have to be fast, is only called once during registering registry + import collection.JavaConverters._ + (super.getMetrics.asScala.map { case (k, v) ⇒ (prefix / k).toString -> v }).asJava + } +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitSpec.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitSpec.scala new file mode 100644 index 0000000000..49600ee009 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKitSpec.scala @@ -0,0 +1,50 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics + +import org.scalatest.{ BeforeAndAfterAll, BeforeAndAfter, MustMatchers, WordSpec } +import com.typesafe.config.ConfigFactory + +class MetricsKitSpec extends WordSpec with MustMatchers with BeforeAndAfter with BeforeAndAfterAll + with MetricsKit { + + override def metricsConfig = ConfigFactory.load() + + val KitKey = MetricKey.fromString("metrics-kit") + + after { + clearMetrics() + } + + override def afterAll() { + shutdownMetrics() + } + + "MetricsKit" must { + + "allow measuring file descriptor usage" in { + measureFileDescriptors(KitKey / "file-desc") + + registeredMetrics.count(_._1 contains "file-descriptor") must be > 0 + } + + "allow to measure time, on known number of operations" in { + timedWithKnownOps(KitKey, ops = 10) { + 2 + 2 + } + } + + "allow to measure average value using Gauge, given multiple values" in { + val sizes = List(1L, 2L, 3L) + + val avg = averageGauge(KitKey / "avg-size") + + avg.add(sizes) + avg.add(4) + + avg.getValue must equal(2.5) + } + } + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaConsoleReporter.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaConsoleReporter.scala new file mode 100644 index 0000000000..30ce8e6efe --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaConsoleReporter.scala @@ -0,0 +1,153 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics.reporter + +import java.io.PrintStream +import java.util +import java.util.concurrent.TimeUnit +import com.codahale.metrics._ +import akka.testkit.metrics._ +import scala.reflect.ClassTag + +/** + * Used to report [[Metric]] types that the original [[ConsoleReporter]] is unaware of (cannot re-use directly because of private constructor). + */ +class AkkaConsoleReporter( + registry: AkkaMetricRegistry, + verbose: Boolean, + output: PrintStream = System.out) + extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-console-reporter", MetricsKit.KnownOpsInTimespanCounterFilter, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) { + + private final val ConsoleWidth = 80 + + override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + import collection.JavaConverters._ + + // default Metrics types + printMetrics(gauges.asScala, printGauge) + printMetrics(counters.asScala, printCounter) + printMetrics(histograms.asScala, printHistogram) + printMetrics(meters.asScala, printMeter) + printMetrics(timers.asScala, printTimer) + + // custom Akka types + printMetrics(registry.getKnownOpsInTimespanCounters, printKnownOpsInTimespanCounter) + printMetrics(registry.getHdrHistograms, printHdrHistogram) + printMetrics(registry.getAveragingGauges, printAveragingGauge) + + output.println() + output.flush() + } + + def printMetrics[T <: Metric](metrics: Iterable[(String, T)], printer: T ⇒ Unit)(implicit clazz: ClassTag[T]) { + if (!metrics.isEmpty) { + printWithBanner(s"-- ${simpleName(metrics.head._2.getClass)}", '-') + for ((key, metric) ← metrics) { + output.println(" " + key) + printer(metric) + } + output.println() + } + } + + private def printMeter(meter: Meter) { + output.print(" count = %d%n".format(meter.getCount)) + output.print(" mean rate = %2.2f events/%s%n".format(convertRate(meter.getMeanRate), getRateUnit)) + output.print(" 1-minute rate = %2.2f events/%s%n".format(convertRate(meter.getOneMinuteRate), getRateUnit)) + output.print(" 5-minute rate = %2.2f events/%s%n".format(convertRate(meter.getFiveMinuteRate), getRateUnit)) + output.print(" 15-minute rate = %2.2f events/%s%n".format(convertRate(meter.getFifteenMinuteRate), getRateUnit)) + } + + private def printCounter(entry: Counter) { + output.print(" count = %d%n".format(entry.getCount)) + } + + private def printGauge(entry: Gauge[_]) { + output.print(" value = %s%n".format(entry.getValue)) + } + + private def printHistogram(histogram: Histogram) { + val snapshot = histogram.getSnapshot + output.print(" count = %d%n".format(histogram.getCount)) + output.print(" min = %d%n".format(snapshot.getMin)) + output.print(" max = %d%n".format(snapshot.getMax)) + output.print(" mean = %2.2f%n".format(snapshot.getMean)) + output.print(" stddev = %2.2f%n".format(snapshot.getStdDev)) + output.print(" median = %2.2f%n".format(snapshot.getMedian)) + output.print(" 75%% <= %2.2f%n".format(snapshot.get75thPercentile)) + output.print(" 95%% <= %2.2f%n".format(snapshot.get95thPercentile)) + output.print(" 98%% <= %2.2f%n".format(snapshot.get98thPercentile)) + output.print(" 99%% <= %2.2f%n".format(snapshot.get99thPercentile)) + output.print(" 99.9%% <= %2.2f%n".format(snapshot.get999thPercentile)) + } + + private def printTimer(timer: Timer) { + val snapshot = timer.getSnapshot + output.print(" count = %d%n".format(timer.getCount)) + output.print(" mean rate = %2.2f calls/%s%n".format(convertRate(timer.getMeanRate), getRateUnit)) + output.print(" 1-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getOneMinuteRate), getRateUnit)) + output.print(" 5-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getFiveMinuteRate), getRateUnit)) + output.print(" 15-minute rate = %2.2f calls/%s%n".format(convertRate(timer.getFifteenMinuteRate), getRateUnit)) + output.print(" min = %2.2f %s%n".format(convertDuration(snapshot.getMin), getDurationUnit)) + output.print(" max = %2.2f %s%n".format(convertDuration(snapshot.getMax), getDurationUnit)) + output.print(" mean = %2.2f %s%n".format(convertDuration(snapshot.getMean), getDurationUnit)) + output.print(" stddev = %2.2f %s%n".format(convertDuration(snapshot.getStdDev), getDurationUnit)) + output.print(" median = %2.2f %s%n".format(convertDuration(snapshot.getMedian), getDurationUnit)) + output.print(" 75%% <= %2.2f %s%n".format(convertDuration(snapshot.get75thPercentile), getDurationUnit)) + output.print(" 95%% <= %2.2f %s%n".format(convertDuration(snapshot.get95thPercentile), getDurationUnit)) + output.print(" 98%% <= %2.2f %s%n".format(convertDuration(snapshot.get98thPercentile), getDurationUnit)) + output.print(" 99%% <= %2.2f %s%n".format(convertDuration(snapshot.get99thPercentile), getDurationUnit)) + output.print(" 99.9%% <= %2.2f %s%n".format(convertDuration(snapshot.get999thPercentile), getDurationUnit)) + } + + private def printKnownOpsInTimespanCounter(counter: KnownOpsInTimespanTimer) { + import concurrent.duration._ + import PrettyDuration._ + output.print(" ops = %d%n".format(counter.getCount)) + output.print(" time = %s%n".format(counter.elapsedTime.nanos.pretty)) + output.print(" ops/s = %2.2f%n".format(counter.opsPerSecond)) + output.print(" avg = %s%n".format(counter.avgDuration.nanos.pretty)) + } + + private def printHdrHistogram(hist: HdrHistogram) { + val data = hist.getData + val unit = hist.unit + output.print(" min = %d %s%n".format(data.getMinValue, unit)) + output.print(" max = %d %s%n".format(data.getMaxValue, unit)) + output.print(" mean = %2.2f %s%n".format(data.getMean, unit)) + output.print(" stddev = %2.2f%n".format(data.getStdDeviation)) + output.print(" 75%% <= %d %s%n".format(data.getValueAtPercentile(75.0), unit)) + output.print(" 95%% <= %d %s%n".format(data.getValueAtPercentile(95.0), unit)) + output.print(" 98%% <= %d %s%n".format(data.getValueAtPercentile(98.0), unit)) + output.print(" 99%% <= %d %s%n".format(data.getValueAtPercentile(99.0), unit)) + output.print(" 99.9%% <= %d %s%n".format(data.getValueAtPercentile(99.9), unit)) + + if (verbose) + data.outputPercentileDistribution(output, 1) + } + + private def printAveragingGauge(gauge: AveragingGauge) { + output.print(" avg = %2.2f%n".format(gauge.getValue)) + } + + private def printWithBanner(s: String, c: Char) { + output.print(s) + output.print(' ') + var i: Int = 0 + while (i < (ConsoleWidth - s.length - 1)) { + output.print(c) + i += 1 + } + output.println() + } + + /** Required for getting simple names of refined instances */ + private def simpleName(clazz: Class[_]): String = { + val n = clazz.getName + val i = n.lastIndexOf('.') + n.substring(i + 1) + } + +} + diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala new file mode 100644 index 0000000000..7417be8c25 --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala @@ -0,0 +1,190 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics.reporter + +import java.io.IOException +import java.text.DateFormat +import java.util +import java.util.concurrent.TimeUnit +import com.codahale.metrics._ +import java.util.{ Locale, Date } +import akka.testkit.metrics._ +import com.codahale.metrics.graphite.Graphite + +/** + * Used to report [[Metric]] types that the original [[com.codahale.metrics.graphite.GraphiteReporter]] is unaware of (cannot re-use directly because of private constructor). + */ +class AkkaGraphiteReporter( + registry: AkkaMetricRegistry, + prefix: String, + graphite: Graphite) + extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-graphite-reporter", MetricsKit.KnownOpsInTimespanCounterFilter, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) { + + // todo get rid of ScheduledReporter (would mean removing codahale metrics)? + + private final val ConsoleWidth = 80 + + val locale = Locale.getDefault + val dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM, locale) + val clock = Clock.defaultClock() + + override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { + val dateTime = dateFormat.format(new Date(clock.getTime)) + + // akka-custom metrics + val knownOpsInTimespanCounters = registry.getKnownOpsInTimespanCounters + val hdrHistograms = registry.getHdrHistograms + val averagingGauges = registry.getAveragingGauges + + val metricsCount = List(gauges, counters, histograms, meters, timers).map(_.size).sum + List(knownOpsInTimespanCounters, hdrHistograms).map(_.size).sum + sendWithBanner("== AkkaGraphiteReporter @ " + dateTime + " == (" + metricsCount + " metrics)", '=') + + try { + graphite.connect() + + // graphite takes timestamps in seconds + val now = TimeUnit.MILLISECONDS.toSeconds(System.currentTimeMillis) + + // default Metrics types + import collection.JavaConverters._ + sendMetrics(now, gauges.asScala, sendGauge) + sendMetrics(now, counters.asScala, sendCounter) + sendMetrics(now, histograms.asScala, sendHistogram) + sendMetrics(now, meters.asScala, sendMetered) + sendMetrics(now, timers.asScala, sendTimer) + + sendMetrics(now, knownOpsInTimespanCounters, sendKnownOpsInTimespanCounter) + sendMetrics(now, hdrHistograms, sendHdrHistogram) + sendMetrics(now, averagingGauges, sendAveragingGauge) + + } finally { + try { + graphite.close() + } catch { + case ex: IOException ⇒ + System.err.println(s"Unable to close connection to graphite!") + } + } + } + + def sendMetrics[T <: Metric](now: Long, metrics: Iterable[(String, T)], send: (Long, String, T) ⇒ Unit) { + for ((key, metric) ← metrics) { + println(" " + key) + send(now, key, metric) + } + } + + private def sendHistogram(now: Long, key: String, histogram: Histogram) { + val snapshot = histogram.getSnapshot + send(key + ".count", histogram.getCount, now) + send(key + ".max", snapshot.getMax, now) + send(key + ".mean", snapshot.getMean, now) + send(key + ".min", snapshot.getMin, now) + send(key + ".stddev", snapshot.getStdDev, now) + send(key + ".p50", snapshot.getMedian, now) + send(key + ".p75", snapshot.get75thPercentile, now) + send(key + ".p95", snapshot.get95thPercentile, now) + send(key + ".p98", snapshot.get98thPercentile, now) + send(key + ".p99", snapshot.get99thPercentile, now) + send(key + ".p999", snapshot.get999thPercentile, now) + } + + private def sendTimer(now: Long, key: String, timer: Timer) { + val snapshot = timer.getSnapshot + send(key + ".max", convertDuration(snapshot.getMax), now) + send(key + ".mean", convertDuration(snapshot.getMean), now) + send(key + ".min", convertDuration(snapshot.getMin), now) + send(key + ".stddev", convertDuration(snapshot.getStdDev), now) + send(key + ".p50", convertDuration(snapshot.getMedian), now) + send(key + ".p75", convertDuration(snapshot.get75thPercentile), now) + send(key + ".p95", convertDuration(snapshot.get95thPercentile), now) + send(key + ".p98", convertDuration(snapshot.get98thPercentile), now) + send(key + ".p99", convertDuration(snapshot.get99thPercentile), now) + send(key + ".p999", convertDuration(snapshot.get999thPercentile), now) + sendMetered(now, key, timer) + } + + private def sendMetered(now: Long, key: String, meter: Metered) { + send(key + ".count", meter.getCount, now) + send(key + ".m1_rate", convertRate(meter.getOneMinuteRate), now) + send(key + ".m5_rate", convertRate(meter.getFiveMinuteRate), now) + send(key + ".m15_rate", convertRate(meter.getFifteenMinuteRate), now) + send(key + ".mean_rate", convertRate(meter.getMeanRate), now) + } + + private def sendGauge(now: Long, key: String, gauge: Gauge[_]) { + sendNumericOrIgnore(key + ".gauge", gauge.getValue, now) + } + + private def sendCounter(now: Long, key: String, counter: Counter) { + sendNumericOrIgnore(key + ".count", counter.getCount, now) + } + + private def sendKnownOpsInTimespanCounter(now: Long, key: String, counter: KnownOpsInTimespanTimer) { + send(key + ".ops", counter.getCount, now) + send(key + ".time", counter.elapsedTime, now) + send(key + ".opsPerSec", counter.opsPerSecond, now) + send(key + ".avg", counter.avgDuration, now) + } + + private def sendHdrHistogram(now: Long, key: String, hist: HdrHistogram) { + val snapshot = hist.getData + send(key + ".min", snapshot.getMinValue, now) + send(key + ".max", snapshot.getMaxValue, now) + send(key + ".mean", snapshot.getMean, now) + send(key + ".stddev", snapshot.getStdDeviation, now) + send(key + ".p75", snapshot.getValueAtPercentile(75.0), now) + send(key + ".p95", snapshot.getValueAtPercentile(95.0), now) + send(key + ".p98", snapshot.getValueAtPercentile(98.0), now) + send(key + ".p99", snapshot.getValueAtPercentile(99.0), now) + send(key + ".p999", snapshot.getValueAtPercentile(99.9), now) + } + + private def sendAveragingGauge(now: Long, key: String, gauge: AveragingGauge) { + sendNumericOrIgnore(key + ".avg-gauge", gauge.getValue, now) + } + + override def stop(): Unit = try { + super.stop() + graphite.close() + } catch { + case ex: Exception ⇒ System.err.println("Was unable to close Graphite connection: " + ex.getMessage) + } + + private def sendNumericOrIgnore(key: String, value: Any, now: Long) { + // seriously nothing better than this? (without Any => String => Num) + value match { + case v: Int ⇒ send(key, v, now) + case v: Long ⇒ send(key, v, now) + case v: Byte ⇒ send(key, v, now) + case v: Short ⇒ send(key, v, now) + case v: Float ⇒ send(key, v, now) + case v: Double ⇒ send(key, v, now) + case _ ⇒ // ignore non-numeric metric... + } + } + + private def send(key: String, value: Double, now: Long) { + if (value >= 0) + graphite.send(s"$prefix.$key", "%2.2f".format(value), now) + } + + private def send(key: String, value: Long, now: Long) { + if (value >= 0) // + graphite.send(s"$prefix.$key", value.toString, now) + } + + private def sendWithBanner(s: String, c: Char) { + print(s) + print(' ') + var i: Int = 0 + while (i < (ConsoleWidth - s.length - 1)) { + print(c) + i += 1 + } + println() + } + +} + diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDuration.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDuration.scala new file mode 100644 index 0000000000..5edbdc692f --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDuration.scala @@ -0,0 +1,48 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics.reporter + +import scala.concurrent.duration._ + +object PrettyDuration { + + implicit class PrettyPrintableDuration(val d: Duration) extends AnyVal { + + def pretty: String = pretty(includeNanos = false) + + /** Selects most apropriate TimeUnit for given duration and formats it accordingly */ + def pretty(includeNanos: Boolean, precision: Int = 2): String = { + require(precision > 0, "precision must be > 0") + + val nanos = d.toNanos + val unit = chooseUnit(nanos) + val value = nanos.toDouble / NANOSECONDS.convert(1, unit) + + s"%.${precision}g %s%s".format(value, abbreviate(unit), if (includeNanos) s" ($nanos ns)" else "") + } + + def chooseUnit(nanos: Long): TimeUnit = { + val d = nanos.nanos + + if (d.toDays > 0) DAYS + else if (d.toHours > 0) HOURS + else if (d.toMinutes > 0) MINUTES + else if (d.toSeconds > 0) SECONDS + else if (d.toMillis > 0) MILLISECONDS + else if (d.toMicros > 0) MICROSECONDS + else NANOSECONDS + } + + def abbreviate(unit: TimeUnit): String = unit match { + case NANOSECONDS ⇒ "ns" + case MICROSECONDS ⇒ "μs" + case MILLISECONDS ⇒ "ms" + case SECONDS ⇒ "s" + case MINUTES ⇒ "min" + case HOURS ⇒ "h" + case DAYS ⇒ "d" + } + } + +} diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDurationSpec.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDurationSpec.scala new file mode 100644 index 0000000000..0529a121fc --- /dev/null +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/PrettyDurationSpec.scala @@ -0,0 +1,31 @@ +/** + * Copyright (C) 2009-2014 Typesafe Inc. + */ +package akka.testkit.metrics.reporter + +import org.scalatest.{ Matchers, FlatSpec } + +class PrettyDurationSpec extends FlatSpec with Matchers { + + behavior of "PrettyDuration" + + import concurrent.duration._ + import PrettyDuration._ + + val cases = + 95.nanos -> "95 ns" :: + 9500.nanos -> "9.5 μs" :: + 9500.micros -> "9.5 ms" :: + 9500.millis -> "9.5 s" :: + 95.seconds -> "1.6 min" :: + 95.minutes -> "1.6 h" :: + 95.hours -> "4.0 d" :: + Nil + + cases foreach { + case (d, prettyString) ⇒ + it should s"print $d seconds as $prettyString" in { + d.pretty should equal(prettyString) + } + } +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 9d6246cf1d..82af337b78 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -70,6 +70,14 @@ object Dependencies { // mirrored in OSGi sample val paxExam = "org.ops4j.pax.exam" % "pax-exam-junit4" % "2.6.0" % "test" // ApacheV2 val scalaXml = "org.scala-lang.modules" %% "scala-xml" % "1.0.1" % "test" + + // metrics, measurements, perf testing + val metrics = "com.codahale.metrics" % "metrics-core" % "3.0.1" % "test" // ApacheV2 + val metricsJvm = "com.codahale.metrics" % "metrics-jvm" % "3.0.1" % "test" // ApacheV2 + val metricsGraphite = "com.codahale.metrics" % "metrics-graphite" % "3.0.1" % "test" // ApacheV2 + val latencyUtils = "org.latencyutils" % "LatencyUtils" % "1.0.3" % "test" // Free BSD + val hdrHistogram = "org.hdrhistogram" % "HdrHistogram" % "1.1.4" % "test" // CC0 + val metricsAll = Seq(metrics, metricsJvm, metricsGraphite, latencyUtils, hdrHistogram) } } @@ -79,7 +87,7 @@ object Dependencies { val actor = Seq(config) - val testkit = Seq(Test.junit, Test.scalatest) + val testkit = Seq(Test.junit, Test.scalatest) ++ Test.metricsAll val actorTests = Seq(Test.junit, Test.scalatest, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck, protobuf, Test.junitIntf) diff --git a/project/TestExtras.scala b/project/TestExtras.scala index 404bff1244..e193e8c013 100644 --- a/project/TestExtras.scala +++ b/project/TestExtras.scala @@ -117,7 +117,7 @@ object TestExtras { object StatsDMetrics { - val statsd = config("statsd") extend Test + val statsd = config("statsd") val enabled = settingKey[Boolean]("Set to true when you want to send stats to statsd; Enable with `-Dakka.sbt.statsd=true`") @@ -128,7 +128,6 @@ object TestExtras { val port = settingKey[Int]("Port on which statsd is listening, defaults to 8125") - val settings = Seq( // configuration enabled in statsd := sys.props("akka.sbt.statsd") == "true", @@ -202,9 +201,9 @@ object TestExtras { private def testTimerKey(det: Event): String = s"${det.fullyQualifiedName}.${testSelectorToId(det.selector)}" - private def testSelectorToId(sel: testing.Selector): String = sel.asInstanceOf[TestSelector].testName().replaceAll("""[^\w]""", "_") + private def testSelectorToId(sel: testing.Selector): String = sanitize(sel.asInstanceOf[TestSelector].testName()) - private def testCounterKey(det: Event, status: Status): String = s"${det.fullyQualifiedName}.${status.toString.toLowerCase}" + private def testCounterKey(det: Event, status: Status): String = s"${sanitize(det.fullyQualifiedName)}.${status.toString.toLowerCase}" private def keySuccess(fullyQualifiedName: String): String = fullyQualifiedName + ".success" @@ -212,6 +211,8 @@ object TestExtras { private def keyError(fullyQualifiedName: String): String = fullyQualifiedName + ".error" + private def sanitize(s: String): String = s.replaceAll("""[^\w]""", "_") + } } diff --git a/project/plugins.sbt b/project/plugins.sbt index 8a131df25b..7838eafd15 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -23,4 +23,5 @@ addSbtPlugin("com.eed3si9n" % "sbt-unidoc" % "0.3.1") addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2") +// stats reporting libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0"