JFR-based remoting flight recorder #26282

This commit is contained in:
Johan Andrén 2019-12-16 11:45:13 +01:00 committed by GitHub
parent 02db62bd73
commit 8a019f86a1
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 920 additions and 256 deletions

View file

@ -99,6 +99,13 @@ akka {
# by setting this property to `off`.
fail-mixed-versions = on
# Some modules (remoting only right now) can emit custom events to the Java Flight Recorder if running
# on JDK 11 or later. If you for some reason do not want that, it can be disabled and switched to no-ops
# with this toggle.
java-flight-recorder {
enabled = true
}
actor {
# Either one of "local", "remote" or "cluster" or the

View file

@ -864,3 +864,12 @@ spec:
There is currently no way to limit the size of a memory empty dir but there is a [pull request](https://github.com/kubernetes/kubernetes/pull/63641) for adding it.
Any space used in the mount will count towards your container's memory usage.
### Flight Recorder
When running on JDK 11 Artery specific flight recording is available through the [Java Flight Recorder (JFR)](https://openjdk.java.net/jeps/328).
The flight recorder is automatically enabled by detecting JDK 11 but can be disabled if needed by setting `akka.java-flight-recorder.enabled = false`.
Low overhead Artery specific events are emitted by default when JFR is enabled, higher overhead events needs a custom settings template and are not enabled automatically with the `profiling` JFR template.
To enable those create a copy of the `profiling` template and enable all `Akka` sub category events, for example through the JMC GUI.

View file

@ -206,7 +206,7 @@ In the thread state diagrams below the colours have the following meaning:
* Green - Runnable state
The thread information was recorded using the YourKit profiler, however any good JVM profiler
has this feature (including the free and bundled with the Oracle JDK VisualVM, as well as Oracle Flight Recorder).
has this feature (including the free and bundled with the Oracle JDK [VisualVM](https://visualvm.github.io/), as well as [Java Mission Control](https://openjdk.java.net/projects/jmc/)).
The orange portion of the thread shows that it is idle. Idle threads are fine -
they're ready to accept new work. However, large amount of turquoise (blocked, or sleeping as in our example) threads

View file

@ -105,9 +105,16 @@ abstract class AeronStreamConsistencySpec
runOn(second) {
// just echo back
Source
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
.runWith(
new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
new AeronSink(
channel(first),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
}
enterBarrier("echo-started")
}
@ -121,7 +128,7 @@ abstract class AeronStreamConsistencySpec
val started = TestProbe()
val startMsg = "0".getBytes("utf-8")
Source
.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
.via(killSwitch.flow)
.runForeach { envelope =>
val bytes = ByteString.fromByteBuffer(envelope.byteBuffer)
@ -151,7 +158,14 @@ abstract class AeronStreamConsistencySpec
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(
new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
new AeronSink(
channel(second),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
started.expectMsg(Done)
}
@ -164,7 +178,14 @@ abstract class AeronStreamConsistencySpec
envelope
}
.runWith(
new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
new AeronSink(
channel(second),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
Await.ready(done, 20.seconds)
killSwitch.shutdown()

View file

@ -193,7 +193,7 @@ abstract class AeronStreamLatencySpec
val started = TestProbe()
val startMsg = "0".getBytes("utf-8")
Source
.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel(first), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
.via(killSwitch.flow)
.runForeach { envelope =>
val bytes = ByteString.fromByteBuffer(envelope.byteBuffer)
@ -224,7 +224,14 @@ abstract class AeronStreamLatencySpec
}
.throttle(1, 200.milliseconds, 1, ThrottleMode.Shaping)
.runWith(
new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
new AeronSink(
channel(second),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
started.expectMsg(Done)
}
@ -243,7 +250,15 @@ abstract class AeronStreamLatencySpec
val queueValue = Source
.fromGraph(new SendQueue[Unit](sendToDeadLetters))
.via(sendFlow)
.to(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
.to(
new AeronSink(
channel(second),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
.run()
val queue = new ManyToOneConcurrentArrayQueue[Unit](1024)
@ -298,9 +313,16 @@ abstract class AeronStreamLatencySpec
runOn(second) {
// just echo back
Source
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
.runWith(
new AeronSink(channel(first), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
new AeronSink(
channel(first),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
}
enterBarrier("echo-started")
}

View file

@ -168,7 +168,7 @@ abstract class AeronStreamMaxThroughputSpec
val done = TestLatch(1)
val killSwitch = KillSwitches.shared(testName)
Source
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel(second), streamId, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
.via(killSwitch.flow)
.runForeach { envelope =>
val bytes = ByteString.fromByteBuffer(envelope.byteBuffer)
@ -205,7 +205,15 @@ abstract class AeronStreamMaxThroughputSpec
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel(second), streamId, aeron, taskRunner, pool, giveUpMessageAfter, IgnoreEventSink))
.runWith(
new AeronSink(
channel(second),
streamId,
aeron,
taskRunner,
pool,
giveUpMessageAfter,
NoOpRemotingFlightRecorder))
printStats("sender")
enterBarrier(testName + "-done")

View file

@ -0,0 +1,15 @@
# all internals
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.IgnoreEventSink$")
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderEvents")
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.IgnoreEventSink")
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.EventSink")
ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderEvents$")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.ArteryTransport.flightRecorder")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.topLevelFlightRecorder")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.aeron.AeronSource.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.aeron.AeronSink.this")
ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.remote.artery.compress.InboundCompressionsImpl.<init>$default$4")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.compress.InboundCompressionsImpl.this")
ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.tcp.TcpFraming.this")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.tcp.TcpFraming.flightRecorder")
ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.tcp.TcpFraming.this")

View file

@ -0,0 +1,395 @@
/*
* Copyright (C) extends Event 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery.jfr
import java.net.InetSocketAddress
import akka.actor.Address
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
import jdk.jfr.StackTrace
import jdk.jfr.Category
import jdk.jfr.Label
import jdk.jfr.Event
import jdk.jfr.DataAmount
import jdk.jfr.Enabled
import jdk.jfr.Timespan
// requires jdk9+ to compile
// for editing these in IntelliJ, open module settings, change JDK dependency to 11 for only this module
/**
* INTERNAL API
*/
@InternalApi
private[akka] object JFREventUtils {
def stringOf(address: InetSocketAddress): String =
s"${address.getHostString}:${address.getPort}"
}
// transport events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron")) @Label("Media driver started")
final class TransportMediaDriverStarted(val directoryName: String) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Transport started")
final class TransportStarted() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron")) @Label("Aeron error log started")
final class TransportAeronErrorLogStarted() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Task runner started")
final class TransportTaskRunnerStarted() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Unique address set")
final class TransportUniqueAddressSet(_uniqueAddress: UniqueAddress) extends Event {
val uniqueAddress = _uniqueAddress.toString()
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Materializer started")
final class TransportMaterializerStarted() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Startup finished")
final class TransportStartupFinished() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Kill switch pulled")
final class TransportKillSwitchPulled() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Stopped")
final class TransportStopped() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron")) @Label("Aeron log task stopped")
final class TransportAeronErrorLogTaskStopped() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Media file deleted")
final class TransportMediaFileDeleted() extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Send queue overflow")
final class TransportSendQueueOverflow(val queueIndex: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Stop idle outbound")
final class TransportStopIdleOutbound(_remoteAddress: Address, val queueIndex: Int) extends Event {
val remoteAddress = _remoteAddress.toString
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Quarantined")
final class TransportQuarantined(_remoteAddress: Address, val uid: Long) extends Event {
val remoteAddress = _remoteAddress.toString
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Remove quarantined")
final class TransportRemoveQuarantined(_remoteAddress: Address) extends Event {
val remoteAddress = _remoteAddress.toString
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Restart outbound")
final class TransportRestartOutbound(_remoteAddress: Address, val streamName: String) extends Event {
val remoteAddress = _remoteAddress.toString
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Transport")) @Label("Restart inbound")
final class TransportRestartInbound(_remoteAddress: UniqueAddress, val streamName: String) extends Event {
val remoteAddress = _remoteAddress.toString()
}
// aeron sink events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Started")
final class AeronSinkStarted(val channel: String, val streamId: Int) extends Event {}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Task runner removed")
final class AeronSinkTaskRunnerRemoved(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Publication closed")
final class AeronSinkPublicationClosed(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Publication closed unexpectedly")
final class AeronSinkPublicationClosedUnexpectedly(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Stopped")
final class AeronSinkStopped(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Envelope grabbed")
final class AeronSinkEnvelopeGrabbed(@DataAmount() val lastMessageSize: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Envelope offered")
final class AeronSinkEnvelopeOffered(@DataAmount() val lastMessageSize: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Gave up envelope")
final class AeronSinkGaveUpEnvelope(val cause: String) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Delegate to task runner")
final class AeronSinkDelegateToTaskRunner(val countBeforeDelegate: Long) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Sink")) @Label("Return from task runner")
final class AeronSinkReturnFromTaskRunner(@Timespan(Timespan.NANOSECONDS) val nanosSinceTaskStartTime: Long)
extends Event
// aeron source events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Started")
final class AeronSourceStarted(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Stopped")
final class AeronSourceStopped(val channel: String, val streamId: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Received")
final class AeronSourceReceived(@DataAmount() val size: Int) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Delegate to task runner")
final class AeronSourceDelegateToTaskRunner(val countBeforeDelegate: Long) extends Event
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Aeron", "Source")) @Label("Return from task runner")
final class AeronSourceReturnFromTaskRunner(@Timespan(Timespan.NANOSECONDS) val nanosSinceTaskStartTime: Long)
extends Event
// compression events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Compression")) @Label("ActorRef advertisement")
final class CompressionActorRefAdvertisement(val uid: Long) extends Event
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Compression")) @Label("ClassManifest advertisement")
final class CompressionClassManifestAdvertisement(val uid: Long) extends Event
// tcp outbound events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Outbound")) @Label("Connected")
final class TcpOutboundConnected(_remoteAddress: Address, val streamName: String) extends Event {
val remoteAddress = _remoteAddress.toString
}
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Outbound")) @Label("Sent")
final class TcpOutboundSent(@DataAmount() val size: Int) extends Event
// tcp inbound events
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Bound")
final class TcpInboundBound(val bindHost: String, _address: InetSocketAddress) extends Event {
val address = JFREventUtils.stringOf(_address)
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Unbound")
final class TcpInboundUnbound(_localAddress: UniqueAddress) extends Event {
val localAddress = _localAddress.toString()
}
/**
* INTERNAL API
*/
@InternalApi
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Connected")
final class TcpInboundConnected(_remoteAddress: InetSocketAddress) extends Event {
val remoteAddress = JFREventUtils.stringOf(_remoteAddress)
}
/**
* INTERNAL API
*/
@InternalApi
@Enabled(false) // hi frequency event
@StackTrace(false)
@Category(Array("Akka", "Remoting", "Tcp", "Inbound")) @Label("Received")
final class TcpInboundReceived(@DataAmount() val size: Int) extends Event

View file

@ -0,0 +1,139 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery.jfr
import java.net.InetSocketAddress
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
import akka.remote.artery.RemotingFlightRecorder
/**
* INTERNAL API
*/
@InternalApi
private[akka] final class JFRRemotingFlightRecorder(system: ExtendedActorSystem) extends RemotingFlightRecorder {
override def transportMediaDriverStarted(directoryName: String): Unit =
new TransportMediaDriverStarted(directoryName).commit()
override def transportStarted(): Unit =
new TransportStarted().commit()
override def transportAeronErrorLogStarted(): Unit =
new TransportAeronErrorLogStarted().commit()
override def transportTaskRunnerStarted(): Unit =
new TransportTaskRunnerStarted().commit()
override def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit =
new TransportUniqueAddressSet(uniqueAddress).commit()
override def transportMaterializerStarted(): Unit =
new TransportMaterializerStarted().commit()
override def transportStartupFinished(): Unit =
new TransportStartupFinished().commit()
override def transportKillSwitchPulled(): Unit =
new TransportKillSwitchPulled().commit()
override def transportStopped(): Unit =
new TransportStopped().commit()
override def transportAeronErrorLogTaskStopped(): Unit =
new TransportAeronErrorLogTaskStopped().commit()
override def transportMediaFileDeleted(): Unit =
new TransportMediaFileDeleted().commit()
override def transportSendQueueOverflow(queueIndex: Int): Unit =
new TransportSendQueueOverflow(queueIndex).commit()
override def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit =
new TransportStopIdleOutbound(remoteAddress, queueIndex).commit()
override def transportQuarantined(remoteAddress: Address, uid: Long): Unit =
new TransportQuarantined(remoteAddress, uid).commit()
override def transportRemoveQuarantined(remoteAddress: Address): Unit =
new TransportRemoveQuarantined(remoteAddress).commit()
override def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit =
new TransportRestartOutbound(remoteAddress, streamName).commit()
override def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit =
new TransportRestartInbound(remoteAddress, streamName).commit()
override def aeronSinkStarted(channel: String, streamId: Int): Unit =
new AeronSinkStarted(channel, streamId).commit()
override def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit =
new AeronSinkTaskRunnerRemoved(channel, streamId).commit()
override def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit =
new AeronSinkPublicationClosed(channel, streamId).commit()
override def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit =
new AeronSinkPublicationClosedUnexpectedly(channel, streamId).commit()
override def aeronSinkStopped(channel: String, streamId: Int): Unit =
new AeronSinkStopped(channel, streamId).commit()
override def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit =
new AeronSinkEnvelopeGrabbed(lastMessageSize).commit()
override def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit =
new AeronSinkEnvelopeOffered(lastMessageSize).commit()
override def aeronSinkGaveUpEnvelope(cause: String): Unit =
new AeronSinkGaveUpEnvelope(cause).commit()
override def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit =
new AeronSinkDelegateToTaskRunner(countBeforeDelegate).commit()
override def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit =
new AeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime).commit()
override def aeronSourceStarted(channel: String, streamId: Int): Unit =
new AeronSourceStarted(channel, streamId).commit()
override def aeronSourceStopped(channel: String, streamId: Int): Unit =
new AeronSourceStopped(channel, streamId).commit()
override def aeronSourceReceived(size: Int): Unit =
new AeronSourceReceived(size).commit()
override def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit =
new AeronSourceDelegateToTaskRunner(countBeforeDelegate).commit()
override def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit =
new AeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime).commit()
override def compressionActorRefAdvertisement(uid: Long): Unit =
new CompressionActorRefAdvertisement(uid).commit()
override def compressionClassManifestAdvertisement(uid: Long): Unit =
new CompressionClassManifestAdvertisement(uid).commit()
override def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit =
new TcpOutboundConnected(remoteAddress, streamName).commit()
override def tcpOutboundSent(size: Int): Unit =
new TcpOutboundSent(size).commit()
override def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit =
new TcpInboundBound(bindHost, address).commit()
override def tcpInboundUnbound(localAddress: UniqueAddress): Unit =
new TcpInboundUnbound(localAddress).commit()
override def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit =
new TcpInboundConnected(remoteAddress).commit()
override def tcpInboundReceived(size: Int): Unit =
new TcpInboundReceived(size).commit()
}

View file

@ -294,7 +294,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
extends RemoteTransport(_system, _provider)
with InboundContext {
import ArteryTransport._
import FlightRecorderEvents._
type LifeCycle
@ -309,6 +308,9 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
override val log: MarkerLoggingAdapter = Logging.withMarker(system, getClass)
val flightRecorder: RemotingFlightRecorder = RemotingFlightRecorder(system)
log.debug("Using flight recorder {}", flightRecorder)
/**
* Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables.
* However are the InboundCompressions are owned by the Decoder operator, and any call into them must be looped through the Decoder!
@ -317,8 +319,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
*/
protected val _inboundCompressions = {
if (settings.Advanced.Compression.Enabled) {
val eventSink = IgnoreEventSink
new InboundCompressionsImpl(system, this, settings.Advanced.Compression, eventSink)
new InboundCompressionsImpl(system, this, settings.Advanced.Compression, flightRecorder)
} else NoInboundCompressions
}
@ -376,8 +377,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
capacity =
settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3)
val topLevelFlightRecorder: EventSink = IgnoreEventSink
private val associationRegistry = new AssociationRegistry(
remoteAddress =>
new Association(
@ -399,7 +398,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Runtime.getRuntime.addShutdownHook(shutdownHook)
startTransport()
topLevelFlightRecorder.loFreq(Transport_Started, NoMetaData)
flightRecorder.transportStarted()
val systemMaterializer = SystemMaterializer(system)
materializer =
@ -409,7 +408,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
settings.Advanced.ControlStreamMaterializerSettings)
messageDispatcher = new MessageDispatcher(system, provider)
topLevelFlightRecorder.loFreq(Transport_MaterializerStarted, NoMetaData)
flightRecorder.transportMaterializerStarted()
val (port, boundPort) = bindInboundStreams()
@ -422,11 +421,11 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
Address(ArteryTransport.ProtocolName, system.name, settings.Bind.Hostname, boundPort),
AddressUidExtension(system).longAddressUid)
topLevelFlightRecorder.loFreq(Transport_UniqueAddressSet, _localAddress.toString())
flightRecorder.transportUniqueAddressSet(_localAddress)
runInboundStreams(port, boundPort)
topLevelFlightRecorder.loFreq(Transport_StartupFinished, NoMetaData)
flightRecorder.transportStartupFinished()
startRemoveQuarantinedAssociationTask()
@ -618,7 +617,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
case cause =>
if (restartCounter.restart()) {
log.error(cause, "{} failed. Restarting it. {}", streamName, cause.getMessage)
topLevelFlightRecorder.loFreq(Transport_RestartInbound, s"$localAddress - $streamName")
flightRecorder.transportRestartInbound(localAddress, streamName)
restart()
} else {
log.error(
@ -661,7 +660,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
implicit val ec = system.dispatchers.internalDispatcher
killSwitch.abort(ShutdownSignal)
topLevelFlightRecorder.loFreq(Transport_KillSwitchPulled, NoMetaData)
flightRecorder.transportKillSwitchPulled()
for {
_ <- streamsCompleted.recover { case _ => Done }
_ <- shutdownTransport().recover { case _ => Done }
@ -669,7 +668,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr
// no need to explicitly shut down the contained access since it's lifecycle is bound to the Decoder
_inboundCompressionAccess = OptionVal.None
topLevelFlightRecorder.loFreq(Transport_FlightRecorderClose, NoMetaData)
Done
}
}

View file

@ -139,12 +139,11 @@ private[remote] class Association(
extends AbstractAssociation
with OutboundContext {
import Association._
import FlightRecorderEvents._
require(remoteAddress.port.nonEmpty)
private val log = Logging.withMarker(transport.system, getClass)
private def flightRecorder = transport.topLevelFlightRecorder
private def flightRecorder = transport.flightRecorder
override def settings = transport.settings
private def advancedSettings = transport.settings.Advanced
@ -350,7 +349,7 @@ private[remote] class Association(
transport.system.eventStream
.publish(Dropped(message, reason, env.sender.getOrElse(ActorRef.noSender), recipient.getOrElse(deadletters)))
flightRecorder.hiFreq(Transport_SendQueueOverflow, queueIndex)
flightRecorder.transportSendQueueOverflow(queueIndex)
deadletters ! env
}
@ -503,7 +502,7 @@ private[remote] class Association(
reason)
transport.system.eventStream.publish(QuarantinedEvent(UniqueAddress(remoteAddress, u)))
}
flightRecorder.loFreq(Transport_Quarantined, s"$remoteAddress - $u")
flightRecorder.transportQuarantined(remoteAddress, u)
clearOutboundCompression()
clearInboundCompression(u)
// end delivery of system messages to that incarnation after this point
@ -546,7 +545,7 @@ private[remote] class Association(
*/
def removedAfterQuarantined(): Unit = {
if (!isRemovedAfterQuarantined()) {
flightRecorder.loFreq(Transport_RemovedQuarantined, remoteAddress.toString)
flightRecorder.transportRemoveQuarantined(remoteAddress)
queues(ControlQueueIndex) = RemovedQueueWrapper
if (transport.largeMessageChannelEnabled)
@ -626,7 +625,7 @@ private[remote] class Association(
case OptionVal.Some(k) =>
// for non-control streams we can stop the entire stream
log.info("Stopping idle outbound stream [{}] to [{}]", queueIndex, remoteAddress)
flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
flightRecorder.transportStopIdleOutbound(remoteAddress, queueIndex)
setStopReason(queueIndex, OutboundStreamStopIdleSignal)
clearStreamKillSwitch(queueIndex, k)
k.abort(OutboundStreamStopIdleSignal)
@ -639,7 +638,7 @@ private[remote] class Association(
associationState.controlIdleKillSwitch match {
case OptionVal.Some(killSwitch) =>
log.info("Stopping idle outbound control stream to [{}]", remoteAddress)
flightRecorder.loFreq(Transport_StopIdleOutbound, s"$remoteAddress - $queueIndex")
flightRecorder.transportStopIdleOutbound(remoteAddress, queueIndex)
setControlIdleKillSwitch(OptionVal.None)
killSwitch.abort(OutboundStreamStopIdleSignal)
case OptionVal.None => // already stopped
@ -881,7 +880,7 @@ private[remote] class Association(
restart: () => Unit): Unit = {
def lazyRestart(): Unit = {
flightRecorder.loFreq(Transport_RestartOutbound, s"$remoteAddress - $streamName")
flightRecorder.transportRestartOutbound(remoteAddress, streamName)
outboundCompressionAccess = Vector.empty
if (queueIndex == ControlQueueIndex) {
materializing = new CountDownLatch(1)

View file

@ -1,25 +0,0 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import akka.annotation.InternalApi
/**
* INTERNAL API
*/
@InternalApi
private[akka] trait EventSink {
def loFreq(eventId: Int, data: Array[Byte]): Unit
def loFreq(eventId: Int, data: String): Unit
def hiFreq(eventId: Int, data: Long): Unit
def alert(eventId: Int, data: Array[Byte]): Unit
}
object IgnoreEventSink extends EventSink {
def loFreq(eventId: Int, data: Array[Byte]): Unit = ()
def loFreq(eventId: Int, data: String): Unit = ()
def hiFreq(eventId: Int, data: Long): Unit = ()
def alert(eventId: Int, data: Array[Byte]): Unit = ()
}

View file

@ -1,126 +0,0 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
/**
* INTERNAL API
*/
private[remote] object FlightRecorderEvents {
// Note: Remember to update dictionary when adding new events!
val NoMetaData = Array.empty[Byte]
// Top level remoting events
val Transport_MediaDriverStarted = 0
val Transport_Started = 1
val Transport_AeronErrorLogStarted = 2
val Transport_TaskRunnerStarted = 3
val Transport_UniqueAddressSet = 4
val Transport_MaterializerStarted = 5
val Transport_StartupFinished = 6
val Transport_OnAvailableImage = 7
val Transport_KillSwitchPulled = 8
val Transport_Stopped = 9
val Transport_AeronErrorLogTaskStopped = 10
val Transport_MediaFileDeleted = 11
val Transport_FlightRecorderClose = 12
val Transport_SendQueueOverflow = 13
val Transport_StopIdleOutbound = 14
val Transport_Quarantined = 15
val Transport_RemovedQuarantined = 16
val Transport_RestartOutbound = 17
val Transport_RestartInbound = 18
// Aeron Sink events
val AeronSink_Started = 50
val AeronSink_TaskRunnerRemoved = 51
val AeronSink_PublicationClosed = 52
val AeronSink_Stopped = 53
val AeronSink_EnvelopeGrabbed = 54
val AeronSink_EnvelopeOffered = 55
val AeronSink_GaveUpEnvelope = 56
val AeronSink_DelegateToTaskRunner = 57
val AeronSink_ReturnFromTaskRunner = 58
// Aeron Source events
val AeronSource_Started = 70
val AeronSource_Stopped = 71
val AeronSource_Received = 72
val AeronSource_DelegateToTaskRunner = 73
val AeronSource_ReturnFromTaskRunner = 74
// Compression events
val Compression_CompressedActorRef = 90
val Compression_AllocatedActorRefCompressionId = 91
val Compression_CompressedManifest = 92
val Compression_AllocatedManifestCompressionId = 93
val Compression_Inbound_RunActorRefAdvertisement = 94
val Compression_Inbound_RunClassManifestAdvertisement = 95
val TcpOutbound_Connected = 150
val TcpOutbound_Sent = 151
val TcpInbound_Bound = 170
val TcpInbound_Unbound = 171
val TcpInbound_Connected = 172
val TcpInbound_Received = 173
// Used for presentation of the entries in the flight recorder
lazy val eventDictionary = Map(
Transport_MediaDriverStarted -> "Transport: Media driver started",
Transport_Started -> "Transport: started",
Transport_AeronErrorLogStarted -> "Transport: Aeron error log started",
Transport_TaskRunnerStarted -> "Transport: Task runner started",
Transport_UniqueAddressSet -> "Transport: Unique address set",
Transport_MaterializerStarted -> "Transport: Materializer started",
Transport_StartupFinished -> "Transport: Startup finished",
Transport_OnAvailableImage -> "Transport: onAvailableImage",
Transport_KillSwitchPulled -> "Transport: KillSwitch pulled",
Transport_Stopped -> "Transport: Stopped",
Transport_AeronErrorLogTaskStopped -> "Transport: Aeron errorLog task stopped",
Transport_MediaFileDeleted -> "Transport: Media file deleted",
Transport_FlightRecorderClose -> "Transport: Flight recorder closed",
Transport_SendQueueOverflow -> "Transport: Send queue overflow",
Transport_StopIdleOutbound -> "Transport: Remove idle outbound",
Transport_Quarantined -> "Transport: Quarantined association",
Transport_RemovedQuarantined -> "Transport: Removed idle quarantined association",
Transport_RestartOutbound -> "Transport: Restart outbound",
Transport_RestartInbound -> "Transport: Restart outbound",
// Aeron Sink events
AeronSink_Started -> "AeronSink: Started",
AeronSink_TaskRunnerRemoved -> "AeronSink: Task runner removed",
AeronSink_PublicationClosed -> "AeronSink: Publication closed",
AeronSink_Stopped -> "AeronSink: Stopped",
AeronSink_EnvelopeGrabbed -> "AeronSink: Envelope grabbed",
AeronSink_EnvelopeOffered -> "AeronSink: Envelope offered",
AeronSink_GaveUpEnvelope -> "AeronSink: Gave up envelope",
AeronSink_DelegateToTaskRunner -> "AeronSink: Delegate to task runner",
AeronSink_ReturnFromTaskRunner -> "AeronSink: Return from task runner",
// Aeron Source events
AeronSource_Started -> "AeronSource: Started",
AeronSource_Stopped -> "AeronSource: Stopped",
AeronSource_Received -> "AeronSource: Received",
AeronSource_DelegateToTaskRunner -> "AeronSource: Delegate to task runner",
AeronSource_ReturnFromTaskRunner -> "AeronSource: Return from task runner",
// Compression events
Compression_CompressedActorRef -> "Compression: Compressed ActorRef",
Compression_AllocatedActorRefCompressionId -> "Compression: Allocated ActorRef compression id",
Compression_CompressedManifest -> "Compression: Compressed manifest",
Compression_AllocatedManifestCompressionId -> "Compression: Allocated manifest compression id",
Compression_Inbound_RunActorRefAdvertisement -> "InboundCompression: Run class manifest compression advertisement",
Compression_Inbound_RunClassManifestAdvertisement -> "InboundCompression: Run class manifest compression advertisement",
// TCP outbound events
TcpOutbound_Connected -> "TCP out: Connected",
TcpOutbound_Sent -> "TCP out: Sent message",
// TCP inbound events
TcpInbound_Bound -> "TCP in: Bound",
TcpInbound_Unbound -> "TCP in: Unbound",
TcpInbound_Connected -> "TCP in: New connection",
TcpInbound_Received -> "TCP in: Received message").map {
case (int, str) => int.toLong -> str
}
}

View file

@ -0,0 +1,152 @@
/*
* Copyright (C) 2018-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import java.net.InetSocketAddress
import akka.actor.Address
import akka.actor.ExtendedActorSystem
import akka.actor.Extension
import akka.actor.ExtensionId
import akka.actor.ExtensionIdProvider
import akka.annotation.InternalApi
import akka.remote.UniqueAddress
import akka.util.JavaVersion
import scala.util.Failure
import scala.util.Success
/**
* INTERNAL API
*/
@InternalApi
object RemotingFlightRecorder extends ExtensionId[RemotingFlightRecorder] with ExtensionIdProvider {
override def createExtension(system: ExtendedActorSystem): RemotingFlightRecorder =
if (JavaVersion.majorVersion >= 11 && system.settings.config.getBoolean("akka.java-flight-recorder.enabled")) {
// Dynamic instantiation to not trigger class load on earlier JDKs
system.dynamicAccess.createInstanceFor[RemotingFlightRecorder](
"akka.remote.artery.jfr.JFRRemotingFlightRecorder",
(classOf[ExtendedActorSystem], system) :: Nil) match {
case Success(jfr) => jfr
case Failure(ex) =>
system.log
.warning("Failed to load JFR remoting flight recorder, falling back to noop. Exception: {}", ex.getMessage)
NoOpRemotingFlightRecorder
} // fallback if not possible to dynamically load for some reason
} else
// JFR not available on Java 8
NoOpRemotingFlightRecorder
override def lookup(): ExtensionId[_ <: Extension] = this
}
/**
* INTERNAL API
*/
@InternalApi
private[akka] trait RemotingFlightRecorder extends Extension {
def transportMediaDriverStarted(directoryName: String): Unit
def transportStarted(): Unit
def transportAeronErrorLogStarted(): Unit
def transportTaskRunnerStarted(): Unit
def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit
def transportMaterializerStarted(): Unit
def transportStartupFinished(): Unit
def transportKillSwitchPulled(): Unit
def transportStopped(): Unit
def transportAeronErrorLogTaskStopped(): Unit
def transportMediaFileDeleted(): Unit
def transportSendQueueOverflow(queueIndex: Int): Unit
def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit
def transportQuarantined(remoteAddress: Address, uid: Long): Unit
def transportRemoveQuarantined(remoteAddress: Address): Unit
def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit
def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit
def aeronSinkStarted(channel: String, streamId: Int): Unit
def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit
def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit
def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit
def aeronSinkStopped(channel: String, streamId: Int): Unit
def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit
def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit
def aeronSinkGaveUpEnvelope(cause: String): Unit
def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit
def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit
def aeronSourceStarted(channel: String, streamId: Int): Unit
def aeronSourceStopped(channel: String, streamId: Int): Unit
def aeronSourceReceived(size: Int): Unit
def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit
def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit
def compressionActorRefAdvertisement(uid: Long): Unit
def compressionClassManifestAdvertisement(uid: Long): Unit
def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit
def tcpOutboundSent(size: Int): Unit
def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit
def tcpInboundUnbound(localAddress: UniqueAddress): Unit
def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit
def tcpInboundReceived(size: Int): Unit
}
/**
* JFR is only available under certain circumstances (JDK11 for now, possible OpenJDK 8 in the future) so therefore
* the default on JDK 8 needs to be a no-op flight recorder.
*
* INTERNAL
*/
@InternalApi
private[akka] case object NoOpRemotingFlightRecorder extends RemotingFlightRecorder {
override def transportMediaDriverStarted(directoryName: String): Unit = ()
override def transportStarted(): Unit = ()
override def transportAeronErrorLogStarted(): Unit = ()
override def transportTaskRunnerStarted(): Unit = ()
override def transportUniqueAddressSet(uniqueAddress: UniqueAddress): Unit = ()
override def transportMaterializerStarted(): Unit = ()
override def transportStartupFinished(): Unit = ()
override def transportKillSwitchPulled(): Unit = ()
override def transportStopped(): Unit = ()
override def transportAeronErrorLogTaskStopped(): Unit = ()
override def transportMediaFileDeleted(): Unit = ()
override def transportStopIdleOutbound(remoteAddress: Address, queueIndex: Int): Unit = ()
override def transportQuarantined(remoteAddress: Address, uid: Long): Unit = ()
override def transportRemoveQuarantined(remoteAddress: Address): Unit = ()
override def transportRestartOutbound(remoteAddress: Address, streamName: String): Unit = ()
override def transportRestartInbound(remoteAddress: UniqueAddress, streamName: String): Unit = ()
override def transportSendQueueOverflow(queueIndex: Int): Unit = ()
override def aeronSinkStarted(channel: String, streamId: Int): Unit = ()
override def aeronSinkTaskRunnerRemoved(channel: String, streamId: Int): Unit = ()
override def aeronSinkPublicationClosed(channel: String, streamId: Int): Unit = ()
override def aeronSinkPublicationClosedUnexpectedly(channel: String, streamId: Int): Unit = ()
override def aeronSinkStopped(channel: String, streamId: Int): Unit = ()
override def aeronSinkEnvelopeGrabbed(lastMessageSize: Int): Unit = ()
override def aeronSinkEnvelopeOffered(lastMessageSize: Int): Unit = ()
override def aeronSinkGaveUpEnvelope(cause: String): Unit = ()
override def aeronSinkDelegateToTaskRunner(countBeforeDelegate: Long): Unit = ()
override def aeronSinkReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = ()
override def aeronSourceStarted(channel: String, streamId: Int): Unit = ()
override def aeronSourceStopped(channel: String, streamId: Int): Unit = ()
override def aeronSourceReceived(size: Int): Unit = ()
override def aeronSourceDelegateToTaskRunner(countBeforeDelegate: Long): Unit = ()
override def aeronSourceReturnFromTaskRunner(nanosSinceTaskStartTime: Long): Unit = ()
override def compressionActorRefAdvertisement(uid: Long): Unit = ()
override def compressionClassManifestAdvertisement(uid: Long): Unit = ()
override def tcpOutboundConnected(remoteAddress: Address, streamName: String): Unit = ()
override def tcpOutboundSent(size: Int): Unit = ()
override def tcpInboundBound(bindHost: String, address: InetSocketAddress): Unit = ()
override def tcpInboundUnbound(localAddress: UniqueAddress): Unit = ()
override def tcpInboundConnected(remoteAddress: InetSocketAddress): Unit = ()
override def tcpInboundReceived(size: Int): Unit = ()
}

View file

@ -93,11 +93,10 @@ private[remote] class AeronSink(
taskRunner: TaskRunner,
pool: EnvelopeBufferPool,
giveUpAfter: Duration,
flightRecorder: EventSink)
flightRecorder: RemotingFlightRecorder)
extends GraphStageWithMaterializedValue[SinkShape[EnvelopeBuffer], Future[Done]] {
import AeronSink._
import TaskRunner._
import FlightRecorderEvents._
val in: Inlet[EnvelopeBuffer] = Inlet("AeronSink")
override val shape: SinkShape[EnvelopeBuffer] = SinkShape(in)
@ -129,23 +128,21 @@ private[remote] class AeronSink(
private var delegateTaskStartTime = 0L
private var countBeforeDelegate = 0L
private val channelMetadata = channel.getBytes("US-ASCII")
override def preStart(): Unit = {
setKeepGoing(true)
pull(in)
// TODO: Identify different sinks!
flightRecorder.loFreq(AeronSink_Started, channelMetadata)
flightRecorder.aeronSinkStarted(channel, streamId)
}
override def postStop(): Unit = {
try {
taskRunner.command(Remove(addOfferTask.task))
flightRecorder.loFreq(AeronSink_TaskRunnerRemoved, channelMetadata)
flightRecorder.aeronSinkTaskRunnerRemoved(channel, streamId)
pub.close()
flightRecorder.loFreq(AeronSink_PublicationClosed, channelMetadata)
flightRecorder.aeronSinkPublicationClosed(channel, streamId)
} finally {
flightRecorder.loFreq(AeronSink_Stopped, channelMetadata)
flightRecorder.aeronSinkStopped(channel, streamId)
completed.complete(completedValue)
}
}
@ -155,7 +152,7 @@ private[remote] class AeronSink(
envelopeInFlight = grab(in)
backoffCount = spinning
lastMsgSize = envelopeInFlight.byteBuffer.limit
flightRecorder.hiFreq(AeronSink_EnvelopeGrabbed, lastMsgSize)
flightRecorder.aeronSinkEnvelopeGrabbed(lastMsgSize)
publish()
}
@ -188,17 +185,18 @@ private[remote] class AeronSink(
offerTask.msgSize = lastMsgSize
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addOfferTask)
flightRecorder.hiFreq(AeronSink_DelegateToTaskRunner, countBeforeDelegate)
flightRecorder.aeronSinkDelegateToTaskRunner(countBeforeDelegate)
}
private def taskOnOfferSuccess(): Unit = {
countBeforeDelegate = 0
flightRecorder.hiFreq(AeronSink_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime)
// FIXME does calculation belong here or in impl?
flightRecorder.aeronSinkReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime)
onOfferSuccess()
}
private def onOfferSuccess(): Unit = {
flightRecorder.hiFreq(AeronSink_EnvelopeOffered, lastMsgSize)
flightRecorder.aeronSinkEnvelopeOffered(lastMsgSize)
offerTaskInProgress = false
pool.release(envelopeInFlight)
offerTask.buffer = null
@ -213,7 +211,7 @@ private[remote] class AeronSink(
private def onGiveUp(): Unit = {
offerTaskInProgress = false
val cause = new GaveUpMessageException(s"Gave up sending message to $channel after ${giveUpAfter.pretty}.")
flightRecorder.alert(AeronSink_GaveUpEnvelope, cause.toString.getBytes("US-ASCII"))
flightRecorder.aeronSinkGaveUpEnvelope(cause.getMessage)
completedValue = Failure(cause)
failStage(cause)
}
@ -222,7 +220,7 @@ private[remote] class AeronSink(
offerTaskInProgress = false
val cause = new PublicationClosedException(s"Aeron Publication to [${channel}] was closed.")
// this is not exepected, since we didn't close the publication ourselves
flightRecorder.alert(AeronSink_PublicationClosed, channelMetadata)
flightRecorder.aeronSinkPublicationClosedUnexpectedly(channel, streamId)
completedValue = Failure(cause)
failStage(cause)
}

View file

@ -85,13 +85,12 @@ private[remote] class AeronSource(
aeron: Aeron,
taskRunner: TaskRunner,
pool: EnvelopeBufferPool,
flightRecorder: EventSink,
flightRecorder: RemotingFlightRecorder,
spinning: Int)
extends GraphStageWithMaterializedValue[SourceShape[EnvelopeBuffer], AeronSource.AeronLifecycle] {
import AeronSource._
import TaskRunner._
import FlightRecorderEvents._
val out: Outlet[EnvelopeBuffer] = Outlet("AeronSource")
override val shape: SourceShape[EnvelopeBuffer] = SourceShape(out)
@ -108,8 +107,6 @@ private[remote] class AeronSource(
private val messageHandler = new MessageHandler(pool)
private val addPollTask: Add = Add(pollTask(subscription, messageHandler, getAsyncCallback(taskOnMessage)))
private val channelMetadata = channel.getBytes("US-ASCII")
private var delegatingToTaskRunner = false
private var pendingUnavailableImages: List[Int] = Nil
@ -124,7 +121,7 @@ private[remote] class AeronSource(
override protected def logSource = classOf[AeronSource]
override def preStart(): Unit = {
flightRecorder.loFreq(AeronSource_Started, channelMetadata)
flightRecorder.aeronSourceStarted(channel, streamId)
}
override def postStop(): Unit = {
@ -134,7 +131,7 @@ private[remote] class AeronSource(
case e: DriverTimeoutException =>
// media driver was shutdown
log.debug("DriverTimeout when closing subscription. {}", e)
} finally flightRecorder.loFreq(AeronSource_Stopped, channelMetadata)
} finally flightRecorder.aeronSourceStopped(channel, streamId)
}
// OutHandler
@ -161,7 +158,7 @@ private[remote] class AeronSource(
subscriberLoop() // recursive
} else {
// delegate backoff to shared TaskRunner
flightRecorder.hiFreq(AeronSource_DelegateToTaskRunner, countBeforeDelegate)
flightRecorder.aeronSourceDelegateToTaskRunner(countBeforeDelegate)
delegatingToTaskRunner = true
delegateTaskStartTime = System.nanoTime()
taskRunner.command(addPollTask)
@ -178,13 +175,14 @@ private[remote] class AeronSource(
private def taskOnMessage(data: EnvelopeBuffer): Unit = {
countBeforeDelegate = 0
delegatingToTaskRunner = false
flightRecorder.hiFreq(AeronSource_ReturnFromTaskRunner, System.nanoTime() - delegateTaskStartTime)
// FIXME push calculation to JFR?
flightRecorder.aeronSourceReturnFromTaskRunner(System.nanoTime() - delegateTaskStartTime)
freeSessionBuffers()
onMessage(data)
}
private def onMessage(data: EnvelopeBuffer): Unit = {
flightRecorder.hiFreq(AeronSource_Received, data.byteBuffer.limit)
flightRecorder.aeronSourceReceived(data.byteBuffer.limit())
push(out, data)
}

View file

@ -56,7 +56,6 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
import AeronSource.AeronLifecycle
import ArteryTransport._
import Decoder.InboundCompressionAccess
import FlightRecorderEvents._
override type LifeCycle = AeronLifecycle
@ -74,12 +73,12 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
startMediaDriver()
startAeron()
startAeronErrorLog()
topLevelFlightRecorder.loFreq(Transport_AeronErrorLogStarted, NoMetaData)
flightRecorder.transportAeronErrorLogStarted()
if (settings.Advanced.Aeron.LogAeronCounters) {
startAeronCounterLog()
}
taskRunner.start()
topLevelFlightRecorder.loFreq(Transport_TaskRunnerStarted, NoMetaData)
flightRecorder.transportTaskRunnerStarted()
}
private def startMediaDriver(): Unit = {
@ -131,7 +130,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
val driver = MediaDriver.launchEmbedded(driverContext)
log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName)
topLevelFlightRecorder.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName())
flightRecorder.transportMediaDriverStarted(driver.aeronDirectoryName())
if (!mediaDriver.compareAndSet(None, Some(driver))) {
throw new IllegalStateException("media driver started more than once")
}
@ -158,7 +157,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
try {
if (settings.Advanced.Aeron.DeleteAeronDirectory) {
IoUtil.delete(new File(driver.aeronDirectoryName), false)
topLevelFlightRecorder.loFreq(Transport_MediaFileDeleted, NoMetaData)
flightRecorder.transportMediaFileDeleted()
}
} catch {
case NonFatal(e) =>
@ -307,7 +306,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
taskRunner,
bufferPool,
giveUpAfter,
IgnoreEventSink))
flightRecorder))
}
private def aeronSource(
@ -315,7 +314,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
pool: EnvelopeBufferPool,
inboundChannel: String): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] =
Source.fromGraph(
new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, IgnoreEventSink, aeronSourceSpinningStrategy))
new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, flightRecorder, aeronSourceSpinningStrategy))
private def aeronSourceSpinningStrategy: Int =
if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365
@ -451,10 +450,10 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro
taskRunner
.stop()
.map { _ =>
topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData)
flightRecorder.transportStopped()
if (aeronErrorLogTask != null) {
aeronErrorLogTask.cancel()
topLevelFlightRecorder.loFreq(Transport_AeronErrorLogTaskStopped, NoMetaData)
flightRecorder.transportAeronErrorLogTaskStopped()
}
if (aeron != null) aeron.close()
if (aeronErrorLog != null) aeronErrorLog.close()

View file

@ -57,7 +57,7 @@ private[remote] final class InboundCompressionsImpl(
system: ActorSystem,
inboundContext: InboundContext,
settings: ArterySettings.Compression,
eventSink: EventSink = IgnoreEventSink)
flightRecorder: RemotingFlightRecorder = NoOpRemotingFlightRecorder)
extends InboundCompressions {
private[this] val _actorRefsIns = new Long2ObjectHashMap[InboundActorRefCompression]()
@ -108,7 +108,7 @@ private[remote] final class InboundCompressionsImpl(
val inbound = vs.next()
inboundContext.association(inbound.originUid) match {
case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) =>
eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunActorRefAdvertisement, inbound.originUid)
flightRecorder.compressionActorRefAdvertisement(inbound.originUid)
inbound.runNextTableAdvertisement()
case _ => remove :+= inbound.originUid
}
@ -140,7 +140,7 @@ private[remote] final class InboundCompressionsImpl(
val inbound = vs.next()
inboundContext.association(inbound.originUid) match {
case OptionVal.Some(a) if !a.associationState.isQuarantined(inbound.originUid) =>
eventSink.hiFreq(FlightRecorderEvents.Compression_Inbound_RunClassManifestAdvertisement, inbound.originUid)
flightRecorder.compressionClassManifestAdvertisement(inbound.originUid)
inbound.runNextTableAdvertisement()
case _ => remove :+= inbound.originUid
}

View file

@ -74,7 +74,6 @@ private[remote] class ArteryTcpTransport(
extends ArteryTransport(_system, _provider) {
import ArteryTransport._
import ArteryTcpTransport._
import FlightRecorderEvents._
override type LifeCycle = NotUsed
@ -118,8 +117,6 @@ private[remote] class ArteryTcpTransport(
bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = {
implicit val sys: ActorSystem = system
val afr = IgnoreEventSink
val host = outboundContext.remoteAddress.host.get
val port = outboundContext.remoteAddress.port.get
val remoteAddress = InetSocketAddress.createUnresolved(host, port)
@ -174,10 +171,7 @@ private[remote] class ArteryTcpTransport(
.via(Flow.lazyFlow(() => {
// only open the actual connection if any new messages are sent
logConnect()
afr.loFreq(
TcpOutbound_Connected,
s"${outboundContext.remoteAddress.host.get}:${outboundContext.remoteAddress.port.get} " +
s"/ ${streamName(streamId)}")
flightRecorder.tcpOutboundConnected(outboundContext.remoteAddress, streamName(streamId))
if (controlIdleKillSwitch.isDefined)
outboundContext.asInstanceOf[Association].setControlIdleKillSwitch(controlIdleKillSwitch)
@ -220,7 +214,7 @@ private[remote] class ArteryTcpTransport(
Flow[EnvelopeBuffer]
.map { env =>
val size = env.byteBuffer.limit()
afr.hiFreq(TcpOutbound_Sent, size)
flightRecorder.tcpOutboundSent(size)
// TODO Possible performance improvement, could we reduce the copying of bytes?
val bytes = ByteString(env.byteBuffer)
@ -260,12 +254,9 @@ private[remote] class ArteryTcpTransport(
val binding = serverBinding match {
case None =>
val afr = IgnoreEventSink
val binding = connectionSource
.to(Sink.foreach { connection =>
afr.loFreq(
TcpInbound_Connected,
s"${connection.remoteAddress.getHostString}:${connection.remoteAddress.getPort}")
flightRecorder.tcpInboundConnected(connection.remoteAddress)
inboundConnectionFlow.map(connection.handleWith(_))(sys.dispatcher)
})
.run()
@ -280,7 +271,7 @@ private[remote] class ArteryTcpTransport(
// only on initial startup, when ActorSystem is starting
val b = Await.result(binding, settings.Bind.BindTimeout)
afr.loFreq(TcpInbound_Bound, s"$bindHost:${b.localAddress.getPort}")
flightRecorder.tcpInboundBound(bindHost, b.localAddress)
b
case Some(binding) =>
// already bound, when restarting
@ -351,7 +342,7 @@ private[remote] class ArteryTcpTransport(
Flow[ByteString]
.via(inboundKillSwitch.flow)
// must create new FlightRecorder event sink for each connection because they can't be shared
.via(new TcpFraming)
.via(new TcpFraming(flightRecorder))
.alsoTo(inboundStream)
.filter(_ => false) // don't send back anything in this TCP socket
.map(_ => ByteString.empty) // make it a Flow[ByteString] again
@ -490,7 +481,7 @@ private[remote] class ArteryTcpTransport(
implicit val ec = system.dispatchers.internalDispatcher
inboundKillSwitch.shutdown()
unbind().map { _ =>
topLevelFlightRecorder.loFreq(Transport_Stopped, NoMetaData)
flightRecorder.transportStopped()
Done
}
}
@ -502,9 +493,7 @@ private[remote] class ArteryTcpTransport(
for {
_ <- binding.unbind()
} yield {
topLevelFlightRecorder.loFreq(
TcpInbound_Bound,
s"${localAddress.address.host.get}:${localAddress.address.port}")
flightRecorder.tcpInboundUnbound(localAddress)
Done
}
case None =>

View file

@ -9,7 +9,6 @@ import java.nio.ByteBuffer
import java.nio.ByteOrder
import akka.annotation.InternalApi
import akka.remote.artery.FlightRecorderEvents.TcpInbound_Received
import akka.stream.Attributes
import akka.stream.impl.io.ByteStringParser
import akka.stream.impl.io.ByteStringParser.ByteReader
@ -59,9 +58,8 @@ import akka.util.ByteString
/**
* INTERNAL API
*/
@InternalApi private[akka] class TcpFraming extends ByteStringParser[EnvelopeBuffer] {
val flightRecorder = IgnoreEventSink
@InternalApi private[akka] class TcpFraming(flightRecorder: RemotingFlightRecorder = NoOpRemotingFlightRecorder)
extends ByteStringParser[EnvelopeBuffer] {
override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic {
@ -97,7 +95,7 @@ import akka.util.ByteString
private def createBuffer(bs: ByteString): EnvelopeBuffer = {
val buffer = ByteBuffer.wrap(bs.toArray)
buffer.order(ByteOrder.LITTLE_ENDIAN)
flightRecorder.hiFreq(TcpInbound_Received, buffer.limit)
flightRecorder.tcpInboundReceived(buffer.limit)
val res = new EnvelopeBuffer(buffer)
res.setStreamId(streamId)
res

View file

@ -0,0 +1,41 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery.jfr
import akka.actor.ActorSystem
import akka.remote.artery.NoOpRemotingFlightRecorder
import akka.remote.artery.RemotingFlightRecorder
import akka.testkit.AkkaSpec
import akka.testkit.TestKit
import com.typesafe.config.ConfigFactory
class JFRRemotingFlightRecorderSpec extends AkkaSpec {
"The RemotingFlightRecorder" must {
"use the JFR one on Java 11" in {
val extension = RemotingFlightRecorder(system)
extension shouldBe a[JFRRemotingFlightRecorder]
extension.transportStopped() // try to actually report something and see that it doesn't throw or something
}
"be disabled if configured to" in {
val system = ActorSystem(
"JFRRemotingFlightRecorderSpec-2",
ConfigFactory.parseString(
"""
akka.java-flight-recorder.enabled = false
"""))
try {
val extension = RemotingFlightRecorder(system)
extension should === (NoOpRemotingFlightRecorder)
} finally {
TestKit.shutdownActorSystem(system)
}
}
}
}

View file

@ -0,0 +1,22 @@
/*
* Copyright (C) 2009-2019 Lightbend Inc. <https://www.lightbend.com>
*/
package akka.remote.artery
import akka.testkit.AkkaSpec
import akka.util.JavaVersion
import org.scalatest.Matchers
class RemotingFlightRecorderSpec extends AkkaSpec with Matchers {
"The RemotingFlightRecorder" must {
"use the no-op recorder by default when running on JDK 8" in {
val extension = RemotingFlightRecorder(system)
if (JavaVersion.majorVersion < 11)
extension should ===(NoOpRemotingFlightRecorder)
}
}
}

View file

@ -58,7 +58,7 @@ class AeronSinkSpec extends AkkaSpec("""
val channel = s"aeron:udp?endpoint=localhost:$port"
Source
.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, IgnoreEventSink, 0))
.fromGraph(new AeronSource(channel, 1, aeron, taskRunner, pool, NoOpRemotingFlightRecorder, 0))
// fail receiver stream on first message
.map(_ => throw new RuntimeException("stop") with NoStackTrace)
.runWith(Sink.ignore)
@ -73,7 +73,7 @@ class AeronSinkSpec extends AkkaSpec("""
envelope.byteBuffer.flip()
envelope
}
.runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, IgnoreEventSink))
.runWith(new AeronSink(channel, 1, aeron, taskRunner, pool, 500.millis, NoOpRemotingFlightRecorder))
// without the give up timeout the stream would not complete/fail
intercept[GaveUpMessageException] {

View file

@ -311,6 +311,7 @@ lazy val remote =
.settings(OSGi.remote)
.settings(Protobuf.settings)
.settings(parallelExecution in Test := false)
.enablePlugins(Jdk9)
lazy val remoteTests = akkaModule("akka-remote-tests")
.dependsOn(

View file

@ -12,7 +12,7 @@ object Jdk9 extends AutoPlugin {
val CompileJdk9 = config("CompileJdk9").extend(Compile)
val TestJdk9 = config("TestJdk9").extend(Test)
val TestJdk9 = config("TestJdk9").extend(Test).extend(CompileJdk9)
val SCALA_SOURCE_DIRECTORY = "scala-jdk-9"
val SCALA_TEST_SOURCE_DIRECTORY = "scala-jdk9-only"
@ -22,25 +22,23 @@ object Jdk9 extends AutoPlugin {
val compileJdk9Settings = Seq(
// following the scala-2.12, scala-sbt-1.0, ... convention
unmanagedSourceDirectories := notOnJdk8(
Seq(
(Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY,
(Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)),
Seq(
(Compile / sourceDirectory).value / SCALA_SOURCE_DIRECTORY,
(Compile / sourceDirectory).value / JAVA_SOURCE_DIRECTORY)),
scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")),
javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11")))
val testJdk9Settings = Seq(
// following the scala-2.12, scala-sbt-1.0, ... convention
unmanagedSourceDirectories := notOnJdk8(
Seq(
(Test / sourceDirectory).value / SCALA_TEST_SOURCE_DIRECTORY,
(Test / sourceDirectory).value / JAVA_TEST_SOURCE_DIRECTORY)),
Seq(
(Test / sourceDirectory).value / SCALA_TEST_SOURCE_DIRECTORY,
(Test / sourceDirectory).value / JAVA_TEST_SOURCE_DIRECTORY)),
scalacOptions := AkkaBuild.DefaultScalacOptions ++ notOnJdk8(Seq("-release", "11")),
javacOptions := AkkaBuild.DefaultJavacOptions ++ notOnJdk8(Seq("--release", "11")),
compile := compile.dependsOn(CompileJdk9 / compile).value,
classpathConfiguration := TestJdk9,
externalDependencyClasspath := (externalDependencyClasspath in Test).value
)
externalDependencyClasspath := (externalDependencyClasspath in Test).value)
val compileSettings = Seq(
// It might have been more 'neat' to add the jdk9 products to the jar via packageBin/mappings, but that doesn't work with the OSGi plugin,
@ -49,6 +47,11 @@ object Jdk9 extends AutoPlugin {
// ++= (CompileJdk9 / products).value.flatMap(Path.allSubpaths),
Compile / fullClasspath ++= (CompileJdk9 / exportedProducts).value)
val testSettings = Seq((Test / test) := {
(Test / test).value
(TestJdk9 / test).value
})
override def trigger = noTrigger
override def projectConfigurations = Seq(CompileJdk9)
override lazy val projectSettings =
@ -56,5 +59,6 @@ object Jdk9 extends AutoPlugin {
inConfig(CompileJdk9)(compileJdk9Settings) ++
compileSettings ++
inConfig(TestJdk9)(Defaults.testSettings) ++
inConfig(TestJdk9)(testJdk9Settings)
inConfig(TestJdk9)(testJdk9Settings) ++
testSettings
}