Retry creation of ActorSystem in remoting tests #23481 (#23769)

* Retry creation of ActorSystem in remoting tests #23481

Remoting multi-jvm test rely on setting port = 0 which selects an open
port. This has a race where two of the JVMs open/close the same port
then configure their ActorSystem with it so one of them fails to start
due to the port being in use. This adds a simple retry so another port
is selected.
This commit is contained in:
Christopher Batey 2017-10-24 07:22:25 -05:00 committed by Patrik Nordwall
parent da0cfa577b
commit 70ad537af5
7 changed files with 82 additions and 34 deletions

View file

@ -8,12 +8,12 @@ import java.net.{ InetAddress, InetSocketAddress }
import com.typesafe.config.{ Config, ConfigFactory, ConfigObject }
import scala.concurrent.{ Await, Awaitable, Future }
import scala.concurrent.{ Await, Awaitable }
import scala.util.control.NonFatal
import scala.collection.immutable
import akka.actor._
import akka.util.Timeout
import akka.remote.testconductor.{ RoleName, TestConductor, TestConductorExt }
import akka.remote.testconductor.{ TestConductor, TestConductorExt }
import akka.testkit._
import akka.testkit.TestKit
import akka.testkit.TestEvent._
@ -22,6 +22,8 @@ import scala.concurrent.duration._
import akka.remote.testconductor.RoleName
import akka.actor.RootActorPath
import akka.event.{ Logging, LoggingAdapter }
import akka.remote.RemoteTransportException
import org.jboss.netty.channel.ChannelException
/**
* Configure the role names and participants of the test, including configuration settings.
@ -267,7 +269,18 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
this(config.myself, actorSystemCreator(ConfigFactory.load(config.config)), config.roles, config.deployments)
def this(config: MultiNodeConfig) =
this(config, config ActorSystem(MultiNodeSpec.getCallerName(classOf[MultiNodeSpec]), config))
this(config, {
val name = MultiNodeSpec.getCallerName(classOf[MultiNodeSpec])
config
try {
ActorSystem(name, config)
} catch {
// Retry creating the system once as when using port = 0 two systems may try and use the same one.
// RTE is for aeron, CE for netty
case _: RemoteTransportException ActorSystem(name, config)
case _: ChannelException ActorSystem(name, config)
}
})
val log: LoggingAdapter = Logging(system, this.getClass)
@ -275,7 +288,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles:
* Enrich `.await()` onto all Awaitables, using remaining duration from the innermost
* enclosing `within` block or QueryTimeout.
*/
implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w)
implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T] = new AwaitHelper(w)
class AwaitHelper[T](w: Awaitable[T]) {
def await: T = Await.result(w, remainingOr(testConductor.Settings.QueryTimeout.duration))
}

View file

@ -5,7 +5,7 @@ package akka.remote
import language.postfixOps
import scala.concurrent.duration._
import com.typesafe.config.{ Config, ConfigFactory }
import com.typesafe.config.ConfigFactory
import akka.actor.Actor
import akka.actor.ActorIdentity
import akka.actor.ActorRef
@ -15,8 +15,6 @@ import akka.actor.Props
import akka.actor.Terminated
import akka.remote.testconductor.RoleName
import akka.remote.testkit.MultiNodeConfig
import akka.remote.testkit.MultiNodeSpec
import akka.remote.testkit.STMultiNodeSpec
import akka.testkit._
class RemoteNodeDeathWatchConfig(artery: Boolean) extends MultiNodeConfig {
@ -88,7 +86,6 @@ object RemoteNodeDeathWatchSpec {
case msg testActor forward msg
}
}
}
abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchConfig)
@ -112,7 +109,9 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
def identify(role: RoleName, actorName: String): ActorRef = {
system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName)
expectMsgType[ActorIdentity].ref.get
val actorIdentity = expectMsgType[ActorIdentity]
assert(actorIdentity.ref.isDefined, s"Unable to Identify actor: $actorName on node: $role")
actorIdentity.ref.get
}
def assertCleanup(timeout: FiniteDuration = 5.seconds): Unit = {
@ -164,7 +163,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
enterBarrier("after-1")
@ -197,7 +196,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
enterBarrier("after-2")
@ -229,7 +228,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
enterBarrier("after-3")
@ -257,7 +256,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
expectMsg(1 second, Ack)
enterBarrier("unwatch-s1-4")
system.stop(s1)
expectNoMsg(2 seconds)
expectNoMessage(2 seconds)
enterBarrier("stop-s1-4")
system.stop(s2)
@ -276,7 +275,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
enterBarrier("after-4")
@ -318,7 +317,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
}
@ -352,15 +351,15 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
p1.receiveN(2, 5 seconds).collect { case WrappedTerminated(t) t.actor }.toSet should ===(Set(a1, a2))
p3.expectMsgType[WrappedTerminated](5 seconds).t.actor should ===(a3)
p2.expectNoMsg(2 seconds)
p2.expectNoMessage(2 seconds)
enterBarrier("terminated-verified-5")
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
p1.expectNoMsg(100 millis)
p2.expectNoMsg(100 millis)
p3.expectNoMsg(100 millis)
expectNoMessage(2.seconds)
p1.expectNoMessage(100 millis)
p2.expectNoMessage(100 millis)
p3.expectNoMessage(100 millis)
assertCleanup()
}
@ -402,7 +401,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup()
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
}
@ -447,7 +446,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon
// verify that things are cleaned up, and heartbeating is stopped
assertCleanup(20 seconds)
expectNoMsg(2.seconds)
expectNoMessage(2.seconds)
assertCleanup()
}

View file

@ -0,0 +1,39 @@
package akka.remote
import akka.actor.ActorSystem
import akka.testkit.SocketUtil
import com.typesafe.config.ConfigFactory
import org.jboss.netty.channel.ChannelException
import org.scalatest.{ Matchers, WordSpec }
class RemotingFailedToBindSpec extends WordSpec with Matchers {
"an ActorSystem" must {
"not start if port is taken" in {
val port = SocketUtil.temporaryLocalPort(true)
val config = ConfigFactory.parseString(
s"""
|akka {
| actor {
| provider = remote
| }
| remote {
| netty.tcp {
| hostname = "127.0.0.1"
| port = $port
| }
| }
|}
""".stripMargin)
val as = ActorSystem("RemotingFailedToBindSpec", config)
try {
val ex = intercept[ChannelException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should startWith("Failed to bind")
} finally {
as.terminate()
}
}
}
}

View file

@ -1,6 +1,7 @@
package akka.remote.artery
import akka.actor.ActorSystem
import akka.remote.RemoteTransportException
import akka.testkit.SocketUtil
import com.typesafe.config.ConfigFactory
import org.scalatest.{ Matchers, WordSpec }
@ -28,7 +29,7 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers {
""".stripMargin)
val as = ActorSystem("BindTest1", config)
try {
val ex = intercept[RuntimeException] {
val ex = intercept[RemoteTransportException] {
ActorSystem("BindTest2", config)
}
ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.")

View file

@ -7,7 +7,7 @@ package akka.remote
import akka.AkkaException
import akka.Done
import akka.actor._
import akka.event.{ LoggingAdapter }
import akka.event.LoggingAdapter
import scala.collection.immutable
import scala.concurrent.Future
import scala.util.control.NoStackTrace
@ -18,7 +18,9 @@ import akka.util.OptionVal
* such as inability to start, wrong configuration etc.
*/
@SerialVersionUID(1L)
class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause)
class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) {
def this(msg: String) = this(msg, null)
}
/**
* [[RemoteTransportException]] without stack trace.

View file

@ -27,12 +27,7 @@ import akka.actor.Cancellable
import akka.actor.Props
import akka.event.Logging
import akka.event.LoggingAdapter
import akka.remote.AddressUidExtension
import akka.remote.RemoteActorRef
import akka.remote.RemoteActorRefProvider
import akka.remote.RemoteTransport
import akka.remote.ThisActorSystemQuarantinedEvent
import akka.remote.UniqueAddress
import akka.remote._
import akka.remote.artery.AeronSource.ResourceLifecycle
import akka.remote.artery.ArteryTransport.ShuttingDown
import akka.remote.artery.Encoder.OutboundCompressionAccess
@ -624,13 +619,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R
log.debug("Inbound channel is now active")
} else if (status == ChannelEndpointStatus.ERRORED) {
areonErrorLog.logErrors(log, 0L)
throw new RuntimeException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.")
} else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) {
Thread.sleep(waitInterval)
retry(retries - 1)
} else {
areonErrorLog.logErrors(log, 0L)
throw new RuntimeException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.")
}
}
}

View file

@ -3,7 +3,6 @@
*/
package akka
import akka.TestExtras.Filter
import akka.TestExtras.Filter.Keys._
import com.typesafe.sbt.{ SbtScalariform, SbtMultiJvm }
import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys._