diff --git a/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala b/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala deleted file mode 100644 index 0b79050202..0000000000 --- a/akka-bench-jmh/src/main/scala/akka/remote/artery/FlightRecorderBench.scala +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import java.io.File -import java.nio.channels.FileChannel -import java.nio.file.StandardOpenOption -import java.util.concurrent.{ CountDownLatch, TimeUnit } -import java.util.concurrent.TimeUnit - -import org.openjdk.jmh.annotations.{ OperationsPerInvocation, _ } - -@State(Scope.Benchmark) -@OutputTimeUnit(TimeUnit.MICROSECONDS) -@BenchmarkMode(Array(Mode.Throughput)) -class FlightRecorderBench { - - @Param(Array("1", "5", "10")) - var writers: Int = 0 - - val Writes = 10000000 - - private var file: File = _ - private var fileChannel: FileChannel = _ - private var recorder: FlightRecorder = _ - - @Setup - def setup(): Unit = { - file = File.createTempFile("akka-flightrecorder", "dat") - file.deleteOnExit() - fileChannel = - FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) - recorder = new FlightRecorder(fileChannel) - } - - @TearDown - def shutdown(): Unit = { - fileChannel.force(false) - recorder.close() - fileChannel.close() - file.delete() - } - - @Benchmark - @OperationsPerInvocation(10000000) - def flight_recorder_writes(): Unit = { - val latch = new CountDownLatch(writers) - (1 to writers).foreach { _ => - val sink = recorder.createEventSink() - new Thread { - override def run(): Unit = { - var i = Writes - while (i > 0) { - sink.hiFreq(16, 16) - i -= 1 - } - latch.countDown() - } - }.run() - } - - latch.await() - } - -} diff --git a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala index 816547ee60..b1d5758174 100644 --- a/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala +++ b/akka-cluster-typed/src/multi-jvm/scala/akka/cluster/typed/MultiNodeTypedClusterSpec.scala @@ -6,24 +6,19 @@ package akka.cluster.typed import java.util.concurrent.ConcurrentHashMap -import akka.actor.{ Address, Scheduler } import akka.actor.typed.ActorSystem -import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec } -import akka.testkit.WatchedByCoroner -import org.scalatest.{ Matchers, Suite } import akka.actor.typed.scaladsl.adapter._ +import akka.actor.{ Address, Scheduler } import akka.cluster.{ ClusterEvent, MemberStatus } import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec } +import akka.testkit.WatchedByCoroner +import org.scalatest.{ Matchers, Suite } import scala.concurrent.duration._ import scala.language.implicitConversions -trait MultiNodeTypedClusterSpec - extends Suite - with STMultiNodeSpec - with WatchedByCoroner - with FlightRecordingSupport - with Matchers { +trait MultiNodeTypedClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with Matchers { self: MultiNodeSpec => override def initialParticipants: Int = roles.size diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index a93c2b2af9..5b447042fb 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -5,30 +5,26 @@ package akka.cluster import java.util.UUID - -import language.implicitConversions - -import org.scalatest.{ Canceled, Outcome, Suite } -import org.scalatest.exceptions.TestCanceledException -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec } -import akka.testkit._ -import akka.testkit.TestEvent._ -import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, PoisonPill, Props, RootActorPath } -import akka.event.Logging.ErrorLevel -import akka.util.ccompat._ -import scala.concurrent.duration._ -import scala.collection.immutable import java.util.concurrent.ConcurrentHashMap -import akka.remote.DefaultFailureDetectorRegistry +import akka.actor.{ Actor, ActorRef, ActorSystem, Address, Deploy, PoisonPill, Props, RootActorPath } import akka.cluster.ClusterEvent.{ MemberEvent, MemberRemoved } -import akka.util.ccompat._ -import scala.concurrent.Await - +import akka.event.Logging.ErrorLevel +import akka.remote.DefaultFailureDetectorRegistry +import akka.remote.testconductor.RoleName +import akka.remote.testkit.{ MultiNodeSpec, STMultiNodeSpec } import akka.serialization.jackson.CborSerializable +import akka.testkit.TestEvent._ +import akka.testkit._ +import akka.util.ccompat._ +import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.exceptions.TestCanceledException +import org.scalatest.{ Canceled, Outcome, Suite } + +import scala.collection.immutable +import scala.concurrent.Await +import scala.concurrent.duration._ +import scala.language.implicitConversions @ccompatUsedUntil213 object MultiNodeClusterSpec { @@ -99,7 +95,7 @@ object MultiNodeClusterSpec { } } -trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport { +trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec => override def initialParticipants = roles.size @@ -115,10 +111,6 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro override protected def afterTermination(): Unit = { self.afterTermination() stopCoroner() - if (failed || sys.props.get("akka.remote.artery.always-dump-flight-recorder").isDefined) { - printFlightRecording() - } - deleteFlightRecorderFile() } override def expectedTestDuration = 60.seconds diff --git a/akka-docs/src/main/paradox/remoting-artery.md b/akka-docs/src/main/paradox/remoting-artery.md index fed9e88778..723f3f718e 100644 --- a/akka-docs/src/main/paradox/remoting-artery.md +++ b/akka-docs/src/main/paradox/remoting-artery.md @@ -789,45 +789,6 @@ to be noted though that during a continuously high-throughput period this settin as the thread mostly has tasks to execute. This also means that under high throughput (but below maximum capacity) the system might have less latency than at low message rates. -## Internal Event Log for Debugging (Flight Recorder) - -@@@ note - -In this version ($akka.version$) the flight-recorder is disabled by default because there is no automatic -file name and path calculation implemented to make it possible to reuse the same file for every restart of -the same actor system without clashing with files produced by other systems (possibly running on the same machine). -Currently, you have to set the path and file names yourself to avoid creating an unbounded number -of files and enable flight recorder manually by adding *akka.remote.artery.advanced.flight-recorder.enabled=on* to -your configuration file. This a limitation of the current version and will not be necessary in the future. - -@@@ - -Emitting event information (logs) from internals is always a trade off. The events that are usable for -the Akka developers are usually too low level to be of any use for users and usually need to be fine-grained enough -to provide enough information to be able to debug issues in the internal implementation. This usually means that -these logs are hidden behind special flags and emitted at low log levels to not clutter the log output of the user -system. Unfortunately this means that during production or integration testing these flags are usually off and -events are not available when an actual failure happens - leaving maintainers in the dark about details of the event. -To solve this contradiction, remoting has an internal, high-performance event store for debug events which is always on. -This log and the events that it contains are highly specialized and not directly exposed to users, their primary purpose -is to help the maintainers of Akka to identify and solve issues discovered during daily usage. When you encounter -production issues involving remoting, you can include the flight recorder log file in your bug report to give us -more insight into the nature of the failure. - -There are various important features of this event log: - - * Flight Recorder produces a fixed size file completely encapsulating log rotation. This means that this -file will never grow in size and will not cause any unexpected disk space shortage in production. - * This file is crash resistant, i.e. its contents can be recovered even if the JVM hosting the `ActorSystem` -crashes unexpectedly. - * Very low overhead, specialized, binary logging that has no significant overhead and can be safely left enabled -for production systems. - -The location of the file can be controlled via the *akka.remote.artery.advanced.flight-recorder.destination* setting (see -@ref:[akka-remote (artery)](general/configuration.md#config-akka-remote-artery) for details). By default, a file with the *.afr* extension is produced in the temporary -directory of the operating system. In cases where the flight recorder casuses issues, it can be disabled by adding the -setting *akka.remote.artery.advanced.flight-recorder.enabled=off*, although this is not recommended. - ## Remote Configuration diff --git a/akka-multi-node-testkit/src/main/mima-filters/2.5.x.backwards.excludes b/akka-multi-node-testkit/src/main/mima-filters/2.5.x.backwards.excludes index c6fdef5fd5..aba4546843 100644 --- a/akka-multi-node-testkit/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-multi-node-testkit/src/main/mima-filters/2.5.x.backwards.excludes @@ -1,3 +1,6 @@ # Protobuf 3 ProblemFilters.exclude[Problem]("akka.remote.testconductor.TestConductorProtocol*") ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.testconductor.ProtobufDecoder.this") + +# Remove AFR #27581 +ProblemFilters.exclude[MissingClassProblem]("akka.remote.testkit.FlightRecordingSupport") \ No newline at end of file diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala deleted file mode 100644 index 3486a9455b..0000000000 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.remote.testkit - -import java.nio.file.{ FileSystems, Files, Path } - -import akka.remote.RARP -import akka.remote.artery.FlightRecorderReader - -/** - * Provides test framework agnostic methods to dump the artery flight recorder data after a test has completed - you - * must integrate the logic with the testing tool you use yourself. - * - * The flight recorder must be enabled and the flight recorder destination must be an absolute file name so - * that the akka config can be used to find it. For example you could ensure a unique file per test using - * something like this in your config: - * {{{ - * akka.remote.artery.advanced.flight-recorder { - * enabled=on - * destination=target/flight-recorder-${UUID.randomUUID().toString}.afr - * } - * }}} - * - * You need to hook in dump and deletion of files where it makes sense in your tests. (For example, dump after all tests has - * run and there was a failure and then delete) - */ -trait FlightRecordingSupport { self: MultiNodeSpec => - private lazy val arteryEnabled = - RARP(system).provider.remoteSettings.Artery.Enabled - private lazy val flightRecorderFile: Path = - FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) - - /** - * Delete flight the recorder file if it exists - */ - final protected def deleteFlightRecorderFile(): Unit = { - if (arteryEnabled && destinationIsValidForDump() && Files.exists(flightRecorderFile)) { - Files.delete(flightRecorderFile) - } - } - - /** - * Dump the contents of the flight recorder file to standard output - */ - final protected def printFlightRecording(): Unit = { - if (arteryEnabled && destinationIsValidForDump() && Files.exists(flightRecorderFile)) { - // use stdout/println as we do not know if the system log is alive - println(s"Flight recorder dump from '$flightRecorderFile':") - FlightRecorderReader.dumpToStdout(flightRecorderFile) - } - } - - private def destinationIsValidForDump() = { - val path = flightRecorderFile.toString - path != "" && path.endsWith(".afr") - } - -} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala index 66a653c22e..3a788497b3 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemotingMultiNodeSpec.scala @@ -7,10 +7,10 @@ package akka.remote import java.util.UUID import akka.remote.artery.ArterySpecSupport -import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.testkit.{ DefaultTimeout, ImplicitSender } import com.typesafe.config.ConfigFactory -import org.scalatest.{ Outcome, Suite } +import org.scalatest.Suite object RemotingMultiNodeSpec { @@ -29,23 +29,7 @@ abstract class RemotingMultiNodeSpec(config: MultiNodeConfig) extends MultiNodeSpec(config) with Suite with STMultiNodeSpec - with FlightRecordingSupport with ImplicitSender with DefaultTimeout { self: MultiNodeSpec => - // Keep track of failure so we can print artery flight recording on failure - private var failed = false - final override protected def withFixture(test: NoArgTest): Outcome = { - val out = super.withFixture(test) - if (!out.isSucceeded) - failed = true - out - } - - override def afterTermination(): Unit = { - if (failed || sys.props.get("akka.remote.artery.always-dump-flight-recorder").isDefined) { - printFlightRecording() - } - deleteFlightRecorderFile() - } } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala index b9496d3ae4..9c79c88093 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanInThrougputSpec.scala @@ -6,15 +6,15 @@ package akka.remote.artery import java.util.concurrent.Executors -import scala.concurrent.duration._ import akka.actor._ -import akka.remote.{ RemoteActorRefProvider, RemotingMultiNodeSpec } +import akka.remote.artery.MaxThroughputSpec._ import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.PerfFlamesSupport +import akka.remote.testkit.{ MultiNodeConfig, PerfFlamesSupport } +import akka.remote.{ RemoteActorRefProvider, RemotingMultiNodeSpec } import akka.testkit._ import com.typesafe.config.ConfigFactory -import akka.remote.artery.MaxThroughputSpec._ + +import scala.concurrent.duration._ object FanInThroughputSpec extends MultiNodeConfig { val totalNumberOfNodes = @@ -129,9 +129,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput runOn(roles.head) { val rep = reporter(testName) val receivers = (1 to sendingNodes.size).map { n => - system.actorOf( - receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs), - receiverName + "-" + n) + system.actorOf(receiverProps(rep, payloadSize, senderReceiverPairs), receiverName + "-" + n) } enterBarrier(receiverName + "-started") enterBarrier(testName + "-done") @@ -151,13 +149,7 @@ abstract class FanInThroughputSpec extends RemotingMultiNodeSpec(FanInThroughput val receiver = receivers(idx) val plotProbe = TestProbe() val snd = system.actorOf( - senderProps( - receiver, - receivers, - testSettings, - plotProbe.ref, - printTaskRunnerMetrics = idx == 0, - resultReporter), + senderProps(receiver, receivers, testSettings, plotProbe.ref, resultReporter), testName + "-snd" + idx) val terminationProbe = TestProbe() terminationProbe.watch(snd) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala index 4f6a8cb800..dc6be2db56 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/FanOutThrougputSpec.scala @@ -127,9 +127,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp runOn(targetNodes: _*) { val rep = reporter(testName) - val receiver = system.actorOf( - receiverProps(rep, payloadSize, printTaskRunnerMetrics = true, senderReceiverPairs), - receiverName) + val receiver = system.actorOf(receiverProps(rep, payloadSize, senderReceiverPairs), receiverName) enterBarrier(receiverName + "-started") enterBarrier(testName + "-done") receiver ! PoisonPill @@ -143,13 +141,7 @@ abstract class FanOutThroughputSpec extends RemotingMultiNodeSpec(FanOutThroughp val receiver = receivers(i) val plotProbe = TestProbe() val snd = system.actorOf( - senderProps( - receiver, - receivers, - testSettings, - plotProbe.ref, - printTaskRunnerMetrics = i == 0, - resultReporter), + senderProps(receiver, receivers, testSettings, plotProbe.ref, resultReporter), testName + "-snd" + (i + 1)) val terminationProbe = TestProbe() terminationProbe.watch(snd) diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index 46edbdb975..9332e8e7dc 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -8,20 +8,19 @@ import java.util.concurrent.Executors import java.util.concurrent.atomic.AtomicLongArray import java.util.concurrent.locks.LockSupport -import scala.concurrent.duration._ - import akka.actor._ import akka.dispatch.Dispatchers import akka.remote.RemotingMultiNodeSpec import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.serialization.jackson.CborSerializable -import akka.stream.ActorMaterializer +import akka.stream.{ ActorMaterializer, ThrottleMode } +import akka.stream.scaladsl.Source import akka.testkit._ import com.typesafe.config.ConfigFactory import org.HdrHistogram.Histogram -import akka.stream.scaladsl.Source -import akka.stream.ThrottleMode + +import scala.concurrent.duration._ object LatencySpec extends MultiNodeConfig { val first = role("first") @@ -109,7 +108,6 @@ object LatencySpec extends MultiNodeConfig { var count = 0 var startTime = System.nanoTime() - val taskRunnerMetrics = new TaskRunnerMetrics(context.system) var reportedArrayOOB = false def receive = { @@ -161,8 +159,6 @@ object LatencySpec extends MultiNodeConfig { println("Histogram of RTT latencies in microseconds.") histogram.outputPercentileDistribution(System.out, 1000.0) - taskRunnerMetrics.printHistograms() - val plots = LatencyPlots( PlotResult().add(testName, percentile(50.0)), PlotResult().add(testName, percentile(90.0)), diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 01c8cb8f56..cc933f364d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -8,19 +8,17 @@ import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS -import scala.concurrent.duration._ - import akka.actor._ -import akka.remote.{ RARP, RemoteActorRefProvider, RemotingMultiNodeSpec } +import akka.remote.artery.compress.CompressionProtocol.Events.ReceivedActorRefCompressionTable import akka.remote.testconductor.RoleName -import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.PerfFlamesSupport -import akka.serialization.ByteBufferSerializer -import akka.serialization.SerializerWithStringManifest +import akka.remote.testkit.{ MultiNodeConfig, PerfFlamesSupport } +import akka.remote.{ RARP, RemoteActorRefProvider, RemotingMultiNodeSpec } +import akka.serialization.jackson.CborSerializable +import akka.serialization.{ ByteBufferSerializer, SerializerWithStringManifest } import akka.testkit._ import com.typesafe.config.ConfigFactory -import akka.remote.artery.compress.CompressionProtocol.Events.ReceivedActorRefCompressionTable -import akka.serialization.jackson.CborSerializable + +import scala.concurrent.duration._ object MaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -110,14 +108,11 @@ object MaxThroughputSpec extends MultiNodeConfig { override def tell(msg: Any, sender: ActorRef) = sel.tell(msg, sender) } - def receiverProps(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int): Props = - Props(new Receiver(reporter, payloadSize, printTaskRunnerMetrics, numSenders)) - .withDispatcher("akka.remote.default-remote-dispatcher") + def receiverProps(reporter: RateReporter, payloadSize: Int, numSenders: Int): Props = + Props(new Receiver(reporter, payloadSize, numSenders)).withDispatcher("akka.remote.default-remote-dispatcher") - class Receiver(reporter: RateReporter, payloadSize: Int, printTaskRunnerMetrics: Boolean, numSenders: Int) - extends Actor { + class Receiver(reporter: RateReporter, payloadSize: Int, numSenders: Int) extends Actor { private var c = 0L - private val taskRunnerMetrics = new TaskRunnerMetrics(context.system) private var endMessagesMissing = numSenders private var correspondingSender : ActorRef = null // the Actor which send the Start message will also receive the report @@ -138,8 +133,6 @@ object MaxThroughputSpec extends MultiNodeConfig { endMessagesMissing -= 1 // wait for End message from all senders case End => - if (printTaskRunnerMetrics) - taskRunnerMetrics.printHistograms() correspondingSender ! EndResult(c) context.stop(self) @@ -158,16 +151,14 @@ object MaxThroughputSpec extends MultiNodeConfig { targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, - printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter): Props = - Props(new Sender(mainTarget, targets, testSettings, plotRef, printTaskRunnerMetrics, reporter)) + Props(new Sender(mainTarget, targets, testSettings, plotRef, reporter)) class Sender( target: Target, targets: Array[Target], testSettings: TestSettings, plotRef: ActorRef, - printTaskRunnerMetrics: Boolean, reporter: BenchmarkFileReporter) extends Actor { val numTargets = targets.size @@ -177,7 +168,6 @@ object MaxThroughputSpec extends MultiNodeConfig { var startTime = 0L var remaining = totalMessages var maxRoundTripMillis = 0L - val taskRunnerMetrics = new TaskRunnerMetrics(context.system) context.system.eventStream.subscribe(self, classOf[ReceivedActorRefCompressionTable]) @@ -272,9 +262,6 @@ object MaxThroughputSpec extends MultiNodeConfig { s"total size ${totalSize(context.system)}, " + s"$took ms to deliver $totalReceived messages.") - if (printTaskRunnerMetrics) - taskRunnerMetrics.printHistograms() - plotRef ! PlotResult().add(testName, throughput * payloadSize * testSettings.senderReceiverPairs / 1024 / 1024) context.stop(self) @@ -459,9 +446,7 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec runOn(second) { val rep = reporter(testName) val receivers = (1 to senderReceiverPairs).map { n => - system.actorOf( - receiverProps(rep, payloadSize, printTaskRunnerMetrics = n == 1, senderReceiverPairs), - receiverName + n) + system.actorOf(receiverProps(rep, payloadSize, senderReceiverPairs), receiverName + n) } enterBarrier(receiverName + "-started") enterBarrier(testName + "-done") @@ -475,15 +460,8 @@ abstract class MaxThroughputSpec extends RemotingMultiNodeSpec(MaxThroughputSpec val senders = for (n <- 1 to senderReceiverPairs) yield { val receiver = receivers(n - 1) val plotProbe = TestProbe() - val snd = system.actorOf( - senderProps( - receiver, - receivers, - testSettings, - plotProbe.ref, - printTaskRunnerMetrics = n == 1, - resultReporter), - testName + "-snd" + n) + val snd = system + .actorOf(senderProps(receiver, receivers, testSettings, plotProbe.ref, resultReporter), testName + "-snd" + n) val terminationProbe = TestProbe() terminationProbe.watch(snd) snd ! Run diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala deleted file mode 100644 index 06b825d344..0000000000 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/TaskRunnerMetrics.scala +++ /dev/null @@ -1,63 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import akka.actor.ActorSystem -import akka.actor.ExtendedActorSystem -import akka.remote.RemoteActorRefProvider -import org.HdrHistogram.Histogram -import java.util.concurrent.TimeUnit.SECONDS - -class TaskRunnerMetrics(system: ActorSystem) { - - private var entryOffset = 0 - - def printHistograms(): Unit = { - val aeronSourceHistogram = new Histogram(SECONDS.toNanos(10), 3) - val aeronSinkHistogram = new Histogram(SECONDS.toNanos(10), 3) - system.asInstanceOf[ExtendedActorSystem].provider.asInstanceOf[RemoteActorRefProvider].transport match { - case a: ArteryTransport => - a.afrFileChannel.foreach { afrFileChannel => - var c = 0 - var aeronSourceMaxBeforeDelegate = 0L - var aeronSinkMaxBeforeDelegate = 0L - val reader = new FlightRecorderReader(afrFileChannel) - reader.structure.hiFreqLog.logs.foreach(_.compactEntries.foreach { entry => - c += 1 - if (c > entryOffset) { - entry.code match { - case FlightRecorderEvents.AeronSource_ReturnFromTaskRunner => - aeronSourceHistogram.recordValue(entry.param) - case FlightRecorderEvents.AeronSink_ReturnFromTaskRunner => - aeronSinkHistogram.recordValue(entry.param) - case FlightRecorderEvents.AeronSource_DelegateToTaskRunner => - aeronSourceMaxBeforeDelegate = math.max(aeronSourceMaxBeforeDelegate, entry.param) - case FlightRecorderEvents.AeronSink_DelegateToTaskRunner => - aeronSinkMaxBeforeDelegate = math.max(aeronSinkMaxBeforeDelegate, entry.param) - case _ => - } - } - }) - - reader.close() - entryOffset = c - - if (aeronSourceHistogram.getTotalCount > 0) { - println( - s"Histogram of AeronSource tasks in microseconds. Max count before delegate: $aeronSourceMaxBeforeDelegate") - aeronSourceHistogram.outputPercentileDistribution(System.out, 1000.0) - } - - if (aeronSinkHistogram.getTotalCount > 0) { - println( - s"Histogram of AeronSink tasks in microseconds. Max count before delegate: $aeronSinkMaxBeforeDelegate") - aeronSinkHistogram.outputPercentileDistribution(System.out, 1000.0) - } - } - case _ => - } - } - -} diff --git a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes index 477e803765..d6d2808114 100644 --- a/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes +++ b/akka-remote/src/main/mima-filters/2.5.x.backwards.excludes @@ -36,3 +36,39 @@ ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.serialization.Th # Upgrade to protobuf 3 ProblemFilters.exclude[Problem]("akka.remote.serialization.ArteryMessageSerializer*") ProblemFilters.exclude[Problem]("akka.remote.*Formats*") + +# Remove AFR #27581 +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderReader$*") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderReader") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorder") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorder$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.SynchronizedEventSink") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.EventClock") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.ShutDown") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderDump$delayedInit$body") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderStatus") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.EventClockImpl") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.RollingEventLogSection") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.RollingEventLogSection$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderDump") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.FlightRecorderDump$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.Running") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.Running$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.ShutDown$") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.SnapshotInProgress") +ProblemFilters.exclude[MissingClassProblem]("akka.remote.artery.SnapshotInProgress$") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.IgnoreEventSink.*") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArterySettings#Advanced.FlightRecorderEnabled") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArterySettings#Advanced.FlightRecorderDestination") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.tcp.TcpFraming.this") +ProblemFilters.exclude[ReversedMissingMethodProblem]("akka.remote.artery.EventSink.hiFreq") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.IgnoreEventSink.hiFreq") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.EventSink.alert") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.EventSink.hiFreq") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.EventSink.flushHiFreqBatch") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.remote.artery.IgnoreEventSink.alert") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.afrFileChannel") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.afrFile") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.createFlightRecorderEventSink") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.createFlightRecorderEventSink$default$1") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.remote.artery.ArteryTransport.flightRecorder") diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 2aadac1d3a..35a2a621d0 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -995,17 +995,6 @@ akka { # If more restarts occurs the ActorSystem will be terminated. outbound-max-restarts = 5 - - - flight-recorder { - enabled = off - # Controls where the flight recorder file will be written. There are three options: - # 1. Empty: a file will be generated in the temporary directory of the OS - # 2. A relative or absolute path ending with ".afr": this file will be used - # 3. A relative or absolute path: this directory will be used, the file will get a random file name - destination = "" - } - # compression of common strings in remoting messages, like actor destinations, serializers etc compression { diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 717c13fcc7..88dc0dbf63 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -166,8 +166,6 @@ private[akka] final class ArterySettings private (config: Config) { .getMillisDuration("outbound-restart-timeout") .requiring(interval => interval > Duration.Zero, "outbound-restart-timeout must be more than zero") val OutboundMaxRestarts: Int = getInt("outbound-max-restarts") - val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled") - val FlightRecorderDestination: String = getString("flight-recorder.destination") val Compression = new Compression(getConfig("compression")) final val MaximumFrameSize: Int = math diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index b2458c6b21..f2b30f9331 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -4,33 +4,14 @@ package akka.remote.artery -import java.nio.channels.FileChannel -import java.nio.file.Path import java.util.concurrent.TimeUnit -import java.util.concurrent.atomic.AtomicBoolean -import java.util.concurrent.atomic.AtomicLong -import java.util.concurrent.atomic.AtomicReference +import java.util.concurrent.atomic.{ AtomicBoolean, AtomicLong, AtomicReference } -import scala.annotation.tailrec -import scala.concurrent.Await -import scala.concurrent.Future -import scala.concurrent.Promise -import scala.concurrent.duration._ -import scala.util.Failure -import scala.util.Success -import scala.util.Try -import scala.util.control.NoStackTrace -import scala.util.control.NonFatal - -import akka.Done -import akka.NotUsed -import akka.actor.Actor -import akka.actor.Props -import akka.actor._ +import akka.{ Done, NotUsed } +import akka.actor.{ Actor, ActorRef, Address, CoordinatedShutdown, Dropped, ExtendedActorSystem, Props } import akka.annotation.InternalStableApi import akka.dispatch.Dispatchers -import akka.event.Logging -import akka.event.LoggingAdapter +import akka.event.{ Logging, LoggingAdapter } import akka.remote.AddressUidExtension import akka.remote.RemoteActorRef import akka.remote.RemoteActorRefProvider @@ -38,25 +19,22 @@ import akka.remote.RemoteTransport import akka.remote.UniqueAddress import akka.remote.artery.Decoder.InboundCompressionAccess import akka.remote.artery.Encoder.OutboundCompressionAccess -import akka.remote.artery.InboundControlJunction.ControlMessageObserver -import akka.remote.artery.InboundControlJunction.ControlMessageSubject +import akka.remote.artery.InboundControlJunction.{ ControlMessageObserver, ControlMessageSubject } import akka.remote.artery.OutboundControlJunction.OutboundControlIngress import akka.remote.artery.compress.CompressionProtocol.CompressionMessage import akka.remote.artery.compress._ -import akka.remote.transport.ThrottlerTransportAdapter.Blackhole -import akka.remote.transport.ThrottlerTransportAdapter.SetThrottle -import akka.remote.transport.ThrottlerTransportAdapter.Unthrottled -import akka.stream.AbruptTerminationException -import akka.stream.ActorMaterializer -import akka.stream.KillSwitches -import akka.stream.Materializer -import akka.stream.SharedKillSwitch -import akka.stream.scaladsl.Flow -import akka.stream.scaladsl.Keep -import akka.stream.scaladsl.Sink +import akka.remote.transport.ThrottlerTransportAdapter.{ Blackhole, SetThrottle, Unthrottled } +import akka.stream._ +import akka.stream.scaladsl.{ Flow, Keep, Sink } import akka.util.{ unused, OptionVal, WildcardIndex } import com.github.ghik.silencer.silent +import scala.annotation.tailrec +import scala.concurrent.{ Await, Future, Promise } +import scala.concurrent.duration._ +import scala.util.{ Failure, Success, Try } +import scala.util.control.{ NoStackTrace, NonFatal } + /** * INTERNAL API * Inbound API that is used by the stream operators. @@ -332,11 +310,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr override val log: LoggingAdapter = Logging(system, getClass.getName) - val (afrFileChannel, afrFile, flightRecorder) = initializeFlightRecorder() match { - case None => (None, None, None) - case Some((c, f, r)) => (Some(c), Some(f), Some(r)) - } - /** * Compression tables must be created once, such that inbound lane restarts don't cause dropping of the tables. * However are the InboundCompressions are owned by the Decoder operator, and any call into them must be looped through the Decoder! @@ -345,7 +318,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr */ protected val _inboundCompressions = { if (settings.Advanced.Compression.Enabled) { - val eventSink = createFlightRecorderEventSink(synchr = false) + val eventSink = IgnoreEventSink new InboundCompressionsImpl(system, this, settings.Advanced.Compression, eventSink) } else NoInboundCompressions } @@ -404,22 +377,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr capacity = settings.Advanced.OutboundMessageQueueSize * settings.Advanced.OutboundLanes * 3) - /** - * Thread-safe flight recorder for top level events. - */ - val topLevelFlightRecorder: EventSink = - createFlightRecorderEventSink(synchr = true) - - def createFlightRecorderEventSink(synchr: Boolean = false): EventSink = { - flightRecorder match { - case Some(f) => - val eventSink = f.createEventSink() - if (synchr) new SynchronizedEventSink(eventSink) - else eventSink - case None => - IgnoreEventSink - } - } + val topLevelFlightRecorder: EventSink = IgnoreEventSink private val associationRegistry = new AssociationRegistry( remoteAddress => @@ -627,6 +585,7 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr // and can result in forming two separate clusters (cluster split). // Instead, the downing strategy should act on ThisActorSystemQuarantinedEvent, e.g. // use it as a STONITH signal. + @silent("deprecated") val lifecycleEvent = ThisActorSystemQuarantinedEvent(localAddress, from) system.eventStream.publish(lifecycleEvent) @@ -709,9 +668,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr _inboundCompressionAccess = OptionVal.None topLevelFlightRecorder.loFreq(Transport_FlightRecorderClose, NoMetaData) - flightRecorder.foreach(_.close()) - afrFileChannel.foreach(_.force(true)) - afrFileChannel.foreach(_.close()) Done } } @@ -965,17 +921,6 @@ private[remote] abstract class ArteryTransport(_system: ExtendedActorSystem, _pr .toMat(messageDispatcherSink)(Keep.both) } - private def initializeFlightRecorder(): Option[(FileChannel, Path, FlightRecorder)] = { - if (settings.Advanced.FlightRecorderEnabled) { - val afrFile = FlightRecorder.createFlightRecorderFile(settings.Advanced.FlightRecorderDestination) - log.info("Flight recorder enabled, output can be found in '{}'", afrFile) - - val fileChannel = FlightRecorder.prepareFileForFlightRecorder(afrFile) - Some((fileChannel, afrFile, new FlightRecorder(fileChannel))) - } else - None - } - def outboundTestFlow(outboundContext: OutboundContext): Flow[OutboundEnvelope, OutboundEnvelope, NotUsed] = if (settings.Advanced.TestMode) Flow.fromGraph(new OutboundTestStage(outboundContext, testState)) else Flow[OutboundEnvelope] diff --git a/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala b/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala new file mode 100644 index 0000000000..cf1d2064e9 --- /dev/null +++ b/akka-remote/src/main/scala/akka/remote/artery/EventSink.scala @@ -0,0 +1,25 @@ +/* + * Copyright (C) 2009-2019 Lightbend Inc. + */ + +package akka.remote.artery + +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +@InternalApi +private[akka] trait EventSink { + def loFreq(eventId: Int, data: Array[Byte]): Unit + def loFreq(eventId: Int, data: String): Unit + def hiFreq(eventId: Int, data: Long): Unit + def alert(eventId: Int, data: Array[Byte]): Unit +} + +object IgnoreEventSink extends EventSink { + def loFreq(eventId: Int, data: Array[Byte]): Unit = () + def loFreq(eventId: Int, data: String): Unit = () + def hiFreq(eventId: Int, data: Long): Unit = () + def alert(eventId: Int, data: Array[Byte]): Unit = () +} diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala deleted file mode 100644 index 9bf3e5f8f0..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala +++ /dev/null @@ -1,462 +0,0 @@ -/* - * Copyright (C) 2016-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import java.io.RandomAccessFile -import java.nio.channels.FileChannel -import java.nio.file._ -import java.nio.{ ByteBuffer, ByteOrder } -import java.util.concurrent.atomic.AtomicReference -import java.util.concurrent.{ CountDownLatch, TimeUnit } - -import org.agrona.BitUtil -import org.agrona.concurrent.MappedResizeableBuffer - -import scala.annotation.tailrec - -/** - * INTERNAL API - */ -private[remote] trait EventSink { - def alert(code: Int, metadata: Array[Byte]): Unit - def alert(code: Int, metadata: String): Unit - def loFreq(code: Int, metadata: Array[Byte]): Unit - def loFreq(code: Int, metadata: String): Unit - def hiFreq(code: Long, param: Long): Unit - - def flushHiFreqBatch(): Unit -} - -/** - * INTERNAL API - */ -private[remote] object IgnoreEventSink extends EventSink { - override def alert(code: Int, metadata: Array[Byte]): Unit = () - override def alert(code: Int, metadata: String): Unit = () - override def loFreq(code: Int, metadata: Array[Byte]): Unit = () - override def loFreq(code: Int, metadata: String): Unit = () - override def flushHiFreqBatch(): Unit = () - override def hiFreq(code: Long, param: Long): Unit = () -} - -/** - * INTERNAL API - */ -private[remote] class SynchronizedEventSink(delegate: EventSink) extends EventSink { - override def alert(code: Int, metadata: Array[Byte]): Unit = synchronized { - delegate.alert(code, metadata) - } - - override def alert(code: Int, metadata: String): Unit = { - alert(code, metadata.getBytes("US-ASCII")) - } - - override def loFreq(code: Int, metadata: Array[Byte]): Unit = synchronized { - delegate.loFreq(code, metadata) - } - - override def loFreq(code: Int, metadata: String): Unit = { - loFreq(code, metadata.getBytes("US-ASCII")) - } - - override def flushHiFreqBatch(): Unit = synchronized { - delegate.flushHiFreqBatch() - } - - override def hiFreq(code: Long, param: Long): Unit = synchronized { - delegate.hiFreq(code, param) - } -} - -/** - * INTERNAL API - * - * Update clock at various resolutions and acquire the resulting timestamp. - */ -private[remote] trait EventClock { - - def updateWallClock(): Unit - def updateHighSpeedClock(): Unit - - def wallClockPart: Long - def highSpeedPart: Long - -} - -/** - * INTERNAL API - * - * This class is not thread-safe - */ -private[remote] class EventClockImpl extends EventClock { - - private[this] var wallClock: Long = 0 - private[this] var highSpeedClock: Long = 0 - private[this] var highSpeedClockOffset: Long = 0 - - updateWallClock() - - override def updateWallClock(): Unit = { - wallClock = System.currentTimeMillis() - highSpeedClockOffset = System.nanoTime() - highSpeedClock = 0 - } - - override def updateHighSpeedClock(): Unit = { - // TODO: Update wall clock once in a while - highSpeedClock = System.nanoTime() - highSpeedClockOffset - } - - override def wallClockPart: Long = wallClock - override def highSpeedPart: Long = highSpeedClock -} - -/** - * INTERNAL API - */ -private[remote] object RollingEventLogSection { - val HeadPointerOffset = 0L - val LogStateOffset = 8L - val RecordsOffset = 16L - val LogOffset = 0L - - // Log states - val Empty = 0 - val Live = 1 - val Snapshot = 2 - - // Slot states - val Committed = 0 - val Dirty = 1 - - val CommitEntrySize = 4 -} - -/** - * INTERNAL API - */ -private[remote] class RollingEventLogSection( - fileChannel: FileChannel, - offset: Long, - entryCount: Long, - logBufferSize: Long, - recordSize: Int) { - import RollingEventLogSection._ - - require(entryCount > 0, "entryCount must be greater than 0") - require((entryCount & (entryCount - 1)) == 0, "entryCount must be power of two") - private[this] val LogMask: Long = entryCount - 1L - - private[this] val buffers: Array[MappedResizeableBuffer] = Array.tabulate(FlightRecorder.SnapshotCount) { logId => - val buffer = new MappedResizeableBuffer(fileChannel, offset + logId * logBufferSize, logBufferSize) - // Clear old data - buffer.setMemory(0, logBufferSize.toInt, 0.toByte) - if (logId == 0) buffer.putLong(LogStateOffset, Live) - buffer - } - - def clear(logId: Int): Unit = buffers(logId).setMemory(0, logBufferSize.toInt, 0.toByte) - - /* - * The logic here MUST be kept in sync with its simulated version in RollingEventLogSimulationSpec as it - * is currently the best place to do in-depth stress-testing of this logic. Unfortunately currently there is no - * sane way to use the same code here and in the test, too. - */ - def write(logId: Int, recordBuffer: ByteBuffer): Unit = { - val logBuffer: MappedResizeableBuffer = buffers(logId) - - @tailrec def writeRecord(): Unit = { - // Advance the head - val recordOffset = RecordsOffset + ((logBuffer.getAndAddLong(HeadPointerOffset, 1L) & LogMask) * recordSize) - val payloadOffset = recordOffset + CommitEntrySize - // Signal that we write to the record. This is to prevent concurrent writes to the same slot - // if the head *wraps over* and points again to this location. Without this we would end up with partial or corrupted - // writes to the slot. - if (logBuffer.compareAndSetInt(recordOffset, Committed, Dirty)) { - // 128 bytes total, 4 bytes used for Commit/Dirty flag - logBuffer.putBytes(payloadOffset, recordBuffer, recordSize - 4) - //println(logBuffer.getLong(recordOffset + 4)) - - // Now this is free to be overwritten - logBuffer.putIntVolatile(recordOffset, Committed) - } else writeRecord() // Try to claim a new slot - } - - writeRecord() - } - - def markSnapshot(logId: Int): Unit = buffers(logId).putLongVolatile(LogStateOffset, Snapshot) - def markLive(logId: Int): Unit = buffers(logId).putLongVolatile(LogStateOffset, Live) - - def close(): Unit = buffers.foreach(_.close()) -} - -/** - * INTERNAL API - */ -private[remote] object FlightRecorder { - - /** - * @return A created file where the flight recorder file can be written. There are three options, depending - * on ``destination``: - * 1. Empty: a file will be generated in the temporary directory of the OS - * 2. A relative or absolute path ending with ".afr": this file will be used - * 3. A relative or absolute path: this directory will be used, the file will get a random file name - */ - def createFlightRecorderFile(destination: String, fs: FileSystem = FileSystems.getDefault): Path = { - - // TODO safer file permissions (e.g. only user readable on POSIX)? - destination match { - // not defined, use temporary directory - case "" => Files.createTempFile("artery", ".afr") - - case directory if directory.endsWith(".afr") => - val path = fs.getPath(directory).toAbsolutePath - if (!Files.exists(path)) { - Files.createDirectories(path.getParent) - Files.createFile(path) - } - path - - case directory => - val path = fs.getPath(directory).toAbsolutePath - if (!Files.exists(path)) Files.createDirectories(path) - - Files.createTempFile(path, "artery", ".afr") - } - } - - def prepareFileForFlightRecorder(path: Path): FileChannel = { - // Force the size, otherwise memory mapping will fail on *nixes - val randomAccessFile = new RandomAccessFile(path.toFile, "rwd") - randomAccessFile.setLength(FlightRecorder.TotalSize) - randomAccessFile.close() - - FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) - } - - val Alignment = 64 * 1024 // Windows is picky about mapped section alignments - - val MagicString = 0x31524641 // "AFR1", little-endian - val GlobalSectionSize = BitUtil.align(24, Alignment) - val StartTimeStampOffset = 4 - - val LogHeaderSize = 16 - val SnapshotCount = 4 - val SnapshotMask = SnapshotCount - 1 - - // TODO: Dummy values right now, format is under construction - val AlertRecordSize = 128 - val LoFreqRecordSize = 128 - val HiFreqBatchSize = 62 - val HiFreqRecordSize = 16 * (HiFreqBatchSize + 2) // (batched events + header) - - val AlertWindow = 256 - val LoFreqWindow = 256 - val HiFreqWindow = 256 // This is counted in batches ! - - val AlertLogSize = BitUtil.align(LogHeaderSize + (AlertWindow * AlertRecordSize), Alignment) - val LoFreqLogSize = BitUtil.align(LogHeaderSize + (LoFreqWindow * LoFreqRecordSize), Alignment) - val HiFreqLogSize = BitUtil.align(LogHeaderSize + (HiFreqWindow * HiFreqRecordSize), Alignment) - - val AlertSectionSize = AlertLogSize * SnapshotCount - val LoFreqSectionSize = LoFreqLogSize * SnapshotCount - val HiFreqSectionSize = HiFreqLogSize * SnapshotCount - - val AlertSectionOffset = GlobalSectionSize - val LoFreqSectionOffset = GlobalSectionSize + AlertSectionSize - val HiFreqSectionOffset = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize - - val TotalSize = GlobalSectionSize + AlertSectionSize + LoFreqSectionSize + HiFreqSectionSize - - val HiFreqEntryCountFieldOffset = 16 -} - -/** - * INTERNAL API - */ -private[remote] sealed trait FlightRecorderStatus -private[remote] case object Running extends FlightRecorderStatus -private[remote] case object ShutDown extends FlightRecorderStatus -private[remote] final case class SnapshotInProgress(latch: CountDownLatch) extends FlightRecorderStatus - -/** - * INTERNAL API - */ -private[remote] class FlightRecorder(val fileChannel: FileChannel) - extends AtomicReference[FlightRecorderStatus](Running) { - import FlightRecorder._ - - private[this] val globalSection = new MappedResizeableBuffer(fileChannel, 0, GlobalSectionSize) - - require(SnapshotCount > 0, "SnapshotCount must be greater than 0") - require((SnapshotCount & (SnapshotCount - 1)) == 0, "SnapshotCount must be power of two") - private[this] val SnapshotMask = SnapshotCount - 1 - private[this] val alertLogs = - new RollingEventLogSection( - fileChannel = fileChannel, - offset = AlertSectionOffset, - entryCount = AlertWindow, - logBufferSize = AlertLogSize, - recordSize = AlertRecordSize) - private[this] val loFreqLogs = - new RollingEventLogSection( - fileChannel = fileChannel, - offset = LoFreqSectionOffset, - entryCount = LoFreqWindow, - logBufferSize = LoFreqLogSize, - recordSize = LoFreqRecordSize) - private[this] val hiFreqLogs = - new RollingEventLogSection( - fileChannel = fileChannel, - offset = HiFreqSectionOffset, - entryCount = HiFreqWindow, - logBufferSize = HiFreqLogSize, - recordSize = HiFreqRecordSize) - // No need for volatile, guarded by atomic CAS and set - @volatile private var currentLog = 0 - - init() - - private def init(): Unit = { - globalSection.putInt(0, MagicString) - globalSection.putLong(StartTimeStampOffset, System.currentTimeMillis()) - } - - def snapshot(): Unit = { - // Coalesce concurrent snapshot requests into one, i.e. ignore the "late-comers". - // In other words, this is a critical section in which participants either enter, or just - // simply skip ("Hm, seems someone else already does it. ¯\_(ツ)_/¯ ") - val snapshotLatch = new CountDownLatch(1) - val snapshotInProgress = SnapshotInProgress(snapshotLatch) - if (compareAndSet(Running, snapshotInProgress)) { - val previousLog = currentLog - val nextLog = (currentLog + 1) & SnapshotMask - // Mark new log as Live - hiFreqLogs.clear(nextLog) - loFreqLogs.clear(nextLog) - alertLogs.clear(nextLog) - hiFreqLogs.markLive(nextLog) - loFreqLogs.markLive(nextLog) - alertLogs.markLive(nextLog) - // Redirect traffic to newly allocated log - currentLog = nextLog - // Mark previous log as snapshot - hiFreqLogs.markSnapshot(previousLog) - loFreqLogs.markSnapshot(previousLog) - alertLogs.markSnapshot(previousLog) - fileChannel.force(true) - snapshotLatch.countDown() - compareAndSet(snapshotInProgress, Running) - // At this point it is NOT GUARANTEED that all writers have finished writing to the currently snapshotted - // buffer! - } - } - - def close(): Unit = { - getAndSet(ShutDown) match { - case SnapshotInProgress(latch) => latch.await(3, TimeUnit.SECONDS) - case _ => // Nothing to unlock - } - alertLogs.close() - hiFreqLogs.close() - loFreqLogs.close() - globalSection.close() - } - - def createEventSink(): EventSink = new EventSink { - private[this] val clock = new EventClockImpl - private[this] val alertRecordBuffer = ByteBuffer.allocate(AlertRecordSize).order(ByteOrder.LITTLE_ENDIAN) - private[this] val loFreqRecordBuffer = ByteBuffer.allocate(LoFreqRecordSize).order(ByteOrder.LITTLE_ENDIAN) - private[this] val hiFreqBatchBuffer = ByteBuffer.allocate(HiFreqRecordSize).order(ByteOrder.LITTLE_ENDIAN) - private[this] var hiFreqBatchedEntries = 0L - - startHiFreqBatch() - - override def alert(code: Int, metadata: Array[Byte]): Unit = { - if (FlightRecorder.this.get eq Running) { - clock.updateWallClock() - prepareRichRecord(alertRecordBuffer, code, metadata) - alertLogs.write(currentLog, alertRecordBuffer) - flushHiFreqBatch() - snapshot() - } - } - - override def alert(code: Int, metadata: String): Unit = { - alert(code, metadata.getBytes("US-ASCII")) - } - - override def loFreq(code: Int, metadata: Array[Byte]): Unit = { - val status = FlightRecorder.this.get - if (status eq Running) { - clock.updateHighSpeedClock() - prepareRichRecord(loFreqRecordBuffer, code, metadata) - loFreqLogs.write(currentLog, loFreqRecordBuffer) - } - } - - override def loFreq(code: Int, metadata: String): Unit = { - loFreq(code, metadata.getBytes("US-ASCII")) - } - - private def prepareRichRecord(recordBuffer: ByteBuffer, code: Int, metadata: Array[Byte]): Unit = { - recordBuffer.clear() - // TODO: This is a bit overkill, needs some smarter scheme later, no need to always store the wallclock - recordBuffer.putLong(clock.wallClockPart) - recordBuffer.putLong(clock.highSpeedPart) - recordBuffer.putInt(code) - // Truncate if necessary - val metadataLength = math.min(LoFreqRecordSize - 32, metadata.length) - recordBuffer.put(metadataLength.toByte) - if (metadataLength > 0) - recordBuffer.put(metadata, 0, metadataLength) - // Don't flip here! We always write fixed size records - recordBuffer.position(0) - } - - // TODO: Try to save as many bytes here as possible! We will see crazy throughput here - override def hiFreq(code: Long, param: Long): Unit = { - val status = FlightRecorder.this.get - if (status eq Running) { - hiFreqBatchedEntries += 1 - hiFreqBatchBuffer.putLong(code) - hiFreqBatchBuffer.putLong(param) - - // If batch is full, time to flush - if (!hiFreqBatchBuffer.hasRemaining) flushHiFreqBatch() - } - } - - private def startHiFreqBatch(): Unit = { - hiFreqBatchBuffer.clear() - // Refresh the nanotime - clock.updateHighSpeedClock() - // Header of the batch will contain our most accurate knowledge of the clock, individual entries do not - // contain any timestamp - hiFreqBatchBuffer.putLong(clock.wallClockPart) - hiFreqBatchBuffer.putLong(clock.highSpeedPart) - // Leave space for the size field - hiFreqBatchBuffer.putLong(0L) - // Reserved for now - hiFreqBatchBuffer.putLong(0L) - // Mow ready to write some more events... - } - - override def flushHiFreqBatch(): Unit = { - val status = FlightRecorder.this.get - if (status eq Running) { - if (hiFreqBatchedEntries > 0) { - hiFreqBatchBuffer.putLong(HiFreqEntryCountFieldOffset, hiFreqBatchedEntries) - hiFreqBatchedEntries = 0 - hiFreqBatchBuffer.position(0) - hiFreqLogs.write(currentLog, hiFreqBatchBuffer) - startHiFreqBatch() - } - } - } - - } -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala deleted file mode 100644 index fc03849127..0000000000 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala +++ /dev/null @@ -1,274 +0,0 @@ -/* - * Copyright (C) 2018-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import java.io.{ IOException, RandomAccessFile } -import java.nio.channels.FileChannel -import java.nio.file.{ FileSystems, Path } -import java.time.Instant - -import org.agrona.concurrent.MappedResizeableBuffer - -import scala.collection.{ immutable, SortedSet } - -/** - * Internal API - * - * Minimal utility for dumping a given afr file as text to stdout - */ -object FlightRecorderDump extends App { - require(args.size == 1, "Usage: FlightRecorderDump afr-file") - val path = FileSystems.getDefault.getPath(args(0)) - FlightRecorderReader.dumpToStdout(path) -} - -/** - * Internal API - */ -private[akka] object FlightRecorderReader { - import FlightRecorder._ - - sealed trait LogState - case object Empty extends LogState - case object Live extends LogState - case object Snapshot extends LogState - - case class SectionParameters( - offset: Long, - sectionSize: Long, - logSize: Long, - window: Long, - recordSize: Long, - entriesPerRecord: Long) { - override def toString: String = - s""" - | offset = $offset - | size = $sectionSize - | log size = $logSize - | window = $window - | record size = $recordSize - | max Entries/Record = $entriesPerRecord - | max Total Entries = ${entriesPerRecord * window} - """.stripMargin - } - - val AlertSectionParameters = SectionParameters( - offset = AlertSectionOffset, - sectionSize = AlertSectionSize, - logSize = AlertLogSize, - window = AlertWindow, - recordSize = AlertRecordSize, - entriesPerRecord = 1) - - val LoFreqSectionParameters = SectionParameters( - offset = LoFreqSectionOffset, - sectionSize = LoFreqSectionSize, - logSize = LoFreqLogSize, - window = LoFreqWindow, - recordSize = LoFreqRecordSize, - entriesPerRecord = 1) - - val HiFreqSectionParameters = SectionParameters( - offset = HiFreqSectionOffset, - sectionSize = HiFreqSectionSize, - logSize = HiFreqLogSize, - window = HiFreqWindow, - recordSize = HiFreqRecordSize, - entriesPerRecord = HiFreqBatchSize) - - def dumpToStdout(flightRecorderFile: Path): Unit = { - var raFile: RandomAccessFile = null - var channel: FileChannel = null - var reader: FlightRecorderReader = null - try { - - raFile = new RandomAccessFile(flightRecorderFile.toFile, "rw") - channel = raFile.getChannel - reader = new FlightRecorderReader(channel) - val alerts: Seq[FlightRecorderReader#Entry] = reader.structure.alertLog.logs.flatMap(_.richEntries) - val hiFreq: Seq[FlightRecorderReader#Entry] = reader.structure.hiFreqLog.logs.flatMap(_.compactEntries) - val loFreq: Seq[FlightRecorderReader#Entry] = reader.structure.loFreqLog.logs.flatMap(_.richEntries) - - implicit val ordering = - Ordering.fromLessThan[FlightRecorderReader#Entry]((a, b) => a.timeStamp.isBefore(b.timeStamp)) - val sorted = SortedSet[FlightRecorderReader#Entry](alerts: _*) ++ hiFreq ++ loFreq - - println("--- FLIGHT RECORDER LOG") - sorted.foreach(println) - - } finally { - if (reader ne null) reader.close() - if (channel ne null) channel.close() - if (raFile ne null) raFile.close() - } - } - -} - -/** - * Internal API - */ -private[akka] final class FlightRecorderReader(fileChannel: FileChannel) { - import FlightRecorder._ - import FlightRecorderReader._ - - case class Structure(startTime: Instant, alertLog: RollingLog, loFreqLog: RollingLog, hiFreqLog: RollingLog) { - override def toString: String = - s""" - |AFR file created at $startTime - |Total size : $TotalSize - | - |--- ALERTS - |$alertLog - |--- LOW FREQUENCY EVENTS - |$loFreqLog - |--- HIGH FREQUENCY EVENTS - |$hiFreqLog - """.stripMargin - } - - case class RollingLog(sectionParameters: SectionParameters, logs: immutable.Seq[Log]) { - override def toString: String = - s""" - |$sectionParameters - | - |${logs.mkString(" ", "\n ", "")} - """.stripMargin - } - - case class Log(sectionParameters: SectionParameters, offset: Long, id: Int, state: LogState, head: Long) { - override def toString: String = s"$id: $state \thead = $head (Offset: $offset Size: ${sectionParameters.logSize})" - - def richEntries: Iterator[RichEntry] = { - new Iterator[RichEntry] { - var recordOffset = offset + RollingEventLogSection.RecordsOffset - var recordsLeft = math.min(head, sectionParameters.window) - - override def hasNext: Boolean = recordsLeft > 0 - - override def next(): RichEntry = { - val recordStartOffset = recordOffset + RollingEventLogSection.CommitEntrySize - - // FIXME: extract magic numbers - val metadata = new Array[Byte](fileBuffer.getByte(recordStartOffset + 20)) - fileBuffer.getBytes(recordStartOffset + 21, metadata) - - val entry = RichEntry( - timeStamp = Instant - .ofEpochMilli(fileBuffer.getLong(recordStartOffset)) - .plusNanos(fileBuffer.getLong(recordStartOffset + 8)), - dirty = fileBuffer.getLong(recordOffset) == RollingEventLogSection.Dirty, - code = fileBuffer.getInt(recordStartOffset + 16), - metadata = metadata) - recordOffset += sectionParameters.recordSize - recordsLeft -= 1 - entry - } - } - } - - def compactEntries: Iterator[CompactEntry] = { - new Iterator[CompactEntry] { - var recordOffset = offset + RollingEventLogSection.RecordsOffset - var entryOffset = recordOffset + RollingEventLogSection.CommitEntrySize - var recordsLeft = math.min(head, sectionParameters.window) - var entriesLeft = -1L - var dirty = false - var timeStamp: Instant = _ - - private def readHeader(): Unit = { - dirty = fileBuffer.getLong(recordOffset) == RollingEventLogSection.Dirty - val entiresHeaderOffset = recordOffset + RollingEventLogSection.CommitEntrySize - entriesLeft = fileBuffer.getLong(entiresHeaderOffset + HiFreqEntryCountFieldOffset) - timeStamp = Instant - .ofEpochMilli(fileBuffer.getLong(entiresHeaderOffset)) - .plusNanos(fileBuffer.getLong(entiresHeaderOffset + 8)) - entryOffset = entiresHeaderOffset + 32 - } - - override def hasNext: Boolean = recordsLeft > 0 - - override def next(): CompactEntry = { - if (entriesLeft == -1L) readHeader() - - val entry = CompactEntry( - timeStamp, - dirty, - code = fileBuffer.getLong(entryOffset), - param = fileBuffer.getLong(entryOffset + 8)) - - entriesLeft -= 1 - if (entriesLeft == 0) { - recordOffset += sectionParameters.recordSize - recordsLeft -= 1 - readHeader() - } else { - entryOffset += 16 - } - - entry - } - } - } - } - - trait Entry { - def timeStamp: Instant - } - - case class RichEntry(timeStamp: Instant, dirty: Boolean, code: Long, metadata: Array[Byte]) extends Entry { - override def toString: String = { - val textualCode = FlightRecorderEvents.eventDictionary.getOrElse(code, "").take(34) - val metadataString = new String(metadata, "US-ASCII") - f"[$timeStamp] ${if (dirty) "#" else ""} $code%3s $textualCode%-34s | $metadataString" - } - } - - case class CompactEntry(timeStamp: Instant, dirty: Boolean, code: Long, param: Long) extends Entry { - override def toString: String = { - val textualCode = FlightRecorderEvents.eventDictionary.getOrElse(code, "").take(34) - f"[$timeStamp] ${if (dirty) "#" else ""} $code%3s $textualCode%-34s | $param" - } - } - - private val fileBuffer = new MappedResizeableBuffer(fileChannel, 0, TotalSize) - private var _structure: Structure = _ - rereadStructure() - - def rereadStructure(): Unit = { - if (fileBuffer.getInt(0) != MagicString) { - fileBuffer.close() - throw new IOException(s"Expected magic string AFR1 (0x31524641) but got ${fileBuffer.getInt(0)}") - } - - val alertLog = readRollingLog(AlertSectionParameters) - val loFreqLog = readRollingLog(LoFreqSectionParameters) - val hiFreqLog = readRollingLog(HiFreqSectionParameters) - - _structure = Structure(Instant.ofEpochMilli(fileBuffer.getLong(4)), alertLog, loFreqLog, hiFreqLog) - } - - private def readRollingLog(sectionParameters: SectionParameters): RollingLog = { - val logs = Vector.tabulate(SnapshotCount) { idx => - readLog(idx, sectionParameters.offset + (idx * sectionParameters.logSize), sectionParameters) - } - RollingLog(sectionParameters, logs) - } - - private def readLog(id: Int, offset: Long, sectionParameters: SectionParameters): Log = { - val state = fileBuffer.getLong(offset + RollingEventLogSection.LogStateOffset) match { - case RollingEventLogSection.Empty => Empty - case RollingEventLogSection.Live => Live - case RollingEventLogSection.Snapshot => Snapshot - case other => throw new IOException(s"Unrecognized log state: $other in log at offset $offset") - } - Log(sectionParameters, offset, id, state, fileBuffer.getLong(offset + RollingEventLogSection.HeadPointerOffset)) - } - - def structure: Structure = _structure - - def close(): Unit = fileBuffer.close() - -} diff --git a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala index b3f051d42f..69177649e5 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/aeron/ArteryAeronUdpTransport.scala @@ -297,7 +297,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro taskRunner, bufferPool, giveUpAfter, - createFlightRecorderEventSink())) + IgnoreEventSink)) } private def aeronSource( @@ -305,14 +305,7 @@ private[remote] class ArteryAeronUdpTransport(_system: ExtendedActorSystem, _pro pool: EnvelopeBufferPool, inboundChannel: String): Source[EnvelopeBuffer, AeronSource.AeronLifecycle] = Source.fromGraph( - new AeronSource( - inboundChannel, - streamId, - aeron, - taskRunner, - pool, - createFlightRecorderEventSink(), - aeronSourceSpinningStrategy)) + new AeronSource(inboundChannel, streamId, aeron, taskRunner, pool, IgnoreEventSink, aeronSourceSpinningStrategy)) private def aeronSourceSpinningStrategy: Int = if (settings.Advanced.InboundLanes > 1 || // spinning was identified to be the cause of massive slowdowns with multiple lanes, see #21365 diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala index ad98346917..524d65c542 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/ArteryTcpTransport.scala @@ -115,7 +115,7 @@ private[remote] class ArteryTcpTransport( bufferPool: EnvelopeBufferPool): Sink[EnvelopeBuffer, Future[Done]] = { implicit val sys: ActorSystem = system - val afr = createFlightRecorderEventSink() + val afr = IgnoreEventSink val host = outboundContext.remoteAddress.host.get val port = outboundContext.remoteAddress.port.get @@ -224,7 +224,7 @@ private[remote] class ArteryTcpTransport( val binding = serverBinding match { case None => - val afr = createFlightRecorderEventSink() + val afr = IgnoreEventSink val binding = connectionSource .to(Sink.foreach { connection => afr.loFreq( @@ -315,7 +315,7 @@ private[remote] class ArteryTcpTransport( Flow[ByteString] .via(inboundKillSwitch.flow) // must create new FlightRecorder event sink for each connection because they can't be shared - .via(new TcpFraming(() => createFlightRecorderEventSink(false))) + .via(new TcpFraming) .alsoTo(inboundStream) .filter(_ => false) // don't send back anything in this TCP socket .map(_ => ByteString.empty) // make it a Flow[ByteString] again diff --git a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala index 708d57bf5e..cd2850d785 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/tcp/TcpFraming.scala @@ -59,10 +59,11 @@ import akka.util.ByteString /** * INTERNAL API */ -@InternalApi private[akka] class TcpFraming(flightRecorderSupplier: () => EventSink) - extends ByteStringParser[EnvelopeBuffer] { +@InternalApi private[akka] class TcpFraming extends ByteStringParser[EnvelopeBuffer] { + + val flightRecorder = IgnoreEventSink + override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = new ParsingLogic { - val flightRecorder = flightRecorderSupplier() abstract class Step extends ParseStep[EnvelopeBuffer] startWith(ReadMagic) diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala index 7b2d2cd0e0..c1a39f9a36 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala @@ -4,21 +4,19 @@ package akka.remote.artery -import akka.actor.{ ActorSystem, Address, BootstrapSetup, RootActorPath } import akka.actor.setup.ActorSystemSetup +import akka.actor.{ ActorSystem, Address, BootstrapSetup, RootActorPath } import akka.remote.RARP import akka.testkit.{ AkkaSpec, SocketUtil } import com.typesafe.config.{ Config, ConfigFactory } -import org.scalatest.Outcome -import org.scalatest.Pending +import org.scalatest.{ Outcome, Pending } /** * Base class for remoting tests what needs to test interaction between a "local" actor system * which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery */ abstract class ArteryMultiNodeSpec(config: Config) - extends AkkaSpec(config.withFallback(ArterySpecSupport.defaultConfig)) - with FlightRecorderSpecIntegration { + extends AkkaSpec(config.withFallback(ArterySpecSupport.defaultConfig)) { def this() = this(ConfigFactory.empty()) def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig)) @@ -58,8 +56,8 @@ abstract class ArteryMultiNodeSpec(config: Config) name: Option[String] = None, setup: Option[ActorSystemSetup] = None): ActorSystem = { val config = - ArterySpecSupport.newFlightRecorderConfig.withFallback(extraConfig.fold(localSystem.settings.config)(str => - ConfigFactory.parseString(str).withFallback(localSystem.settings.config))) + extraConfig.fold(localSystem.settings.config)(str => + ConfigFactory.parseString(str).withFallback(localSystem.settings.config)) val sysName = name.getOrElse(nextGeneratedSystemName) val remoteSystem = setup match { @@ -74,7 +72,6 @@ abstract class ArteryMultiNodeSpec(config: Config) override def afterTermination(): Unit = { remoteSystems.foreach(sys => shutdown(sys)) - (system +: remoteSystems).foreach(handleFlightRecorderFile) remoteSystems = Vector.empty super.afterTermination() } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala index 3acb3f7ed6..2b09783549 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArterySpecSupport.scala @@ -4,15 +4,7 @@ package akka.remote.artery -import java.nio.file.{ FileSystems, Files, Path } -import java.util.UUID - -import akka.actor.ActorSystem -import akka.remote.RARP -import akka.testkit.AkkaSpec -import com.typesafe.config.Config -import com.typesafe.config.ConfigFactory -import org.scalatest.Outcome +import com.typesafe.config.{ Config, ConfigFactory } object ArterySpecSupport { // same for all artery enabled remoting tests @@ -32,26 +24,11 @@ object ArterySpecSupport { } }""") - def newFlightRecorderConfig: Config = - ConfigFactory.parseString(s""" - akka { - remote.artery { - advanced.flight-recorder { - enabled=on - destination=target/flight-recorder-${UUID.randomUUID().toString}.afr - } - } - } - """) - /** * Artery enabled, flight recorder enabled, dynamic selection of port on localhost. - * Combine with [[FlightRecorderSpecIntegration]] or remember to delete flight recorder file if using manually */ def defaultConfig: Config = - newFlightRecorderConfig - .withFallback(staticArteryRemotingConfig) - .withFallback(tlsConfig) // TLS only used if transport=tls-tcp + staticArteryRemotingConfig.withFallback(tlsConfig) // TLS only used if transport=tls-tcp // set the test key-store and trust-store properties // TLS only used if transport=tls-tcp, which can be set from specific tests or @@ -69,44 +46,3 @@ object ArterySpecSupport { } } - -/** - * Dumps flight recorder data on test failure if artery flight recorder is enabled - * - * Important note: if you more than one (the default AkkaSpec.system) systems you need to override - * afterTermination and call handleFlightRecorderFile manually in the spec or else it will not be dumped - * on failure but also leak the afr file - */ -trait FlightRecorderSpecIntegration { self: AkkaSpec => - - def system: ActorSystem - - protected final def flightRecorderFileFor(system: ActorSystem): Path = - FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) - - // keep track of failure so that we can print flight recorder output on failures - protected final def failed = _failed - private var _failed = false - override protected def withFixture(test: NoArgTest): Outcome = { - val out = test() - if (!out.isSucceeded) _failed = true - out - } - - override def afterTermination(): Unit = { - self.afterTermination() - handleFlightRecorderFile(system) - } - - protected def handleFlightRecorderFile(system: ActorSystem): Unit = { - val flightRecorderFile = flightRecorderFileFor(system) - if (Files.exists(flightRecorderFile)) { - if (failed) { - // logger may not be alive anymore so we have to use stdout here - println(s"Flight recorder dump for system [${system.name}]:") - FlightRecorderReader.dumpToStdout(flightRecorderFile) - } - Files.delete(flightRecorderFile) - } - } -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala deleted file mode 100644 index a8b6b0ceab..0000000000 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ /dev/null @@ -1,442 +0,0 @@ -/* - * Copyright (C) 2009-2019 Lightbend Inc. - */ - -package akka.remote.artery - -import java.io.{ File, IOException, RandomAccessFile } -import java.nio.channels.FileChannel -import java.nio.file.{ Files, Path, StandardOpenOption } -import java.time.Instant -import java.util.Arrays -import java.util.concurrent.{ CountDownLatch, TimeUnit } - -import akka.testkit.AkkaSpec -import com.google.common.jimfs.{ Configuration, Jimfs } - -class FlightRecorderSpec extends AkkaSpec { - import FlightRecorderReader._ - - "Flight Recorder" must { - - "properly initialize AFR file when created" in withFlightRecorder { (_, reader, channel) => - channel.force(false) - - // otherwise isAfter assertion below can randomly fail - Thread.sleep(1) - val currentTime = Instant.now() - - reader.rereadStructure() - - currentTime.isAfter(reader.structure.startTime) should be(true) - (currentTime.toEpochMilli - reader.structure.startTime.toEpochMilli < 3000) should be(true) - - reader.structure.alertLog.logs.size should ===(FlightRecorder.SnapshotCount) - reader.structure.loFreqLog.logs.size should ===(FlightRecorder.SnapshotCount) - reader.structure.hiFreqLog.logs.size should ===(FlightRecorder.SnapshotCount) - - def checkLogInitialized(log: reader.RollingLog): Unit = { - log.logs(0).state should ===(Live) - log.logs(0).head should ===(0L) - log.logs(0).richEntries.toSeq should ===(Nil) - - log.logs(1).state should ===(Empty) - log.logs(1).head should ===(0L) - log.logs(1).richEntries.toSeq should ===(Nil) - - log.logs(2).state should ===(Empty) - log.logs(2).head should ===(0L) - log.logs(2).richEntries.toSeq should ===(Nil) - - log.logs(3).state should ===(Empty) - log.logs(3).head should ===(0L) - log.logs(3).richEntries.toSeq should ===(Nil) - } - - checkLogInitialized(reader.structure.alertLog) - checkLogInitialized(reader.structure.loFreqLog) - checkLogInitialized(reader.structure.hiFreqLog) - } - - "properly rotate logs when snapshotting" in withFlightRecorder { (recorder, reader, channel) => - recorder.snapshot() - channel.force(false) - reader.rereadStructure() - - def checkLogRotated(log: reader.RollingLog, states: Seq[LogState]): Unit = - log.logs.zip(states).foreach { case (log, state) => log.state should ===(state) } - - checkLogRotated(reader.structure.alertLog, List(Snapshot, Live, Empty, Empty)) - checkLogRotated(reader.structure.loFreqLog, List(Snapshot, Live, Empty, Empty)) - checkLogRotated(reader.structure.hiFreqLog, List(Snapshot, Live, Empty, Empty)) - - recorder.snapshot() - reader.rereadStructure() - - checkLogRotated(reader.structure.alertLog, List(Snapshot, Snapshot, Live, Empty)) - checkLogRotated(reader.structure.loFreqLog, List(Snapshot, Snapshot, Live, Empty)) - checkLogRotated(reader.structure.hiFreqLog, List(Snapshot, Snapshot, Live, Empty)) - - recorder.snapshot() - recorder.snapshot() - reader.rereadStructure() - - checkLogRotated(reader.structure.alertLog, List(Live, Snapshot, Snapshot, Snapshot)) - checkLogRotated(reader.structure.loFreqLog, List(Live, Snapshot, Snapshot, Snapshot)) - checkLogRotated(reader.structure.hiFreqLog, List(Live, Snapshot, Snapshot, Snapshot)) - } - - "properly report zero low frequency events" in withFlightRecorder { (_, reader, channel) => - channel.force(false) - reader.rereadStructure() - - val entries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - - entries.isEmpty should be(true) - } - - "properly report zero high frequency events" in withFlightRecorder { (_, reader, channel) => - channel.force(false) - reader.rereadStructure() - - val entries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - - entries.isEmpty should be(true) - } - - "properly store one low frequency event" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - val helloBytes = "Hello".getBytes("US-ASCII") - - sink.loFreq(42, helloBytes) - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - - entries.exists(_.dirty) should be(false) - entries.map(_.code.toInt) should ===(List(42)) - } - - "properly store one high frequency event" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - - sink.hiFreq(42, 64) - sink.flushHiFreqBatch() - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - - entries.exists(_.dirty) should be(false) - entries.map(_.code.toInt) should ===(List(42)) - entries.map(_.param.toInt) should ===(List(64)) - } - - "properly store low frequency events" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - val helloBytes = "Hello".getBytes("US-ASCII") - - for (i <- 0 until FlightRecorder.LoFreqWindow) - sink.loFreq(i, helloBytes) - - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - - entries.exists(_.dirty) should be(false) - entries.map(_.code.toInt) should ===(0 until FlightRecorder.LoFreqWindow) - entries.forall(entry => Arrays.equals(entry.metadata, helloBytes)) should be(true) - - // Timestamps are monotonic - entries.sortBy(_.code) should ===(entries.sortBy(_.timeStamp)) - } - - "properly truncate low frequency event metadata if necessary" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - val longMetadata = new Array[Byte](1024) - - sink.loFreq(0, longMetadata) - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - - entries.size should ===(1) - entries.head.metadata should ===(new Array[Byte](FlightRecorder.LoFreqRecordSize - 32)) - - } - - "properly store high frequency events" in withFlightRecorder { (recorder, reader, channel) => - val EffectiveHighFreqWindow = FlightRecorder.HiFreqWindow * FlightRecorder.HiFreqBatchSize - val sink = recorder.createEventSink() - - for (i <- 0 until EffectiveHighFreqWindow) - sink.hiFreq(i, 42) - - sink.flushHiFreqBatch() - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - - entries.exists(_.dirty) should be(false) - entries.map(_.code.toInt) should ===(0 until EffectiveHighFreqWindow) - entries.forall(entry => entry.param == 42) should be(true) - - // Timestamps are monotonic - entries.sortBy(_.code) should ===(entries.sortBy(_.timeStamp)) - } - - "properly store and rotate low frequency events" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - val helloBytes = "Hello".getBytes("US-ASCII") - - for (i <- 0 until FlightRecorder.LoFreqWindow + 100) - sink.loFreq(i, helloBytes) - - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - - entries.exists(_.dirty) should be(false) - entries.map(_.code.toInt).sorted should ===(100 until (FlightRecorder.LoFreqWindow + 100)) - entries.forall(entry => Arrays.equals(entry.metadata, helloBytes)) should be(true) - - // Timestamps are monotonic - entries.sortBy(_.code) should ===(entries.sortBy(_.timeStamp)) - } - - "properly store and rotate high frequency events" in withFlightRecorder { (recorder, reader, channel) => - val EffectiveHighFreqWindow = FlightRecorder.HiFreqWindow * FlightRecorder.HiFreqBatchSize - val sink = recorder.createEventSink() - - for (i <- 0 until EffectiveHighFreqWindow + 100) - sink.hiFreq(i, 42) - - sink.flushHiFreqBatch() - channel.force(false) - - reader.rereadStructure() - val entries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - - entries.exists(_.dirty) should be(false) - // Note the (2 * FlightRecorder.HiFreqBatchSize) initial sequence number. - // This is because the overflow by 100 events rotates out two records, not just 100. - entries.map(_.code.toInt).sorted should ===( - (2 * FlightRecorder.HiFreqBatchSize) until (EffectiveHighFreqWindow + 100)) - entries.forall(entry => entry.param == 42) should be(true) - - // Timestamps are monotonic - entries.sortBy(_.code) should ===(entries.sortBy(_.timeStamp)) - } - - "properly store low frequency events after snapshot" in withFlightRecorder { (recorder, reader, _) => - val sink = recorder.createEventSink() - val helloBytes = "Hello".getBytes("US-ASCII") - val hello2Bytes = "Hello2".getBytes("US-ASCII") - - for (i <- 0 until 100) - sink.loFreq(i, helloBytes) - - recorder.snapshot() - - for (i <- 0 until 50) - sink.loFreq(i, hello2Bytes) - - reader.rereadStructure() - - reader.structure.loFreqLog.logs(0).state should ===(Snapshot) - reader.structure.loFreqLog.logs(1).state should ===(Live) - - val snapshotEntries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - val liveEntries = reader.structure.loFreqLog.logs(1).richEntries.toSeq - - snapshotEntries.exists(_.dirty) should be(false) - snapshotEntries.map(_.code.toInt) should ===(0 until 100) - snapshotEntries.forall(entry => Arrays.equals(entry.metadata, helloBytes)) should be(true) - - // Timestamps are monotonic - snapshotEntries.sortBy(_.code) should ===(snapshotEntries.sortBy(_.timeStamp)) - - liveEntries.exists(_.dirty) should be(false) - liveEntries.map(_.code.toInt) should ===(0 until 50) - liveEntries.forall(entry => Arrays.equals(entry.metadata, hello2Bytes)) should be(true) - - // Timestamps are monotonic - liveEntries.sortBy(_.code) should ===(liveEntries.sortBy(_.timeStamp)) - } - - "properly store high frequency events after snapshot" in withFlightRecorder { (recorder, reader, channel) => - val sink = recorder.createEventSink() - - for (i <- 0 until 100) - sink.hiFreq(i, 0) - - sink.flushHiFreqBatch() - recorder.snapshot() - - for (i <- 0 until 50) - sink.hiFreq(i, 1) - - sink.flushHiFreqBatch() - channel.force(false) - reader.rereadStructure() - - reader.structure.hiFreqLog.logs(0).state should ===(Snapshot) - reader.structure.hiFreqLog.logs(1).state should ===(Live) - - val snapshotEntries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - val liveEntries = reader.structure.hiFreqLog.logs(1).compactEntries.toSeq - - snapshotEntries.exists(_.dirty) should be(false) - snapshotEntries.map(_.code.toInt) should ===(0 until 100) - snapshotEntries.forall(_.param == 0) should be(true) - - // Timestamps are monotonic - snapshotEntries.sortBy(_.code) should ===(snapshotEntries.sortBy(_.timeStamp)) - - liveEntries.exists(_.dirty) should be(false) - liveEntries.map(_.code.toInt) should ===(0 until 50) - liveEntries.forall(_.param == 1) should be(true) - - // Timestamps are monotonic - liveEntries.sortBy(_.code) should ===(liveEntries.sortBy(_.timeStamp)) - } - - "properly store alerts and make a snapshot" in withFlightRecorder { (recorder, reader, _) => - val sink = recorder.createEventSink() - val helloBytes = "Hello".getBytes("US-ASCII") - val alertBytes = "An alert".getBytes("US-ASCII") - - for (i <- 0 until 100) { - sink.hiFreq(i, 1) - sink.loFreq(i, helloBytes) - } - - sink.alert(42, alertBytes) - reader.rereadStructure() - - // Snapshot is automatically taken - reader.structure.alertLog.logs(0).state should ===(Snapshot) - reader.structure.loFreqLog.logs(0).state should ===(Snapshot) - reader.structure.hiFreqLog.logs(0).state should ===(Snapshot) - reader.structure.alertLog.logs(1).state should ===(Live) - reader.structure.loFreqLog.logs(1).state should ===(Live) - reader.structure.hiFreqLog.logs(1).state should ===(Live) - - val hiFreqEntries = reader.structure.hiFreqLog.logs(0).compactEntries.toSeq - val loFreqEntries = reader.structure.loFreqLog.logs(0).richEntries.toSeq - val alertEntries = reader.structure.alertLog.logs(0).richEntries.toSeq - - // High frequency events are flushed (100 leaves an uncomplete batch if not flushed, - // i.e. only the first batch visible if alert did not flush) - hiFreqEntries.map(_.code.toInt) should ===(0 until 100) - hiFreqEntries.forall(_.param == 1) should be(true) - loFreqEntries.map(_.code.toInt) should ===(0 until 100) - loFreqEntries.forall(entry => Arrays.equals(entry.metadata, helloBytes)) should be(true) - alertEntries.map(_.code.toInt) should ===(List(42)) - Arrays.equals(alertEntries.head.metadata, alertBytes) should be(true) - } - - "properly store events from multiple threads" in withFlightRecorder { (recorder, reader, channel) => - val Threads = 4 - val startLatch = new CountDownLatch(1) - val finishLatch = new CountDownLatch(Threads) - - for (i <- 1 to Threads) { - new Thread { - override def run(): Unit = { - val sink = recorder.createEventSink() - startLatch.await(3, TimeUnit.SECONDS) - - for (j <- 0 until 100) sink.loFreq(code = i, Array(j.toByte)) - finishLatch.countDown() - } - }.start() - } - - startLatch.countDown() - finishLatch.await(3, TimeUnit.SECONDS) - channel.force(false) - reader.rereadStructure() - - reader.structure.loFreqLog.logs.head.richEntries.size should ===(FlightRecorder.LoFreqWindow) - - for (i <- 1 to Threads) { - val entries = reader.structure.loFreqLog.logs.head.richEntries.filter(_.code == i).toSeq - - entries.exists(_.dirty) should be(false) - // Entries are consecutive for any given writer - entries.map(_.metadata(0).toInt).sorted should ===((100 - entries.size) until 100) - entries.forall(_.code == i) should be(true) - - // Timestamps are monotonic - entries.sortBy(_.metadata(0).toInt) should ===(entries.sortBy(_.timeStamp)) - } - } - - "create flight recorder file" in { - def assertFileIsSound(path: Path) = { - Files.exists(path) should ===(true) - Files.isRegularFile(path) should ===(true) - Files.isWritable(path) should ===(true) - Files.isReadable(path) should ===(true) - } - val fs = Jimfs.newFileSystem(Configuration.unix()) - - try { - val tmpPath = FlightRecorder.createFlightRecorderFile("", fs) - assertFileIsSound(tmpPath) - // this is likely in the actual file system, so lets delete it - Files.delete(tmpPath) - - Files.createDirectory(fs.getPath("/directory")) - val tmpFileInGivenPath = FlightRecorder.createFlightRecorderFile("/directory", fs) - assertFileIsSound(tmpFileInGivenPath) - - val specificFile = FlightRecorder.createFlightRecorderFile("/directory/flight-recorder.afr", fs) - assertFileIsSound(specificFile) - - } finally { - fs.close() - } - - } - - } - - private def withFlightRecorder(body: (FlightRecorder, FlightRecorderReader, FileChannel) => Unit): Unit = { - val file = File.createTempFile("artery", ".afr") - file.deleteOnExit() - - var randomAccessFile: RandomAccessFile = null - var recorder: FlightRecorder = null - var reader: FlightRecorderReader = null - var channel: FileChannel = null - - try { - randomAccessFile = new RandomAccessFile(file, "rwd") - randomAccessFile.setLength(FlightRecorder.TotalSize) - randomAccessFile.close() - - channel = - FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) - recorder = new FlightRecorder(channel) - reader = new FlightRecorderReader(channel) - body(recorder, reader, channel) - } finally { - // Try to delete anyway - try { - if (randomAccessFile ne null) randomAccessFile.close() - if (recorder ne null) recorder.close() - if (reader ne null) reader.close() - if (channel ne null) channel.close() - file.delete() - } catch { case e: IOException => e.printStackTrace() } - } - } - -} diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala index 61cc79e2b2..19213685ba 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteDeployerSpec.scala @@ -4,12 +4,12 @@ package akka.remote.artery -import akka.testkit._ -import akka.actor._ -import akka.routing._ -import com.typesafe.config._ import akka.ConfigurationException +import akka.actor._ import akka.remote.RemoteScope +import akka.routing._ +import akka.testkit._ +import com.typesafe.config._ object RemoteDeployerSpec { val deployerConf = ConfigFactory.parseString(""" @@ -29,7 +29,7 @@ object RemoteDeployerSpec { } -class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) with FlightRecorderSpecIntegration { +class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) { "A RemoteDeployer" must { diff --git a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala index d24e0070b2..90e4c11fad 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/RemoteRouterSpec.scala @@ -4,14 +4,15 @@ package akka.remote.artery -import scala.collection.immutable -import akka.testkit._ -import akka.routing._ import akka.actor._ import akka.remote.routing._ -import com.typesafe.config._ -import akka.testkit.TestActors.echoActorProps import akka.remote.{ RARP, RemoteScope } +import akka.routing._ +import akka.testkit.TestActors.echoActorProps +import akka.testkit._ +import com.typesafe.config._ + +import scala.collection.immutable object RemoteRouterSpec { class Parent extends Actor { @@ -38,8 +39,7 @@ class RemoteRouterSpec router = round-robin-pool nr-of-instances = 6 } - }""").withFallback(ArterySpecSupport.defaultConfig)) - with FlightRecorderSpecIntegration { + }""").withFallback(ArterySpecSupport.defaultConfig)) { import RemoteRouterSpec._ @@ -78,14 +78,12 @@ class RemoteRouterSpec target.nodes = ["akka://${sysName}@localhost:${port}"] } } - }""").withFallback(ArterySpecSupport.newFlightRecorderConfig).withFallback(system.settings.config) + }""").withFallback(system.settings.config) val masterSystem = ActorSystem("Master" + sysName, conf) override def afterTermination(): Unit = { shutdown(masterSystem) - handleFlightRecorderFile(system) - handleFlightRecorderFile(masterSystem) } def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = { diff --git a/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala index 444e2d46ec..a6408852d7 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/tcp/TcpFramingSpec.scala @@ -6,8 +6,8 @@ package akka.remote.artery package tcp import akka.stream.impl.io.ByteStringParser.ParsingException -import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Framing.FramingException +import akka.stream.scaladsl.Flow import akka.stream.scaladsl.Sink import akka.stream.scaladsl.Source import akka.testkit.AkkaSpec @@ -21,9 +21,7 @@ class TcpFramingSpec extends AkkaSpec(""" """) with ImplicitSender { import TcpFraming.encodeFrameHeader - private val afr = IgnoreEventSink - - private val framingFlow = Flow[ByteString].via(new TcpFraming(() => afr)) + private val framingFlow = Flow[ByteString].via(new TcpFraming) private val payload5 = ByteString((1 to 5).map(_.toByte).toArray)