diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala index 628981e0fa..f5f45cbe4d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamLatencySpec.scala @@ -26,6 +26,8 @@ import akka.stream.KillSwitches import akka.Done import org.agrona.IoUtil import java.io.File +import java.io.File +import io.aeron.CncFileDescriptor object AeronStreamLatencySpec extends MultiNodeConfig { val first = role("first") @@ -80,6 +82,9 @@ abstract class AeronStreamLatencySpec val driver = MediaDriver.launchEmbedded() + val stats = + new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) + val aeron = { val ctx = new Aeron.Context ctx.aeronDirectoryName(driver.aeronDirectoryName) @@ -152,6 +157,11 @@ abstract class AeronStreamLatencySpec } } + def printStats(side: String): Unit = { + println(side + " stats:") + stats.print(System.out) + } + val scenarios = List( TestSettings( testName = "rate-100-size-100", @@ -236,6 +246,7 @@ abstract class AeronStreamLatencySpec rep.halt() } + printStats(myself.name) enterBarrier("after-" + testName) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala index e59fd296fc..d86b55f0e5 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/AeronStreamMaxThroughputSpec.scala @@ -20,6 +20,8 @@ import com.typesafe.config.ConfigFactory import io.aeron.Aeron import io.aeron.driver.MediaDriver import akka.stream.KillSwitches +import java.io.File +import io.aeron.CncFileDescriptor object AeronStreamMaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -82,9 +84,13 @@ abstract class AeronStreamMaxThroughputSpec var plot = PlotResult() + val driver = MediaDriver.launchEmbedded() + + val stats = + new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) + val aeron = { val ctx = new Aeron.Context - val driver = MediaDriver.launchEmbedded() ctx.aeronDirectoryName(driver.aeronDirectoryName) Aeron.connect(ctx) } @@ -139,6 +145,11 @@ abstract class AeronStreamMaxThroughputSpec plot = plot.add(testName, throughput * payloadSize / 1024 / 1024) } + def printStats(side: String): Unit = { + println(side + " stats:") + stats.print(System.out) + } + val scenarios = List( TestSettings( testName = "size-100", @@ -183,6 +194,7 @@ abstract class AeronStreamMaxThroughputSpec enterBarrier(receiverName + "-started") Await.ready(done, barrierTimeout) rep.halt() + printStats("receiver") enterBarrier(testName + "-done") } @@ -195,7 +207,9 @@ abstract class AeronStreamMaxThroughputSpec .map { n ⇒ payload } .runWith(new AeronSink(channel(second), aeron, taskRunner)) + printStats("sender") enterBarrier(testName + "-done") + } enterBarrier("after-" + testName) diff --git a/akka-remote/src/test/java/akka/remote/artery/AeronStat.java b/akka-remote/src/test/java/akka/remote/artery/AeronStat.java new file mode 100644 index 0000000000..feab499287 --- /dev/null +++ b/akka-remote/src/test/java/akka/remote/artery/AeronStat.java @@ -0,0 +1,273 @@ +/* + * Copyright 2014 - 2016 Real Logic Ltd. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package akka.remote.artery; + +import java.io.File; +import java.io.PrintStream; +import java.nio.MappedByteBuffer; +import java.util.Date; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Supplier; +import java.util.regex.Pattern; + +import io.aeron.CncFileDescriptor; +import io.aeron.CommonContext; +import org.agrona.DirectBuffer; +import org.agrona.IoUtil; +import org.agrona.concurrent.status.CountersReader; +import org.agrona.concurrent.SigInt; + +import static io.aeron.CncFileDescriptor.*; +import static io.aeron.driver.status.StreamPositionCounter.*; +import static io.aeron.driver.status.PublisherLimit.PUBLISHER_LIMIT_TYPE_ID; +import static io.aeron.driver.status.SubscriberPos.SUBSCRIBER_POSITION_TYPE_ID; +import static io.aeron.driver.status.SystemCounterDescriptor.SYSTEM_COUNTER_TYPE_ID; + +/** + * Tool for printing out Aeron counters. A command-and-control (cnc) file is maintained by media driver + * in shared memory. This application reads the the cnc file and prints the counters. Layout of the cnc file is + * described in {@link CncFileDescriptor}. + * + * This tool accepts filters on the command line, e.g. for connections only see example below: + * + * + * java -cp aeron-samples/build/libs/samples.jar io.aeron.samples.AeronStat type=[1-4] identity=12345 + * + */ +public class AeronStat +{ + /** + * Types of the counters. + * + */ + private static final String COUNTER_TYPE_ID = "type"; + + /** + * The identity of each counter that can either be the system counter id or registration id for positions. + */ + private static final String COUNTER_IDENTITY = "identity"; + + /** + * Session id filter to be used for position counters. + */ + private static final String COUNTER_SESSION_ID = "session"; + + /** + * Stream id filter to be used for position counters. + */ + private static final String COUNTER_STREAM_ID = "stream"; + + /** + * Channel filter to be used for position counters. + */ + private static final String COUNTER_CHANNEL = "channel"; + + private static final int ONE_SECOND = 1_000; + + private final CountersReader counters; + private final Pattern typeFilter; + private final Pattern identityFilter; + private final Pattern sessionFilter; + private final Pattern streamFilter; + private final Pattern channelFilter; + + public AeronStat( + final CountersReader counters, + final Pattern typeFilter, + final Pattern identityFilter, + final Pattern sessionFilter, + final Pattern streamFilter, + final Pattern channelFilter) + { + this.counters = counters; + this.typeFilter = typeFilter; + this.identityFilter = identityFilter; + this.sessionFilter = sessionFilter; + this.streamFilter = streamFilter; + this.channelFilter = channelFilter; + } + + public AeronStat(final CountersReader counters) + { + this.counters = counters; + this.typeFilter = null; + this.identityFilter = null; + this.sessionFilter = null; + this.streamFilter = null; + this.channelFilter = null; + } + + public static CountersReader mapCounters() + { + return mapCounters(CommonContext.newDefaultCncFile()); + } + + public static CountersReader mapCounters(final File cncFile) + { + System.out.println("Command `n Control file " + cncFile); + + final MappedByteBuffer cncByteBuffer = IoUtil.mapExistingFile(cncFile, "cnc"); + final DirectBuffer cncMetaData = createMetaDataBuffer(cncByteBuffer); + final int cncVersion = cncMetaData.getInt(cncVersionOffset(0)); + + if (CncFileDescriptor.CNC_VERSION != cncVersion) + { + throw new IllegalStateException("CnC version not supported: file version=" + cncVersion); + } + + return new CountersReader( + createCountersMetaDataBuffer(cncByteBuffer, cncMetaData), + createCountersValuesBuffer(cncByteBuffer, cncMetaData)); + } + + public static void main(final String[] args) throws Exception + { + Pattern typeFilter = null; + Pattern identityFilter = null; + Pattern sessionFilter = null; + Pattern streamFilter = null; + Pattern channelFilter = null; + + if (0 != args.length) + { + checkForHelp(args); + + for (final String arg : args) + { + final int equalsIndex = arg.indexOf('='); + if (-1 == equalsIndex) + { + System.out.println("Arguments must be in name=pattern format: Invalid '" + arg + "'"); + return; + } + + final String argName = arg.substring(0, equalsIndex); + final String argValue = arg.substring(equalsIndex + 1); + + switch (argName) + { + case COUNTER_TYPE_ID: + typeFilter = Pattern.compile(argValue); + break; + + case COUNTER_IDENTITY: + identityFilter = Pattern.compile(argValue); + break; + + case COUNTER_SESSION_ID: + sessionFilter = Pattern.compile(argValue); + break; + + case COUNTER_STREAM_ID: + streamFilter = Pattern.compile(argValue); + break; + + case COUNTER_CHANNEL: + channelFilter = Pattern.compile(argValue); + break; + + default: + System.out.println("Unrecognised argument: '" + arg + "'"); + return; + } + } + } + + final AeronStat aeronStat = new AeronStat( + mapCounters(), typeFilter, identityFilter, sessionFilter, streamFilter, channelFilter); + final AtomicBoolean running = new AtomicBoolean(true); + SigInt.register(() -> running.set(false)); + + while (running.get()) + { + System.out.print("\033[H\033[2J"); + + System.out.format("%1$tH:%1$tM:%1$tS - Aeron Stat%n", new Date()); + System.out.println("========================="); + + aeronStat.print(System.out); + System.out.println("--"); + + Thread.sleep(ONE_SECOND); + } + } + + public void print(final PrintStream out) + { + counters.forEach( + (counterId, typeId, keyBuffer, label) -> + { + if (filter(typeId, keyBuffer)) + { + final long value = counters.getCounterValue(counterId); + out.format("%3d: %,20d - %s%n", counterId, value, label); + } + }); + } + + private static void checkForHelp(final String[] args) + { + for (final String arg : args) + { + if ("-?".equals(arg) || "-h".equals(arg) || "-help".equals(arg)) + { + System.out.println( + "Usage: [-Daeron.dir=] AeronStat%n" + + "\tfilter by optional regex patterns:%n" + + "\t[type=]%n" + + "\t[identity=]%n" + + "\t[sessionId=]%n" + + "\t[streamId=]%n" + + "\t[channel=]%n"); + + System.exit(0); + } + } + } + + private boolean filter(final int typeId, final DirectBuffer keyBuffer) + { + if (!match(typeFilter, () -> Integer.toString(typeId))) + { + return false; + } + + if (SYSTEM_COUNTER_TYPE_ID == typeId && !match(identityFilter, () -> Integer.toString(keyBuffer.getInt(0)))) + { + return false; + } + else if (typeId >= PUBLISHER_LIMIT_TYPE_ID && typeId <= SUBSCRIBER_POSITION_TYPE_ID) + { + if (!match(identityFilter, () -> Long.toString(keyBuffer.getLong(REGISTRATION_ID_OFFSET))) || + !match(sessionFilter, () -> Integer.toString(keyBuffer.getInt(SESSION_ID_OFFSET))) || + !match(streamFilter, () -> Integer.toString(keyBuffer.getInt(STREAM_ID_OFFSET))) || + !match(channelFilter, () -> keyBuffer.getStringUtf8(CHANNEL_OFFSET))) + { + return false; + } + } + + return true; + } + + private static boolean match(final Pattern pattern, final Supplier supplier) + { + return null == pattern || pattern.matcher(supplier.get()).find(); + } +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala index d2a312cb34..ce337bdfa3 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/AeronStreamsApp.scala @@ -23,6 +23,8 @@ import io.aeron.UnavailableImageHandler import io.aeron.Image import io.aeron.AvailableImageHandler import akka.actor.ExtendedActorSystem +import java.io.File +import io.aeron.CncFileDescriptor object AeronStreamsApp { @@ -34,6 +36,18 @@ object AeronStreamsApp { val payload = ("0" * 100).getBytes("utf-8") lazy val sendTimes = new AtomicLongArray(latencyN) + lazy val driver = { + val driverContext = new MediaDriver.Context + driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) + driverContext.driverTimeoutMs(SECONDS.toNanos(10)) + MediaDriver.launchEmbedded(driverContext) + } + + lazy val stats = { + new AeronStat(AeronStat.mapCounters(new File(driver.aeronDirectoryName, CncFileDescriptor.CNC_FILE))) + } + lazy val aeron = { val ctx = new Aeron.Context ctx.errorHandler(new ErrorHandler { @@ -52,11 +66,6 @@ object AeronStreamsApp { } }) - val driverContext = new MediaDriver.Context - driverContext.clientLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.imageLivenessTimeoutNs(SECONDS.toNanos(10)) - driverContext.driverTimeoutMs(SECONDS.toNanos(10)) - val driver = MediaDriver.launchEmbedded(driverContext) ctx.aeronDirectoryName(driver.aeronDirectoryName) Aeron.connect(ctx) } @@ -132,6 +141,9 @@ object AeronStreamsApp { if (args(0) == "debug-sender") runDebugSender() + + if (args.length >= 2 && args(1) == "stats") + runStats() } def runReceiver(): Unit = { @@ -257,6 +269,7 @@ object AeronStreamsApp { e.printStackTrace exit(-1) } + } def runDebugSender(): Unit = { @@ -271,4 +284,8 @@ object AeronStreamsApp { .runWith(new AeronSink(channel1, aeron, taskRunner)) } + def runStats(): Unit = { + Source.tick(10.second, 10.second, "tick").runForeach { _ ⇒ stats.print(System.out) } + } + }