Use AsyncDnsCache directly in DnsServiceDiscovery, #25948
This commit is contained in:
commit
aebef217d7
5 changed files with 157 additions and 53 deletions
|
|
@ -43,6 +43,11 @@ class AsyncDnsManagerSpec extends AkkaSpec(
|
||||||
resolved.ipv4 should be(Nil)
|
resolved.ipv4 should be(Nil)
|
||||||
resolved.ipv6.length should be(1)
|
resolved.ipv6.length should be(1)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
"provide access to cache" in {
|
||||||
|
dns ! AsyncDnsManager.GetCache
|
||||||
|
expectMsgType[AsyncDnsCache] should be theSameInstanceAs Dns(system).cache
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -6,20 +6,20 @@ package akka.io.dns.internal
|
||||||
|
|
||||||
import java.util.concurrent.atomic.AtomicReference
|
import java.util.concurrent.atomic.AtomicReference
|
||||||
|
|
||||||
|
import akka.actor.NoSerializationVerificationNeeded
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.io.{ Dns, PeriodicCacheCleanup }
|
import akka.io.{ Dns, PeriodicCacheCleanup }
|
||||||
import akka.io.dns.CachePolicy.CachePolicy
|
import akka.io.dns.CachePolicy.CachePolicy
|
||||||
import akka.io.SimpleDnsCache._
|
import akka.io.SimpleDnsCache._
|
||||||
import akka.io.dns.DnsProtocol.{ Ip, RequestType, Resolved }
|
import akka.io.dns.DnsProtocol.{ Ip, RequestType, Resolved }
|
||||||
import akka.io.dns.{ AAAARecord, ARecord }
|
import akka.io.dns.{ AAAARecord, ARecord }
|
||||||
|
|
||||||
import scala.annotation.tailrec
|
import scala.annotation.tailrec
|
||||||
import scala.collection.immutable
|
import scala.collection.immutable
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Internal API
|
* Internal API
|
||||||
*/
|
*/
|
||||||
@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup {
|
@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup with NoSerializationVerificationNeeded {
|
||||||
private val cacheRef = new AtomicReference(new Cache[(String, RequestType), Resolved](
|
private val cacheRef = new AtomicReference(new Cache[(String, RequestType), Resolved](
|
||||||
immutable.SortedSet()(expiryEntryOrdering()),
|
immutable.SortedSet()(expiryEntryOrdering()),
|
||||||
immutable.Map(), () ⇒ clock))
|
immutable.Map(), () ⇒ clock))
|
||||||
|
|
@ -50,7 +50,7 @@ import scala.collection.immutable
|
||||||
else (now - nanoBase) / 1000000
|
else (now - nanoBase) / 1000000
|
||||||
}
|
}
|
||||||
|
|
||||||
private[io] final def get(key: (String, RequestType)): Option[Resolved] = {
|
private[akka] final def get(key: (String, RequestType)): Option[Resolved] = {
|
||||||
cacheRef.get().get(key)
|
cacheRef.get().get(key)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -23,8 +23,10 @@ import scala.concurrent.duration.Duration
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
*/
|
*/
|
||||||
@InternalApi
|
@InternalApi
|
||||||
private[io] object AsyncDnsManager {
|
private[akka] object AsyncDnsManager {
|
||||||
private case object CacheCleanup
|
private case object CacheCleanup
|
||||||
|
|
||||||
|
case object GetCache
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
|
@ -82,10 +84,14 @@ private[io] final class AsyncDnsManager(name: String, system: ExtendedActorSyste
|
||||||
}
|
}
|
||||||
Dns.Resolved(asyncResolved.name, ips)
|
Dns.Resolved(asyncResolved.name, ips)
|
||||||
}
|
}
|
||||||
reply pipeTo sender
|
reply pipeTo sender()
|
||||||
|
|
||||||
case CacheCleanup ⇒
|
case CacheCleanup ⇒
|
||||||
cacheCleanup.foreach(_.cleanup())
|
cacheCleanup.foreach(_.cleanup())
|
||||||
|
|
||||||
|
case AsyncDnsManager.GetCache ⇒
|
||||||
|
sender() ! cache
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
||||||
|
|
@ -6,20 +6,28 @@ package akka.discovery.dns
|
||||||
|
|
||||||
import java.net.InetAddress
|
import java.net.InetAddress
|
||||||
|
|
||||||
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
import akka.actor.ExtendedActorSystem
|
import akka.actor.ExtendedActorSystem
|
||||||
import akka.annotation.InternalApi
|
import akka.annotation.InternalApi
|
||||||
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
|
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
|
||||||
import akka.event.Logging
|
import akka.event.Logging
|
||||||
import akka.io.{ Dns, IO }
|
import akka.io.{ Dns, IO }
|
||||||
import akka.pattern.ask
|
import akka.pattern.ask
|
||||||
|
|
||||||
import scala.concurrent.Future
|
import scala.concurrent.Future
|
||||||
import scala.concurrent.duration.FiniteDuration
|
import scala.concurrent.duration.FiniteDuration
|
||||||
|
|
||||||
import akka.discovery._
|
import akka.discovery._
|
||||||
import akka.io.dns.DnsProtocol.{ Ip, Srv }
|
import akka.io.dns.DnsProtocol.{ Ip, Srv }
|
||||||
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
|
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
|
||||||
|
|
||||||
import scala.collection.{ immutable ⇒ im }
|
import scala.collection.{ immutable ⇒ im }
|
||||||
|
import scala.util.Failure
|
||||||
|
import scala.util.Success
|
||||||
|
|
||||||
|
import akka.io.dns.internal.AsyncDnsCache
|
||||||
|
import akka.io.dns.internal.AsyncDnsManager
|
||||||
|
import akka.util.OptionVal
|
||||||
|
import akka.util.Timeout
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* INTERNAL API
|
* INTERNAL API
|
||||||
|
|
@ -72,39 +80,99 @@ private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends Ser
|
||||||
Dns(system).loadAsyncDns("SD-DNS")
|
Dns(system).loadAsyncDns("SD-DNS")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// updated from ask AsyncDnsManager.GetCache, but doesn't have to volatile since will still work when unset
|
||||||
|
// (eventually visible)
|
||||||
|
private var asyncDnsCache: OptionVal[AsyncDnsCache] = OptionVal.None
|
||||||
|
|
||||||
import system.dispatcher
|
import system.dispatcher
|
||||||
|
|
||||||
|
dns.ask(AsyncDnsManager.GetCache)(Timeout(30.seconds)).onComplete {
|
||||||
|
case Success(cache: AsyncDnsCache) ⇒
|
||||||
|
asyncDnsCache = OptionVal.Some(cache)
|
||||||
|
case Success(other) ⇒
|
||||||
|
log.error("Expected AsyncDnsCache but got [{}]", other.getClass.getName)
|
||||||
|
case Failure(e) ⇒
|
||||||
|
log.error(e, "Couldn't retrieve DNS cache: {}")
|
||||||
|
}
|
||||||
|
|
||||||
private def cleanIpString(ipString: String): String =
|
private def cleanIpString(ipString: String): String =
|
||||||
if (ipString.startsWith("/")) ipString.tail else ipString
|
if (ipString.startsWith("/")) ipString.tail else ipString
|
||||||
|
|
||||||
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
|
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
|
||||||
if (lookup.portName.isDefined && lookup.protocol.isDefined) {
|
if (lookup.portName.isDefined && lookup.protocol.isDefined)
|
||||||
|
lookupSrv(lookup, resolveTimeout)
|
||||||
|
else
|
||||||
|
lookupIp(lookup, resolveTimeout)
|
||||||
|
}
|
||||||
|
|
||||||
|
private def lookupSrv(lookup: Lookup, resolveTimeout: FiniteDuration) = {
|
||||||
val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}"
|
val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}"
|
||||||
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
||||||
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
|
val mode = Srv
|
||||||
|
|
||||||
|
def askResolve(): Future[Resolved] = {
|
||||||
|
dns.ask(DnsProtocol.Resolve(srvRequest, mode))(resolveTimeout).map {
|
||||||
case resolved: DnsProtocol.Resolved ⇒
|
case resolved: DnsProtocol.Resolved ⇒
|
||||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
log.debug("{} lookup result: {}", mode, resolved)
|
||||||
srvRecordsToResolved(srvRequest, resolved)
|
srvRecordsToResolved(srvRequest, resolved)
|
||||||
case resolved ⇒
|
case resolved ⇒
|
||||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||||
Resolved(srvRequest, Nil)
|
Resolved(srvRequest, Nil)
|
||||||
}
|
}
|
||||||
} else {
|
}
|
||||||
|
|
||||||
|
asyncDnsCache match {
|
||||||
|
case OptionVal.Some(cache) ⇒
|
||||||
|
cache.get((srvRequest, mode)) match {
|
||||||
|
case Some(resolved) ⇒
|
||||||
|
log.debug("{} lookup cached: {}", mode, resolved)
|
||||||
|
Future.successful(srvRecordsToResolved(srvRequest, resolved))
|
||||||
|
case None ⇒
|
||||||
|
askResolve()
|
||||||
|
}
|
||||||
|
case OptionVal.None ⇒
|
||||||
|
askResolve()
|
||||||
|
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private def lookupIp(lookup: Lookup, resolveTimeout: FiniteDuration) = {
|
||||||
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
||||||
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
|
val mode = Ip()
|
||||||
case resolved: DnsProtocol.Resolved ⇒
|
|
||||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
def ipRecordsToResolved(resolved: DnsProtocol.Resolved): Resolved = {
|
||||||
val addresses = resolved.records.collect {
|
val addresses = resolved.records.collect {
|
||||||
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||||
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||||
}
|
}
|
||||||
Resolved(lookup.serviceName, addresses)
|
Resolved(lookup.serviceName, addresses)
|
||||||
|
}
|
||||||
|
|
||||||
|
def askResolve(): Future[Resolved] = {
|
||||||
|
dns.ask(DnsProtocol.Resolve(lookup.serviceName, mode))(resolveTimeout).map {
|
||||||
|
case resolved: DnsProtocol.Resolved ⇒
|
||||||
|
log.debug("{} lookup result: {}", mode, resolved)
|
||||||
|
ipRecordsToResolved(resolved)
|
||||||
case resolved ⇒
|
case resolved ⇒
|
||||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||||
Resolved(lookup.serviceName, Nil)
|
Resolved(lookup.serviceName, Nil)
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
asyncDnsCache match {
|
||||||
|
case OptionVal.Some(cache) ⇒
|
||||||
|
cache.get((lookup.serviceName, mode)) match {
|
||||||
|
case Some(resolved) ⇒
|
||||||
|
log.debug("{} lookup cached: {}", mode, resolved)
|
||||||
|
Future.successful(ipRecordsToResolved(resolved))
|
||||||
|
case None ⇒
|
||||||
|
askResolve()
|
||||||
|
}
|
||||||
|
case OptionVal.None ⇒
|
||||||
|
askResolve()
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
|
||||||
|
|
@ -12,9 +12,10 @@ import akka.discovery.ServiceDiscovery.ResolvedTarget
|
||||||
import akka.io.dns.DockerBindDnsService
|
import akka.io.dns.DockerBindDnsService
|
||||||
import akka.testkit.{ AkkaSpec, SocketUtil, TestKit }
|
import akka.testkit.{ AkkaSpec, SocketUtil, TestKit }
|
||||||
import com.typesafe.config.ConfigFactory
|
import com.typesafe.config.ConfigFactory
|
||||||
|
|
||||||
import scala.concurrent.duration._
|
import scala.concurrent.duration._
|
||||||
|
|
||||||
|
import akka.discovery.ServiceDiscovery
|
||||||
|
|
||||||
object DnsDiscoverySpec {
|
object DnsDiscoverySpec {
|
||||||
|
|
||||||
val config = ConfigFactory.parseString(
|
val config = ConfigFactory.parseString(
|
||||||
|
|
@ -50,6 +51,47 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
||||||
|
|
||||||
val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault)
|
val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault)
|
||||||
|
|
||||||
|
private def testSrvRecords(discovery: ServiceDiscovery): Unit = {
|
||||||
|
val name = "_service._tcp.foo.test."
|
||||||
|
|
||||||
|
def lookup() =
|
||||||
|
discovery
|
||||||
|
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
||||||
|
.futureValue
|
||||||
|
|
||||||
|
val expected = Set(
|
||||||
|
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
||||||
|
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
||||||
|
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
||||||
|
)
|
||||||
|
|
||||||
|
val result1 = lookup()
|
||||||
|
result1.addresses.toSet shouldEqual expected
|
||||||
|
result1.serviceName shouldEqual name
|
||||||
|
|
||||||
|
// one more time to exercise the cache
|
||||||
|
val result2 = lookup()
|
||||||
|
result2.addresses.toSet shouldEqual expected
|
||||||
|
result2.serviceName shouldEqual name
|
||||||
|
}
|
||||||
|
|
||||||
|
private def testIpRecords(discovery: ServiceDiscovery): Unit = {
|
||||||
|
val name = "a-single.foo.test"
|
||||||
|
|
||||||
|
val expected = Set(ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20"))))
|
||||||
|
|
||||||
|
def lookup() = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
||||||
|
|
||||||
|
val result1 = lookup()
|
||||||
|
result1.serviceName shouldEqual name
|
||||||
|
result1.addresses.toSet shouldEqual expected
|
||||||
|
|
||||||
|
// one more time to exercise the cache
|
||||||
|
val result2 = lookup()
|
||||||
|
result2.serviceName shouldEqual name
|
||||||
|
result2.addresses.toSet shouldEqual expected
|
||||||
|
}
|
||||||
|
|
||||||
"Dns Discovery with isolated resolver" must {
|
"Dns Discovery with isolated resolver" must {
|
||||||
|
|
||||||
if (!dockerAvailable()) {
|
if (!dockerAvailable()) {
|
||||||
|
|
@ -59,27 +101,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
||||||
|
|
||||||
"work with SRV records" in {
|
"work with SRV records" in {
|
||||||
val discovery = Discovery(system).discovery
|
val discovery = Discovery(system).discovery
|
||||||
val name = "_service._tcp.foo.test."
|
testSrvRecords(discovery)
|
||||||
val result =
|
|
||||||
discovery
|
|
||||||
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
|
||||||
.futureValue
|
|
||||||
result.addresses.toSet shouldEqual Set(
|
|
||||||
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
|
||||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
|
||||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
|
||||||
)
|
|
||||||
result.serviceName shouldEqual name
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"work with IP records" in {
|
"work with IP records" in {
|
||||||
val discovery = Discovery(system).discovery
|
val discovery = Discovery(system).discovery
|
||||||
val name = "a-single.foo.test"
|
testIpRecords(discovery)
|
||||||
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
|
||||||
result.serviceName shouldEqual name
|
|
||||||
result.addresses.toSet shouldEqual Set(
|
|
||||||
ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20")))
|
|
||||||
)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"be using its own resolver" in {
|
"be using its own resolver" in {
|
||||||
|
|
@ -97,17 +124,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
||||||
|
|
||||||
"work with SRV records" in {
|
"work with SRV records" in {
|
||||||
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
|
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
|
||||||
val name = "_service._tcp.foo.test."
|
testSrvRecords(discovery)
|
||||||
val result =
|
}
|
||||||
discovery
|
|
||||||
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
"work with IP records" in {
|
||||||
.futureValue
|
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
|
||||||
result.addresses.toSet shouldEqual Set(
|
testIpRecords(discovery)
|
||||||
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
|
||||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
|
||||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
|
||||||
)
|
|
||||||
result.serviceName shouldEqual name
|
|
||||||
}
|
}
|
||||||
|
|
||||||
"be using the system resolver" in {
|
"be using the system resolver" in {
|
||||||
|
|
@ -118,7 +140,10 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
||||||
}
|
}
|
||||||
|
|
||||||
override def afterTermination(): Unit = {
|
override def afterTermination(): Unit = {
|
||||||
|
try {
|
||||||
|
TestKit.shutdownActorSystem(systemWithAsyncDnsAsResolver)
|
||||||
|
} finally {
|
||||||
super.afterTermination()
|
super.afterTermination()
|
||||||
TestKit.shutdownActorSystem(system)
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue