20623 Make sure external (mapped) resources are properly cleaned on shutdown
This commit is contained in:
parent
e49b11607a
commit
0d77034adc
6 changed files with 62 additions and 21 deletions
|
|
@ -88,8 +88,9 @@ abstract class AeronStreamLatencySpec
|
|||
|
||||
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
|
||||
|
||||
val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc");
|
||||
val stats =
|
||||
new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))
|
||||
new AeronStat(AeronStat.mapCounters(cncByteBuffer))
|
||||
|
||||
val aeron = {
|
||||
val ctx = new Aeron.Context
|
||||
|
|
@ -129,6 +130,7 @@ abstract class AeronStreamLatencySpec
|
|||
taskRunner.stop()
|
||||
aeron.close()
|
||||
driver.close()
|
||||
IoUtil.unmap(cncByteBuffer)
|
||||
IoUtil.delete(new File(driver.aeronDirectoryName), true)
|
||||
runOn(first) {
|
||||
println(plots.plot50.csv(system.name + "50"))
|
||||
|
|
|
|||
|
|
@ -86,8 +86,9 @@ abstract class AeronStreamMaxThroughputSpec
|
|||
|
||||
val pool = new EnvelopeBufferPool(1024 * 1024, 128)
|
||||
|
||||
val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc");
|
||||
val stats =
|
||||
new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE)))
|
||||
new AeronStat(AeronStat.mapCounters(cncByteBuffer))
|
||||
|
||||
val aeron = {
|
||||
val ctx = new Aeron.Context
|
||||
|
|
@ -129,6 +130,7 @@ abstract class AeronStreamMaxThroughputSpec
|
|||
taskRunner.stop()
|
||||
aeron.close()
|
||||
driver.close()
|
||||
IoUtil.unmap(cncByteBuffer)
|
||||
IoUtil.delete(new File(driver.aeronDirectoryName), true)
|
||||
runOn(second) {
|
||||
println(plot.csv(system.name))
|
||||
|
|
|
|||
|
|
@ -4,8 +4,9 @@
|
|||
package akka.remote.artery
|
||||
|
||||
import java.util.concurrent.Executors
|
||||
import java.util.concurrent.atomic.AtomicLongArray
|
||||
import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLongArray }
|
||||
import java.util.concurrent.locks.LockSupport
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor._
|
||||
import akka.remote.testconductor.RoleName
|
||||
|
|
@ -83,6 +84,7 @@ object LatencySpec extends MultiNodeConfig {
|
|||
var count = 0
|
||||
var startTime = System.nanoTime()
|
||||
val taskRunnerMetrics = new TaskRunnerMetrics(context.system)
|
||||
var reportedArrayOOB = false
|
||||
|
||||
def receive = {
|
||||
case bytes: Array[Byte] ⇒
|
||||
|
|
@ -100,7 +102,16 @@ object LatencySpec extends MultiNodeConfig {
|
|||
reporter.onMessage(1, payloadSize)
|
||||
count += 1
|
||||
val d = System.nanoTime() - sendTimes.get(count - 1)
|
||||
histogram.recordValue(d)
|
||||
try {
|
||||
histogram.recordValue(d)
|
||||
} catch {
|
||||
case e: ArrayIndexOutOfBoundsException ⇒
|
||||
// Report it only once instead of flooding the console
|
||||
if (!reportedArrayOOB) {
|
||||
e.printStackTrace()
|
||||
reportedArrayOOB = true
|
||||
}
|
||||
}
|
||||
if (count == totalMessages) {
|
||||
printTotal(testName, size, histogram, System.nanoTime() - startTime)
|
||||
context.stop(self)
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue