From 8a019f86a1d0a2aaead66d013f06864f322c7de0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Mon, 16 Dec 2019 11:45:13 +0100 Subject: [PATCH] JFR-based remoting flight recorder #26282 --- akka-actor/src/main/resources/reference.conf | 7 + akka-docs/src/main/paradox/remoting-artery.md | 9 + .../src/main/paradox/typed/dispatchers.md | 2 +- .../aeron/AeronStreamConcistencySpec.scala | 31 +- .../artery/aeron/AeronStreamLatencySpec.scala | 32 +- .../aeron/AeronStreamMaxThroughputSpec.scala | 12 +- ...-28305-JFR-based-flight-recording.excludes | 15 + .../akka/remote/artery/jfr/Events.scala | 395 ++++++++++++++++++ .../jfr/JFRRemotingFlightRecorder.scala | 139 ++++++ .../akka/remote/artery/ArteryTransport.scala | 22 +- .../akka/remote/artery/Association.scala | 15 +- .../scala/akka/remote/artery/EventSink.scala | 25 -- .../remote/artery/FlightRecorderEvents.scala | 126 ------ .../artery/RemotingFlightRecorder.scala | 152 +++++++ .../akka/remote/artery/aeron/AeronSink.scala | 26 +- .../remote/artery/aeron/AeronSource.scala | 16 +- .../aeron/ArteryAeronUdpTransport.scala | 17 +- .../artery/compress/InboundCompressions.scala | 6 +- .../artery/tcp/ArteryTcpTransport.scala | 25 +- .../akka/remote/artery/tcp/TcpFraming.scala | 8 +- .../jfr/JFRRemotingFlightRecorderSpec.scala | 41 ++ .../artery/RemotingFlightRecorderSpec.scala | 22 + .../remote/artery/aeron/AeronSinkSpec.scala | 4 +- build.sbt | 1 + project/Jdk9.scala | 28 +- 25 files changed, 920 insertions(+), 256 deletions(-) create mode 100644 akka-remote/src/main/mima-filters/2.6.1.backwards.excludes/pr-28305-JFR-based-flight-recording.excludes create mode 100644 akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/Events.scala create mode 100644 akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/EventSink.scala delete mode 100644 akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala create mode 100644 akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala create mode 100644 akka-remote/src/test/scala-jdk9-only/akka/remote/artery/jfr/JFRRemotingFlightRecorderSpec.scala create mode 100644 akka-remote/src/test/scala/akka/remote/artery/RemotingFlightRecorderSpec.scala diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index 00e9523b1e..0496fd4a9c 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -99,6 +99,13 @@ akka { # by setting this property to `off`. fail-mixed-versions = on + # Some modules (remoting only right now) can emit custom events to the Java Flight Recorder if running + # on JDK 11 or later. If you for some reason do not want that, it can be disabled and switched to no-ops + # with this toggle. + java-flight-recorder { + enabled = true + } + actor { # Either one of "local", "remote" or "cluster" or the diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index db5831fddc..4c2c283403 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -864,3 +864,12 @@ spec: There is currently no way to limit the size of a memory empty dir but there is a [pull request](https://github.com/kubernetes/kubernetes/pull/63641) for adding it. Any space used in the mount will count towards your container's memory usage. + + +### Flight Recorder + +When running on JDK 11 Artery specific flight recording is available through the [Java Flight Recorder (JFR)](https://openjdk.java.net/jeps/328). +The flight recorder is automatically enabled by detecting JDK 11 but can be disabled if needed by setting `akka.java-flight-recorder.enabled = false`. + +Low overhead Artery specific events are emitted by default when JFR is enabled, higher overhead events needs a custom settings template and are not enabled automatically with the `profiling` JFR template. +To enable those create a copy of the `profiling` template and enable all `Akka` sub category events, for example through the JMC GUI. diff --git a/akka-docs/src/main/paradox/typed/dispatchers.md b/akka-docs/src/main/paradox/typed/dispatchers.md index fb88224c40..7457a52a4f 100644 --- a/akka-docs/src/main/paradox/typed/dispatchers.md +++ b/akka-docs/src/main/paradox/typed/dispatchers.md @@ -206,7 +206,7 @@ In the thread state diagrams below the colours have the following meaning: * Green - Runnable state The thread information was recorded using the YourKit profiler, however any good JVM profiler -has this feature (including the free and bundled with the Oracle JDK VisualVM, as well as Oracle Flight Recorder). +has this feature (including the free and bundled with the Oracle JDK [VisualVM](https://visualvm.github.io/), as well as [Java Mission Control](https://openjdk.java.net/projects/jmc/)). The orange portion of the thread shows that it is idle. Idle threads are fine - they're ready to accept new work. However, large amount of turquoise (blocked, or sleeping as in our example) threads diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala index 03038b938f..4ac4eb0f64 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamConcistencySpec.scala @@ -105,9 +105,16 @@ abstract class AeronStreamConsistencySpec runOn(second) { // just echo back Source - .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) .runWith( - new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + new AeronSink( + channel(first), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) } enterBarrier("echo-started") } @@ -121,7 +128,7 @@ abstract class AeronStreamConsistencySpec val started = TestProbe() val startMsg = "0".getBytes("utf-8") Source - .fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) .via(killSwitch.flow) .runForeach { envelope => val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) @@ -151,7 +158,14 @@ abstract class AeronStreamConsistencySpec } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) .runWith( - new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + new AeronSink( + channel(second), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) started.expectMsg(Done) } @@ -164,7 +178,14 @@ abstract class AeronStreamConsistencySpec envelope } .runWith( - new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + new AeronSink( + channel(second), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) Await.ready(done, 20.seconds) killSwitch.shutdown() diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala index ac5416b9a5..e1c92b3b78 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamLatencySpec.scala @@ -193,7 +193,7 @@ abstract class AeronStreamLatencySpec val started = TestProbe() val startMsg = "0".getBytes("utf-8") Source - .fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) .via(killSwitch.flow) .runForeach { envelope => val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) @@ -224,7 +224,14 @@ abstract class AeronStreamLatencySpec } .throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping) .runWith( - new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + new AeronSink( + channel(second), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) started.expectMsg(Done) } @@ -243,7 +250,15 @@ abstract class AeronStreamLatencySpec val queueValue = Source .fromGraph(new SendQueue[Unit](sendToDeadLetters)) .via(sendFlow) - .to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + .to( + new AeronSink( + channel(second), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) .run() val queue = new ManyToOneConcurrentArrayQueue[Unit](1024) @@ -298,9 +313,16 @@ abstract class AeronStreamLatencySpec runOn(second) { // just echo back Source - .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) .runWith( - new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + new AeronSink( + channel(first), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) } enterBarrier("echo-started") } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala index 48acfb50da..b5d7d485a9 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/aeron/AeronStreamMaxThroughputSpec.scala @@ -168,7 +168,7 @@ abstract class AeronStreamMaxThroughputSpec val done = TestLatch(1) val killSwitch = KillSwitches.shared(testName) Source - .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) .via(killSwitch.flow) .runForeach { envelope => val bytes = ByteString.fromByteBuffer(envelope.byteBuffer) @@ -205,7 +205,15 @@ abstract class AeronStreamMaxThroughputSpec envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink)) + .runWith( + new AeronSink( + channel(second), + streamId, + aeron, + taskRunner, + pool, + giveUpMessageAfter, + NoOpRemotingFlightRecorder)) printStats("sender") enterBarrier(testName + "-done") diff --git a/akka-remote/src/main/mima-filters/2.6.1.backwards.excludes/pr-28305-JFR-based-flight-recording.excludes b/akka-remote/src/main/mima-filters/2.6.1.backwards.excludes/pr-28305-JFR-based-flight-recording.excludes new file mode 100644 index 0000000000..b54088bad3 --- /dev/null +++ b/akka-remote/src/main/mima-filters/2.6.1.backwards.excludes/pr-28305-JFR-based-flight-recording.excludes @@ -0,0 +1,15 @@ +# all internals +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.IgnoreEventSink$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderEvents") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.IgnoreEventSink") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.EventSink") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderEvents$") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.ArteryTransport.flightRecorder") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.topLevelFlightRecorder") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.aeron.AeronSource.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.aeron.AeronSink.this") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.compress.InboundCompressionsImpl.$default$4") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.compress.InboundCompressionsImpl.this") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.tcp.TcpFraming.this") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.tcp.TcpFraming.flightRecorder") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.tcp.TcpFraming.this") \ No newline at end of file diff --git a/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/Events.scala b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/Events.scala new file mode 100644 index 0000000000..f71b15fcd8 --- /dev/null +++ b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/Events.scala @@ -0,0 +1,395 @@ +/* + * Copyright (C) extends Event 2009-2019 Lightbend Inc. + */ + +package akka.remote.artery.jfr + +import java.net.InetSocketAddress + +import akka.actor.Address +import akka.annotation.InternalApi +import akka.remote.UniqueAddress +import jdk.jfr.StackTrace +import jdk.jfr.Category +import jdk.jfr.Label +import jdk.jfr.Event +import jdk.jfr.DataAmount +import jdk.jfr.Enabled +import jdk.jfr.Timespan + +// requires jdk9+ to compile +// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object JFREventUtils { + + def stringOf(address: InetSocketAddress): String = + s"${address.getHostString}:${address.getPort}" + +} + +// transport events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron")) @Label("Media driver started") +final class TransportMediaDriverStarted(val directoryName: String) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Transport started") +final class TransportStarted() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron")) @Label("Aeron error log started") +final class TransportAeronErrorLogStarted() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Task runner started") +final class TransportTaskRunnerStarted() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Unique address set") +final class TransportUniqueAddressSet(_uniqueAddress: UniqueAddress) extends Event { + val uniqueAddress = _uniqueAddress.toString() +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Materializer started") +final class TransportMaterializerStarted() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Startup finished") +final class TransportStartupFinished() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Kill switch pulled") +final class TransportKillSwitchPulled() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Stopped") +final class TransportStopped() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron")) @Label("Aeron log task stopped") +final class TransportAeronErrorLogTaskStopped() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Media file deleted") +final class TransportMediaFileDeleted() extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Send queue overflow") +final class TransportSendQueueOverflow(val queueIndex: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Stop idle outbound") +final class TransportStopIdleOutbound(_remoteAddress: Address, val queueIndex: Int) extends Event { + val remoteAddress = _remoteAddress.toString +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Quarantined") +final class TransportQuarantined(_remoteAddress: Address, val uid: Long) extends Event { + val remoteAddress = _remoteAddress.toString +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Remove quarantined") +final class TransportRemoveQuarantined(_remoteAddress: Address) extends Event { + val remoteAddress = _remoteAddress.toString +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Restart outbound") +final class TransportRestartOutbound(_remoteAddress: Address, val streamName: String) extends Event { + val remoteAddress = _remoteAddress.toString +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Transport")) @Label("Restart inbound") +final class TransportRestartInbound(_remoteAddress: UniqueAddress, val streamName: String) extends Event { + val remoteAddress = _remoteAddress.toString() +} + +// aeron sink events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Started") +final class AeronSinkStarted(val channel: String, val streamId: Int) extends Event {} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Task runner removed") +final class AeronSinkTaskRunnerRemoved(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Publication closed") +final class AeronSinkPublicationClosed(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Publication closed unexpectedly") +final class AeronSinkPublicationClosedUnexpectedly(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Stopped") +final class AeronSinkStopped(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Envelope grabbed") +final class AeronSinkEnvelopeGrabbed(@DataAmount() val lastMessageSize: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Envelope offered") +final class AeronSinkEnvelopeOffered(@DataAmount() val lastMessageSize: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Gave up envelope") +final class AeronSinkGaveUpEnvelope(val cause: String) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Delegate to task runner") +final class AeronSinkDelegateToTaskRunner(val countBeforeDelegate: Long) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Return from task runner") +final class AeronSinkReturnFromTaskRunner(@Timespan(Timespan.NANOSECONDS) val nanosSinceTaskStartTime: Long) + extends Event + +// aeron source events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Started") +final class AeronSourceStarted(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Stopped") +final class AeronSourceStopped(val channel: String, val streamId: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Received") +final class AeronSourceReceived(@DataAmount() val size: Int) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Delegate to task runner") +final class AeronSourceDelegateToTaskRunner(val countBeforeDelegate: Long) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Return from task runner") +final class AeronSourceReturnFromTaskRunner(@Timespan(Timespan.NANOSECONDS) val nanosSinceTaskStartTime: Long) + extends Event + +// compression events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Compression")) @Label("ActorRef advertisement") +final class CompressionActorRefAdvertisement(val uid: Long) extends Event + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Compression")) @Label("ClassManifest advertisement") +final class CompressionClassManifestAdvertisement(val uid: Long) extends Event + +// tcp outbound events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Outbound")) @Label("Connected") +final class TcpOutboundConnected(_remoteAddress: Address, val streamName: String) extends Event { + val remoteAddress = _remoteAddress.toString +} + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Outbound")) @Label("Sent") +final class TcpOutboundSent(@DataAmount() val size: Int) extends Event + +// tcp inbound events + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Bound") +final class TcpInboundBound(val bindHost: String, _address: InetSocketAddress) extends Event { + val address = JFREventUtils.stringOf(_address) +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Unbound") +final class TcpInboundUnbound(_localAddress: UniqueAddress) extends Event { + val localAddress = _localAddress.toString() +} + +/** + * INTERNAL API + */ +@InternalApi +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Connected") +final class TcpInboundConnected(_remoteAddress: InetSocketAddress) extends Event { + val remoteAddress = JFREventUtils.stringOf(_remoteAddress) +} + +/** + * INTERNAL API + */ +@InternalApi +@Enabled(false) // hi frequency event +@StackTrace(false) +@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Received") +final class TcpInboundReceived(@DataAmount() val size: Int) extends Event diff --git a/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala new file mode 100644 index 0000000000..c87f8c4f63 --- /dev/null +++ b/akka-remote/src/main/scala-jdk-9/akka/remote/artery/jfr/JFRRemotingFlightRecorder.scala @@ -0,0 +1,139 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.remote.artery.jfr + +import java.net.InetSocketAddress + +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.remote.UniqueAddress +import akka.remote.artery.RemotingFlightRecorder + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final class JFRRemotingFlightRecorder(system: ExtendedActorSystem) extends RemotingFlightRecorder { + override def transportMediaDriverStarted(directoryName: String): Unit = + new TransportMediaDriverStarted(directoryName).commit() + + override def transportStarted(): Unit = + new TransportStarted().commit() + + override def transportAeronErrorLogStarted(): Unit = + new TransportAeronErrorLogStarted().commit() + + override def transportTaskRunnerStarted(): Unit = + new TransportTaskRunnerStarted().commit() + + override def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit = + new TransportUniqueAddressSet(uniqueAddress).commit() + + override def transportMaterializerStarted(): Unit = + new TransportMaterializerStarted().commit() + + override def transportStartupFinished(): Unit = + new TransportStartupFinished().commit() + + override def transportKillSwitchPulled(): Unit = + new TransportKillSwitchPulled().commit() + + override def transportStopped(): Unit = + new TransportStopped().commit() + + override def transportAeronErrorLogTaskStopped(): Unit = + new TransportAeronErrorLogTaskStopped().commit() + + override def transportMediaFileDeleted(): Unit = + new TransportMediaFileDeleted().commit() + + override def transportSendQueueOverflow(queueIndex: Int): Unit = + new TransportSendQueueOverflow(queueIndex).commit() + + override def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit = + new TransportStopIdleOutbound(remoteAddress, queueIndex).commit() + + override def transportQuarantined(remoteAddress: Address, uid: Long): Unit = + new TransportQuarantined(remoteAddress, uid).commit() + + override def transportRemoveQuarantined(remoteAddress: Address): Unit = + new TransportRemoveQuarantined(remoteAddress).commit() + + override def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit = + new TransportRestartOutbound(remoteAddress, streamName).commit() + + override def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit = + new TransportRestartInbound(remoteAddress, streamName).commit() + + override def aeronSinkStarted(channel: String, streamId: Int): Unit = + new AeronSinkStarted(channel, streamId).commit() + + override def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit = + new AeronSinkTaskRunnerRemoved(channel, streamId).commit() + + override def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit = + new AeronSinkPublicationClosed(channel, streamId).commit() + + override def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit = + new AeronSinkPublicationClosedUnexpectedly(channel, streamId).commit() + + override def aeronSinkStopped(channel: String, streamId: Int): Unit = + new AeronSinkStopped(channel, streamId).commit() + + override def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit = + new AeronSinkEnvelopeGrabbed(lastMessageSize).commit() + + override def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit = + new AeronSinkEnvelopeOffered(lastMessageSize).commit() + + override def aeronSinkGaveUpEnvelope(cause: String): Unit = + new AeronSinkGaveUpEnvelope(cause).commit() + + override def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit = + new AeronSinkDelegateToTaskRunner(countBeforeDelegate).commit() + + override def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = + new AeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime).commit() + + override def aeronSourceStarted(channel: String, streamId: Int): Unit = + new AeronSourceStarted(channel, streamId).commit() + + override def aeronSourceStopped(channel: String, streamId: Int): Unit = + new AeronSourceStopped(channel, streamId).commit() + + override def aeronSourceReceived(size: Int): Unit = + new AeronSourceReceived(size).commit() + + override def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit = + new AeronSourceDelegateToTaskRunner(countBeforeDelegate).commit() + + override def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = + new AeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime).commit() + + override def compressionActorRefAdvertisement(uid: Long): Unit = + new CompressionActorRefAdvertisement(uid).commit() + + override def compressionClassManifestAdvertisement(uid: Long): Unit = + new CompressionClassManifestAdvertisement(uid).commit() + + override def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit = + new TcpOutboundConnected(remoteAddress, streamName).commit() + + override def tcpOutboundSent(size: Int): Unit = + new TcpOutboundSent(size).commit() + + override def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit = + new TcpInboundBound(bindHost, address).commit() + + override def tcpInboundUnbound(localAddress: UniqueAddress): Unit = + new TcpInboundUnbound(localAddress).commit() + + override def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit = + new TcpInboundConnected(remoteAddress).commit() + + override def tcpInboundReceived(size: Int): Unit = + new TcpInboundReceived(size).commit() +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index e21e60a35a..771163130b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -294,7 +294,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr extends RemoteTransport(_system, _provider) with InboundContext { import ArteryTransport._ - import FlightRecorderEvents._ type LifeCycle @@ -309,6 +308,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr override val log: MarkerLoggingAdapter = Logging.withMarker(system, getClass) + val flightRecorder: RemotingFlightRecorder = RemotingFlightRecorder(system) + log.debug("Using flight recorder {}", flightRecorder) + /** * Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables. * However are the InboundCompressions are owned by the Decoder operator, and any call into them must be looped through the Decoder! @@ -317,8 +319,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr */ protected val _inboundCompressions = { if (settings.Advanced.Compression.Enabled) { - val eventSink = IgnoreEventSink - new InboundCompressionsImpl(system, this, settings.Advanced.Compression, eventSink) + new InboundCompressionsImpl(system, this, settings.Advanced.Compression, flightRecorder) } else NoInboundCompressions } @@ -376,8 +377,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr capacity = settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) - val topLevelFlightRecorder: EventSink = IgnoreEventSink - private val associationRegistry = new AssociationRegistry( remoteAddress => new Association( @@ -399,7 +398,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Runtime.getRuntime.addShutdownHook(shutdownHook) startTransport() - topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData) + flightRecorder.transportStarted() val systemMaterializer = SystemMaterializer(system) materializer = @@ -409,7 +408,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr settings.Advanced.ControlStreamMaterializerSettings) messageDispatcher = new MessageDispatcher(system, provider) - topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData) + flightRecorder.transportMaterializerStarted() val (port, boundPort) = bindInboundStreams() @@ -422,11 +421,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort), AddressUidExtension(system).longAddressUid) - topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString()) + flightRecorder.transportUniqueAddressSet(_localAddress) runInboundStreams(port, boundPort) - topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData) + flightRecorder.transportStartupFinished() startRemoveQuarantinedAssociationTask() @@ -618,7 +617,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr case cause => if (restartCounter.restart()) { log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage) - topLevelFlightRecorder.loFreq(Transport_RestartInbound, s"$localAddress - $streamName") + flightRecorder.transportRestartInbound(localAddress, streamName) restart() } else { log.error( @@ -661,7 +660,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr implicit val ec = system.dispatchers.internalDispatcher killSwitch.abort(ShutdownSignal) - topLevelFlightRecorder.loFreq(Transport_KillSwitchPulled, NoMetaData) + flightRecorder.transportKillSwitchPulled() for { _ <- streamsCompleted.recover { case _ => Done } _ <- shutdownTransport().recover { case _ => Done } @@ -669,7 +668,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr // no need to explicitly shut down the contained access since it's lifecycle is bound to the Decoder _inboundCompressionAccess = OptionVal.None - topLevelFlightRecorder.loFreq(Transport_FlightRecorderClose, NoMetaData) Done } } diff --git a/akka-remote/src/main/scala/akka/remote/artery/Association.scala b/akka-remote/src/main/scala/akka/remote/artery/Association.scala index d42241e9ba..b7a04ae626 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/Association.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/Association.scala @@ -139,12 +139,11 @@ private[remote] class Association( extends AbstractAssociation with OutboundContext { import Association._ - import FlightRecorderEvents._ require(remoteAddress.port.nonEmpty) private val log = Logging.withMarker(transport.system, getClass) - private def flightRecorder = transport.topLevelFlightRecorder + private def flightRecorder = transport.flightRecorder override def settings = transport.settings private def advancedSettings = transport.settings.Advanced @@ -350,7 +349,7 @@ private[remote] class Association( transport.system.eventStream .publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters))) - flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex) + flightRecorder.transportSendQueueOverflow(queueIndex) deadletters ! env } @@ -503,7 +502,7 @@ private[remote] class Association( reason) transport.system.eventStream.publish(QuarantinedEvent(UniqueAddress(remoteAddress, u))) } - flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u") + flightRecorder.transportQuarantined(remoteAddress, u) clearOutboundCompression() clearInboundCompression(u) // end delivery of system messages to that incarnation after this point @@ -546,7 +545,7 @@ private[remote] class Association( */ def removedAfterQuarantined(): Unit = { if (!isRemovedAfterQuarantined()) { - flightRecorder.loFreq(Transport_RemovedQuarantined, remoteAddress.toString) + flightRecorder.transportRemoveQuarantined(remoteAddress) queues(ControlQueueIndex) = RemovedQueueWrapper if (transport.largeMessageChannelEnabled) @@ -626,7 +625,7 @@ private[remote] class Association( case OptionVal.Some(k) => // for non-control streams we can stop the entire stream log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress) - flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + flightRecorder.transportStopIdleOutbound(remoteAddress, queueIndex) setStopReason(queueIndex, OutboundStreamStopIdleSignal) clearStreamKillSwitch(queueIndex, k) k.abort(OutboundStreamStopIdleSignal) @@ -639,7 +638,7 @@ private[remote] class Association( associationState.controlIdleKillSwitch match { case OptionVal.Some(killSwitch) => log.info("Stopping idle outbound control stream to [{}]", remoteAddress) - flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex") + flightRecorder.transportStopIdleOutbound(remoteAddress, queueIndex) setControlIdleKillSwitch(OptionVal.None) killSwitch.abort(OutboundStreamStopIdleSignal) case OptionVal.None => // already stopped @@ -881,7 +880,7 @@ private[remote] class Association( restart: () => Unit): Unit = { def lazyRestart(): Unit = { - flightRecorder.loFreq(Transport_RestartOutbound, s"$remoteAddress - $streamName") + flightRecorder.transportRestartOutbound(remoteAddress, streamName) outboundCompressionAccess = Vector.empty if (queueIndex == ControlQueueIndex) { materializing = new CountDownLatch(1) diff --git a/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala b/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala deleted file mode 100644 index cf1d2064e9..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import akka.annotation.InternalApi - -/** - * INTERNAL API - */ -@InternalApi -private[akka] trait EventSink { - def loFreq(eventId: Int, data: Array[Byte]): Unit - def loFreq(eventId: Int, data: String): Unit - def hiFreq(eventId: Int, data: Long): Unit - def alert(eventId: Int, data: Array[Byte]): Unit -} - -object IgnoreEventSink extends EventSink { - def loFreq(eventId: Int, data: Array[Byte]): Unit = () - def loFreq(eventId: Int, data: String): Unit = () - def hiFreq(eventId: Int, data: Long): Unit = () - def alert(eventId: Int, data: Array[Byte]): Unit = () -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala deleted file mode 100644 index 17ffb29c07..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderEvents.scala +++ /dev/null @@ -1,126 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.remote.artery - -/** - * INTERNAL API - */ -private[remote] object FlightRecorderEvents { - - // Note: Remember to update dictionary when adding new events! - - val NoMetaData = Array.empty[Byte] - - // Top level remoting events - val Transport_MediaDriverStarted = 0 - val Transport_Started = 1 - val Transport_AeronErrorLogStarted = 2 - val Transport_TaskRunnerStarted = 3 - val Transport_UniqueAddressSet = 4 - val Transport_MaterializerStarted = 5 - val Transport_StartupFinished = 6 - val Transport_OnAvailableImage = 7 - val Transport_KillSwitchPulled = 8 - val Transport_Stopped = 9 - val Transport_AeronErrorLogTaskStopped = 10 - val Transport_MediaFileDeleted = 11 - val Transport_FlightRecorderClose = 12 - val Transport_SendQueueOverflow = 13 - val Transport_StopIdleOutbound = 14 - val Transport_Quarantined = 15 - val Transport_RemovedQuarantined = 16 - val Transport_RestartOutbound = 17 - val Transport_RestartInbound = 18 - - // Aeron Sink events - val AeronSink_Started = 50 - val AeronSink_TaskRunnerRemoved = 51 - val AeronSink_PublicationClosed = 52 - val AeronSink_Stopped = 53 - val AeronSink_EnvelopeGrabbed = 54 - val AeronSink_EnvelopeOffered = 55 - val AeronSink_GaveUpEnvelope = 56 - val AeronSink_DelegateToTaskRunner = 57 - val AeronSink_ReturnFromTaskRunner = 58 - - // Aeron Source events - val AeronSource_Started = 70 - val AeronSource_Stopped = 71 - val AeronSource_Received = 72 - val AeronSource_DelegateToTaskRunner = 73 - val AeronSource_ReturnFromTaskRunner = 74 - - // Compression events - val Compression_CompressedActorRef = 90 - val Compression_AllocatedActorRefCompressionId = 91 - val Compression_CompressedManifest = 92 - val Compression_AllocatedManifestCompressionId = 93 - val Compression_Inbound_RunActorRefAdvertisement = 94 - val Compression_Inbound_RunClassManifestAdvertisement = 95 - - val TcpOutbound_Connected = 150 - val TcpOutbound_Sent = 151 - - val TcpInbound_Bound = 170 - val TcpInbound_Unbound = 171 - val TcpInbound_Connected = 172 - val TcpInbound_Received = 173 - - // Used for presentation of the entries in the flight recorder - lazy val eventDictionary = Map( - Transport_MediaDriverStarted -> "Transport: Media driver started", - Transport_Started -> "Transport: started", - Transport_AeronErrorLogStarted -> "Transport: Aeron error log started", - Transport_TaskRunnerStarted -> "Transport: Task runner started", - Transport_UniqueAddressSet -> "Transport: Unique address set", - Transport_MaterializerStarted -> "Transport: Materializer started", - Transport_StartupFinished -> "Transport: Startup finished", - Transport_OnAvailableImage -> "Transport: onAvailableImage", - Transport_KillSwitchPulled -> "Transport: KillSwitch pulled", - Transport_Stopped -> "Transport: Stopped", - Transport_AeronErrorLogTaskStopped -> "Transport: Aeron errorLog task stopped", - Transport_MediaFileDeleted -> "Transport: Media file deleted", - Transport_FlightRecorderClose -> "Transport: Flight recorder closed", - Transport_SendQueueOverflow -> "Transport: Send queue overflow", - Transport_StopIdleOutbound -> "Transport: Remove idle outbound", - Transport_Quarantined -> "Transport: Quarantined association", - Transport_RemovedQuarantined -> "Transport: Removed idle quarantined association", - Transport_RestartOutbound -> "Transport: Restart outbound", - Transport_RestartInbound -> "Transport: Restart outbound", - // Aeron Sink events - AeronSink_Started -> "AeronSink: Started", - AeronSink_TaskRunnerRemoved -> "AeronSink: Task runner removed", - AeronSink_PublicationClosed -> "AeronSink: Publication closed", - AeronSink_Stopped -> "AeronSink: Stopped", - AeronSink_EnvelopeGrabbed -> "AeronSink: Envelope grabbed", - AeronSink_EnvelopeOffered -> "AeronSink: Envelope offered", - AeronSink_GaveUpEnvelope -> "AeronSink: Gave up envelope", - AeronSink_DelegateToTaskRunner -> "AeronSink: Delegate to task runner", - AeronSink_ReturnFromTaskRunner -> "AeronSink: Return from task runner", - // Aeron Source events - AeronSource_Started -> "AeronSource: Started", - AeronSource_Stopped -> "AeronSource: Stopped", - AeronSource_Received -> "AeronSource: Received", - AeronSource_DelegateToTaskRunner -> "AeronSource: Delegate to task runner", - AeronSource_ReturnFromTaskRunner -> "AeronSource: Return from task runner", - // Compression events - Compression_CompressedActorRef -> "Compression: Compressed ActorRef", - Compression_AllocatedActorRefCompressionId -> "Compression: Allocated ActorRef compression id", - Compression_CompressedManifest -> "Compression: Compressed manifest", - Compression_AllocatedManifestCompressionId -> "Compression: Allocated manifest compression id", - Compression_Inbound_RunActorRefAdvertisement -> "InboundCompression: Run class manifest compression advertisement", - Compression_Inbound_RunClassManifestAdvertisement -> "InboundCompression: Run class manifest compression advertisement", - // TCP outbound events - TcpOutbound_Connected -> "TCP out: Connected", - TcpOutbound_Sent -> "TCP out: Sent message", - // TCP inbound events - TcpInbound_Bound -> "TCP in: Bound", - TcpInbound_Unbound -> "TCP in: Unbound", - TcpInbound_Connected -> "TCP in: New connection", - TcpInbound_Received -> "TCP in: Received message").map { - case (int, str) => int.toLong -> str - } - -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala new file mode 100644 index 0000000000..eb7e416f14 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/RemotingFlightRecorder.scala @@ -0,0 +1,152 @@ +/* + * Copyright (C) 2018-2019 Lightbend Inc. + */ + +package akka.remote.artery + +import java.net.InetSocketAddress + +import akka.actor.Address +import akka.actor.ExtendedActorSystem +import akka.actor.Extension +import akka.actor.ExtensionId +import akka.actor.ExtensionIdProvider +import akka.annotation.InternalApi +import akka.remote.UniqueAddress +import akka.util.JavaVersion + +import scala.util.Failure +import scala.util.Success + +/** + * INTERNAL API + */ +@InternalApi +object RemotingFlightRecorder extends ExtensionId[RemotingFlightRecorder] with ExtensionIdProvider { + + override def createExtension(system: ExtendedActorSystem): RemotingFlightRecorder = + if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) { + // Dynamic instantiation to not trigger class load on earlier JDKs + system.dynamicAccess.createInstanceFor[RemotingFlightRecorder]( + "akka.remote.artery.jfr.JFRRemotingFlightRecorder", + (classOf[ExtendedActorSystem], system) :: Nil) match { + case Success(jfr) => jfr + case Failure(ex) => + system.log + .warning("Failed to load JFR remoting flight recorder, falling back to noop. Exception: {}", ex.getMessage) + NoOpRemotingFlightRecorder + } // fallback if not possible to dynamically load for some reason + } else + // JFR not available on Java 8 + NoOpRemotingFlightRecorder + + override def lookup(): ExtensionId[_ <: Extension] = this +} + +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait RemotingFlightRecorder extends Extension { + + def transportMediaDriverStarted(directoryName: String): Unit + def transportStarted(): Unit + def transportAeronErrorLogStarted(): Unit + def transportTaskRunnerStarted(): Unit + def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit + def transportMaterializerStarted(): Unit + def transportStartupFinished(): Unit + def transportKillSwitchPulled(): Unit + def transportStopped(): Unit + def transportAeronErrorLogTaskStopped(): Unit + def transportMediaFileDeleted(): Unit + def transportSendQueueOverflow(queueIndex: Int): Unit + def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit + def transportQuarantined(remoteAddress: Address, uid: Long): Unit + def transportRemoveQuarantined(remoteAddress: Address): Unit + def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit + def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit + + def aeronSinkStarted(channel: String, streamId: Int): Unit + def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit + def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit + def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit + def aeronSinkStopped(channel: String, streamId: Int): Unit + def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit + def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit + def aeronSinkGaveUpEnvelope(cause: String): Unit + def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit + def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit + + def aeronSourceStarted(channel: String, streamId: Int): Unit + def aeronSourceStopped(channel: String, streamId: Int): Unit + def aeronSourceReceived(size: Int): Unit + def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit + def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit + + def compressionActorRefAdvertisement(uid: Long): Unit + def compressionClassManifestAdvertisement(uid: Long): Unit + + def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit + def tcpOutboundSent(size: Int): Unit + + def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit + def tcpInboundUnbound(localAddress: UniqueAddress): Unit + def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit + def tcpInboundReceived(size: Int): Unit + +} + +/** + * JFR is only available under certain circumstances (JDK11 for now, possible OpenJDK 8 in the future) so therefore + * the default on JDK 8 needs to be a no-op flight recorder. + * + * INTERNAL + */ +@InternalApi +private[akka] case object NoOpRemotingFlightRecorder extends RemotingFlightRecorder { + override def transportMediaDriverStarted(directoryName: String): Unit = () + override def transportStarted(): Unit = () + override def transportAeronErrorLogStarted(): Unit = () + override def transportTaskRunnerStarted(): Unit = () + override def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit = () + override def transportMaterializerStarted(): Unit = () + override def transportStartupFinished(): Unit = () + override def transportKillSwitchPulled(): Unit = () + override def transportStopped(): Unit = () + override def transportAeronErrorLogTaskStopped(): Unit = () + override def transportMediaFileDeleted(): Unit = () + override def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit = () + override def transportQuarantined(remoteAddress: Address, uid: Long): Unit = () + override def transportRemoveQuarantined(remoteAddress: Address): Unit = () + override def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit = () + override def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit = () + override def transportSendQueueOverflow(queueIndex: Int): Unit = () + + override def aeronSinkStarted(channel: String, streamId: Int): Unit = () + override def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit = () + override def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit = () + override def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit = () + override def aeronSinkStopped(channel: String, streamId: Int): Unit = () + override def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit = () + override def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit = () + override def aeronSinkGaveUpEnvelope(cause: String): Unit = () + override def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit = () + override def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = () + + override def aeronSourceStarted(channel: String, streamId: Int): Unit = () + override def aeronSourceStopped(channel: String, streamId: Int): Unit = () + override def aeronSourceReceived(size: Int): Unit = () + override def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit = () + override def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = () + + override def compressionActorRefAdvertisement(uid: Long): Unit = () + override def compressionClassManifestAdvertisement(uid: Long): Unit = () + override def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit = () + override def tcpOutboundSent(size: Int): Unit = () + override def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit = () + override def tcpInboundUnbound(localAddress: UniqueAddress): Unit = () + override def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit = () + override def tcpInboundReceived(size: Int): Unit = () + +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala index 7a42965d0b..7ed357fcb3 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSink.scala @@ -93,11 +93,10 @@ private[remote] class AeronSink( taskRunner: TaskRunner, pool: EnvelopeBufferPool, giveUpAfter: Duration, - flightRecorder: EventSink) + flightRecorder: RemotingFlightRecorder) extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] { import AeronSink._ import TaskRunner._ - import FlightRecorderEvents._ val in: Inlet[EnvelopeBuffer] = Inlet("AeronSink") override val shape: SinkShape[EnvelopeBuffer] = SinkShape(in) @@ -129,23 +128,21 @@ private[remote] class AeronSink( private var delegateTaskStartTime = 0L private var countBeforeDelegate = 0L - private val channelMetadata = channel.getBytes("US-ASCII") - override def preStart(): Unit = { setKeepGoing(true) pull(in) // TODO: Identify different sinks! - flightRecorder.loFreq(AeronSink_Started, channelMetadata) + flightRecorder.aeronSinkStarted(channel, streamId) } override def postStop(): Unit = { try { taskRunner.command(Remove(addOfferTask.task)) - flightRecorder.loFreq(AeronSink_TaskRunnerRemoved, channelMetadata) + flightRecorder.aeronSinkTaskRunnerRemoved(channel, streamId) pub.close() - flightRecorder.loFreq(AeronSink_PublicationClosed, channelMetadata) + flightRecorder.aeronSinkPublicationClosed(channel, streamId) } finally { - flightRecorder.loFreq(AeronSink_Stopped, channelMetadata) + flightRecorder.aeronSinkStopped(channel, streamId) completed.complete(completedValue) } } @@ -155,7 +152,7 @@ private[remote] class AeronSink( envelopeInFlight = grab(in) backoffCount = spinning lastMsgSize = envelopeInFlight.byteBuffer.limit - flightRecorder.hiFreq(AeronSink_EnvelopeGrabbed, lastMsgSize) + flightRecorder.aeronSinkEnvelopeGrabbed(lastMsgSize) publish() } @@ -188,17 +185,18 @@ private[remote] class AeronSink( offerTask.msgSize = lastMsgSize delegateTaskStartTime = System.nanoTime() taskRunner.command(addOfferTask) - flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate) + flightRecorder.aeronSinkDelegateToTaskRunner(countBeforeDelegate) } private def taskOnOfferSuccess(): Unit = { countBeforeDelegate = 0 - flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime) + // FIXME does calculation belong here or in impl? + flightRecorder.aeronSinkReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime) onOfferSuccess() } private def onOfferSuccess(): Unit = { - flightRecorder.hiFreq(AeronSink_EnvelopeOffered, lastMsgSize) + flightRecorder.aeronSinkEnvelopeOffered(lastMsgSize) offerTaskInProgress = false pool.release(envelopeInFlight) offerTask.buffer = null @@ -213,7 +211,7 @@ private[remote] class AeronSink( private def onGiveUp(): Unit = { offerTaskInProgress = false val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.") - flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.toString.getBytes("US-ASCII")) + flightRecorder.aeronSinkGaveUpEnvelope(cause.getMessage) completedValue = Failure(cause) failStage(cause) } @@ -222,7 +220,7 @@ private[remote] class AeronSink( offerTaskInProgress = false val cause = new PublicationClosedException(s"Aeron Publication to [${channel}] was closed.") // this is not exepected, since we didn't close the publication ourselves - flightRecorder.alert(AeronSink_PublicationClosed, channelMetadata) + flightRecorder.aeronSinkPublicationClosedUnexpectedly(channel, streamId) completedValue = Failure(cause) failStage(cause) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala index 5fd111c22d..51c8599589 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/AeronSource.scala @@ -85,13 +85,12 @@ private[remote] class AeronSource( aeron: Aeron, taskRunner: TaskRunner, pool: EnvelopeBufferPool, - flightRecorder: EventSink, + flightRecorder: RemotingFlightRecorder, spinning: Int) extends GraphStageWithMaterializedValue[SourceShape[EnvelopeBuffer], AeronSource.AeronLifecycle] { import AeronSource._ import TaskRunner._ - import FlightRecorderEvents._ val out: Outlet[EnvelopeBuffer] = Outlet("AeronSource") override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out) @@ -108,8 +107,6 @@ private[remote] class AeronSource( private val messageHandler = new MessageHandler(pool) private val addPollTask: Add = Add(pollTask(subscription, messageHandler, getAsyncCallback(taskOnMessage))) - private val channelMetadata = channel.getBytes("US-ASCII") - private var delegatingToTaskRunner = false private var pendingUnavailableImages: List[Int] = Nil @@ -124,7 +121,7 @@ private[remote] class AeronSource( override protected def logSource = classOf[AeronSource] override def preStart(): Unit = { - flightRecorder.loFreq(AeronSource_Started, channelMetadata) + flightRecorder.aeronSourceStarted(channel, streamId) } override def postStop(): Unit = { @@ -134,7 +131,7 @@ private[remote] class AeronSource( case e: DriverTimeoutException => // media driver was shutdown log.debug("DriverTimeout when closing subscription. {}", e) - } finally flightRecorder.loFreq(AeronSource_Stopped, channelMetadata) + } finally flightRecorder.aeronSourceStopped(channel, streamId) } // OutHandler @@ -161,7 +158,7 @@ private[remote] class AeronSource( subscriberLoop() // recursive } else { // delegate backoff to shared TaskRunner - flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, countBeforeDelegate) + flightRecorder.aeronSourceDelegateToTaskRunner(countBeforeDelegate) delegatingToTaskRunner = true delegateTaskStartTime = System.nanoTime() taskRunner.command(addPollTask) @@ -178,13 +175,14 @@ private[remote] class AeronSource( private def taskOnMessage(data: EnvelopeBuffer): Unit = { countBeforeDelegate = 0 delegatingToTaskRunner = false - flightRecorder.hiFreq(AeronSource_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime) + // FIXME push calculation to JFR? + flightRecorder.aeronSourceReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime) freeSessionBuffers() onMessage(data) } private def onMessage(data: EnvelopeBuffer): Unit = { - flightRecorder.hiFreq(AeronSource_Received, data.byteBuffer.limit) + flightRecorder.aeronSourceReceived(data.byteBuffer.limit()) push(out, data) } diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index bb3274f791..afd19c5a92 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -56,7 +56,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro import AeronSource.AeronLifecycle import ArteryTransport._ import Decoder.InboundCompressionAccess - import FlightRecorderEvents._ override type LifeCycle = AeronLifecycle @@ -74,12 +73,12 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro startMediaDriver() startAeron() startAeronErrorLog() - topLevelFlightRecorder.loFreq(Transport_AeronErrorLogStarted, NoMetaData) + flightRecorder.transportAeronErrorLogStarted() if (settings.Advanced.Aeron.LogAeronCounters) { startAeronCounterLog() } taskRunner.start() - topLevelFlightRecorder.loFreq(Transport_TaskRunnerStarted, NoMetaData) + flightRecorder.transportTaskRunnerStarted() } private def startMediaDriver(): Unit = { @@ -131,7 +130,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro val driver = MediaDriver.launchEmbedded(driverContext) log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) - topLevelFlightRecorder.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName()) + flightRecorder.transportMediaDriverStarted(driver.aeronDirectoryName()) if (!mediaDriver.compareAndSet(None, Some(driver))) { throw new IllegalStateException("media driver started more than once") } @@ -158,7 +157,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro try { if (settings.Advanced.Aeron.DeleteAeronDirectory) { IoUtil.delete(new File(driver.aeronDirectoryName), false) - topLevelFlightRecorder.loFreq(Transport_MediaFileDeleted, NoMetaData) + flightRecorder.transportMediaFileDeleted() } } catch { case NonFatal(e) => @@ -307,7 +306,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro taskRunner, bufferPool, giveUpAfter, - IgnoreEventSink)) + flightRecorder)) } private def aeronSource( @@ -315,7 +314,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro pool: EnvelopeBufferPool, inboundChannel: String): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] = Source.fromGraph( - new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, IgnoreEventSink, aeronSourceSpinningStrategy)) + new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, flightRecorder, aeronSourceSpinningStrategy)) private def aeronSourceSpinningStrategy: Int = if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 @@ -451,10 +450,10 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro taskRunner .stop() .map { _ => - topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) + flightRecorder.transportStopped() if (aeronErrorLogTask != null) { aeronErrorLogTask.cancel() - topLevelFlightRecorder.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData) + flightRecorder.transportAeronErrorLogTaskStopped() } if (aeron != null) aeron.close() if (aeronErrorLog != null) aeronErrorLog.close() diff --git a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala index b8e72668e3..cf321698c8 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/compress/InboundCompressions.scala @@ -57,7 +57,7 @@ private[remote] final class InboundCompressionsImpl( system: ActorSystem, inboundContext: InboundContext, settings: ArterySettings.Compression, - eventSink: EventSink = IgnoreEventSink) + flightRecorder: RemotingFlightRecorder = NoOpRemotingFlightRecorder) extends InboundCompressions { private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]() @@ -108,7 +108,7 @@ private[remote] final class InboundCompressionsImpl( val inbound = vs.next() inboundContext.association(inbound.originUid) match { case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) => - eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunActorRefAdvertisement, inbound.originUid) + flightRecorder.compressionActorRefAdvertisement(inbound.originUid) inbound.runNextTableAdvertisement() case _ => remove :+= inbound.originUid } @@ -140,7 +140,7 @@ private[remote] final class InboundCompressionsImpl( val inbound = vs.next() inboundContext.association(inbound.originUid) match { case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) => - eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunClassManifestAdvertisement, inbound.originUid) + flightRecorder.compressionClassManifestAdvertisement(inbound.originUid) inbound.runNextTableAdvertisement() case _ => remove :+= inbound.originUid } diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index e5cfd57f60..cdf2559ef7 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -74,7 +74,6 @@ private[remote] class ArteryTcpTransport( extends ArteryTransport(_system, _provider) { import ArteryTransport._ import ArteryTcpTransport._ - import FlightRecorderEvents._ override type LifeCycle = NotUsed @@ -118,8 +117,6 @@ private[remote] class ArteryTcpTransport( bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { implicit val sys: ActorSystem = system - val afr = IgnoreEventSink - val host = outboundContext.remoteAddress.host.get val port = outboundContext.remoteAddress.port.get val remoteAddress = InetSocketAddress.createUnresolved(host, port) @@ -174,10 +171,7 @@ private[remote] class ArteryTcpTransport( .via(Flow.lazyFlow(() => { // only open the actual connection if any new messages are sent logConnect() - afr.loFreq( - TcpOutbound_Connected, - s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " + - s"/ ${streamName(streamId)}") + flightRecorder.tcpOutboundConnected(outboundContext.remoteAddress, streamName(streamId)) if (controlIdleKillSwitch.isDefined) outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(controlIdleKillSwitch) @@ -220,7 +214,7 @@ private[remote] class ArteryTcpTransport( Flow[EnvelopeBuffer] .map { env => val size = env.byteBuffer.limit() - afr.hiFreq(TcpOutbound_Sent, size) + flightRecorder.tcpOutboundSent(size) // TODO Possible performance improvement, could we reduce the copying of bytes? val bytes = ByteString(env.byteBuffer) @@ -260,12 +254,9 @@ private[remote] class ArteryTcpTransport( val binding = serverBinding match { case None => - val afr = IgnoreEventSink val binding = connectionSource .to(Sink.foreach { connection => - afr.loFreq( - TcpInbound_Connected, - s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}") + flightRecorder.tcpInboundConnected(connection.remoteAddress) inboundConnectionFlow.map(connection.handleWith(_))(sys.dispatcher) }) .run() @@ -280,7 +271,7 @@ private[remote] class ArteryTcpTransport( // only on initial startup, when ActorSystem is starting val b = Await.result(binding, settings.Bind.BindTimeout) - afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}") + flightRecorder.tcpInboundBound(bindHost, b.localAddress) b case Some(binding) => // already bound, when restarting @@ -351,7 +342,7 @@ private[remote] class ArteryTcpTransport( Flow[ByteString] .via(inboundKillSwitch.flow) // must create new FlightRecorder event sink for each connection because they can't be shared - .via(new TcpFraming) + .via(new TcpFraming(flightRecorder)) .alsoTo(inboundStream) .filter(_ => false) // don't send back anything in this TCP socket .map(_ => ByteString.empty) // make it a Flow[ByteString] again @@ -490,7 +481,7 @@ private[remote] class ArteryTcpTransport( implicit val ec = system.dispatchers.internalDispatcher inboundKillSwitch.shutdown() unbind().map { _ => - topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData) + flightRecorder.transportStopped() Done } } @@ -502,9 +493,7 @@ private[remote] class ArteryTcpTransport( for { _ <- binding.unbind() } yield { - topLevelFlightRecorder.loFreq( - TcpInbound_Bound, - s"${localAddress.address.host.get}:${localAddress.address.port}") + flightRecorder.tcpInboundUnbound(localAddress) Done } case None => diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala index cd2850d785..7ddf2f0dc1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala @@ -9,7 +9,6 @@ import java.nio.ByteBuffer import java.nio.ByteOrder import akka.annotation.InternalApi -import akka.remote.artery.FlightRecorderEvents.TcpInbound_Received import akka.stream.Attributes import akka.stream.impl.io.ByteStringParser import akka.stream.impl.io.ByteStringParser.ByteReader @@ -59,9 +58,8 @@ import akka.util.ByteString /** * INTERNAL API */ -@InternalApi private[akka] class TcpFraming extends ByteStringParser[EnvelopeBuffer] { - - val flightRecorder = IgnoreEventSink +@InternalApi private[akka] class TcpFraming(flightRecorder: RemotingFlightRecorder = NoOpRemotingFlightRecorder) + extends ByteStringParser[EnvelopeBuffer] { override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic { @@ -97,7 +95,7 @@ import akka.util.ByteString private def createBuffer(bs: ByteString): EnvelopeBuffer = { val buffer = ByteBuffer.wrap(bs.toArray) buffer.order(ByteOrder.LITTLE_ENDIAN) - flightRecorder.hiFreq(TcpInbound_Received, buffer.limit) + flightRecorder.tcpInboundReceived(buffer.limit) val res = new EnvelopeBuffer(buffer) res.setStreamId(streamId) res diff --git a/akka-remote/src/test/scala-jdk9-only/akka/remote/artery/jfr/JFRRemotingFlightRecorderSpec.scala b/akka-remote/src/test/scala-jdk9-only/akka/remote/artery/jfr/JFRRemotingFlightRecorderSpec.scala new file mode 100644 index 0000000000..1add811f07 --- /dev/null +++ b/akka-remote/src/test/scala-jdk9-only/akka/remote/artery/jfr/JFRRemotingFlightRecorderSpec.scala @@ -0,0 +1,41 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.remote.artery.jfr + +import akka.actor.ActorSystem +import akka.remote.artery.NoOpRemotingFlightRecorder +import akka.remote.artery.RemotingFlightRecorder +import akka.testkit.AkkaSpec +import akka.testkit.TestKit +import com.typesafe.config.ConfigFactory + +class JFRRemotingFlightRecorderSpec extends AkkaSpec { + + "The RemotingFlightRecorder" must { + + "use the JFR one on Java 11" in { + val extension = RemotingFlightRecorder(system) + extension shouldBe a[JFRRemotingFlightRecorder] + + extension.transportStopped() // try to actually report something and see that it doesn't throw or something + } + + "be disabled if configured to" in { + val system = ActorSystem( + "JFRRemotingFlightRecorderSpec-2", + ConfigFactory.parseString( + """ + akka.java-flight-recorder.enabled = false + """)) + try { + val extension = RemotingFlightRecorder(system) + extension should === (NoOpRemotingFlightRecorder) + } finally { + TestKit.shutdownActorSystem(system) + } + } + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemotingFlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemotingFlightRecorderSpec.scala new file mode 100644 index 0000000000..6260307e84 --- /dev/null +++ b/akka-remote/src/test/scala/akka/remote/artery/RemotingFlightRecorderSpec.scala @@ -0,0 +1,22 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.testkit.AkkaSpec +import akka.util.JavaVersion +import org.scalatest.Matchers + +class RemotingFlightRecorderSpec extends AkkaSpec with Matchers { + + "The RemotingFlightRecorder" must { + + "use the no-op recorder by default when running on JDK 8" in { + val extension = RemotingFlightRecorder(system) + if (JavaVersion.majorVersion < 11) + extension should ===(NoOpRemotingFlightRecorder) + } + } + +} diff --git a/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala index a6c3630a4b..f5c9b6aca2 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/aeron/AeronSinkSpec.scala @@ -58,7 +58,7 @@ class AeronSinkSpec extends AkkaSpec(""" val channel = s"aeron:udp?endpoint=localhost:$port" Source - .fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink, 0)) + .fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0)) // fail receiver stream on first message .map(_ => throw new RuntimeException("stop") with NoStackTrace) .runWith(Sink.ignore) @@ -73,7 +73,7 @@ class AeronSinkSpec extends AkkaSpec(""" envelope.byteBuffer.flip() envelope } - .runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink)) + .runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, NoOpRemotingFlightRecorder)) // without the give up timeout the stream would not complete/fail intercept[GaveUpMessageException] { diff --git a/build.sbt b/build.sbt index d7b7774e39..338f3803c0 100644 --- a/build.sbt +++ b/build.sbt @@ -311,6 +311,7 @@ lazy val remote = .settings(OSGi.remote) .settings(Protobuf.settings) .settings(parallelExecution in Test := false) + .enablePlugins(Jdk9) lazy val remoteTests = akkaModule("akka-remote-tests") .dependsOn( diff --git a/project/Jdk9.scala b/project/Jdk9.scala index f1591cd37f..de603e27f1 100644 --- a/project/Jdk9.scala +++ b/project/Jdk9.scala @@ -12,7 +12,7 @@ object Jdk9 extends AutoPlugin { val CompileJdk9 = config("CompileJdk9").extend(Compile) - val TestJdk9 = config("TestJdk9").extend(Test) + val TestJdk9 = config("TestJdk9").extend(Test).extend(CompileJdk9) val SCALA_SOURCE_DIRECTORY = "scala-jdk-9" val SCALA_TEST_SOURCE_DIRECTORY = "scala-jdk9-only" @@ -22,25 +22,23 @@ object Jdk9 extends AutoPlugin { val compileJdk9Settings = Seq( // following the scala-2.12, scala-sbt-1.0, ... convention unmanagedSourceDirectories := notOnJdk8( - Seq( - (Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY, - (Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)), - + Seq( + (Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY, + (Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)), scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")), javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11"))) val testJdk9Settings = Seq( // following the scala-2.12, scala-sbt-1.0, ... convention unmanagedSourceDirectories := notOnJdk8( - Seq( - (Test / sourceDirectory).value / SCALA_TEST_SOURCE_DIRECTORY, - (Test / sourceDirectory).value / JAVA_TEST_SOURCE_DIRECTORY)), - + Seq( + (Test / sourceDirectory).value / SCALA_TEST_SOURCE_DIRECTORY, + (Test / sourceDirectory).value / JAVA_TEST_SOURCE_DIRECTORY)), scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")), javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11")), + compile := compile.dependsOn(CompileJdk9 / compile).value, classpathConfiguration := TestJdk9, - externalDependencyClasspath := (externalDependencyClasspath in Test).value - ) + externalDependencyClasspath := (externalDependencyClasspath in Test).value) val compileSettings = Seq( // It might have been more 'neat' to add the jdk9 products to the jar via packageBin/mappings, but that doesn't work with the OSGi plugin, @@ -49,6 +47,11 @@ object Jdk9 extends AutoPlugin { // ++= (CompileJdk9 / products).value.flatMap(Path.allSubpaths), Compile / fullClasspath ++= (CompileJdk9 / exportedProducts).value) + val testSettings = Seq((Test / test) := { + (Test / test).value + (TestJdk9 / test).value + }) + override def trigger = noTrigger override def projectConfigurations = Seq(CompileJdk9) override lazy val projectSettings = @@ -56,5 +59,6 @@ object Jdk9 extends AutoPlugin { inConfig(CompileJdk9)(compileJdk9Settings) ++ compileSettings ++ inConfig(TestJdk9)(Defaults.testSettings) ++ - inConfig(TestJdk9)(testJdk9Settings) + inConfig(TestJdk9)(testJdk9Settings) ++ + testSettings }