diff --git a/akka-remote/src/test/scala/akka/remote/classic/RemotingSpec.scala b/akka-remote/src/test/scala/akka/remote/classic/RemotingSpec.scala index b0e1210a8f..80dc96e558 100644 --- a/akka-remote/src/test/scala/akka/remote/classic/RemotingSpec.scala +++ b/akka-remote/src/test/scala/akka/remote/classic/RemotingSpec.scala @@ -23,6 +23,8 @@ import com.typesafe.config._ import scala.concurrent.{ Await, Future } import scala.concurrent.duration._ +import scala.util.control.NonFatal + object RemotingSpec { final case class ActorSelReq(s: String) @@ -532,7 +534,7 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } } - "should not publish AddressTerminated even on InvalidAssociationExecptions" in { + "should not publish AddressTerminated even on InvalidAssociationExceptions" in { val localAddress = Address("akka.test", "system1", "localhost", 1) val rawLocalAddress = localAddress.copy(protocol = "test") val remoteAddress = Address("akka.test", "system2", "localhost", 2) @@ -754,33 +756,49 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D } + // retry a few times as the temporaryServerAddress can be taken by the time the new actor system + // binds + def selectionAndBind( + config: Config, + thisSystem: ActorSystem, + probe: TestProbe, + retries: Int = 3): (ActorSystem, ActorSelection) = { + val otherAddress = temporaryServerAddress() + val otherConfig = ConfigFactory.parseString(s""" + akka.remote.classic.netty.tcp.port = ${otherAddress.getPort} + """).withFallback(config) + val otherSelection = + thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") + otherSelection.tell("ping", probe.ref) + probe.expectNoMessage(200.millis) + try { + (ActorSystem("other-system", otherConfig), otherSelection) + } catch { + case NonFatal(ex) if ex.getMessage.contains("Failed to bind") && retries > 0 => + selectionAndBind(config, thisSystem, probe, retries = retries - 1) + case other => + throw other + } + } + "be able to connect to system even if it's not there at first" in { val config = ConfigFactory.parseString(s""" - akka.remote.enabled-transports = ["akka.remote.classic.netty.tcp"] + akka.remote.classic.enabled-transports = ["akka.remote.classic.netty.tcp"] akka.remote.classic.netty.tcp.port = 0 - akka.remote.retry-gate-closed-for = 5s + akka.remote.classic.retry-gate-closed-for = 5s """).withFallback(remoteSystem.settings.config) val thisSystem = ActorSystem("this-system", config) try { muteSystem(thisSystem) val probe = new TestProbe(thisSystem) - val probeSender = probe.ref - val otherAddress = temporaryServerAddress() - val otherConfig = ConfigFactory.parseString(s""" - akka.remote.classic.netty.tcp.port = ${otherAddress.getPort} - """).withFallback(config) - val otherSelection = - thisSystem.actorSelection(s"akka.tcp://other-system@localhost:${otherAddress.getPort}/user/echo") - otherSelection.tell("ping", probeSender) - probe.expectNoMessage(1.seconds) - val otherSystem = ActorSystem("other-system", otherConfig) + val (otherSystem, otherSelection) = selectionAndBind(config, thisSystem, probe) try { muteSystem(otherSystem) probe.expectNoMessage(2.seconds) otherSystem.actorOf(Props[Echo2], "echo") within(5.seconds) { awaitAssert { - otherSelection.tell("ping", probeSender) + otherSelection.tell("ping", probe.ref) assert(probe.expectMsgType[(String, ActorRef)](500.millis)._1 == "pong") } } @@ -794,9 +812,9 @@ class RemotingSpec extends AkkaSpec(RemotingSpec.cfg) with ImplicitSender with D "allow other system to connect even if it's not there at first" in { val config = ConfigFactory.parseString(s""" - akka.remote.enabled-transports = ["akka.remote.classic.netty.tcp"] + akka.remote.classic.enabled-transports = ["akka.remote.classic.netty.tcp"] akka.remote.classic.netty.tcp.port = 0 - akka.remote.retry-gate-closed-for = 5s + akka.remote.classic.retry-gate-closed-for = 5s """).withFallback(remoteSystem.settings.config) val thisSystem = ActorSystem("this-system", config) try {