diff --git a/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala b/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala new file mode 100644 index 0000000000..f4e5d803bf --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/io/dns/AsyncDnsResolverIntegrationSpec.scala @@ -0,0 +1,199 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns + +import java.net.InetAddress + +import akka.io.dns.DnsProtocol.{ Ip, RequestType, Srv } +import akka.io.{ Dns, IO } +import akka.pattern.ask +import akka.testkit.AkkaSpec +import akka.util.Timeout + +import scala.concurrent.duration._ + +/* +Relies on two zones setup, akka.test and akka.test2 e.g. +* Install bind +* Create the two zones in /var/named/akka.test.zone and /var/named/akka.test2.zone +* Add the following to /etc/named.conf: + +zone "akka.test" IN { + type master; + file "akka.test.zone"; +}; + +zone "akka.test2" IN { + type master; + file "akka.test2.zone"; +}; + + +/var/named/akka.test.zone: + +$TTL 86400 + +@ IN SOA akka.test root.akka.test ( + 2017010302 + 3600 + 900 + 604800 + 86400 +) + +@ IN NS test +test IN A 192.168.1.19 +a-single IN A 192.168.1.20 +a-double IN A 192.168.1.21 +a-double IN A 192.168.1.22 +aaaa-single IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:1 +aaaa-double IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:2 +aaaa-double IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:3 +a-aaaa IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:4 +a-aaaa IN AAAA fd4d:36b2:3eca:a2d8:0:0:0:5 +a-aaaa IN A 192.168.1.23 +a-aaaa IN A 192.168.1.24 + +service.tcp 86400 IN SRV 10 60 5060 a-single +service.tcp 86400 IN SRV 10 40 5070 a-double + +cname-in IN CNAME a-double +cname-ext IN CNAME a-single.akka.test2. + +/var/named/akka.test2.zone: + +$TTL 86400 + +@ IN SOA akka.test2 root.akka.test2 ( + 2017010302 + 3600 + 900 + 604800 + 86400 +) + +@ IN NS test2 +test2 IN A 192.168.2.19 +a-single IN A 192.168.2.20 + +*/ +class AsyncDnsResolverIntegrationSpec extends AkkaSpec( + """ + akka.loglevel = DEBUG + akka.io.dns.resolver = async-dns + akka.io.dns.async-dns.nameservers = [localhost] +// akka.io.dns.async-dns.nameservers = default + """) { + val duration = 10.seconds + implicit val timeout = Timeout(duration) + + "Resolver" must { + pending + "resolve single A record" in { + val name = "a-single.akka.test" + val answer = resolve(name, DnsProtocol.Ip(ipv6 = false)) + withClue(answer) { + answer.name shouldEqual name + answer.results.size shouldEqual 1 + answer.results.head.name shouldEqual name + answer.results.head.asInstanceOf[ARecord].ip shouldEqual InetAddress.getByName("192.168.1.20") + } + } + + "resolve double A records" in { + val name = "a-double.akka.test" + val answer = resolve(name) + answer.name shouldEqual name + answer.results.map(_.asInstanceOf[ARecord].ip).toSet shouldEqual Set( + InetAddress.getByName("192.168.1.21"), + InetAddress.getByName("192.168.1.22") + ) + } + + "resolve single AAAA record" in { + val name = "aaaa-single.akka.test" + val answer = resolve(name) + answer.name shouldEqual name + answer.results.map(_.asInstanceOf[AAAARecord].ip) shouldEqual Seq(InetAddress.getByName("fd4d:36b2:3eca:a2d8:0:0:0:1")) + } + + "resolve double AAAA records" in { + val name = "aaaa-double.akka.test" + val answer = resolve(name) + answer.name shouldEqual name + answer.results.map(_.asInstanceOf[AAAARecord].ip).toSet shouldEqual Set( + InetAddress.getByName("fd4d:36b2:3eca:a2d8:0:0:0:2"), + InetAddress.getByName("fd4d:36b2:3eca:a2d8:0:0:0:3") + ) + } + + "resolve mixed A/AAAA records" in { + val name = "a-aaaa.akka.test" + val answer = resolve(name) + answer.name shouldEqual name + + answer.results.collect { case r: ARecord ⇒ r.ip }.toSet shouldEqual Set( + InetAddress.getByName("192.168.1.23"), + InetAddress.getByName("192.168.1.24") + ) + + answer.results.collect { case r: AAAARecord ⇒ r.ip }.toSet shouldEqual Set( + InetAddress.getByName("fd4d:36b2:3eca:a2d8:0:0:0:4"), + InetAddress.getByName("fd4d:36b2:3eca:a2d8:0:0:0:5") + ) + } + + "resolve external CNAME record" in { + val name = "cname-ext.akka.test" + val answer = (IO(Dns) ? DnsProtocol.Resolve(name)).mapTo[DnsProtocol.Resolved].futureValue + answer.name shouldEqual name + answer.results.collect { case r: CNameRecord ⇒ r.canonicalName }.toSet shouldEqual Set( + "a-single.akka.test2" + ) + answer.results.collect { case r: ARecord ⇒ r.ip }.toSet shouldEqual Set( + InetAddress.getByName("192.168.2.20") + ) + } + + "resolve internal CNAME record" in { + val name = "cname-in.akka.test" + val answer = resolve(name) + answer.name shouldEqual name + answer.results.collect { case r: CNameRecord ⇒ r.canonicalName }.toSet shouldEqual Set( + "a-double.akka.test" + ) + answer.results.collect { case r: ARecord ⇒ r.ip }.toSet shouldEqual Set( + InetAddress.getByName("192.168.1.21"), + InetAddress.getByName("192.168.1.22") + ) + } + + "resolve SRV record" in { + val name = "service.tcp.akka.test" + val answer = resolve("service.tcp.akka.test", Srv) + + answer.name shouldEqual name + answer.results.collect { case r: SRVRecord ⇒ r }.toSet shouldEqual Set( + SRVRecord("service.tcp.akka.test", 86400, 10, 60, 5060, "a-single.akka.test"), + SRVRecord("service.tcp.akka.test", 86400, 10, 40, 5070, "a-double.akka.test") + ) + } + + "resolve same address twice" in { + resolve("a-single.akka.test").results.map(_.asInstanceOf[ARecord].ip) shouldEqual Seq(InetAddress.getByName("192.168.1.20")) + resolve("a-single.akka.test").results.map(_.asInstanceOf[ARecord].ip) shouldEqual Seq(InetAddress.getByName("192.168.1.20")) + } + + "handle nonexistent domains" in { + val answer = (IO(Dns) ? DnsProtocol.Resolve("nonexistent.akka.test")).mapTo[DnsProtocol.Resolved].futureValue + answer.results shouldEqual List.empty + } + + def resolve(name: String, requestType: RequestType = Ip()) = { + (IO(Dns) ? DnsProtocol.Resolve(name, requestType)).mapTo[DnsProtocol.Resolved].futureValue + } + + } +} diff --git a/akka-actor-tests/src/test/scala/akka/io/dns/NameserverAddressParserSpec.scala b/akka-actor-tests/src/test/scala/akka/io/dns/NameserverAddressParserSpec.scala new file mode 100644 index 0000000000..7a85194a4d --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/io/dns/NameserverAddressParserSpec.scala @@ -0,0 +1,26 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns + +import java.net.InetSocketAddress + +import org.scalatest.{ Matchers, WordSpec } + +class NameserverAddressParserSpec extends WordSpec with Matchers { + "Parser" should { + "handle explicit port in IPv4 address" in { + DnsSettings.parseNameserverAddress("8.8.8.8:153") shouldEqual new InetSocketAddress("8.8.8.8", 153) + } + "handle explicit port in IPv6 address" in { + DnsSettings.parseNameserverAddress("[2001:4860:4860::8888]:153") shouldEqual new InetSocketAddress("2001:4860:4860::8888", 153) + } + "handle default port in IPv4 address" in { + DnsSettings.parseNameserverAddress("8.8.8.8") shouldEqual new InetSocketAddress("8.8.8.8", 53) + } + "handle default port in IPv6 address" in { + DnsSettings.parseNameserverAddress("[2001:4860:4860::8888]") shouldEqual new InetSocketAddress("2001:4860:4860::8888", 53) + } + } +} diff --git a/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsResolverSpec.scala b/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsResolverSpec.scala new file mode 100644 index 0000000000..e30c937746 --- /dev/null +++ b/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsResolverSpec.scala @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import java.net.{ Inet6Address, InetAddress } + +import akka.actor.Status.Failure +import akka.actor.{ ActorRef, ExtendedActorSystem, Props } +import akka.io.dns.{ AAAARecord, ARecord, DnsSettings } +import akka.testkit.{ AkkaSpec, ImplicitSender, TestProbe } +import com.typesafe.config.ConfigFactory +import akka.io.dns.DnsProtocol._ +import akka.io.dns.internal.AsyncDnsResolver.ResolveFailedException +import akka.io.dns.internal.DnsClient.{ Answer, Question4, Question6, SrvQuestion } + +import scala.collection.immutable + +class AsyncDnsResolverSpec extends AkkaSpec( + """ + akka.loglevel = INFO + """) with ImplicitSender { + + "Async DNS Resolver" must { + + "use dns clients in order" in { + val dnsClient1 = TestProbe() + val dnsClient2 = TestProbe() + val r = resolver(List(dnsClient1.ref, dnsClient2.ref)) + r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false)) + dnsClient1.expectMsg(Question4(1, "cats.com")) + dnsClient1.reply(Answer(1, immutable.Seq())) + dnsClient2.expectNoMessage() + expectMsg(Resolved("cats.com", immutable.Seq())) + } + + "move to next client if first fails" in { + val dnsClient1 = TestProbe() + val dnsClient2 = TestProbe() + val r = resolver(List(dnsClient1.ref, dnsClient2.ref)) + r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false)) + // first will get ask timeout + dnsClient1.expectMsg(Question4(1, "cats.com")) + dnsClient1.reply(Failure(new RuntimeException("Nope"))) + dnsClient2.expectMsg(Question4(2, "cats.com")) + dnsClient2.reply(Answer(2, immutable.Seq())) + expectMsg(Resolved("cats.com", immutable.Seq())) + } + + "move to next client if first times out" in { + val dnsClient1 = TestProbe() + val dnsClient2 = TestProbe() + val r = resolver(List(dnsClient1.ref, dnsClient2.ref)) + r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false)) + // first will get ask timeout + dnsClient1.expectMsg(Question4(1, "cats.com")) + dnsClient2.expectMsg(Question4(2, "cats.com")) + dnsClient2.reply(Answer(2, immutable.Seq())) + expectMsg(Resolved("cats.com", immutable.Seq())) + } + + "gets both A and AAAA records if requested" in { + val dnsClient1 = TestProbe() + val r = resolver(List(dnsClient1.ref)) + r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = true)) + dnsClient1.expectMsg(Question4(1, "cats.com")) + val ipv4Record = ARecord("cats.com", 100, InetAddress.getByName("127.0.0.1")) + dnsClient1.reply(Answer(1, immutable.Seq(ipv4Record))) + dnsClient1.expectMsg(Question6(2, "cats.com")) + val ipv6Record = AAAARecord("cats.com", 100, InetAddress.getByName("::1").asInstanceOf[Inet6Address]) + dnsClient1.reply(Answer(2, immutable.Seq(ipv6Record))) + expectMsg(Resolved("cats.com", immutable.Seq(ipv4Record, ipv6Record))) + } + + "fails if all dns clients timeout" in { + val dnsClient1 = TestProbe() + val dnsClient2 = TestProbe() + val r = resolver(List(dnsClient1.ref, dnsClient2.ref)) + r ! Resolve("cats.com", Ip(ipv4 = true, ipv6 = false)) + expectMsgPF(remainingOrDefault) { + case Failure(ResolveFailedException(_)) ⇒ + } + } + + "gets SRV records if requested" in { + val dnsClient1 = TestProbe() + val dnsClient2 = TestProbe() + val r = resolver(List(dnsClient1.ref, dnsClient2.ref)) + r ! Resolve("cats.com", Srv) + dnsClient1.expectMsg(SrvQuestion(1, "cats.com")) + dnsClient1.reply(Answer(1, immutable.Seq())) + dnsClient2.expectNoMessage() + expectMsg(Resolved("cats.com", immutable.Seq())) + } + } + + def resolver(clients: List[ActorRef]): ActorRef = { + val settings = new DnsSettings(system.asInstanceOf[ExtendedActorSystem], ConfigFactory.parseString( + """ + nameservers = ["one","two"] + resolve-timeout = 25ms + """)) + system.actorOf(Props(new AsyncDnsResolver(settings, new AsyncDnsCache(), (arf, l) ⇒ { + clients + }))) + } +} diff --git a/akka-actor/src/main/mima-filters/2.5.13.backwards.excludes b/akka-actor/src/main/mima-filters/2.5.13.backwards.excludes index 1313c0fddf..fc6853ca1b 100644 --- a/akka-actor/src/main/mima-filters/2.5.13.backwards.excludes +++ b/akka-actor/src/main/mima-filters/2.5.13.backwards.excludes @@ -1,3 +1,16 @@ # Fix NotInfluenceReceiveTimeout behavior when used with Timers trait (#24989) ProblemFilters.exclude[IncompatibleTemplateDefProblem]("akka.actor.TimerSchedulerImpl$TimerMsg") ProblemFilters.exclude[MissingClassProblem]("akka.actor.TimerSchedulerImpl$TimerMsg$") + +#12591 and #2517 - Async DNS +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#CacheEntry.copy") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.SimpleDnsCache#CacheEntry.copy$default$1") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.SimpleDnsCache#CacheEntry.answer") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#CacheEntry.this") +ProblemFilters.exclude[MissingTypesProblem]("akka.io.SimpleDnsCache$CacheEntry$") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#CacheEntry.apply") +ProblemFilters.exclude[DirectMissingMethodProblem]("akka.io.SimpleDnsCache#Cache.put") +ProblemFilters.exclude[IncompatibleResultTypeProblem]("akka.io.SimpleDnsCache#ExpiryEntry.name") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#ExpiryEntry.this") +ProblemFilters.exclude[MissingClassProblem]("akka.io.SimpleDnsCache$ExpiryEntryOrdering$") +ProblemFilters.exclude[IncompatibleMethTypeProblem]("akka.io.SimpleDnsCache#Cache.get") diff --git a/akka-actor/src/main/resources/reference.conf b/akka-actor/src/main/resources/reference.conf index dd1fbb4b41..5f5d841924 100644 --- a/akka-actor/src/main/resources/reference.conf +++ b/akka-actor/src/main/resources/reference.conf @@ -323,14 +323,21 @@ akka { } } - /IO-DNS/inet-address { + "/IO-DNS/inet-address" { mailbox = "unbounded" router = "consistent-hashing-pool" nr-of-instances = 4 } + "/IO-DNS/inet-address/*" { dispatcher = "akka.actor.default-blocking-io-dispatcher" } + + "/IO-DNS/async-dns" { + mailbox = "unbounded" + router = "round-robin-pool" + nr-of-instances = 1 + } } default-dispatcher { @@ -1011,7 +1018,7 @@ akka { # Name of the subconfig at path akka.io.dns, see inet-address below # - # Change to `async-dns` to use the new akka "native" DNS resolver, + # Change to `async-dns` to use the new "native" DNS resolver, # which is also capable of resolving SRV records. resolver = "inet-address" @@ -1035,18 +1042,25 @@ akka { cache-cleanup-interval = 120s } - async { - # Must implement akka.io.DnsProvider - provider-object = "akka.io.dns.AsyncDnsProvider" + async-dns { + provider-object = "akka.io.dns.internal.AsyncDnsProvider" # Configures nameservers to query during DNS resolution. - # # Defaults to the nameservers that would be used by the JVM by default. # Set to a list of IPs to override the servers, e.g. [ "8.8.8.8", "8.8.4.4" ] for Google's servers + # If multiple are defined then they are tried in order until one responds nameservers = default - # use /etc/resolve.conf on systems where it is available during DNS lookups - resolv-conf = on + # The time that a request is allowed to live before being discarded + # given no reply. The lower bound of this should always be the amount + # of time to reasonably expect a DNS server to reply within. + # If multiple name servers are provided then each gets this long to response before trying + # the next one + resolve-timeout = 5s + + # How often to sweep out expired cache entries. + # Note that this interval has nothing to do with TTLs + cache-cleanup-interval = 120s } } } diff --git a/akka-actor/src/main/scala/akka/io/Dns.scala b/akka-actor/src/main/scala/akka/io/Dns.scala index 1f65133002..fa64c99985 100644 --- a/akka-actor/src/main/scala/akka/io/Dns.scala +++ b/akka-actor/src/main/scala/akka/io/Dns.scala @@ -51,12 +51,10 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider { } } - // TODO tempted to deprecate this one? def cached(name: String)(system: ActorSystem): Option[Resolved] = { Dns(system).cache.cached(name) } - // TODO tempted to deprecate this one? def resolve(name: String)(system: ActorSystem, sender: ActorRef): Option[Resolved] = { Dns(system).cache.resolve(name)(system, sender) } @@ -71,7 +69,8 @@ object Dns extends ExtensionId[DnsExt] with ExtensionIdProvider { override def get(system: ActorSystem): DnsExt = super.get(system) } -class DnsExt(system: ExtendedActorSystem) extends IO.Extension { +class DnsExt(val system: ExtendedActorSystem) extends IO.Extension { + val Settings = new Settings(system.settings.config.getConfig("akka.io.dns")) class Settings private[DnsExt] (_config: Config) { @@ -81,6 +80,8 @@ class DnsExt(system: ExtendedActorSystem) extends IO.Extension { val Resolver: String = getString("resolver") val ResolverConfig: Config = getConfig(Resolver) val ProviderObjectName: String = ResolverConfig.getString("provider-object") + + override def toString = s"Settings($Dispatcher, $Resolver, $ResolverConfig, $ProviderObjectName)" } val provider: DnsProvider = system.dynamicAccess.getClassFor[DnsProvider](Settings.ProviderObjectName).get.newInstance() diff --git a/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala b/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala index 68ce5f0b8a..8cc6bdf340 100644 --- a/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala +++ b/akka-actor/src/main/scala/akka/io/SimpleDnsCache.scala @@ -5,21 +5,23 @@ package akka.io import java.util.concurrent.atomic.AtomicReference + +import akka.annotation.InternalApi import akka.io.Dns.Resolved import scala.annotation.tailrec import scala.collection.immutable -private[io] sealed trait PeriodicCacheCleanup { +private[io] trait PeriodicCacheCleanup { def cleanup(): Unit } class SimpleDnsCache extends Dns with PeriodicCacheCleanup { - import akka.io.SimpleDnsCache._ + import SimpleDnsCache._ - private val cache = new AtomicReference(new Cache( - immutable.SortedSet()(ExpiryEntryOrdering), - immutable.Map(), clock _)) + private val cache = new AtomicReference(new Cache[String, Dns.Resolved]( + immutable.SortedSet()(expiryEntryOrdering[String]()), + Map(), clock)) private val nanoBase = System.nanoTime() @@ -36,7 +38,7 @@ class SimpleDnsCache extends Dns with PeriodicCacheCleanup { @tailrec private[io] final def put(r: Resolved, ttlMillis: Long): Unit = { val c = cache.get() - if (!cache.compareAndSet(c, c.put(r, ttlMillis))) + if (!cache.compareAndSet(c, c.put(r.name, r, ttlMillis))) put(r, ttlMillis) } @@ -49,25 +51,30 @@ class SimpleDnsCache extends Dns with PeriodicCacheCleanup { } object SimpleDnsCache { - private class Cache(queue: immutable.SortedSet[ExpiryEntry], cache: immutable.Map[String, CacheEntry], clock: () ⇒ Long) { - def get(name: String): Option[Resolved] = { + + /** + * INTERNAL API + */ + @InternalApi + private[io] class Cache[K, V](queue: immutable.SortedSet[ExpiryEntry[K]], cache: immutable.Map[K, CacheEntry[V]], clock: () ⇒ Long) { + def get(name: K): Option[V] = { for { e ← cache.get(name) if e.isValid(clock()) } yield e.answer } - def put(answer: Resolved, ttlMillis: Long): Cache = { + def put(name: K, answer: V, ttlMillis: Long): Cache[K, V] = { val until0 = clock() + ttlMillis val until = if (until0 < 0) Long.MaxValue else until0 - new Cache( - queue + new ExpiryEntry(answer.name, until), - cache + (answer.name → CacheEntry(answer, until)), + new Cache[K, V]( + queue + new ExpiryEntry[K](name, until), + cache + (name → CacheEntry(answer, until)), clock) } - def cleanup(): Cache = { + def cleanup(): Cache[K, V] = { val now = clock() var q = queue var c = cache @@ -82,17 +89,25 @@ object SimpleDnsCache { } } - private case class CacheEntry(answer: Dns.Resolved, until: Long) { + private case class CacheEntry[T](answer: T, until: Long) { def isValid(clock: Long): Boolean = clock < until } - private class ExpiryEntry(val name: String, val until: Long) extends Ordered[ExpiryEntry] { + /** + * INTERNAL API + */ + @InternalApi + private[io] class ExpiryEntry[K](val name: K, val until: Long) extends Ordered[ExpiryEntry[K]] { def isValid(clock: Long): Boolean = clock < until - override def compare(that: ExpiryEntry): Int = -until.compareTo(that.until) + override def compare(that: ExpiryEntry[K]): Int = -until.compareTo(that.until) } - private object ExpiryEntryOrdering extends Ordering[ExpiryEntry] { - override def compare(x: ExpiryEntry, y: ExpiryEntry): Int = { + /** + * INTERNAL API + */ + @InternalApi + private[io] def expiryEntryOrdering[K]() = new Ordering[ExpiryEntry[K]] { + override def compare(x: ExpiryEntry[K], y: ExpiryEntry[K]): Int = { x.until.compareTo(y.until) } } diff --git a/akka-actor/src/main/scala/akka/io/SimpleDnsManager.scala b/akka-actor/src/main/scala/akka/io/SimpleDnsManager.scala index bd8466432f..f35189194d 100644 --- a/akka-actor/src/main/scala/akka/io/SimpleDnsManager.scala +++ b/akka-actor/src/main/scala/akka/io/SimpleDnsManager.scala @@ -17,7 +17,11 @@ class SimpleDnsManager(val ext: DnsExt) extends Actor with RequiresMessageQueue[ import context._ - private val resolver = actorOf(FromConfig.props(Props(ext.provider.actorClass, ext.cache, ext.Settings.ResolverConfig).withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher)), ext.Settings.Resolver) + private val resolver = actorOf(FromConfig.props(Props(ext.provider.actorClass, ext.cache, ext.Settings.ResolverConfig) + .withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher)), ext.Settings.Resolver) + + private val inetDnsEnabled = ext.provider.actorClass == classOf[InetAddressDnsResolver] + private val cacheCleanup = ext.cache match { case cleanup: PeriodicCacheCleanup ⇒ Some(cleanup) case _ ⇒ None @@ -28,27 +32,25 @@ class SimpleDnsManager(val ext: DnsExt) extends Actor with RequiresMessageQueue[ system.scheduler.schedule(interval, interval, self, SimpleDnsManager.CacheCleanup) } - override def receive = { + override def receive: Receive = { case r @ Dns.Resolve(name) ⇒ log.debug("Resolution request for {} from {}", name, sender()) resolver.forward(r) - case SimpleDnsManager.CacheCleanup ⇒ - for (c ← cacheCleanup) - c.cleanup() - case m: dns.DnsProtocol.Protocol ⇒ - val legacyDnsResolver = classOf[InetAddressDnsResolver] - if (ext.provider.actorClass == legacyDnsResolver) { - // FIXME technically we COULD adopt the protocol here, but not sure we should... - log.warning( - "Message of [akka.io.dns.DnsProtocol.Protocol] received ({}), while the legacy {} was configured! Dropping DNS resolve request." + - "Please use [akka.io.dns.DnsProtocol.resolve] to create resolution requests for the Async DNS resolver.", - Logging.simpleName(m), Logging.simpleName(legacyDnsResolver)) + case SimpleDnsManager.CacheCleanup ⇒ + cacheCleanup.foreach(_.cleanup()) + + case m: dns.DnsProtocol.Resolve ⇒ + if (inetDnsEnabled) { + log.error( + "Message of [akka.io.dns.DnsProtocol.Protocol] received ({}) while inet-address dns was configured. Dropping DNS resolve request." + + "Only use [akka.io.dns.DnsProtocol.resolve] to create resolution requests for the Async DNS resolver.", + Logging.simpleName(m)) } else resolver.forward(m) } override def postStop(): Unit = { - for (t ← cleanupTimer) t.cancel() + cleanupTimer.foreach(_.cancel()) } } diff --git a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsProvider.scala b/akka-actor/src/main/scala/akka/io/dns/AsyncDnsProvider.scala deleted file mode 100644 index 5844636e17..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsProvider.scala +++ /dev/null @@ -1,13 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - */ - -package akka.io.dns - -import akka.io._ - -class AsyncDnsProvider extends DnsProvider { - override def cache: Dns = new SimpleDnsCache() - override def actorClass = classOf[InetAddressDnsResolver] - override def managerClass = classOf[SimpleDnsManager] -} diff --git a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsResolver.scala b/akka-actor/src/main/scala/akka/io/dns/AsyncDnsResolver.scala deleted file mode 100644 index 8770898c82..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsResolver.scala +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - */ - -package akka.io.dns - -import akka.actor.{ Actor, ActorLogging } -import akka.io.SimpleDnsCache -import akka.io.dns.protocol.DnsRecordType -import com.typesafe.config.Config -import akka.pattern.pipe - -import scala.concurrent.Future - -final class AsyncDnsResolver(cache: SimpleDnsCache, config: Config) extends Actor with ActorLogging { - implicit val ec = context.dispatcher - - override def receive: Receive = { - case m: DnsProtocol.Protocol ⇒ onReceive(m) - } - - def onReceive(m: DnsProtocol.Protocol): Unit = m match { - case DnsProtocol.Resolve(name, mode) ⇒ - resolve(name, mode) pipeTo sender() - } - - def resolve(str: String, types: Set[DnsRecordType]): Future[DnsProtocol.Resolved] = { - ??? - } -} - -object AsyncDnsResolver { - - /** SRV lookups start with `_` so we use this heurestic to issue an SRV lookup TODO DO WE REALLY */ - private[dns] def isLikelySrvLookup(name: String): Boolean = - name.startsWith("_") -} diff --git a/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala b/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala index da7a305f5a..c2f0b6d697 100644 --- a/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala +++ b/akka-actor/src/main/scala/akka/io/dns/DnsProtocol.scala @@ -4,9 +4,13 @@ package akka.io.dns -import akka.io.dns.protocol.{ DnsRecordType, DnsResourceRecord } +import java.util + +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.ApiMayChange import scala.collection.immutable +import scala.collection.JavaConverters._ /** * Supersedes [[akka.io.Dns]] protocol. @@ -15,26 +19,54 @@ import scala.collection.immutable * * Allows for more detailed lookups, by specifying which records should be checked, * and responses can more information than plain IP addresses (e.g. ports for SRV records). + * */ +@ApiMayChange object DnsProtocol { - def resolve(name: String): Resolve = - Resolve(name, NormalLookupRecordTypes) + @ApiMayChange + sealed trait RequestType + final case class Ip(ipv4: Boolean = true, ipv6: Boolean = true) extends RequestType + final case object Srv extends RequestType - def resolve(name: String, recordTypes: Set[DnsRecordType]): Resolve = - Resolve(name, recordTypes) + /** + * Java API + */ + def ipRequestType(ipv4: Boolean, ipv6: Boolean): RequestType = Ip(ipv4, ipv6) - sealed trait Protocol - private[akka] final case class Resolve(name: String, mode: Set[DnsRecordType]) extends Protocol + /** + * Java API + */ + def ipRequestType(): RequestType = Ip(ipv4 = true, ipv6 = true) - final case class Resolved(name: String, results: immutable.Seq[DnsResourceRecord]) extends Protocol + /** + * Java API + */ + def srvRequestType(): RequestType = Srv - import DnsRecordType._ - /** The default set of record types most applications are interested in: A, AAAA and CNAME */ - final val NormalLookupRecordTypes = Set(A, AAAA, CNAME) + final case class Resolve(name: String, requestType: RequestType) - /** Request lookups of `SRV` records */ - final val ServiceRecordTypes = Set(SRV) + object Resolve { + def apply(name: String): Resolve = Resolve(name, Ip()) + + /** + * Java API + */ + def create(name: String): Resolve = Resolve(name, Ip()) + + /** + * Java API + */ + def create(name: String, requestType: RequestType): Resolve = Resolve(name, requestType) + } + + @ApiMayChange + final case class Resolved(name: String, results: immutable.Seq[ResourceRecord]) extends NoSerializationVerificationNeeded { + /** + * Java API + */ + def getResults(): util.List[ResourceRecord] = results.asJava + } } diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsResourceRecords.scala b/akka-actor/src/main/scala/akka/io/dns/DnsResourceRecords.scala similarity index 53% rename from akka-actor/src/main/scala/akka/io/dns/protocol/DnsResourceRecords.scala rename to akka-actor/src/main/scala/akka/io/dns/DnsResourceRecords.scala index 7ccd76eb54..1d66f5a3f0 100644 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsResourceRecords.scala +++ b/akka-actor/src/main/scala/akka/io/dns/DnsResourceRecords.scala @@ -3,28 +3,41 @@ * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns */ -package akka.io.dns.protocol +package akka.io.dns import java.net.{ Inet4Address, Inet6Address, InetAddress } -import akka.annotation.InternalApi +import akka.actor.NoSerializationVerificationNeeded +import akka.annotation.{ ApiMayChange, InternalApi } +import akka.io.dns.internal.{ DomainName, _ } import akka.util.{ ByteIterator, ByteString, ByteStringBuilder } import scala.annotation.switch -@InternalApi -sealed abstract class DnsResourceRecord(val name: String, val ttl: Int, val recType: Short, val recClass: Short) { - def write(it: ByteStringBuilder): Unit = { - DnsDomainName.write(it, name) +@ApiMayChange +sealed abstract class ResourceRecord(val name: String, val ttl: Int, val recType: Short, val recClass: Short) + extends NoSerializationVerificationNeeded { + + /** + * INTERNAL API + */ + @InternalApi + private[dns] def write(it: ByteStringBuilder): Unit = { + DomainName.write(it, name) it.putShort(recType) it.putShort(recClass) } } -@InternalApi +@ApiMayChange final case class ARecord(override val name: String, override val ttl: Int, - ip: Inet4Address) extends DnsResourceRecord(name, ttl, DnsRecordType.A.code, DnsRecordClass.IN.code) { - override def write(it: ByteStringBuilder): Unit = { + ip: InetAddress) extends ResourceRecord(name, ttl, RecordType.A.code, RecordClass.IN.code) { + + /** + * INTERNAL API + */ + @InternalApi + private[dns] override def write(it: ByteStringBuilder): Unit = { super.write(it) val addr = ip.getAddress it.putShort(addr.length) @@ -32,8 +45,11 @@ final case class ARecord(override val name: String, override val ttl: Int, } } +/** + * INTERNAL API + */ @InternalApi -object ARecord { +private[dns] object ARecord { def parseBody(name: String, ttl: Int, length: Short, it: ByteIterator): ARecord = { val addr = Array.ofDim[Byte](4) it.getBytes(addr) @@ -41,10 +57,15 @@ object ARecord { } } -@InternalApi +@ApiMayChange final case class AAAARecord(override val name: String, override val ttl: Int, - ip: Inet6Address) extends DnsResourceRecord(name, ttl, DnsRecordType.AAAA.code, DnsRecordClass.IN.code) { - override def write(it: ByteStringBuilder): Unit = { + ip: Inet6Address) extends ResourceRecord(name, ttl, RecordType.AAAA.code, RecordClass.IN.code) { + + /** + * INTERNAL API + */ + @InternalApi + private[dns] override def write(it: ByteStringBuilder): Unit = { super.write(it) val addr = ip.getAddress it.putShort(addr.length) @@ -52,8 +73,16 @@ final case class AAAARecord(override val name: String, override val ttl: Int, } } +/** + * INTERNAL API + */ @InternalApi -object AAAARecord { +private[dns] object AAAARecord { + + /** + * INTERNAL API + */ + @InternalApi def parseBody(name: String, ttl: Int, length: Short, it: ByteIterator): AAAARecord = { val addr = Array.ofDim[Byte](16) it.getBytes(addr) @@ -61,49 +90,72 @@ object AAAARecord { } } -@InternalApi -final case class CNAMERecord(override val name: String, override val ttl: Int, - canonicalName: String) extends DnsResourceRecord(name, ttl, DnsRecordType.CNAME.code, DnsRecordClass.IN.code) { +@ApiMayChange +final case class CNameRecord(override val name: String, override val ttl: Int, + canonicalName: String) extends ResourceRecord(name, ttl, RecordType.CNAME.code, RecordClass.IN.code) { + /** + * INTERNAL API + */ + @InternalApi override def write(it: ByteStringBuilder): Unit = { super.write(it) - it.putShort(DnsDomainName.length(name)) - DnsDomainName.write(it, name) + it.putShort(DomainName.length(name)) + DomainName.write(it, name) } } @InternalApi -object CNAMERecord { - def parseBody(name: String, ttl: Int, length: Short, it: ByteIterator, msg: ByteString): CNAMERecord = { - CNAMERecord(name, ttl, DnsDomainName.parse(it, msg)) +private[dns] object CNameRecord { + /** + * INTERNAL API + */ + @InternalApi + def parseBody(name: String, ttl: Int, length: Short, it: ByteIterator, msg: ByteString): CNameRecord = { + CNameRecord(name, ttl, DomainName.parse(it, msg)) } } -@InternalApi +@ApiMayChange final case class SRVRecord(override val name: String, override val ttl: Int, - priority: Int, weight: Int, port: Int, target: String) extends DnsResourceRecord(name, ttl, DnsRecordType.SRV.code, DnsRecordClass.IN.code) { + priority: Int, weight: Int, port: Int, target: String) extends ResourceRecord(name, ttl, RecordType.SRV.code, RecordClass.IN.code) { + /** + * INTERNAL API + */ + @InternalApi override def write(it: ByteStringBuilder): Unit = { super.write(it) it.putShort(priority) it.putShort(weight) it.putShort(port) - DnsDomainName.write(it, target) + DomainName.write(it, target) } } +/** + * INTERNAL API + */ @InternalApi -object SRVRecord { +private[dns] object SRVRecord { + /** + * INTERNAL API + */ + @InternalApi def parseBody(name: String, ttl: Int, length: Short, it: ByteIterator, msg: ByteString): SRVRecord = { val priority = it.getShort val weight = it.getShort val port = it.getShort - SRVRecord(name, ttl, priority, weight, port, DnsDomainName.parse(it, msg)) + SRVRecord(name, ttl, priority, weight, port, DomainName.parse(it, msg)) } } -@InternalApi +@ApiMayChange final case class UnknownRecord(override val name: String, override val ttl: Int, override val recType: Short, override val recClass: Short, - data: ByteString) extends DnsResourceRecord(name, ttl, recType, recClass) { + data: ByteString) extends ResourceRecord(name, ttl, recType, recClass) { + /** + * INTERNAL API + */ + @InternalApi override def write(it: ByteStringBuilder): Unit = { super.write(it) it.putShort(data.length) @@ -111,16 +163,30 @@ final case class UnknownRecord(override val name: String, override val ttl: Int, } } +/** + * INTERNAL API + */ @InternalApi -object UnknownRecord { +private[dns] object UnknownRecord { + /** + * INTERNAL API + */ + @InternalApi def parseBody(name: String, ttl: Int, recType: Short, recClass: Short, length: Short, it: ByteIterator): UnknownRecord = UnknownRecord(name, ttl, recType, recClass, it.toByteString) } +/** + * INTERNAL API + */ @InternalApi -object DnsResourceRecord { - def parse(it: ByteIterator, msg: ByteString): DnsResourceRecord = { - val name = DnsDomainName.parse(it, msg) +private[dns] object ResourceRecord { + /** + * INTERNAL API + */ + @InternalApi + def parse(it: ByteIterator, msg: ByteString): ResourceRecord = { + val name = DomainName.parse(it, msg) val recType = it.getShort val recClass = it.getShort val ttl = it.getInt @@ -129,7 +195,7 @@ object DnsResourceRecord { it.drop(rdLength) (recType: @switch) match { case 1 ⇒ ARecord.parseBody(name, ttl, rdLength, data) - case 5 ⇒ CNAMERecord.parseBody(name, ttl, rdLength, data, msg) + case 5 ⇒ CNameRecord.parseBody(name, ttl, rdLength, data, msg) case 28 ⇒ AAAARecord.parseBody(name, ttl, rdLength, data) case 33 ⇒ SRVRecord.parseBody(name, ttl, rdLength, data, msg) case _ ⇒ UnknownRecord.parseBody(name, ttl, recType, recClass, rdLength, data) diff --git a/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala b/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala index a9d45cef05..1280896441 100644 --- a/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala +++ b/akka-actor/src/main/scala/akka/io/dns/DnsSettings.scala @@ -5,93 +5,57 @@ package akka.io.dns -import java.io.File import java.net.{ InetSocketAddress, URI } -import java.nio.file.Paths -import java.util.concurrent.TimeUnit +import java.util -import com.typesafe.config.{ Config, ConfigValueType } +import akka.actor.ExtendedActorSystem +import akka.annotation.InternalApi +import akka.util.JavaDurationConverters._ +import com.typesafe.config.Config import scala.collection.JavaConverters._ import scala.collection.{ breakOut, immutable } import scala.concurrent.duration.FiniteDuration -import scala.io.Source -import scala.util.{ Failure, Success, Try } -import java.lang.reflect.Method -import java.security.{ AccessController, PrivilegedExceptionAction } -import java.util - -import scala.util.control.NonFatal +import scala.util.Try /** INTERNAL API */ -private[dns] final class DnsSettings(c: Config) { +@InternalApi +private[dns] final class DnsSettings(system: ExtendedActorSystem, c: Config) { + import DnsSettings._ - val systemNameServers = - if (c.getBoolean("resolv-conf")) - parseSystemNameServers(Paths.get("/etc/resolv.conf").toFile) - else - Option.empty[immutable.Seq[InetSocketAddress]] - - val NameServers: immutable.Seq[InetSocketAddress] = { + val NameServers: List[InetSocketAddress] = { val addrs = Try(c.getString("nameservers")).toOption.toList .flatMap { - case "default" ⇒ getDefaultSearchDomains().getOrElse(failUnableToDetermineDefaultNameservers) + case "default" ⇒ getDefaultNameServers(system).getOrElse(failUnableToDetermineDefaultNameservers) case address ⇒ parseNameserverAddress(address) :: Nil } if (addrs.nonEmpty) addrs else c.getStringList("nameservers").asScala.map(parseNameserverAddress)(breakOut) } - val NegativeTtl: Long = c.getDuration("negative-ttl", TimeUnit.MILLISECONDS) - val MinPositiveTtl: Long = c.getDuration("min-positive-ttl", TimeUnit.MILLISECONDS) - val MaxPositiveTtl: Long = c.getDuration("max-positive-ttl", TimeUnit.MILLISECONDS) - - val ResolveTimeout: FiniteDuration = FiniteDuration(c.getDuration("request-ttl", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + val ResolveTimeout: FiniteDuration = c.getDuration("resolve-timeout").asScala // ------------------------- - private val inetSocketAddress = """(.*?)(?::(\d+))?""".r - - def parseNameserverAddress(str: String): InetSocketAddress = { - val inetSocketAddress(host, port) = str - new InetSocketAddress(host, Option(port).fold(53)(_.toInt)) - } - - // TODO replace with actual parser, regex is very likely not efficient... - private val ipv4Address = """^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r - private val ipv6Address = """^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""".r - - private def isInetAddress(name: String): Boolean = - ipv4Address.findAllMatchIn(name).nonEmpty || ipv6Address.findAllMatchIn(name).nonEmpty - - // Note that the corresponding man page doesn't actually dictate the format of this field, - // just the keywords and their meanings. See http://man7.org/linux/man-pages/man5/resolv.conf.5.html - private[io] val NameserverLine = """^\s*nameserver\s+(.*)$""".r - - // OS specific. No encoding or charset is specified by the man page as I recall. - // See http://man7.org/linux/man-pages/man5/resolv.conf.5.html. - def parseSystemNameServers(resolvConf: File): Option[immutable.Seq[InetSocketAddress]] = - try { - val addresses = - for { - line ← Source.fromFile(resolvConf).getLines() - addr ← NameserverLine.findFirstMatchIn(line).map(_.group(1)) - } yield parseNameserverAddress(addr) - Some(addresses.toList) - } catch { - case NonFatal(_) ⇒ Option.empty - } - def failUnableToDetermineDefaultNameservers = throw new IllegalStateException("Unable to obtain default nameservers from JNDI or via reflection. " + - "Please set `akka.io.dns.async.nameservers` explicitly in order to be able to resolve domain names.") + "Please set `akka.io.dns.async-dns.nameservers` explicitly in order to be able to resolve domain names.") } object DnsSettings { private final val DnsFallbackPort = 53 + private val inetSocketAddress = """(.*?)(?::(\d+))?""".r + + /** + * INTERNAL API + */ + @InternalApi private[akka] def parseNameserverAddress(str: String): InetSocketAddress = { + val inetSocketAddress(host, port) = str + new InetSocketAddress(host, Option(port).fold(DnsFallbackPort)(_.toInt)) + } /** * INTERNAL API @@ -102,7 +66,7 @@ object DnsSettings { * * Based on: https://github.com/netty/netty/blob/4.1/resolver-dns/src/main/java/io/netty/resolver/dns/DefaultDnsServerAddressStreamProvider.java#L58-L146 */ - private[akka] def getDefaultSearchDomains(): Try[List[InetSocketAddress]] = { + private[akka] def getDefaultNameServers(system: ExtendedActorSystem): Try[List[InetSocketAddress]] = { def asInetSocketAddress(server: String): Try[InetSocketAddress] = { Try { val uri = new URI(server) @@ -116,11 +80,10 @@ object DnsSettings { } def getNameserversUsingJNDI: Try[List[InetSocketAddress]] = { - import javax.naming.Context - import javax.naming.NamingException - import javax.naming.directory.InitialDirContext - import java.net.URI import java.util + + import javax.naming.Context + import javax.naming.directory.InitialDirContext // Using jndi-dns to obtain the default name servers. // // See: @@ -144,17 +107,18 @@ object DnsSettings { // this method is used as a fallback in case JNDI results in an empty list // this method will not work when running modularised of course since it needs access to internal sun classes def getNameserversUsingReflection: Try[List[InetSocketAddress]] = { - Try { - val configClass = Class.forName("sun.net.dns.ResolverConfiguration") - val open = configClass.getMethod("open") - val nameservers = configClass.getMethod("nameservers") - val instance = open.invoke(null) - - val ns = nameservers.invoke(instance).asInstanceOf[util.List[String]] - val res = if (ns.isEmpty) throw new IllegalStateException("Empty nameservers list discovered using reflection. Consider configuring default nameservers manually!") - else ns.asScala.toList - res.flatMap(s ⇒ asInetSocketAddress(s).toOption) - } + system.dynamicAccess.getClassFor("sun.net.dns.ResolerConfiguration") + .flatMap { c ⇒ + Try { + val open = c.getMethod("open") + val nameservers = c.getMethod("nameservers") + val instance = open.invoke(null) + val ns = nameservers.invoke(instance).asInstanceOf[util.List[String]] + val res = if (ns.isEmpty) throw new IllegalStateException("Empty nameservers list discovered using reflection. Consider configuring default nameservers manually!") + else ns.asScala.toList + res.flatMap(s ⇒ asInetSocketAddress(s).toOption) + } + } } getNameserversUsingJNDI orElse getNameserversUsingReflection diff --git a/akka-actor/src/main/scala/akka/io/dns/RecordClass.scala b/akka-actor/src/main/scala/akka/io/dns/RecordClass.scala new file mode 100644 index 0000000000..62e4df652d --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/RecordClass.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns + +final case class RecordClass(code: Short, name: String) + +object RecordClass { + + val IN = RecordClass(1, "IN") + val CS = RecordClass(2, "CS") + val CH = RecordClass(3, "CH") + val HS = RecordClass(4, "HS") + + val WILDCARD = RecordClass(255, "WILDCARD") + +} diff --git a/akka-actor/src/main/scala/akka/io/dns/RecordType.scala b/akka-actor/src/main/scala/akka/io/dns/RecordType.scala new file mode 100644 index 0000000000..3d047bdba5 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/RecordType.scala @@ -0,0 +1,86 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns + +import akka.annotation.ApiMayChange +import akka.util.OptionVal + +/** + * DNS Record Type + */ +@ApiMayChange +final case class RecordType(code: Short, name: String) + +@ApiMayChange +object RecordType { + /** + * array for fast lookups by id + * wasteful, but we get trivial indexing into it for lookup + */ + private final val lookupTable = Array.ofDim[RecordType](256) + + private[akka] def lookup(code: Int): RecordType = lookupTable(code) + def apply(id: Short): OptionVal[RecordType] = { + if (id < 1 || id > 255) OptionVal.None + else OptionVal(RecordType.lookup(id)) + } + + private def register(t: RecordType): RecordType = { + lookupTable(t.code) = t + t + } + + /** A host address */ + final val A: RecordType = register(RecordType(1, "A")) + /** An authoritative name server */ + final val NS: RecordType = register(RecordType(2, "NS")) + /** A mail destination (Obsolete - use MX) */ + final val MD: RecordType = register(RecordType(3, "MD")) + /** A mail forwarder (Obsolete - use MX) */ + final val MF: RecordType = register(RecordType(4, "MF")) + /** the canonical name for an alias */ + final val CNAME: RecordType = register(RecordType(5, "CNAME")) + /** marks the start of a zone of authority */ + final val SOA: RecordType = register(RecordType(6, "SOA")) + /** A mailbox domain name (EXPERIMENTAL) */ + final val MB: RecordType = register(RecordType(7, "MB")) + /** A mail group member (EXPERIMENTAL) */ + final val MG: RecordType = register(RecordType(8, "MG")) + /** A mail rename domain name (EXPERIMENTAL) */ + final val MR: RecordType = register(RecordType(9, "MR")) + /** A null RR (EXPERIMENTAL) */ + final val NULL: RecordType = register(RecordType(10, "NULL")) + /** A well known service description */ + final val WKS: RecordType = register(RecordType(11, "WKS")) + /** A domain name pointer */ + final val PTR: RecordType = register(RecordType(12, "PTR")) + /** host information */ + final val HINFO: RecordType = register(RecordType(13, "HINFO")) + /** mailbox or mail list information */ + final val MINFO: RecordType = register(RecordType(14, "MINFO")) + /** mail exchange */ + final val MX: RecordType = register(RecordType(15, "MX")) + /** text strings */ + final val TXT: RecordType = register(RecordType(16, "TXT")) + + /** The AAAA resource record type is a record specific to the Internet class that stores a single IPv6 address. */ + // See: https://tools.ietf.org/html/rfc3596 + final val AAAA: RecordType = register(RecordType(28, "AAAA")) + + /** + * The SRV RR allows administrators to use several servers for a single + * domain, to move services from host to host with little fuss, and to + * designate some hosts as primary servers for a service and others as + * backups. + */ + // See: https://tools.ietf.org/html/rfc2782 + final val SRV: RecordType = register(RecordType(33, "SRV")) + + final val AXFR: RecordType = register(RecordType(252, "AXFR")) + final val MAILB: RecordType = register(RecordType(253, "MAILB")) + final val MAILA: RecordType = register(RecordType(254, "MAILA")) + final val WILDCARD: RecordType = register(RecordType(255, "WILDCARD")) +} + diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala new file mode 100644 index 0000000000..7d1f0d514d --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala @@ -0,0 +1,69 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import java.util.concurrent.atomic.AtomicReference + +import akka.annotation.InternalApi +import akka.io.{ Dns, PeriodicCacheCleanup } + +import scala.collection.immutable +import akka.io.SimpleDnsCache._ +import akka.io.dns.internal.AsyncDnsResolver.{ Ipv4Type, Ipv6Type, QueryType } +import akka.io.dns.{ AAAARecord, ARecord, ResourceRecord } + +import scala.annotation.tailrec + +/** + * Internal API + */ +@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup { + private val cache = new AtomicReference(new Cache[(String, QueryType), immutable.Seq[ResourceRecord]]( + immutable.SortedSet()(expiryEntryOrdering()), + immutable.Map(), clock)) + + private val nanoBase = System.nanoTime() + + /** + * Gets any IPv4 and IPv6 cached entries. + * To get Srv or just one type use DnsProtocol + */ + override def cached(name: String): Option[Dns.Resolved] = { + for { + ipv4 ← cache.get().get((name, Ipv4Type)) + ipv6 ← cache.get().get((name, Ipv6Type)) + } yield { + Dns.Resolved(name, (ipv4 ++ ipv6).collect { + case r: ARecord ⇒ r.ip + case r: AAAARecord ⇒ r.ip + }) + } + } + + // Milliseconds since start + protected def clock(): Long = { + val now = System.nanoTime() + if (now - nanoBase < 0) 0 + else (now - nanoBase) / 1000000 + } + + private[io] final def get(key: (String, QueryType)): Option[immutable.Seq[ResourceRecord]] = { + cache.get().get(key) + } + + @tailrec + private[io] final def put(key: (String, QueryType), records: immutable.Seq[ResourceRecord], ttlMillis: Long): Unit = { + val c = cache.get() + if (!cache.compareAndSet(c, c.put(key, records, ttlMillis))) + put(key, records, ttlMillis) + } + + @tailrec + override final def cleanup(): Unit = { + val c = cache.get() + if (!cache.compareAndSet(c, c.cleanup())) + cleanup() + } +} diff --git a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsManager.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala similarity index 51% rename from akka-actor/src/main/scala/akka/io/dns/AsyncDnsManager.scala rename to akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala index c75adba46c..7f6e690a18 100644 --- a/akka-actor/src/main/scala/akka/io/dns/AsyncDnsManager.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala @@ -2,52 +2,75 @@ * Copyright (C) 2018 Lightbend Inc. */ -package akka.io.dns +package akka.io.dns.internal +import java.net.InetSocketAddress import java.util.concurrent.TimeUnit -import akka.actor.{ Actor, ActorLogging, Deploy, Props } +import akka.actor.{ Actor, ActorLogging, ActorRefFactory, Deploy, Props, Timers } +import akka.annotation.InternalApi import akka.dispatch.{ RequiresMessageQueue, UnboundedMessageQueueSemantics } +import akka.io.dns.{ DnsProtocol, DnsSettings } +import akka.io.dns.internal.AsyncDnsManager.CacheCleanup import akka.io.{ Dns, DnsExt, PeriodicCacheCleanup } import akka.routing.FromConfig import scala.concurrent.duration.Duration -final class AsyncDnsManager(val ext: DnsExt) extends Actor with RequiresMessageQueue[UnboundedMessageQueueSemantics] with ActorLogging { - import AsyncDnsManager._ +/** + * INTERNAL API + */ +@InternalApi +private[io] object AsyncDnsManager { + private case object CacheCleanup +} + +/** + * INTERNAL API + */ +@InternalApi +private[io] final class AsyncDnsManager(val ext: DnsExt) extends Actor + with RequiresMessageQueue[UnboundedMessageQueueSemantics] with ActorLogging with Timers { + implicit val ec = context.dispatcher private var oldProtocolWarningLoggedTimes = 0 + val settings = new DnsSettings(ext.system, ext.Settings.ResolverConfig) + private val resolver = { - val props: Props = FromConfig.props(Props(ext.provider.actorClass, ext.cache, ext.Settings.ResolverConfig).withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher)) + val props: Props = FromConfig.props(Props(ext.provider.actorClass, settings, ext.cache, (factory: ActorRefFactory, dns: List[InetSocketAddress]) ⇒ { + dns.map(ns ⇒ factory.actorOf(Props(new DnsClient(ns)))) + }).withDeploy(Deploy.local).withDispatcher(ext.Settings.Dispatcher)) context.actorOf(props, ext.Settings.Resolver) } + private val cacheCleanup = ext.cache match { case cleanup: PeriodicCacheCleanup ⇒ Some(cleanup) case _ ⇒ None } - private val cleanupTimer = cacheCleanup map { _ ⇒ - val interval = Duration(ext.Settings.ResolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) - context.system.scheduler.schedule(interval, interval, self, AsyncDnsManager.CacheCleanup) + override def preStart(): Unit = { + cacheCleanup.foreach { _ ⇒ + val interval = Duration(ext.Settings.ResolverConfig.getDuration("cache-cleanup-interval", TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS) + timers.startPeriodicTimer(CacheCleanup, CacheCleanup, interval) + } } - override def receive = { + override def receive: Receive = { case r: DnsProtocol.Resolve ⇒ - log.debug("Resolution request for {} {}from {}", r.name, r.mode, sender()) + log.debug("Resolution request for {} {} from {}", r.name, r.requestType, sender()) resolver.forward(r) - case r @ Dns.Resolve(name) ⇒ + case Dns.Resolve(name) ⇒ // adapt legacy protocol to new protocol log.debug("Resolution request for {} from {}", name, sender()) warnAboutOldProtocolUse(name) - val adapted = DnsProtocol.resolve(name) + val adapted = DnsProtocol.Resolve(name) resolver.forward(adapted) case CacheCleanup ⇒ - for (c ← cacheCleanup) - c.cleanup() + cacheCleanup.foreach(_.cleanup()) } private def warnAboutOldProtocolUse(name: String): Unit = { @@ -58,12 +81,5 @@ final class AsyncDnsManager(val ext: DnsExt) extends Actor with RequiresMessageQ "(This warning will be logged at most {} times)", name, warnAtMostTimes) } } - - override def postStop(): Unit = { - for (t ← cleanupTimer) t.cancel() - } } -object AsyncDnsManager { - private case object CacheCleanup -} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsProvider.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsProvider.scala new file mode 100644 index 0000000000..e75babd481 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsProvider.scala @@ -0,0 +1,18 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import akka.annotation.InternalApi +import akka.io._ + +/** + * INTERNAL API + */ +@InternalApi +private[akka] class AsyncDnsProvider extends DnsProvider { + override def cache: Dns = new AsyncDnsCache() + override def actorClass = classOf[AsyncDnsResolver] + override def managerClass = classOf[AsyncDnsManager] +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsResolver.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsResolver.scala new file mode 100644 index 0000000000..cef893a989 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsResolver.scala @@ -0,0 +1,155 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import java.net.InetSocketAddress + +import akka.actor.{ Actor, ActorLogging, ActorRef, ActorRefFactory, Props } +import akka.annotation.InternalApi +import akka.io.dns.DnsProtocol.{ Ip, RequestType, Srv } +import akka.io.dns.internal.DnsClient._ +import akka.io.dns.{ DnsProtocol, DnsSettings, ResourceRecord } +import akka.pattern.{ ask, pipe } +import akka.util.{ Helpers, Timeout } + +import scala.collection.immutable.Seq +import scala.collection.{ breakOut, immutable } +import scala.concurrent.Future +import scala.util.control.NonFatal + +/** + * INTERNAL API + */ +@InternalApi +private[io] final class AsyncDnsResolver( + settings: DnsSettings, + cache: AsyncDnsCache, + clientFactory: (ActorRefFactory, List[InetSocketAddress]) ⇒ List[ActorRef]) extends Actor with ActorLogging { + + import AsyncDnsResolver._ + + implicit val ec = context.dispatcher + + // For ask to DNS Client + implicit val timeout = Timeout(settings.ResolveTimeout) + + val nameServers = settings.NameServers + + log.debug("Using name servers [{}]", nameServers) + + private var requestId: Short = 0 + private def nextId(): Short = { + requestId = (requestId + 1).toShort + requestId + } + + private val resolvers: List[ActorRef] = clientFactory(context, nameServers) + + override def receive: Receive = { + case DnsProtocol.Resolve(name, _) if isInetAddress(name) ⇒ + log.warning("Tried to resolve ip [{}]. Ignoring.", name) + case DnsProtocol.Resolve(name, mode) ⇒ + resolve(name, mode, resolvers) pipeTo sender() + } + + private def resolve(name: String, requestType: RequestType, resolvers: List[ActorRef]): Future[DnsProtocol.Resolved] = { + resolvers match { + case Nil ⇒ + Future.failed(ResolveFailedException(s"Timed out resolving $name with nameservers: $nameServers")) + case head :: tail ⇒ resolve(name, requestType, head).recoverWith { + case NonFatal(t) ⇒ + log.error(t, "Resolve failed. Trying next name server") + resolve(name, requestType, tail) + } + } + } + + private def sendQuestion(resolver: ActorRef, message: DnsQuestion): Future[Seq[ResourceRecord]] = { + val result = (resolver ? message).mapTo[Answer].map(_.rrs) + result.onFailure { + case NonFatal(_) ⇒ resolver ! DropRequest(message.id) + } + result + } + + private def resolve(name: String, requestType: RequestType, resolver: ActorRef): Future[DnsProtocol.Resolved] = { + log.debug("Attempting to resolve {} with {}", name, resolver) + val caseFoldedName = Helpers.toRootLowerCase(name) + val recs: Future[Seq[ResourceRecord]] = requestType match { + case Ip(ipv4, ipv6) ⇒ + val ipv4Recs = if (ipv4) + cache.get((name, Ipv4Type)) match { + case Some(r) ⇒ + log.debug("Ipv4 cached {}", r) + Future.successful(r) + case None ⇒ + sendQuestion(resolver, Question4(nextId(), caseFoldedName)) + } + else + Empty + + val ipv6Recs = if (ipv6) + cache.get((name, Ipv6Type)) match { + case Some(r) ⇒ + log.debug("Ipv6 cached {}", r) + Future.successful(r) + case None ⇒ + sendQuestion(resolver, Question6(nextId(), caseFoldedName)) + } + else + Empty + + ipv4Recs.flatMap(ipv4Records ⇒ { + // TODO, do we want config to specify a max for this? + if (ipv4Records.nonEmpty) { + val minTtl4 = ipv4Records.minBy(_.ttl).ttl + cache.put((name, Ipv4Type), ipv4Records, minTtl4) + } + ipv6Recs.map(ipv6Records ⇒ { + if (ipv6Records.nonEmpty) { + val minTtl6 = ipv6Records.minBy(_.ttl).ttl + cache.put((name, Ipv6Type), ipv6Records, minTtl6) + } + ipv4Records ++ ipv6Records + }) + }) + case Srv ⇒ + cache.get((name, Ipv4Type)) match { + case Some(r) ⇒ Future.successful(r) + case None ⇒ + sendQuestion(resolver, SrvQuestion(nextId(), caseFoldedName)) + } + + } + + recs.map(result ⇒ DnsProtocol.Resolved(name, result)) + } + +} + +/** + * INTERNAL API + */ +@InternalApi +private[io] object AsyncDnsResolver { + + private val ipv4Address = + """^[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}$""".r + + private val ipv6Address = + """^\s*((([0-9A-Fa-f]{1,4}:){7}([0-9A-Fa-f]{1,4}|:))|(([0-9A-Fa-f]{1,4}:){6}(:[0-9A-Fa-f]{1,4}|((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){5}(((:[0-9A-Fa-f]{1,4}){1,2})|:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3})|:))|(([0-9A-Fa-f]{1,4}:){4}(((:[0-9A-Fa-f]{1,4}){1,3})|((:[0-9A-Fa-f]{1,4})?:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){3}(((:[0-9A-Fa-f]{1,4}){1,4})|((:[0-9A-Fa-f]{1,4}){0,2}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){2}(((:[0-9A-Fa-f]{1,4}){1,5})|((:[0-9A-Fa-f]{1,4}){0,3}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(([0-9A-Fa-f]{1,4}:){1}(((:[0-9A-Fa-f]{1,4}){1,6})|((:[0-9A-Fa-f]{1,4}){0,4}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:))|(:(((:[0-9A-Fa-f]{1,4}){1,7})|((:[0-9A-Fa-f]{1,4}){0,5}:((25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)(\.(25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)){3}))|:)))(%.+)?\s*$""".r + + private def isInetAddress(name: String): Boolean = + ipv4Address.findAllMatchIn(name).nonEmpty || ipv6Address.findAllMatchIn(name).nonEmpty + + private val Empty = Future.successful(immutable.Seq.empty[ResourceRecord]) + + sealed trait QueryType + final case object Ipv4Type extends QueryType + final case object Ipv6Type extends QueryType + final case object SrvType extends QueryType + + case class ResolveFailedException(msg: String) extends Exception(msg) +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala b/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala new file mode 100644 index 0000000000..4ad564987a --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/DnsClient.scala @@ -0,0 +1,119 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import java.net.{ InetAddress, InetSocketAddress } + +import akka.actor.Status.Failure +import akka.actor.{ Actor, ActorLogging, ActorRef, NoSerializationVerificationNeeded, Stash } +import akka.annotation.InternalApi +import akka.io.dns.{ RecordClass, RecordType, ResourceRecord } +import akka.io.{ IO, Udp } + +import scala.collection.immutable +import scala.util.Try + +/** + * INTERNAL API + */ +@InternalApi private[akka] object DnsClient { + sealed trait DnsQuestion { + def id: Short + } + final case class SrvQuestion(id: Short, name: String) extends DnsQuestion + final case class Question4(id: Short, name: String) extends DnsQuestion + final case class Question6(id: Short, name: String) extends DnsQuestion + final case class Answer(id: Short, rrs: immutable.Seq[ResourceRecord]) extends NoSerializationVerificationNeeded + final case class DropRequest(id: Short) +} + +/** + * INTERNAL API + */ +@InternalApi private[akka] class DnsClient(ns: InetSocketAddress) extends Actor with ActorLogging with Stash { + + import DnsClient._ + + import context.system + + IO(Udp) ! Udp.Bind(self, new InetSocketAddress(InetAddress.getByAddress(Array.ofDim(4)), 0)) + + var inflightRequests: Map[Short, ActorRef] = Map.empty + + def receive: Receive = { + case Udp.Bound(local) ⇒ + log.debug(s"Bound to UDP address [{}]", local) + context.become(ready(sender())) + unstashAll() + case _: Question4 ⇒ + stash() + case _: Question6 ⇒ + stash() + case _: SrvQuestion ⇒ + stash() + } + + private def message(name: String, id: Short, recordType: RecordType): Message = { + Message(id, MessageFlags(), immutable.Seq(Question(name, recordType, RecordClass.IN))) + } + + def ready(socket: ActorRef): Receive = { + case DropRequest(id) ⇒ + log.debug("Dropping request [{}]", id) + inflightRequests -= id + case Question4(id, name) ⇒ + log.debug("Resolving [{}] (A)", name) + inflightRequests += (id -> sender()) + val msg = message(name, id, RecordType.A) + log.debug(s"Message [{}] to [{}]: [{}]", id, ns, msg) + socket ! Udp.Send(msg.write(), ns) + + case Question6(id, name) ⇒ + log.debug("Resolving [{}] (AAAA)", name) + inflightRequests += (id -> sender()) + val msg = message(name, id, RecordType.AAAA) + log.debug(s"Message to [{}]: [{}]", ns, msg) + socket ! Udp.Send(msg.write(), ns) + + case SrvQuestion(id, name) ⇒ + log.debug("Resolving [{}] (SRV)", name) + inflightRequests += (id -> sender()) + val msg = message(name, id, RecordType.SRV) + log.debug(s"Message to {}: msg", ns, msg) + socket ! Udp.Send(msg.write(), ns) + + case Udp.CommandFailed(cmd) ⇒ + log.debug("Command failed [{}]", cmd) + cmd match { + case send: Udp.Send ⇒ + // best effort, don't throw + Try { + val msg = Message.parse(send.payload) + inflightRequests.get(msg.id).foreach { s ⇒ + s ! Failure(new RuntimeException("Send failed to nameserver")) + inflightRequests -= msg.id + } + } + case _ ⇒ + log.warning("Dns client failed to send {}", cmd) + } + case Udp.Received(data, remote) ⇒ + log.debug(s"Received message from [{}]: [{}]", remote, data) + val msg = Message.parse(data) + log.debug(s"Decoded: $msg") + val recs = if (msg.flags.responseCode == ResponseCode.SUCCESS) msg.answerRecs else immutable.Seq.empty + val response = Answer(msg.id, recs) + inflightRequests.get(response.id) match { + case Some(reply) ⇒ + reply ! response + inflightRequests -= response.id + case None ⇒ + log.debug("Client for id {} not found. Discarding response.", response.id) + } + + case Udp.Unbind ⇒ socket ! Udp.Unbind + case Udp.Unbound ⇒ context.stop(self) + } +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/DnsMessage.scala b/akka-actor/src/main/scala/akka/io/dns/internal/DnsMessage.scala new file mode 100644 index 0000000000..197d16d516 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/DnsMessage.scala @@ -0,0 +1,144 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import akka.annotation.InternalApi +import akka.io.dns.ResourceRecord +import akka.util.{ ByteString, ByteStringBuilder } + +import scala.collection.immutable.Seq + +/** + * INTERNAL API + */ +@InternalApi +private[internal] object OpCode extends Enumeration { + val QUERY = Value(0) + val IQUERY = Value(1) + val STATUS = Value(2) +} + +/** + * INTERNAL API + */ +@InternalApi +private[internal] object ResponseCode extends Enumeration { + val SUCCESS = Value(0) + val FORMAT_ERROR = Value(1) + val SERVER_FAILURE = Value(2) + val NAME_ERROR = Value(3) + val NOT_IMPLEMENTED = Value(4) + val REFUSED = Value(5) +} + +/** + * INTERNAL API + */ +@InternalApi +private[internal] case class MessageFlags(flags: Short) extends AnyVal { + def isQuery: Boolean = (flags & 0x8000) == 0 + + def isAnswer = !isQuery + + def opCode: OpCode.Value = OpCode((flags & 0x7800) >> 11) + + def isAuthoritativeAnswer: Boolean = (flags & (1 << 10)) != 0 + + def isTruncated: Boolean = (flags & (1 << 9)) != 0 + + def isRecursionDesired: Boolean = (flags & (1 << 8)) != 0 + + def isRecursionAvailable: Boolean = (flags & (1 << 7)) != 0 + + def responseCode: ResponseCode.Value = { + ResponseCode(flags & 0x0f) + } + + override def toString: String = { + var ret = List[String]() + ret +:= s"$responseCode" + if (isRecursionAvailable) ret +:= "RA" + if (isRecursionDesired) ret +:= "RD" + if (isTruncated) ret +:= "TR" + if (isAuthoritativeAnswer) ret +:= "AA" + ret +:= s"$opCode" + if (isAnswer) ret +:= "AN" + ret.mkString("<", ",", ">") + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[internal] object MessageFlags { + def apply(answer: Boolean = false, opCode: OpCode.Value = OpCode.QUERY, authoritativeAnswer: Boolean = false, + truncated: Boolean = false, recursionDesired: Boolean = true, recursionAvailable: Boolean = false, + responseCode: ResponseCode.Value = ResponseCode.SUCCESS): MessageFlags = { + new MessageFlags(( + (if (answer) 0x8000 else 0) | + (opCode.id << 11) | + (if (authoritativeAnswer) 1 << 10 else 0) | + (if (truncated) 1 << 9 else 0) | + (if (recursionDesired) 1 << 8 else 0) | + (if (recursionAvailable) 1 << 7 else 0) | + responseCode.id).toShort) + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[internal] case class Message( + id: Short, + flags: MessageFlags, + questions: Seq[Question] = Seq.empty, + answerRecs: Seq[ResourceRecord] = Seq.empty, + authorityRecs: Seq[ResourceRecord] = Seq.empty, + additionalRecs: Seq[ResourceRecord] = Seq.empty) { + def write(): ByteString = { + val ret = ByteString.newBuilder + write(ret) + ret.result() + } + + def write(ret: ByteStringBuilder): Unit = { + ret.putShort(id) + .putShort(flags.flags) + .putShort(questions.size) + .putShort(answerRecs.size) + .putShort(authorityRecs.size) + .putShort(additionalRecs.size) + + questions.foreach(_.write(ret)) + answerRecs.foreach(_.write(ret)) + authorityRecs.foreach(_.write(ret)) + additionalRecs.foreach(_.write(ret)) + } +} + +/** + * INTERNAL API + */ +@InternalApi +private[internal] object Message { + def parse(msg: ByteString): Message = { + val it = msg.iterator + val id = it.getShort + val flags = it.getShort + val qdCount = it.getShort + val anCount = it.getShort + val nsCount = it.getShort + val arCount = it.getShort + + val qs = (0 until qdCount) map { i ⇒ Question.parse(it, msg) } + val ans = (0 until anCount) map { i ⇒ ResourceRecord.parse(it, msg) } + val nss = (0 until nsCount) map { i ⇒ ResourceRecord.parse(it, msg) } + val ars = (0 until arCount) map { i ⇒ ResourceRecord.parse(it, msg) } + + new Message(id, new MessageFlags(flags), qs, ans, nss, ars) + } +} diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsDomainName.scala b/akka-actor/src/main/scala/akka/io/dns/internal/DomainName.scala similarity index 77% rename from akka-actor/src/main/scala/akka/io/dns/protocol/DnsDomainName.scala rename to akka-actor/src/main/scala/akka/io/dns/internal/DomainName.scala index 6fe7c387bc..8f70b59f8f 100644 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsDomainName.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/DomainName.scala @@ -3,11 +3,16 @@ * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns */ -package akka.io.dns.protocol +package akka.io.dns.internal +import akka.annotation.InternalApi import akka.util.{ ByteIterator, ByteString, ByteStringBuilder } -object DnsDomainName { +/** + * INTERNAL API + */ +@InternalApi +private[akka] object DomainName { def length(name: String): Short = { (name.length + 2).toShort } @@ -24,15 +29,10 @@ object DnsDomainName { def parse(it: ByteIterator, msg: ByteString): String = { val ret = StringBuilder.newBuilder - // ret.sizeHint(getNameLength(it.clone(), 0)) - // println("Parsing name") while (true) { val length = it.getByte - // println(s"Label length: $length") - if (length == 0) { val r = ret.result() - // println(s"Name: $r") return r } @@ -41,13 +41,12 @@ object DnsDomainName { if ((length & 0xc0) == 0xc0) { val offset = ((length.toShort & 0x3f) << 8) | (it.getByte.toShort & 0x00ff) - // println(s"Computed offset: $offset") return ret.result() + parse(msg.iterator.drop(offset), msg) } ret.appendAll(it.clone().take(length).map(_.toChar)) it.drop(length) } - ??? // FIXME!!!!!!!!!! + throw new RuntimeException(s"Unable to parse domain name from msg: $msg") } } diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/Question.scala b/akka-actor/src/main/scala/akka/io/dns/internal/Question.scala new file mode 100644 index 0000000000..25f47bcb08 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/Question.scala @@ -0,0 +1,34 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns + */ + +package akka.io.dns.internal + +import akka.annotation.InternalApi +import akka.io.dns.{ RecordClass, RecordType } +import akka.util.{ ByteIterator, ByteString, ByteStringBuilder } + +/** + * INTERNAL API + */ +@InternalApi +private[akka] final case class Question(name: String, qType: RecordType, qClass: RecordClass) { + def write(out: ByteStringBuilder): Unit = { + DomainName.write(out, name) + RecordTypeSerializer.write(out, qType) + RecordClassSerializer.write(out, qClass) + } +} +/** + * INTERNAL API + */ +@InternalApi +private[akka] object Question { + def parse(it: ByteIterator, msg: ByteString): Question = { + val name = DomainName.parse(it, msg) + val qType = RecordTypeSerializer.parse(it) + val qClass = RecordClassSerializer.parse(it) + Question(name, qType, qClass) + } +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/RecordClassSerializer.scala b/akka-actor/src/main/scala/akka/io/dns/internal/RecordClassSerializer.scala new file mode 100644 index 0000000000..b26bf1b9d8 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/RecordClassSerializer.scala @@ -0,0 +1,31 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns + */ + +package akka.io.dns.internal + +import akka.annotation.InternalApi +import akka.io.dns.RecordClass +import akka.util.{ ByteIterator, ByteStringBuilder } + +/** + * INTERNAL API + */ +@InternalApi +private[akka] object RecordClassSerializer { + + def parse(it: ByteIterator): RecordClass = { + it.getShort match { + case 1 ⇒ RecordClass.IN + case 2 ⇒ RecordClass.CS + case 3 ⇒ RecordClass.CH + case 255 ⇒ RecordClass.WILDCARD + case unknown ⇒ throw new RuntimeException(s"Unexpected record class $unknown") + } + } + + def write(out: ByteStringBuilder, c: RecordClass): Unit = { + out.putShort(c.code) + } +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/RecordTypeSerializer.scala b/akka-actor/src/main/scala/akka/io/dns/internal/RecordTypeSerializer.scala new file mode 100644 index 0000000000..e45777e22d --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/RecordTypeSerializer.scala @@ -0,0 +1,28 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns.internal + +import akka.io.dns.RecordType +import akka.util.{ ByteIterator, ByteStringBuilder, OptionVal } + +/** + * INTERNAL API + */ +private[akka] object RecordTypeSerializer { + + // TODO other type than ByteStringBuilder? (was used in akka-dns) + def write(out: ByteStringBuilder, value: RecordType): Unit = { + out.putShort(value.code) + } + + def parse(it: ByteIterator): RecordType = { + val id = it.getShort + RecordType(id) match { + case OptionVal.None ⇒ throw new IllegalArgumentException(s"Illegal id [$id] for DnsRecordType") + case OptionVal.Some(t) ⇒ t + } + } + +} diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/package.scala b/akka-actor/src/main/scala/akka/io/dns/internal/package.scala new file mode 100644 index 0000000000..425956b413 --- /dev/null +++ b/akka-actor/src/main/scala/akka/io/dns/internal/package.scala @@ -0,0 +1,24 @@ +/* + * Copyright (C) 2018 Lightbend Inc. + */ + +package akka.io.dns + +import java.nio.ByteOrder + +import akka.annotation.InternalApi + +/** + * INTERNAL API + */ +package object internal { + + /** + * INTERNAL API + * + * We know we always want to use network byte order when writing + */ + @InternalApi + private[akka] implicit val networkByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN + +} diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsQuestion.scala b/akka-actor/src/main/scala/akka/io/dns/protocol/DnsQuestion.scala deleted file mode 100644 index 158b12b15e..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsQuestion.scala +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns - */ - -package akka.io.dns.protocol - -import akka.util.{ ByteIterator, ByteString, ByteStringBuilder } - -/** INTERNAL API */ -final case class DnsQuestion(name: String, qType: DnsRecordType, qClass: DnsRecordClass) { - def write(out: ByteStringBuilder) { - DnsDomainName.write(out, name) - DnsRecordType.write(out, qType) - DnsRecordClass.write(out, qClass) - } -} - -object DnsQuestion { - def parse(it: ByteIterator, msg: ByteString): DnsQuestion = { - val name = DnsDomainName.parse(it, msg) - val qType = DnsRecordType.parse(it) - val qClass = DnsRecordClass.parse(it) - DnsQuestion(name, qType, qClass) - } -} diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordClass.scala b/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordClass.scala deleted file mode 100644 index 20aabcaf90..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordClass.scala +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - * Adopted from Apache v2 licensed: https://github.com/ilya-epifanov/akka-dns - */ - -package akka.io.dns.protocol - -import akka.util.{ ByteIterator, ByteStringBuilder } - -case class DnsRecordClass(code: Short, name: String) - -object DnsRecordClass { - - val IN = DnsRecordClass(1, "IN") - val CS = DnsRecordClass(2, "CS") - val CH = DnsRecordClass(3, "CH") - val HS = DnsRecordClass(4, "HS") - - val WILDCARD = DnsRecordClass(255, "WILDCARD") - - def parse(it: ByteIterator): DnsRecordClass = { - it.getShort match { - case 1 ⇒ IN - case 2 ⇒ CS - case 3 ⇒ CH - case 255 ⇒ WILDCARD - case _ ⇒ ??? // FIXME - } - } - - def write(out: ByteStringBuilder, c: DnsRecordClass): Unit = { - out.putShort(c.code) - } -} diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordType.scala b/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordType.scala deleted file mode 100644 index 0fd88b9eb5..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/DnsRecordType.scala +++ /dev/null @@ -1,96 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - */ - -package akka.io.dns.protocol - -import akka.annotation.InternalApi -import akka.util.{ ByteIterator, ByteStringBuilder, OptionVal } - -/** - * DNS Record Type - */ -final case class DnsRecordType(code: Short, name: String) - -object DnsRecordType { - - // array for fast lookups by id - // wasteful, but we get trivial indexing into it for lookup - private final val lookupTable = Array.ofDim[DnsRecordType](256) - - /** INTERNAL API: TODO move it somewhere off the datatype */ - // TODO other type than ByteStringBuilder? (was used in akka-dns) - def write(out: ByteStringBuilder, value: DnsRecordType): Unit = { - out.putShort(value.code) - } - - /** INTERNAL API: TODO move it somewhere off the datatype */ - def parse(it: ByteIterator): DnsRecordType = { - val id = it.getShort - apply(id) match { - case OptionVal.None ⇒ throw new IllegalArgumentException(s"Illegal id [${id}] for DnsRecordType") - case OptionVal.Some(t) ⇒ t - } - } - - private def register(t: DnsRecordType) = { - lookupTable(t.code) = t - t - } - - def apply(id: Short): OptionVal[DnsRecordType] = { - if (id < 1 || id > 255) OptionVal.None - else OptionVal(lookupTable(id)) - } - - /** A host address */ - final val A = register(new DnsRecordType(1, "A")) - /** An authoritative name server */ - final val NS = register(new DnsRecordType(2, "NS")) - /** A mail destination (Obsolete - use MX) */ - final val MD = register(new DnsRecordType(3, "MD")) - /** A mail forwarder (Obsolete - use MX) */ - final val MF = register(new DnsRecordType(4, "MF")) - /** the canonical name for an alias */ - final val CNAME = register(new DnsRecordType(5, "CNAME")) - /** marks the start of a zone of authority */ - final val SOA = register(new DnsRecordType(6, "SOA")) - /** A mailbox domain name (EXPERIMENTAL) */ - final val MB = register(new DnsRecordType(7, "MB")) - /** A mail group member (EXPERIMENTAL) */ - final val MG = register(new DnsRecordType(8, "MG")) - /** A mail rename domain name (EXPERIMENTAL) */ - final val MR = register(new DnsRecordType(9, "MR")) - /** A null RR (EXPERIMENTAL) */ - final val NULL = register(new DnsRecordType(10, "NULL")) - /** A well known service description */ - final val WKS = register(new DnsRecordType(11, "WKS")) - /** A domain name pointer */ - final val PTR = register(new DnsRecordType(12, "PTR")) - /** host information */ - final val HINFO = register(new DnsRecordType(13, "HINFO")) - /** mailbox or mail list information */ - final val MINFO = register(new DnsRecordType(14, "MINFO")) - /** mail exchange */ - final val MX = register(new DnsRecordType(15, "MX")) - /** text strings */ - final val TXT = register(new DnsRecordType(16, "TXT")) - - /** The AAAA resource record type is a record specific to the Internet class that stores a single IPv6 address. */ - // See: https://tools.ietf.org/html/rfc3596 - final val AAAA = register(new DnsRecordType(28, "AAAA")) - - /** - * The SRV RR allows administrators to use several servers for a single - * domain, to move services from host to host with little fuss, and to - * designate some hosts as primary servers for a service and others as - * backups. - */ - // See: https://tools.ietf.org/html/rfc2782 - final val SRV = register(new DnsRecordType(33, "SRV")) - - final val AXFR = register(new DnsRecordType(252, "AXFR")) - final val MAILB = register(new DnsRecordType(253, "MAILB")) - final val MAILA = register(new DnsRecordType(254, "MAILA")) - final val WILDCARD = register(new DnsRecordType(255, "WILDCARD")) -} diff --git a/akka-actor/src/main/scala/akka/io/dns/protocol/package.scala b/akka-actor/src/main/scala/akka/io/dns/protocol/package.scala deleted file mode 100644 index a05dacca2b..0000000000 --- a/akka-actor/src/main/scala/akka/io/dns/protocol/package.scala +++ /dev/null @@ -1,14 +0,0 @@ -/* - * Copyright (C) 2018 Lightbend Inc. - */ - -package akka.io.dns - -import java.nio.ByteOrder - -package object protocol { - - // We know we always want to use network byte order when writing - implicit val networkByteOrder: ByteOrder = ByteOrder.BIG_ENDIAN - -}