-doc,str,tes remove statsd and graphite reporting code, its not used (#20383)
This commit is contained in:
parent
30827df017
commit
f9cbc36b03
6 changed files with 16 additions and 289 deletions
|
|
@ -115,20 +115,20 @@ class FlowRecoverWithSpec extends AkkaSpec {
|
|||
|
||||
"terminate with exception after set number of retries" in assertAllStagesStopped {
|
||||
Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a }
|
||||
.recoverWithRetries(3, {
|
||||
case t: Throwable ⇒
|
||||
Source(List(11, 22)).concat(Source.failed(ex))
|
||||
}).runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectNextN(List(1, 2))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(1)
|
||||
.expectError(ex)
|
||||
.recoverWithRetries(3, {
|
||||
case t: Throwable ⇒
|
||||
Source(List(11, 22)).concat(Source.failed(ex))
|
||||
}).runWith(TestSink.probe[Int])
|
||||
.request(2)
|
||||
.expectNextN(List(1, 2))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(2)
|
||||
.expectNextN(List(11, 22))
|
||||
.request(1)
|
||||
.expectError(ex)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,7 +12,7 @@ import com.typesafe.config.Config
|
|||
import java.util
|
||||
import scala.util.matching.Regex
|
||||
import scala.collection.mutable
|
||||
import akka.testkit.metrics.reporter.{ GraphiteClient, AkkaGraphiteReporter, AkkaConsoleReporter }
|
||||
import akka.testkit.metrics.reporter.AkkaConsoleReporter
|
||||
import org.scalatest.Notifying
|
||||
import scala.reflect.ClassTag
|
||||
|
||||
|
|
@ -24,7 +24,6 @@ import scala.reflect.ClassTag
|
|||
* This trait instead aims to give an high level overview as well as data for trend-analysis of long running tests.
|
||||
*
|
||||
* Reporting defaults to `ConsoleReporter`.
|
||||
* In order to send registry to Graphite run sbt with the following property: `-Dakka.registry.reporting.0=graphite`.
|
||||
*/
|
||||
private[akka] trait MetricsKit extends MetricsKitOps {
|
||||
this: Notifying ⇒
|
||||
|
|
@ -61,23 +60,7 @@ private[akka] trait MetricsKit extends MetricsKitOps {
|
|||
}
|
||||
}
|
||||
|
||||
def configureGraphiteReporter() {
|
||||
if (settings.Reporters.contains("graphite")) {
|
||||
note(s"MetricsKit: Graphite reporter enabled, sending metrics to: ${settings.GraphiteReporter.Host}:${settings.GraphiteReporter.Port}")
|
||||
val address = new InetSocketAddress(settings.GraphiteReporter.Host, settings.GraphiteReporter.Port)
|
||||
val graphite = new GraphiteClient(address)
|
||||
val akkaGraphiteReporter = new AkkaGraphiteReporter(registry, settings.GraphiteReporter.Prefix, graphite, settings.GraphiteReporter.Verbose)
|
||||
|
||||
if (settings.GraphiteReporter.ScheduledReportInterval > Duration.Zero) {
|
||||
akkaGraphiteReporter.start(settings.GraphiteReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS)
|
||||
}
|
||||
|
||||
reporters ::= akkaGraphiteReporter
|
||||
}
|
||||
}
|
||||
|
||||
configureConsoleReporter()
|
||||
configureGraphiteReporter()
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -217,15 +200,6 @@ private[akka] class MetricsKitSettings(config: Config) {
|
|||
|
||||
val Reporters = config.getStringList("akka.test.metrics.reporters")
|
||||
|
||||
object GraphiteReporter {
|
||||
val Prefix = config.getString("akka.test.metrics.reporter.graphite.prefix")
|
||||
lazy val Host = config.getString("akka.test.metrics.reporter.graphite.host").requiring(v ⇒ !v.trim.isEmpty, "akka.test.metrics.reporter.graphite.host was used but was empty!")
|
||||
val Port = config.getInt("akka.test.metrics.reporter.graphite.port")
|
||||
val Verbose = config.getBoolean("akka.test.metrics.reporter.graphite.verbose")
|
||||
|
||||
val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.graphite.scheduled-report-interval")
|
||||
}
|
||||
|
||||
object ConsoleReporter {
|
||||
val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.console.scheduled-report-interval")
|
||||
val Verbose = config.getBoolean("akka.test.metrics.reporter.console.verbose")
|
||||
|
|
|
|||
|
|
@ -1,184 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2009-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.metrics.reporter
|
||||
|
||||
import java.text.DateFormat
|
||||
import java.util
|
||||
import java.util.concurrent.TimeUnit
|
||||
import com.codahale.metrics._
|
||||
import java.util.{ Locale, Date }
|
||||
import akka.testkit.metrics._
|
||||
import scala.concurrent.duration._
|
||||
|
||||
/**
|
||||
* Used to report `com.codahale.metrics.Metric` types that the original `com.codahale.metrics.graphite.GraphiteReporter` is unaware of (cannot re-use directly because of private constructor).
|
||||
*/
|
||||
class AkkaGraphiteReporter(
|
||||
registry: AkkaMetricRegistry,
|
||||
prefix: String,
|
||||
graphite: GraphiteClient,
|
||||
verbose: Boolean = false)
|
||||
extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-graphite-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) {
|
||||
|
||||
// todo get rid of ScheduledReporter (would mean removing codahale metrics)?
|
||||
|
||||
private final val ConsoleWidth = 80
|
||||
|
||||
val locale = Locale.getDefault
|
||||
val dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM, locale)
|
||||
val clock = Clock.defaultClock()
|
||||
|
||||
override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) {
|
||||
val dateTime = dateFormat.format(new Date(clock.getTime))
|
||||
|
||||
// akka-custom metrics
|
||||
val knownOpsInTimespanCounters = registry.getKnownOpsInTimespanCounters
|
||||
val hdrHistograms = registry.getHdrHistograms
|
||||
val averagingGauges = registry.getAveragingGauges
|
||||
|
||||
val metricsCount = List(gauges, counters, histograms, meters, timers).map(_.size).sum + List(knownOpsInTimespanCounters, hdrHistograms).map(_.size).sum
|
||||
sendWithBanner("== AkkaGraphiteReporter @ " + dateTime + " == (" + metricsCount + " metrics)", '=')
|
||||
|
||||
try {
|
||||
// graphite takes timestamps in seconds
|
||||
val now = System.currentTimeMillis.millis.toSeconds
|
||||
|
||||
// default Metrics types
|
||||
import collection.JavaConverters._
|
||||
sendMetrics(now, gauges.asScala, sendGauge)
|
||||
sendMetrics(now, counters.asScala, sendCounter)
|
||||
sendMetrics(now, histograms.asScala, sendHistogram)
|
||||
sendMetrics(now, meters.asScala, sendMetered)
|
||||
sendMetrics(now, timers.asScala, sendTimer)
|
||||
|
||||
sendMetrics(now, knownOpsInTimespanCounters, sendKnownOpsInTimespanCounter)
|
||||
sendMetrics(now, hdrHistograms, sendHdrHistogram)
|
||||
sendMetrics(now, averagingGauges, sendAveragingGauge)
|
||||
|
||||
} catch {
|
||||
case ex: Exception ⇒ throw new RuntimeException("Unable to send metrics to Graphite!", ex)
|
||||
}
|
||||
}
|
||||
|
||||
def sendMetrics[T <: Metric](now: Long, metrics: Iterable[(String, T)], send: (Long, String, T) ⇒ Unit) {
|
||||
for ((key, metric) ← metrics) {
|
||||
if (verbose)
|
||||
println(" " + key)
|
||||
send(now, key, metric)
|
||||
}
|
||||
}
|
||||
|
||||
private def sendHistogram(now: Long, key: String, histogram: Histogram) {
|
||||
val snapshot = histogram.getSnapshot
|
||||
send(key + ".count", histogram.getCount, now)
|
||||
send(key + ".max", snapshot.getMax, now)
|
||||
send(key + ".mean", snapshot.getMean, now)
|
||||
send(key + ".min", snapshot.getMin, now)
|
||||
send(key + ".stddev", snapshot.getStdDev, now)
|
||||
send(key + ".p50", snapshot.getMedian, now)
|
||||
send(key + ".p75", snapshot.get75thPercentile, now)
|
||||
send(key + ".p95", snapshot.get95thPercentile, now)
|
||||
send(key + ".p98", snapshot.get98thPercentile, now)
|
||||
send(key + ".p99", snapshot.get99thPercentile, now)
|
||||
send(key + ".p999", snapshot.get999thPercentile, now)
|
||||
}
|
||||
|
||||
private def sendTimer(now: Long, key: String, timer: Timer) {
|
||||
val snapshot = timer.getSnapshot
|
||||
send(key + ".max", convertDuration(snapshot.getMax), now)
|
||||
send(key + ".mean", convertDuration(snapshot.getMean), now)
|
||||
send(key + ".min", convertDuration(snapshot.getMin), now)
|
||||
send(key + ".stddev", convertDuration(snapshot.getStdDev), now)
|
||||
send(key + ".p50", convertDuration(snapshot.getMedian), now)
|
||||
send(key + ".p75", convertDuration(snapshot.get75thPercentile), now)
|
||||
send(key + ".p95", convertDuration(snapshot.get95thPercentile), now)
|
||||
send(key + ".p98", convertDuration(snapshot.get98thPercentile), now)
|
||||
send(key + ".p99", convertDuration(snapshot.get99thPercentile), now)
|
||||
send(key + ".p999", convertDuration(snapshot.get999thPercentile), now)
|
||||
sendMetered(now, key, timer)
|
||||
}
|
||||
|
||||
private def sendMetered(now: Long, key: String, meter: Metered) {
|
||||
send(key + ".count", meter.getCount, now)
|
||||
send(key + ".m1_rate", convertRate(meter.getOneMinuteRate), now)
|
||||
send(key + ".m5_rate", convertRate(meter.getFiveMinuteRate), now)
|
||||
send(key + ".m15_rate", convertRate(meter.getFifteenMinuteRate), now)
|
||||
send(key + ".mean_rate", convertRate(meter.getMeanRate), now)
|
||||
}
|
||||
|
||||
private def sendGauge(now: Long, key: String, gauge: Gauge[_]) {
|
||||
sendNumericOrIgnore(key + ".gauge", gauge.getValue, now)
|
||||
}
|
||||
|
||||
private def sendCounter(now: Long, key: String, counter: Counter) {
|
||||
sendNumericOrIgnore(key + ".count", counter.getCount, now)
|
||||
}
|
||||
|
||||
private def sendKnownOpsInTimespanCounter(now: Long, key: String, counter: KnownOpsInTimespanTimer) {
|
||||
send(key + ".ops", counter.getCount, now)
|
||||
send(key + ".time", counter.elapsedTime, now)
|
||||
send(key + ".opsPerSec", counter.opsPerSecond, now)
|
||||
send(key + ".avg", counter.avgDuration, now)
|
||||
}
|
||||
|
||||
private def sendHdrHistogram(now: Long, key: String, hist: HdrHistogram) {
|
||||
val snapshot = hist.getData
|
||||
send(key + ".min", snapshot.getMinValue, now)
|
||||
send(key + ".max", snapshot.getMaxValue, now)
|
||||
send(key + ".mean", snapshot.getMean, now)
|
||||
send(key + ".stddev", snapshot.getStdDeviation, now)
|
||||
send(key + ".p75", snapshot.getValueAtPercentile(75.0), now)
|
||||
send(key + ".p95", snapshot.getValueAtPercentile(95.0), now)
|
||||
send(key + ".p98", snapshot.getValueAtPercentile(98.0), now)
|
||||
send(key + ".p99", snapshot.getValueAtPercentile(99.0), now)
|
||||
send(key + ".p999", snapshot.getValueAtPercentile(99.9), now)
|
||||
}
|
||||
|
||||
private def sendAveragingGauge(now: Long, key: String, gauge: AveragingGauge) {
|
||||
sendNumericOrIgnore(key + ".avg-gauge", gauge.getValue, now)
|
||||
}
|
||||
|
||||
override def stop(): Unit = try {
|
||||
super.stop()
|
||||
graphite.close()
|
||||
} catch {
|
||||
case ex: Exception ⇒ System.err.println("Was unable to close Graphite connection: " + ex.getMessage)
|
||||
}
|
||||
|
||||
private def sendNumericOrIgnore(key: String, value: Any, now: Long) {
|
||||
// seriously nothing better than this? (without Any => String => Num)
|
||||
value match {
|
||||
case v: Int ⇒ send(key, v, now)
|
||||
case v: Long ⇒ send(key, v, now)
|
||||
case v: Byte ⇒ send(key, v, now)
|
||||
case v: Short ⇒ send(key, v, now)
|
||||
case v: Float ⇒ send(key, v, now)
|
||||
case v: Double ⇒ send(key, v, now)
|
||||
case _ ⇒ // ignore non-numeric metric...
|
||||
}
|
||||
}
|
||||
|
||||
private def send(key: String, value: Double, now: Long) {
|
||||
if (value >= 0)
|
||||
graphite.send(s"$prefix.$key", "%2.2f".format(value), now)
|
||||
}
|
||||
|
||||
private def send(key: String, value: Long, now: Long) {
|
||||
if (value >= 0)
|
||||
graphite.send(s"$prefix.$key", value.toString, now)
|
||||
}
|
||||
|
||||
private def sendWithBanner(s: String, c: Char) {
|
||||
print(s)
|
||||
print(' ')
|
||||
var i: Int = 0
|
||||
while (i < (ConsoleWidth - s.length - 1)) {
|
||||
print(c)
|
||||
i += 1
|
||||
}
|
||||
println()
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
|
@ -1,52 +0,0 @@
|
|||
/**
|
||||
* Copyright (C) 2014-2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.testkit.metrics.reporter
|
||||
|
||||
import java.util.regex.Pattern
|
||||
import java.nio.charset.Charset
|
||||
import java.io._
|
||||
import javax.net.SocketFactory
|
||||
import java.net.InetSocketAddress
|
||||
|
||||
/**
|
||||
* Carbon (graphite) client, which can be used to send metrics.
|
||||
*
|
||||
* The data is sent over a plain Socket, even though it would fit AkkaIO nicely, but this way it has no dependencies.
|
||||
*/
|
||||
class GraphiteClient(address: InetSocketAddress) extends Closeable {
|
||||
|
||||
private final val WHITESPACE = Pattern.compile("[\\s]+")
|
||||
private final val charset: Charset = Charset.forName("UTF-8")
|
||||
|
||||
private lazy val socket = {
|
||||
val s = SocketFactory.getDefault.createSocket(address.getAddress, address.getPort)
|
||||
s.setKeepAlive(true)
|
||||
s
|
||||
}
|
||||
|
||||
private lazy val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream, charset))
|
||||
|
||||
/** Send measurement carbon server. Thread-safe. */
|
||||
def send(name: String, value: String, timestamp: Long) {
|
||||
val sb = new StringBuilder()
|
||||
.append(sanitize(name)).append(' ')
|
||||
.append(sanitize(value)).append(' ')
|
||||
.append(timestamp.toString).append('\n')
|
||||
|
||||
// The write calls below handle the string in-one-go (locking);
|
||||
// Whereas the metrics' implementation of the graphite client uses multiple `write` calls,
|
||||
// which could become interwoven, thus producing a wrong metric-line, when called by multiple threads.
|
||||
writer.write(sb.toString())
|
||||
writer.flush()
|
||||
}
|
||||
|
||||
/** Closes underlying connection. */
|
||||
def close() {
|
||||
try socket.close() finally writer.close()
|
||||
}
|
||||
|
||||
protected def sanitize(s: String): String = {
|
||||
WHITESPACE.matcher(s).replaceAll("-")
|
||||
}
|
||||
}
|
||||
|
|
@ -3,16 +3,8 @@
|
|||
*/
|
||||
package akka
|
||||
|
||||
import sbt.Keys._
|
||||
import sbt._
|
||||
import Keys._
|
||||
|
||||
import com.timgroup.statsd.{StatsDClientErrorHandler, NonBlockingStatsDClient}
|
||||
import sbt.testing.{TestSelector, Status, Event}
|
||||
import scala.util.Try
|
||||
import java.io.{InputStreamReader, BufferedReader, DataOutputStream, OutputStreamWriter}
|
||||
import java.net.{InetAddress, URLEncoder, HttpURLConnection, Socket}
|
||||
import com.typesafe.sbt.SbtGit
|
||||
import com.typesafe.sbt.SbtGit.GitKeys._
|
||||
|
||||
object TestExtras {
|
||||
|
||||
|
|
|
|||
|
|
@ -31,9 +31,6 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2")
|
|||
|
||||
addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.3")
|
||||
|
||||
// stats reporting
|
||||
libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0"
|
||||
|
||||
addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-RC1")
|
||||
|
||||
// for advanced PR validation features
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue