Add flight recording and dump on failure for more artery tests

This commit is contained in:
Johan Andrén 2016-11-29 11:51:31 +01:00
parent 267f31149c
commit fca3e1e485
19 changed files with 149 additions and 213 deletions

View file

@ -12,33 +12,12 @@ import akka.testkit.AkkaSpec
import com.typesafe.config.{ Config, ConfigFactory }
import org.scalatest.Outcome
object ArteryMultiNodeSpec {
def defaultConfig =
ConfigFactory.parseString(s"""
akka {
actor.provider = remote
actor.warn-about-java-serializer-usage = off
remote.artery {
enabled = on
canonical {
hostname = localhost
port = 0
}
advanced.flight-recorder {
enabled=on
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
}
}
}
""")
}
/**
* Base class for remoting tests what needs to test interaction between a "local" actor system
* which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery
*/
abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArteryMultiNodeSpec.defaultConfig)) {
abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArterySpecSupport.defaultConfig))
with FlightRecorderSpecIntegration {
def this() = this(ConfigFactory.empty())
def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig))
@ -50,8 +29,6 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress
def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys))
def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}"
private val flightRecorderFile: Path =
FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination)
private var remoteSystems: Vector[ActorSystem] = Vector.empty
@ -73,29 +50,10 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
remoteSystem
}
// keep track of failure so that we can print flight recorder output on failures
private var failed = false
override protected def withFixture(test: NoArgTest): Outcome = {
val out = super.withFixture(test)
if (!out.isSucceeded) failed = true
out
}
override def afterTermination(): Unit = {
remoteSystems.foreach(sys shutdown(sys))
remoteSystems = Vector.empty
handleFlightRecorderFile()
}
private def handleFlightRecorderFile(): Unit = {
if (Files.exists(flightRecorderFile)) {
if (failed) {
// logger may not be alive anymore so we have to use stdout here
println("Flight recorder dump:")
FlightRecorderReader.dumpToStdout(flightRecorderFile)
}
Files.delete(flightRecorderFile)
}
super.afterTermination()
}
}

View file

@ -0,0 +1,81 @@
/*
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
*/
package akka.remote.artery
import java.nio.file.{ FileSystems, Files, Path }
import java.util.UUID
import akka.actor.ActorSystem
import akka.remote.RARP
import akka.testkit.AkkaSpec
import com.typesafe.config.ConfigFactory
import org.scalatest.Outcome
object ArterySpecSupport {
// same for all artery enabled remoting tests
private val staticArteryRemotingConfig = ConfigFactory.parseString(s"""
akka {
actor {
provider = remote
warn-about-java-serializer-usage = off
serialize-creators = off
}
remote.artery {
enabled = on
canonical {
hostname = localhost
port = 0
}
}
}""")
/** artery enabled, flight recorder enabled, dynamic selection of port on localhost */
def defaultConfig =
ConfigFactory.parseString(s"""
akka {
remote.artery {
advanced.flight-recorder {
enabled=on
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
}
}
}
""").withFallback(staticArteryRemotingConfig)
}
/**
* Dumps flight recorder data on test failure if artery flight recorder is enabled
*/
trait FlightRecorderSpecIntegration { self: AkkaSpec
def system: ActorSystem
private val flightRecorderFile: Path =
FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination)
// keep track of failure so that we can print flight recorder output on failures
private var failed = false
override protected def withFixture(test: NoArgTest): Outcome = {
val out = test()
if (!out.isSucceeded) failed = true
out
}
override def afterTermination(): Unit = {
self.afterTermination()
handleFlightRecorderFile()
}
protected def handleFlightRecorderFile(): Unit = {
if (Files.exists(flightRecorderFile)) {
if (failed) {
// logger may not be alive anymore so we have to use stdout here
println("Flight recorder dump:")
FlightRecorderReader.dumpToStdout(flightRecorderFile)
}
Files.delete(flightRecorderFile)
}
}
}

View file

@ -11,21 +11,7 @@ import com.typesafe.config.ConfigFactory
import scala.concurrent.Await
import scala.concurrent.duration._
object FlushOnShutdownSpec {
val config = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
actor.serialize-creators = off
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
}
""")
}
class FlushOnShutdownSpec extends ArteryMultiNodeSpec(FlushOnShutdownSpec.config) {
class FlushOnShutdownSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) {
val remoteSystem = newRemoteSystem()

View file

@ -14,15 +14,9 @@ object HandshakeDenySpec {
val commonConfig = ConfigFactory.parseString(s"""
akka.loglevel = WARNING
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
remote.artery.advanced.handshake-timeout = 2s
remote.artery.advanced.image-liveness-timeout = 1.9s
}
""")
akka.remote.artery.advanced.handshake-timeout = 2s
akka.remote.artery.advanced.image-liveness-timeout = 1.9s
""").withFallback(ArterySpecSupport.defaultConfig)
}

View file

@ -18,22 +18,16 @@ object HandshakeFailureSpec {
val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
val commonConfig = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
remote.artery.advanced.handshake-timeout = 2s
remote.artery.advanced.image-liveness-timeout = 1.9s
}
""")
akka.remote.artery.advanced.handshake-timeout = 2s
akka.remote.artery.advanced.image-liveness-timeout = 1.9s
""").withFallback(ArterySpecSupport.defaultConfig)
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
.withFallback(commonConfig)
}
class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender {
class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender with FlightRecorderSpecIntegration {
import HandshakeFailureSpec._
var systemB: ActorSystem = null

View file

@ -17,22 +17,16 @@ object HandshakeRetrySpec {
val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
val commonConfig = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
remote.artery.advanced.handshake-timeout = 10s
remote.artery.advanced.image-liveness-timeout = 7s
}
""")
akka.remote.artery.advanced.handshake-timeout = 10s
akka.remote.artery.advanced.image-liveness-timeout = 7s
""").withFallback(ArterySpecSupport.defaultConfig)
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
.withFallback(commonConfig)
}
class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender {
class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender with FlightRecorderSpecIntegration {
import HandshakeRetrySpec._
var systemB: ActorSystem = null
@ -58,7 +52,9 @@ class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with
}
override def afterTermination(): Unit =
override def afterTermination(): Unit = {
if (systemB != null) shutdown(systemB)
super.afterTermination()
}
}

View file

@ -18,15 +18,9 @@ import com.typesafe.config.ConfigFactory
object LateConnectSpec {
val config = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
remote.artery.advanced.handshake-timeout = 3s
remote.artery.advanced.image-liveness-timeout = 2.9s
}
""")
akka.remote.artery.advanced.handshake-timeout = 3s
akka.remote.artery.advanced.image-liveness-timeout = 2.9s
""").withFallback(ArterySpecSupport.defaultConfig)
}
@ -61,6 +55,9 @@ class LateConnectSpec extends AkkaSpec(LateConnectSpec.config) with ImplicitSend
}
}
override def afterTermination(): Unit = shutdown(systemB)
override def afterTermination(): Unit = {
shutdown(systemB)
super.afterTermination()
}
}

View file

