diff --git a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala index 5b7431d564..8761f48f34 100644 --- a/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/actor/dispatch/ActorModelSpec.scala @@ -22,6 +22,7 @@ import akka.pattern.ask import akka.testkit._ import akka.util.{ Timeout, Switch, Duration } import akka.util.duration._ +import annotation.tailrec object ActorModelSpec { @@ -224,16 +225,16 @@ object ActorModelSpec { } } - def await(until: Long)(condition: ⇒ Boolean): Unit = try { - while (System.currentTimeMillis() <= until) { - try { - if (condition) return else Thread.sleep(25) - } catch { - case e: InterruptedException ⇒ - } + def await(until: Long)(condition: ⇒ Boolean): Unit = if (System.currentTimeMillis() <= until) { + var done = false + try { + done = condition + if (!done) Thread.sleep(25) + } catch { + case e: InterruptedException ⇒ } - throw new AssertionError("await failed") - } + if (!done) await(until)(condition) + } else throw new AssertionError("await failed") } abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout { diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala index e0e6b0e1e2..1cccd19417 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/BenchResultRepository.scala @@ -102,20 +102,21 @@ class FileBenchResultRepository extends BenchResultRepository { private def save(stats: Stats) { new File(serDir).mkdirs - if (!serDirExists) return - val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) - val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" - val f = new File(serDir, name) - var out: ObjectOutputStream = null - try { - out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f))) - out.writeObject(stats) - } catch { - case e: Exception ⇒ - val errMsg = "Failed to save [%s] to [%s], due to [%s]".format(stats, f.getAbsolutePath, e.getMessage) - throw new RuntimeException(errMsg) - } finally { - if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } + if (serDirExists) { + val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp)) + val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser" + val f = new File(serDir, name) + var out: ObjectOutputStream = null + try { + out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f))) + out.writeObject(stats) + } catch { + case e: Exception ⇒ + val errMsg = "Failed to save [%s] to [%s], due to [%s]".format(stats, f.getAbsolutePath, e.getMessage) + throw new RuntimeException(errMsg) + } finally { + if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ } + } } } @@ -142,19 +143,20 @@ class FileBenchResultRepository extends BenchResultRepository { def saveHtmlReport(content: String, fileName: String) { new File(htmlDir).mkdirs - if (!htmlDirExists) return - val f = new File(htmlDir, fileName) - var writer: PrintWriter = null - try { - writer = new PrintWriter(new FileWriter(f)) - writer.print(content) - writer.flush() - } catch { - case e: Exception ⇒ - val errMsg = "Failed to save report to [%s], due to [%s]".format(f.getAbsolutePath, e.getMessage) - throw new RuntimeException(errMsg) - } finally { - if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + if (htmlDirExists) { + val f = new File(htmlDir, fileName) + var writer: PrintWriter = null + try { + writer = new PrintWriter(new FileWriter(f)) + writer.print(content) + writer.flush() + } catch { + case e: Exception ⇒ + val errMsg = "Failed to save report to [%s], due to [%s]".format(f.getAbsolutePath, e.getMessage) + throw new RuntimeException(errMsg) + } finally { + if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ } + } } } diff --git a/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala index c513200310..52b30ceee7 100644 --- a/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala +++ b/akka-actor-tests/src/test/scala/akka/performance/workbench/GoogleChartBuilder.scala @@ -17,131 +17,133 @@ object GoogleChartBuilder { * Builds a bar chart for tps in the statistics. */ def tpsChartUrl(statsByTimestamp: TreeMap[Long, Seq[Stats]], title: String, legend: Stats ⇒ String): String = { - if (statsByTimestamp.isEmpty) return "" + if (statsByTimestamp.isEmpty) "" + else { + val loads = statsByTimestamp.values.head.map(_.load) + val allStats = statsByTimestamp.values.flatten - val loads = statsByTimestamp.values.head.map(_.load) - val allStats = statsByTimestamp.values.flatten + val sb = new StringBuilder + sb.append(BaseUrl) + // bar chart + sb.append("cht=bvg") + sb.append("&") + // size + sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) + sb.append("&") + // title + sb.append("chtt=").append(urlEncode(title)) + sb.append("&") + // axis locations + sb.append("chxt=y,x") + sb.append("&") + // labels + sb.append("chxl=1:|") + sb.append(loads.mkString("|")) + sb.append("&") - val sb = new StringBuilder - sb.append(BaseUrl) - // bar chart - sb.append("cht=bvg") - sb.append("&") - // size - sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) - sb.append("&") - // title - sb.append("chtt=").append(urlEncode(title)) - sb.append("&") - // axis locations - sb.append("chxt=y,x") - sb.append("&") - // labels - sb.append("chxl=1:|") - sb.append(loads.mkString("|")) - sb.append("&") + // label color and font + //sb.append("chxs=2,D65D82,11.5,0,lt,D65D82") + //sb.append("&") - // label color and font - //sb.append("chxs=2,D65D82,11.5,0,lt,D65D82") - //sb.append("&") + // legend + val legendStats = statsByTimestamp.values.map(_.head).toSeq + appendLegend(legendStats, sb, legend) + sb.append("&") + // bar spacing + sb.append("chbh=a,4,20") + sb.append("&") + // bar colors + barColors(statsByTimestamp.size, sb) + sb.append("&") - // legend - val legendStats = statsByTimestamp.values.map(_.head).toSeq - appendLegend(legendStats, sb, legend) - sb.append("&") - // bar spacing - sb.append("chbh=a,4,20") - sb.append("&") - // bar colors - barColors(statsByTimestamp.size, sb) - sb.append("&") + // data series + val loadStr = loads.mkString(",") + sb.append("chd=t:") + val maxValue = allStats.map(_.tps).max + val tpsSeries: Iterable[String] = + for (statsSeq ← statsByTimestamp.values) yield { + statsSeq.map(_.tps).mkString(",") + } + sb.append(tpsSeries.mkString("|")) - // data series - val loadStr = loads.mkString(",") - sb.append("chd=t:") - val maxValue = allStats.map(_.tps).max - val tpsSeries: Iterable[String] = - for (statsSeq ← statsByTimestamp.values) yield { - statsSeq.map(_.tps).mkString(",") - } - sb.append(tpsSeries.mkString("|")) + // y range + sb.append("&") + sb.append("chxr=0,0,").append(maxValue) + sb.append("&") + sb.append("chds=0,").append(maxValue) + sb.append("&") - // y range - sb.append("&") - sb.append("chxr=0,0,").append(maxValue) - sb.append("&") - sb.append("chds=0,").append(maxValue) - sb.append("&") + // grid lines + appendGridSpacing(maxValue.toLong, sb) - // grid lines - appendGridSpacing(maxValue.toLong, sb) - - return sb.toString + sb.toString + } } /** * Builds a bar chart for all percentiles and the mean in the statistics. */ def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = { - if (statistics.isEmpty) return "" + if (statistics.isEmpty) "" + else { + val current = statistics.last - val current = statistics.last + val sb = new StringBuilder + sb.append(BaseUrl) + // bar chart + sb.append("cht=bvg") + sb.append("&") + // size + sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) + sb.append("&") + // title + sb.append("chtt=").append(urlEncode(title)) + sb.append("&") + // axis locations + sb.append("chxt=y,x,y") + sb.append("&") + // labels + percentileLabels(current.percentiles, sb) + sb.append("|mean") + sb.append("|2:|min|mean|median") + sb.append("&") + // label positions + sb.append("chxp=2,").append(current.min).append(",").append(current.mean).append(",") + .append(current.median) + sb.append("&") + // label color and font + sb.append("chxs=2,D65D82,11.5,0,lt,D65D82") + sb.append("&") + // lines for min, mean, median + sb.append("chxtc=2,-1000") + sb.append("&") + // legend + appendLegend(statistics, sb, legend) + sb.append("&") + // bar spacing + sb.append("chbh=a,4,20") + sb.append("&") + // bar colors + barColors(statistics.size, sb) + sb.append("&") - val sb = new StringBuilder - sb.append(BaseUrl) - // bar chart - sb.append("cht=bvg") - sb.append("&") - // size - sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) - sb.append("&") - // title - sb.append("chtt=").append(urlEncode(title)) - sb.append("&") - // axis locations - sb.append("chxt=y,x,y") - sb.append("&") - // labels - percentileLabels(current.percentiles, sb) - sb.append("|mean") - sb.append("|2:|min|mean|median") - sb.append("&") - // label positions - sb.append("chxp=2,").append(current.min).append(",").append(current.mean).append(",") - .append(current.median) - sb.append("&") - // label color and font - sb.append("chxs=2,D65D82,11.5,0,lt,D65D82") - sb.append("&") - // lines for min, mean, median - sb.append("chxtc=2,-1000") - sb.append("&") - // legend - appendLegend(statistics, sb, legend) - sb.append("&") - // bar spacing - sb.append("chbh=a,4,20") - sb.append("&") - // bar colors - barColors(statistics.size, sb) - sb.append("&") + // data series + val maxValue = statistics.map(_.percentiles.last._2).max + sb.append("chd=t:") + dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb) - // data series - val maxValue = statistics.map(_.percentiles.last._2).max - sb.append("chd=t:") - dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb) + // y range + sb.append("&") + sb.append("chxr=0,0,").append(maxValue).append("|2,0,").append(maxValue) + sb.append("&") + sb.append("chds=0,").append(maxValue) + sb.append("&") - // y range - sb.append("&") - sb.append("chxr=0,0,").append(maxValue).append("|2,0,").append(maxValue) - sb.append("&") - sb.append("chds=0,").append(maxValue) - sb.append("&") + // grid lines + appendGridSpacing(maxValue, sb) - // grid lines - appendGridSpacing(maxValue, sb) - - return sb.toString + sb.toString + } } private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) { @@ -197,108 +199,109 @@ object GoogleChartBuilder { } def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = { - if (statistics.isEmpty) return "" + if (statistics.isEmpty) "" + else { + val sb = new StringBuilder + sb.append(BaseUrl) + // line chart + sb.append("cht=lxy") + sb.append("&") + // size + sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) + sb.append("&") + // title + sb.append("chtt=").append(urlEncode(title)) + sb.append("&") + // axis locations + sb.append("chxt=x,y,r,x,y,r") + sb.append("&") + // labels + sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)") + sb.append("&") + // label color and font + sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767") + sb.append("&") + sb.append("chco=") + val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900") + sb.append(seriesColors.mkString(",")) + sb.append("&") + // legend + sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput") + sb.append("&") - val sb = new StringBuilder - sb.append(BaseUrl) - // line chart - sb.append("cht=lxy") - sb.append("&") - // size - sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight) - sb.append("&") - // title - sb.append("chtt=").append(urlEncode(title)) - sb.append("&") - // axis locations - sb.append("chxt=x,y,r,x,y,r") - sb.append("&") - // labels - sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)") - sb.append("&") - // label color and font - sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767") - sb.append("&") - sb.append("chco=") - val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900") - sb.append(seriesColors.mkString(",")) - sb.append("&") - // legend - sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput") - sb.append("&") + sb.append("chdlp=b") + sb.append("&") - sb.append("chdlp=b") - sb.append("&") + sb.append("chls=1|1|1") + sb.append("&") - sb.append("chls=1|1|1") - sb.append("&") + sb.append("chls=1|1|1") + sb.append("&") - sb.append("chls=1|1|1") - sb.append("&") + // margins + sb.append("chma=5,5,5,25") + sb.append("&") - // margins - sb.append("chma=5,5,5,25") - sb.append("&") + // data points + sb.append("chm=") + val chmStr = seriesColors.zipWithIndex.map(each ⇒ "o," + each._1 + "," + each._2 + ",-1,7").mkString("|") + sb.append(chmStr) + sb.append("&") - // data points - sb.append("chm=") - val chmStr = seriesColors.zipWithIndex.map(each ⇒ "o," + each._1 + "," + each._2 + ",-1,7").mkString("|") - sb.append(chmStr) - sb.append("&") + // data series + val loadStr = statistics.map(_.load).mkString(",") + sb.append("chd=t:") + val maxP = 95 + val percentiles = List(5, 50, maxP) + val maxValue = statistics.map(_.percentiles(maxP)).max + val percentileSeries: List[String] = + for (p ← percentiles) yield { + loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",") + } + sb.append(percentileSeries.mkString("|")) - // data series - val loadStr = statistics.map(_.load).mkString(",") - sb.append("chd=t:") - val maxP = 95 - val percentiles = List(5, 50, maxP) - val maxValue = statistics.map(_.percentiles(maxP)).max - val percentileSeries: List[String] = - for (p ← percentiles) yield { - loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",") + sb.append("|") + sb.append(loadStr).append("|") + val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",") + sb.append(meanSeries) + + sb.append("|") + val maxTps: Double = statistics.map(_.tps).max + sb.append(loadStr).append("|") + val tpsSeries = statistics.map(s ⇒ formatDouble(s.tps)).mkString(",") + sb.append(tpsSeries) + + val minLoad = statistics.head.load + val maxLoad = statistics.last.load + + // y range + sb.append("&") + sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,") + .append(formatDouble(maxTps)) + sb.append("&") + + sb.append("chds=") + for (p ← percentiles) { + sb.append(minLoad).append(",").append(maxLoad) + sb.append(",0,").append(maxValue) + sb.append(",") } - sb.append(percentileSeries.mkString("|")) - - sb.append("|") - sb.append(loadStr).append("|") - val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",") - sb.append(meanSeries) - - sb.append("|") - val maxTps: Double = statistics.map(_.tps).max - sb.append(loadStr).append("|") - val tpsSeries = statistics.map(s ⇒ formatDouble(s.tps)).mkString(",") - sb.append(tpsSeries) - - val minLoad = statistics.head.load - val maxLoad = statistics.last.load - - // y range - sb.append("&") - sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,") - .append(formatDouble(maxTps)) - sb.append("&") - - sb.append("chds=") - for (p ← percentiles) { sb.append(minLoad).append(",").append(maxLoad) - sb.append(",0,").append(maxValue) + sb.append(",0,").append(formatDouble(maxValue)) sb.append(",") + sb.append(minLoad).append(",").append(maxLoad) + sb.append(",0,").append(formatDouble(maxTps)) + sb.append("&") + + // label positions + sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100") + sb.append("&") + + // grid lines + appendGridSpacing(maxValue, sb) + + sb.toString } - sb.append(minLoad).append(",").append(maxLoad) - sb.append(",0,").append(formatDouble(maxValue)) - sb.append(",") - sb.append(minLoad).append(",").append(maxLoad) - sb.append(",0,").append(formatDouble(maxTps)) - sb.append("&") - - // label positions - sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100") - sb.append("&") - - // grid lines - appendGridSpacing(maxValue, sb) - - return sb.toString } def formatDouble(value: Double): String = { diff --git a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala index c7c8308de0..613599fa8e 100644 --- a/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala +++ b/akka-actor/src/main/scala/akka/util/BoundedBlockingQueue.scala @@ -7,6 +7,7 @@ package akka.util import java.util.concurrent.locks.ReentrantLock import java.util.concurrent.{ TimeUnit, BlockingQueue } import java.util.{ AbstractQueue, Queue, Collection, Iterator } +import annotation.tailrec /** * BoundedBlockingQueue wraps any Queue and turns the result into a BlockingQueue with a limited capacity @@ -80,15 +81,17 @@ class BoundedBlockingQueue[E <: AnyRef]( var nanos = unit.toNanos(timeout) lock.lockInterruptibly() try { - while (backing.size() == maxCapacity) { - if (nanos <= 0) - return false - else - nanos = notFull.awaitNanos(nanos) - } - require(backing.offer(e)) //Should never fail - notEmpty.signal() - true + @tailrec def awaitNotFull(ns: Long): Boolean = + if (backing.size() == maxCapacity) { + if (ns > 0) awaitNotFull(notFull.awaitNanos(ns)) + else false + } else true + + if (awaitNotFull(nanos)) { + require(backing.offer(e)) //Should never fail + notEmpty.signal() + true + } else false } finally { lock.unlock() } @@ -208,17 +211,14 @@ class BoundedBlockingQueue[E <: AnyRef]( else { lock.lock() try { - var n = 0 - var e: E = null.asInstanceOf[E] - while (n < maxElements) { - backing.poll() match { - case null ⇒ return n - case e ⇒ - c add e - n += 1 - } - } - n + @tailrec def drainOne(n: Int): Int = + if (n < maxElements) { + backing.poll() match { + case null ⇒ n + case e ⇒ c add e; drainOne(n + 1) + } + } else n + drainOne(0) } finally { lock.unlock() } @@ -285,14 +285,14 @@ class BoundedBlockingQueue[E <: AnyRef]( last = -1 //To avoid 2 subsequent removes without a next in between lock.lock() try { - val i = backing.iterator() - while (i.hasNext) { + @tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit = if (i.hasNext) { if (i.next eq target) { i.remove() notFull.signal() - return () - } + } else removeTarget(i) } + + removeTarget() } finally { lock.unlock() } diff --git a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala index 4149a0b0b1..d07fff3a32 100644 --- a/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala +++ b/akka-actor/src/main/scala/akka/util/SubclassifiedIndex.scala @@ -79,14 +79,14 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc: * Add key to this index which inherits its value set from the most specific * super-class which is known. */ - def addKey(key: K): Changes = { - for (n ← subkeys) { - if (sc.isEqual(n.key, key)) return Nil - else if (sc.isSubclass(key, n.key)) return n.addKey(key) + def addKey(key: K): Changes = + subkeys collectFirst { + case n if sc.isEqual(n.key, key) ⇒ Nil + case n if sc.isSubclass(key, n.key) ⇒ n.addKey(key) + } getOrElse { + integrate(new Nonroot(key, values)) + List((key, values)) } - integrate(new Nonroot(key, values)) - (key, values) :: Nil - } /** * Add value to all keys which are subclasses of the given key. If the key diff --git a/akka-zeromq/src/main/scala/akka/zeromq/package.scala b/akka-zeromq/src/main/scala/akka/zeromq/package.scala index c10ad9eb85..2795505fa0 100644 --- a/akka-zeromq/src/main/scala/akka/zeromq/package.scala +++ b/akka-zeromq/src/main/scala/akka/zeromq/package.scala @@ -11,14 +11,6 @@ import actor.ActorSystem * A package object with an implicit conversion for the actor system as a convenience */ package object zeromq { - - /** - * Creates a zeromq actor system implicitly - * @param system - * @return An augmented [[akka.actor.ActorSystem]] - */ - implicit def zeromqSystem(system: ActorSystem): ZeroMQExtension = ZeroMQExtension(system) - /** * Convenience accessor to subscribe to all events */