From 1df2c2d53aaf8fb27142b8e00812919e9044d39c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johan=20Andr=C3=A9n?= Date: Thu, 20 Feb 2020 14:03:50 +0100 Subject: [PATCH] UdpConnectedIntegrationSpec dns fail fix (#28558) * More generous timeout for dns resolution failure #28133 * Use async-dns as workaround for JDK/glibc/whatever resolution bug * Handle Async DNS lookup failure in UDP and TCP connections --- .../akka/io/UdpConnectedIntegrationSpec.scala | 15 ++++++++-- .../scala/akka/io/TcpOutgoingConnection.scala | 7 +++++ .../main/scala/akka/io/UdpConnection.scala | 28 ++++++++++++------- .../main/scala/akka/io/dns/DnsProtocol.scala | 4 +++ 4 files changed, 41 insertions(+), 13 deletions(-) diff --git a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala index 3ac6126d64..91ec67679a 100644 --- a/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/UdpConnectedIntegrationSpec.scala @@ -6,15 +6,24 @@ package akka.io import java.net.InetSocketAddress -import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } -import akka.util.ByteString import akka.actor.ActorRef import akka.testkit.SocketUtil.temporaryServerAddresses import akka.testkit.WithLogCapturing +import akka.testkit.AkkaSpec +import akka.testkit.ImplicitSender +import akka.testkit.TestProbe +import akka.util.ByteString + import scala.concurrent.duration._ class UdpConnectedIntegrationSpec extends AkkaSpec(""" akka.loglevel = DEBUG + akka.actor.debug.lifecycle = on + akka.actor.debug.autoreceive = on + akka.io.udp-connected.trace-logging = on + # issues with dns resolution of non existent host hanging with the + # Java native host resolution + akka.io.dns.resolver = async-dns akka.loggers = ["akka.testkit.SilenceAllTestEventListener"] """) with ImplicitSender with WithLogCapturing { @@ -45,7 +54,7 @@ class UdpConnectedIntegrationSpec extends AkkaSpec(""" val handler = TestProbe() val command = UdpConnected.Connect(handler.ref, InetSocketAddress.createUnresolved(serverAddress, 1234), None) commander.send(IO(UdpConnected), command) - commander.expectMsg(6.seconds, UdpConnected.CommandFailed(command)) + commander.expectMsg(10.seconds, UdpConnected.CommandFailed(command)) } "report error if can not resolve (cached)" in { diff --git a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala index f8bf0eb84c..c3d00383d3 100644 --- a/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala +++ b/akka-actor/src/main/scala/akka/io/TcpOutgoingConnection.scala @@ -7,6 +7,8 @@ package akka.io import java.net.{ ConnectException, InetSocketAddress } import java.nio.channels.{ SelectionKey, SocketChannel } +import akka.actor.Status.Failure + import scala.util.control.{ NoStackTrace, NonFatal } import scala.concurrent.duration._ import akka.actor.{ ActorRef, ReceiveTimeout } @@ -83,6 +85,11 @@ private[io] class TcpOutgoingConnection( } case ReceiveTimeout => connectionTimeout() + case Failure(ex) => + // async-dns responds with a Failure on DNS server lookup failure + reportConnectFailure { + throw new RuntimeException(ex) + } } def register(address: InetSocketAddress, registration: ChannelRegistration): Unit = { diff --git a/akka-actor/src/main/scala/akka/io/UdpConnection.scala b/akka-actor/src/main/scala/akka/io/UdpConnection.scala index e35c284cb6..f953a58bee 100644 --- a/akka-actor/src/main/scala/akka/io/UdpConnection.scala +++ b/akka-actor/src/main/scala/akka/io/UdpConnection.scala @@ -9,6 +9,8 @@ import java.nio.ByteBuffer import java.nio.channels.DatagramChannel import java.nio.channels.SelectionKey._ +import akka.actor.Status.Failure + import scala.annotation.tailrec import scala.util.control.NonFatal import akka.actor.{ Actor, ActorLogging, ActorRef } @@ -50,7 +52,9 @@ private[io] class UdpConnection( context.become(resolving()) } } else { - doConnect(remoteAddress) + reportConnectFailure { + doConnect(remoteAddress) + } } def resolving(): Receive = { @@ -58,18 +62,22 @@ private[io] class UdpConnection( reportConnectFailure { doConnect(new InetSocketAddress(r.address(), remoteAddress.getPort)) } + case Failure(ex) => + // async-dns responds with a Failure on DNS server lookup failure + reportConnectFailure { + throw new RuntimeException(ex) + } } def doConnect(@unused address: InetSocketAddress): Unit = { - reportConnectFailure { - channel = DatagramChannel.open - channel.configureBlocking(false) - val socket = channel.socket - options.foreach(_.beforeDatagramBind(socket)) - localAddress.foreach(socket.bind) - channel.connect(remoteAddress) - channelRegistry.register(channel, OP_READ) - } + channel = DatagramChannel.open + channel.configureBlocking(false) + val socket = channel.socket + options.foreach(_.beforeDatagramBind(socket)) + localAddress.foreach(socket.bind) + channel.connect(remoteAddress) + channelRegistry.register(channel, OP_READ) + log.debug("Successfully connected to [{}]", remoteAddress) } diff --git a/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala b/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala index 8fc07be2c2..85ed71b567 100644 --- a/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala +++ b/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala @@ -47,6 +47,10 @@ object DnsProtocol { */ def srvRequestType(): RequestType = Srv + /** + * Sending this to the [[AsyncDnsManager]] will either lead to a [[Resolved]] or a [[akka.actor.Status.Failure]] response. + * If request type are both, both resolutions must succeed or the response is a failure. + */ final case class Resolve(name: String, requestType: RequestType) extends ConsistentHashable { override def consistentHashKey: Any = name }