diff --git a/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsManagerSpec.scala b/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsManagerSpec.scala index 8b5a2093ab..08b451bf53 100644 --- a/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsManagerSpec.scala +++ b/akka-actor-tests/src/test/scala/akka/io/dns/internal/AsyncDnsManagerSpec.scala @@ -43,6 +43,11 @@ class AsyncDnsManagerSpec extends AkkaSpec( resolved.ipv4 should be(Nil) resolved.ipv6.length should be(1) } + + "provide access to cache" in { + dns ! AsyncDnsManager.GetCache + expectMsgType[AsyncDnsCache] should be theSameInstanceAs Dns(system).cache + } } } diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala index 52a4aab5a4..c4ef22db24 100644 --- a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsCache.scala @@ -6,20 +6,20 @@ package akka.io.dns.internal import java.util.concurrent.atomic.AtomicReference +import akka.actor.NoSerializationVerificationNeeded import akka.annotation.InternalApi import akka.io.{ Dns, PeriodicCacheCleanup } import akka.io.dns.CachePolicy.CachePolicy import akka.io.SimpleDnsCache._ import akka.io.dns.DnsProtocol.{ Ip, RequestType, Resolved } import akka.io.dns.{ AAAARecord, ARecord } - import scala.annotation.tailrec import scala.collection.immutable /** * Internal API */ -@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup { +@InternalApi class AsyncDnsCache extends Dns with PeriodicCacheCleanup with NoSerializationVerificationNeeded { private val cacheRef = new AtomicReference(new Cache[(String, RequestType), Resolved]( immutable.SortedSet()(expiryEntryOrdering()), immutable.Map(), () ⇒ clock)) @@ -50,7 +50,7 @@ import scala.collection.immutable else (now - nanoBase) / 1000000 } - private[io] final def get(key: (String, RequestType)): Option[Resolved] = { + private[akka] final def get(key: (String, RequestType)): Option[Resolved] = { cacheRef.get().get(key) } diff --git a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala index 276252e543..05f6a70145 100644 --- a/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala +++ b/akka-actor/src/main/scala/akka/io/dns/internal/AsyncDnsManager.scala @@ -23,8 +23,10 @@ import scala.concurrent.duration.Duration * INTERNAL API */ @InternalApi -private[io] object AsyncDnsManager { +private[akka] object AsyncDnsManager { private case object CacheCleanup + + case object GetCache } /** @@ -82,10 +84,14 @@ private[io] final class AsyncDnsManager(name: String, system: ExtendedActorSyste } Dns.Resolved(asyncResolved.name, ips) } - reply pipeTo sender + reply pipeTo sender() case CacheCleanup ⇒ cacheCleanup.foreach(_.cleanup()) + + case AsyncDnsManager.GetCache ⇒ + sender() ! cache + } } diff --git a/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala b/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala index c5c18d11e9..1f7cf61541 100644 --- a/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala +++ b/akka-discovery/src/main/scala/akka/discovery/dns/DnsServiceDiscovery.scala @@ -6,20 +6,28 @@ package akka.discovery.dns import java.net.InetAddress +import scala.concurrent.duration._ + import akka.actor.ExtendedActorSystem import akka.annotation.InternalApi import akka.discovery.ServiceDiscovery.{ Resolved, ResolvedTarget } import akka.event.Logging import akka.io.{ Dns, IO } import akka.pattern.ask - import scala.concurrent.Future import scala.concurrent.duration.FiniteDuration + import akka.discovery._ import akka.io.dns.DnsProtocol.{ Ip, Srv } import akka.io.dns.{ AAAARecord, ARecord, DnsProtocol, SRVRecord } - import scala.collection.{ immutable ⇒ im } +import scala.util.Failure +import scala.util.Success + +import akka.io.dns.internal.AsyncDnsCache +import akka.io.dns.internal.AsyncDnsManager +import akka.util.OptionVal +import akka.util.Timeout /** * INTERNAL API @@ -72,39 +80,99 @@ private[akka] class DnsServiceDiscovery(system: ExtendedActorSystem) extends Ser Dns(system).loadAsyncDns("SD-DNS") } + // updated from ask AsyncDnsManager.GetCache, but doesn't have to volatile since will still work when unset + // (eventually visible) + private var asyncDnsCache: OptionVal[AsyncDnsCache] = OptionVal.None + import system.dispatcher + dns.ask(AsyncDnsManager.GetCache)(Timeout(30.seconds)).onComplete { + case Success(cache: AsyncDnsCache) ⇒ + asyncDnsCache = OptionVal.Some(cache) + case Success(other) ⇒ + log.error("Expected AsyncDnsCache but got [{}]", other.getClass.getName) + case Failure(e) ⇒ + log.error(e, "Couldn't retrieve DNS cache: {}") + } + private def cleanIpString(ipString: String): String = if (ipString.startsWith("/")) ipString.tail else ipString override def lookup(lookup: Lookup, resolveTimeout: FiniteDuration): Future[Resolved] = { - if (lookup.portName.isDefined && lookup.protocol.isDefined) { - val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}" - log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest) - dns.ask(DnsProtocol.Resolve(srvRequest, Srv))(resolveTimeout).map { + if (lookup.portName.isDefined && lookup.protocol.isDefined) + lookupSrv(lookup, resolveTimeout) + else + lookupIp(lookup, resolveTimeout) + } + + private def lookupSrv(lookup: Lookup, resolveTimeout: FiniteDuration) = { + val srvRequest = s"_${lookup.portName.get}._${lookup.protocol.get}.${lookup.serviceName}" + log.debug("Lookup [{}] translated to SRV query [{}] as contains portName and protocol", lookup, srvRequest) + val mode = Srv + + def askResolve(): Future[Resolved] = { + dns.ask(DnsProtocol.Resolve(srvRequest, mode))(resolveTimeout).map { case resolved: DnsProtocol.Resolved ⇒ - log.debug("Resolved Dns.Resolved: {}", resolved) + log.debug("{} lookup result: {}", mode, resolved) srvRecordsToResolved(srvRequest, resolved) case resolved ⇒ log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) Resolved(srvRequest, Nil) } - } else { - log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup) - dns.ask(DnsProtocol.Resolve(lookup.serviceName, Ip()))(resolveTimeout).map { + } + + asyncDnsCache match { + case OptionVal.Some(cache) ⇒ + cache.get((srvRequest, mode)) match { + case Some(resolved) ⇒ + log.debug("{} lookup cached: {}", mode, resolved) + Future.successful(srvRecordsToResolved(srvRequest, resolved)) + case None ⇒ + askResolve() + } + case OptionVal.None ⇒ + askResolve() + + } + } + + private def lookupIp(lookup: Lookup, resolveTimeout: FiniteDuration) = { + log.debug("Lookup[{}] translated to A/AAAA lookup as does not have portName and protocol", lookup) + val mode = Ip() + + def ipRecordsToResolved(resolved: DnsProtocol.Resolved): Resolved = { + val addresses = resolved.records.collect { + case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) + case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) + } + Resolved(lookup.serviceName, addresses) + } + + def askResolve(): Future[Resolved] = { + dns.ask(DnsProtocol.Resolve(lookup.serviceName, mode))(resolveTimeout).map { case resolved: DnsProtocol.Resolved ⇒ - log.debug("Resolved Dns.Resolved: {}", resolved) - val addresses = resolved.records.collect { - case a: ARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) - case a: AAAARecord ⇒ ResolvedTarget(cleanIpString(a.ip.getHostAddress), None, Some(a.ip)) - } - Resolved(lookup.serviceName, addresses) + log.debug("{} lookup result: {}", mode, resolved) + ipRecordsToResolved(resolved) case resolved ⇒ log.warning("Resolved UNEXPECTED (resolving to Nil): {}", resolved.getClass) Resolved(lookup.serviceName, Nil) } } - } + asyncDnsCache match { + case OptionVal.Some(cache) ⇒ + cache.get((lookup.serviceName, mode)) match { + case Some(resolved) ⇒ + log.debug("{} lookup cached: {}", mode, resolved) + Future.successful(ipRecordsToResolved(resolved)) + case None ⇒ + askResolve() + } + case OptionVal.None ⇒ + askResolve() + + } + + } } diff --git a/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala b/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala index d8713db9ff..b2bb28f75a 100644 --- a/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala +++ b/akka-discovery/src/test/scala/akka/discovery/dns/DnsDiscoverySpec.scala @@ -12,9 +12,10 @@ import akka.discovery.ServiceDiscovery.ResolvedTarget import akka.io.dns.DockerBindDnsService import akka.testkit.{ AkkaSpec, SocketUtil, TestKit } import com.typesafe.config.ConfigFactory - import scala.concurrent.duration._ +import akka.discovery.ServiceDiscovery + object DnsDiscoverySpec { val config = ConfigFactory.parseString( @@ -50,6 +51,47 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config) val systemWithAsyncDnsAsResolver = ActorSystem("AsyncDnsSystem", configWithAsyncDnsResolverAsDefault) + private def testSrvRecords(discovery: ServiceDiscovery): Unit = { + val name = "_service._tcp.foo.test." + + def lookup() = + discovery + .lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds) + .futureValue + + val expected = Set( + ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))), + ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22"))) + ) + + val result1 = lookup() + result1.addresses.toSet shouldEqual expected + result1.serviceName shouldEqual name + + // one more time to exercise the cache + val result2 = lookup() + result2.addresses.toSet shouldEqual expected + result2.serviceName shouldEqual name + } + + private def testIpRecords(discovery: ServiceDiscovery): Unit = { + val name = "a-single.foo.test" + + val expected = Set(ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20")))) + + def lookup() = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue + + val result1 = lookup() + result1.serviceName shouldEqual name + result1.addresses.toSet shouldEqual expected + + // one more time to exercise the cache + val result2 = lookup() + result2.serviceName shouldEqual name + result2.addresses.toSet shouldEqual expected + } + "Dns Discovery with isolated resolver" must { if (!dockerAvailable()) { @@ -59,27 +101,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config) "work with SRV records" in { val discovery = Discovery(system).discovery - val name = "_service._tcp.foo.test." - val result = - discovery - .lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds) - .futureValue - result.addresses.toSet shouldEqual Set( - ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))), - ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))), - ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22"))) - ) - result.serviceName shouldEqual name + testSrvRecords(discovery) } "work with IP records" in { val discovery = Discovery(system).discovery - val name = "a-single.foo.test" - val result = discovery.lookup(name, resolveTimeout = 500.milliseconds).futureValue - result.serviceName shouldEqual name - result.addresses.toSet shouldEqual Set( - ResolvedTarget("192.168.1.20", None, Some(InetAddress.getByName("192.168.1.20"))) - ) + testIpRecords(discovery) } "be using its own resolver" in { @@ -97,17 +124,12 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config) "work with SRV records" in { val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery - val name = "_service._tcp.foo.test." - val result = - discovery - .lookup(Lookup("foo.test.").withPortName("service").withProtocol("tcp"), resolveTimeout = 10.seconds) - .futureValue - result.addresses.toSet shouldEqual Set( - ResolvedTarget("a-single.foo.test", Some(5060), Some(InetAddress.getByName("192.168.1.20"))), - ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.21"))), - ResolvedTarget("a-double.foo.test", Some(65535), Some(InetAddress.getByName("192.168.1.22"))) - ) - result.serviceName shouldEqual name + testSrvRecords(discovery) + } + + "work with IP records" in { + val discovery = Discovery(systemWithAsyncDnsAsResolver).discovery + testIpRecords(discovery) } "be using the system resolver" in { @@ -118,7 +140,10 @@ class DnsDiscoverySpec extends AkkaSpec(DnsDiscoverySpec.config) } override def afterTermination(): Unit = { - super.afterTermination() - TestKit.shutdownActorSystem(system) + try { + TestKit.shutdownActorSystem(systemWithAsyncDnsAsResolver) + } finally { + super.afterTermination() + } } }