@ -3,19 +3,12 @@
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.Address
import akka.remote.EndpointManager.Send
import akka.remote.RemoteActorRef
import akka.remote.UniqueAddress
import akka.remote.artery.SystemMessageDelivery._
import akka.stream.ActorMaterializer
import akka.stream.ActorMaterializerSettings
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
import akka.stream.scaladsl.Keep
import akka.stream.testkit.scaladsl.TestSink
import akka.stream.testkit.scaladsl.TestSource
import akka.testkit.AkkaSpec
import akka.testkit.ImplicitSender
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import akka.util.OptionVal
object OutboundControlJunctionSpec {

View file

@ -26,14 +26,11 @@ object RemoteDeathWatchSpec {
}
}
remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
}
""")
""").withFallback(ArterySpecSupport.defaultConfig)
}
class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec with FlightRecorderSpecIntegration {
import RemoteDeathWatchSpec._
system.eventStream.publish(TestEvent.Mute(
@ -44,6 +41,7 @@ class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with Im
override def afterTermination() {
shutdown(other)
super.afterTermination()
}
override def expectedTestDuration: FiniteDuration = 120.seconds

View file

@ -12,7 +12,6 @@ import akka.remote.RemoteScope
object RemoteDeployerSpec {
val deployerConf = ConfigFactory.parseString("""
akka.actor.provider = remote
akka.actor.deployment {
/service2 {
router = round-robin-pool
@ -21,10 +20,7 @@ object RemoteDeployerSpec {
dispatcher = mydispatcher
}
}
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0
""", ConfigParseOptions.defaults)
""").withFallback(ArterySpecSupport.defaultConfig)
class RecipeActor extends Actor {
def receive = { case _ }
@ -32,7 +28,7 @@ object RemoteDeployerSpec {
}
class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) with FlightRecorderSpecIntegration {
"A RemoteDeployer" must {

View file

@ -32,13 +32,7 @@ object RemoteDeploymentSpec {
}
}
class RemoteDeploymentSpec extends AkkaSpec("""
#akka.loglevel=DEBUG
akka.actor.provider = remote
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0
""") {
class RemoteDeploymentSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) {
import RemoteDeploymentSpec._

View file

@ -21,11 +21,7 @@ object RemoteRouterSpec {
}
}
class RemoteRouterSpec extends AkkaSpec("""
akka.actor.provider = remote
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0
class RemoteRouterSpec extends AkkaSpec(ConfigFactory.parseString("""
akka.actor.deployment {
/remote-override {
router = round-robin-pool
@ -39,7 +35,7 @@ class RemoteRouterSpec extends AkkaSpec("""
router = round-robin-pool
nr-of-instances = 6
}
}""") {
}""").withFallback(ArterySpecSupport.defaultConfig)) with FlightRecorderSpecIntegration {
import RemoteRouterSpec._
@ -85,6 +81,7 @@ class RemoteRouterSpec extends AkkaSpec("""
override def afterTermination(): Unit = {
shutdown(masterSystem)
super.afterTermination()
}
def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = {

View file

@ -3,38 +3,20 @@
*/
package akka.remote.artery
import scala.concurrent.duration._
import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath }
import akka.testkit.{ AkkaSpec, ImplicitSender }
import com.typesafe.config.ConfigFactory
import akka.actor.Actor.Receive
import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Deploy, Identify, PoisonPill, Props, RootActorPath }
import akka.remote.RARP
import akka.testkit.TestActors
import akka.actor.PoisonPill
import akka.testkit.TestProbe
import akka.actor.ActorRef
import com.typesafe.config.Config
import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe }
import com.typesafe.config.{ Config, ConfigFactory }
object RemoteSendConsistencySpec {
import scala.concurrent.duration._
val config = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
}
""")
}
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(RemoteSendConsistencySpec.config)
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig)
class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
ConfigFactory.parseString("""
akka.remote.artery.advanced.outbound-lanes = 3
akka.remote.artery.advanced.inbound-lanes = 3
""").withFallback(RemoteSendConsistencySpec.config))
""").withFallback(ArterySpecSupport.defaultConfig)) with FlightRecorderSpecIntegration
abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
@ -137,6 +119,9 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpe
}
override def afterTermination(): Unit = shutdown(systemB)
override def afterTermination(): Unit = {
shutdown(systemB)
super.shutdown(systemB)
}
}

View file

@ -63,15 +63,7 @@ object RemoteWatcherSpec {
}
class RemoteWatcherSpec extends AkkaSpec(
"""akka {
loglevel = INFO
log-dead-letters-during-shutdown = false
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
}""") with ImplicitSender {
class RemoteWatcherSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender with FlightRecorderSpecIntegration {
import RemoteWatcherSpec._
import RemoteWatcher._
@ -88,6 +80,7 @@ class RemoteWatcherSpec extends AkkaSpec(
override def afterTermination() {
shutdown(remoteSystem)
super.afterTermination()
}
val heartbeatRspB = ArteryHeartbeatRsp(remoteAddressUid)

View file

@ -13,24 +13,11 @@ import akka.testkit.EventFilter
object SerializationErrorSpec {
val config = ConfigFactory.parseString(s"""
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
actor {
serialize-creators = false
serialize-messages = false
}
}
""")
object NotSerializableMsg
}
class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) with ImplicitSender {
class SerializationErrorSpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender {
import SerializationErrorSpec._
val configB = ConfigFactory.parseString("""

