diff --git a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala index a819090154..6d4eeb9927 100644 --- a/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala +++ b/akka-cluster/src/multi-jvm/scala/akka/cluster/MultiNodeClusterSpec.scala @@ -5,20 +5,24 @@ package akka.cluster // TODO remove metrics +import java.util.UUID + import language.implicitConversions -import org.scalatest.{ Suite, Outcome, Canceled } +import org.scalatest.{ Canceled, Outcome, Suite } import org.scalatest.exceptions.TestCanceledException import com.typesafe.config.Config import com.typesafe.config.ConfigFactory import akka.remote.testconductor.RoleName -import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec } +import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeSpec, STMultiNodeSpec } import akka.testkit._ import akka.testkit.TestEvent._ import akka.actor.{ ActorSystem, Address } import akka.event.Logging.ErrorLevel + import scala.concurrent.duration._ import scala.collection.immutable import java.util.concurrent.ConcurrentHashMap + import akka.remote.DefaultFailureDetectorRegistry import akka.actor.ActorRef import akka.actor.Actor @@ -33,7 +37,7 @@ object MultiNodeClusterSpec { def clusterConfig(failureDetectorPuppet: Boolean): Config = if (failureDetectorPuppet) clusterConfigWithFailureDetectorPuppet else clusterConfig - def clusterConfig: Config = ConfigFactory.parseString(""" + def clusterConfig: Config = ConfigFactory.parseString(s""" akka.actor.provider = cluster akka.cluster { jmx.enabled = off @@ -47,11 +51,18 @@ object MultiNodeClusterSpec { akka.loglevel = INFO akka.log-dead-letters = off akka.log-dead-letters-during-shutdown = off - akka.remote.log-remote-lifecycle-events = off + akka.remote { + log-remote-lifecycle-events = off + artery.advanced.flight-recorder { + enabled=on + destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + } + } akka.loggers = ["akka.testkit.TestEventListener"] akka.test { single-expect-default = 5 s } + """) // sometimes we need to coordinate test shutdown with messages instead of barriers @@ -77,7 +88,7 @@ object MultiNodeClusterSpec { } } -trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner { self: MultiNodeSpec ⇒ +trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoroner with FlightRecordingSupport { self: MultiNodeSpec ⇒ override def initialParticipants = roles.size @@ -92,6 +103,10 @@ trait MultiNodeClusterSpec extends Suite with STMultiNodeSpec with WatchedByCoro override protected def afterTermination(): Unit = { self.afterTermination() stopCoroner() + if (failed || sys.props.get("akka.remote.artery.always-dump-flight-recorder").isDefined) { + printFlightRecording() + } + deleteFlightRecorderFile() } override def expectedTestDuration = 60.seconds diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala new file mode 100644 index 0000000000..c5aa24275b --- /dev/null +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/FlightRecordingSupport.scala @@ -0,0 +1,59 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote.testkit + +import java.nio.file.{ FileSystems, Files, Path } + +import akka.remote.RARP +import akka.remote.artery.FlightRecorderReader + +/** + * Provides test framework agnostic methods to dump the artery flight recorder data after a test has completed - you + * must integrate the logic with the testing tool you use yourself. + * + * The flight recorder must be enabled and the flight recorder destination must be an absolute file name so + * that the akka config can be used to find it. For example you could ensure a unique file per test using + * something like this in your config: + * {{{ + * akka.remote.artery.advanced.flight-recorder { + * enabled=on + * destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + * } + * }}} + * + * You need to hook in dump and deletion of files where it makes sense in your tests. (For example, dump after all tests has + * run and there was a failure and then delete) + */ +trait FlightRecordingSupport { self: MultiNodeSpec ⇒ + private lazy val arteryEnabled = + RARP(system).provider.remoteSettings.Artery.Enabled + private lazy val flightRecorderFile: Path = + FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) + + /** + * Delete flight the recorder file if it exists + */ + final protected def deleteFlightRecorderFile(): Unit = { + if (arteryEnabled && destinationIsValidForDump() && Files.exists(flightRecorderFile)) { + Files.delete(flightRecorderFile) + } + } + + /** + * Dump the contents of the flight recorder file to standard output + */ + final protected def printFlightRecording(): Unit = { + if (arteryEnabled && destinationIsValidForDump() && Files.exists(flightRecorderFile)) { + // use stdout/println as we do not know if the system log is alive + println("Flight recorder dump:") + FlightRecorderReader.dumpToStdout(flightRecorderFile) + } + } + + private def destinationIsValidForDump() = { + val path = flightRecorderFile.toString + path != "" && path.endsWith(".afr") + } + +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala index 35badd1cc0..fd1d2d079b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/AttemptSysMsgRedeliverySpec.scala @@ -24,7 +24,7 @@ class AttemptSysMsgRedeliveryMultiJvmSpec(artery: Boolean) extends MultiNodeConf commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) testTransport(on = true) @@ -53,8 +53,7 @@ object AttemptSysMsgRedeliverySpec { } abstract class AttemptSysMsgRedeliverySpec(multiNodeConfig: AttemptSysMsgRedeliveryMultiJvmSpec) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import AttemptSysMsgRedeliverySpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala index 67635e1b17..47fe16b294 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/LookupRemoteActorSpec.scala @@ -18,7 +18,7 @@ class LookupRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) val master = role("master") val slave = role("slave") @@ -39,8 +39,8 @@ object LookupRemoteActorSpec { } } -abstract class LookupRemoteActorSpec(multiNodeConfig: LookupRemoteActorMultiJvmSpec) extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { +abstract class LookupRemoteActorSpec(multiNodeConfig: LookupRemoteActorMultiJvmSpec) + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import LookupRemoteActorSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/MultiNodeRemotingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/MultiNodeRemotingSpec.scala new file mode 100644 index 0000000000..65e92bc68a --- /dev/null +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/MultiNodeRemotingSpec.scala @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2016 Lightbend Inc. + */ +package akka.remote + +import java.util.UUID + +import akka.remote.testkit.{ FlightRecordingSupport, MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } +import akka.testkit.{ DefaultTimeout, ImplicitSender } +import com.typesafe.config.ConfigFactory +import org.scalatest.{ Outcome, Suite } + +object MultiNodeRemotingSpec { + + def arteryFlightRecordingConf = + ConfigFactory.parseString( + s""" + akka.remote.artery.advanced.flight-recorder { + enabled=on + destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + } + """) + +} + +abstract class MultiNodeRemotingSpec(config: MultiNodeConfig) extends MultiNodeSpec(config) + with Suite + with STMultiNodeSpec + with FlightRecordingSupport + with ImplicitSender + with DefaultTimeout { self: MultiNodeSpec ⇒ + + // Keep track of failure so we can print artery flight recording on failure + private var failed = false + final override protected def withFixture(test: NoArgTest): Outcome = { + val out = super.withFixture(test) + if (!out.isSucceeded) + failed = true + out + } + + override def afterTermination(): Unit = { + if (failed || sys.props.get("akka.remote.artery.always-dump-flight-recorder").isDefined) { + printFlightRecording() + } + deleteFlightRecorderFile() + } +} diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala index 91a959dacd..66025899da 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/NewRemoteActorSpec.scala @@ -20,7 +20,7 @@ class NewRemoteActorMultiJvmSpec(artery: Boolean) extends MultiNodeConfig { ConfigFactory.parseString(s""" akka.remote.log-remote-lifecycle-events = off akka.remote.artery.enabled = $artery - """))) + """).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf))) val master = role("master") val slave = role("slave") @@ -55,8 +55,7 @@ object NewRemoteActorSpec { } abstract class NewRemoteActorSpec(multiNodeConfig: NewRemoteActorMultiJvmSpec) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import NewRemoteActorSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala index 7a337fa57c..6c3b9df226 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/PiercingShouldKeepQuarantineSpec.scala @@ -15,7 +15,7 @@ class PiercingShouldKeepQuarantineConfig(artery: Boolean) extends MultiNodeConfi ConfigFactory.parseString(s""" akka.remote.retry-gate-closed-for = 0.5s akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) } @@ -38,9 +38,7 @@ object PiercingShouldKeepQuarantineSpec { } abstract class PiercingShouldKeepQuarantineSpec(multiNodeConfig: PiercingShouldKeepQuarantineConfig) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec - with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import PiercingShouldKeepQuarantineSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeliverySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeliverySpec.scala index 3e59ee2e87..d54c90e4f8 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeliverySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeliverySpec.scala @@ -26,7 +26,7 @@ class RemoteDeliveryConfig(artery: Boolean) extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) } class RemoteDeliveryMultiJvmNode1 extends RemoteDeliverySpec(new RemoteDeliveryConfig(artery = false)) @@ -48,8 +48,7 @@ object RemoteDeliverySpec { } abstract class RemoteDeliverySpec(multiNodeConfig: RemoteDeliveryConfig) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import RemoteDeliverySpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala index 649e340b6e..a469a5baf1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteDeploymentDeathWatchSpec.scala @@ -27,7 +27,7 @@ class RemoteDeploymentDeathWatchMultiJvmSpec(artery: Boolean) extends MultiNodeC akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = off akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) deployOn(second, """/hello.remote = "@third@" """) @@ -69,8 +69,7 @@ object RemoteDeploymentDeathWatchSpec { } abstract class RemoteDeploymentDeathWatchSpec(multiNodeConfig: RemoteDeploymentDeathWatchMultiJvmSpec) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import RemoteDeploymentDeathWatchSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala index 9492cfc0cd..08f6269945 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteGatePiercingSpec.scala @@ -29,7 +29,7 @@ object RemoteGatePiercingSpec extends MultiNodeConfig { akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = INFO akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 5 s - """))) + """))) nodeConfig(first)( ConfigFactory.parseString("akka.remote.retry-gate-closed-for = 1 d # Keep it long")) @@ -51,8 +51,7 @@ class RemoteGatePiercingSpecMultiJvmNode1 extends RemoteGatePiercingSpec class RemoteGatePiercingSpecMultiJvmNode2 extends RemoteGatePiercingSpec abstract class RemoteGatePiercingSpec - extends MultiNodeSpec(RemoteGatePiercingSpec) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(RemoteGatePiercingSpec) { import RemoteGatePiercingSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index 06dcfe4e8d..4fe5da25ce 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -31,7 +31,7 @@ class RemoteNodeDeathWatchConfig(artery: Boolean) extends MultiNodeConfig { ## Use a tighter setting than the default, otherwise it takes 20s for DeathWatch to trigger akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 3 s akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) } @@ -92,8 +92,7 @@ object RemoteNodeDeathWatchSpec { } abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchConfig) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import RemoteNodeDeathWatchSpec._ import RemoteWatcher._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala index 2a7a838e69..63dc11987a 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartDeathWatchSpec.scala @@ -63,8 +63,7 @@ object RemoteNodeRestartDeathWatchSpec { } abstract class RemoteNodeRestartDeathWatchSpec(multiNodeConfig: RemoteNodeRestartDeathWatchConfig) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import RemoteNodeRestartDeathWatchSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala index 08da06b2c7..ee7015602d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeRestartGateSpec.scala @@ -46,8 +46,7 @@ class RemoteNodeRestartGateSpecMultiJvmNode1 extends RemoteNodeRestartGateSpec class RemoteNodeRestartGateSpecMultiJvmNode2 extends RemoteNodeRestartGateSpec abstract class RemoteNodeRestartGateSpec - extends MultiNodeSpec(RemoteNodeRestartGateSpec) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(RemoteNodeRestartGateSpec) { import RemoteNodeRestartGateSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala index dc72e998a0..b396c6e402 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeShutdownAndComesBackSpec.scala @@ -30,7 +30,7 @@ object RemoteNodeShutdownAndComesBackSpec extends MultiNodeConfig { akka.remote.transport-failure-detector.heartbeat-interval = 1 s akka.remote.transport-failure-detector.acceptable-heartbeat-pause = 3 s akka.remote.watch-failure-detector.acceptable-heartbeat-pause = 60 s - """))) + """))) testTransport(on = true) @@ -47,8 +47,7 @@ class RemoteNodeShutdownAndComesBackMultiJvmNode1 extends RemoteNodeShutdownAndC class RemoteNodeShutdownAndComesBackMultiJvmNode2 extends RemoteNodeShutdownAndComesBackSpec abstract class RemoteNodeShutdownAndComesBackSpec - extends MultiNodeSpec(RemoteNodeShutdownAndComesBackSpec) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(RemoteNodeShutdownAndComesBackSpec) { import RemoteNodeShutdownAndComesBackSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala index 170fdfa4f6..7790ea489e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteQuarantinePiercingSpec.scala @@ -27,7 +27,7 @@ class RemoteQuarantinePiercingConfig(artery: Boolean) extends MultiNodeConfig { akka.loglevel = INFO akka.remote.log-remote-lifecycle-events = INFO akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) } @@ -51,9 +51,7 @@ object RemoteQuarantinePiercingSpec { } abstract class RemoteQuarantinePiercingSpec(multiNodeConfig: RemoteQuarantinePiercingConfig) - extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec - with ImplicitSender { + extends MultiNodeRemotingSpec(multiNodeConfig) { import multiNodeConfig._ import RemoteQuarantinePiercingSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala index 456d8a262a..5e7253286f 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteReDeploymentSpec.scala @@ -35,7 +35,7 @@ class RemoteReDeploymentConfig(artery: Boolean) extends MultiNodeConfig { acceptable-heartbeat-pause=2.5s } akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) testTransport(on = true) @@ -105,8 +105,8 @@ object RemoteReDeploymentMultiJvmSpec { def echoProps(target: ActorRef) = Props(new Echo(target)) } -abstract class RemoteReDeploymentMultiJvmSpec(multiNodeConfig: RemoteReDeploymentConfig) extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender { +abstract class RemoteReDeploymentMultiJvmSpec(multiNodeConfig: RemoteReDeploymentConfig) + extends MultiNodeRemotingSpec(multiNodeConfig) { def sleepAfterKill: FiniteDuration def expectQuarantine: Boolean diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala index bf66b9795a..cd53abd011 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteRestartedQuarantinedSpec.scala @@ -56,8 +56,7 @@ class RemoteRestartedQuarantinedSpecMultiJvmNode1 extends RemoteRestartedQuarant class RemoteRestartedQuarantinedSpecMultiJvmNode2 extends RemoteRestartedQuarantinedSpec abstract class RemoteRestartedQuarantinedSpec - extends MultiNodeSpec(RemoteRestartedQuarantinedSpec) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(RemoteRestartedQuarantinedSpec) { import RemoteRestartedQuarantinedSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala index dd87973f18..b43610fea1 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/Ticket15109Spec.scala @@ -47,9 +47,7 @@ object Ticket15109Spec extends MultiNodeConfig { class Ticket15109SpecMultiJvmNode1 extends Ticket15109Spec class Ticket15109SpecMultiJvmNode2 extends Ticket15109Spec -abstract class Ticket15109Spec extends MultiNodeSpec(Ticket15109Spec) - with STMultiNodeSpec - with ImplicitSender { +abstract class Ticket15109Spec extends MultiNodeRemotingSpec(Ticket15109Spec) { import Ticket15109Spec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala index f8e1afe19b..d7a5763be7 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/LatencySpec.scala @@ -9,6 +9,7 @@ import java.util.concurrent.locks.LockSupport import scala.concurrent.duration._ import akka.actor._ +import akka.remote.MultiNodeRemotingSpec import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -52,7 +53,7 @@ object LatencySpec extends MultiNodeConfig { } } } - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) final case object Reset @@ -160,8 +161,7 @@ class LatencySpecMultiJvmNode1 extends LatencySpec class LatencySpecMultiJvmNode2 extends LatencySpec abstract class LatencySpec - extends MultiNodeSpec(LatencySpec) - with STMultiNodeSpec with ImplicitSender { + extends MultiNodeRemotingSpec(LatencySpec) { import LatencySpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala index 9488ddd898..6e2b29184d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/MaxThroughputSpec.scala @@ -6,9 +6,10 @@ package akka.remote.artery import java.nio.ByteBuffer import java.util.concurrent.Executors import java.util.concurrent.TimeUnit.NANOSECONDS + import scala.concurrent.duration._ import akka.actor._ -import akka.remote.RemoteActorRefProvider +import akka.remote.{ MultiNodeRemotingSpec, RARP, RemoteActorRefProvider } import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -19,7 +20,6 @@ import akka.serialization.SerializerWithStringManifest import akka.testkit._ import com.typesafe.config.ConfigFactory import akka.remote.artery.compress.CompressionProtocol.Events.ReceivedActorRefCompressionTable -import akka.remote.RARP object MaxThroughputSpec extends MultiNodeConfig { val first = role("first") @@ -67,7 +67,7 @@ object MaxThroughputSpec extends MultiNodeConfig { } } } - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) case object Run sealed trait Echo extends DeadLetterSuppression with JavaSerializable @@ -277,10 +277,7 @@ object MaxThroughputSpec extends MultiNodeConfig { class MaxThroughputSpecMultiJvmNode1 extends MaxThroughputSpec class MaxThroughputSpecMultiJvmNode2 extends MaxThroughputSpec -abstract class MaxThroughputSpec - extends MultiNodeSpec(MaxThroughputSpec) - with STMultiNodeSpec with ImplicitSender - with PerfFlamesSupport { +abstract class MaxThroughputSpec extends MultiNodeRemotingSpec(MaxThroughputSpec) with PerfFlamesSupport { import MaxThroughputSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala index b7bc87155a..333de1a87e 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/RemoteRestartedQuarantinedSpec.scala @@ -4,12 +4,13 @@ package akka.remote.artery import akka.remote.transport.AssociationHandle + import language.postfixOps import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory import akka.actor._ import akka.remote.testconductor.RoleName -import akka.remote.transport.ThrottlerTransportAdapter.{ ForceDisassociateExplicitly, ForceDisassociate, Direction } +import akka.remote.transport.ThrottlerTransportAdapter.{ Direction, ForceDisassociate, ForceDisassociateExplicitly } import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec import akka.remote.testkit.STMultiNodeSpec @@ -17,10 +18,9 @@ import akka.testkit._ import akka.actor.ActorIdentity import akka.remote.testconductor.RoleName import akka.actor.Identify + import scala.concurrent.Await -import akka.remote.AddressUidExtension -import akka.remote.RARP -import akka.remote.ThisActorSystemQuarantinedEvent +import akka.remote.{ AddressUidExtension, MultiNodeRemotingSpec, RARP, ThisActorSystemQuarantinedEvent } object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { val first = role("first") @@ -31,7 +31,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { akka.loglevel = WARNING akka.remote.log-remote-lifecycle-events = WARNING akka.remote.artery.enabled = on - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) class Subject extends Actor { def receive = { @@ -45,9 +45,7 @@ object RemoteRestartedQuarantinedSpec extends MultiNodeConfig { class RemoteRestartedQuarantinedSpecMultiJvmNode1 extends RemoteRestartedQuarantinedSpec class RemoteRestartedQuarantinedSpecMultiJvmNode2 extends RemoteRestartedQuarantinedSpec -abstract class RemoteRestartedQuarantinedSpec - extends MultiNodeSpec(RemoteRestartedQuarantinedSpec) - with STMultiNodeSpec with ImplicitSender { +abstract class RemoteRestartedQuarantinedSpec extends MultiNodeRemotingSpec(RemoteRestartedQuarantinedSpec) { import RemoteRestartedQuarantinedSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala index 9eab4bab1b..3356111b5d 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/artery/SurviveNetworkPartitionSpec.scala @@ -7,7 +7,7 @@ import scala.concurrent.duration._ import akka.actor._ import akka.actor.ActorIdentity import akka.actor.Identify -import akka.remote.RARP +import akka.remote.{ MultiNodeRemotingSpec, QuarantinedEvent, RARP } import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig import akka.remote.testkit.MultiNodeSpec @@ -15,7 +15,6 @@ import akka.remote.testkit.STMultiNodeSpec import akka.remote.transport.ThrottlerTransportAdapter.Direction import akka.testkit._ import com.typesafe.config.ConfigFactory -import akka.remote.QuarantinedEvent object SurviveNetworkPartitionSpec extends MultiNodeConfig { val first = role("first") @@ -26,7 +25,7 @@ object SurviveNetworkPartitionSpec extends MultiNodeConfig { akka.loglevel = INFO akka.remote.artery.enabled = on akka.remote.artery.advanced.give-up-system-message-after = 4s - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) testTransport(on = true) } @@ -34,9 +33,7 @@ object SurviveNetworkPartitionSpec extends MultiNodeConfig { class SurviveNetworkPartitionSpecMultiJvmNode1 extends SurviveNetworkPartitionSpec class SurviveNetworkPartitionSpecMultiJvmNode2 extends SurviveNetworkPartitionSpec -abstract class SurviveNetworkPartitionSpec - extends MultiNodeSpec(SurviveNetworkPartitionSpec) - with STMultiNodeSpec with ImplicitSender { +abstract class SurviveNetworkPartitionSpec extends MultiNodeRemotingSpec(SurviveNetworkPartitionSpec) { import SurviveNetworkPartitionSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala index 318b71a4ad..27cce605b0 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRandomSpec.scala @@ -9,7 +9,8 @@ import akka.actor.ActorRef import akka.actor.Address import akka.actor.PoisonPill import akka.actor.Props -import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } +import akka.remote.MultiNodeRemotingSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.routing.Broadcast import akka.routing.RandomPool import akka.routing.RoutedActorRef @@ -26,7 +27,7 @@ class RemoteRandomConfig(artery: Boolean) extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) deployOnAll(""" /service-hello { @@ -55,8 +56,8 @@ object RemoteRandomSpec { } } -class RemoteRandomSpec(multiNodeConfig: RemoteRandomConfig) extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { +class RemoteRandomSpec(multiNodeConfig: RemoteRandomConfig) extends MultiNodeRemotingSpec(multiNodeConfig) + with DefaultTimeout { import multiNodeConfig._ import RemoteRandomSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala index 157c8fb0e3..8c21c7c644 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteRoundRobinSpec.scala @@ -10,9 +10,11 @@ import akka.actor.ActorRef import akka.actor.Props import akka.actor.PoisonPill import akka.actor.Address + import scala.concurrent.Await import akka.pattern.ask -import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } +import akka.remote.MultiNodeRemotingSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.routing.Broadcast import akka.routing.GetRoutees import akka.routing.Routees @@ -23,6 +25,7 @@ import akka.routing.Resizer import akka.routing.Routee import akka.routing.FromConfig import akka.testkit._ + import scala.concurrent.duration._ import com.typesafe.config.ConfigFactory @@ -36,7 +39,7 @@ class RemoteRoundRobinConfig(artery: Boolean) extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) deployOnAll(""" /service-hello { @@ -83,8 +86,8 @@ object RemoteRoundRobinSpec { } } -class RemoteRoundRobinSpec(multiNodeConfig: RemoteRoundRobinConfig) extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { +class RemoteRoundRobinSpec(multiNodeConfig: RemoteRoundRobinConfig) extends MultiNodeRemotingSpec(multiNodeConfig) + with DefaultTimeout { import multiNodeConfig._ import RemoteRoundRobinSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteScatterGatherSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteScatterGatherSpec.scala index 351595d837..d671d5779b 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteScatterGatherSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/routing/RemoteScatterGatherSpec.scala @@ -9,7 +9,8 @@ import akka.actor.ActorRef import akka.actor.Address import akka.actor.PoisonPill import akka.actor.Props -import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeConfig, MultiNodeSpec } +import akka.remote.MultiNodeRemotingSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.routing.Broadcast import akka.routing.ScatterGatherFirstCompletedPool import akka.routing.RoutedActorRef @@ -27,7 +28,7 @@ class RemoteScatterGatherConfig(artery: Boolean) extends MultiNodeConfig { commonConfig(debugConfig(on = false).withFallback( ConfigFactory.parseString(s""" akka.remote.artery.enabled = $artery - """))) + """)).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) deployOnAll(""" /service-hello { @@ -56,8 +57,8 @@ object RemoteScatterGatherSpec { } } -class RemoteScatterGatherSpec(multiNodeConfig: RemoteScatterGatherConfig) extends MultiNodeSpec(multiNodeConfig) - with STMultiNodeSpec with ImplicitSender with DefaultTimeout { +class RemoteScatterGatherSpec(multiNodeConfig: RemoteScatterGatherConfig) extends MultiNodeRemotingSpec(multiNodeConfig) + with DefaultTimeout { import multiNodeConfig._ import RemoteScatterGatherSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala index a52ee6f10f..c5cb873d65 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testconductor/TestConductorSpec.scala @@ -5,7 +5,8 @@ package akka.remote.testconductor import language.postfixOps import com.typesafe.config.ConfigFactory -import akka.actor.{ Props, Actor, ActorIdentity, Identify, Deploy } +import akka.actor.{ Actor, ActorIdentity, Deploy, Identify, Props } + import scala.concurrent.Await import scala.concurrent.Awaitable import scala.concurrent.duration._ @@ -13,11 +14,13 @@ import akka.testkit.ImplicitSender import akka.testkit.LongRunningTest import java.net.InetSocketAddress import java.net.InetAddress -import akka.remote.testkit.{ STMultiNodeSpec, MultiNodeSpec, MultiNodeConfig } + +import akka.remote.MultiNodeRemotingSpec +import akka.remote.testkit.{ MultiNodeConfig, MultiNodeSpec, STMultiNodeSpec } import akka.remote.transport.ThrottlerTransportAdapter.Direction object TestConductorMultiJvmSpec extends MultiNodeConfig { - commonConfig(debugConfig(on = false)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) val master = role("master") val slave = role("slave") @@ -28,7 +31,7 @@ object TestConductorMultiJvmSpec extends MultiNodeConfig { class TestConductorMultiJvmNode1 extends TestConductorSpec class TestConductorMultiJvmNode2 extends TestConductorSpec -class TestConductorSpec extends MultiNodeSpec(TestConductorMultiJvmSpec) with STMultiNodeSpec with ImplicitSender { +class TestConductorSpec extends MultiNodeRemotingSpec(TestConductorMultiJvmSpec) { import TestConductorMultiJvmSpec._ diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala index ba7b00a469..99fc3b6add 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/testkit/MultiNodeSpecSpec.scala @@ -3,10 +3,11 @@ */ package akka.remote.testkit +import akka.remote.MultiNodeRemotingSpec import akka.testkit.LongRunningTest object MultiNodeSpecMultiJvmSpec extends MultiNodeConfig { - commonConfig(debugConfig(on = false)) + commonConfig(debugConfig(on = false).withFallback(MultiNodeRemotingSpec.arteryFlightRecordingConf)) val node1 = role("node1") val node2 = role("node2") @@ -19,7 +20,7 @@ class MultiNodeSpecSpecMultiJvmNode2 extends MultiNodeSpecSpec class MultiNodeSpecSpecMultiJvmNode3 extends MultiNodeSpecSpec class MultiNodeSpecSpecMultiJvmNode4 extends MultiNodeSpecSpec -class MultiNodeSpecSpec extends MultiNodeSpec(MultiNodeSpecMultiJvmSpec) with STMultiNodeSpec { +class MultiNodeSpecSpec extends MultiNodeRemotingSpec(MultiNodeSpecMultiJvmSpec) { import MultiNodeSpecMultiJvmSpec._ diff --git a/akka-remote/src/main/resources/reference.conf b/akka-remote/src/main/resources/reference.conf index 2a210b6ee5..9b8c83ce5d 100644 --- a/akka-remote/src/main/resources/reference.conf +++ b/akka-remote/src/main/resources/reference.conf @@ -285,6 +285,11 @@ akka { flight-recorder { // FIXME it should be enabled by default, but there is some concurrency issue that crashes the JVM enabled = off + # Controls where the flight recorder file will be written. There are three options: + # 1. Empty: a file will be generated in the temporary directory of the OS + # 2. A relative or absolute path ending with ".afr": this file will be used + # 3. A relative or absolute path: this directory will be used, the file will get a random file name + destination="" } # compression of common strings in remoting messages, like actor destinations, serializers etc diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala index 0d3c6cc1a8..eb14e7b506 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArterySettings.scala @@ -11,9 +11,11 @@ import akka.util.Helpers.{ ConfigOps, Requiring, toRootLowerCase } import akka.util.WildcardIndex import akka.NotUsed import com.typesafe.config.Config + import scala.collection.JavaConverters._ import scala.concurrent.duration._ import java.net.InetAddress +import java.nio.file.Path import java.util.concurrent.TimeUnit /** INTERNAL API */ @@ -108,6 +110,7 @@ private[akka] final class ArterySettings private (config: Config) { val DriverTimeout = config.getMillisDuration("driver-timeout").requiring(interval ⇒ interval > Duration.Zero, "driver-timeout must be more than zero") val FlightRecorderEnabled: Boolean = getBoolean("flight-recorder.enabled") + val FlightRecorderDestination: String = getString("flight-recorder.destination") val Compression = new Compression(getConfig("compression")) final val MaximumFrameSize = 1024 * 1024 diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index d9be34a281..9f7368a73c 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -6,21 +6,19 @@ package akka.remote.artery import java.io.File import java.net.InetSocketAddress import java.nio.channels.{ DatagramChannel, FileChannel } +import java.nio.file.Path import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.{ AtomicLong, AtomicReference } import java.util.concurrent.atomic.AtomicBoolean import scala.annotation.tailrec -import scala.collection.JavaConverters._ -import scala.concurrent.Future -import scala.concurrent.Promise +import scala.concurrent.{ Await, Future, Promise } import scala.concurrent.duration._ import scala.util.Failure import scala.util.Success import scala.util.Try import scala.util.control.NoStackTrace import scala.util.control.NonFatal - import akka.Done import akka.NotUsed import akka.actor._ @@ -426,8 +424,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R log.info("Remoting started; listening on address: {}", defaultAddress) } - private lazy val stopMediaDriverShutdownHook = new Thread { - override def run(): Unit = stopMediaDriver() + private lazy val shutdownHook = new Thread { + override def run(): Unit = { + if (!_shutdown) { + internalShutdown() + + } + } } private def startMediaDriver(): Unit = { @@ -464,7 +467,7 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R val driver = MediaDriver.launchEmbedded(driverContext) log.info("Started embedded media driver in directory [{}]", driver.aeronDirectoryName) topLevelFREvents.loFreq(Transport_MediaDriverStarted, driver.aeronDirectoryName().getBytes("US-ASCII")) - Runtime.getRuntime.addShutdownHook(stopMediaDriverShutdownHook) + Runtime.getRuntime.addShutdownHook(shutdownHook) if (!mediaDriver.compareAndSet(None, Some(driver))) { throw new IllegalStateException("media driver started more than once") } @@ -493,7 +496,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R "Couldn't delete Aeron embedded media driver files in [{}] due to [{}]", driver.aeronDirectoryName, e.getMessage) } - Try(Runtime.getRuntime.removeShutdownHook(stopMediaDriverShutdownHook)) } } @@ -742,14 +744,18 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R flushingPromise.future } implicit val ec = remoteDispatcher + flushing.recover { case _ ⇒ Done }.flatMap(_ ⇒ internalShutdown()) + } + private def internalShutdown(): Future[Done] = { + import system.dispatcher + + killSwitch.abort(ShutdownSignal) + topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) for { - _ ← flushing.recover { case _ ⇒ Done } - _ = killSwitch.abort(ShutdownSignal) _ ← streamsCompleted + _ ← taskRunner.stop() } yield { - topLevelFREvents.loFreq(Transport_KillSwitchPulled, NoMetaData) - taskRunner.stop() topLevelFREvents.loFreq(Transport_Stopped, NoMetaData) if (aeronErrorLogTask != null) { @@ -767,8 +773,6 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R flightRecorder.foreach(_.close()) afrFileChannel.foreach(_.force(true)) afrFileChannel.foreach(_.close()) - // TODO: Be smarter about this in tests and make it always-on-for prod - afrFile.foreach(_.delete()) Done } } @@ -957,11 +961,10 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R .toMat(messageDispatcherSink)(Keep.both) } - private def initializeFlightRecorder(): Option[(FileChannel, File, FlightRecorder)] = { + private def initializeFlightRecorder(): Option[(FileChannel, Path, FlightRecorder)] = { if (settings.Advanced.FlightRecorderEnabled) { - // TODO: Figure out where to put it, currently using temporary files - val afrFile = File.createTempFile("artery", ".afr") - afrFile.deleteOnExit() + val afrFile = FlightRecorder.createFlightRecorderFile(settings.Advanced.FlightRecorderDestination) + log.info("Flight recorder enabled, output can be found in '{}'", afrFile) val fileChannel = FlightRecorder.prepareFileForFlightRecorder(afrFile) Some((fileChannel, afrFile, new FlightRecorder(fileChannel))) diff --git a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala index c6583e6781..c6d63817a1 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorder.scala @@ -3,9 +3,9 @@ */ package akka.remote.artery -import java.io.{ File, RandomAccessFile } +import java.io.RandomAccessFile import java.nio.channels.FileChannel -import java.nio.file.StandardOpenOption +import java.nio.file._ import java.nio.{ ByteBuffer, ByteOrder } import java.util.concurrent.atomic.AtomicReference import java.util.concurrent.{ CountDownLatch, TimeUnit } @@ -184,13 +184,43 @@ private[remote] class RollingEventLogSection( */ private[remote] object FlightRecorder { - def prepareFileForFlightRecorder(file: File): FileChannel = { + /** + * @return A created file where the flight recorder file can be written. There are three options, depending + * on ``destination``: + * 1. Empty: a file will be generated in the temporary directory of the OS + * 2. A relative or absolute path ending with ".afr": this file will be used + * 3. A relative or absolute path: this directory will be used, the file will get a random file name + */ + def createFlightRecorderFile(destination: String, fs: FileSystem = FileSystems.getDefault): Path = { + + // TODO safer file permissions (e.g. only user readable on POSIX)? + destination match { + // not defined, use temporary directory + case "" ⇒ Files.createTempFile("artery", ".afr") + + case directory if directory.endsWith(".afr") ⇒ + val path = fs.getPath(directory).toAbsolutePath + if (!Files.exists(path)) { + Files.createDirectories(path.getParent) + Files.createFile(path) + } + path + + case directory ⇒ + val path = fs.getPath(directory).toAbsolutePath + if (!Files.exists(path)) Files.createDirectories(path) + + Files.createTempFile(path, "artery", ".afr") + } + } + + def prepareFileForFlightRecorder(path: Path): FileChannel = { // Force the size, otherwise memory mapping will fail on *nixes - val randomAccessFile = new RandomAccessFile(file, "rwd") + val randomAccessFile = new RandomAccessFile(path.toFile, "rwd") randomAccessFile.setLength(FlightRecorder.TotalSize) randomAccessFile.close() - FileChannel.open(file.toPath, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) + FileChannel.open(path, StandardOpenOption.CREATE, StandardOpenOption.WRITE, StandardOpenOption.READ) } val Alignment = 64 * 1024 // Windows is picky about mapped section alignments diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderReader.scala b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala similarity index 86% rename from akka-remote/src/test/scala/akka/remote/artery/FlightRecorderReader.scala rename to akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala index fd5cbb805d..71682da8b3 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderReader.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/FlightRecorderReader.scala @@ -1,14 +1,18 @@ package akka.remote.artery -import java.io.IOException +import java.io.{ IOException, RandomAccessFile } import java.nio.channels.FileChannel +import java.nio.file.Path import java.time.Instant import org.agrona.concurrent.MappedResizeableBuffer import scala.collection.immutable -object FlightRecorderReader { +/** + * Internal API + */ +private[akka] object FlightRecorderReader { import FlightRecorder._ sealed trait LogState @@ -59,9 +63,39 @@ object FlightRecorderReader { recordSize = HiFreqRecordSize, entriesPerRecord = HiFreqBatchSize) + def dumpToStdout(flightRecorderFile: Path): Unit = { + var raFile: RandomAccessFile = null + var channel: FileChannel = null + var reader: FlightRecorderReader = null + try { + + raFile = new RandomAccessFile(flightRecorderFile.toFile, "rw") + channel = raFile.getChannel + reader = new FlightRecorderReader(channel) + println(reader.structure) + + println("--- ALERT ENTRIES") + reader.structure.alertLog.logs.foreach(log ⇒ println(log.richEntries.mkString("\n"))) + + println("--- HI FREQUENCY ENTRIES") + reader.structure.hiFreqLog.logs.foreach(log ⇒ println(log.compactEntries.mkString("\n"))) + + println("--- LO FREQUENCY ENTRIES") + reader.structure.loFreqLog.logs.foreach(log ⇒ println(log.richEntries.mkString("\n"))) + + } finally { + if (reader ne null) reader.close() + if (channel ne null) channel.close() + if (raFile ne null) raFile.close() + } + } + } -class FlightRecorderReader(fileChannel: FileChannel) { +/** + * Internal API + */ +private[akka] final class FlightRecorderReader(fileChannel: FileChannel) { import FlightRecorder._ import FlightRecorderReader._ diff --git a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala index 635c7ee517..922672bf6b 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/TaskRunner.scala @@ -3,22 +3,19 @@ */ package akka.remote.artery -import java.util.concurrent.TimeUnit.MICROSECONDS -import java.util.concurrent.TimeUnit.MILLISECONDS +import java.util.concurrent.TimeUnit.{MICROSECONDS, MILLISECONDS} + +import akka.Done +import akka.actor.ExtendedActorSystem +import akka.dispatch.{AbstractNodeQueue, MonitorableThreadFactory} +import akka.event.Logging +import org.agrona.concurrent.{BackoffIdleStrategy, BusySpinIdleStrategy, IdleStrategy, SleepingIdleStrategy} import scala.annotation.tailrec +import scala.concurrent.{Future, Promise} import scala.reflect.ClassTag import scala.util.control.NonFatal -import akka.actor.ExtendedActorSystem -import akka.dispatch.AbstractNodeQueue -import akka.dispatch.MonitorableThreadFactory -import akka.event.Logging -import org.agrona.concurrent.BackoffIdleStrategy -import org.agrona.concurrent.BusySpinIdleStrategy -import org.agrona.concurrent.IdleStrategy -import org.agrona.concurrent.SleepingIdleStrategy - /** * INTERNAL API */ @@ -112,6 +109,7 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: In private[this] var running = false private[this] val cmdQueue = new CommandQueue private[this] val tasks = new ArrayBag[Task] + private[this] val shutdown = Promise[Done]() private val idleStrategy = createIdleStrategy(idleCpuLevel) private var reset = false @@ -126,8 +124,9 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: In thread.start() } - def stop(): Unit = { + def stop(): Future[Done] = { command(Shutdown) + shutdown.future } def command(cmd: Command): Unit = { @@ -177,7 +176,9 @@ private[akka] class TaskRunner(system: ExtendedActorSystem, val idleCpuLevel: In case null ⇒ // no command case Add(task) ⇒ tasks.add(task) case Remove(task) ⇒ tasks.remove(task) - case Shutdown ⇒ running = false + case Shutdown ⇒ + running = false + shutdown.trySuccess(Done) } } diff --git a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala index 908bf25f3d..4e27bcc44d 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/ArteryMultiNodeSpec.scala @@ -3,14 +3,19 @@ */ package akka.remote.artery -import akka.actor.{ ActorSystem, ExtendedActorSystem, RootActorPath } +import java.nio.file.{ FileSystems, Files, Path } +import java.util.UUID + +import akka.actor.{ ActorSystem, RootActorPath } import akka.remote.RARP import akka.testkit.AkkaSpec import com.typesafe.config.{ Config, ConfigFactory } +import org.scalatest.Outcome object ArteryMultiNodeSpec { + def defaultConfig = - ConfigFactory.parseString(""" + ConfigFactory.parseString(s""" akka { actor.provider = remote actor.warn-about-java-serializer-usage = off @@ -20,6 +25,10 @@ object ArteryMultiNodeSpec { hostname = localhost port = 0 } + advanced.flight-recorder { + enabled=on + destination=target/flight-recorder-${UUID.randomUUID().toString}.afr + } } } """) @@ -41,6 +50,8 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys)) def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}" + private val flightRecorderFile: Path = + FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination) private var remoteSystems: Vector[ActorSystem] = Vector.empty @@ -62,9 +73,32 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF remoteSystem } + // keep track of failure so that we can print flight recorder output on failures + private var failed = false + override protected def withFixture(test: NoArgTest): Outcome = { + val out = super.withFixture(test) + if (!out.isSucceeded) failed = true + out + } + + override protected def beforeTermination(): Unit = { + handleFlightRecorderFile() + } + override def afterTermination(): Unit = { remoteSystems.foreach(sys ⇒ shutdown(sys)) remoteSystems = Vector.empty } + private def handleFlightRecorderFile(): Unit = { + if (Files.exists(flightRecorderFile)) { + if (failed) { + // logger may not be alive anymore so we have to use stdout here + println("Flight recorder dump:") + FlightRecorderReader.dumpToStdout(flightRecorderFile) + } + Files.delete(flightRecorderFile) + } + } + } diff --git a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala index 3dd3030fb6..4a04660b9b 100644 --- a/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/artery/FlightRecorderSpec.scala @@ -6,12 +6,13 @@ package akka.remote.artery import java.io.{ File, IOException, RandomAccessFile } import java.nio.channels.FileChannel -import java.nio.file.StandardOpenOption +import java.nio.file.{ Files, Path, StandardOpenOption } import java.time.Instant import java.util.Arrays import java.util.concurrent.{ CountDownLatch, TimeUnit } import akka.testkit.AkkaSpec +import com.google.common.jimfs.Jimfs class FlightRecorderSpec extends AkkaSpec { import FlightRecorderReader._ @@ -372,6 +373,34 @@ class FlightRecorderSpec extends AkkaSpec { } } + "create flight recorder file" in { + def assertFileIsSound(path: Path) = { + Files.exists(path) should ===(true) + Files.isRegularFile(path) should ===(true) + Files.isWritable(path) should ===(true) + Files.isReadable(path) should ===(true) + } + val fs = Jimfs.newFileSystem() + + try { + val tmpPath = FlightRecorder.createFlightRecorderFile("", fs) + assertFileIsSound(tmpPath) + // this is likely in the actual file system, so lets delete it + Files.delete(tmpPath) + + Files.createDirectory(fs.getPath("/directory")) + val tmpFileInGivenPath = FlightRecorder.createFlightRecorderFile("/directory", fs) + assertFileIsSound(tmpFileInGivenPath) + + val specificFile = FlightRecorder.createFlightRecorderFile("/directory/flight-recorder.afr", fs) + assertFileIsSound(specificFile) + + } finally { + fs.close() + } + + } + } private def withFlightRecorder(body: (FlightRecorder, FlightRecorderReader, FileChannel) ⇒ Unit): Unit = { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index c5935a8b2f..247a42e198 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -140,7 +140,7 @@ object Dependencies { val actorTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.commonsCodec, Test.commonsMath, Test.mockito, Test.scalacheck.value, Test.junitIntf) - val remote = l ++= Seq(netty, uncommonsMath, aeronDriver, aeronClient, Test.junit, Test.scalatest.value) + val remote = l ++= Seq(netty, uncommonsMath, aeronDriver, aeronClient, Test.junit, Test.scalatest.value, Test.jimfs) val remoteTests = l ++= Seq(Test.junit, Test.scalatest.value, Test.scalaXml)