Removing all uses of 'return' and removing the ZeroMQExtension implicit
This commit is contained in:
parent
f76306cc0c
commit
f20d6d1a08
6 changed files with 271 additions and 273 deletions
|
|
@ -22,6 +22,7 @@ import akka.pattern.ask
|
|||
import akka.testkit._
|
||||
import akka.util.{ Timeout, Switch, Duration }
|
||||
import akka.util.duration._
|
||||
import annotation.tailrec
|
||||
|
||||
object ActorModelSpec {
|
||||
|
||||
|
|
@ -224,16 +225,16 @@ object ActorModelSpec {
|
|||
}
|
||||
}
|
||||
|
||||
def await(until: Long)(condition: ⇒ Boolean): Unit = try {
|
||||
while (System.currentTimeMillis() <= until) {
|
||||
try {
|
||||
if (condition) return else Thread.sleep(25)
|
||||
} catch {
|
||||
case e: InterruptedException ⇒
|
||||
}
|
||||
def await(until: Long)(condition: ⇒ Boolean): Unit = if (System.currentTimeMillis() <= until) {
|
||||
var done = false
|
||||
try {
|
||||
done = condition
|
||||
if (!done) Thread.sleep(25)
|
||||
} catch {
|
||||
case e: InterruptedException ⇒
|
||||
}
|
||||
throw new AssertionError("await failed")
|
||||
}
|
||||
if (!done) await(until)(condition)
|
||||
} else throw new AssertionError("await failed")
|
||||
}
|
||||
|
||||
abstract class ActorModelSpec(config: String) extends AkkaSpec(config) with DefaultTimeout {
|
||||
|
|
|
|||
|
|
@ -102,20 +102,21 @@ class FileBenchResultRepository extends BenchResultRepository {
|
|||
|
||||
private def save(stats: Stats) {
|
||||
new File(serDir).mkdirs
|
||||
if (!serDirExists) return
|
||||
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
|
||||
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
|
||||
val f = new File(serDir, name)
|
||||
var out: ObjectOutputStream = null
|
||||
try {
|
||||
out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
|
||||
out.writeObject(stats)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
val errMsg = "Failed to save [%s] to [%s], due to [%s]".format(stats, f.getAbsolutePath, e.getMessage)
|
||||
throw new RuntimeException(errMsg)
|
||||
} finally {
|
||||
if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ }
|
||||
if (serDirExists) {
|
||||
val timestamp = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date(stats.timestamp))
|
||||
val name = stats.name + "--" + timestamp + "--" + stats.load + ".ser"
|
||||
val f = new File(serDir, name)
|
||||
var out: ObjectOutputStream = null
|
||||
try {
|
||||
out = new ObjectOutputStream(new BufferedOutputStream(new FileOutputStream(f)))
|
||||
out.writeObject(stats)
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
val errMsg = "Failed to save [%s] to [%s], due to [%s]".format(stats, f.getAbsolutePath, e.getMessage)
|
||||
throw new RuntimeException(errMsg)
|
||||
} finally {
|
||||
if (out ne null) try { out.close() } catch { case ignore: Exception ⇒ }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,19 +143,20 @@ class FileBenchResultRepository extends BenchResultRepository {
|
|||
|
||||
def saveHtmlReport(content: String, fileName: String) {
|
||||
new File(htmlDir).mkdirs
|
||||
if (!htmlDirExists) return
|
||||
val f = new File(htmlDir, fileName)
|
||||
var writer: PrintWriter = null
|
||||
try {
|
||||
writer = new PrintWriter(new FileWriter(f))
|
||||
writer.print(content)
|
||||
writer.flush()
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
val errMsg = "Failed to save report to [%s], due to [%s]".format(f.getAbsolutePath, e.getMessage)
|
||||
throw new RuntimeException(errMsg)
|
||||
} finally {
|
||||
if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ }
|
||||
if (htmlDirExists) {
|
||||
val f = new File(htmlDir, fileName)
|
||||
var writer: PrintWriter = null
|
||||
try {
|
||||
writer = new PrintWriter(new FileWriter(f))
|
||||
writer.print(content)
|
||||
writer.flush()
|
||||
} catch {
|
||||
case e: Exception ⇒
|
||||
val errMsg = "Failed to save report to [%s], due to [%s]".format(f.getAbsolutePath, e.getMessage)
|
||||
throw new RuntimeException(errMsg)
|
||||
} finally {
|
||||
if (writer ne null) try { writer.close() } catch { case ignore: Exception ⇒ }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -17,131 +17,133 @@ object GoogleChartBuilder {
|
|||
* Builds a bar chart for tps in the statistics.
|
||||
*/
|
||||
def tpsChartUrl(statsByTimestamp: TreeMap[Long, Seq[Stats]], title: String, legend: Stats ⇒ String): String = {
|
||||
if (statsByTimestamp.isEmpty) return ""
|
||||
if (statsByTimestamp.isEmpty) ""
|
||||
else {
|
||||
val loads = statsByTimestamp.values.head.map(_.load)
|
||||
val allStats = statsByTimestamp.values.flatten
|
||||
|
||||
val loads = statsByTimestamp.values.head.map(_.load)
|
||||
val allStats = statsByTimestamp.values.flatten
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// bar chart
|
||||
sb.append("cht=bvg")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=y,x")
|
||||
sb.append("&")
|
||||
// labels
|
||||
sb.append("chxl=1:|")
|
||||
sb.append(loads.mkString("|"))
|
||||
sb.append("&")
|
||||
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// bar chart
|
||||
sb.append("cht=bvg")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=y,x")
|
||||
sb.append("&")
|
||||
// labels
|
||||
sb.append("chxl=1:|")
|
||||
sb.append(loads.mkString("|"))
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
//sb.append("chxs=2,D65D82,11.5,0,lt,D65D82")
|
||||
//sb.append("&")
|
||||
|
||||
// label color and font
|
||||
//sb.append("chxs=2,D65D82,11.5,0,lt,D65D82")
|
||||
//sb.append("&")
|
||||
// legend
|
||||
val legendStats = statsByTimestamp.values.map(_.head).toSeq
|
||||
appendLegend(legendStats, sb, legend)
|
||||
sb.append("&")
|
||||
// bar spacing
|
||||
sb.append("chbh=a,4,20")
|
||||
sb.append("&")
|
||||
// bar colors
|
||||
barColors(statsByTimestamp.size, sb)
|
||||
sb.append("&")
|
||||
|
||||
// legend
|
||||
val legendStats = statsByTimestamp.values.map(_.head).toSeq
|
||||
appendLegend(legendStats, sb, legend)
|
||||
sb.append("&")
|
||||
// bar spacing
|
||||
sb.append("chbh=a,4,20")
|
||||
sb.append("&")
|
||||
// bar colors
|
||||
barColors(statsByTimestamp.size, sb)
|
||||
sb.append("&")
|
||||
// data series
|
||||
val loadStr = loads.mkString(",")
|
||||
sb.append("chd=t:")
|
||||
val maxValue = allStats.map(_.tps).max
|
||||
val tpsSeries: Iterable[String] =
|
||||
for (statsSeq ← statsByTimestamp.values) yield {
|
||||
statsSeq.map(_.tps).mkString(",")
|
||||
}
|
||||
sb.append(tpsSeries.mkString("|"))
|
||||
|
||||
// data series
|
||||
val loadStr = loads.mkString(",")
|
||||
sb.append("chd=t:")
|
||||
val maxValue = allStats.map(_.tps).max
|
||||
val tpsSeries: Iterable[String] =
|
||||
for (statsSeq ← statsByTimestamp.values) yield {
|
||||
statsSeq.map(_.tps).mkString(",")
|
||||
}
|
||||
sb.append(tpsSeries.mkString("|"))
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,0,").append(maxValue)
|
||||
sb.append("&")
|
||||
sb.append("chds=0,").append(maxValue)
|
||||
sb.append("&")
|
||||
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,0,").append(maxValue)
|
||||
sb.append("&")
|
||||
sb.append("chds=0,").append(maxValue)
|
||||
sb.append("&")
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue.toLong, sb)
|
||||
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue.toLong, sb)
|
||||
|
||||
return sb.toString
|
||||
sb.toString
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Builds a bar chart for all percentiles and the mean in the statistics.
|
||||
*/
|
||||
def percentilesAndMeanChartUrl(statistics: Seq[Stats], title: String, legend: Stats ⇒ String): String = {
|
||||
if (statistics.isEmpty) return ""
|
||||
if (statistics.isEmpty) ""
|
||||
else {
|
||||
val current = statistics.last
|
||||
|
||||
val current = statistics.last
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// bar chart
|
||||
sb.append("cht=bvg")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=y,x,y")
|
||||
sb.append("&")
|
||||
// labels
|
||||
percentileLabels(current.percentiles, sb)
|
||||
sb.append("|mean")
|
||||
sb.append("|2:|min|mean|median")
|
||||
sb.append("&")
|
||||
// label positions
|
||||
sb.append("chxp=2,").append(current.min).append(",").append(current.mean).append(",")
|
||||
.append(current.median)
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
sb.append("chxs=2,D65D82,11.5,0,lt,D65D82")
|
||||
sb.append("&")
|
||||
// lines for min, mean, median
|
||||
sb.append("chxtc=2,-1000")
|
||||
sb.append("&")
|
||||
// legend
|
||||
appendLegend(statistics, sb, legend)
|
||||
sb.append("&")
|
||||
// bar spacing
|
||||
sb.append("chbh=a,4,20")
|
||||
sb.append("&")
|
||||
// bar colors
|
||||
barColors(statistics.size, sb)
|
||||
sb.append("&")
|
||||
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// bar chart
|
||||
sb.append("cht=bvg")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=y,x,y")
|
||||
sb.append("&")
|
||||
// labels
|
||||
percentileLabels(current.percentiles, sb)
|
||||
sb.append("|mean")
|
||||
sb.append("|2:|min|mean|median")
|
||||
sb.append("&")
|
||||
// label positions
|
||||
sb.append("chxp=2,").append(current.min).append(",").append(current.mean).append(",")
|
||||
.append(current.median)
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
sb.append("chxs=2,D65D82,11.5,0,lt,D65D82")
|
||||
sb.append("&")
|
||||
// lines for min, mean, median
|
||||
sb.append("chxtc=2,-1000")
|
||||
sb.append("&")
|
||||
// legend
|
||||
appendLegend(statistics, sb, legend)
|
||||
sb.append("&")
|
||||
// bar spacing
|
||||
sb.append("chbh=a,4,20")
|
||||
sb.append("&")
|
||||
// bar colors
|
||||
barColors(statistics.size, sb)
|
||||
sb.append("&")
|
||||
// data series
|
||||
val maxValue = statistics.map(_.percentiles.last._2).max
|
||||
sb.append("chd=t:")
|
||||
dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb)
|
||||
|
||||
// data series
|
||||
val maxValue = statistics.map(_.percentiles.last._2).max
|
||||
sb.append("chd=t:")
|
||||
dataSeries(statistics.map(_.percentiles), statistics.map(_.mean), sb)
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,0,").append(maxValue).append("|2,0,").append(maxValue)
|
||||
sb.append("&")
|
||||
sb.append("chds=0,").append(maxValue)
|
||||
sb.append("&")
|
||||
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,0,").append(maxValue).append("|2,0,").append(maxValue)
|
||||
sb.append("&")
|
||||
sb.append("chds=0,").append(maxValue)
|
||||
sb.append("&")
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
return sb.toString
|
||||
sb.toString
|
||||
}
|
||||
}
|
||||
|
||||
private def percentileLabels(percentiles: TreeMap[Int, Long], sb: StringBuilder) {
|
||||
|
|
@ -197,108 +199,109 @@ object GoogleChartBuilder {
|
|||
}
|
||||
|
||||
def latencyAndThroughputChartUrl(statistics: Seq[Stats], title: String): String = {
|
||||
if (statistics.isEmpty) return ""
|
||||
if (statistics.isEmpty) ""
|
||||
else {
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// line chart
|
||||
sb.append("cht=lxy")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=x,y,r,x,y,r")
|
||||
sb.append("&")
|
||||
// labels
|
||||
sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)")
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
|
||||
sb.append("&")
|
||||
sb.append("chco=")
|
||||
val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900")
|
||||
sb.append(seriesColors.mkString(","))
|
||||
sb.append("&")
|
||||
// legend
|
||||
sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput")
|
||||
sb.append("&")
|
||||
|
||||
val sb = new StringBuilder
|
||||
sb.append(BaseUrl)
|
||||
// line chart
|
||||
sb.append("cht=lxy")
|
||||
sb.append("&")
|
||||
// size
|
||||
sb.append("chs=").append(ChartWidth).append("x").append(ChartHeight)
|
||||
sb.append("&")
|
||||
// title
|
||||
sb.append("chtt=").append(urlEncode(title))
|
||||
sb.append("&")
|
||||
// axis locations
|
||||
sb.append("chxt=x,y,r,x,y,r")
|
||||
sb.append("&")
|
||||
// labels
|
||||
sb.append("chxl=3:|clients|4:|Latency+(us)|5:|Throughput+(tps)")
|
||||
sb.append("&")
|
||||
// label color and font
|
||||
sb.append("chxs=0,676767,11.5,0,lt,676767|1,676767,11.5,0,lt,676767|2,676767,11.5,0,lt,676767")
|
||||
sb.append("&")
|
||||
sb.append("chco=")
|
||||
val seriesColors = List("25B33B", "3072F3", "FF0000", "37F0ED", "FF9900")
|
||||
sb.append(seriesColors.mkString(","))
|
||||
sb.append("&")
|
||||
// legend
|
||||
sb.append("chdl=5th%20Percentile|Median|95th%20Percentile|Mean|Throughput")
|
||||
sb.append("&")
|
||||
sb.append("chdlp=b")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chdlp=b")
|
||||
sb.append("&")
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chls=1|1|1")
|
||||
sb.append("&")
|
||||
// margins
|
||||
sb.append("chma=5,5,5,25")
|
||||
sb.append("&")
|
||||
|
||||
// margins
|
||||
sb.append("chma=5,5,5,25")
|
||||
sb.append("&")
|
||||
// data points
|
||||
sb.append("chm=")
|
||||
val chmStr = seriesColors.zipWithIndex.map(each ⇒ "o," + each._1 + "," + each._2 + ",-1,7").mkString("|")
|
||||
sb.append(chmStr)
|
||||
sb.append("&")
|
||||
|
||||
// data points
|
||||
sb.append("chm=")
|
||||
val chmStr = seriesColors.zipWithIndex.map(each ⇒ "o," + each._1 + "," + each._2 + ",-1,7").mkString("|")
|
||||
sb.append(chmStr)
|
||||
sb.append("&")
|
||||
// data series
|
||||
val loadStr = statistics.map(_.load).mkString(",")
|
||||
sb.append("chd=t:")
|
||||
val maxP = 95
|
||||
val percentiles = List(5, 50, maxP)
|
||||
val maxValue = statistics.map(_.percentiles(maxP)).max
|
||||
val percentileSeries: List[String] =
|
||||
for (p ← percentiles) yield {
|
||||
loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",")
|
||||
}
|
||||
sb.append(percentileSeries.mkString("|"))
|
||||
|
||||
// data series
|
||||
val loadStr = statistics.map(_.load).mkString(",")
|
||||
sb.append("chd=t:")
|
||||
val maxP = 95
|
||||
val percentiles = List(5, 50, maxP)
|
||||
val maxValue = statistics.map(_.percentiles(maxP)).max
|
||||
val percentileSeries: List[String] =
|
||||
for (p ← percentiles) yield {
|
||||
loadStr + "|" + statistics.map(_.percentiles(p)).mkString(",")
|
||||
sb.append("|")
|
||||
sb.append(loadStr).append("|")
|
||||
val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",")
|
||||
sb.append(meanSeries)
|
||||
|
||||
sb.append("|")
|
||||
val maxTps: Double = statistics.map(_.tps).max
|
||||
sb.append(loadStr).append("|")
|
||||
val tpsSeries = statistics.map(s ⇒ formatDouble(s.tps)).mkString(",")
|
||||
sb.append(tpsSeries)
|
||||
|
||||
val minLoad = statistics.head.load
|
||||
val maxLoad = statistics.last.load
|
||||
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,")
|
||||
.append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chds=")
|
||||
for (p ← percentiles) {
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(maxValue)
|
||||
sb.append(",")
|
||||
}
|
||||
sb.append(percentileSeries.mkString("|"))
|
||||
|
||||
sb.append("|")
|
||||
sb.append(loadStr).append("|")
|
||||
val meanSeries = statistics.map(s ⇒ formatDouble(s.mean)).mkString(",")
|
||||
sb.append(meanSeries)
|
||||
|
||||
sb.append("|")
|
||||
val maxTps: Double = statistics.map(_.tps).max
|
||||
sb.append(loadStr).append("|")
|
||||
val tpsSeries = statistics.map(s ⇒ formatDouble(s.tps)).mkString(",")
|
||||
sb.append(tpsSeries)
|
||||
|
||||
val minLoad = statistics.head.load
|
||||
val maxLoad = statistics.last.load
|
||||
|
||||
// y range
|
||||
sb.append("&")
|
||||
sb.append("chxr=0,").append(minLoad).append(",").append(maxLoad).append(",4").append("|1,0,").append(maxValue).append("|2,0,")
|
||||
.append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
sb.append("chds=")
|
||||
for (p ← percentiles) {
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(maxValue)
|
||||
sb.append(",0,").append(formatDouble(maxValue))
|
||||
sb.append(",")
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
// label positions
|
||||
sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100")
|
||||
sb.append("&")
|
||||
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
sb.toString
|
||||
}
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(formatDouble(maxValue))
|
||||
sb.append(",")
|
||||
sb.append(minLoad).append(",").append(maxLoad)
|
||||
sb.append(",0,").append(formatDouble(maxTps))
|
||||
sb.append("&")
|
||||
|
||||
// label positions
|
||||
sb.append("chxp=3,").append("50").append("|4,").append("100").append("|5,").append("100")
|
||||
sb.append("&")
|
||||
|
||||
// grid lines
|
||||
appendGridSpacing(maxValue, sb)
|
||||
|
||||
return sb.toString
|
||||
}
|
||||
|
||||
def formatDouble(value: Double): String = {
|
||||
|
|
|
|||
|
|
@ -7,6 +7,7 @@ package akka.util
|
|||
import java.util.concurrent.locks.ReentrantLock
|
||||
import java.util.concurrent.{ TimeUnit, BlockingQueue }
|
||||
import java.util.{ AbstractQueue, Queue, Collection, Iterator }
|
||||
import annotation.tailrec
|
||||
|
||||
/**
|
||||
* BoundedBlockingQueue wraps any Queue and turns the result into a BlockingQueue with a limited capacity
|
||||
|
|
@ -80,15 +81,17 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
var nanos = unit.toNanos(timeout)
|
||||
lock.lockInterruptibly()
|
||||
try {
|
||||
while (backing.size() == maxCapacity) {
|
||||
if (nanos <= 0)
|
||||
return false
|
||||
else
|
||||
nanos = notFull.awaitNanos(nanos)
|
||||
}
|
||||
require(backing.offer(e)) //Should never fail
|
||||
notEmpty.signal()
|
||||
true
|
||||
@tailrec def awaitNotFull(ns: Long): Boolean =
|
||||
if (backing.size() == maxCapacity) {
|
||||
if (ns > 0) awaitNotFull(notFull.awaitNanos(ns))
|
||||
else false
|
||||
} else true
|
||||
|
||||
if (awaitNotFull(nanos)) {
|
||||
require(backing.offer(e)) //Should never fail
|
||||
notEmpty.signal()
|
||||
true
|
||||
} else false
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
|
|
@ -208,17 +211,14 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
else {
|
||||
lock.lock()
|
||||
try {
|
||||
var n = 0
|
||||
var e: E = null.asInstanceOf[E]
|
||||
while (n < maxElements) {
|
||||
backing.poll() match {
|
||||
case null ⇒ return n
|
||||
case e ⇒
|
||||
c add e
|
||||
n += 1
|
||||
}
|
||||
}
|
||||
n
|
||||
@tailrec def drainOne(n: Int): Int =
|
||||
if (n < maxElements) {
|
||||
backing.poll() match {
|
||||
case null ⇒ n
|
||||
case e ⇒ c add e; drainOne(n + 1)
|
||||
}
|
||||
} else n
|
||||
drainOne(0)
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
|
|
@ -285,14 +285,14 @@ class BoundedBlockingQueue[E <: AnyRef](
|
|||
last = -1 //To avoid 2 subsequent removes without a next in between
|
||||
lock.lock()
|
||||
try {
|
||||
val i = backing.iterator()
|
||||
while (i.hasNext) {
|
||||
@tailrec def removeTarget(i: Iterator[E] = backing.iterator()): Unit = if (i.hasNext) {
|
||||
if (i.next eq target) {
|
||||
i.remove()
|
||||
notFull.signal()
|
||||
return ()
|
||||
}
|
||||
} else removeTarget(i)
|
||||
}
|
||||
|
||||
removeTarget()
|
||||
} finally {
|
||||
lock.unlock()
|
||||
}
|
||||
|
|
|
|||
|
|
@ -79,14 +79,14 @@ class SubclassifiedIndex[K, V] private (private var values: Set[V])(implicit sc:
|
|||
* Add key to this index which inherits its value set from the most specific
|
||||
* super-class which is known.
|
||||
*/
|
||||
def addKey(key: K): Changes = {
|
||||
for (n ← subkeys) {
|
||||
if (sc.isEqual(n.key, key)) return Nil
|
||||
else if (sc.isSubclass(key, n.key)) return n.addKey(key)
|
||||
def addKey(key: K): Changes =
|
||||
subkeys collectFirst {
|
||||
case n if sc.isEqual(n.key, key) ⇒ Nil
|
||||
case n if sc.isSubclass(key, n.key) ⇒ n.addKey(key)
|
||||
} getOrElse {
|
||||
integrate(new Nonroot(key, values))
|
||||
List((key, values))
|
||||
}
|
||||
integrate(new Nonroot(key, values))
|
||||
(key, values) :: Nil
|
||||
}
|
||||
|
||||
/**
|
||||
* Add value to all keys which are subclasses of the given key. If the key
|
||||
|
|
|
|||
|
|
@ -11,14 +11,6 @@ import actor.ActorSystem
|
|||
* A package object with an implicit conversion for the actor system as a convenience
|
||||
*/
|
||||
package object zeromq {
|
||||
|
||||
/**
|
||||
* Creates a zeromq actor system implicitly
|
||||
* @param system
|
||||
* @return An augmented [[akka.actor.ActorSystem]]
|
||||
*/
|
||||
implicit def zeromqSystem(system: ActorSystem): ZeroMQExtension = ZeroMQExtension(system)
|
||||
|
||||
/**
|
||||
* Convenience accessor to subscribe to all events
|
||||
*/
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue