From 76b29a35e0e4ae5c44344c05c6c07e2d84732685 Mon Sep 17 00:00:00 2001
From: Patrik Nordwall
Date: Wed, 18 May 2016 13:34:51 +0200
Subject: [PATCH] log Aeron errors, #20317

---
 .../akka/remote/artery/AeronErrorLog.java     | 97 +++++++++++++++++++
 .../akka/remote/artery/ArteryTransport.scala  | 22 +++++
 2 files changed, 119 insertions(+)
 create mode 100644 akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java

diff --git a/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java b/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java
new file mode 100644
index 0000000000..edee08f987
--- /dev/null
+++ b/akka-remote/src/main/java/akka/remote/artery/AeronErrorLog.java
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2014 - 2016 Real Logic Ltd.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package akka.remote.artery;
+
+import io.aeron.CncFileDescriptor;
+import org.agrona.DirectBuffer;
+import org.agrona.IoUtil;
+import org.agrona.concurrent.AtomicBuffer;
+import org.agrona.concurrent.errors.ErrorLogReader;
+
+import akka.event.LoggingAdapter;
+
+import java.io.File;
+import java.nio.MappedByteBuffer;
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Logs the errors recorded in the command-and-control (cnc) file that is maintained by the media driver in shared
+ * memory. Each call to {@link #logErrors} reads the cnc file and logs the distinct errors. Layout of the cnc file is
+ * described in {@link CncFileDescriptor}.
+ */
+public class AeronErrorLog
+{
+    private final File cncFile;
+
+    /**
+     * @param cncFile the cnc file to read the recorded errors from
+     */
+    public AeronErrorLog(File cncFile)
+    {
+        this.cncFile = cncFile;
+    }
+
+    /**
+     * Logs, at error level, the distinct errors in the cnc file that were observed since {@code sinceTimestamp}.
+     * The cnc file is mapped for the duration of the call only and is unmapped again before the method returns.
+     *
+     * @param log            the adapter that the errors are logged to
+     * @param sinceTimestamp only errors observed since this timestamp (in epoch milliseconds) are logged
+     * @return the latest observation timestamp among the logged errors, or {@code sinceTimestamp} if that is greater
+     * @throws IllegalStateException if the cnc version of the file is not {@link CncFileDescriptor#CNC_VERSION}
+     */
+    public long logErrors(LoggingAdapter log, long sinceTimestamp)
+    {
+        final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc");
+        try
+        {
+            final DirectBuffer cncMetaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer);
+            final int cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0));
+
+            if (CncFileDescriptor.CNC_VERSION != cncVersion)
+            {
+                throw new IllegalStateException("CNC version not supported: file version=" + cncVersion);
+            }
+
+            final AtomicBuffer buffer = CncFileDescriptor.createErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer);
+            final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ");
+
+            // using AtomicLong because access from lambda, not because of concurrency
+            final AtomicLong lastTimestamp = new AtomicLong(sinceTimestamp);
+
+            ErrorLogReader.read(
+                buffer,
+                (observationCount, firstObservationTimestamp, lastObservationTimestamp, encodedException) -> {
+                    log.error(String.format(
+                        "Aeron error: %d observations from %s to %s for:%n %s",
+                        observationCount,
+                        dateFormat.format(new Date(firstObservationTimestamp)),
+                        dateFormat.format(new Date(lastObservationTimestamp)),
+                        encodedException));
+                    lastTimestamp.set(Math.max(lastTimestamp.get(), lastObservationTimestamp));
+                }, sinceTimestamp);
+            return lastTimestamp.get();
+        }
+        finally
+        {
+            // This method is polled repeatedly. Without unmapping, every call would leave another mapping of the
+            // cnc file behind until the buffer is garbage collected.
+            IoUtil.unmap(cncByteBuffer);
+        }
+    }
+}
diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
index 0ef73d7d47..f732f3d4d7 100644
--- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
+++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala
@@ -67,6 +67,9 @@ import java.net.InetSocketAddress
 import java.nio.channels.DatagramChannel
 import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
 
+import io.aeron.CncFileDescriptor
+import java.util.concurrent.atomic.AtomicLong
+import akka.actor.Cancellable
 
 /**
  * INTERNAL API
@@ -215,6 +218,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   @volatile private[this] var messageDispatcher: MessageDispatcher = _
   @volatile private[this] var driver: MediaDriver = _
   @volatile private[this] var aeron: Aeron = _
+  @volatile private[this] var aeronErrorLogTask: Cancellable = _
 
   override def defaultAddress: Address = localAddress.address
   override def addresses: Set[Address] = Set(defaultAddress)
@@ -249,6 +253,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
   override def start(): Unit = {
     startMediaDriver()
     startAeron()
+    startAeronErrorLog()
     taskRunner.start()
 
     val port =
@@ -312,6 +317,22 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     aeron = Aeron.connect(ctx)
   }
 
+  /**
+   * Schedules a recurring task that logs the errors recorded in the cnc file of the media driver.
+   * The first run is after 3 seconds and then every 5 seconds; a run does nothing once the transport is shut down.
+   */
+  private def startAeronErrorLog(): Unit = {
+    val errorLog = new AeronErrorLog(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))
+    val lastTimestamp = new AtomicLong(0L)
+    import system.dispatcher // FIXME perhaps use another dispatcher for this
+    aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
+      if (!isShutdown) {
+        val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get)
+        lastTimestamp.set(newLastTimestamp + 1)
+      }
+    }
+  }
+
   private def runInboundStreams(): Unit = {
     runInboundControlStream()
     runInboundOrdinaryMessagesStream()
@@ -387,6 +408,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
     _shutdown = true
     killSwitch.shutdown()
     if (taskRunner != null) taskRunner.stop()
+    if (aeronErrorLogTask != null) aeronErrorLogTask.cancel()
     if (aeron != null) aeron.close()
     if (driver != null) {
       driver.close()