From 0d77034adc61c98144cb11ccc62985dc6304bd45 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Endre=20S=C3=A1ndor=20Varga?= Date: Fri, 9 Sep 2016 14:29:04 +0200 Subject: [PATCH] 20623 Make sure external (mapped) resources are properly cleaned on shutdown --- .../artery/AeronStreamLatencySpec.scala | 4 ++- .../artery/AeronStreamMaxThroughputSpec.scala | 4 ++- .../akka/remote/artery/LatencySpec.scala | 15 +++++++-- .../akka/remote/artery/AeronErrorLog.java | 33 ++++++++++++------- .../akka/remote/artery/ArteryTransport.scala | 6 ++-- .../java/akka/remote/artery/AeronStat.java | 21 ++++++++++-- 6 files changed, 62 insertions(+), 21 deletions(-) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 4b0fd67dd0..ef840e78cd 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -88,8 +88,9 @@ abstract class AeronStreamLatencySpec val pool = new EnvelopeBufferPool(1024 * 1024, 128) + val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc"); val stats = - new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) + new AeronStat(AeronStat.mapCounters(cncByteBuffer)) val aeron = { val ctx = new Aeron.Context @@ -129,6 +130,7 @@ abstract class AeronStreamLatencySpec taskRunner.stop() aeron.close() driver.close() + IoUtil.unmap(cncByteBuffer) IoUtil.delete(new File(driver.aeronDirectoryName), true) runOn(first) { println(plots.plot50.csv(system.name + "50")) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index a1f9a7ee4d..1f9875afee 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -86,8 +86,9 @@ abstract class AeronStreamMaxThroughputSpec val pool = new EnvelopeBufferPool(1024 * 1024, 128) + val cncByteBuffer = IoUtil.mapExistingFile(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE), "cnc"); val stats = - new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) + new AeronStat(AeronStat.mapCounters(cncByteBuffer)) val aeron = { val ctx = new Aeron.Context @@ -129,6 +130,7 @@ abstract class AeronStreamMaxThroughputSpec taskRunner.stop() aeron.close() driver.close() + IoUtil.unmap(cncByteBuffer) IoUtil.delete(new File(driver.aeronDirectoryName), true) runOn(second) { println(plot.csv(system.name)) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index c2251ed9f4..f8e1afe19b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -4,8 +4,9 @@ package akka.remote.artery import java.util.concurrent.Executors -import java.util.concurrent.atomic.AtomicLongArray +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLongArray } import java.util.concurrent.locks.LockSupport + import scala.concurrent.duration._ import akka.actor._ import akka.remote.testconductor.RoleName @@ -83,6 +84,7 @@ object LatencySpec extends MultiNodeConfig { var count = 0 var startTime = System.nanoTime() val taskRunnerMetrics = new TaskRunnerMetrics(context.system) + var reportedArrayOOB = false def receive = { case bytes: Array[Byte] ⇒ @@ -100,7 +102,16 @@ object LatencySpec extends MultiNodeConfig { reporter.onMessage(1, payloadSize) count += 1 val d = System.nanoTime() - sendTimes.get(count - 1) - histogram.recordValue(d) + try { + histogram.recordValue(d) + } catch { + case e: ArrayIndexOutOfBoundsException ⇒ + // Report it only once instead of flooding the console + if (!reportedArrayOOB) { + e.printStackTrace() + reportedArrayOOB = true + } + } if (count == totalMessages) { printTotal(testName, size, histogram, System.nanoTime() - startTime) context.stop(self) diff --git a/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java b/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java index edee08f987..87f0389594 100644 --- a/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java +++ b/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java @@ -37,26 +37,31 @@ import java.util.concurrent.atomic.AtomicLong; public class AeronErrorLog { private final File cncFile; + final MappedByteBuffer cncByteBuffer; + final DirectBuffer cncMetaDataBuffer; + final int cncVersion; + final AtomicBuffer buffer; + final SimpleDateFormat dateFormat; public AeronErrorLog(File cncFile) { this.cncFile = cncFile; + cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc"); + cncMetaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer); + cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0)); + buffer = CncFileDescriptor.createErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer); + dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ"); + + + if (CncFileDescriptor.CNC_VERSION != cncVersion) + { + IoUtil.unmap(cncByteBuffer); + throw new IllegalStateException("CNC version not supported: file version=" + cncVersion); + } } public long logErrors(LoggingAdapter log, long sinceTimestamp) { - final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc"); - final DirectBuffer cncMetaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer); - final int cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0)); - - if (CncFileDescriptor.CNC_VERSION != cncVersion) - { - throw new IllegalStateException("CNC version not supported: file version=" + cncVersion); - } - - final AtomicBuffer buffer = CncFileDescriptor.createErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer); - final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ"); - // using AtomicLong because access from lambda, not because of currency final AtomicLong lastTimestamp = new AtomicLong(sinceTimestamp); @@ -73,4 +78,8 @@ public class AeronErrorLog }, sinceTimestamp); return lastTimestamp.get(); } + + public void close() { + IoUtil.unmap(cncByteBuffer); + } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index 8cbbcbba9b..a0c19bf561 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -294,6 +294,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R private[this] val mediaDriver = new AtomicReference[Option[MediaDriver]](None) @volatile private[this] var aeron: Aeron = _ @volatile private[this] var aeronErrorLogTask: Cancellable = _ + @volatile private[this] var areonErrorLog: AeronErrorLog = _ @volatile private[this] var inboundCompressions: Option[InboundCompressions] = None @@ -541,12 +542,12 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R // TODO Add FR Events private def startAeronErrorLog(): Unit = { - val errorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE)) + areonErrorLog = new AeronErrorLog(new File(aeronDir, CncFileDescriptor.CNC_FILE)) val lastTimestamp = new AtomicLong(0L) import system.dispatcher aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) { if (!isShutdown) { - val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get) + val newLastTimestamp = areonErrorLog.logErrors(log, lastTimestamp.get) lastTimestamp.set(newLastTimestamp + 1) } } @@ -757,6 +758,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R topLevelFREvents.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) } if (aeron != null) aeron.close() + if (areonErrorLog != null) areonErrorLog.close() if (mediaDriver.get.isDefined) { stopMediaDriver() topLevelFREvents.loFreq(Transport_MediaFileDeleted, NoMetaData) diff --git a/akka-remote/src/test/java/akka/remote/artery/AeronStat.java b/akka-remote/src/test/java/akka/remote/artery/AeronStat.java index feab499287..3b63943f12 100644 --- a/akka-remote/src/test/java/akka/remote/artery/AeronStat.java +++ b/akka-remote/src/test/java/akka/remote/artery/AeronStat.java @@ -117,7 +117,22 @@ public class AeronStat { return mapCounters(CommonContext.newDefaultCncFile()); } - + + public static CountersReader mapCounters(final MappedByteBuffer cncByteBuffer) + { + final DirectBuffer cncMetaData = createMetaDataBuffer(cncByteBuffer); + final int cncVersion = cncMetaData.getInt(cncVersionOffset(0)); + + if (CncFileDescriptor.CNC_VERSION != cncVersion) + { + throw new IllegalStateException("CnC version not supported: file version=" + cncVersion); + } + + return new CountersReader( + createCountersMetaDataBuffer(cncByteBuffer, cncMetaData), + createCountersValuesBuffer(cncByteBuffer, cncMetaData)); + } + public static CountersReader mapCounters(final File cncFile) { System.out.println("Command `n Control file " + cncFile); @@ -132,8 +147,8 @@ public class AeronStat } return new CountersReader( - createCountersMetaDataBuffer(cncByteBuffer, cncMetaData), - createCountersValuesBuffer(cncByteBuffer, cncMetaData)); + createCountersMetaDataBuffer(cncByteBuffer, cncMetaData), + createCountersValuesBuffer(cncByteBuffer, cncMetaData)); } public static void main(final String[] args) throws Exception