View file

@ -31,23 +31,11 @@ import akka.util.OptionVal
object SystemMessageDeliverySpec {
val config = ConfigFactory.parseString(s"""
akka.loglevel=INFO
akka {
actor.provider = remote
remote.artery.enabled = on
remote.artery.canonical.hostname = localhost
remote.artery.canonical.port = 0
}
akka.actor.serialize-creators = off
akka.actor.serialize-messages = off
""")
case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage
}
class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.config) with ImplicitSender {
class SystemMessageDeliverySpec extends AkkaSpec(ArterySpecSupport.defaultConfig) with ImplicitSender with FlightRecorderSpecIntegration {
import SystemMessageDeliverySpec._
val addressA = UniqueAddress(
@ -63,7 +51,10 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16)
override def afterTermination(): Unit = shutdown(systemB)
override def afterTermination(): Unit = {
shutdown(systemB)
super.afterTermination()
}
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = {
val deadLetters = TestProbe().ref

View file

@ -59,26 +59,21 @@ object UntrustedSpec {
}
}
val config = ConfigFactory.parseString(
"""
akka.remote.artery.untrusted-mode = on
akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ]
akka.loglevel = DEBUG # the test is verifying some Debug logging
"""
).withFallback(ArterySpecSupport.defaultConfig)
}
class UntrustedSpec extends AkkaSpec("""
akka.actor.provider = remote
akka.remote.artery.untrusted-mode = on
akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ]
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0
akka.loglevel = DEBUG # the test is verifying some Debug logging
""") with ImplicitSender {
class UntrustedSpec extends AkkaSpec(UntrustedSpec.config) with ImplicitSender with FlightRecorderSpecIntegration {
import UntrustedSpec._
val client = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString("""
akka.actor.provider = remote
akka.remote.artery.enabled = on
akka.remote.artery.canonical.hostname = localhost
akka.remote.artery.canonical.port = 0
"""))
val client = ActorSystem("UntrustedSpec-client", ArterySpecSupport.defaultConfig)
val addr = RARP(system).provider.getDefaultAddress
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
@ -100,6 +95,7 @@ class UntrustedSpec extends AkkaSpec("""
override def afterTermination() {
shutdown(client)
super.afterTermination()
}
// need to enable debug log-level without actually printing those messages

View file

@ -227,7 +227,7 @@ object Source {
* `create` factory is never called and the materialized `CompletionStage` is failed.
*/
def lazily[T, M](create: function.Creator[Source[T, M]]): Source[T, CompletionStage[M]] =
scaladsl.Source.lazily[T, M](() => create.create().asScala).mapMaterializedValue(_.toJava).asJava
scaladsl.Source.lazily[T, M](() create.create().asScala).mapMaterializedValue(_.toJava).asJava
/**
* Creates a `Source` that is materialized as a [[org.reactivestreams.Subscriber]]

View file

@ -360,7 +360,7 @@ object Source {
* the materialized future is completed with its value, if downstream cancels or fails without any demand the
* create factory is never called and the materialized `Future` is failed.
*/
def lazily[T, M](create: () => Source[T, M]): Source[T, Future[M]] =
def lazily[T, M](create: () Source[T, M]): Source[T, Future[M]] =
Source.fromGraph(new LazySource[T, M](create))
/**