Merge pull request #20544 from akka/wip-20317-aeron-log-patriknw

log Aeron errors, #20317
This commit is contained in:
Patrik Nordwall 2016-05-20 13:07:04 +02:00
commit 4b048f5c4b
2 changed files with 94 additions and 0 deletions

View file

@ -0,0 +1,76 @@
/*
* Copyright 2014 - 2016 Real Logic Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package akka.remote.artery;
import io.aeron.CncFileDescriptor;
import org.agrona.DirectBuffer;
import org.agrona.IoUtil;
import org.agrona.concurrent.AtomicBuffer;
import org.agrona.concurrent.errors.ErrorLogReader;
import akka.event.LoggingAdapter;
import java.io.File;
import java.nio.MappedByteBuffer;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.concurrent.atomic.AtomicLong;
/**
* Application to print out errors recorded in the command-and-control (cnc) file is maintained by media driver in shared
* memory. This application reads the the cnc file and prints the distinct errors. Layout of the cnc file is described in
* {@link CncFileDescriptor}.
*/
public class AeronErrorLog
{
private final File cncFile;
public AeronErrorLog(File cncFile)
{
this.cncFile = cncFile;
}
public long logErrors(LoggingAdapter log, long sinceTimestamp)
{
final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc");
final DirectBuffer cncMetaDataBuffer = CncFileDescriptor.createMetaDataBuffer(cncByteBuffer);
final int cncVersion = cncMetaDataBuffer.getInt(CncFileDescriptor.cncVersionOffset(0));
if (CncFileDescriptor.CNC_VERSION != cncVersion)
{
throw new IllegalStateException("CNC version not supported: file version=" + cncVersion);
}
final AtomicBuffer buffer = CncFileDescriptor.createErrorLogBuffer(cncByteBuffer, cncMetaDataBuffer);
final SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSSZ");
// using AtomicLong because access from lambda, not because of currency
final AtomicLong lastTimestamp = new AtomicLong(sinceTimestamp);
ErrorLogReader.read(
buffer,
(observationCount, firstObservationTimestamp, lastObservationTimestamp, encodedException) -> {
log.error(String.format(
"Aeron error: %d observations from %s to %s for:%n %s",
observationCount,
dateFormat.format(new Date(firstObservationTimestamp)),
dateFormat.format(new Date(lastObservationTimestamp)),
encodedException));
lastTimestamp.set(Math.max(lastTimestamp.get(), lastObservationTimestamp));
}, sinceTimestamp);
return lastTimestamp.get();
}
}

View file

@ -66,6 +66,9 @@ import java.net.InetSocketAddress
import java.nio.channels.DatagramChannel
import akka.remote.artery.OutboundControlJunction.OutboundControlIngress
import io.aeron.CncFileDescriptor
import java.util.concurrent.atomic.AtomicLong
import akka.actor.Cancellable
import scala.collection.JavaConverters._
/**
@ -215,6 +218,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
@volatile private[this] var messageDispatcher: MessageDispatcher = _
@volatile private[this] var driver: MediaDriver = _
@volatile private[this] var aeron: Aeron = _
@volatile private[this] var aeronErrorLogTask: Cancellable = _
override def defaultAddress: Address = localAddress.address
override def addresses: Set[Address] = Set(defaultAddress)
@ -266,6 +270,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
override def start(): Unit = {
startMediaDriver()
startAeron()
startAeronErrorLog()
taskRunner.start()
val port =
@ -329,6 +334,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
aeron = Aeron.connect(ctx)
}
private def startAeronErrorLog(): Unit = {
val errorLog = new AeronErrorLog(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))
val lastTimestamp = new AtomicLong(0L)
import system.dispatcher // FIXME perhaps use another dispatcher for this
aeronErrorLogTask = system.scheduler.schedule(3.seconds, 5.seconds) {
if (!isShutdown) {
val newLastTimestamp = errorLog.logErrors(log, lastTimestamp.get)
lastTimestamp.set(newLastTimestamp + 1)
}
}
}
private def runInboundStreams(): Unit = {
runInboundControlStream()
runInboundOrdinaryMessagesStream()
@ -417,6 +434,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
_shutdown = true
killSwitch.shutdown()
if (taskRunner != null) taskRunner.stop()
if (aeronErrorLogTask != null) aeronErrorLogTask.cancel()
if (aeron != null) aeron.close()
if (driver != null) {
driver.close()