Merge pull request #21904 from johanandren/wip-enable-afr-in-artery-tests-johanandren
Enable flight recorder in more artery tests
This commit is contained in:
commit
6ac50fd945
18 changed files with 195 additions and 306 deletions
|
|
@ -12,33 +12,12 @@ import akka.testkit.AkkaSpec
|
|||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
import org.scalatest.Outcome
|
||||
|
||||
object ArteryMultiNodeSpec {
|
||||
|
||||
def defaultConfig =
|
||||
ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
actor.warn-about-java-serializer-usage = off
|
||||
remote.artery {
|
||||
enabled = on
|
||||
canonical {
|
||||
hostname = localhost
|
||||
port = 0
|
||||
}
|
||||
advanced.flight-recorder {
|
||||
enabled=on
|
||||
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
|
||||
}
|
||||
}
|
||||
}
|
||||
""")
|
||||
}
|
||||
|
||||
/**
|
||||
* Base class for remoting tests what needs to test interaction between a "local" actor system
|
||||
* which is always created (the usual AkkaSpec system), and multiple additional actor systems over artery
|
||||
*/
|
||||
abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArteryMultiNodeSpec.defaultConfig)) {
|
||||
abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withFallback(ArterySpecSupport.defaultConfig))
|
||||
with FlightRecorderSpecIntegration {
|
||||
|
||||
def this() = this(ConfigFactory.empty())
|
||||
def this(extraConfig: String) = this(ConfigFactory.parseString(extraConfig))
|
||||
|
|
@ -50,8 +29,6 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
|
|||
def address(sys: ActorSystem) = RARP(sys).provider.getDefaultAddress
|
||||
def rootActorPath(sys: ActorSystem) = RootActorPath(address(sys))
|
||||
def nextGeneratedSystemName = s"${localSystem.name}-remote-${remoteSystems.size}"
|
||||
private val flightRecorderFile: Path =
|
||||
FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination)
|
||||
|
||||
private var remoteSystems: Vector[ActorSystem] = Vector.empty
|
||||
|
||||
|
|
@ -61,11 +38,11 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
|
|||
*/
|
||||
def newRemoteSystem(extraConfig: Option[String] = None, name: Option[String] = None): ActorSystem = {
|
||||
val config =
|
||||
extraConfig.fold(
|
||||
ArterySpecSupport.newFlightRecorderConfig.withFallback(extraConfig.fold(
|
||||
localSystem.settings.config
|
||||
)(
|
||||
str ⇒ ConfigFactory.parseString(str).withFallback(localSystem.settings.config)
|
||||
)
|
||||
))
|
||||
|
||||
val remoteSystem = ActorSystem(name.getOrElse(nextGeneratedSystemName), config)
|
||||
remoteSystems = remoteSystems :+ remoteSystem
|
||||
|
|
@ -73,29 +50,11 @@ abstract class ArteryMultiNodeSpec(config: Config) extends AkkaSpec(config.withF
|
|||
remoteSystem
|
||||
}
|
||||
|
||||
// keep track of failure so that we can print flight recorder output on failures
|
||||
private var failed = false
|
||||
override protected def withFixture(test: NoArgTest): Outcome = {
|
||||
val out = super.withFixture(test)
|
||||
if (!out.isSucceeded) failed = true
|
||||
out
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit = {
|
||||
remoteSystems.foreach(sys ⇒ shutdown(sys))
|
||||
(system +: remoteSystems).foreach(handleFlightRecorderFile)
|
||||
remoteSystems = Vector.empty
|
||||
handleFlightRecorderFile()
|
||||
}
|
||||
|
||||
private def handleFlightRecorderFile(): Unit = {
|
||||
if (Files.exists(flightRecorderFile)) {
|
||||
if (failed) {
|
||||
// logger may not be alive anymore so we have to use stdout here
|
||||
println("Flight recorder dump:")
|
||||
FlightRecorderReader.dumpToStdout(flightRecorderFile)
|
||||
}
|
||||
Files.delete(flightRecorderFile)
|
||||
}
|
||||
super.afterTermination()
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -0,0 +1,92 @@
|
|||
/*
|
||||
* Copyright (C) 2016 Lightbend Inc. <http://www.lightbend.com>
|
||||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import java.nio.file.{ FileSystems, Files, Path }
|
||||
import java.util.UUID
|
||||
|
||||
import akka.actor.ActorSystem
|
||||
import akka.remote.RARP
|
||||
import akka.testkit.AkkaSpec
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.Outcome
|
||||
|
||||
object ArterySpecSupport {
|
||||
// same for all artery enabled remoting tests
|
||||
private val staticArteryRemotingConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor {
|
||||
provider = remote
|
||||
warn-about-java-serializer-usage = off
|
||||
serialize-creators = off
|
||||
}
|
||||
remote.artery {
|
||||
enabled = on
|
||||
canonical {
|
||||
hostname = localhost
|
||||
port = 0
|
||||
}
|
||||
}
|
||||
}""")
|
||||
|
||||
def newFlightRecorderConfig =
|
||||
ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
remote.artery {
|
||||
advanced.flight-recorder {
|
||||
enabled=on
|
||||
destination=target/flight-recorder-${UUID.randomUUID().toString}.afr
|
||||
}
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
/**
|
||||
* Artery enabled, flight recorder enabled, dynamic selection of port on localhost.
|
||||
* Combine with [[FlightRecorderSpecIntegration]] or remember to delete flight recorder file if using manually
|
||||
*/
|
||||
def defaultConfig = newFlightRecorderConfig.withFallback(staticArteryRemotingConfig)
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* Dumps flight recorder data on test failure if artery flight recorder is enabled
|
||||
*
|
||||
* Important note: if you more than one (the default AkkaSpec.system) systems you need to override
|
||||
* afterTermination and call handleFlightRecorderFile manually in the spec or else it will not be dumped
|
||||
* on failure but also leak the afr file
|
||||
*/
|
||||
trait FlightRecorderSpecIntegration { self: AkkaSpec ⇒
|
||||
|
||||
def system: ActorSystem
|
||||
|
||||
protected final def flightRecorderFileFor(system: ActorSystem): Path =
|
||||
FileSystems.getDefault.getPath(RARP(system).provider.remoteSettings.Artery.Advanced.FlightRecorderDestination)
|
||||
|
||||
// keep track of failure so that we can print flight recorder output on failures
|
||||
protected final def failed = _failed
|
||||
private var _failed = false
|
||||
override protected def withFixture(test: NoArgTest): Outcome = {
|
||||
val out = test()
|
||||
if (!out.isSucceeded) _failed = true
|
||||
out
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit = {
|
||||
self.afterTermination()
|
||||
handleFlightRecorderFile(system)
|
||||
}
|
||||
|
||||
protected def handleFlightRecorderFile(system: ActorSystem): Unit = {
|
||||
val flightRecorderFile = flightRecorderFileFor(system)
|
||||
if (Files.exists(flightRecorderFile)) {
|
||||
if (failed) {
|
||||
// logger may not be alive anymore so we have to use stdout here
|
||||
println(s"Flight recorder dump for system [${system.name}]:")
|
||||
FlightRecorderReader.dumpToStdout(flightRecorderFile)
|
||||
}
|
||||
Files.delete(flightRecorderFile)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
@ -11,21 +11,7 @@ import com.typesafe.config.ConfigFactory
|
|||
import scala.concurrent.Await
|
||||
import scala.concurrent.duration._
|
||||
|
||||
object FlushOnShutdownSpec {
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
actor.serialize-creators = off
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
}
|
||||
""")
|
||||
|
||||
}
|
||||
|
||||
class FlushOnShutdownSpec extends ArteryMultiNodeSpec(FlushOnShutdownSpec.config) {
|
||||
class FlushOnShutdownSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) {
|
||||
|
||||
val remoteSystem = newRemoteSystem()
|
||||
|
||||
|
|
|
|||
|
|
@ -14,15 +14,9 @@ object HandshakeDenySpec {
|
|||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka.loglevel = WARNING
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
remote.artery.advanced.handshake-timeout = 2s
|
||||
remote.artery.advanced.image-liveness-timeout = 1.9s
|
||||
}
|
||||
""")
|
||||
akka.remote.artery.advanced.handshake-timeout = 2s
|
||||
akka.remote.artery.advanced.image-liveness-timeout = 1.9s
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -18,26 +18,15 @@ object HandshakeFailureSpec {
|
|||
val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
|
||||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
remote.artery.advanced.handshake-timeout = 2s
|
||||
remote.artery.advanced.image-liveness-timeout = 1.9s
|
||||
}
|
||||
""")
|
||||
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
|
||||
.withFallback(commonConfig)
|
||||
akka.remote.artery.advanced.handshake-timeout = 2s
|
||||
akka.remote.artery.advanced.image-liveness-timeout = 1.9s
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender {
|
||||
class HandshakeFailureSpec extends ArteryMultiNodeSpec(HandshakeFailureSpec.commonConfig) with ImplicitSender {
|
||||
import HandshakeFailureSpec._
|
||||
|
||||
var systemB: ActorSystem = null
|
||||
|
||||
"Artery handshake" must {
|
||||
|
||||
"allow for timeout and later connect" in {
|
||||
|
|
@ -45,7 +34,9 @@ class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) w
|
|||
sel ! "hello"
|
||||
expectNoMsg(3.seconds) // longer than handshake-timeout
|
||||
|
||||
systemB = ActorSystem("systemB", HandshakeFailureSpec.configB)
|
||||
val systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some(s"akka.remote.artery.canonical.port = $portB"))
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
|
||||
within(10.seconds) {
|
||||
|
|
@ -65,7 +56,4 @@ class HandshakeFailureSpec extends AkkaSpec(HandshakeFailureSpec.commonConfig) w
|
|||
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit =
|
||||
if (systemB != null) shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -17,26 +17,15 @@ object HandshakeRetrySpec {
|
|||
val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
|
||||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
remote.artery.advanced.handshake-timeout = 10s
|
||||
remote.artery.advanced.image-liveness-timeout = 7s
|
||||
}
|
||||
""")
|
||||
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
|
||||
.withFallback(commonConfig)
|
||||
akka.remote.artery.advanced.handshake-timeout = 10s
|
||||
akka.remote.artery.advanced.image-liveness-timeout = 7s
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender {
|
||||
class HandshakeRetrySpec extends ArteryMultiNodeSpec(HandshakeRetrySpec.commonConfig) with ImplicitSender {
|
||||
import HandshakeRetrySpec._
|
||||
|
||||
var systemB: ActorSystem = null
|
||||
|
||||
"Artery handshake" must {
|
||||
|
||||
"be retried during handshake-timeout (no message loss)" in {
|
||||
|
|
@ -44,7 +33,10 @@ class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with
|
|||
sel ! "hello"
|
||||
expectNoMsg(1.second)
|
||||
|
||||
systemB = ActorSystem("systemB", HandshakeRetrySpec.configB)
|
||||
val systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some(s"akka.remote.artery.canonical.port = $portB")
|
||||
)
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
|
||||
expectMsg("hello")
|
||||
|
|
@ -58,7 +50,4 @@ class HandshakeRetrySpec extends AkkaSpec(HandshakeRetrySpec.commonConfig) with
|
|||
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit =
|
||||
if (systemB != null) shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -18,24 +18,18 @@ import com.typesafe.config.ConfigFactory
|
|||
object LateConnectSpec {
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
remote.artery.advanced.handshake-timeout = 3s
|
||||
remote.artery.advanced.image-liveness-timeout = 2.9s
|
||||
}
|
||||
""")
|
||||
akka.remote.artery.advanced.handshake-timeout = 3s
|
||||
akka.remote.artery.advanced.image-liveness-timeout = 2.9s
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
class LateConnectSpec extends AkkaSpec(LateConnectSpec.config) with ImplicitSender {
|
||||
class LateConnectSpec extends ArteryMultiNodeSpec(LateConnectSpec.config) with ImplicitSender {
|
||||
|
||||
val portB = SocketUtil.temporaryServerAddress("localhost", udp = true).getPort
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
|
||||
.withFallback(system.settings.config)
|
||||
lazy val systemB = ActorSystem("systemB", configB)
|
||||
lazy val systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some(s"akka.remote.artery.canonical.port = $portB"))
|
||||
|
||||
"Connection" must {
|
||||
|
||||
|
|
@ -60,7 +54,4 @@ class LateConnectSpec extends AkkaSpec(LateConnectSpec.config) with ImplicitSend
|
|||
expectMsg("ping3")
|
||||
}
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit = shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -3,19 +3,12 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.Address
|
||||
import akka.remote.EndpointManager.Send
|
||||
import akka.remote.RemoteActorRef
|
||||
import akka.remote.UniqueAddress
|
||||
import akka.remote.artery.SystemMessageDelivery._
|
||||
import akka.stream.ActorMaterializer
|
||||
import akka.stream.ActorMaterializerSettings
|
||||
import akka.stream.{ ActorMaterializer, ActorMaterializerSettings }
|
||||
import akka.stream.scaladsl.Keep
|
||||
import akka.stream.testkit.scaladsl.TestSink
|
||||
import akka.stream.testkit.scaladsl.TestSource
|
||||
import akka.testkit.AkkaSpec
|
||||
import akka.testkit.ImplicitSender
|
||||
import akka.stream.testkit.scaladsl.{ TestSink, TestSource }
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import akka.util.OptionVal
|
||||
|
||||
object OutboundControlJunctionSpec {
|
||||
|
|
|
|||
|
|
@ -26,25 +26,17 @@ object RemoteDeathWatchSpec {
|
|||
}
|
||||
}
|
||||
remote.watch-failure-detector.acceptable-heartbeat-pause = 3s
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
}
|
||||
""")
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
}
|
||||
|
||||
class RemoteDeathWatchSpec extends AkkaSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
|
||||
class RemoteDeathWatchSpec extends ArteryMultiNodeSpec(RemoteDeathWatchSpec.config) with ImplicitSender with DefaultTimeout with DeathWatchSpec {
|
||||
import RemoteDeathWatchSpec._
|
||||
|
||||
system.eventStream.publish(TestEvent.Mute(
|
||||
EventFilter[io.aeron.exceptions.RegistrationException]()))
|
||||
|
||||
val other = ActorSystem("other", ConfigFactory.parseString(s"akka.remote.artery.canonical.port=$otherPort")
|
||||
.withFallback(system.settings.config))
|
||||
|
||||
override def afterTermination() {
|
||||
shutdown(other)
|
||||
}
|
||||
val other = newRemoteSystem(name = Some("other"), extraConfig = Some(s"akka.remote.artery.canonical.port=$otherPort"))
|
||||
|
||||
override def expectedTestDuration: FiniteDuration = 120.seconds
|
||||
|
||||
|
|
|
|||
|
|
@ -12,7 +12,6 @@ import akka.remote.RemoteScope
|
|||
|
||||
object RemoteDeployerSpec {
|
||||
val deployerConf = ConfigFactory.parseString("""
|
||||
akka.actor.provider = remote
|
||||
akka.actor.deployment {
|
||||
/service2 {
|
||||
router = round-robin-pool
|
||||
|
|
@ -21,10 +20,7 @@ object RemoteDeployerSpec {
|
|||
dispatcher = mydispatcher
|
||||
}
|
||||
}
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.canonical.hostname = localhost
|
||||
akka.remote.artery.canonical.port = 0
|
||||
""", ConfigParseOptions.defaults)
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
class RecipeActor extends Actor {
|
||||
def receive = { case _ ⇒ }
|
||||
|
|
@ -32,7 +28,7 @@ object RemoteDeployerSpec {
|
|||
|
||||
}
|
||||
|
||||
class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) {
|
||||
class RemoteDeployerSpec extends AkkaSpec(RemoteDeployerSpec.deployerConf) with FlightRecorderSpecIntegration {
|
||||
|
||||
"A RemoteDeployer" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -32,30 +32,20 @@ object RemoteDeploymentSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class RemoteDeploymentSpec extends AkkaSpec("""
|
||||
#akka.loglevel=DEBUG
|
||||
akka.actor.provider = remote
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.canonical.hostname = localhost
|
||||
akka.remote.artery.canonical.port = 0
|
||||
""") {
|
||||
class RemoteDeploymentSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) {
|
||||
|
||||
import RemoteDeploymentSpec._
|
||||
|
||||
val port = RARP(system).provider.getDefaultAddress.port.get
|
||||
val conf = ConfigFactory.parseString(
|
||||
val conf =
|
||||
s"""
|
||||
akka.actor.deployment {
|
||||
/blub.remote = "akka://${system.name}@localhost:$port"
|
||||
}
|
||||
""").withFallback(system.settings.config)
|
||||
"""
|
||||
|
||||
val masterSystem = ActorSystem("Master" + system.name, conf)
|
||||
val masterPort = RARP(masterSystem).provider.getDefaultAddress.port.get
|
||||
|
||||
override def afterTermination(): Unit = {
|
||||
shutdown(masterSystem)
|
||||
}
|
||||
val masterSystem = newRemoteSystem(name = Some("Master" + system.name), extraConfig = Some(conf))
|
||||
val masterPort = address(masterSystem).port.get
|
||||
|
||||
"Remoting" must {
|
||||
|
||||
|
|
|
|||
|
|
@ -21,11 +21,7 @@ object RemoteRouterSpec {
|
|||
}
|
||||
}
|
||||
|
||||
class RemoteRouterSpec extends AkkaSpec("""
|
||||
akka.actor.provider = remote
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.canonical.hostname = localhost
|
||||
akka.remote.artery.canonical.port = 0
|
||||
class RemoteRouterSpec extends AkkaSpec(ConfigFactory.parseString("""
|
||||
akka.actor.deployment {
|
||||
/remote-override {
|
||||
router = round-robin-pool
|
||||
|
|
@ -39,7 +35,7 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
router = round-robin-pool
|
||||
nr-of-instances = 6
|
||||
}
|
||||
}""") {
|
||||
}""").withFallback(ArterySpecSupport.defaultConfig)) with FlightRecorderSpecIntegration {
|
||||
|
||||
import RemoteRouterSpec._
|
||||
|
||||
|
|
@ -79,12 +75,16 @@ class RemoteRouterSpec extends AkkaSpec("""
|
|||
target.nodes = ["akka://${sysName}@localhost:${port}"]
|
||||
}
|
||||
}
|
||||
}""").withFallback(system.settings.config)
|
||||
}"""
|
||||
).withFallback(ArterySpecSupport.newFlightRecorderConfig)
|
||||
.withFallback(system.settings.config)
|
||||
|
||||
val masterSystem = ActorSystem("Master" + sysName, conf)
|
||||
|
||||
override def afterTermination(): Unit = {
|
||||
shutdown(masterSystem)
|
||||
handleFlightRecorderFile(system)
|
||||
handleFlightRecorderFile(masterSystem)
|
||||
}
|
||||
|
||||
def collectRouteePaths(probe: TestProbe, router: ActorRef, n: Int): immutable.Seq[ActorPath] = {
|
||||
|
|
|
|||
|
|
@ -3,44 +3,25 @@
|
|||
*/
|
||||
package akka.remote.artery
|
||||
|
||||
import scala.concurrent.duration._
|
||||
import akka.actor.{ Actor, ActorIdentity, ActorSystem, Deploy, ExtendedActorSystem, Identify, Props, RootActorPath }
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import akka.actor.Actor.Receive
|
||||
import akka.actor.{ Actor, ActorIdentity, ActorRef, ActorSystem, Deploy, Identify, PoisonPill, Props, RootActorPath }
|
||||
import akka.remote.RARP
|
||||
import akka.testkit.TestActors
|
||||
import akka.actor.PoisonPill
|
||||
import akka.testkit.TestProbe
|
||||
import akka.actor.ActorRef
|
||||
import com.typesafe.config.Config
|
||||
import akka.testkit.{ AkkaSpec, ImplicitSender, TestActors, TestProbe }
|
||||
import com.typesafe.config.{ Config, ConfigFactory }
|
||||
|
||||
object RemoteSendConsistencySpec {
|
||||
import scala.concurrent.duration._
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
}
|
||||
""")
|
||||
|
||||
}
|
||||
|
||||
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(RemoteSendConsistencySpec.config)
|
||||
class RemoteSendConsistencySpec extends AbstractRemoteSendConsistencySpec(ArterySpecSupport.defaultConfig)
|
||||
|
||||
class RemoteSendConsistencyWithThreeLanesSpec extends AbstractRemoteSendConsistencySpec(
|
||||
ConfigFactory.parseString("""
|
||||
akka.remote.artery.advanced.outbound-lanes = 3
|
||||
akka.remote.artery.advanced.inbound-lanes = 3
|
||||
""").withFallback(RemoteSendConsistencySpec.config))
|
||||
""").withFallback(ArterySpecSupport.defaultConfig))
|
||||
|
||||
abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpec(config) with ImplicitSender {
|
||||
abstract class AbstractRemoteSendConsistencySpec(config: Config) extends ArteryMultiNodeSpec(config) with ImplicitSender {
|
||||
|
||||
val systemB = ActorSystem("systemB", system.settings.config)
|
||||
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||
println(addressB)
|
||||
val systemB = newRemoteSystem(name = Some("systemB"))
|
||||
val addressB = address(systemB)
|
||||
val rootB = RootActorPath(addressB)
|
||||
|
||||
"Artery" must {
|
||||
|
|
@ -137,6 +118,4 @@ abstract class AbstractRemoteSendConsistencySpec(config: Config) extends AkkaSpe
|
|||
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit = shutdown(systemB)
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -63,23 +63,15 @@ object RemoteWatcherSpec {
|
|||
|
||||
}
|
||||
|
||||
class RemoteWatcherSpec extends AkkaSpec(
|
||||
"""akka {
|
||||
loglevel = INFO
|
||||
log-dead-letters-during-shutdown = false
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
}""") with ImplicitSender {
|
||||
class RemoteWatcherSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) with ImplicitSender {
|
||||
|
||||
import RemoteWatcherSpec._
|
||||
import RemoteWatcher._
|
||||
|
||||
override def expectedTestDuration = 2.minutes
|
||||
|
||||
val remoteSystem = ActorSystem("RemoteSystem", system.settings.config)
|
||||
val remoteAddress = RARP(remoteSystem).provider.getDefaultAddress
|
||||
val remoteSystem = newRemoteSystem(name = Some("RemoteSystem"))
|
||||
val remoteAddress = address(remoteSystem)
|
||||
def remoteAddressUid = AddressUidExtension(remoteSystem).longAddressUid
|
||||
|
||||
Seq(system, remoteSystem).foreach(muteDeadLetters(
|
||||
|
|
@ -88,6 +80,7 @@ class RemoteWatcherSpec extends AkkaSpec(
|
|||
|
||||
override def afterTermination() {
|
||||
shutdown(remoteSystem)
|
||||
super.afterTermination()
|
||||
}
|
||||
|
||||
val heartbeatRspB = ArteryHeartbeatRsp(remoteAddressUid)
|
||||
|
|
|
|||
|
|
@ -13,39 +13,25 @@ import akka.testkit.EventFilter
|
|||
|
||||
object SerializationErrorSpec {
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
actor {
|
||||
serialize-creators = false
|
||||
serialize-messages = false
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
object NotSerializableMsg
|
||||
|
||||
}
|
||||
|
||||
class SerializationErrorSpec extends AkkaSpec(SerializationErrorSpec.config) with ImplicitSender {
|
||||
class SerializationErrorSpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) with ImplicitSender {
|
||||
import SerializationErrorSpec._
|
||||
|
||||
val configB = ConfigFactory.parseString("""
|
||||
akka.actor.serialization-identifiers {
|
||||
# this will cause deserialization error
|
||||
"akka.serialization.ByteArraySerializer" = -4
|
||||
}
|
||||
""").withFallback(system.settings.config)
|
||||
val systemB = ActorSystem("systemB", configB)
|
||||
val systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some("""
|
||||
akka.actor.serialization-identifiers {
|
||||
# this will cause deserialization error
|
||||
"akka.serialization.ByteArraySerializer" = -4
|
||||
}
|
||||
"""))
|
||||
systemB.actorOf(TestActors.echoActorProps, "echo")
|
||||
val addressB = RARP(systemB).provider.getDefaultAddress
|
||||
val addressB = address(systemB)
|
||||
val rootB = RootActorPath(addressB)
|
||||
|
||||
override def afterTermination(): Unit = shutdown(systemB)
|
||||
|
||||
"Serialization error" must {
|
||||
|
||||
"be logged when serialize fails" in {
|
||||
|
|
|
|||
|
|
@ -31,31 +31,19 @@ import akka.util.OptionVal
|
|||
|
||||
object SystemMessageDeliverySpec {
|
||||
|
||||
val config = ConfigFactory.parseString(s"""
|
||||
akka.loglevel=INFO
|
||||
akka {
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
}
|
||||
akka.actor.serialize-creators = off
|
||||
akka.actor.serialize-messages = off
|
||||
""")
|
||||
|
||||
case class TestSysMsg(s: String) extends SystemMessageDelivery.AckedDeliveryMessage
|
||||
|
||||
}
|
||||
|
||||
class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.config) with ImplicitSender {
|
||||
class SystemMessageDeliverySpec extends ArteryMultiNodeSpec(ArterySpecSupport.defaultConfig) with ImplicitSender {
|
||||
import SystemMessageDeliverySpec._
|
||||
|
||||
val addressA = UniqueAddress(
|
||||
RARP(system).provider.getDefaultAddress,
|
||||
address(system),
|
||||
AddressUidExtension(system).longAddressUid)
|
||||
val systemB = ActorSystem("systemB", system.settings.config)
|
||||
val systemB = newRemoteSystem(name = Some("systemB"))
|
||||
val addressB = UniqueAddress(
|
||||
RARP(systemB).provider.getDefaultAddress,
|
||||
address(systemB),
|
||||
AddressUidExtension(systemB).longAddressUid)
|
||||
val rootB = RootActorPath(addressB.address)
|
||||
val matSettings = ActorMaterializerSettings(system).withFuzzing(true)
|
||||
|
|
@ -63,8 +51,6 @@ class SystemMessageDeliverySpec extends AkkaSpec(SystemMessageDeliverySpec.confi
|
|||
|
||||
private val outboundEnvelopePool = ReusableOutboundEnvelope.createObjectPool(capacity = 16)
|
||||
|
||||
override def afterTermination(): Unit = shutdown(systemB)
|
||||
|
||||
private def send(sendCount: Int, resendInterval: FiniteDuration, outboundContext: OutboundContext): Source[OutboundEnvelope, NotUsed] = {
|
||||
val deadLetters = TestProbe().ref
|
||||
Source(1 to sendCount)
|
||||
|
|
|
|||
|
|
@ -59,26 +59,21 @@ object UntrustedSpec {
|
|||
}
|
||||
}
|
||||
|
||||
val config = ConfigFactory.parseString(
|
||||
"""
|
||||
akka.remote.artery.untrusted-mode = on
|
||||
akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ]
|
||||
akka.loglevel = DEBUG # the test is verifying some Debug logging
|
||||
"""
|
||||
).withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
class UntrustedSpec extends AkkaSpec("""
|
||||
akka.actor.provider = remote
|
||||
akka.remote.artery.untrusted-mode = on
|
||||
akka.remote.artery.trusted-selection-paths = ["/user/receptionist", ]
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.canonical.hostname = localhost
|
||||
akka.remote.artery.canonical.port = 0
|
||||
akka.loglevel = DEBUG # the test is verifying some Debug logging
|
||||
""") with ImplicitSender {
|
||||
class UntrustedSpec extends ArteryMultiNodeSpec(UntrustedSpec.config) with ImplicitSender {
|
||||
|
||||
import UntrustedSpec._
|
||||
|
||||
val client = ActorSystem("UntrustedSpec-client", ConfigFactory.parseString("""
|
||||
akka.actor.provider = remote
|
||||
akka.remote.artery.enabled = on
|
||||
akka.remote.artery.canonical.hostname = localhost
|
||||
akka.remote.artery.canonical.port = 0
|
||||
"""))
|
||||
val client = newRemoteSystem(name = Some("UntrustedSpec-client"))
|
||||
val addr = RARP(system).provider.getDefaultAddress
|
||||
|
||||
val receptionist = system.actorOf(Props(classOf[Receptionist], testActor), "receptionist")
|
||||
|
|
@ -98,10 +93,6 @@ class UntrustedSpec extends AkkaSpec("""
|
|||
p.expectMsgType[ActorIdentity].ref.get
|
||||
}
|
||||
|
||||
override def afterTermination() {
|
||||
shutdown(client)
|
||||
}
|
||||
|
||||
// need to enable debug log-level without actually printing those messages
|
||||
system.eventStream.publish(TestEvent.Mute(EventFilter.debug()))
|
||||
|
||||
|
|
|
|||
|
|
@ -4,14 +4,13 @@
|
|||
|
||||
package akka.remote.artery.compress
|
||||
|
||||
import akka.actor.{ ActorIdentity, ActorRef, ActorSystem, Identify }
|
||||
import akka.remote.artery.compress.CompressionProtocol.Events
|
||||
import akka.testkit._
|
||||
import akka.util.Timeout
|
||||
import akka.actor.{ ActorIdentity, ActorSystem, Identify }
|
||||
import akka.pattern.ask
|
||||
import akka.remote.RARP
|
||||
import akka.remote.artery.ArteryTransport
|
||||
import akka.remote.artery.compress.CompressionProtocol.Events.{ Event, ReceivedActorRefCompressionTable }
|
||||
import akka.remote.artery.{ ArteryMultiNodeSpec, ArterySpecSupport, ArteryTransport }
|
||||
import akka.testkit._
|
||||
import akka.util.Timeout
|
||||
import com.typesafe.config.ConfigFactory
|
||||
import org.scalatest.BeforeAndAfter
|
||||
|
||||
|
|
@ -24,12 +23,6 @@ object HandshakeShouldDropCompressionTableSpec {
|
|||
|
||||
val commonConfig = ConfigFactory.parseString(s"""
|
||||
akka {
|
||||
loglevel = INFO
|
||||
|
||||
actor.provider = remote
|
||||
remote.artery.enabled = on
|
||||
remote.artery.canonical.hostname = localhost
|
||||
remote.artery.canonical.port = 0
|
||||
remote.artery.advanced.handshake-timeout = 10s
|
||||
remote.artery.advanced.image-liveness-timeout = 7s
|
||||
|
||||
|
|
@ -40,14 +33,11 @@ object HandshakeShouldDropCompressionTableSpec {
|
|||
}
|
||||
}
|
||||
}
|
||||
""")
|
||||
|
||||
val configB = ConfigFactory.parseString(s"akka.remote.artery.canonical.port = $portB")
|
||||
.withFallback(commonConfig)
|
||||
""").withFallback(ArterySpecSupport.defaultConfig)
|
||||
|
||||
}
|
||||
|
||||
class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDropCompressionTableSpec.commonConfig)
|
||||
class HandshakeShouldDropCompressionTableSpec extends ArteryMultiNodeSpec(HandshakeShouldDropCompressionTableSpec.commonConfig)
|
||||
with ImplicitSender with BeforeAndAfter {
|
||||
import HandshakeShouldDropCompressionTableSpec._
|
||||
|
||||
|
|
@ -55,7 +45,9 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
|
|||
var systemB: ActorSystem = null
|
||||
|
||||
before {
|
||||
systemB = ActorSystem("systemB", configB)
|
||||
systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some(s"akka.remote.artery.canonical.port = $portB"))
|
||||
}
|
||||
|
||||
"Outgoing compression table" must {
|
||||
|
|
@ -92,11 +84,13 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
|
|||
info("System [A] received: " + a1)
|
||||
a1.table.dictionary.keySet should contain(a1Probe.ref)
|
||||
|
||||
log.warning("SHUTTING DOWN system {}...", systemB)
|
||||
log.info("SHUTTING DOWN system {}...", systemB)
|
||||
shutdown(systemB)
|
||||
systemB = ActorSystem("systemB", configB)
|
||||
systemB = newRemoteSystem(
|
||||
name = Some("systemB"),
|
||||
extraConfig = Some(s"akka.remote.artery.canonical.port = $portB"))
|
||||
Thread.sleep(1000)
|
||||
log.warning("SYSTEM READY {}...", systemB)
|
||||
log.info("SYSTEM READY {}...", systemB)
|
||||
|
||||
val aNewProbe = TestProbe()
|
||||
system.eventStream.subscribe(aNewProbe.ref, classOf[Event])
|
||||
|
|
@ -138,14 +132,4 @@ class HandshakeShouldDropCompressionTableSpec extends AkkaSpec(HandshakeShouldDr
|
|||
ref.get
|
||||
}
|
||||
|
||||
after {
|
||||
shutdownAllActorSystems()
|
||||
}
|
||||
|
||||
override def afterTermination(): Unit =
|
||||
shutdownAllActorSystems()
|
||||
|
||||
private def shutdownAllActorSystems(): Unit = {
|
||||
if (systemB != null) shutdown(systemB)
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue