diff --git a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala index f98c6c0b8b..942333be7a 100644 --- a/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala +++ b/akka-stream-tests/src/test/scala/akka/stream/scaladsl/FlowRecoverWithSpec.scala @@ -115,20 +115,20 @@ class FlowRecoverWithSpec extends AkkaSpec { "terminate with exception after set number of retries" in assertAllStagesStopped { Source(1 to 3).map { a ⇒ if (a == 3) throw new IndexOutOfBoundsException() else a } - .recoverWithRetries(3, { - case t: Throwable ⇒ - Source(List(11, 22)).concat(Source.failed(ex)) - }).runWith(TestSink.probe[Int]) - .request(2) - .expectNextN(List(1, 2)) - .request(2) - .expectNextN(List(11, 22)) - .request(2) - .expectNextN(List(11, 22)) - .request(2) - .expectNextN(List(11, 22)) - .request(1) - .expectError(ex) + .recoverWithRetries(3, { + case t: Throwable ⇒ + Source(List(11, 22)).concat(Source.failed(ex)) + }).runWith(TestSink.probe[Int]) + .request(2) + .expectNextN(List(1, 2)) + .request(2) + .expectNextN(List(11, 22)) + .request(2) + .expectNextN(List(11, 22)) + .request(2) + .expectNextN(List(11, 22)) + .request(1) + .expectError(ex) } } } diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala index 97d210949f..00ecb7ade8 100644 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala +++ b/akka-testkit/src/test/scala/akka/testkit/metrics/MetricsKit.scala @@ -12,7 +12,7 @@ import com.typesafe.config.Config import java.util import scala.util.matching.Regex import scala.collection.mutable -import akka.testkit.metrics.reporter.{ GraphiteClient, AkkaGraphiteReporter, AkkaConsoleReporter } +import akka.testkit.metrics.reporter.AkkaConsoleReporter import org.scalatest.Notifying import scala.reflect.ClassTag @@ -24,7 +24,6 @@ import scala.reflect.ClassTag * This trait instead aims to give an high level overview as well as data for trend-analysis of long running tests. * * Reporting defaults to `ConsoleReporter`. - * In order to send registry to Graphite run sbt with the following property: `-Dakka.registry.reporting.0=graphite`. */ private[akka] trait MetricsKit extends MetricsKitOps { this: Notifying ⇒ @@ -61,23 +60,7 @@ private[akka] trait MetricsKit extends MetricsKitOps { } } - def configureGraphiteReporter() { - if (settings.Reporters.contains("graphite")) { - note(s"MetricsKit: Graphite reporter enabled, sending metrics to: ${settings.GraphiteReporter.Host}:${settings.GraphiteReporter.Port}") - val address = new InetSocketAddress(settings.GraphiteReporter.Host, settings.GraphiteReporter.Port) - val graphite = new GraphiteClient(address) - val akkaGraphiteReporter = new AkkaGraphiteReporter(registry, settings.GraphiteReporter.Prefix, graphite, settings.GraphiteReporter.Verbose) - - if (settings.GraphiteReporter.ScheduledReportInterval > Duration.Zero) { - akkaGraphiteReporter.start(settings.GraphiteReporter.ScheduledReportInterval.toMillis, TimeUnit.MILLISECONDS) - } - - reporters ::= akkaGraphiteReporter - } - } - configureConsoleReporter() - configureGraphiteReporter() } /** @@ -217,15 +200,6 @@ private[akka] class MetricsKitSettings(config: Config) { val Reporters = config.getStringList("akka.test.metrics.reporters") - object GraphiteReporter { - val Prefix = config.getString("akka.test.metrics.reporter.graphite.prefix") - lazy val Host = config.getString("akka.test.metrics.reporter.graphite.host").requiring(v ⇒ !v.trim.isEmpty, "akka.test.metrics.reporter.graphite.host was used but was empty!") - val Port = config.getInt("akka.test.metrics.reporter.graphite.port") - val Verbose = config.getBoolean("akka.test.metrics.reporter.graphite.verbose") - - val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.graphite.scheduled-report-interval") - } - object ConsoleReporter { val ScheduledReportInterval = config.getMillisDuration("akka.test.metrics.reporter.console.scheduled-report-interval") val Verbose = config.getBoolean("akka.test.metrics.reporter.console.verbose") diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala deleted file mode 100644 index 8571112e7c..0000000000 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/AkkaGraphiteReporter.scala +++ /dev/null @@ -1,184 +0,0 @@ -/** - * Copyright (C) 2009-2016 Lightbend Inc. - */ -package akka.testkit.metrics.reporter - -import java.text.DateFormat -import java.util -import java.util.concurrent.TimeUnit -import com.codahale.metrics._ -import java.util.{ Locale, Date } -import akka.testkit.metrics._ -import scala.concurrent.duration._ - -/** - * Used to report `com.codahale.metrics.Metric` types that the original `com.codahale.metrics.graphite.GraphiteReporter` is unaware of (cannot re-use directly because of private constructor). - */ -class AkkaGraphiteReporter( - registry: AkkaMetricRegistry, - prefix: String, - graphite: GraphiteClient, - verbose: Boolean = false) - extends ScheduledReporter(registry.asInstanceOf[MetricRegistry], "akka-graphite-reporter", MetricFilter.ALL, TimeUnit.SECONDS, TimeUnit.NANOSECONDS) { - - // todo get rid of ScheduledReporter (would mean removing codahale metrics)? - - private final val ConsoleWidth = 80 - - val locale = Locale.getDefault - val dateFormat = DateFormat.getDateTimeInstance(DateFormat.SHORT, DateFormat.MEDIUM, locale) - val clock = Clock.defaultClock() - - override def report(gauges: util.SortedMap[String, Gauge[_]], counters: util.SortedMap[String, Counter], histograms: util.SortedMap[String, Histogram], meters: util.SortedMap[String, Meter], timers: util.SortedMap[String, Timer]) { - val dateTime = dateFormat.format(new Date(clock.getTime)) - - // akka-custom metrics - val knownOpsInTimespanCounters = registry.getKnownOpsInTimespanCounters - val hdrHistograms = registry.getHdrHistograms - val averagingGauges = registry.getAveragingGauges - - val metricsCount = List(gauges, counters, histograms, meters, timers).map(_.size).sum + List(knownOpsInTimespanCounters, hdrHistograms).map(_.size).sum - sendWithBanner("== AkkaGraphiteReporter @ " + dateTime + " == (" + metricsCount + " metrics)", '=') - - try { - // graphite takes timestamps in seconds - val now = System.currentTimeMillis.millis.toSeconds - - // default Metrics types - import collection.JavaConverters._ - sendMetrics(now, gauges.asScala, sendGauge) - sendMetrics(now, counters.asScala, sendCounter) - sendMetrics(now, histograms.asScala, sendHistogram) - sendMetrics(now, meters.asScala, sendMetered) - sendMetrics(now, timers.asScala, sendTimer) - - sendMetrics(now, knownOpsInTimespanCounters, sendKnownOpsInTimespanCounter) - sendMetrics(now, hdrHistograms, sendHdrHistogram) - sendMetrics(now, averagingGauges, sendAveragingGauge) - - } catch { - case ex: Exception ⇒ throw new RuntimeException("Unable to send metrics to Graphite!", ex) - } - } - - def sendMetrics[T <: Metric](now: Long, metrics: Iterable[(String, T)], send: (Long, String, T) ⇒ Unit) { - for ((key, metric) ← metrics) { - if (verbose) - println(" " + key) - send(now, key, metric) - } - } - - private def sendHistogram(now: Long, key: String, histogram: Histogram) { - val snapshot = histogram.getSnapshot - send(key + ".count", histogram.getCount, now) - send(key + ".max", snapshot.getMax, now) - send(key + ".mean", snapshot.getMean, now) - send(key + ".min", snapshot.getMin, now) - send(key + ".stddev", snapshot.getStdDev, now) - send(key + ".p50", snapshot.getMedian, now) - send(key + ".p75", snapshot.get75thPercentile, now) - send(key + ".p95", snapshot.get95thPercentile, now) - send(key + ".p98", snapshot.get98thPercentile, now) - send(key + ".p99", snapshot.get99thPercentile, now) - send(key + ".p999", snapshot.get999thPercentile, now) - } - - private def sendTimer(now: Long, key: String, timer: Timer) { - val snapshot = timer.getSnapshot - send(key + ".max", convertDuration(snapshot.getMax), now) - send(key + ".mean", convertDuration(snapshot.getMean), now) - send(key + ".min", convertDuration(snapshot.getMin), now) - send(key + ".stddev", convertDuration(snapshot.getStdDev), now) - send(key + ".p50", convertDuration(snapshot.getMedian), now) - send(key + ".p75", convertDuration(snapshot.get75thPercentile), now) - send(key + ".p95", convertDuration(snapshot.get95thPercentile), now) - send(key + ".p98", convertDuration(snapshot.get98thPercentile), now) - send(key + ".p99", convertDuration(snapshot.get99thPercentile), now) - send(key + ".p999", convertDuration(snapshot.get999thPercentile), now) - sendMetered(now, key, timer) - } - - private def sendMetered(now: Long, key: String, meter: Metered) { - send(key + ".count", meter.getCount, now) - send(key + ".m1_rate", convertRate(meter.getOneMinuteRate), now) - send(key + ".m5_rate", convertRate(meter.getFiveMinuteRate), now) - send(key + ".m15_rate", convertRate(meter.getFifteenMinuteRate), now) - send(key + ".mean_rate", convertRate(meter.getMeanRate), now) - } - - private def sendGauge(now: Long, key: String, gauge: Gauge[_]) { - sendNumericOrIgnore(key + ".gauge", gauge.getValue, now) - } - - private def sendCounter(now: Long, key: String, counter: Counter) { - sendNumericOrIgnore(key + ".count", counter.getCount, now) - } - - private def sendKnownOpsInTimespanCounter(now: Long, key: String, counter: KnownOpsInTimespanTimer) { - send(key + ".ops", counter.getCount, now) - send(key + ".time", counter.elapsedTime, now) - send(key + ".opsPerSec", counter.opsPerSecond, now) - send(key + ".avg", counter.avgDuration, now) - } - - private def sendHdrHistogram(now: Long, key: String, hist: HdrHistogram) { - val snapshot = hist.getData - send(key + ".min", snapshot.getMinValue, now) - send(key + ".max", snapshot.getMaxValue, now) - send(key + ".mean", snapshot.getMean, now) - send(key + ".stddev", snapshot.getStdDeviation, now) - send(key + ".p75", snapshot.getValueAtPercentile(75.0), now) - send(key + ".p95", snapshot.getValueAtPercentile(95.0), now) - send(key + ".p98", snapshot.getValueAtPercentile(98.0), now) - send(key + ".p99", snapshot.getValueAtPercentile(99.0), now) - send(key + ".p999", snapshot.getValueAtPercentile(99.9), now) - } - - private def sendAveragingGauge(now: Long, key: String, gauge: AveragingGauge) { - sendNumericOrIgnore(key + ".avg-gauge", gauge.getValue, now) - } - - override def stop(): Unit = try { - super.stop() - graphite.close() - } catch { - case ex: Exception ⇒ System.err.println("Was unable to close Graphite connection: " + ex.getMessage) - } - - private def sendNumericOrIgnore(key: String, value: Any, now: Long) { - // seriously nothing better than this? (without Any => String => Num) - value match { - case v: Int ⇒ send(key, v, now) - case v: Long ⇒ send(key, v, now) - case v: Byte ⇒ send(key, v, now) - case v: Short ⇒ send(key, v, now) - case v: Float ⇒ send(key, v, now) - case v: Double ⇒ send(key, v, now) - case _ ⇒ // ignore non-numeric metric... - } - } - - private def send(key: String, value: Double, now: Long) { - if (value >= 0) - graphite.send(s"$prefix.$key", "%2.2f".format(value), now) - } - - private def send(key: String, value: Long, now: Long) { - if (value >= 0) - graphite.send(s"$prefix.$key", value.toString, now) - } - - private def sendWithBanner(s: String, c: Char) { - print(s) - print(' ') - var i: Int = 0 - while (i < (ConsoleWidth - s.length - 1)) { - print(c) - i += 1 - } - println() - } - -} - diff --git a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/GraphiteClient.scala b/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/GraphiteClient.scala deleted file mode 100644 index 47a3233261..0000000000 --- a/akka-testkit/src/test/scala/akka/testkit/metrics/reporter/GraphiteClient.scala +++ /dev/null @@ -1,52 +0,0 @@ -/** - * Copyright (C) 2014-2016 Lightbend Inc. - */ -package akka.testkit.metrics.reporter - -import java.util.regex.Pattern -import java.nio.charset.Charset -import java.io._ -import javax.net.SocketFactory -import java.net.InetSocketAddress - -/** - * Carbon (graphite) client, which can be used to send metrics. - * - * The data is sent over a plain Socket, even though it would fit AkkaIO nicely, but this way it has no dependencies. - */ -class GraphiteClient(address: InetSocketAddress) extends Closeable { - - private final val WHITESPACE = Pattern.compile("[\\s]+") - private final val charset: Charset = Charset.forName("UTF-8") - - private lazy val socket = { - val s = SocketFactory.getDefault.createSocket(address.getAddress, address.getPort) - s.setKeepAlive(true) - s - } - - private lazy val writer = new BufferedWriter(new OutputStreamWriter(socket.getOutputStream, charset)) - - /** Send measurement carbon server. Thread-safe. */ - def send(name: String, value: String, timestamp: Long) { - val sb = new StringBuilder() - .append(sanitize(name)).append(' ') - .append(sanitize(value)).append(' ') - .append(timestamp.toString).append('\n') - - // The write calls below handle the string in-one-go (locking); - // Whereas the metrics' implementation of the graphite client uses multiple `write` calls, - // which could become interwoven, thus producing a wrong metric-line, when called by multiple threads. - writer.write(sb.toString()) - writer.flush() - } - - /** Closes underlying connection. */ - def close() { - try socket.close() finally writer.close() - } - - protected def sanitize(s: String): String = { - WHITESPACE.matcher(s).replaceAll("-") - } -} diff --git a/project/TestExtras.scala b/project/TestExtras.scala index a4ba5eef2e..980198ba5a 100644 --- a/project/TestExtras.scala +++ b/project/TestExtras.scala @@ -3,16 +3,8 @@ */ package akka +import sbt.Keys._ import sbt._ -import Keys._ - -import com.timgroup.statsd.{StatsDClientErrorHandler, NonBlockingStatsDClient} -import sbt.testing.{TestSelector, Status, Event} -import scala.util.Try -import java.io.{InputStreamReader, BufferedReader, DataOutputStream, OutputStreamWriter} -import java.net.{InetAddress, URLEncoder, HttpURLConnection, Socket} -import com.typesafe.sbt.SbtGit -import com.typesafe.sbt.SbtGit.GitKeys._ object TestExtras { diff --git a/project/plugins.sbt b/project/plugins.sbt index 57ac2925c4..cbc569d776 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -31,9 +31,6 @@ addSbtPlugin("com.typesafe.sbt" % "sbt-git" % "0.6.2") addSbtPlugin("pl.project13.scala" % "sbt-jmh" % "0.2.3") -// stats reporting -libraryDependencies += "com.timgroup" % "java-statsd-client" % "2.0.0" - addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.0.0-RC1") // for advanced PR validation features