Use AsyncDnsCache directly in DnsServiceDiscovery, #25948
* to avoid (2) asks
This commit is contained in:
parent
c130a7a609
commit
0f45eec439
5 changed files with 157 additions and 53 deletions
|
|
@ -43,6 +43,11 @@ class AsyncDnsManagerSpec extends AkkaSpec(
|
|||
resolved.ipv4 should be(Nil)
|
||||
resolved.ipv6.length should be(1)
|
||||
}
|
||||
|
||||
"provide access to cache" in {
|
||||
dns ! AsyncDnsManager.GetCache
|
||||
expectMsgType[AsyncDnsCache] should be theSameInstanceAs Dns(system).cache
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,20 +6,20 @@ package akka.io.dns.internal
|
|||
|
||||
import java.util.concurrent.atomic.AtomicReference
|
||||
|
||||
import akka.actor.NoSerializationVerificationNeeded
|
||||
import akka.annotation.InternalApi
|
||||
import akka.io.{ Dns, PeriodicCacheCleanup }
|
||||
import akka.io.dns.CachePolicy.CachePolicy
|
||||
import akka.io.SimpleDnsCache._
|
||||
import akka.io.dns.DnsProtocol.{ Ip, RequestType, Resolved }
|
||||
import akka.io.dns.{ AAAARecord, ARecord }
|
||||
|
||||
import scala.annotation.tailrec
|
||||
import scala.collection.immutable
|
||||
|
||||
/**
|
||||
* Internal API
|
||||
*/
|
||||
@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup {
|
||||
@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup with NoSerializationVerificationNeeded {
|
||||
private val cacheRef = new AtomicReference(new Cache[(String, RequestType), Resolved](
|
||||
immutable.SortedSet()(expiryEntryOrdering()),
|
||||
immutable.Map(), () ⇒ clock))
|
||||
|
|
@ -50,7 +50,7 @@ import scala.collection.immutable
|
|||
else (now - nanoBase) / 1000000
|
||||
}
|
||||
|
||||
private[io] final def get(key: (String, RequestType)): Option[Resolved] = {
|
||||
private[akka] final def get(key: (String, RequestType)): Option[Resolved] = {
|
||||
cacheRef.get().get(key)
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -23,8 +23,10 @@ import scala.concurrent.duration.Duration
|
|||
* INTERNAL API
|
||||
*/
|
||||
@InternalApi
|
||||
private[io] object AsyncDnsManager {
|
||||
private[akka] object AsyncDnsManager {
|
||||
private case object CacheCleanup
|
||||
|
||||
case object GetCache
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -82,10 +84,14 @@ private[io] final class AsyncDnsManager(name: String, system: ExtendedActorSyste
|
|||
}
|
||||
Dns.Resolved(asyncResolved.name, ips)
|
||||
}
|
||||
reply pipeTo sender
|
||||
reply pipeTo sender()
|
||||
|
||||
case CacheCleanup ⇒
|
||||
cacheCleanup.foreach(_.cleanup())
|
||||
|
||||
case AsyncDnsManager.GetCache ⇒
|
||||
sender() ! cache
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -6,20 +6,28 @@ package akka.discovery.dns
|
|||
|
||||
import java.net.InetAddress
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.actor.ExtendedActorSystem
|
||||
import akka.annotation.InternalApi
|
||||
import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget }
|
||||
import akka.event.Logging
|
||||
import akka.io.{ Dns, IO }
|
||||
import akka.pattern.ask
|
||||
|
||||
import scala.concurrent.Future
|
||||
import scala.concurrent.duration.FiniteDuration
|
||||
|
||||
import akka.discovery._
|
||||
import akka.io.dns.DnsProtocol.{ Ip, Srv }
|
||||
import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord }
|
||||
|
||||
import scala.collection.{ immutable ⇒ im }
|
||||
import scala.util.Failure
|
||||
import scala.util.Success
|
||||
|
||||
import akka.io.dns.internal.AsyncDnsCache
|
||||
import akka.io.dns.internal.AsyncDnsManager
|
||||
import akka.util.OptionVal
|
||||
import akka.util.Timeout
|
||||
|
||||
/**
|
||||
* INTERNAL API
|
||||
|
|
@ -72,39 +80,99 @@ private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends Ser
|
|||
Dns(system).loadAsyncDns("SD-DNS")
|
||||
}
|
||||
|
||||
// updated from ask AsyncDnsManager.GetCache, but doesn't have to volatile since will still work when unset
|
||||
// (eventually visible)
|
||||
private var asyncDnsCache: OptionVal[AsyncDnsCache] = OptionVal.None
|
||||
|
||||
import system.dispatcher
|
||||
|
||||
dns.ask(AsyncDnsManager.GetCache)(Timeout(30.seconds)).onComplete {
|
||||
case Success(cache: AsyncDnsCache) ⇒
|
||||
asyncDnsCache = OptionVal.Some(cache)
|
||||
case Success(other) ⇒
|
||||
log.error("Expected AsyncDnsCache but got [{}]", other.getClass.getName)
|
||||
case Failure(e) ⇒
|
||||
log.error(e, "Couldn't retrieve DNS cache: {}")
|
||||
}
|
||||
|
||||
private def cleanIpString(ipString: String): String =
|
||||
if (ipString.startsWith("/")) ipString.tail else ipString
|
||||
|
||||
override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = {
|
||||
if (lookup.portName.isDefined && lookup.protocol.isDefined) {
|
||||
val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}"
|
||||
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
||||
dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map {
|
||||
if (lookup.portName.isDefined && lookup.protocol.isDefined)
|
||||
lookupSrv(lookup, resolveTimeout)
|
||||
else
|
||||
lookupIp(lookup, resolveTimeout)
|
||||
}
|
||||
|
||||
private def lookupSrv(lookup: Lookup, resolveTimeout: FiniteDuration) = {
|
||||
val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}"
|
||||
log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest)
|
||||
val mode = Srv
|
||||
|
||||
def askResolve(): Future[Resolved] = {
|
||||
dns.ask(DnsProtocol.Resolve(srvRequest, mode))(resolveTimeout).map {
|
||||
case resolved: DnsProtocol.Resolved ⇒
|
||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
||||
log.debug("{} lookup result: {}", mode, resolved)
|
||||
srvRecordsToResolved(srvRequest, resolved)
|
||||
case resolved ⇒
|
||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||
Resolved(srvRequest, Nil)
|
||||
}
|
||||
} else {
|
||||
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
||||
dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map {
|
||||
}
|
||||
|
||||
asyncDnsCache match {
|
||||
case OptionVal.Some(cache) ⇒
|
||||
cache.get((srvRequest, mode)) match {
|
||||
case Some(resolved) ⇒
|
||||
log.debug("{} lookup cached: {}", mode, resolved)
|
||||
Future.successful(srvRecordsToResolved(srvRequest, resolved))
|
||||
case None ⇒
|
||||
askResolve()
|
||||
}
|
||||
case OptionVal.None ⇒
|
||||
askResolve()
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private def lookupIp(lookup: Lookup, resolveTimeout: FiniteDuration) = {
|
||||
log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup)
|
||||
val mode = Ip()
|
||||
|
||||
def ipRecordsToResolved(resolved: DnsProtocol.Resolved): Resolved = {
|
||||
val addresses = resolved.records.collect {
|
||||
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||
}
|
||||
Resolved(lookup.serviceName, addresses)
|
||||
}
|
||||
|
||||
def askResolve(): Future[Resolved] = {
|
||||
dns.ask(DnsProtocol.Resolve(lookup.serviceName, mode))(resolveTimeout).map {
|
||||
case resolved: DnsProtocol.Resolved ⇒
|
||||
log.debug("Resolved Dns.Resolved: {}", resolved)
|
||||
val addresses = resolved.records.collect {
|
||||
case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||
case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip))
|
||||
}
|
||||
Resolved(lookup.serviceName, addresses)
|
||||
log.debug("{} lookup result: {}", mode, resolved)
|
||||
ipRecordsToResolved(resolved)
|
||||
case resolved ⇒
|
||||
log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass)
|
||||
Resolved(lookup.serviceName, Nil)
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
asyncDnsCache match {
|
||||
case OptionVal.Some(cache) ⇒
|
||||
cache.get((lookup.serviceName, mode)) match {
|
||||
case Some(resolved) ⇒
|
||||
log.debug("{} lookup cached: {}", mode, resolved)
|
||||
Future.successful(ipRecordsToResolved(resolved))
|
||||
case None ⇒
|
||||
askResolve()
|
||||
}
|
||||
case OptionVal.None ⇒
|
||||
askResolve()
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -12,9 +12,10 @@ import akka.discovery.ServiceDiscovery.ResolvedTarget
|
|||
import akka.io.dns.DockerBindDnsService
|
||||
import akka.testkit.{ AkkaSpec, SocketUtil, TestKit }
|
||||
import com.typesafe.config.ConfigFactory
|
||||
|
||||
import scala.concurrent.duration._
|
||||
|
||||
import akka.discovery.ServiceDiscovery
|
||||
|
||||
object DnsDiscoverySpec {
|
||||
|
||||
val config = ConfigFactory.parseString(
|
||||
|
|
@ -50,6 +51,47 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
|||
|
||||
val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault)
|
||||
|
||||
private def testSrvRecords(discovery: ServiceDiscovery): Unit = {
|
||||
val name = "_service._tcp.foo.test."
|
||||
|
||||
def lookup() =
|
||||
discovery
|
||||
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
||||
.futureValue
|
||||
|
||||
val expected = Set(
|
||||
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
||||
)
|
||||
|
||||
val result1 = lookup()
|
||||
result1.addresses.toSet shouldEqual expected
|
||||
result1.serviceName shouldEqual name
|
||||
|
||||
// one more time to exercise the cache
|
||||
val result2 = lookup()
|
||||
result2.addresses.toSet shouldEqual expected
|
||||
result2.serviceName shouldEqual name
|
||||
}
|
||||
|
||||
private def testIpRecords(discovery: ServiceDiscovery): Unit = {
|
||||
val name = "a-single.foo.test"
|
||||
|
||||
val expected = Set(ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20"))))
|
||||
|
||||
def lookup() = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
||||
|
||||
val result1 = lookup()
|
||||
result1.serviceName shouldEqual name
|
||||
result1.addresses.toSet shouldEqual expected
|
||||
|
||||
// one more time to exercise the cache
|
||||
val result2 = lookup()
|
||||
result2.serviceName shouldEqual name
|
||||
result2.addresses.toSet shouldEqual expected
|
||||
}
|
||||
|
||||
"Dns Discovery with isolated resolver" must {
|
||||
|
||||
if (!dockerAvailable()) {
|
||||
|
|
@ -59,27 +101,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
|||
|
||||
"work with SRV records" in {
|
||||
val discovery = Discovery(system).discovery
|
||||
val name = "_service._tcp.foo.test."
|
||||
val result =
|
||||
discovery
|
||||
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
||||
.futureValue
|
||||
result.addresses.toSet shouldEqual Set(
|
||||
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
||||
)
|
||||
result.serviceName shouldEqual name
|
||||
testSrvRecords(discovery)
|
||||
}
|
||||
|
||||
"work with IP records" in {
|
||||
val discovery = Discovery(system).discovery
|
||||
val name = "a-single.foo.test"
|
||||
val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue
|
||||
result.serviceName shouldEqual name
|
||||
result.addresses.toSet shouldEqual Set(
|
||||
ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20")))
|
||||
)
|
||||
testIpRecords(discovery)
|
||||
}
|
||||
|
||||
"be using its own resolver" in {
|
||||
|
|
@ -97,17 +124,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
|||
|
||||
"work with SRV records" in {
|
||||
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
|
||||
val name = "_service._tcp.foo.test."
|
||||
val result =
|
||||
discovery
|
||||
.lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds)
|
||||
.futureValue
|
||||
result.addresses.toSet shouldEqual Set(
|
||||
ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))),
|
||||
ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22")))
|
||||
)
|
||||
result.serviceName shouldEqual name
|
||||
testSrvRecords(discovery)
|
||||
}
|
||||
|
||||
"work with IP records" in {
|
||||
val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery
|
||||
testIpRecords(discovery)
|
||||
}
|
||||
|
||||
"be using the system resolver" in {
|
||||
|
|
@ -118,7 +140,10 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config)
|
|||
}
|
||||
|
||||
override def afterTermination(): Unit = {
|
||||
super.afterTermination()
|
||||
TestKit.shutdownActorSystem(system)
|
||||
try {
|
||||
TestKit.shutdownActorSystem(systemWithAsyncDnsAsResolver)
|
||||
} finally {
|
||||
super.afterTermination()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue