diff --git a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala index f8fa42cfa4..7a415e2f0d 100644 --- a/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala +++ b/akka-multi-node-testkit/src/main/scala/akka/remote/testkit/MultiNodeSpec.scala @@ -8,12 +8,12 @@ import java.net.{ InetAddress, InetSocketAddress } import com.typesafe.config.{ Config, ConfigFactory, ConfigObject } -import scala.concurrent.{ Await, Awaitable, Future } +import scala.concurrent.{ Await, Awaitable } import scala.util.control.NonFatal import scala.collection.immutable import akka.actor._ import akka.util.Timeout -import akka.remote.testconductor.{ RoleName, TestConductor, TestConductorExt } +import akka.remote.testconductor.{ TestConductor, TestConductorExt } import akka.testkit._ import akka.testkit.TestKit import akka.testkit.TestEvent._ @@ -22,6 +22,8 @@ import scala.concurrent.duration._ import akka.remote.testconductor.RoleName import akka.actor.RootActorPath import akka.event.{ Logging, LoggingAdapter } +import akka.remote.RemoteTransportException +import org.jboss.netty.channel.ChannelException /** * Configure the role names and participants of the test, including configuration settings. @@ -267,7 +269,18 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: this(config.myself, actorSystemCreator(ConfigFactory.load(config.config)), config.roles, config.deployments) def this(config: MultiNodeConfig) = - this(config, config ⇒ ActorSystem(MultiNodeSpec.getCallerName(classOf[MultiNodeSpec]), config)) + this(config, { + val name = MultiNodeSpec.getCallerName(classOf[MultiNodeSpec]) + config ⇒ + try { + ActorSystem(name, config) + } catch { + // Retry creating the system once as when using port = 0 two systems may try and use the same one. + // RTE is for aeron, CE for netty + case _: RemoteTransportException ⇒ ActorSystem(name, config) + case _: ChannelException ⇒ ActorSystem(name, config) + } + }) val log: LoggingAdapter = Logging(system, this.getClass) @@ -275,7 +288,7 @@ abstract class MultiNodeSpec(val myself: RoleName, _system: ActorSystem, _roles: * Enrich `.await()` onto all Awaitables, using remaining duration from the innermost * enclosing `within` block or QueryTimeout. */ - implicit def awaitHelper[T](w: Awaitable[T]) = new AwaitHelper(w) + implicit def awaitHelper[T](w: Awaitable[T]): AwaitHelper[T] = new AwaitHelper(w) class AwaitHelper[T](w: Awaitable[T]) { def await: T = Await.result(w, remainingOr(testConductor.Settings.QueryTimeout.duration)) } diff --git a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala index 01fd9ddb5f..3d536670c5 100644 --- a/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala +++ b/akka-remote-tests/src/multi-jvm/scala/akka/remote/RemoteNodeDeathWatchSpec.scala @@ -5,7 +5,7 @@ package akka.remote import language.postfixOps import scala.concurrent.duration._ -import com.typesafe.config.{ Config, ConfigFactory } +import com.typesafe.config.ConfigFactory import akka.actor.Actor import akka.actor.ActorIdentity import akka.actor.ActorRef @@ -15,8 +15,6 @@ import akka.actor.Props import akka.actor.Terminated import akka.remote.testconductor.RoleName import akka.remote.testkit.MultiNodeConfig -import akka.remote.testkit.MultiNodeSpec -import akka.remote.testkit.STMultiNodeSpec import akka.testkit._ class RemoteNodeDeathWatchConfig(artery: Boolean) extends MultiNodeConfig { @@ -88,7 +86,6 @@ object RemoteNodeDeathWatchSpec { case msg ⇒ testActor forward msg } } - } abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchConfig) @@ -112,7 +109,9 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon def identify(role: RoleName, actorName: String): ActorRef = { system.actorSelection(node(role) / "user" / actorName) ! Identify(actorName) - expectMsgType[ActorIdentity].ref.get + val actorIdentity = expectMsgType[ActorIdentity] + assert(actorIdentity.ref.isDefined, s"Unable to Identify actor: $actorName on node: $role") + actorIdentity.ref.get } def assertCleanup(timeout: FiniteDuration = 5.seconds): Unit = { @@ -164,7 +163,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() enterBarrier("after-1") @@ -197,7 +196,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() enterBarrier("after-2") @@ -229,7 +228,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() enterBarrier("after-3") @@ -257,7 +256,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon expectMsg(1 second, Ack) enterBarrier("unwatch-s1-4") system.stop(s1) - expectNoMsg(2 seconds) + expectNoMessage(2 seconds) enterBarrier("stop-s1-4") system.stop(s2) @@ -276,7 +275,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() enterBarrier("after-4") @@ -318,7 +317,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() } @@ -352,15 +351,15 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon p1.receiveN(2, 5 seconds).collect { case WrappedTerminated(t) ⇒ t.actor }.toSet should ===(Set(a1, a2)) p3.expectMsgType[WrappedTerminated](5 seconds).t.actor should ===(a3) - p2.expectNoMsg(2 seconds) + p2.expectNoMessage(2 seconds) enterBarrier("terminated-verified-5") // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) - p1.expectNoMsg(100 millis) - p2.expectNoMsg(100 millis) - p3.expectNoMsg(100 millis) + expectNoMessage(2.seconds) + p1.expectNoMessage(100 millis) + p2.expectNoMessage(100 millis) + p3.expectNoMessage(100 millis) assertCleanup() } @@ -402,7 +401,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup() - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() } @@ -447,7 +446,7 @@ abstract class RemoteNodeDeathWatchSpec(multiNodeConfig: RemoteNodeDeathWatchCon // verify that things are cleaned up, and heartbeating is stopped assertCleanup(20 seconds) - expectNoMsg(2.seconds) + expectNoMessage(2.seconds) assertCleanup() } diff --git a/akka-remote-tests/src/test/scala/akka/remote/RemotingFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/RemotingFailedToBindSpec.scala new file mode 100644 index 0000000000..0e868076b3 --- /dev/null +++ b/akka-remote-tests/src/test/scala/akka/remote/RemotingFailedToBindSpec.scala @@ -0,0 +1,39 @@ +package akka.remote + +import akka.actor.ActorSystem +import akka.testkit.SocketUtil +import com.typesafe.config.ConfigFactory +import org.jboss.netty.channel.ChannelException +import org.scalatest.{ Matchers, WordSpec } + +class RemotingFailedToBindSpec extends WordSpec with Matchers { + + "an ActorSystem" must { + "not start if port is taken" in { + val port = SocketUtil.temporaryLocalPort(true) + val config = ConfigFactory.parseString( + s""" + |akka { + | actor { + | provider = remote + | } + | remote { + | netty.tcp { + | hostname = "127.0.0.1" + | port = $port + | } + | } + |} + """.stripMargin) + val as = ActorSystem("RemotingFailedToBindSpec", config) + try { + val ex = intercept[ChannelException] { + ActorSystem("BindTest2", config) + } + ex.getMessage should startWith("Failed to bind") + } finally { + as.terminate() + } + } + } +} diff --git a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala index 0e9224dabf..00069e3a22 100644 --- a/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala +++ b/akka-remote-tests/src/test/scala/akka/remote/artery/ArteryFailedToBindSpec.scala @@ -1,6 +1,7 @@ package akka.remote.artery import akka.actor.ActorSystem +import akka.remote.RemoteTransportException import akka.testkit.SocketUtil import com.typesafe.config.ConfigFactory import org.scalatest.{ Matchers, WordSpec } @@ -28,7 +29,7 @@ class ArteryFailedToBindSpec extends WordSpec with Matchers { """.stripMargin) val as = ActorSystem("BindTest1", config) try { - val ex = intercept[RuntimeException] { + val ex = intercept[RemoteTransportException] { ActorSystem("BindTest2", config) } ex.getMessage should equal("Inbound Aeron channel is in errored state. See Aeron logs for details.") diff --git a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala index 05e8d7a6fa..6f2d442c79 100644 --- a/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/RemoteTransport.scala @@ -7,7 +7,7 @@ package akka.remote import akka.AkkaException import akka.Done import akka.actor._ -import akka.event.{ LoggingAdapter } +import akka.event.LoggingAdapter import scala.collection.immutable import scala.concurrent.Future import scala.util.control.NoStackTrace @@ -18,7 +18,9 @@ import akka.util.OptionVal * such as inability to start, wrong configuration etc. */ @SerialVersionUID(1L) -class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) +class RemoteTransportException(message: String, cause: Throwable) extends AkkaException(message, cause) { + def this(msg: String) = this(msg, null) +} /** * [[RemoteTransportException]] without stack trace. diff --git a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala index c5dc10e485..ad839efc59 100644 --- a/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala +++ b/akka-remote/src/main/scala/akka/remote/artery/ArteryTransport.scala @@ -27,12 +27,7 @@ import akka.actor.Cancellable import akka.actor.Props import akka.event.Logging import akka.event.LoggingAdapter -import akka.remote.AddressUidExtension -import akka.remote.RemoteActorRef -import akka.remote.RemoteActorRefProvider -import akka.remote.RemoteTransport -import akka.remote.ThisActorSystemQuarantinedEvent -import akka.remote.UniqueAddress +import akka.remote._ import akka.remote.artery.AeronSource.ResourceLifecycle import akka.remote.artery.ArteryTransport.ShuttingDown import akka.remote.artery.Encoder.OutboundCompressionAccess @@ -624,13 +619,13 @@ private[remote] class ArteryTransport(_system: ExtendedActorSystem, _provider: R log.debug("Inbound channel is now active") } else if (status == ChannelEndpointStatus.ERRORED) { areonErrorLog.logErrors(log, 0L) - throw new RuntimeException("Inbound Aeron channel is in errored state. See Aeron logs for details.") + throw new RemoteTransportException("Inbound Aeron channel is in errored state. See Aeron logs for details.") } else if (status == ChannelEndpointStatus.INITIALIZING && retries > 0) { Thread.sleep(waitInterval) retry(retries - 1) } else { areonErrorLog.logErrors(log, 0L) - throw new RuntimeException("Timed out waiting for Aeron transport to bind. See Aeoron logs.") + throw new RemoteTransportException("Timed out waiting for Aeron transport to bind. See Aeoron logs.") } } } diff --git a/project/MultiNode.scala b/project/MultiNode.scala index 5ae9416941..aa038fcca8 100644 --- a/project/MultiNode.scala +++ b/project/MultiNode.scala @@ -3,7 +3,6 @@ */ package akka -import akka.TestExtras.Filter import akka.TestExtras.Filter.Keys._ import com.typesafe.sbt.{ SbtScalariform, SbtMultiJvm } import com.typesafe.sbt.SbtMultiJvm.MultiJvmKeys._