Harden RemotingSpec: Retry if address is taken (#26795)
* Harden RemotingSpec: Retry if address is taken The test requires that the ActorSystem not be started when doing the selection to using port = 0 is not an option. Fixes #25657
This commit is contained in:
parent
9f19e4972b
commit
e855db8825
1 changed files with 34 additions and 16 deletions
|
|
@ -23,6 +23,8 @@ import com.typesafe.config._
|
||||||
import scala.concurrent.{ Await, Future }
|
import scala.concurrent.{ Await, Future }
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import scala.util.control.NonFatal
|
||||||
|
|
||||||
object RemotingSpec {
|
object RemotingSpec {
|
||||||
|
|
||||||
final case class ActorSelReq(s: String)
|
final case class ActorSelReq(s: String)
|
||||||
|
|
@ -532,7 +534,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
"should not publish AddressTerminated even on InvalidAssociationExecptions" in {
|
"should not publish AddressTerminated even on InvalidAssociationExceptions" in {
|
||||||
val localAddress = Address("akka.test", "system1", "localhost", 1)
|
val localAddress = Address("akka.test", "system1", "localhost", 1)
|
||||||
val rawLocalAddress = localAddress.copy(protocol = "test")
|
val rawLocalAddress = localAddress.copy(protocol = "test")
|
||||||
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
|
val remoteAddress = Address("akka.test", "system2", "localhost", 2)
|
||||||
|
|
@ -754,33 +756,49 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retry a few times as the temporaryServerAddress can be taken by the time the new actor system
|
||||||
|
// binds
|
||||||
|
def selectionAndBind(
|
||||||
|
config: Config,
|
||||||
|
thisSystem: ActorSystem,
|
||||||
|
probe: TestProbe,
|
||||||
|
retries: Int = 3): (ActorSystem, ActorSelection) = {
|
||||||
|
val otherAddress = temporaryServerAddress()
|
||||||
|
val otherConfig = ConfigFactory.parseString(s"""
|
||||||
|
akka.remote.classic.netty.tcp.port = ${otherAddress.getPort}
|
||||||
|
""").withFallback(config)
|
||||||
|
val otherSelection =
|
||||||
|
thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
|
||||||
|
otherSelection.tell("ping", probe.ref)
|
||||||
|
probe.expectNoMessage(200.millis)
|
||||||
|
try {
|
||||||
|
(ActorSystem("other-system", otherConfig), otherSelection)
|
||||||
|
} catch {
|
||||||
|
case NonFatal(ex) if ex.getMessage.contains("Failed to bind") && retries > 0 =>
|
||||||
|
selectionAndBind(config, thisSystem, probe, retries = retries - 1)
|
||||||
|
case other =>
|
||||||
|
throw other
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
"be able to connect to system even if it's not there at first" in {
|
"be able to connect to system even if it's not there at first" in {
|
||||||
val config = ConfigFactory.parseString(s"""
|
val config = ConfigFactory.parseString(s"""
|
||||||
akka.remote.enabled-transports = ["akka.remote.classic.netty.tcp"]
|
akka.remote.classic.enabled-transports = ["akka.remote.classic.netty.tcp"]
|
||||||
akka.remote.classic.netty.tcp.port = 0
|
akka.remote.classic.netty.tcp.port = 0
|
||||||
akka.remote.retry-gate-closed-for = 5s
|
akka.remote.classic.retry-gate-closed-for = 5s
|
||||||
""").withFallback(remoteSystem.settings.config)
|
""").withFallback(remoteSystem.settings.config)
|
||||||
val thisSystem = ActorSystem("this-system", config)
|
val thisSystem = ActorSystem("this-system", config)
|
||||||
try {
|
try {
|
||||||
muteSystem(thisSystem)
|
muteSystem(thisSystem)
|
||||||
val probe = new TestProbe(thisSystem)
|
val probe = new TestProbe(thisSystem)
|
||||||
val probeSender = probe.ref
|
val (otherSystem, otherSelection) = selectionAndBind(config, thisSystem, probe)
|
||||||
val otherAddress = temporaryServerAddress()
|
|
||||||
val otherConfig = ConfigFactory.parseString(s"""
|
|
||||||
akka.remote.classic.netty.tcp.port = ${otherAddress.getPort}
|
|
||||||
""").withFallback(config)
|
|
||||||
val otherSelection =
|
|
||||||
thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo")
|
|
||||||
otherSelection.tell("ping", probeSender)
|
|
||||||
probe.expectNoMessage(1.seconds)
|
|
||||||
val otherSystem = ActorSystem("other-system", otherConfig)
|
|
||||||
try {
|
try {
|
||||||
muteSystem(otherSystem)
|
muteSystem(otherSystem)
|
||||||
probe.expectNoMessage(2.seconds)
|
probe.expectNoMessage(2.seconds)
|
||||||
otherSystem.actorOf(Props[Echo2], "echo")
|
otherSystem.actorOf(Props[Echo2], "echo")
|
||||||
within(5.seconds) {
|
within(5.seconds) {
|
||||||
awaitAssert {
|
awaitAssert {
|
||||||
otherSelection.tell("ping", probeSender)
|
otherSelection.tell("ping", probe.ref)
|
||||||
assert(probe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong")
|
assert(probe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
@ -794,9 +812,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D
|
||||||
|
|
||||||
"allow other system to connect even if it's not there at first" in {
|
"allow other system to connect even if it's not there at first" in {
|
||||||
val config = ConfigFactory.parseString(s"""
|
val config = ConfigFactory.parseString(s"""
|
||||||
akka.remote.enabled-transports = ["akka.remote.classic.netty.tcp"]
|
akka.remote.classic.enabled-transports = ["akka.remote.classic.netty.tcp"]
|
||||||
akka.remote.classic.netty.tcp.port = 0
|
akka.remote.classic.netty.tcp.port = 0
|
||||||
akka.remote.retry-gate-closed-for = 5s
|
akka.remote.classic.retry-gate-closed-for = 5s
|
||||||
""").withFallback(remoteSystem.settings.config)
|
""").withFallback(remoteSystem.settings.config)
|
||||||
val thisSystem = ActorSystem("this-system", config)
|
val thisSystem = ActorSystem("this-system", config)
|
||||||
try {
|
try {
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue